diff options
Diffstat (limited to 'http')
-rw-r--r-- | http/app.hpp | 5 | ||||
-rw-r--r-- | http/http_connection.hpp | 9 | ||||
-rw-r--r-- | http/http_response.hpp | 8 | ||||
-rw-r--r-- | http/routing.hpp | 91 | ||||
-rw-r--r-- | http/server_sent_event.hpp | 368 |
5 files changed, 477 insertions, 4 deletions
diff --git a/http/app.hpp b/http/app.hpp index d3cf48c1b1..6388d84271 100644 --- a/http/app.hpp +++ b/http/app.hpp @@ -105,6 +105,11 @@ class App router.validate(); } + static bool isSseRoute(Request& req) + { + return Router::isSseRoute(req); + } + void run() { validate(); diff --git a/http/http_connection.hpp b/http/http_connection.hpp index 7b66ac851b..1b85c6b7b4 100644 --- a/http/http_connection.hpp +++ b/http/http_connection.hpp @@ -244,10 +244,11 @@ class Connection : self->completeRequest(thisRes); }); - if (thisReq.isUpgrade() && - boost::iequals( - thisReq.getHeaderValue(boost::beast::http::field::upgrade), - "websocket")) + if ((thisReq.isUpgrade() && + boost::iequals( + thisReq.getHeaderValue(boost::beast::http::field::upgrade), + "websocket")) || + (Handler::isSseRoute(*req))) { asyncResp->res.setCompleteRequestHandler( [self(shared_from_this())](crow::Response& thisRes) { diff --git a/http/http_response.hpp b/http/http_response.hpp index 1a4ef16d4e..06b693915f 100644 --- a/http/http_response.hpp +++ b/http/http_response.hpp @@ -16,10 +16,18 @@ namespace crow template <typename Adaptor, typename Handler> class Connection; +namespace sse_socket +{ +template <typename Adaptor> +class ConnectionImpl; +} // namespace sse_socket + struct Response { template <typename Adaptor, typename Handler> friend class crow::Connection; + template <typename Adaptor> + friend class crow::sse_socket::ConnectionImpl; using response_type = boost::beast::http::response<boost::beast::http::string_body>; diff --git a/http/routing.hpp b/http/routing.hpp index eb87e72b5d..ead3af9eca 100644 --- a/http/routing.hpp +++ b/http/routing.hpp @@ -8,6 +8,7 @@ #include "http_response.hpp" #include "logging.hpp" #include "privileges.hpp" +#include "server_sent_event.hpp" #include "sessions.hpp" #include "utility.hpp" #include "utils/dbus_utils.hpp" @@ -373,6 +374,72 @@ class WebSocketRule : public BaseRule std::function<void(crow::websocket::Connection&)> errorHandler; }; +class SseSocketRule : public BaseRule +{ + using self_t = SseSocketRule; + + public: + explicit SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn) {} + + void validate() override {} + + void handle(const Request& /*req*/, + const std::shared_ptr<bmcweb::AsyncResp>& asyncResp, + const std::vector<std::string>& /*params*/) override + { + asyncResp->res.result(boost::beast::http::status::not_found); + } + +#ifndef BMCWEB_ENABLE_SSL + void handleUpgrade(const Request& req, + const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/, + boost::asio::ip::tcp::socket&& adaptor) override + { + std::shared_ptr< + crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>> + myConnection = std::make_shared< + crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>( + req, std::move(adaptor), openHandler, closeHandler); + myConnection->start(); + } +#else + void handleUpgrade(const Request& req, + const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/, + boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&& + adaptor) override + { + std::shared_ptr<crow::sse_socket::ConnectionImpl< + boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>> + myConnection = std::make_shared<crow::sse_socket::ConnectionImpl< + boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>( + req, std::move(adaptor), openHandler, closeHandler); + myConnection->start(); + } +#endif + + template <typename Func> + self_t& onopen(Func f) + { + openHandler = f; + return *this; + } + + template <typename Func> + self_t& onclose(Func f) + { + closeHandler = f; + return *this; + } + + private: + std::function<void(std::shared_ptr<crow::sse_socket::Connection>&, + const crow::Request&, + const std::shared_ptr<bmcweb::AsyncResp>&)> + openHandler; + std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)> + closeHandler; +}; + template <typename T> struct RuleParameterTraits { @@ -386,6 +453,21 @@ struct RuleParameterTraits return *p; } + SseSocketRule& serverSentEvent() + { + self_t* self = static_cast<self_t*>(this); + SseSocketRule* p = new SseSocketRule(self->rule); + self->ruleToUpgrade.reset(p); + return *p; + } + + self_t& name(std::string_view name) noexcept + { + self_t* self = static_cast<self_t*>(this); + self->nameStr = name; + return *self; + } + self_t& methods(boost::beast::http::verb method) { self_t* self = static_cast<self_t*>(this); @@ -1102,6 +1184,15 @@ class Router return true; } + static bool isSseRoute(Request& req) + { + return std::any_of(sse_socket::sseRoutes.begin(), + sse_socket::sseRoutes.end(), + [&req](const char* sseRoute) { + return (req.url().encoded_path() == sseRoute); + }); + } + static bool isUserPrivileged(Request& req, const std::shared_ptr<bmcweb::AsyncResp>& asyncResp, diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp new file mode 100644 index 0000000000..58659c84a0 --- /dev/null +++ b/http/server_sent_event.hpp @@ -0,0 +1,368 @@ +#pragma once +#include "async_resolve.hpp" +#include "async_resp.hpp" +#include "http_request.hpp" +#include "http_response.hpp" + +#include <boost/algorithm/string/predicate.hpp> +#include <boost/asio/buffer.hpp> +#include <boost/asio/steady_timer.hpp> +#include <boost/beast/core/multi_buffer.hpp> +#include <boost/beast/http/buffer_body.hpp> +#include <boost/beast/websocket.hpp> + +#include <array> +#include <functional> + +#ifdef BMCWEB_ENABLE_SSL +#include <boost/beast/websocket/ssl.hpp> +#endif + +namespace crow +{ + +namespace sse_socket +{ +static constexpr const std::array<const char*, 1> sseRoutes = { + "/redfish/v1/EventService/SSE"}; + +struct Connection : std::enable_shared_from_this<Connection> +{ + public: + explicit Connection(const crow::Request& reqIn) : req(reqIn) {} + + Connection(const Connection&) = delete; + Connection(Connection&&) = delete; + Connection& operator=(const Connection&) = delete; + Connection& operator=(const Connection&&) = delete; + virtual ~Connection() = default; + + virtual boost::asio::io_context& getIoContext() = 0; + virtual void sendSSEHeader() = 0; + virtual void completeRequest(crow::Response& thisRes) = 0; + virtual void close(std::string_view msg = "quit") = 0; + virtual void sendEvent(std::string_view id, std::string_view msg) = 0; + + crow::Request req; +}; + +template <typename Adaptor> +class ConnectionImpl : public Connection +{ + public: + ConnectionImpl( + const crow::Request& reqIn, Adaptor adaptorIn, + std::function<void(std::shared_ptr<Connection>&, const crow::Request&, + const std::shared_ptr<bmcweb::AsyncResp>&)> + openHandlerIn, + std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) : + Connection(reqIn), + adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)), + closeHandler(std::move(closeHandlerIn)) + { + BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this; + } + + ConnectionImpl(const ConnectionImpl&) = delete; + ConnectionImpl(const ConnectionImpl&&) = delete; + ConnectionImpl& operator=(const ConnectionImpl&) = delete; + ConnectionImpl& operator=(const ConnectionImpl&&) = delete; + + ~ConnectionImpl() override + { + BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this; + } + + boost::asio::io_context& getIoContext() override + { + return static_cast<boost::asio::io_context&>( + adaptor.get_executor().context()); + } + + void start() + { + if (openHandler) + { + auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); + std::shared_ptr<Connection> self = this->shared_from_this(); + + asyncResp->res.setCompleteRequestHandler( + [self(shared_from_this())](crow::Response& thisRes) { + if (thisRes.resultInt() != 200) + { + self->completeRequest(thisRes); + } + }); + + openHandler(self, req, asyncResp); + } + } + + void close(const std::string_view msg) override + { + BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg; + boost::beast::get_lowest_layer(adaptor).close(); + + // send notification to handler for cleanup + if (closeHandler) + { + std::shared_ptr<Connection> self = shared_from_this(); + closeHandler(self); + } + } + + void sendSSEHeader() override + { + BMCWEB_LOG_DEBUG << "Starting SSE connection"; + auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); + using BodyType = boost::beast::http::buffer_body; + auto response = + std::make_shared<boost::beast::http::response<BodyType>>( + boost::beast::http::status::ok, 11); + + serializer.emplace(*asyncResp->res.stringResponse); + + response->set(boost::beast::http::field::content_type, + "text/event-stream"); + response->body().more = true; + + boost::beast::http::async_write_header( + adaptor, *serializer, + std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, + shared_from_this())); + } + + void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, + const boost::beast::error_code& ec, + const std::size_t& /*unused*/) + { + if (ec) + { + BMCWEB_LOG_ERROR << "Error sending header" << ec; + close("async_write_header failed"); + return; + } + BMCWEB_LOG_DEBUG << "SSE header sent - Connection established"; + + serializer.reset(); + + // SSE stream header sent, So let us setup monitor. + // Any read data on this stream will be error in case of SSE. + setupRead(); + } + + void setupRead() + { + std::weak_ptr<Connection> weakSelf = weak_from_this(); + + boost::beast::http::async_read_some( + adaptor, outputBuffer, *parser, + std::bind_front(&ConnectionImpl::setupReadCallback, this, + weak_from_this())); + } + + void setupReadCallback(const std::weak_ptr<Connection>& weakSelf, + const boost::system::error_code& ec, + size_t bytesRead) + { + std::shared_ptr<Connection> self = weakSelf.lock(); + BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes"; + if (ec) + { + BMCWEB_LOG_ERROR << "Read error: " << ec; + } + + // After establishing SSE stream, Reading data on this + // stream means client is disobeys the SSE protocol. + // Read the data to avoid buffer attacks and close connection. + + self->close("Close SSE connection"); + } + + void doWrite() + { + onTimeout(); + + if (doingWrite) + { + return; + } + if (inputBuffer.size() == 0) + { + BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out"; + return; + } + doingWrite = true; + + adaptor.async_write_some( + inputBuffer.data(), + std::bind_front(&ConnectionImpl::doWriteCallback, this, + shared_from_this())); + } + + void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, + const boost::beast::error_code& ec, + const size_t bytesTransferred) + { + doingWrite = false; + inputBuffer.consume(bytesTransferred); + + if (ec == boost::asio::error::eof) + { + BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed"; + close("SSE stream closed"); + return; + } + + if (ec) + { + BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message(); + close("async_write_some failed"); + return; + } + BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " + << bytesTransferred; + + doWrite(); + } + + void completeRequest(crow::Response& thisRes) override + { + auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); + asyncResp->res = std::move(thisRes); + + if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty()) + { + asyncResp->res.addHeader(boost::beast::http::field::content_type, + "application/json"); + asyncResp->res.body() = asyncResp->res.jsonValue.dump( + 2, ' ', true, nlohmann::json::error_handler_t::replace); + } + + asyncResp->res.preparePayload(); + + serializer.emplace(*asyncResp->res.stringResponse); + + boost::beast::http::async_write_some( + adaptor, *serializer, + std::bind_front(&ConnectionImpl::completeRequestCallback, this, + shared_from_this())); + } + + void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/, + const boost::system::error_code& ec, + std::size_t bytesTransferred) + { + auto asyncResp = std::make_shared<bmcweb::AsyncResp>(); + BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred + << " bytes"; + if (ec) + { + BMCWEB_LOG_DEBUG << this << " from async_write failed"; + return; + } + + BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid"; + serializer.reset(); + close("Request invalid"); + asyncResp->res.releaseCompleteRequestHandler(); + } + + void sendEvent(std::string_view id, std::string_view msg) override + { + if (msg.empty()) + { + BMCWEB_LOG_DEBUG << "Empty data, bailing out."; + return; + } + + dataFormat(id); + + doWrite(); + } + + void dataFormat(std::string_view id) + { + std::string_view msg; + std::string rawData; + if (!id.empty()) + { + rawData += "id: "; + rawData.append(id.begin(), id.end()); + rawData += "\n"; + } + + rawData += "data: "; + for (char character : msg) + { + rawData += character; + if (character == '\n') + { + rawData += "data: "; + } + } + rawData += "\n\n"; + + boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()), + boost::asio::buffer(rawData)); + inputBuffer.commit(rawData.size()); + } + + void onTimeout() + { + boost::asio::steady_timer timer(ioc); + std::weak_ptr<Connection> weakSelf = weak_from_this(); + timer.expires_after(std::chrono::seconds(30)); + timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, + this, weak_from_this())); + } + + void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, + const boost::system::error_code ec) + { + std::shared_ptr<Connection> self = weakSelf.lock(); + if (!self) + { + BMCWEB_LOG_CRITICAL << self << " Failed to capture connection"; + return; + } + + if (ec == boost::asio::error::operation_aborted) + { + BMCWEB_LOG_DEBUG << "operation aborted"; + // Canceled wait means the path succeeeded. + return; + } + if (ec) + { + BMCWEB_LOG_CRITICAL << self << " timer failed " << ec; + } + + BMCWEB_LOG_WARNING << self << "Connection timed out, closing"; + + self->close("closing connection"); + } + + private: + Adaptor adaptor; + + boost::beast::multi_buffer outputBuffer; + boost::beast::multi_buffer inputBuffer; + + std::optional<boost::beast::http::response_serializer< + boost::beast::http::string_body>> + serializer; + boost::asio::io_context& ioc = + crow::connections::systemBus->get_io_context(); + bool doingWrite = false; + std::optional< + boost::beast::http::request_parser<boost::beast::http::string_body>> + parser; + + std::function<void(std::shared_ptr<Connection>&, const crow::Request&, + const std::shared_ptr<bmcweb::AsyncResp>&)> + openHandler; + std::function<void(std::shared_ptr<Connection>&)> closeHandler; +}; +} // namespace sse_socket +} // namespace crow |