From ea7d0091545450721d8b9b901e2593401b50d24c Mon Sep 17 00:00:00 2001 From: AppaRao Puli Date: Fri, 12 Mar 2021 18:53:25 +0000 Subject: [PATCH] Add Server-Sent-Events support Server-Sent Events is a standard describing how servers can initiate data transmission towards clients once an initial client connection has been established. Unlike websockets (which are bidirectional), Server-Sent Events are unidirectional and commonly used to send message updates or continuous data streams to a browser client. This is base patch for adding Server-Sent events support to bmcweb. Redfish eventservice SSE style subscription uses this and will be loaded on top of this commit. Tested: - Tested using follow-up patch on top which adds support for Redfish EventService SSE style subscription and observed events are getting sent periodically. Change-Id: I36956565cbba30c2007852c9471f477f6d1736e9 Signed-off-by: AppaRao Puli Signed-off-by: P Dheeraj Srujan Kumar --- http/http_connection.hpp | 10 +- http/http_response.hpp | 7 +- http/routing.hpp | 71 ++++++++++ http/server_sent_event.hpp | 279 +++++++++++++++++++++++++++++++++++++ 4 files changed, 362 insertions(+), 5 deletions(-) create mode 100644 http/server_sent_event.hpp diff --git a/http/http_connection.hpp b/http/http_connection.hpp index 0f20761..9cf603d 100644 --- a/http/http_connection.hpp +++ b/http/http_connection.hpp @@ -362,11 +362,13 @@ class Connection : [self] { self->completeRequest(); }); }); - if (thisReq.isUpgrade() && - boost::iequals( - thisReq.getHeaderValue(boost::beast::http::field::upgrade), - "websocket")) + if ((thisReq.isUpgrade() && + boost::iequals( + thisReq.getHeaderValue(boost::beast::http::field::upgrade), + "websocket")) || + (req->url == "/sse")) { + BMCWEB_LOG_DEBUG << "Request: " << this << " is getting upgraded"; handler->handleUpgrade(thisReq, res, std::move(adaptor)); // delete lambda with self shared_ptr // to enable connection destruction diff --git a/http/http_response.hpp b/http/http_response.hpp index a983d4a..07b0265 100644 --- a/http/http_response.hpp +++ b/http/http_response.hpp @@ -15,10 +15,15 @@ namespace crow template class Connection; +template +class SseConnectionImpl; + struct Response { template friend class crow::Connection; + template + friend class crow::SseConnectionImpl; using response_type = boost::beast::http::response; @@ -143,8 +148,8 @@ struct Response private: bool completed{}; - std::function completeRequestHandler; std::function isAliveHelper; + std::function completeRequestHandler; // In case of a JSON object, set the Content-Type header void jsonMode() diff --git a/http/routing.hpp b/http/routing.hpp index fe9c7e9..c748580 100644 --- a/http/routing.hpp +++ b/http/routing.hpp @@ -7,6 +7,7 @@ #include "http_response.hpp" #include "logging.hpp" #include "privileges.hpp" +#include "server_sent_event.hpp" #include "sessions.hpp" #include "utility.hpp" #include "websocket.hpp" @@ -397,6 +398,68 @@ class WebSocketRule : public BaseRule std::function errorHandler; }; +class SseSocketRule : public BaseRule +{ + using self_t = SseSocketRule; + + public: + SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn) + {} + + void validate() override + {} + + void handle(const Request&, + const std::shared_ptr& asyncResp, + const RoutingParams&) override + { + asyncResp->res.result(boost::beast::http::status::not_found); + } + + void handleUpgrade(const Request& req, Response&, + boost::asio::ip::tcp::socket&& adaptor) override + { + std::shared_ptr> + myConnection = std::make_shared< + crow::SseConnectionImpl>( + req, std::move(adaptor), openHandler, closeHandler); + myConnection->start(); + } +#ifdef BMCWEB_ENABLE_SSL + void handleUpgrade(const Request& req, Response&, + boost::beast::ssl_stream&& + adaptor) override + { + std::shared_ptr>> + myConnection = std::make_shared>>( + req, std::move(adaptor), openHandler, closeHandler); + myConnection->start(); + } +#endif + + template + self_t& onopen(Func f) + { + openHandler = f; + return *this; + } + + template + self_t& onclose(Func f) + { + closeHandler = f; + return *this; + } + + private: + std::function&, + const crow::Request&, crow::Response&)> + openHandler; + std::function&)> closeHandler; +}; + template struct RuleParameterTraits { @@ -409,6 +472,14 @@ struct RuleParameterTraits return *p; } + SseSocketRule& serverSentEvent() + { + self_t* self = static_cast(this); + SseSocketRule* p = new SseSocketRule(self->rule); + self->ruleToUpgrade.reset(p); + return *p; + } + self_t& name(const std::string_view name) noexcept { self_t* self = static_cast(this); diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp new file mode 100644 index 0000000..41d18ed --- /dev/null +++ b/http/server_sent_event.hpp @@ -0,0 +1,279 @@ +#pragma once +#include "http_request.hpp" + +#include +#include +#include +#include + +#include +#include + +#ifdef BMCWEB_ENABLE_SSL +#include +#endif + +namespace crow +{ + +struct SseConnection : std::enable_shared_from_this +{ + public: + SseConnection(const crow::Request& reqIn) : req(reqIn) + {} + virtual ~SseConnection() = default; + + virtual boost::asio::io_context& getIoContext() = 0; + virtual void sendSSEHeader() = 0; + virtual void completeRequest() = 0; + virtual void close(const std::string_view msg = "quit") = 0; + virtual void sendEvent(const std::string_view id, + const std::string_view msg) = 0; + + crow::Request req; + crow::Response res; +}; + +template +class SseConnectionImpl : public SseConnection +{ + public: + SseConnectionImpl( + const crow::Request& reqIn, Adaptor adaptorIn, + std::function&, + const crow::Request&, crow::Response&)> + openHandler, + std::function&)> closeHandler) : + SseConnection(reqIn), + adaptor(std::move(adaptorIn)), openHandler(std::move(openHandler)), + closeHandler(std::move(closeHandler)) + { + BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; + } + + ~SseConnectionImpl() override + { + res.completeRequestHandler = nullptr; + BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE destructor " << this; + } + + boost::asio::io_context& getIoContext() override + { + return static_cast( + adaptor.get_executor().context()); + } + + void start() + { + // Register for completion callback. + res.completeRequestHandler = [this, self(shared_from_this())] { + boost::asio::post(this->adaptor.get_executor(), + [self] { self->completeRequest(); }); + }; + + if (openHandler) + { + std::shared_ptr self = this->shared_from_this(); + openHandler(self, req, res); + } + } + + void close(const std::string_view msg) override + { + BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; + boost::beast::get_lowest_layer(adaptor).close(); + + // send notification to handler for cleanup + if (closeHandler) + { + std::shared_ptr self = this->shared_from_this(); + closeHandler(self); + } + } + + void sendSSEHeader() override + { + BMCWEB_LOG_DEBUG << "Starting SSE connection"; + using BodyType = boost::beast::http::buffer_body; + auto response = + std::make_shared>( + boost::beast::http::status::ok, 11); + auto serializer = + std::make_shared>( + *response); + + response->set(boost::beast::http::field::server, "bmcweb"); + response->set(boost::beast::http::field::content_type, + "text/event-stream"); + response->body().data = nullptr; + response->body().size = 0; + response->body().more = true; + + boost::beast::http::async_write_header( + adaptor, *serializer, + [this, self(shared_from_this()), response, serializer]( + const boost::beast::error_code& ec, const std::size_t&) { + if (ec) + { + BMCWEB_LOG_ERROR << "Error sending header" << ec; + close("async_write_header failed"); + return; + } + BMCWEB_LOG_DEBUG << "SSE header sent - Connection established"; + + // SSE stream header sent, So lets setup monitor. + // Any read data on this stream will be error in case of SSE. + setupRead(); + }); + } + + void setupRead() + { + adaptor.async_read_some( + outputBuffer.prepare(outputBuffer.capacity() - outputBuffer.size()), + [this](const boost::system::error_code& ec, std::size_t bytesRead) { + BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead + << " bytes"; + if (ec) + { + BMCWEB_LOG_ERROR << "Read error: " << ec; + } + outputBuffer.commit(bytesRead); + outputBuffer.consume(bytesRead); + + // After establishing SSE stream, Reading data on this + // stream means client is disobeys the SSE protocol. + // Read the data to avoid buffer attacks and close connection. + close("Close SSE connection"); + return; + }); + } + + void doWrite() + { + if (doingWrite) + { + return; + } + if (inputBuffer.size() == 0) + { + BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; + return; + } + doingWrite = true; + + adaptor.async_write_some( + inputBuffer.data(), [this, self(shared_from_this())]( + boost::beast::error_code ec, + const std::size_t& bytesTransferred) { + doingWrite = false; + inputBuffer.consume(bytesTransferred); + + if (ec == boost::asio::error::eof) + { + BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed"; + close("SSE stream closed"); + return; + } + + if (ec) + { + BMCWEB_LOG_ERROR << "async_write_some() failed: " + << ec.message(); + close("async_write_some failed"); + return; + } + BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " + << bytesTransferred; + + doWrite(); + }); + } + + void completeRequest() override + { + BMCWEB_LOG_DEBUG << "SSE completeRequest() handler"; + if (res.body().empty() && !res.jsonValue.empty()) + { + res.addHeader("Content-Type", "application/json"); + res.body() = res.jsonValue.dump( + 2, ' ', true, nlohmann::json::error_handler_t::replace); + } + + res.preparePayload(); + auto serializer = + std::make_shared>(*res.stringResponse); + + boost::beast::http::async_write( + adaptor, *serializer, + [this, self(shared_from_this()), + serializer](const boost::system::error_code& ec, + std::size_t bytesTransferred) { + BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred + << " bytes"; + if (ec) + { + BMCWEB_LOG_DEBUG << this << " from async_write failed"; + return; + } + res.clear(); + + BMCWEB_LOG_DEBUG << this + << " Closing SSE connection - Request invalid"; + close("Request invalid"); + }); + + // delete lambda with self shared_ptr + // to enable connection destruction + res.completeRequestHandler = nullptr; + } + + void sendEvent(const std::string_view id, + const std::string_view msg) override + { + if (msg.empty()) + { + BMCWEB_LOG_DEBUG << "Empty data, bailing out."; + return; + } + + std::string rawData; + if (!id.empty()) + { + rawData += "id: "; + rawData.append(id.begin(), id.end()); + rawData += "\n"; + } + + rawData += "data: "; + for (char character : msg) + { + rawData += character; + if (character == '\n') + { + rawData += "data: "; + } + } + rawData += "\n\n"; + + boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), + boost::asio::buffer(rawData)); + inputBuffer.commit(rawData.size()); + + doWrite(); + } + + private: + Adaptor adaptor; + + boost::beast::flat_static_buffer<1024U * 8U> outputBuffer; + boost::beast::flat_static_buffer<1024U * 64U> inputBuffer; + bool doingWrite = false; + + std::function&, const crow::Request&, + crow::Response&)> + openHandler; + std::function&)> closeHandler; +}; +} // namespace crow -- 2.17.1