From f665ba085bb2310f008b7534f827fb401ad973c2 Mon Sep 17 00:00:00 2001 From: Krzysztof Grobelny Date: Tue, 12 Oct 2021 08:19:51 +0000 Subject: [PATCH] Delete/Remove Terminated Event Subscription(s) Added functionality to delete/remove event subscription(s) which are configured to Terminate after retries. Currently, when an Event is subscribed with Retry Policy as "TerminateAfterRetries", the state of the connection is set to "Terminated" after retrying, but the Subscription is not removed. This commit adds the functionality to detect terminated connection and remove the respective subscription. Tested: - Created a Subscription with DeliveryRetryPolicy: "TerminateAfterRetries" - Received Events successfully on Event listener - Once the Event listener was stopped, the Subscription was removed/deleted after retries. Change-Id: If447acb2db74fb29a5d1cfe6194b77cda82bc8a1 Signed-off-by: P Dheeraj Srujan Kumar --- http/http_client.hpp | 43 +++++++++++++++---- .../include/event_service_manager.hpp | 36 ++++++++++++++++ 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/http/http_client.hpp b/http/http_client.hpp index 5e7ff47..54ae2c3 100644 --- a/http/http_client.hpp +++ b/http/http_client.hpp @@ -55,6 +55,8 @@ enum class ConnState closeInProgress, closed, suspended, + terminate, + terminateInProgress, terminated, abortConnection, retry @@ -288,7 +290,14 @@ class HttpClient : public std::enable_shared_from_this } void doClose() { - state = ConnState::closeInProgress; + if (state == ConnState::terminate) + { + state = ConnState::terminateInProgress; + } + else if (state != ConnState::suspended) + { + state = ConnState::closeInProgress; + } // Set the timeout on the tcp stream socket for the async operation conn.expires_after(std::chrono::seconds(30)); @@ -318,8 +327,11 @@ class HttpClient : public std::enable_shared_from_this } self->conn.close(); - if ((self->state != ConnState::suspended) && - (self->state != ConnState::terminated)) + if (self->state == ConnState::terminateInProgress) + { + self->state = ConnState::terminated; + } + else if (self->state == ConnState::closeInProgress) { self->state = ConnState::closed; self->handleConnState(); @@ -341,8 +353,11 @@ class HttpClient : public std::enable_shared_from_this } conn.close(); - if ((state != ConnState::suspended) && - (state != ConnState::terminated)) + if (state == ConnState::terminateInProgress) + { + state = ConnState::terminated; + } + else if (state == ConnState::closeInProgress) { state = ConnState::closed; handleConnState(); @@ -365,8 +380,7 @@ class HttpClient : public std::enable_shared_from_this BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction; if (retryPolicyAction == "TerminateAfterRetries") { - // TODO: delete subscription - state = ConnState::terminated; + state = ConnState::terminate; } if (retryPolicyAction == "SuspendRetries") { @@ -423,6 +437,7 @@ class HttpClient : public std::enable_shared_from_this case ConnState::sendInProgress: case ConnState::recvInProgress: case ConnState::closeInProgress: + case ConnState::terminateInProgress: { BMCWEB_LOG_DEBUG << "Async operation is already in progress"; break; @@ -439,7 +454,7 @@ class HttpClient : public std::enable_shared_from_this break; } case ConnState::suspended: - case ConnState::terminated: + case ConnState::terminate: { doClose(); break; @@ -506,7 +521,8 @@ class HttpClient : public std::enable_shared_from_this } void sendData(const std::string& data) { - if ((state == ConnState::suspended) || (state == ConnState::terminated)) + if ((state == ConnState::terminate) || + (state == ConnState::terminated) || (state == ConnState::suspended)) { return; } @@ -524,6 +540,15 @@ class HttpClient : public std::enable_shared_from_this return; } + bool isTerminated() + { + if (state == ConnState::terminated) + { + return true; + } + return false; + } + void addHeaders( const std::vector>& httpHeaders) { diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp index 6f60a31..363adb0 100644 --- a/redfish-core/include/event_service_manager.hpp +++ b/redfish-core/include/event_service_manager.hpp @@ -591,6 +591,14 @@ class Subscription : public persistent_data::UserSubscription return std::nullopt; } + bool isTerminated() + { + if (conn != nullptr) + return conn->isTerminated(); + + return false; + } + private: std::shared_ptr sseConn = nullptr; uint64_t eventSeqNum; @@ -847,6 +855,22 @@ class EventServiceManager } } + void deleteTerminatedSubcriptions() + { + boost::container::flat_map>::iterator it = + subscriptionsMap.begin(); + while (it != subscriptionsMap.end()) + { + std::shared_ptr entry = it->second; + if (entry->isTerminated()) + { + subscriptionsMap.erase(it); + } + it++; + } + } + void updateNoOfSubscribersCount() { size_t eventLogSubCount = 0; @@ -881,6 +905,7 @@ class EventServiceManager std::shared_ptr getSubscription(const std::string& id) { + deleteTerminatedSubcriptions(); auto obj = subscriptionsMap.find(id); if (obj == subscriptionsMap.end()) { @@ -971,6 +996,7 @@ class EventServiceManager bool isSubscriptionExist(const std::string& id) { + deleteTerminatedSubcriptions(); auto obj = subscriptionsMap.find(id); if (obj == subscriptionsMap.end()) { @@ -1033,6 +1059,7 @@ class EventServiceManager size_t getNumberOfSubscriptions() { + deleteTerminatedSubcriptions(); return subscriptionsMap.size(); } @@ -1049,6 +1076,7 @@ class EventServiceManager std::vector getAllIDs() { + deleteTerminatedSubcriptions(); std::vector idList; for (const auto& it : subscriptionsMap) { @@ -1059,6 +1087,7 @@ class EventServiceManager bool isDestinationExist(const std::string& destUrl) { + deleteTerminatedSubcriptions(); for (const auto& it : subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1073,6 +1102,7 @@ class EventServiceManager void sendTestEventLog() { + deleteTerminatedSubcriptions(); for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1100,6 +1130,8 @@ class EventServiceManager } eventRecord.push_back(eventMessage); + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1143,6 +1175,8 @@ class EventServiceManager } void sendBroadcastMsg(const std::string& broadcastMsg) { + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; @@ -1291,6 +1325,8 @@ class EventServiceManager return; } + deleteTerminatedSubcriptions(); + for (const auto& it : this->subscriptionsMap) { std::shared_ptr entry = it.second; -- 2.25.1