From adaa5cb4c494148430b90edb248260eb2e66bca7 Mon Sep 17 00:00:00 2001 From: P Dheeraj Srujan Kumar Date: Wed, 8 Sep 2021 15:42:52 +0530 Subject: [PATCH] Delete/Remove Terminated Event Subscription(s) Added functionality to delete/remove event subscription(s) which are configured to Terminate after retries. Currently, when an Event is subscribed with Retry Policy as "TerminateAfterRetries", the state of the connection is set to "Terminated" after retrying, but the Subscription is not removed. This commit adds the functionality to detect terminated connection and remove the respective subscription. Tested: - Created a Subscription with DeliveryRetryPolicy: "TerminateAfterRetries" - Received Events successfully on Event listener - Once the Event listener was stopped, the Subscription was removed/deleted after retries. Change-Id: If447acb2db74fb29a5d1cfe6194b77cda82bc8a1 Signed-off-by: P Dheeraj Srujan Kumar --- http/http_client.hpp | 43 +++++++++++++++---- .../include/event_service_manager.hpp | 37 ++++++++++++++++ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/http/http_client.hpp b/http/http_client.hpp index aaf1b2d..4f62c40 100644 --- a/http/http_client.hpp +++ b/http/http_client.hpp @@ -56,6 +56,8 @@ enum class ConnState closeInProgress, closed, suspended, + terminate, + terminateInProgress, terminated, abortConnection, retry @@ -263,7 +265,14 @@ class HttpClient : public std::enable_shared_from_this } void doClose() { - state = ConnState::closeInProgress; + if (state == ConnState::terminate) + { + state = ConnState::terminateInProgress; + } + else if (state != ConnState::suspended) + { + state = ConnState::closeInProgress; + } // Set the timeout on the tcp stream socket for the async operation conn.expires_after(std::chrono::seconds(30)); @@ -293,8 +302,11 @@ class HttpClient : public std::enable_shared_from_this } self->conn.close(); - if ((self->state != ConnState::suspended) && - (self->state != ConnState::terminated)) + if (self->state == ConnState::terminateInProgress) + { + self->state = ConnState::terminated; + } + else if (self->state == ConnState::closeInProgress) { self->state = ConnState::closed; self->handleConnState(); @@ -316,8 +328,11 @@ class HttpClient : public std::enable_shared_from_this } conn.close(); - if ((state != ConnState::suspended) && - (state != ConnState::terminated)) + if (state == ConnState::terminateInProgress) + { + state = ConnState::terminated; + } + else if (state == ConnState::closeInProgress) { state = ConnState::closed; handleConnState(); @@ -340,8 +355,7 @@ class HttpClient : public std::enable_shared_from_this BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction; if (retryPolicyAction == "TerminateAfterRetries") { - // TODO: delete subscription - state = ConnState::terminated; + state = ConnState::terminate; } if (retryPolicyAction == "SuspendRetries") { @@ -392,6 +406,7 @@ class HttpClient : public std::enable_shared_from_this case ConnState::sendInProgress: case ConnState::recvInProgress: case ConnState::closeInProgress: + case ConnState::terminateInProgress: { BMCWEB_LOG_DEBUG << "Async operation is already in progress"; break; @@ -413,7 +428,7 @@ class HttpClient : public std::enable_shared_from_this break; } case ConnState::suspended: - case ConnState::terminated: + case ConnState::terminate: { doClose(); break; @@ -480,7 +495,8 @@ class HttpClient : public std::enable_shared_from_this } void sendData(const std::string& data) { - if ((state == ConnState::suspended) || (state == ConnState::terminated)) + if ((state == ConnState::terminate) || + (state == ConnState::terminated) || (state == ConnState::suspended)) { return; } @@ -489,6 +505,15 @@ class HttpClient : public std::enable_shared_from_this return; } + bool isTerminated() + { + if (state == ConnState::terminated) + { + return true; + } + return false; + } + void addHeaders( const std::vector>& httpHeaders) { diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp index 8d7067b..79618f6 100644 --- a/redfish-core/include/event_service_manager.hpp +++ b/redfish-core/include/event_service_manager.hpp @@ -591,6 +591,14 @@ class Subscription : public persistent_data::UserSubscription return std::nullopt; } + bool isTerminated() + { + if (conn != nullptr) + return conn->isTerminated(); + + return false; + } + private: std::shared_ptr sseConn = nullptr; uint64_t eventSeqNum; @@ -847,6 +855,22 @@ class EventServiceManager } } + void deleteTerminatedSubcriptions() + { + boost::container::flat_map>::iterator it = + subscriptionsMap.begin(); + while (it != subscriptionsMap.end()) + { + std::shared_ptr entry = it->second; + if (entry->isTerminated()) + { + subscriptionsMap.erase(it); + } + it++; + } + } + void updateNoOfSubscribersCount() { size_t eventLogSubCount = 0; @@ -881,6 +905,7 @@ class EventServiceManager std::shared_ptr getSubscription(const std::string& id) { + deleteTerminatedSubcriptions(); auto obj = subscriptionsMap.find(id); if (obj == subscriptionsMap.end()) { @@ -971,6 +996,7 @@ class EventServiceManager bool isSubscriptionExist(const std::string& id) { + deleteTerminatedSubcriptions(); auto obj = subscriptionsMap.find(id); if (obj == subscriptionsMap.end()) { @@ -1033,6 +1059,7 @@ class EventServiceManager size_t getNumberOfSubscriptions() { + deleteTerminatedSubcriptions(); return subscriptionsMap.size(); } @@ -1049,6 +1076,7 @@ class EventServiceManager std::vector getAllIDs() { + deleteTerminatedSubcriptions(); std::vector idList; for (const auto& it : subscriptionsMap) { @@ -1059,6 +1087,7 @@ class EventServiceManager bool isDestinationExist(const std::string& destUrl) { + deleteTerminatedSubcriptions(); for (const auto& it : subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1073,6 +1102,7 @@ class EventServiceManager void sendTestEventLog() { + deleteTerminatedSubcriptions(); for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1100,6 +1130,8 @@ class EventServiceManager } eventRecord.push_back(eventMessage); + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1143,6 +1175,8 @@ class EventServiceManager } void sendBroadcastMsg(const std::string& broadcastMsg) { + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1291,6 +1325,8 @@ class EventServiceManager return; } + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; -- 2.17.1