diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2020-03-04 13:54:04 +0000 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2020-04-23 13:40:23 +0100 |
commit | 71c7816c518d4bf6315cfcfea2e41e8898d9639a (patch) | |
tree | 4b6e698332224ebc40e13eab4e2d4bf95613ff7f | |
parent | fd3af1db2bfc6650128fc7441da85ed42e8f680b (diff) |
Proxy websocket prototype.
Try to read/write avoiding a websocket.
Change-Id: I382039fa88f1030a63df1e47f687df2ee5a6055b
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | loleaflet/js/global.js | 94 | ||||
-rw-r--r-- | loleaflet/src/core/Socket.js | 2 | ||||
-rw-r--r-- | net/Socket.hpp | 5 | ||||
-rw-r--r-- | wsd/ClientSession.hpp | 3 | ||||
-rw-r--r-- | wsd/DocumentBroker.hpp | 9 | ||||
-rw-r--r-- | wsd/LOOLWSD.cpp | 123 | ||||
-rw-r--r-- | wsd/ProxyProtocol.cpp | 81 | ||||
-rw-r--r-- | wsd/ProxyProtocol.hpp | 97 |
9 files changed, 399 insertions, 17 deletions
diff --git a/Makefile.am b/Makefile.am index dcbfbdcbb..d28300996 100644 --- a/Makefile.am +++ b/Makefile.am @@ -109,6 +109,7 @@ loolwsd_sources = common/Crypto.cpp \ wsd/AdminModel.cpp \ wsd/Auth.cpp \ wsd/DocumentBroker.cpp \ + wsd/ProxyProtocol.cpp \ wsd/LOOLWSD.cpp \ wsd/ClientSession.cpp \ wsd/FileServer.cpp \ @@ -215,6 +216,7 @@ wsd_headers = wsd/Admin.hpp \ wsd/Auth.hpp \ wsd/ClientSession.hpp \ wsd/DocumentBroker.hpp \ + wsd/ProxyProtocol.hpp \ wsd/Exceptions.hpp \ wsd/FileServer.hpp \ wsd/LOOLWSD.hpp \ diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js index a3b1fb49c..477c66a5d 100644 --- a/loleaflet/js/global.js +++ b/loleaflet/js/global.js @@ -185,16 +185,97 @@ }; this.onopen = function() { }; + this.close = function() { + }; }; - - global.FakeWebSocket.prototype.close = function() { - }; - global.FakeWebSocket.prototype.send = function(data) { this.sendCounter++; window.postMobileMessage(data); }; + global.proxySocketCounter = 0; + global.ProxySocket = function (uri) { + this.uri = uri; + this.binaryType = 'arraybuffer'; + this.bufferedAmount = 0; + this.extensions = ''; + this.protocol = ''; + this.connected = true; + this.readyState = 0; // connecting + this.sessionId = 'fetchsession'; + this.id = window.proxySocketCounter++; + this.sendCounter = 0; + this.readWaiting = false; + this.onclose = function() { + }; + this.onerror = function() { + }; + this.onmessage = function() { + }; + this.send = function(msg) { + console.debug('send msg "' + msg + '"'); + var req = new XMLHttpRequest(); + req.open('POST', this.getEndPoint('write')); + req.setRequestHeader('SessionId', this.sessionId); + if (this.sessionId === 'fetchsession') + req.addEventListener('load', function() { + console.debug('got session: ' + this.responseText); + that.sessionId = this.responseText; + that.readyState = 1; + that.onopen(); + }); + req.send(msg); + }, + this.close = function() { + console.debug('close socket'); + this.readyState = 3; + this.onclose(); + }; + this.getEndPoint = function(type) { + var base = this.uri; + return base.replace(/^ws/, 'http') + '/' + type; + }; + console.debug('New proxy socket ' + this.id + ' ' + this.uri); + + // FIXME: perhaps a little risky. + this.send('fetchsession'); + var that = this; + + // horrors ... + this.readInterval = setInterval(function() { + if (this.readWaiting) // one at a time for now + return; + if (this.sessionId == 'fetchsession') + return; // waiting for our session id. + var req = new XMLHttpRequest(); + // fetch session id: + req.addEventListener('load', function() { + console.debug('read: ' + this.responseText); + if (this.status == 200) + { + that.onmessage({ data: this.response }); + } + else + { + console.debug('Handle error ' + this.status); + } + that.readWaiting = false; + }); + req.open('GET', that.getEndPoint('read')); + req.setRequestHeader('SessionId', this.sessionId); + req.send(that.sessionId); + that.readWaiting = true; + }, 250); + }; + + global.createWebSocket = function(uri) { + if (global.socketProxy) { + return new global.ProxySocket(uri); + } else { + return new WebSocket(uri); + } + }; + // If not debug, don't print anything on the console // except in tile debug mode (Ctrl-Shift-Alt-d) console.log2 = console.log; @@ -219,7 +300,8 @@ window.postMobileError(log); } else if (global.socket && (global.socket instanceof WebSocket) && global.socket.readyState === 1) { global.socket.send(log); - } else if (global.socket && (global.socket instanceof global.L.Socket) && global.socket.connected()) { + } else if (global.socket && global.L && global.L.Socket && + (global.socket instanceof global.L.Socket) && global.socket.connected()) { global.socket.sendMessage(log); } else { var req = new XMLHttpRequest(); @@ -296,7 +378,7 @@ var websocketURI = global.host + global.serviceRoot + '/lool/' + encodeURIComponent(global.docURL + (docParams ? '?' + docParams : '')) + '/ws' + wopiSrc; try { - global.socket = new WebSocket(websocketURI); + global.socket = global.createWebSocket(websocketURI); } catch (err) { console.log(err); } diff --git a/loleaflet/src/core/Socket.js b/loleaflet/src/core/Socket.js index a4aa948c5..fdd0b2db6 100644 --- a/loleaflet/src/core/Socket.js +++ b/loleaflet/src/core/Socket.js @@ -49,7 +49,7 @@ L.Socket = L.Class.extend({ } try { - this.socket = new WebSocket(this.getWebSocketBaseURI(map) + wopiSrc); + this.socket = window.createWebSocket(this.getWebSocketBaseURI(map) + wopiSrc); } catch (e) { // On IE 11 there is a limitation on the number of WebSockets open to a single domain (6 by default and can go to 128). // Detect this and hint the user. diff --git a/net/Socket.hpp b/net/Socket.hpp index bc44f208b..7ae042b86 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -445,6 +445,11 @@ public: } } + std::shared_ptr<ProtocolHandlerInterface> getProtocol() const + { + return _protocol; + } + /// Do we have something to send ? virtual bool hasQueuedMessages() const = 0; /// Please send them to me then. diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index 0f425b117..f2bd63adc 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -37,9 +37,6 @@ public: void construct(); virtual ~ClientSession(); - /// Lookup any session by id. - static std::shared_ptr<ClientSession> getById(const std::string &id); - void setReadOnly() override; enum SessionState { diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index 329bece7f..7d7b5e67d 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -140,6 +140,15 @@ public: const bool isReadOnly, const std::string& hostNoTrust); + /// Find or create a new client session for the PHP proxy + void handleProxyRequest( + const std::string& sessionId, + const std::string& id, + const Poco::URI& uriPublic, + const bool isReadOnly, + const std::string& hostNoTrust, + const std::shared_ptr<Socket> &moveSocket); + /// Thread safe termination of this broker if it has a lingering thread void joinThread(); diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 379e960fe..7aaa79e0d 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -236,6 +236,9 @@ namespace #if ENABLE_SUPPORT_KEY inline void shutdownLimitReached(const std::shared_ptr<ProtocolHandlerInterface>& proto) { + if (!proto) + return; + const std::string error = Poco::format(PAYLOAD_UNAVAILABLE_LIMIT_REACHED, LOOLWSD::MaxDocuments, LOOLWSD::MaxConnections); LOG_INF("Sending client 'hardlimitreached' message: " << error); @@ -1833,9 +1836,12 @@ static std::shared_ptr<DocumentBroker> if (docBroker->isMarkedToDestroy()) { LOG_WRN("DocBroker with docKey [" << docKey << "] that is marked to be destroyed. Rejecting client request."); - std::string msg("error: cmd=load kind=docunloading"); - proto->sendTextMessage(msg.data(), msg.size()); - proto->shutdown(true, "error: cmd=load kind=docunloading"); + if (proto) + { + std::string msg("error: cmd=load kind=docunloading"); + proto->sendTextMessage(msg.data(), msg.size()); + proto->shutdown(true, "error: cmd=load kind=docunloading"); + } return nullptr; } } @@ -1851,9 +1857,12 @@ static std::shared_ptr<DocumentBroker> } // Indicate to the client that we're connecting to the docbroker. - const std::string statusConnect = "statusindicator: connect"; - LOG_TRC("Sending to Client [" << statusConnect << "]."); - proto->sendTextMessage(statusConnect.data(), statusConnect.size()); + if (proto) + { + const std::string statusConnect = "statusindicator: connect"; + LOG_TRC("Sending to Client [" << statusConnect << "]."); + proto->sendTextMessage(statusConnect.data(), statusConnect.size()); + } if (!docBroker) { @@ -2325,6 +2334,13 @@ private: // Util::dumpHex(std::cerr, "clipboard:\n", "", socket->getInBuffer()); // lots of data ... handleClipboardRequest(request, message, disposition); } + else if (request.has("ProxyPrefix") && reqPathTokens.count() > 2 && + (reqPathTokens[reqPathTokens.count()-2] == "ws")) + { + std::string decodedUri; // WOPISrc + Poco::URI::decode(reqPathTokens[1], decodedUri); + handleClientProxyRequest(request, decodedUri, message, disposition); + } else if (!(request.find("Upgrade") != request.end() && Poco::icompare(request["Upgrade"], "websocket") == 0) && reqPathTokens.count() > 0 && reqPathTokens[0] == "lool") { @@ -2858,6 +2874,99 @@ private: } #endif + void handleClientProxyRequest(const Poco::Net::HTTPRequest& request, + std::string url, + Poco::MemoryInputStream& message, + SocketDisposition &disposition) + { + if (!request.has("SessionId")) + throw BadRequestException("No session id header on proxied request"); + + std::string sessionId = request.get("SessionId"); + + LOG_INF("URL [" << url << "]."); + const auto uriPublic = DocumentBroker::sanitizeURI(url); + LOG_INF("URI [" << uriPublic.getPath() << "]."); + const auto docKey = DocumentBroker::getDocKey(uriPublic); + LOG_INF("DocKey [" << docKey << "]."); + const std::string fileId = Util::getFilenameFromURL(docKey); + Util::mapAnonymized(fileId, fileId); // Identity mapping, since fileId is already obfuscated + + LOG_INF("Starting Proxy request handler for session [" << _id << "] on url [" << LOOLWSD::anonymizeUrl(url) << "]."); + + // Check if readonly session is required + bool isReadOnly = false; + for (const auto& param : uriPublic.getQueryParameters()) + { + LOG_DBG("Query param: " << param.first << ", value: " << param.second); + if (param.first == "permission" && param.second == "readonly") + { + isReadOnly = true; + } + } + + const std::string hostNoTrust = (LOOLWSD::ServerName.empty() ? request.getHost() : LOOLWSD::ServerName); + + LOG_INF("URL [" << LOOLWSD::anonymizeUrl(url) << "] is " << (isReadOnly ? "readonly" : "writable") << "."); + (void)request; (void)message; (void)disposition; + + std::shared_ptr<ProtocolHandlerInterface> none; + // Request a kit process for this doc. + std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker( + none, url, docKey, _id, uriPublic); + if (docBroker) + { + // need to move into the DocumentBroker context before doing session lookup / creation etc. + std::string id = _id; + disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId] + (const std::shared_ptr<Socket> &moveSocket) + { + LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey()); + // Make sure the thread is running before adding callback. + docBroker->startThread(); + + // We no longer own this socket. + moveSocket->setThreadOwner(std::thread::id()); + + docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]() + { + // Now inside the document broker thread ... + LOG_TRC("In the docbroker thread for " << docBroker->getDocKey()); + + auto streamSocket = std::static_pointer_cast<StreamSocket>(moveSocket); + try + { + docBroker->handleProxyRequest( + sessionId, id, uriPublic, isReadOnly, + hostNoTrust, moveSocket); + return; + } + catch (const UnauthorizedRequestException& exc) + { + LOG_ERR("Unauthorized Request while loading session for " << docBroker->getDocKey() << ": " << exc.what()); + } + catch (const StorageConnectionException& exc) + { + LOG_ERR("Error while loading : " << exc.what()); + } + catch (const std::exception& exc) + { + LOG_ERR("Error while loading : " << exc.what()); + } + // badness occured: + std::ostringstream oss; + oss << "HTTP/1.1 400\r\n" + << "Date: " << Util::getHttpTimeNow() << "\r\n" + << "User-Agent: LOOLWSD WOPI Agent\r\n" + << "Content-Length: 0\r\n" + << "\r\n"; + streamSocket->send(oss.str()); + streamSocket->shutdown(); + }); + }); + } + } + void handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url, SocketDisposition &disposition) { @@ -2920,7 +3029,7 @@ private: // Request a kit process for this doc. std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker( std::static_pointer_cast<ProtocolHandlerInterface>(ws), url, docKey, _id, uriPublic); - if (docBroker) + if (docBroker) { #if MOBILEAPP const std::string hostNoTrust; diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp new file mode 100644 index 000000000..41043a57a --- /dev/null +++ b/wsd/ProxyProtocol.cpp @@ -0,0 +1,81 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include <config.h> + +#include "DocumentBroker.hpp" +#include "ClientSession.hpp" +#include "ProxyProtocol.hpp" +#include "Exceptions.hpp" +#include "LOOLWSD.hpp" +#include <Socket.hpp> + +#include <atomic> +#include <cassert> + +void DocumentBroker::handleProxyRequest( + const std::string& sessionId, + const std::string& id, + const Poco::URI& uriPublic, + const bool isReadOnly, + const std::string& hostNoTrust, + const std::shared_ptr<Socket> &socket) +{ + std::shared_ptr<ClientSession> clientSession; + if (sessionId == "fetchsession") + { + LOG_TRC("Create session for " << _docKey); + clientSession = createNewClientSession( + std::make_shared<ProxyProtocolHandler>(), + id, uriPublic, isReadOnly, hostNoTrust); + addSession(clientSession); + LOOLWSD::checkDiskSpaceAndWarnClients(true); + LOOLWSD::checkSessionLimitsAndWarnClients(); + } + else + { + LOG_TRC("Find session for " << _docKey << " with id " << sessionId); + for (const auto &it : _sessions) + { + if (it.second->getId() == sessionId) + { + clientSession = it.second; + break; + } + } + if (!clientSession) + { + LOG_ERR("Invalid session id used " << sessionId); + throw BadRequestException("invalid session id"); + } + } + + auto protocol = clientSession->getProtocol(); + auto streamSocket = std::static_pointer_cast<StreamSocket>(socket); + streamSocket->setHandler(protocol); + + // this DocumentBroker's poll handles reading & writing + addSocketToPoll(socket); + + auto proxy = std::static_pointer_cast<ProxyProtocolHandler>( + protocol); + + proxy->handleRequest(uriPublic.toString(), socket); +} + +void ProxyProtocolHandler::handleRequest(const std::string &uriPublic, + const std::shared_ptr<Socket> &socket) +{ + bool bRead = uriPublic.find("/write") == std::string::npos; + LOG_INF("Proxy handle request " << uriPublic << " type: " << + (bRead ? "read" : "write")); + (void)socket; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp new file mode 100644 index 000000000..bd6beac3d --- /dev/null +++ b/wsd/ProxyProtocol.hpp @@ -0,0 +1,97 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include <net/Socket.hpp> + +/// Interface for building a websocket from this ... +class ProxyProtocolHandler : public ProtocolHandlerInterface +{ +public: + ProxyProtocolHandler() + { + } + + virtual ~ProxyProtocolHandler() + { + } + + /// Will be called exactly once by setHandler + void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override + { + } + + /// Called after successful socket reads. + void handleIncomingMessage(SocketDisposition &/* disposition */) override + { + assert("we get our data a different way" && false); + } + + int getPollEvents(std::chrono::steady_clock::time_point /* now */, + int &/* timeoutMaxMs */) override + { + // underlying buffer based polling is fine. + return POLLIN; + } + + void checkTimeout(std::chrono::steady_clock::time_point /* now */) override + { + } + + void performWrites() override + { + } + + void onDisconnect() override + { + // connections & sockets come and go a lot. + } + +public: + /// Clear all external references + virtual void dispose() { _msgHandler.reset(); } + + int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override + { + LOG_TRC("ProxyHack - send text msg " + std::string(msg, len)); + (void) flush; + return len; + } + + int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override + { + (void) data; (void) flush; + LOG_TRC("ProxyHack - send binary msg len " << len); + return len; + } + + void shutdown(bool goingAway = false, const std::string &statusMessage = "") override + { + LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage); + } + + void getIOStats(uint64_t &sent, uint64_t &recv) override + { + sent = recv = 0; + } + + void dumpState(std::ostream& os) + { + os << "proxy protocol\n"; + } + + void handleRequest(const std::string &uriPublic, + const std::shared_ptr<Socket> &socket); + +private: + std::vector<std::weak_ptr<StreamSocket>> _sockets; +}; + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |