summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2019-04-20 01:53:12 +0100
committerMichael Meeks <michael.meeks@collabora.com>2019-04-26 19:59:31 +0100
commit629b69b4d606d0402fd58a3ecc427c6548ee6227 (patch)
tree89354bfc9b9637d041c87c70c5a51ebd1adcc25f
parent92562991bd5f8dcd821d9ed7e2d9003d2eac732a (diff)
PNG compression - thread it.private/mmeeks/thread-png
Change-Id: I238fe6701a1d1cb486473c67faba8c56e9c98dcb
-rw-r--r--kit/Kit.cpp183
-rw-r--r--wsd/LOOLWSD.cpp1
-rw-r--r--wsd/TileDesc.hpp18
3 files changed, 163 insertions, 39 deletions
diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index 25050e04b..6b66449b4 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -737,6 +737,101 @@ private:
static FILE* ProcSMapsFile = nullptr;
#endif
+class ThreadPool {
+ std::mutex _mutex;
+ std::condition_variable _cond;
+ std::condition_variable _complete;
+ typedef std::function<void()> ThreadFn;
+ std::queue<ThreadFn> _work;
+ std::vector<std::thread> _threads;
+ size_t _working;
+ std::atomic<bool> _exit;
+public:
+ ThreadPool()
+ : _working(0),
+ _exit(false)
+ {
+ int maxConcurrency = 2;
+#if MOBILEAPP
+# warning "Good defaults ? - 2 for iOS, 4 for Android ?"
+#else
+ const char *max = getenv("MAX_CONCURRENCY");
+ if (max)
+ maxConcurrency = atoi(max);
+#endif
+ LOG_TRC("PNG compression thread pool size " << maxConcurrency);
+ for (int i = 1; i < maxConcurrency; ++i)
+ _threads.push_back(std::thread(&ThreadPool::work, this));
+ }
+ ~ThreadPool()
+ {
+ assert(_working == 0);
+ _exit = true;
+ _cond.notify_all();
+ for (auto &it : _threads)
+ it.join();
+ }
+
+ size_t count() const
+ {
+ return _work.size();
+ }
+
+ void pushWorkUnlocked(const ThreadFn &fn)
+ {
+ _work.push(fn);
+ }
+
+ void runOne(std::unique_lock< std::mutex >& lock)
+ {
+ assert(!_work.empty());
+
+ ThreadFn fn = _work.front();
+ _work.pop();
+ _working++;
+ lock.unlock();
+
+ try {
+ fn();
+ } catch (...)
+ {
+ LOG_ERR("Exception during worker fn execution");
+ }
+
+ lock.lock();
+ _working--;
+ if (_work.empty() && _working == 0)
+ _complete.notify_all();
+ }
+
+ void run()
+ {
+ std::unique_lock< std::mutex > lock(_mutex);
+ assert(_working == 0);
+
+ // Avoid notifying threads if we don't need to.
+ bool useThreads = _threads.size() > 1 && _work.size() > 1;
+ if (useThreads)
+ _cond.notify_all();
+
+ while(!_work.empty())
+ runOne(lock);
+
+ if (useThreads && (_working > 0 || !_work.empty()))
+ _complete.wait(lock, [this]() { return _working == 0 && _work.empty(); } );
+ }
+
+ void work()
+ {
+ std::unique_lock< std::mutex > lock(_mutex);
+ while (!_exit)
+ {
+ _cond.wait(lock, [this]() { return !_work.empty(); });
+ runOne(lock);
+ }
+ }
+};
+
/// A document container.
/// Owns LOKitDocument instance and connections.
/// Manages the lifetime of a document.
@@ -948,6 +1043,14 @@ public:
renderTiles(tileCombined, true);
}
+ static void pushRendered(std::vector<TileDesc> &renderedTiles,
+ const TileDesc &desc, TileWireId wireId, size_t imgSize)
+ {
+ renderedTiles.push_back(desc);
+ renderedTiles.back().setWireId(wireId);
+ renderedTiles.back().setImgSize(imgSize);
+ }
+
void renderTiles(TileCombined &tileCombined, bool combined)
{
auto& tiles = tileCombined.getTiles();
@@ -1016,6 +1119,9 @@ public:
const int pixelWidth = tileCombined.getWidth();
const int pixelHeight = tileCombined.getHeight();
+ static ThreadPool pool;
+
+ std::vector<TileDesc> renderedTiles;
size_t tileIndex = 0;
for (Util::Rectangle& tileRect : tileRecs)
{
@@ -1035,13 +1141,15 @@ public:
// The tile content is identical to what the client already has, so skip it
LOG_TRC("Match for tile #" << tileIndex << " at (" << positionX << "," <<
positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping");
- tiles.erase(tiles.begin() + tileIndex);
+ tileIndex++;
continue;
}
- size_t imgSize;
+ size_t imgSize = -1;
+ if (_pngCache.copyFromCache(hash, output, imgSize))
+ pushRendered(renderedTiles, tiles[tileIndex], wireId, imgSize);
- if (!_pngCache.copyFromCache(hash, output, imgSize))
+ else
{
LOG_DBG("PNG cache with hash " << hash << " missed.");
@@ -1051,43 +1159,52 @@ public:
pixelWidth, pixelHeight,
mode);
- PngCache::CacheData data(new std::vector< char >() );
- data->reserve(pixmapWidth * pixmapHeight * 1);
+ // Queue to be executed inside 'run'
+ pool.pushWorkUnlocked([=,&output,&pixmap,&tiles,&renderedTiles](){
+ PngCache::CacheData data(new std::vector< char >() );
+ data->reserve(pixmapWidth * pixmapHeight * 1);
+
+ /*
+ * Disable for now - pushed in error.
+ *
+ if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
+ bufferWidth, bufferHeight,
+ output, wid, oldWid))
+ else ...
+ */
+
+ LOG_DBG("Encode a new png for tile #" << tileIndex);
+ if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
+ pixmapWidth, pixmapHeight, *data, mode))
+ {
+ // FIXME: Return error.
+ // sendTextFrame("error: cmd=tile kind=failure");
+ LOG_ERR("Failed to encode tile into PNG.");
+ return;
+ }
-/*
- *Disable for now - pushed in error.
- *
- if (_deltaGen.createDelta(pixmap, startX, startY, width, height,
- bufferWidth, bufferHeight,
- output, wid, oldWid))
- else ...
-*/
-
- LOG_DBG("Encode a new png for this tile.");
- if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight,
- pixmapWidth, pixmapHeight, *data, mode))
- {
- // FIXME: Return error.
- // sendTextFrame("error: cmd=tile kind=failure");
- LOG_ERR("Failed to encode tile into PNG.");
- return;
- }
+ LOG_DBG("Tile " << tileIndex << " is " << data->size() << " bytes.");
+ std::unique_lock<std::mutex> pngLock(_pngMutex);
- output.insert(output.end(), data->begin(), data->end());
- imgSize = data->size();
- _pngCache.addToCache(data, wireId, hash);
+ output.insert(output.end(), data->begin(), data->end());
+ _pngCache.addToCache(data, wireId, hash);
+ pushRendered(renderedTiles, tiles[tileIndex], wireId, data->size());
+ });
}
-
LOG_TRC("Encoded tile #" << tileIndex << " at (" << positionX << "," << positionY << ") with oldWireId=" <<
tiles[tileIndex].getOldWireId() << ", hash=" << hash << " wireId: " << wireId << " in " << imgSize << " bytes.");
- if (imgSize == 0)
+ tileIndex++;
+ }
+
+ pool.run();
+
+ for (auto &i : renderedTiles)
+ {
+ if (i.getImgSize() == 0)
{
LOG_ERR("Encoded 0-sized tile!");
assert(!"0-sized tile enocded!");
}
- tiles[tileIndex].setWireId(wireId);
- tiles[tileIndex].setImgSize(imgSize);
- tileIndex++;
}
elapsed = timestamp.elapsed();
@@ -1103,7 +1220,7 @@ public:
std::string tileMsg;
if (combined)
- tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID);
+ tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID, renderedTiles);
else
tileMsg = tiles[0].serialize("tile:", ADD_DEBUG_RENDERID);
@@ -1996,6 +2113,8 @@ private:
std::shared_ptr<TileQueue> _tileQueue;
SocketPoll& _socketPoll;
std::shared_ptr<WebSocketHandler> _websocketHandler;
+
+ std::mutex _pngMutex;
PngCache _pngCache;
// Document password provided
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 57ddfd705..116e7c560 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -89,7 +89,6 @@ using Poco::Net::PartHandler;
#include <Poco/StreamCopier.h>
#include <Poco/StringTokenizer.h>
#include <Poco/TemporaryFile.h>
-#include <Poco/ThreadPool.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/HelpFormatter.h>
diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp
index 77fb46b26..d30b718da 100644
--- a/wsd/TileDesc.hpp
+++ b/wsd/TileDesc.hpp
@@ -357,27 +357,33 @@ public:
std::string serialize(const std::string& prefix = std::string(),
const std::string& suffix = std::string()) const
{
+ return serialize(prefix, suffix, _tiles);
+ }
+
+ std::string serialize(const std::string& prefix, const std::string &suffix,
+ const std::vector<TileDesc> &tiles) const
+ {
std::ostringstream oss;
oss << prefix
<< " part=" << _part
<< " width=" << _width
<< " height=" << _height
<< " tileposx=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getTilePosX() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Seek back over last comma, overwritten below.
oss << " tileposy=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getTilePosY() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Ditto.
oss << " imgsize=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getImgSize() << ','; // Ditto.
}
@@ -387,14 +393,14 @@ public:
<< " tileheight=" << _tileHeight;
oss << " ver=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getVersion() << ',';
}
oss.seekp(-1, std::ios_base::cur); // Ditto.
oss << " oldwid=";
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
oss << tile.getOldWireId() << ',';
}
@@ -403,7 +409,7 @@ public:
oss << " wid=";
bool comma = false;
- for (const auto& tile : _tiles)
+ for (const auto& tile : tiles)
{
if (comma)
oss << ',';