From 799e47842e179f7c752712004f0e96d3219eee11 Mon Sep 17 00:00:00 2001 From: AppaRao Puli Date: Tue, 16 Mar 2021 15:37:24 +0000 Subject: [PATCH] Add SSE style subscription support to eventservice This commit adds the SSE style eventservice subscription style event. Using this, end user can subscribe for Redfish event logs using GET on SSE usri from browser. URI: /redfish/v1/EventService/Subscriptions/SSE Tested: - From Browser did GET on above SSE URI and generated some Redfish event logs(power cycle) and saw redfish event logs streaming on browser. - After SSE registration, Check Subscription collections and GET on individual subscription and saw desired response. - Ran RedfishValidation and its passed. Change-Id: I7f4b7a34974080739c4ba968ed570489af0474de Signed-off-by: AppaRao Puli Signed-off-by: P Dheeraj Srujan Kumar --- http/http_connection.hpp | 2 +- include/eventservice_sse.hpp | 75 +++++ .../include/event_service_manager.hpp | 109 +++++-- redfish-core/include/server_sent_events.hpp | 290 ------------------ redfish-core/lib/event_service.hpp | 8 +- src/webserver_main.cpp | 2 + 6 files changed, 164 insertions(+), 322 deletions(-) create mode 100644 include/eventservice_sse.hpp delete mode 100644 redfish-core/include/server_sent_events.hpp diff --git a/http/http_connection.hpp b/http/http_connection.hpp index a1bbfce..2d08501 100644 --- a/http/http_connection.hpp +++ b/http/http_connection.hpp @@ -382,7 +382,7 @@ class Connection : boost::iequals( thisReq.getHeaderValue(boost::beast::http::field::upgrade), "websocket")) || - (req->url == "/sse")) + (req->url == "/redfish/v1/EventService/Subscriptions/SSE")) { BMCWEB_LOG_DEBUG << "Request: " << this << " is getting upgraded"; handler->handleUpgrade(thisReq, res, std::move(adaptor)); diff --git a/include/eventservice_sse.hpp b/include/eventservice_sse.hpp new file mode 100644 index 0000000..14daf00 --- /dev/null +++ b/include/eventservice_sse.hpp @@ -0,0 +1,75 @@ +#pragma once + +#include +#include + +namespace redfish +{ +namespace eventservice_sse +{ + +static bool createSubscription(std::shared_ptr& conn, + const crow::Request& req, crow::Response& res) +{ + if ((EventServiceManager::getInstance().getNumberOfSubscriptions() >= + maxNoOfSubscriptions) || + EventServiceManager::getInstance().getNumberOfSSESubscriptions() >= + maxNoOfSSESubscriptions) + { + BMCWEB_LOG_ERROR << "Max SSE subscriptions reached"; + messages::eventSubscriptionLimitExceeded(res); + res.end(); + return false; + } + BMCWEB_LOG_DEBUG << "Request query param size: " << req.urlParams.size(); + + std::shared_ptr subValue = + std::make_shared(std::move(conn)); + + // GET on this URI means, Its SSE subscriptionType. + subValue->subscriptionType = redfish::subscriptionTypeSSE; + + // TODO: parse $filter query params and fill config. + subValue->protocol = "Redfish"; + subValue->retryPolicy = "TerminateAfterRetries"; + subValue->eventFormatType = "Event"; + + std::string id = + redfish::EventServiceManager::getInstance().addSubscription(subValue, + false); + if (id.empty()) + { + messages::internalError(res); + res.end(); + return false; + } + + return true; +} + +static void deleteSubscription(std::shared_ptr& conn) +{ + redfish::EventServiceManager::getInstance().deleteSubscription(conn); +} + +inline void requestRoutes(App& app) +{ + BMCWEB_ROUTE(app, "/redfish/v1/EventService/Subscriptions/SSE") + .privileges({{"ConfigureComponents", "ConfigureManager"}}) + .serverSentEvent() + .onopen([](std::shared_ptr& conn, + const crow::Request& req, crow::Response& res) { + BMCWEB_LOG_DEBUG << "Connection " << conn << " opened."; + if (createSubscription(conn, req, res)) + { + // All success, lets send SSE haader + conn->sendSSEHeader(); + } + }) + .onclose([](std::shared_ptr& conn) { + BMCWEB_LOG_DEBUG << "Connection " << conn << " closed"; + deleteSubscription(conn); + }); +} +} // namespace eventservice_sse +} // namespace redfish diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp index 3f398d7..dd833ce 100644 --- a/redfish-core/include/event_service_manager.hpp +++ b/redfish-core/include/event_service_manager.hpp @@ -22,15 +22,17 @@ #include #include +#include #include #include #include #include #include #include -#include +#include #include +#include #include #include #include @@ -46,9 +48,27 @@ using ReadingsObjType = static constexpr const char* eventFormatType = "Event"; static constexpr const char* metricReportFormatType = "MetricReport"; +static constexpr const char* subscriptionTypeSSE = "SSE"; static constexpr const char* eventServiceFile = "/var/lib/bmcweb/eventservice_config.json"; +static constexpr const uint8_t maxNoOfSubscriptions = 20; +static constexpr const uint8_t maxNoOfSSESubscriptions = 10; + +#ifndef BMCWEB_ENABLE_REDFISH_DBUS_LOG_ENTRIES +static std::optional inotifyConn; +static constexpr const char* redfishEventLogDir = "/var/log"; +static constexpr const char* redfishEventLogFile = "/var/log/redfish"; +static constexpr const size_t iEventSize = sizeof(inotify_event); +static int inotifyFd = -1; +static int dirWatchDesc = -1; +static int fileWatchDesc = -1; + +// +using EventLogObjectsType = + std::tuple>; + namespace message_registries { inline boost::beast::span @@ -68,24 +88,6 @@ inline boost::beast::span } return boost::beast::span(openbmc::registry); } -} // namespace message_registries - -#ifndef BMCWEB_ENABLE_REDFISH_DBUS_LOG_ENTRIES -static std::optional inotifyConn; -static constexpr const char* redfishEventLogDir = "/var/log"; -static constexpr const char* redfishEventLogFile = "/var/log/redfish"; -static constexpr const size_t iEventSize = sizeof(inotify_event); -static int inotifyFd = -1; -static int dirWatchDesc = -1; -static int fileWatchDesc = -1; - -// -using EventLogObjectsType = - std::tuple>; - -namespace message_registries -{ static const Message* getMsgFromRegistry(const std::string& messageKey, const boost::beast::span& registry) @@ -388,11 +390,9 @@ class Subscription : public persistent_data::UserSubscription path, uriProto); } - Subscription(const std::shared_ptr& adaptor) : - eventSeqNum(1) - { - sseConn = std::make_shared(adaptor); - } + Subscription(const std::shared_ptr& adaptor) : + sseConn(adaptor), eventSeqNum(1) + {} ~Subscription() = default; @@ -417,7 +417,7 @@ class Subscription : public persistent_data::UserSubscription if (sseConn != nullptr) { - sseConn->sendData(eventSeqNum, msg); + sseConn->sendEvent(std::to_string(eventSeqNum), msg); } } @@ -508,6 +508,7 @@ class Subscription : public persistent_data::UserSubscription this->sendEvent( msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace)); + this->eventSeqNum++; } #endif @@ -578,14 +579,39 @@ class Subscription : public persistent_data::UserSubscription return eventSeqNum; } + void setSubscriptionId(const std::string& id) + { + BMCWEB_LOG_DEBUG << "Subscription ID: " << id; + subId = id; + } + + std::string getSubscriptionId() + { + return subId; + } + + std::optional + getSubscriptionId(const std::shared_ptr& connPtr) + { + if (sseConn != nullptr && connPtr == sseConn) + { + BMCWEB_LOG_DEBUG << __FUNCTION__ + << " conn matched, subId: " << subId; + return subId; + } + + return std::nullopt; + } + private: + std::shared_ptr sseConn = nullptr; uint64_t eventSeqNum; std::string host; std::string port; std::string path; std::string uriProto; std::shared_ptr conn = nullptr; - std::shared_ptr sseConn = nullptr; + std::string subId; }; class EventServiceManager @@ -942,6 +968,8 @@ class EventServiceManager subValue->updateRetryConfig(retryAttempts, retryTimeoutInterval); subValue->updateRetryPolicy(); + // Set Subscription ID for back trace + subValue->setSubscriptionId(id); return id; } @@ -970,11 +998,40 @@ class EventServiceManager } } + void deleteSubscription(const std::shared_ptr& connPtr) + { + for (const auto& it : this->subscriptionsMap) + { + std::shared_ptr entry = it.second; + if (entry->subscriptionType == subscriptionTypeSSE) + { + std::optional id = + entry->getSubscriptionId(connPtr); + if (id) + { + deleteSubscription(*id); + return; + } + } + } + } + size_t getNumberOfSubscriptions() { return subscriptionsMap.size(); } + size_t getNumberOfSSESubscriptions() const + { + auto count = std::count_if( + subscriptionsMap.begin(), subscriptionsMap.end(), + [this](const std::pair>& + entry) { + return (entry.second->subscriptionType == subscriptionTypeSSE); + }); + return static_cast(count); + } + std::vector getAllIDs() { std::vector idList; diff --git a/redfish-core/include/server_sent_events.hpp b/redfish-core/include/server_sent_events.hpp deleted file mode 100644 index 7613d7b..0000000 --- a/redfish-core/include/server_sent_events.hpp +++ /dev/null @@ -1,290 +0,0 @@ - -/* -// Copyright (c) 2020 Intel Corporation -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -*/ -#pragma once - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace crow -{ - -static constexpr uint8_t maxReqQueueSize = 50; - -enum class SseConnState -{ - startInit, - initInProgress, - initialized, - initFailed, - sendInProgress, - sendFailed, - idle, - suspended, - closed -}; - -class ServerSentEvents : public std::enable_shared_from_this -{ - private: - std::shared_ptr sseConn; - std::queue> requestDataQueue; - std::string outBuffer; - SseConnState state; - int retryCount; - int maxRetryAttempts; - - void sendEvent(const std::string& id, const std::string& msg) - { - if (msg.empty()) - { - BMCWEB_LOG_DEBUG << "Empty data, bailing out."; - return; - } - - if (state == SseConnState::sendInProgress) - { - return; - } - state = SseConnState::sendInProgress; - - if (!id.empty()) - { - outBuffer += "id: "; - outBuffer.append(id.begin(), id.end()); - outBuffer += "\n"; - } - - outBuffer += "data: "; - for (char character : msg) - { - outBuffer += character; - if (character == '\n') - { - outBuffer += "data: "; - } - } - outBuffer += "\n\n"; - - doWrite(); - } - - void doWrite() - { - if (outBuffer.empty()) - { - BMCWEB_LOG_DEBUG << "All data sent successfully."; - // Send is successful, Lets remove data from queue - // check for next request data in queue. - requestDataQueue.pop(); - state = SseConnState::idle; - checkQueue(); - return; - } - - sseConn->async_write_some( - boost::asio::buffer(outBuffer.data(), outBuffer.size()), - [self(shared_from_this())]( - boost::beast::error_code ec, - [[maybe_unused]] const std::size_t& bytesTransferred) { - self->outBuffer.erase(0, bytesTransferred); - - if (ec == boost::asio::error::eof) - { - // Send is successful, Lets remove data from queue - // check for next request data in queue. - self->requestDataQueue.pop(); - self->state = SseConnState::idle; - self->checkQueue(); - return; - } - - if (ec) - { - BMCWEB_LOG_ERROR << "async_write_some() failed: " - << ec.message(); - self->state = SseConnState::sendFailed; - self->checkQueue(); - return; - } - BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: " - << bytesTransferred; - - self->doWrite(); - }); - } - - void startSSE() - { - if (state == SseConnState::initInProgress) - { - return; - } - state = SseConnState::initInProgress; - - BMCWEB_LOG_DEBUG << "starting SSE connection "; - using BodyType = boost::beast::http::buffer_body; - auto response = - std::make_shared>( - boost::beast::http::status::ok, 11); - auto serializer = - std::make_shared>( - *response); - - // TODO: Add hostname in http header. - response->set(boost::beast::http::field::server, "iBMC"); - response->set(boost::beast::http::field::content_type, - "text/event-stream"); - response->body().data = nullptr; - response->body().size = 0; - response->body().more = true; - - boost::beast::http::async_write_header( - *sseConn, *serializer, - [this, response, - serializer](const boost::beast::error_code& ec, - [[maybe_unused]] const std::size_t& bytesTransferred) { - if (ec) - { - BMCWEB_LOG_ERROR << "Error sending header" << ec; - state = SseConnState::initFailed; - checkQueue(); - return; - } - - BMCWEB_LOG_DEBUG << "startSSE Header sent."; - state = SseConnState::initialized; - checkQueue(); - }); - } - - void checkQueue(const bool newRecord = false) - { - if (requestDataQueue.empty()) - { - BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; - return; - } - - if (retryCount >= maxRetryAttempts) - { - BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; - - // Clear queue. - while (!requestDataQueue.empty()) - { - requestDataQueue.pop(); - } - - // TODO: Take 'DeliveryRetryPolicy' action. - // For now, doing 'SuspendRetries' action. - state = SseConnState::suspended; - return; - } - - if ((state == SseConnState::initFailed) || - (state == SseConnState::sendFailed)) - { - if (newRecord) - { - // We are already running async wait and retry. - // Since record is added to queue, it gets the - // turn in FIFO. - return; - } - - retryCount++; - // TODO: Perform async wait for retryTimeoutInterval before proceed. - } - else - { - // reset retry count. - retryCount = 0; - } - - switch (state) - { - case SseConnState::initInProgress: - case SseConnState::sendInProgress: - case SseConnState::suspended: - case SseConnState::startInit: - case SseConnState::closed: - // do nothing - break; - case SseConnState::initFailed: - { - startSSE(); - break; - } - case SseConnState::initialized: - case SseConnState::idle: - case SseConnState::sendFailed: - { - std::pair reqData = - requestDataQueue.front(); - sendEvent(std::to_string(reqData.first), reqData.second); - break; - } - } - - return; - } - - public: - ServerSentEvents(const ServerSentEvents&) = delete; - ServerSentEvents& operator=(const ServerSentEvents&) = delete; - ServerSentEvents(ServerSentEvents&&) = delete; - ServerSentEvents& operator=(ServerSentEvents&&) = delete; - - ServerSentEvents(const std::shared_ptr& adaptor) : - sseConn(adaptor), state(SseConnState::startInit), retryCount(0), - maxRetryAttempts(5) - { - startSSE(); - } - - ~ServerSentEvents() = default; - - void sendData(const uint64_t& id, const std::string& data) - { - if (state == SseConnState::suspended) - { - return; - } - - if (requestDataQueue.size() <= maxReqQueueSize) - { - requestDataQueue.push(std::pair(id, data)); - checkQueue(true); - } - else - { - BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; - } - } -}; - -} // namespace crow diff --git a/redfish-core/lib/event_service.hpp b/redfish-core/lib/event_service.hpp index 8609862..249e594 100644 --- a/redfish-core/lib/event_service.hpp +++ b/redfish-core/lib/event_service.hpp @@ -37,8 +37,6 @@ static constexpr const std::array supportedResourceTypes = { "Task"}; #endif -static constexpr const uint8_t maxNoOfSubscriptions = 20; - inline void requestRoutesEventService(App& app) { BMCWEB_ROUTE(app, "/redfish/v1/EventService/") @@ -50,6 +48,8 @@ inline void requestRoutesEventService(App& app) {"@odata.type", "#EventService.v1_5_0.EventService"}, {"Id", "EventService"}, {"Name", "Event Service"}, + {"ServerSentEventUri", + "/redfish/v1/EventService/Subscriptions/SSE"}, {"Subscriptions", {{"@odata.id", "/redfish/v1/EventService/Subscriptions"}}}, {"Actions", @@ -90,9 +90,7 @@ inline void requestRoutesEventService(App& app) .privileges(redfish::privileges::patchEventService) .methods(boost::beast::http::verb::patch)( [](const crow::Request& req, - const std::shared_ptr& asyncResp) - - { + const std::shared_ptr& asyncResp) { std::optional serviceEnabled; std::optional retryAttemps; std::optional retryInterval; diff --git a/src/webserver_main.cpp b/src/webserver_main.cpp index bf98aae..53745d8 100644 --- a/src/webserver_main.cpp +++ b/src/webserver_main.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -83,6 +84,7 @@ int main(int /*argc*/, char** /*argv*/) #endif #ifdef BMCWEB_ENABLE_REDFISH + redfish::eventservice_sse::requestRoutes(app); redfish::requestRoutes(app); redfish::RedfishService redfish(app); -- 2.17.1