From 629b69b4d606d0402fd58a3ecc427c6548ee6227 Mon Sep 17 00:00:00 2001 From: Michael Meeks Date: Sat, 20 Apr 2019 01:53:12 +0100 Subject: PNG compression - thread it. Change-Id: I238fe6701a1d1cb486473c67faba8c56e9c98dcb --- kit/Kit.cpp | 183 +++++++++++++++++++++++++++++++++++++++++++++---------- wsd/LOOLWSD.cpp | 1 - wsd/TileDesc.hpp | 18 ++++-- 3 files changed, 163 insertions(+), 39 deletions(-) diff --git a/kit/Kit.cpp b/kit/Kit.cpp index 25050e04b..6b66449b4 100644 --- a/kit/Kit.cpp +++ b/kit/Kit.cpp @@ -737,6 +737,101 @@ private: static FILE* ProcSMapsFile = nullptr; #endif +class ThreadPool { + std::mutex _mutex; + std::condition_variable _cond; + std::condition_variable _complete; + typedef std::function ThreadFn; + std::queue _work; + std::vector _threads; + size_t _working; + std::atomic _exit; +public: + ThreadPool() + : _working(0), + _exit(false) + { + int maxConcurrency = 2; +#if MOBILEAPP +# warning "Good defaults ? - 2 for iOS, 4 for Android ?" +#else + const char *max = getenv("MAX_CONCURRENCY"); + if (max) + maxConcurrency = atoi(max); +#endif + LOG_TRC("PNG compression thread pool size " << maxConcurrency); + for (int i = 1; i < maxConcurrency; ++i) + _threads.push_back(std::thread(&ThreadPool::work, this)); + } + ~ThreadPool() + { + assert(_working == 0); + _exit = true; + _cond.notify_all(); + for (auto &it : _threads) + it.join(); + } + + size_t count() const + { + return _work.size(); + } + + void pushWorkUnlocked(const ThreadFn &fn) + { + _work.push(fn); + } + + void runOne(std::unique_lock< std::mutex >& lock) + { + assert(!_work.empty()); + + ThreadFn fn = _work.front(); + _work.pop(); + _working++; + lock.unlock(); + + try { + fn(); + } catch (...) + { + LOG_ERR("Exception during worker fn execution"); + } + + lock.lock(); + _working--; + if (_work.empty() && _working == 0) + _complete.notify_all(); + } + + void run() + { + std::unique_lock< std::mutex > lock(_mutex); + assert(_working == 0); + + // Avoid notifying threads if we don't need to. + bool useThreads = _threads.size() > 1 && _work.size() > 1; + if (useThreads) + _cond.notify_all(); + + while(!_work.empty()) + runOne(lock); + + if (useThreads && (_working > 0 || !_work.empty())) + _complete.wait(lock, [this]() { return _working == 0 && _work.empty(); } ); + } + + void work() + { + std::unique_lock< std::mutex > lock(_mutex); + while (!_exit) + { + _cond.wait(lock, [this]() { return !_work.empty(); }); + runOne(lock); + } + } +}; + /// A document container. /// Owns LOKitDocument instance and connections. /// Manages the lifetime of a document. @@ -948,6 +1043,14 @@ public: renderTiles(tileCombined, true); } + static void pushRendered(std::vector &renderedTiles, + const TileDesc &desc, TileWireId wireId, size_t imgSize) + { + renderedTiles.push_back(desc); + renderedTiles.back().setWireId(wireId); + renderedTiles.back().setImgSize(imgSize); + } + void renderTiles(TileCombined &tileCombined, bool combined) { auto& tiles = tileCombined.getTiles(); @@ -1016,6 +1119,9 @@ public: const int pixelWidth = tileCombined.getWidth(); const int pixelHeight = tileCombined.getHeight(); + static ThreadPool pool; + + std::vector renderedTiles; size_t tileIndex = 0; for (Util::Rectangle& tileRect : tileRecs) { @@ -1035,13 +1141,15 @@ public: // The tile content is identical to what the client already has, so skip it LOG_TRC("Match for tile #" << tileIndex << " at (" << positionX << "," << positionY << ") oldhash==hash (" << hash << "), wireId: " << wireId << " skipping"); - tiles.erase(tiles.begin() + tileIndex); + tileIndex++; continue; } - size_t imgSize; + size_t imgSize = -1; + if (_pngCache.copyFromCache(hash, output, imgSize)) + pushRendered(renderedTiles, tiles[tileIndex], wireId, imgSize); - if (!_pngCache.copyFromCache(hash, output, imgSize)) + else { LOG_DBG("PNG cache with hash " << hash << " missed."); @@ -1051,43 +1159,52 @@ public: pixelWidth, pixelHeight, mode); - PngCache::CacheData data(new std::vector< char >() ); - data->reserve(pixmapWidth * pixmapHeight * 1); + // Queue to be executed inside 'run' + pool.pushWorkUnlocked([=,&output,&pixmap,&tiles,&renderedTiles](){ + PngCache::CacheData data(new std::vector< char >() ); + data->reserve(pixmapWidth * pixmapHeight * 1); + + /* + * Disable for now - pushed in error. + * + if (_deltaGen.createDelta(pixmap, startX, startY, width, height, + bufferWidth, bufferHeight, + output, wid, oldWid)) + else ... + */ + + LOG_DBG("Encode a new png for tile #" << tileIndex); + if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight, + pixmapWidth, pixmapHeight, *data, mode)) + { + // FIXME: Return error. + // sendTextFrame("error: cmd=tile kind=failure"); + LOG_ERR("Failed to encode tile into PNG."); + return; + } -/* - *Disable for now - pushed in error. - * - if (_deltaGen.createDelta(pixmap, startX, startY, width, height, - bufferWidth, bufferHeight, - output, wid, oldWid)) - else ... -*/ - - LOG_DBG("Encode a new png for this tile."); - if (!Png::encodeSubBufferToPNG(pixmap.data(), offsetX, offsetY, pixelWidth, pixelHeight, - pixmapWidth, pixmapHeight, *data, mode)) - { - // FIXME: Return error. - // sendTextFrame("error: cmd=tile kind=failure"); - LOG_ERR("Failed to encode tile into PNG."); - return; - } + LOG_DBG("Tile " << tileIndex << " is " << data->size() << " bytes."); + std::unique_lock pngLock(_pngMutex); - output.insert(output.end(), data->begin(), data->end()); - imgSize = data->size(); - _pngCache.addToCache(data, wireId, hash); + output.insert(output.end(), data->begin(), data->end()); + _pngCache.addToCache(data, wireId, hash); + pushRendered(renderedTiles, tiles[tileIndex], wireId, data->size()); + }); } - LOG_TRC("Encoded tile #" << tileIndex << " at (" << positionX << "," << positionY << ") with oldWireId=" << tiles[tileIndex].getOldWireId() << ", hash=" << hash << " wireId: " << wireId << " in " << imgSize << " bytes."); - if (imgSize == 0) + tileIndex++; + } + + pool.run(); + + for (auto &i : renderedTiles) + { + if (i.getImgSize() == 0) { LOG_ERR("Encoded 0-sized tile!"); assert(!"0-sized tile enocded!"); } - tiles[tileIndex].setWireId(wireId); - tiles[tileIndex].setImgSize(imgSize); - tileIndex++; } elapsed = timestamp.elapsed(); @@ -1103,7 +1220,7 @@ public: std::string tileMsg; if (combined) - tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID); + tileMsg = tileCombined.serialize("tilecombine:", ADD_DEBUG_RENDERID, renderedTiles); else tileMsg = tiles[0].serialize("tile:", ADD_DEBUG_RENDERID); @@ -1996,6 +2113,8 @@ private: std::shared_ptr _tileQueue; SocketPoll& _socketPoll; std::shared_ptr _websocketHandler; + + std::mutex _pngMutex; PngCache _pngCache; // Document password provided diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 57ddfd705..116e7c560 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -89,7 +89,6 @@ using Poco::Net::PartHandler; #include #include #include -#include #include #include #include diff --git a/wsd/TileDesc.hpp b/wsd/TileDesc.hpp index 77fb46b26..d30b718da 100644 --- a/wsd/TileDesc.hpp +++ b/wsd/TileDesc.hpp @@ -356,6 +356,12 @@ public: /// Optionally prepend a prefix. std::string serialize(const std::string& prefix = std::string(), const std::string& suffix = std::string()) const + { + return serialize(prefix, suffix, _tiles); + } + + std::string serialize(const std::string& prefix, const std::string &suffix, + const std::vector &tiles) const { std::ostringstream oss; oss << prefix @@ -363,21 +369,21 @@ public: << " width=" << _width << " height=" << _height << " tileposx="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getTilePosX() << ','; } oss.seekp(-1, std::ios_base::cur); // Seek back over last comma, overwritten below. oss << " tileposy="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getTilePosY() << ','; } oss.seekp(-1, std::ios_base::cur); // Ditto. oss << " imgsize="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getImgSize() << ','; // Ditto. } @@ -387,14 +393,14 @@ public: << " tileheight=" << _tileHeight; oss << " ver="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getVersion() << ','; } oss.seekp(-1, std::ios_base::cur); // Ditto. oss << " oldwid="; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { oss << tile.getOldWireId() << ','; } @@ -403,7 +409,7 @@ public: oss << " wid="; bool comma = false; - for (const auto& tile : _tiles) + for (const auto& tile : tiles) { if (comma) oss << ','; -- cgit v1.2.3