diff options
author | Ed Tanous <edtanous@google.com> | 2022-02-22 08:33:06 +0300 |
---|---|---|
committer | Ed Tanous <ed@tanous.net> | 2023-03-23 00:09:03 +0300 |
commit | 863c1c2e78a8cce2e6b5cf0e80b8cf75fbc801b3 (patch) | |
tree | 918e715fe718633d772802ba52b000c7f1f97b66 /http/websocket.hpp | |
parent | 2da6b8c94f7ef7d99d27de1815fd3568b350aaaf (diff) | |
download | bmcweb-863c1c2e78a8cce2e6b5cf0e80b8cf75fbc801b3.tar.xz |
nbd proxy and websocket cleanups
As-written, the nbd (and all websocket daemons) suffer from a problem
where there is no way to apply socket backpressure, so in certain
conditions, it's trivial to run the BMC out of memory on a given
message. This is a problem.
This commit implements the idea of an incremental callback handler, that
accepts a callback function to be run when the processing of the message
is complete. This allows applying backpressure on the socket, which in
turn, should provide pressure back to the client, and prevent buffering
crashes on slow connections, or connections with high latency.
Tested: NBD proxy not upstream, no way to test. No changes made to
normal websocket flow.
Signed-off-by: Michal Orzel <michalx.orzel@intel.com>
Signed-off-by: Ed Tanous <edtanous@google.com>
Change-Id: I3f116cc91eeadc949579deacbeb2d9f5e0f4fa53
Diffstat (limited to 'http/websocket.hpp')
-rw-r--r-- | http/websocket.hpp | 149 |
1 files changed, 126 insertions, 23 deletions
diff --git a/http/websocket.hpp b/http/websocket.hpp index 216e96fef6..9a5aa29737 100644 --- a/http/websocket.hpp +++ b/http/websocket.hpp @@ -3,6 +3,7 @@ #include "http_request.hpp" #include <boost/asio/buffer.hpp> +#include <boost/beast/core/multi_buffer.hpp> #include <boost/beast/websocket.hpp> #include <array> @@ -17,6 +18,12 @@ namespace crow namespace websocket { +enum class MessageType +{ + Binary, + Text, +}; + struct Connection : std::enable_shared_from_this<Connection> { public: @@ -30,9 +37,13 @@ struct Connection : std::enable_shared_from_this<Connection> virtual void sendBinary(std::string_view msg) = 0; virtual void sendBinary(std::string&& msg) = 0; + virtual void sendEx(MessageType type, std::string_view msg, + std::function<void()>&& onDone) = 0; virtual void sendText(std::string_view msg) = 0; virtual void sendText(std::string&& msg) = 0; virtual void close(std::string_view msg = "quit") = 0; + virtual void deferRead() = 0; + virtual void resumeRead() = 0; virtual boost::asio::io_context& getIoContext() = 0; virtual ~Connection() = default; @@ -48,12 +59,17 @@ class ConnectionImpl : public Connection std::function<void(Connection&)> openHandlerIn, std::function<void(Connection&, const std::string&, bool)> messageHandlerIn, + std::function<void(crow::websocket::Connection&, std::string_view, + crow::websocket::MessageType type, + std::function<void()>&& whenComplete)> + messageExHandlerIn, std::function<void(Connection&, const std::string&)> closeHandlerIn, std::function<void(Connection&)> errorHandlerIn) : Connection(reqIn), ws(std::move(adaptorIn)), inBuffer(inString, 131088), openHandler(std::move(openHandlerIn)), messageHandler(std::move(messageHandlerIn)), + messageExHandler(std::move(messageExHandlerIn)), closeHandler(std::move(closeHandlerIn)), errorHandler(std::move(errorHandlerIn)), session(reqIn.session) { @@ -126,28 +142,61 @@ class ConnectionImpl : public Connection void sendBinary(std::string_view msg) override { ws.binary(true); - outBuffer.emplace_back(msg); + outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), + boost::asio::buffer(msg))); doWrite(); } + void sendEx(MessageType type, std::string_view msg, + std::function<void()>&& onDone) override + { + if (doingWrite) + { + BMCWEB_LOG_CRITICAL + << "Cannot mix sendEx usage with sendBinary or sendText"; + onDone(); + return; + } + ws.binary(type == MessageType::Binary); + + ws.async_write(boost::asio::buffer(msg), + [weak(weak_from_this()), onDone{std::move(onDone)}]( + const boost::beast::error_code& ec, size_t) { + std::shared_ptr<Connection> self = weak.lock(); + + // Call the done handler regardless of whether we + // errored, but before we close things out + onDone(); + + if (ec) + { + BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec; + self->close("write error"); + } + }); + } + void sendBinary(std::string&& msg) override { ws.binary(true); - outBuffer.emplace_back(std::move(msg)); + outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), + boost::asio::buffer(msg))); doWrite(); } void sendText(std::string_view msg) override { ws.text(true); - outBuffer.emplace_back(msg); + outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), + boost::asio::buffer(msg))); doWrite(); } void sendText(std::string&& msg) override { ws.text(true); - outBuffer.emplace_back(std::move(msg)); + outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()), + boost::asio::buffer(msg))); doWrite(); } @@ -172,19 +221,41 @@ class ConnectionImpl : public Connection { BMCWEB_LOG_DEBUG << "Websocket accepted connection"; - doRead(); - if (openHandler) { openHandler(*this); } + doRead(); + } + + void deferRead() override + { + readingDefered = true; + + // If we're not actively reading, we need to take ownership of + // ourselves for a small portion of time, do that, and clear when we + // resume. + selfOwned = shared_from_this(); + } + + void resumeRead() override + { + readingDefered = false; + doRead(); + + // No longer need to keep ourselves alive now that read is active. + selfOwned.reset(); } void doRead() { - ws.async_read(inBuffer, - [this, self(shared_from_this())]( - boost::beast::error_code ec, std::size_t bytesRead) { + if (readingDefered) + { + return; + } + ws.async_read(inBuffer, [this, self(shared_from_this())]( + const boost::beast::error_code& ec, + size_t bytesRead) { if (ec) { if (ec != boost::beast::websocket::error::closed) @@ -198,16 +269,10 @@ class ConnectionImpl : public Connection } return; } - if (messageHandler) - { - messageHandler(*this, inString, ws.got_text()); - } - inBuffer.consume(bytesRead); - inString.clear(); - doRead(); + + handleMessage(bytesRead); }); } - void doWrite() { // If we're already doing a write, ignore the request, it will be picked @@ -217,17 +282,17 @@ class ConnectionImpl : public Connection return; } - if (outBuffer.empty()) + if (outBuffer.size() == 0) { // Done for now return; } doingWrite = true; - ws.async_write(boost::asio::buffer(outBuffer.front()), - [this, self(shared_from_this())]( - boost::beast::error_code ec, std::size_t) { + ws.async_write(outBuffer.data(), [this, self(shared_from_this())]( + const boost::beast::error_code& ec, + size_t bytesSent) { doingWrite = false; - outBuffer.erase(outBuffer.begin()); + outBuffer.consume(bytesSent); if (ec == boost::beast::websocket::error::closed) { // Do nothing here. doRead handler will call the @@ -245,21 +310,59 @@ class ConnectionImpl : public Connection } private: + void handleMessage(size_t bytesRead) + { + if (messageExHandler) + { + // Note, because of the interactions with the read buffers, + // this message handler overrides the normal message handler + messageExHandler(*this, inString, MessageType::Binary, + [this, self(shared_from_this()), bytesRead]() { + if (self == nullptr) + { + return; + } + + inBuffer.consume(bytesRead); + inString.clear(); + + doRead(); + }); + return; + } + + if (messageHandler) + { + messageHandler(*this, inString, ws.got_text()); + } + inBuffer.consume(bytesRead); + inString.clear(); + doRead(); + } + boost::beast::websocket::stream<Adaptor, false> ws; + bool readingDefered = false; std::string inString; boost::asio::dynamic_string_buffer<std::string::value_type, std::string::traits_type, std::string::allocator_type> inBuffer; - std::vector<std::string> outBuffer; + + boost::beast::multi_buffer outBuffer; bool doingWrite = false; std::function<void(Connection&)> openHandler; std::function<void(Connection&, const std::string&, bool)> messageHandler; + std::function<void(crow::websocket::Connection&, std::string_view, + crow::websocket::MessageType type, + std::function<void()>&& whenComplete)> + messageExHandler; std::function<void(Connection&, const std::string&)> closeHandler; std::function<void(Connection&)> errorHandler; std::shared_ptr<persistent_data::UserSession> session; + + std::shared_ptr<Connection> selfOwned; }; } // namespace websocket } // namespace crow |