summaryrefslogtreecommitdiff
path: root/http/websocket.hpp
diff options
context:
space:
mode:
authorEd Tanous <edtanous@google.com>2022-02-22 08:33:06 +0300
committerEd Tanous <ed@tanous.net>2023-03-23 00:09:03 +0300
commit863c1c2e78a8cce2e6b5cf0e80b8cf75fbc801b3 (patch)
tree918e715fe718633d772802ba52b000c7f1f97b66 /http/websocket.hpp
parent2da6b8c94f7ef7d99d27de1815fd3568b350aaaf (diff)
downloadbmcweb-863c1c2e78a8cce2e6b5cf0e80b8cf75fbc801b3.tar.xz
nbd proxy and websocket cleanups
As-written, the nbd (and all websocket daemons) suffer from a problem where there is no way to apply socket backpressure, so in certain conditions, it's trivial to run the BMC out of memory on a given message. This is a problem. This commit implements the idea of an incremental callback handler, that accepts a callback function to be run when the processing of the message is complete. This allows applying backpressure on the socket, which in turn, should provide pressure back to the client, and prevent buffering crashes on slow connections, or connections with high latency. Tested: NBD proxy not upstream, no way to test. No changes made to normal websocket flow. Signed-off-by: Michal Orzel <michalx.orzel@intel.com> Signed-off-by: Ed Tanous <edtanous@google.com> Change-Id: I3f116cc91eeadc949579deacbeb2d9f5e0f4fa53
Diffstat (limited to 'http/websocket.hpp')
-rw-r--r--http/websocket.hpp149
1 files changed, 126 insertions, 23 deletions
diff --git a/http/websocket.hpp b/http/websocket.hpp
index 216e96fef6..9a5aa29737 100644
--- a/http/websocket.hpp
+++ b/http/websocket.hpp
@@ -3,6 +3,7 @@
#include "http_request.hpp"
#include <boost/asio/buffer.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/websocket.hpp>
#include <array>
@@ -17,6 +18,12 @@ namespace crow
namespace websocket
{
+enum class MessageType
+{
+ Binary,
+ Text,
+};
+
struct Connection : std::enable_shared_from_this<Connection>
{
public:
@@ -30,9 +37,13 @@ struct Connection : std::enable_shared_from_this<Connection>
virtual void sendBinary(std::string_view msg) = 0;
virtual void sendBinary(std::string&& msg) = 0;
+ virtual void sendEx(MessageType type, std::string_view msg,
+ std::function<void()>&& onDone) = 0;
virtual void sendText(std::string_view msg) = 0;
virtual void sendText(std::string&& msg) = 0;
virtual void close(std::string_view msg = "quit") = 0;
+ virtual void deferRead() = 0;
+ virtual void resumeRead() = 0;
virtual boost::asio::io_context& getIoContext() = 0;
virtual ~Connection() = default;
@@ -48,12 +59,17 @@ class ConnectionImpl : public Connection
std::function<void(Connection&)> openHandlerIn,
std::function<void(Connection&, const std::string&, bool)>
messageHandlerIn,
+ std::function<void(crow::websocket::Connection&, std::string_view,
+ crow::websocket::MessageType type,
+ std::function<void()>&& whenComplete)>
+ messageExHandlerIn,
std::function<void(Connection&, const std::string&)> closeHandlerIn,
std::function<void(Connection&)> errorHandlerIn) :
Connection(reqIn),
ws(std::move(adaptorIn)), inBuffer(inString, 131088),
openHandler(std::move(openHandlerIn)),
messageHandler(std::move(messageHandlerIn)),
+ messageExHandler(std::move(messageExHandlerIn)),
closeHandler(std::move(closeHandlerIn)),
errorHandler(std::move(errorHandlerIn)), session(reqIn.session)
{
@@ -126,28 +142,61 @@ class ConnectionImpl : public Connection
void sendBinary(std::string_view msg) override
{
ws.binary(true);
- outBuffer.emplace_back(msg);
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
+ void sendEx(MessageType type, std::string_view msg,
+ std::function<void()>&& onDone) override
+ {
+ if (doingWrite)
+ {
+ BMCWEB_LOG_CRITICAL
+ << "Cannot mix sendEx usage with sendBinary or sendText";
+ onDone();
+ return;
+ }
+ ws.binary(type == MessageType::Binary);
+
+ ws.async_write(boost::asio::buffer(msg),
+ [weak(weak_from_this()), onDone{std::move(onDone)}](
+ const boost::beast::error_code& ec, size_t) {
+ std::shared_ptr<Connection> self = weak.lock();
+
+ // Call the done handler regardless of whether we
+ // errored, but before we close things out
+ onDone();
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec;
+ self->close("write error");
+ }
+ });
+ }
+
void sendBinary(std::string&& msg) override
{
ws.binary(true);
- outBuffer.emplace_back(std::move(msg));
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
void sendText(std::string_view msg) override
{
ws.text(true);
- outBuffer.emplace_back(msg);
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
void sendText(std::string&& msg) override
{
ws.text(true);
- outBuffer.emplace_back(std::move(msg));
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
@@ -172,19 +221,41 @@ class ConnectionImpl : public Connection
{
BMCWEB_LOG_DEBUG << "Websocket accepted connection";
- doRead();
-
if (openHandler)
{
openHandler(*this);
}
+ doRead();
+ }
+
+ void deferRead() override
+ {
+ readingDefered = true;
+
+ // If we're not actively reading, we need to take ownership of
+ // ourselves for a small portion of time, do that, and clear when we
+ // resume.
+ selfOwned = shared_from_this();
+ }
+
+ void resumeRead() override
+ {
+ readingDefered = false;
+ doRead();
+
+ // No longer need to keep ourselves alive now that read is active.
+ selfOwned.reset();
}
void doRead()
{
- ws.async_read(inBuffer,
- [this, self(shared_from_this())](
- boost::beast::error_code ec, std::size_t bytesRead) {
+ if (readingDefered)
+ {
+ return;
+ }
+ ws.async_read(inBuffer, [this, self(shared_from_this())](
+ const boost::beast::error_code& ec,
+ size_t bytesRead) {
if (ec)
{
if (ec != boost::beast::websocket::error::closed)
@@ -198,16 +269,10 @@ class ConnectionImpl : public Connection
}
return;
}
- if (messageHandler)
- {
- messageHandler(*this, inString, ws.got_text());
- }
- inBuffer.consume(bytesRead);
- inString.clear();
- doRead();
+
+ handleMessage(bytesRead);
});
}
-
void doWrite()
{
// If we're already doing a write, ignore the request, it will be picked
@@ -217,17 +282,17 @@ class ConnectionImpl : public Connection
return;
}
- if (outBuffer.empty())
+ if (outBuffer.size() == 0)
{
// Done for now
return;
}
doingWrite = true;
- ws.async_write(boost::asio::buffer(outBuffer.front()),
- [this, self(shared_from_this())](
- boost::beast::error_code ec, std::size_t) {
+ ws.async_write(outBuffer.data(), [this, self(shared_from_this())](
+ const boost::beast::error_code& ec,
+ size_t bytesSent) {
doingWrite = false;
- outBuffer.erase(outBuffer.begin());
+ outBuffer.consume(bytesSent);
if (ec == boost::beast::websocket::error::closed)
{
// Do nothing here. doRead handler will call the
@@ -245,21 +310,59 @@ class ConnectionImpl : public Connection
}
private:
+ void handleMessage(size_t bytesRead)
+ {
+ if (messageExHandler)
+ {
+ // Note, because of the interactions with the read buffers,
+ // this message handler overrides the normal message handler
+ messageExHandler(*this, inString, MessageType::Binary,
+ [this, self(shared_from_this()), bytesRead]() {
+ if (self == nullptr)
+ {
+ return;
+ }
+
+ inBuffer.consume(bytesRead);
+ inString.clear();
+
+ doRead();
+ });
+ return;
+ }
+
+ if (messageHandler)
+ {
+ messageHandler(*this, inString, ws.got_text());
+ }
+ inBuffer.consume(bytesRead);
+ inString.clear();
+ doRead();
+ }
+
boost::beast::websocket::stream<Adaptor, false> ws;
+ bool readingDefered = false;
std::string inString;
boost::asio::dynamic_string_buffer<std::string::value_type,
std::string::traits_type,
std::string::allocator_type>
inBuffer;
- std::vector<std::string> outBuffer;
+
+ boost::beast::multi_buffer outBuffer;
bool doingWrite = false;
std::function<void(Connection&)> openHandler;
std::function<void(Connection&, const std::string&, bool)> messageHandler;
+ std::function<void(crow::websocket::Connection&, std::string_view,
+ crow::websocket::MessageType type,
+ std::function<void()>&& whenComplete)>
+ messageExHandler;
std::function<void(Connection&, const std::string&)> closeHandler;
std::function<void(Connection&)> errorHandler;
std::shared_ptr<persistent_data::UserSession> session;
+
+ std::shared_ptr<Connection> selfOwned;
};
} // namespace websocket
} // namespace crow