From 6fde95fad082fa7d6fc54f2ef8584e06fb116d42 Mon Sep 17 00:00:00 2001 From: Ed Tanous Date: Thu, 1 Jun 2023 07:33:34 -0700 Subject: Server-sent-event fixes This makes several changes to server-sent events to allow it to merge to master. The routing system has been removed in leiu of using content-type eventstream detection. Timers have been added to the sse connections, and sse connections now rely on async_wait, rather than a full read. Tested: WIP Signed-off-by: Ed Tanous Change-Id: Id0ff0ebc2b3a795b3dba008e440556a9fdd882c2 --- http/server_sent_event.hpp | 189 +++++++++++++-------------------------------- 1 file changed, 55 insertions(+), 134 deletions(-) (limited to 'http/server_sent_event.hpp') diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp index 58659c84a0..02af0b7523 100644 --- a/http/server_sent_event.hpp +++ b/http/server_sent_event.hpp @@ -1,6 +1,5 @@ #pragma once -#include "async_resolve.hpp" -#include "async_resp.hpp" +#include "dbus_singleton.hpp" #include "http_request.hpp" #include "http_response.hpp" @@ -23,13 +22,10 @@ namespace crow namespace sse_socket { -static constexpr const std::array sseRoutes = { - "/redfish/v1/EventService/SSE"}; - struct Connection : std::enable_shared_from_this { public: - explicit Connection(const crow::Request& reqIn) : req(reqIn) {} + Connection() = default; Connection(const Connection&) = delete; Connection(Connection&&) = delete; @@ -38,26 +34,19 @@ struct Connection : std::enable_shared_from_this virtual ~Connection() = default; virtual boost::asio::io_context& getIoContext() = 0; - virtual void sendSSEHeader() = 0; - virtual void completeRequest(crow::Response& thisRes) = 0; virtual void close(std::string_view msg = "quit") = 0; virtual void sendEvent(std::string_view id, std::string_view msg) = 0; - - crow::Request req; }; template class ConnectionImpl : public Connection { public: - ConnectionImpl( - const crow::Request& reqIn, Adaptor adaptorIn, - std::function&, const crow::Request&, - const std::shared_ptr&)> - openHandlerIn, - std::function&)> closeHandlerIn) : - Connection(reqIn), - adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)), + ConnectionImpl(Adaptor&& adaptorIn, + std::function openHandlerIn, + std::function closeHandlerIn) : + adaptor(std::move(adaptorIn)), + timer(ioc), openHandler(std::move(openHandlerIn)), closeHandler(std::move(closeHandlerIn)) { BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; @@ -81,61 +70,47 @@ class ConnectionImpl : public Connection void start() { - if (openHandler) + if (!openHandler) { - auto asyncResp = std::make_shared(); - std::shared_ptr self = this->shared_from_this(); - - asyncResp->res.setCompleteRequestHandler( - [self(shared_from_this())](crow::Response& thisRes) { - if (thisRes.resultInt() != 200) - { - self->completeRequest(thisRes); - } - }); - - openHandler(self, req, asyncResp); + BMCWEB_LOG_CRITICAL << "No open handler???"; + return; } + openHandler(*this); } void close(const std::string_view msg) override { - BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; - boost::beast::get_lowest_layer(adaptor).close(); - // send notification to handler for cleanup if (closeHandler) { - std::shared_ptr self = shared_from_this(); - closeHandler(self); + closeHandler(*this); } + BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; + boost::beast::get_lowest_layer(adaptor).close(); } - void sendSSEHeader() override + void sendSSEHeader() { BMCWEB_LOG_DEBUG << "Starting SSE connection"; - auto asyncResp = std::make_shared(); using BodyType = boost::beast::http::buffer_body; - auto response = - std::make_shared>( - boost::beast::http::status::ok, 11); - - serializer.emplace(*asyncResp->res.stringResponse); - - response->set(boost::beast::http::field::content_type, - "text/event-stream"); - response->body().more = true; + boost::beast::http::response res( + boost::beast::http::status::ok, 11, BodyType{}); + res.set(boost::beast::http::field::content_type, "text/event-stream"); + res.body().more = true; + boost::beast::http::response_serializer& ser = + serializer.emplace(std::move(res)); boost::beast::http::async_write_header( - adaptor, *serializer, + adaptor, ser, std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, shared_from_this())); } void sendSSEHeaderCallback(const std::shared_ptr& /*self*/, - const boost::beast::error_code& ec, - const std::size_t& /*unused*/) + const boost::system::error_code& ec, + size_t /*bytesSent*/) { + serializer.reset(); if (ec) { BMCWEB_LOG_ERROR << "Error sending header" << ec; @@ -148,41 +123,29 @@ class ConnectionImpl : public Connection // SSE stream header sent, So let us setup monitor. // Any read data on this stream will be error in case of SSE. - setupRead(); - } - void setupRead() - { - std::weak_ptr weakSelf = weak_from_this(); - - boost::beast::http::async_read_some( - adaptor, outputBuffer, *parser, - std::bind_front(&ConnectionImpl::setupReadCallback, this, - weak_from_this())); + adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, + std::bind_front(&ConnectionImpl::afterReadError, + this, shared_from_this())); } - void setupReadCallback(const std::weak_ptr& weakSelf, - const boost::system::error_code& ec, - size_t bytesRead) + void afterReadError(const std::shared_ptr& /*self*/, + const boost::system::error_code& ec) { - std::shared_ptr self = weakSelf.lock(); - BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes"; + if (ec == boost::asio::error::operation_aborted) + { + return; + } if (ec) { BMCWEB_LOG_ERROR << "Read error: " << ec; } - // After establishing SSE stream, Reading data on this - // stream means client is disobeys the SSE protocol. - // Read the data to avoid buffer attacks and close connection. - - self->close("Close SSE connection"); + close("Close SSE connection"); } void doWrite() { - onTimeout(); - if (doingWrite) { return; @@ -192,18 +155,25 @@ class ConnectionImpl : public Connection BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; return; } + startTimeout(); doingWrite = true; adaptor.async_write_some( inputBuffer.data(), std::bind_front(&ConnectionImpl::doWriteCallback, this, - shared_from_this())); + weak_from_this())); } - void doWriteCallback(const std::shared_ptr& /*self*/, + void doWriteCallback(const std::weak_ptr& weak, const boost::beast::error_code& ec, - const size_t bytesTransferred) + size_t bytesTransferred) { + auto self = weak.lock(); + if (self == nullptr) + { + return; + } + timer.cancel(); doingWrite = false; inputBuffer.consume(bytesTransferred); @@ -226,48 +196,6 @@ class ConnectionImpl : public Connection doWrite(); } - void completeRequest(crow::Response& thisRes) override - { - auto asyncResp = std::make_shared(); - asyncResp->res = std::move(thisRes); - - if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty()) - { - asyncResp->res.addHeader(boost::beast::http::field::content_type, - "application/json"); - asyncResp->res.body() = asyncResp->res.jsonValue.dump( - 2, ' ', true, nlohmann::json::error_handler_t::replace); - } - - asyncResp->res.preparePayload(); - - serializer.emplace(*asyncResp->res.stringResponse); - - boost::beast::http::async_write_some( - adaptor, *serializer, - std::bind_front(&ConnectionImpl::completeRequestCallback, this, - shared_from_this())); - } - - void completeRequestCallback(const std::shared_ptr& /*self*/, - const boost::system::error_code& ec, - std::size_t bytesTransferred) - { - auto asyncResp = std::make_shared(); - BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred - << " bytes"; - if (ec) - { - BMCWEB_LOG_DEBUG << this << " from async_write failed"; - return; - } - - BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid"; - serializer.reset(); - close("Request invalid"); - asyncResp->res.releaseCompleteRequestHandler(); - } - void sendEvent(std::string_view id, std::string_view msg) override { if (msg.empty()) @@ -276,19 +204,18 @@ class ConnectionImpl : public Connection return; } - dataFormat(id); + dataFormat(id, msg); doWrite(); } - void dataFormat(std::string_view id) + void dataFormat(std::string_view id, std::string_view msg) { - std::string_view msg; std::string rawData; if (!id.empty()) { rawData += "id: "; - rawData.append(id.begin(), id.end()); + rawData.append(id); rawData += "\n"; } @@ -308,9 +235,8 @@ class ConnectionImpl : public Connection inputBuffer.commit(rawData.size()); } - void onTimeout() + void startTimeout() { - boost::asio::steady_timer timer(ioc); std::weak_ptr weakSelf = weak_from_this(); timer.expires_after(std::chrono::seconds(30)); timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, @@ -318,7 +244,7 @@ class ConnectionImpl : public Connection } void onTimeoutCallback(const std::weak_ptr& weakSelf, - const boost::system::error_code ec) + const boost::system::error_code& ec) { std::shared_ptr self = weakSelf.lock(); if (!self) @@ -346,23 +272,18 @@ class ConnectionImpl : public Connection private: Adaptor adaptor; - boost::beast::multi_buffer outputBuffer; boost::beast::multi_buffer inputBuffer; std::optional> + boost::beast::http::buffer_body>> serializer; boost::asio::io_context& ioc = crow::connections::systemBus->get_io_context(); + boost::asio::steady_timer timer; bool doingWrite = false; - std::optional< - boost::beast::http::request_parser> - parser; - - std::function&, const crow::Request&, - const std::shared_ptr&)> - openHandler; - std::function&)> closeHandler; + + std::function openHandler; + std::function closeHandler; }; } // namespace sse_socket } // namespace crow -- cgit v1.2.3