summaryrefslogtreecommitdiff
path: root/http
diff options
context:
space:
mode:
authorEd Tanous <edtanous@google.com>2023-06-01 17:33:34 +0300
committerEd Tanous <ed@tanous.net>2023-06-01 23:43:11 +0300
commit6fde95fad082fa7d6fc54f2ef8584e06fb116d42 (patch)
tree31865ca598fcb00af96d5d33824cf53ae1df4f36 /http
parent88ada3bc05ea247b6c8a24db49ebfdd268c17f4d (diff)
downloadbmcweb-6fde95fad082fa7d6fc54f2ef8584e06fb116d42.tar.xz
Server-sent-event fixes
This makes several changes to server-sent events to allow it to merge to master. The routing system has been removed in leiu of using content-type eventstream detection. Timers have been added to the sse connections, and sse connections now rely on async_wait, rather than a full read. Tested: WIP Signed-off-by: Ed Tanous <edtanous@google.com> Change-Id: Id0ff0ebc2b3a795b3dba008e440556a9fdd882c2
Diffstat (limited to 'http')
-rw-r--r--http/app.hpp5
-rw-r--r--http/http_connection.hpp6
-rw-r--r--http/http_response.hpp8
-rw-r--r--http/routing.hpp32
-rw-r--r--http/server_sent_event.hpp189
5 files changed, 65 insertions, 175 deletions
diff --git a/http/app.hpp b/http/app.hpp
index 6388d84271..d3cf48c1b1 100644
--- a/http/app.hpp
+++ b/http/app.hpp
@@ -105,11 +105,6 @@ class App
router.validate();
}
- static bool isSseRoute(Request& req)
- {
- return Router::isSseRoute(req);
- }
-
void run()
{
validate();
diff --git a/http/http_connection.hpp b/http/http_connection.hpp
index 1b85c6b7b4..7ae22e9e49 100644
--- a/http/http_connection.hpp
+++ b/http/http_connection.hpp
@@ -243,12 +243,14 @@ class Connection :
[self(shared_from_this())](crow::Response& thisRes) {
self->completeRequest(thisRes);
});
-
+ bool isSse =
+ isContentTypeAllowed(req->getHeaderValue("Accept"),
+ http_helpers::ContentType::EventStream, false);
if ((thisReq.isUpgrade() &&
boost::iequals(
thisReq.getHeaderValue(boost::beast::http::field::upgrade),
"websocket")) ||
- (Handler::isSseRoute(*req)))
+ isSse)
{
asyncResp->res.setCompleteRequestHandler(
[self(shared_from_this())](crow::Response& thisRes) {
diff --git a/http/http_response.hpp b/http/http_response.hpp
index 06b693915f..1a4ef16d4e 100644
--- a/http/http_response.hpp
+++ b/http/http_response.hpp
@@ -16,18 +16,10 @@ namespace crow
template <typename Adaptor, typename Handler>
class Connection;
-namespace sse_socket
-{
-template <typename Adaptor>
-class ConnectionImpl;
-} // namespace sse_socket
-
struct Response
{
template <typename Adaptor, typename Handler>
friend class crow::Connection;
- template <typename Adaptor>
- friend class crow::sse_socket::ConnectionImpl;
using response_type =
boost::beast::http::response<boost::beast::http::string_body>;
diff --git a/http/routing.hpp b/http/routing.hpp
index ead3af9eca..ac1c310ae2 100644
--- a/http/routing.hpp
+++ b/http/routing.hpp
@@ -391,7 +391,7 @@ class SseSocketRule : public BaseRule
}
#ifndef BMCWEB_ENABLE_SSL
- void handleUpgrade(const Request& req,
+ void handleUpgrade(const Request& /*req*/,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
boost::asio::ip::tcp::socket&& adaptor) override
{
@@ -399,11 +399,11 @@ class SseSocketRule : public BaseRule
crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
myConnection = std::make_shared<
crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
- req, std::move(adaptor), openHandler, closeHandler);
+ std::move(adaptor), openHandler, closeHandler);
myConnection->start();
}
#else
- void handleUpgrade(const Request& req,
+ void handleUpgrade(const Request& /*req*/,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
adaptor) override
@@ -412,7 +412,7 @@ class SseSocketRule : public BaseRule
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
- req, std::move(adaptor), openHandler, closeHandler);
+ std::move(adaptor), openHandler, closeHandler);
myConnection->start();
}
#endif
@@ -432,12 +432,8 @@ class SseSocketRule : public BaseRule
}
private:
- std::function<void(std::shared_ptr<crow::sse_socket::Connection>&,
- const crow::Request&,
- const std::shared_ptr<bmcweb::AsyncResp>&)>
- openHandler;
- std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)>
- closeHandler;
+ std::function<void(crow::sse_socket::Connection&)> openHandler;
+ std::function<void(crow::sse_socket::Connection&)> closeHandler;
};
template <typename T>
@@ -461,13 +457,6 @@ struct RuleParameterTraits
return *p;
}
- self_t& name(std::string_view name) noexcept
- {
- self_t* self = static_cast<self_t*>(this);
- self->nameStr = name;
- return *self;
- }
-
self_t& methods(boost::beast::http::verb method)
{
self_t* self = static_cast<self_t*>(this);
@@ -1184,15 +1173,6 @@ class Router
return true;
}
- static bool isSseRoute(Request& req)
- {
- return std::any_of(sse_socket::sseRoutes.begin(),
- sse_socket::sseRoutes.end(),
- [&req](const char* sseRoute) {
- return (req.url().encoded_path() == sseRoute);
- });
- }
-
static bool
isUserPrivileged(Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index 58659c84a0..02af0b7523 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -1,6 +1,5 @@
#pragma once
-#include "async_resolve.hpp"
-#include "async_resp.hpp"
+#include "dbus_singleton.hpp"
#include "http_request.hpp"
#include "http_response.hpp"
@@ -23,13 +22,10 @@ namespace crow
namespace sse_socket
{
-static constexpr const std::array<const char*, 1> sseRoutes = {
- "/redfish/v1/EventService/SSE"};
-
struct Connection : std::enable_shared_from_this<Connection>
{
public:
- explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
+ Connection() = default;
Connection(const Connection&) = delete;
Connection(Connection&&) = delete;
@@ -38,26 +34,19 @@ struct Connection : std::enable_shared_from_this<Connection>
virtual ~Connection() = default;
virtual boost::asio::io_context& getIoContext() = 0;
- virtual void sendSSEHeader() = 0;
- virtual void completeRequest(crow::Response& thisRes) = 0;
virtual void close(std::string_view msg = "quit") = 0;
virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
-
- crow::Request req;
};
template <typename Adaptor>
class ConnectionImpl : public Connection
{
public:
- ConnectionImpl(
- const crow::Request& reqIn, Adaptor adaptorIn,
- std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
- const std::shared_ptr<bmcweb::AsyncResp>&)>
- openHandlerIn,
- std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
- Connection(reqIn),
- adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
+ ConnectionImpl(Adaptor&& adaptorIn,
+ std::function<void(Connection&)> openHandlerIn,
+ std::function<void(Connection&)> closeHandlerIn) :
+ adaptor(std::move(adaptorIn)),
+ timer(ioc), openHandler(std::move(openHandlerIn)),
closeHandler(std::move(closeHandlerIn))
{
BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
@@ -81,61 +70,47 @@ class ConnectionImpl : public Connection
void start()
{
- if (openHandler)
+ if (!openHandler)
{
- auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
- std::shared_ptr<Connection> self = this->shared_from_this();
-
- asyncResp->res.setCompleteRequestHandler(
- [self(shared_from_this())](crow::Response& thisRes) {
- if (thisRes.resultInt() != 200)
- {
- self->completeRequest(thisRes);
- }
- });
-
- openHandler(self, req, asyncResp);
+ BMCWEB_LOG_CRITICAL << "No open handler???";
+ return;
}
+ openHandler(*this);
}
void close(const std::string_view msg) override
{
- BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
- boost::beast::get_lowest_layer(adaptor).close();
-
// send notification to handler for cleanup
if (closeHandler)
{
- std::shared_ptr<Connection> self = shared_from_this();
- closeHandler(self);
+ closeHandler(*this);
}
+ BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
+ boost::beast::get_lowest_layer(adaptor).close();
}
- void sendSSEHeader() override
+ void sendSSEHeader()
{
BMCWEB_LOG_DEBUG << "Starting SSE connection";
- auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
using BodyType = boost::beast::http::buffer_body;
- auto response =
- std::make_shared<boost::beast::http::response<BodyType>>(
- boost::beast::http::status::ok, 11);
-
- serializer.emplace(*asyncResp->res.stringResponse);
-
- response->set(boost::beast::http::field::content_type,
- "text/event-stream");
- response->body().more = true;
+ boost::beast::http::response<BodyType> res(
+ boost::beast::http::status::ok, 11, BodyType{});
+ res.set(boost::beast::http::field::content_type, "text/event-stream");
+ res.body().more = true;
+ boost::beast::http::response_serializer<BodyType>& ser =
+ serializer.emplace(std::move(res));
boost::beast::http::async_write_header(
- adaptor, *serializer,
+ adaptor, ser,
std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
shared_from_this()));
}
void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
- const boost::beast::error_code& ec,
- const std::size_t& /*unused*/)
+ const boost::system::error_code& ec,
+ size_t /*bytesSent*/)
{
+ serializer.reset();
if (ec)
{
BMCWEB_LOG_ERROR << "Error sending header" << ec;
@@ -148,41 +123,29 @@ class ConnectionImpl : public Connection
// SSE stream header sent, So let us setup monitor.
// Any read data on this stream will be error in case of SSE.
- setupRead();
- }
- void setupRead()
- {
- std::weak_ptr<Connection> weakSelf = weak_from_this();
-
- boost::beast::http::async_read_some(
- adaptor, outputBuffer, *parser,
- std::bind_front(&ConnectionImpl::setupReadCallback, this,
- weak_from_this()));
+ adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
+ std::bind_front(&ConnectionImpl::afterReadError,
+ this, shared_from_this()));
}
- void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
- const boost::system::error_code& ec,
- size_t bytesRead)
+ void afterReadError(const std::shared_ptr<Connection>& /*self*/,
+ const boost::system::error_code& ec)
{
- std::shared_ptr<Connection> self = weakSelf.lock();
- BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ return;
+ }
if (ec)
{
BMCWEB_LOG_ERROR << "Read error: " << ec;
}
- // After establishing SSE stream, Reading data on this
- // stream means client is disobeys the SSE protocol.
- // Read the data to avoid buffer attacks and close connection.
-
- self->close("Close SSE connection");
+ close("Close SSE connection");
}
void doWrite()
{
- onTimeout();
-
if (doingWrite)
{
return;
@@ -192,18 +155,25 @@ class ConnectionImpl : public Connection
BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
return;
}
+ startTimeout();
doingWrite = true;
adaptor.async_write_some(
inputBuffer.data(),
std::bind_front(&ConnectionImpl::doWriteCallback, this,
- shared_from_this()));
+ weak_from_this()));
}
- void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+ void doWriteCallback(const std::weak_ptr<Connection>& weak,
const boost::beast::error_code& ec,
- const size_t bytesTransferred)
+ size_t bytesTransferred)
{
+ auto self = weak.lock();
+ if (self == nullptr)
+ {
+ return;
+ }
+ timer.cancel();
doingWrite = false;
inputBuffer.consume(bytesTransferred);
@@ -226,48 +196,6 @@ class ConnectionImpl : public Connection
doWrite();
}
- void completeRequest(crow::Response& thisRes) override
- {
- auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
- asyncResp->res = std::move(thisRes);
-
- if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
- {
- asyncResp->res.addHeader(boost::beast::http::field::content_type,
- "application/json");
- asyncResp->res.body() = asyncResp->res.jsonValue.dump(
- 2, ' ', true, nlohmann::json::error_handler_t::replace);
- }
-
- asyncResp->res.preparePayload();
-
- serializer.emplace(*asyncResp->res.stringResponse);
-
- boost::beast::http::async_write_some(
- adaptor, *serializer,
- std::bind_front(&ConnectionImpl::completeRequestCallback, this,
- shared_from_this()));
- }
-
- void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
- const boost::system::error_code& ec,
- std::size_t bytesTransferred)
- {
- auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
- BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
- << " bytes";
- if (ec)
- {
- BMCWEB_LOG_DEBUG << this << " from async_write failed";
- return;
- }
-
- BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
- serializer.reset();
- close("Request invalid");
- asyncResp->res.releaseCompleteRequestHandler();
- }
-
void sendEvent(std::string_view id, std::string_view msg) override
{
if (msg.empty())
@@ -276,19 +204,18 @@ class ConnectionImpl : public Connection
return;
}
- dataFormat(id);
+ dataFormat(id, msg);
doWrite();
}
- void dataFormat(std::string_view id)
+ void dataFormat(std::string_view id, std::string_view msg)
{
- std::string_view msg;
std::string rawData;
if (!id.empty())
{
rawData += "id: ";
- rawData.append(id.begin(), id.end());
+ rawData.append(id);
rawData += "\n";
}
@@ -308,9 +235,8 @@ class ConnectionImpl : public Connection
inputBuffer.commit(rawData.size());
}
- void onTimeout()
+ void startTimeout()
{
- boost::asio::steady_timer timer(ioc);
std::weak_ptr<Connection> weakSelf = weak_from_this();
timer.expires_after(std::chrono::seconds(30));
timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
@@ -318,7 +244,7 @@ class ConnectionImpl : public Connection
}
void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
- const boost::system::error_code ec)
+ const boost::system::error_code& ec)
{
std::shared_ptr<Connection> self = weakSelf.lock();
if (!self)
@@ -346,23 +272,18 @@ class ConnectionImpl : public Connection
private:
Adaptor adaptor;
- boost::beast::multi_buffer outputBuffer;
boost::beast::multi_buffer inputBuffer;
std::optional<boost::beast::http::response_serializer<
- boost::beast::http::string_body>>
+ boost::beast::http::buffer_body>>
serializer;
boost::asio::io_context& ioc =
crow::connections::systemBus->get_io_context();
+ boost::asio::steady_timer timer;
bool doingWrite = false;
- std::optional<
- boost::beast::http::request_parser<boost::beast::http::string_body>>
- parser;
-
- std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
- const std::shared_ptr<bmcweb::AsyncResp>&)>
- openHandler;
- std::function<void(std::shared_ptr<Connection>&)> closeHandler;
+
+ std::function<void(Connection&)> openHandler;
+ std::function<void(Connection&)> closeHandler;
};
} // namespace sse_socket
} // namespace crow