From 698d2615c5bc30ab7f89f11ed5659df8bf248ea0 Mon Sep 17 00:00:00 2001 From: AppaRao Puli Date: Tue, 16 Mar 2021 15:37:24 +0000 Subject: [PATCH 5/6] Add SSE style subscription support to eventservice This commit adds the SSE style eventservice subscription style event. Using this, end user can subscribe for Redfish event logs using GET on SSE usri from browser. URI: /redfish/v1/EventService/Subscriptions/SSE Tested: - From Browser did GET on above SSE URI and generated some Redfish event logs(power cycle) and saw redfish event logs streaming on browser. - After SSE registration, Check Subscription collections and GET on individual subscription and saw desired response. - Ran RedfishValidation and its passed. Signed-off-by: AppaRao Puli Change-Id: I7f4b7a34974080739c4ba968ed570489af0474de --- http/http_connection.hpp | 2 +- include/eventservice_sse.hpp | 75 +++++ .../include/event_service_manager.hpp | 77 ++++- redfish-core/include/server_sent_events.hpp | 291 ------------------ redfish-core/lib/event_service.hpp | 4 +- src/webserver_main.cpp | 2 + 6 files changed, 149 insertions(+), 302 deletions(-) create mode 100644 include/eventservice_sse.hpp delete mode 100644 redfish-core/include/server_sent_events.hpp diff --git a/http/http_connection.hpp b/http/http_connection.hpp index 2c8bf40..1ab776c 100644 --- a/http/http_connection.hpp +++ b/http/http_connection.hpp @@ -350,7 +350,7 @@ class Connection : boost::iequals(req->getHeaderValue( boost::beast::http::field::upgrade), "websocket")) || - (req->url == "/sse")) + (req->url == "/redfish/v1/EventService/Subscriptions/SSE")) { BMCWEB_LOG_DEBUG << "Request: " << this << " is getting upgraded"; diff --git a/include/eventservice_sse.hpp b/include/eventservice_sse.hpp new file mode 100644 index 0000000..6c98e6e --- /dev/null +++ b/include/eventservice_sse.hpp @@ -0,0 +1,75 @@ +#pragma once + +#include +#include + +namespace redfish +{ +namespace eventservice_sse +{ + +static bool createSubscription(std::shared_ptr& conn, + const crow::Request& req, crow::Response& res) +{ + if ((EventServiceManager::getInstance().getNumberOfSubscriptions() >= + maxNoOfSubscriptions) || + EventServiceManager::getInstance().getNumberOfSSESubscriptions() >= + maxNoOfSSESubscriptions) + { + BMCWEB_LOG_ERROR << "Max SSE subscriptions reached"; + messages::eventSubscriptionLimitExceeded(res); + res.end(); + return false; + } + BMCWEB_LOG_DEBUG << "Request query param size: " << req.urlParams.size(); + + std::shared_ptr subValue = + std::make_shared(std::move(conn)); + + // GET on this URI means, Its SSE subscriptionType. + subValue->subscriptionType = redfish::subscriptionTypeSSE; + + // TODO: parse $filter query params and fill config. + subValue->protocol = "Redfish"; + subValue->retryPolicy = "TerminateAfterRetries"; + subValue->eventFormatType = "Event"; + + std::string id = + redfish::EventServiceManager::getInstance().addSubscription(subValue, + false); + if (id.empty()) + { + messages::internalError(res); + res.end(); + return false; + } + + return true; +} + +static void deleteSubscription(std::shared_ptr& conn) +{ + redfish::EventServiceManager::getInstance().deleteSubscription(conn); +} + +inline void requestRoutes(App& app) +{ + BMCWEB_ROUTE(app, "/redfish/v1/EventService/Subscriptions/SSE") + .privileges({"ConfigureComponents", "ConfigureManager"}) + .serverSentEvent() + .onopen([](std::shared_ptr& conn, + const crow::Request& req, crow::Response& res) { + BMCWEB_LOG_DEBUG << "Connection " << conn << " opened."; + if (createSubscription(conn, req, res)) + { + // All success, lets send SSE haader + conn->sendSSEHeader(); + } + }) + .onclose([](std::shared_ptr& conn) { + BMCWEB_LOG_DEBUG << "Connection " << conn << " closed"; + deleteSubscription(conn); + }); +} +} // namespace eventservice_sse +} // namespace redfish diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp index 5821e2e..f4d57c2 100644 --- a/redfish-core/include/event_service_manager.hpp +++ b/redfish-core/include/event_service_manager.hpp @@ -23,13 +23,15 @@ #include #include +#include #include #include #include #include -#include +#include #include +#include #include #include #include @@ -46,9 +48,13 @@ using EventServiceConfig = std::tuple; static constexpr const char* eventFormatType = "Event"; static constexpr const char* metricReportFormatType = "MetricReport"; +static constexpr const char* subscriptionTypeSSE = "SSE"; static constexpr const char* eventServiceFile = "/var/lib/bmcweb/eventservice_config.json"; +static constexpr const uint8_t maxNoOfSubscriptions = 20; +static constexpr const uint8_t maxNoOfSSESubscriptions = 10; + #ifndef BMCWEB_ENABLE_REDFISH_DBUS_LOG_ENTRIES static std::optional inotifyConn; static constexpr const char* redfishEventLogDir = "/var/log"; @@ -391,11 +397,9 @@ class Subscription path, uriProto); } - Subscription(const std::shared_ptr& adaptor) : - eventSeqNum(1) - { - sseConn = std::make_shared(adaptor); - } + Subscription(const std::shared_ptr& adaptor) : + sseConn(adaptor), eventSeqNum(1) + {} ~Subscription() = default; @@ -420,7 +424,7 @@ class Subscription if (sseConn != nullptr) { - sseConn->sendData(eventSeqNum, msg); + sseConn->sendEvent(std::to_string(eventSeqNum), msg); } } @@ -510,6 +514,7 @@ class Subscription this->sendEvent( msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace)); + this->eventSeqNum++; } #endif @@ -565,14 +570,39 @@ class Subscription return eventSeqNum; } + void setSubscriptionId(const std::string& id) + { + BMCWEB_LOG_DEBUG << "Subscription ID: " << id; + subId = id; + } + + std::string getSubscriptionId() + { + return subId; + } + + std::optional + getSubscriptionId(const std::shared_ptr& connPtr) + { + if (sseConn != nullptr && connPtr == sseConn) + { + BMCWEB_LOG_DEBUG << __FUNCTION__ + << " conn matched, subId: " << subId; + return subId; + } + + return std::nullopt; + } + private: + std::shared_ptr sseConn = nullptr; uint64_t eventSeqNum; std::string host; std::string port; std::string path; std::string uriProto; std::shared_ptr conn = nullptr; - std::shared_ptr sseConn = nullptr; + std::string subId; }; static constexpr const bool defaultEnabledState = true; @@ -963,6 +993,8 @@ class EventServiceManager subValue->updateRetryConfig(retryAttempts, retryTimeoutInterval); subValue->updateRetryPolicy(); + // Set Subscription ID for back trace + subValue->setSubscriptionId(id); return id; } @@ -987,11 +1019,40 @@ class EventServiceManager } } + void deleteSubscription(const std::shared_ptr& connPtr) + { + for (const auto& it : this->subscriptionsMap) + { + std::shared_ptr entry = it.second; + if (entry->subscriptionType == subscriptionTypeSSE) + { + std::optional id = + entry->getSubscriptionId(connPtr); + if (id) + { + deleteSubscription(*id); + return; + } + } + } + } + size_t getNumberOfSubscriptions() { return subscriptionsMap.size(); } + size_t getNumberOfSSESubscriptions() const + { + auto count = std::count_if( + subscriptionsMap.begin(), subscriptionsMap.end(), + [this](const std::pair>& + entry) { + return (entry.second->subscriptionType == subscriptionTypeSSE); + }); + return static_cast(count); + } + std::vector getAllIDs() { std::vector idList; diff --git a/redfish-core/include/server_sent_events.hpp b/redfish-core/include/server_sent_events.hpp deleted file mode 100644 index 578fa19..0000000 --- a/redfish-core/include/server_sent_events.hpp +++ /dev/null @@ -1,291 +0,0 @@ - -/* -// Copyright (c) 2020 Intel Corporation -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -*/ -#pragma once -#include "node.hpp" - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace crow -{ - -static constexpr uint8_t maxReqQueueSize = 50; - -enum class SseConnState -{ - startInit, - initInProgress, - initialized, - initFailed, - sendInProgress, - sendFailed, - idle, - suspended, - closed -}; - -class ServerSentEvents : public std::enable_shared_from_this -{ - private: - std::shared_ptr sseConn; - std::queue> requestDataQueue; - std::string outBuffer; - SseConnState state; - int retryCount; - int maxRetryAttempts; - - void sendEvent(const std::string& id, const std::string& msg) - { - if (msg.empty()) - { - BMCWEB_LOG_DEBUG << "Empty data, bailing out."; - return; - } - - if (state == SseConnState::sendInProgress) - { - return; - } - state = SseConnState::sendInProgress; - - if (!id.empty()) - { - outBuffer += "id: "; - outBuffer.append(id.begin(), id.end()); - outBuffer += "\n"; - } - - outBuffer += "data: "; - for (char character : msg) - { - outBuffer += character; - if (character == '\n') - { - outBuffer += "data: "; - } - } - outBuffer += "\n\n"; - - doWrite(); - } - - void doWrite() - { - if (outBuffer.empty()) - { - BMCWEB_LOG_DEBUG << "All data sent successfully."; - // Send is successful, Lets remove data from queue - // check for next request data in queue. - requestDataQueue.pop(); - state = SseConnState::idle; - checkQueue(); - return; - } - - sseConn->async_write_some( - boost::asio::buffer(outBuffer.data(), outBuffer.size()), - [self(shared_from_this())]( - boost::beast::error_code ec, - [[maybe_unused]] const std::size_t& bytesTransferred) { - self->outBuffer.erase(0, bytesTransferred); - - if (ec == boost::asio::error::eof) - { - // Send is successful, Lets remove data from queue - // check for next request data in queue. - self->requestDataQueue.pop(); - self->state = SseConnState::idle; - self->checkQueue(); - return; - } - - if (ec) - { - BMCWEB_LOG_ERROR << "async_write_some() failed: " - << ec.message(); - self->state = SseConnState::sendFailed; - self->checkQueue(); - return; - } - BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " - << bytesTransferred; - - self->doWrite(); - }); - } - - void startSSE() - { - if (state == SseConnState::initInProgress) - { - return; - } - state = SseConnState::initInProgress; - - BMCWEB_LOG_DEBUG << "starting SSE connection "; - using BodyType = boost::beast::http::buffer_body; - auto response = - std::make_shared>( - boost::beast::http::status::ok, 11); - auto serializer = - std::make_shared>( - *response); - - // TODO: Add hostname in http header. - response->set(boost::beast::http::field::server, "iBMC"); - response->set(boost::beast::http::field::content_type, - "text/event-stream"); - response->body().data = nullptr; - response->body().size = 0; - response->body().more = true; - - boost::beast::http::async_write_header( - *sseConn, *serializer, - [this, response, - serializer](const boost::beast::error_code& ec, - [[maybe_unused]] const std::size_t& bytesTransferred) { - if (ec) - { - BMCWEB_LOG_ERROR << "Error sending header" << ec; - state = SseConnState::initFailed; - checkQueue(); - return; - } - - BMCWEB_LOG_DEBUG << "startSSE Header sent."; - state = SseConnState::initialized; - checkQueue(); - }); - } - - void checkQueue(const bool newRecord = false) - { - if (requestDataQueue.empty()) - { - BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; - return; - } - - if (retryCount >= maxRetryAttempts) - { - BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; - - // Clear queue. - while (!requestDataQueue.empty()) - { - requestDataQueue.pop(); - } - - // TODO: Take 'DeliveryRetryPolicy' action. - // For now, doing 'SuspendRetries' action. - state = SseConnState::suspended; - return; - } - - if ((state == SseConnState::initFailed) || - (state == SseConnState::sendFailed)) - { - if (newRecord) - { - // We are already running async wait and retry. - // Since record is added to queue, it gets the - // turn in FIFO. - return; - } - - retryCount++; - // TODO: Perform async wait for retryTimeoutInterval before proceed. - } - else - { - // reset retry count. - retryCount = 0; - } - - switch (state) - { - case SseConnState::initInProgress: - case SseConnState::sendInProgress: - case SseConnState::suspended: - case SseConnState::startInit: - case SseConnState::closed: - // do nothing - break; - case SseConnState::initFailed: - { - startSSE(); - break; - } - case SseConnState::initialized: - case SseConnState::idle: - case SseConnState::sendFailed: - { - std::pair reqData = - requestDataQueue.front(); - sendEvent(std::to_string(reqData.first), reqData.second); - break; - } - } - - return; - } - - public: - ServerSentEvents(const ServerSentEvents&) = delete; - ServerSentEvents& operator=(const ServerSentEvents&) = delete; - ServerSentEvents(ServerSentEvents&&) = delete; - ServerSentEvents& operator=(ServerSentEvents&&) = delete; - - ServerSentEvents(const std::shared_ptr& adaptor) : - sseConn(adaptor), state(SseConnState::startInit), retryCount(0), - maxRetryAttempts(5) - { - startSSE(); - } - - ~ServerSentEvents() = default; - - void sendData(const uint64_t& id, const std::string& data) - { - if (state == SseConnState::suspended) - { - return; - } - - if (requestDataQueue.size() <= maxReqQueueSize) - { - requestDataQueue.push(std::pair(id, data)); - checkQueue(true); - } - else - { - BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; - } - } -}; - -} // namespace crow diff --git a/redfish-core/lib/event_service.hpp b/redfish-core/lib/event_service.hpp index be6f04d..1875ec9 100644 --- a/redfish-core/lib/event_service.hpp +++ b/redfish-core/lib/event_service.hpp @@ -34,8 +34,6 @@ static constexpr const std::array supportedResourceTypes = { "Task"}; #endif -static constexpr const uint8_t maxNoOfSubscriptions = 20; - class EventService : public Node { public: @@ -59,6 +57,8 @@ class EventService : public Node {"@odata.type", "#EventService.v1_5_0.EventService"}, {"Id", "EventService"}, {"Name", "Event Service"}, + {"ServerSentEventUri", + "/redfish/v1/EventService/Subscriptions/SSE"}, {"Subscriptions", {{"@odata.id", "/redfish/v1/EventService/Subscriptions"}}}, {"Actions", diff --git a/src/webserver_main.cpp b/src/webserver_main.cpp index 902c32b..c871faa 100644 --- a/src/webserver_main.cpp +++ b/src/webserver_main.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -83,6 +84,7 @@ int main(int /*argc*/, char** /*argv*/) #ifdef BMCWEB_ENABLE_REDFISH redfish::requestRoutes(app); redfish::RedfishService redfish(app); + redfish::eventservice_sse::requestRoutes(app); // Create EventServiceManager instance and initialize Config redfish::EventServiceManager::getInstance(); -- 2.17.1