diff options
author | Ed Tanous <edtanous@google.com> | 2023-06-01 17:33:34 +0300 |
---|---|---|
committer | Ed Tanous <ed@tanous.net> | 2023-06-01 23:43:11 +0300 |
commit | 6fde95fad082fa7d6fc54f2ef8584e06fb116d42 (patch) | |
tree | 31865ca598fcb00af96d5d33824cf53ae1df4f36 /http | |
parent | 88ada3bc05ea247b6c8a24db49ebfdd268c17f4d (diff) | |
download | bmcweb-6fde95fad082fa7d6fc54f2ef8584e06fb116d42.tar.xz |
Server-sent-event fixes
This makes several changes to server-sent events to allow it to merge
to master. The routing system has been removed in leiu of using
content-type eventstream detection. Timers have been added to the
sse connections, and sse connections now rely on async_wait, rather
than a full read.
Tested: WIP
Signed-off-by: Ed Tanous <edtanous@google.com>
Change-Id: Id0ff0ebc2b3a795b3dba008e440556a9fdd882c2
Diffstat (limited to 'http')
-rw-r--r-- | http/app.hpp | 5 | ||||
-rw-r--r-- | http/http_connection.hpp | 6 | ||||
-rw-r--r-- | http/http_response.hpp | 8 | ||||
-rw-r--r-- | http/routing.hpp | 32 | ||||
-rw-r--r-- | http/server_sent_event.hpp | 189 |
5 files changed, 65 insertions, 175 deletions
diff --git a/http/app.hpp b/http/app.hpp index 6388d84271..d3cf48c1b1 100644 --- a/http/app.hpp +++ b/http/app.hpp @@ -105,11 +105,6 @@ class App router.validate(); } - static bool isSseRoute(Request& req) - { - return Router::isSseRoute(req); - } - void run() { validate(); diff --git a/http/http_connection.hpp b/http/http_connection.hpp index 1b85c6b7b4..7ae22e9e49 100644 --- a/http/http_connection.hpp +++ b/http/http_connection.hpp @@ -243,12 +243,14 @@ class Connection : [self(shared_from_this())](crow::Response& thisRes) { self->completeRequest(thisRes); }); - + bool isSse = + isContentTypeAllowed(req->getHeaderValue("Accept"), + http_helpers::ContentType::EventStream, false); if ((thisReq.isUpgrade() && boost::iequals( thisReq.getHeaderValue(boost::beast::http::field::upgrade), "websocket")) || - (Handler::isSseRoute(*req))) + isSse) { asyncResp->res.setCompleteRequestHandler( [self(shared_from_this())](crow::Response& thisRes) { diff --git a/http/http_response.hpp b/http/http_response.hpp index 06b693915f..1a4ef16d4e 100644 --- a/http/http_response.hpp +++ b/http/http_response.hpp @@ -16,18 +16,10 @@ namespace crow template <typename Adaptor, typename Handler> class Connection; -namespace sse_socket -{ -template <typename Adaptor> -class ConnectionImpl; -} // namespace sse_socket - struct Response { template <typename Adaptor, typename Handler> friend class crow::Connection; - template <typename Adaptor> - friend class crow::sse_socket::ConnectionImpl; using response_type = boost::beast::http::response<boost::beast::http::string_body>; diff --git a/http/routing.hpp b/http/routing.hpp index ead3af9eca..ac1c310ae2 100644 --- a/http/routing.hpp +++ b/http/routing.hpp @@ -391,7 +391,7 @@ class SseSocketRule : public BaseRule } #ifndef BMCWEB_ENABLE_SSL - void handleUpgrade(const Request& req, + void handleUpgrade(const Request& /*req*/, const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/, boost::asio::ip::tcp::socket&& adaptor) override { @@ -399,11 +399,11 @@ class SseSocketRule : public BaseRule crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>> myConnection = std::make_shared< crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>( - req, std::move(adaptor), openHandler, closeHandler); + std::move(adaptor), openHandler, closeHandler); myConnection->start(); } #else - void handleUpgrade(const Request& req, + void handleUpgrade(const Request& /*req*/, const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/, boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&& adaptor) override @@ -412,7 +412,7 @@ class SseSocketRule : public BaseRule boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>> myConnection = std::make_shared<crow::sse_socket::ConnectionImpl< boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>( - req, std::move(adaptor), openHandler, closeHandler); + std::move(adaptor), openHandler, closeHandler); myConnection->start(); } #endif @@ -432,12 +432,8 @@ class SseSocketRule : public BaseRule } private: - std::function<void(std::shared_ptr<crow::sse_socket::Connection>&, - const crow::Request&, - const std::shared_ptr<bmcweb::AsyncResp>&)> - openHandler; - std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)> - closeHandler; + std::function<void(crow::sse_socket::Connection&)> openHandler; + std::function<void(crow::sse_socket::Connection&)> closeHandler; }; template <typename T> @@ -461,13 +457,6 @@ struct RuleParameterTraits return *p; } - self_t& name(std::string_view name) noexcept - { - self_t* self = static_cast<self_t*>(this); - self->nameStr = name; - return *self; - } - self_t& methods(boost::beast::http::verb method) { self_t* self = static_cast<self_t*>(this); @@ -1184,15 +1173,6 @@ class Router return true; } - static bool isSseRoute(Request& req) - { - return std::any_of(sse_socket::sseRoutes.begin(), - sse_socket::sseRoutes.end(), - [&req](const char* sseRoute) { - return (req.url().encoded_path() == sseRoute); - }); - } - static bool isUserPrivileged(Request& req, const std::shared_ptr<bmcweb::AsyncResp>& asyncResp, diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp index 58659c84a0..02af0b7523 100644 --- a/http/server_sent_event.hpp +++ b/http/server_sent_event.hpp @@ -1,6 +1,5 @@ #pragma once -#include "async_resolve.hpp" -#include "async_resp.hpp" +#include "dbus_singleton.hpp" #include "http_request.hpp" #include "http_response.hpp" @@ -23,13 +22,10 @@ namespace crow namespace sse_socket { -static constexpr const std::array<const char*, 1> sseRoutes = { - "/redfish/v1/EventService/SSE"}; - struct Connection : std::enable_shared_from_this<Connection> { public: - explicit Connection(const crow::Request& reqIn) : req(reqIn) {} + Connection() = default; Connection(const Connection&) = delete; Connection(Connection&&) = delete; @@ -38,26 +34,19 @@ struct Connection : std::enable_shared_from_this<Connection> virtual ~Connection() = default; virtual boost::asio::io_context& getIoContext() = 0; - virtual void sendSSEHeader() = 0; - virtual void completeRequest(crow::Response& thisRes) = 0; virtual void close(std::string_view msg = "quit") = 0; virtual void sendEvent(std::string_view id, std::string_view msg) = 0; - - crow::Request req; }; template <typename Adaptor> class ConnectionImpl : public Connection { public: - ConnectionImpl( - const crow::Request& reqIn, Adaptor adaptorIn, - std::function<void(std::shared_ptr<Connection>&, const crow::Request&, - const std::shared_ptr<bmcweb::AsyncResp>&)> - openHandlerIn, - std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) : - Connection(reqIn), - adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)), + ConnectionImpl(Adaptor&& adaptorIn, + std::function<void(Connection&)> openHandlerIn, + std::function<void(Connection&)> closeHandlerIn) : + adaptor(std::move(adaptorIn)), + timer(ioc), openHandler(std::move(openHandlerIn)), closeHandler(std::move(closeHandlerIn)) { BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; @@ -81,61 +70,47 @@ class ConnectionImpl : public Connection void start() { - if (openHandler) + if (!openHandler) { - auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); - std::shared_ptr<Connection> self = this->shared_from_this(); - - asyncResp->res.setCompleteRequestHandler( - [self(shared_from_this())](crow::Response& thisRes) { - if (thisRes.resultInt() != 200) - { - self->completeRequest(thisRes); - } - }); - - openHandler(self, req, asyncResp); + BMCWEB_LOG_CRITICAL << "No open handler???"; + return; } + openHandler(*this); } void close(const std::string_view msg) override { - BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; - boost::beast::get_lowest_layer(adaptor).close(); - // send notification to handler for cleanup if (closeHandler) { - std::shared_ptr<Connection> self = shared_from_this(); - closeHandler(self); + closeHandler(*this); } + BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; + boost::beast::get_lowest_layer(adaptor).close(); } - void sendSSEHeader() override + void sendSSEHeader() { BMCWEB_LOG_DEBUG << "Starting SSE connection"; - auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); using BodyType = boost::beast::http::buffer_body; - auto response = - std::make_shared<boost::beast::http::response<BodyType>>( - boost::beast::http::status::ok, 11); - - serializer.emplace(*asyncResp->res.stringResponse); - - response->set(boost::beast::http::field::content_type, - "text/event-stream"); - response->body().more = true; + boost::beast::http::response<BodyType> res( + boost::beast::http::status::ok, 11, BodyType{}); + res.set(boost::beast::http::field::content_type, "text/event-stream"); + res.body().more = true; + boost::beast::http::response_serializer<BodyType>& ser = + serializer.emplace(std::move(res)); boost::beast::http::async_write_header( - adaptor, *serializer, + adaptor, ser, std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, shared_from_this())); } void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, - const boost::beast::error_code& ec, - const std::size_t& /*unused*/) + const boost::system::error_code& ec, + size_t /*bytesSent*/) { + serializer.reset(); if (ec) { BMCWEB_LOG_ERROR << "Error sending header" << ec; @@ -148,41 +123,29 @@ class ConnectionImpl : public Connection // SSE stream header sent, So let us setup monitor. // Any read data on this stream will be error in case of SSE. - setupRead(); - } - void setupRead() - { - std::weak_ptr<Connection> weakSelf = weak_from_this(); - - boost::beast::http::async_read_some( - adaptor, outputBuffer, *parser, - std::bind_front(&ConnectionImpl::setupReadCallback, this, - weak_from_this())); + adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error, + std::bind_front(&ConnectionImpl::afterReadError, + this, shared_from_this())); } - void setupReadCallback(const std::weak_ptr<Connection>& weakSelf, - const boost::system::error_code& ec, - size_t bytesRead) + void afterReadError(const std::shared_ptr<Connection>& /*self*/, + const boost::system::error_code& ec) { - std::shared_ptr<Connection> self = weakSelf.lock(); - BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes"; + if (ec == boost::asio::error::operation_aborted) + { + return; + } if (ec) { BMCWEB_LOG_ERROR << "Read error: " << ec; } - // After establishing SSE stream, Reading data on this - // stream means client is disobeys the SSE protocol. - // Read the data to avoid buffer attacks and close connection. - - self->close("Close SSE connection"); + close("Close SSE connection"); } void doWrite() { - onTimeout(); - if (doingWrite) { return; @@ -192,18 +155,25 @@ class ConnectionImpl : public Connection BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; return; } + startTimeout(); doingWrite = true; adaptor.async_write_some( inputBuffer.data(), std::bind_front(&ConnectionImpl::doWriteCallback, this, - shared_from_this())); + weak_from_this())); } - void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, + void doWriteCallback(const std::weak_ptr<Connection>& weak, const boost::beast::error_code& ec, - const size_t bytesTransferred) + size_t bytesTransferred) { + auto self = weak.lock(); + if (self == nullptr) + { + return; + } + timer.cancel(); doingWrite = false; inputBuffer.consume(bytesTransferred); @@ -226,48 +196,6 @@ class ConnectionImpl : public Connection doWrite(); } - void completeRequest(crow::Response& thisRes) override - { - auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); - asyncResp->res = std::move(thisRes); - - if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty()) - { - asyncResp->res.addHeader(boost::beast::http::field::content_type, - "application/json"); - asyncResp->res.body() = asyncResp->res.jsonValue.dump( - 2, ' ', true, nlohmann::json::error_handler_t::replace); - } - - asyncResp->res.preparePayload(); - - serializer.emplace(*asyncResp->res.stringResponse); - - boost::beast::http::async_write_some( - adaptor, *serializer, - std::bind_front(&ConnectionImpl::completeRequestCallback, this, - shared_from_this())); - } - - void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/, - const boost::system::error_code& ec, - std::size_t bytesTransferred) - { - auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); - BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred - << " bytes"; - if (ec) - { - BMCWEB_LOG_DEBUG << this << " from async_write failed"; - return; - } - - BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid"; - serializer.reset(); - close("Request invalid"); - asyncResp->res.releaseCompleteRequestHandler(); - } - void sendEvent(std::string_view id, std::string_view msg) override { if (msg.empty()) @@ -276,19 +204,18 @@ class ConnectionImpl : public Connection return; } - dataFormat(id); + dataFormat(id, msg); doWrite(); } - void dataFormat(std::string_view id) + void dataFormat(std::string_view id, std::string_view msg) { - std::string_view msg; std::string rawData; if (!id.empty()) { rawData += "id: "; - rawData.append(id.begin(), id.end()); + rawData.append(id); rawData += "\n"; } @@ -308,9 +235,8 @@ class ConnectionImpl : public Connection inputBuffer.commit(rawData.size()); } - void onTimeout() + void startTimeout() { - boost::asio::steady_timer timer(ioc); std::weak_ptr<Connection> weakSelf = weak_from_this(); timer.expires_after(std::chrono::seconds(30)); timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, @@ -318,7 +244,7 @@ class ConnectionImpl : public Connection } void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, - const boost::system::error_code ec) + const boost::system::error_code& ec) { std::shared_ptr<Connection> self = weakSelf.lock(); if (!self) @@ -346,23 +272,18 @@ class ConnectionImpl : public Connection private: Adaptor adaptor; - boost::beast::multi_buffer outputBuffer; boost::beast::multi_buffer inputBuffer; std::optional<boost::beast::http::response_serializer< - boost::beast::http::string_body>> + boost::beast::http::buffer_body>> serializer; boost::asio::io_context& ioc = crow::connections::systemBus->get_io_context(); + boost::asio::steady_timer timer; bool doingWrite = false; - std::optional< - boost::beast::http::request_parser<boost::beast::http::string_body>> - parser; - - std::function<void(std::shared_ptr<Connection>&, const crow::Request&, - const std::shared_ptr<bmcweb::AsyncResp>&)> - openHandler; - std::function<void(std::shared_ptr<Connection>&)> closeHandler; + + std::function<void(Connection&)> openHandler; + std::function<void(Connection&)> closeHandler; }; } // namespace sse_socket } // namespace crow |