From adaa5cb4c494148430b90edb248260eb2e66bca7 Mon Sep 17 00:00:00 2001
From: P Dheeraj Srujan Kumar
Date: Wed, 8 Sep 2021 15:42:52 +0530
Subject: [PATCH] Delete/Remove Terminated Event Subscription(s)
Added functionality to delete/remove event subscription(s) which are
configured to Terminate after retries.
Currently, when an Event is subscribed with Retry Policy as
"TerminateAfterRetries", the state of the connection is set to
"Terminated" after retrying, but the Subscription is not removed.
This commit adds the functionality to detect terminated connection and
remove the respective subscription.
Tested:
- Created a Subscription with
DeliveryRetryPolicy: "TerminateAfterRetries"
- Received Events successfully on Event listener
- Once the Event listener was stopped, the Subscription was
removed/deleted after retries.
Change-Id: If447acb2db74fb29a5d1cfe6194b77cda82bc8a1
Signed-off-by: P Dheeraj Srujan Kumar
---
http/http_client.hpp | 43 +++++++++++++++----
.../include/event_service_manager.hpp | 37 ++++++++++++++++
2 files changed, 71 insertions(+), 9 deletions(-)
diff --git a/http/http_client.hpp b/http/http_client.hpp
index aaf1b2d..4f62c40 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -56,6 +56,8 @@ enum class ConnState
closeInProgress,
closed,
suspended,
+ terminate,
+ terminateInProgress,
terminated,
abortConnection,
retry
@@ -263,7 +265,14 @@ class HttpClient : public std::enable_shared_from_this
}
void doClose()
{
- state = ConnState::closeInProgress;
+ if (state == ConnState::terminate)
+ {
+ state = ConnState::terminateInProgress;
+ }
+ else if (state != ConnState::suspended)
+ {
+ state = ConnState::closeInProgress;
+ }
// Set the timeout on the tcp stream socket for the async operation
conn.expires_after(std::chrono::seconds(30));
@@ -293,8 +302,11 @@ class HttpClient : public std::enable_shared_from_this
}
self->conn.close();
- if ((self->state != ConnState::suspended) &&
- (self->state != ConnState::terminated))
+ if (self->state == ConnState::terminateInProgress)
+ {
+ self->state = ConnState::terminated;
+ }
+ else if (self->state == ConnState::closeInProgress)
{
self->state = ConnState::closed;
self->handleConnState();
@@ -316,8 +328,11 @@ class HttpClient : public std::enable_shared_from_this
}
conn.close();
- if ((state != ConnState::suspended) &&
- (state != ConnState::terminated))
+ if (state == ConnState::terminateInProgress)
+ {
+ state = ConnState::terminated;
+ }
+ else if (state == ConnState::closeInProgress)
{
state = ConnState::closed;
handleConnState();
@@ -340,8 +355,7 @@ class HttpClient : public std::enable_shared_from_this
BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction;
if (retryPolicyAction == "TerminateAfterRetries")
{
- // TODO: delete subscription
- state = ConnState::terminated;
+ state = ConnState::terminate;
}
if (retryPolicyAction == "SuspendRetries")
{
@@ -392,6 +406,7 @@ class HttpClient : public std::enable_shared_from_this
case ConnState::sendInProgress:
case ConnState::recvInProgress:
case ConnState::closeInProgress:
+ case ConnState::terminateInProgress:
{
BMCWEB_LOG_DEBUG << "Async operation is already in progress";
break;
@@ -413,7 +428,7 @@ class HttpClient : public std::enable_shared_from_this
break;
}
case ConnState::suspended:
- case ConnState::terminated:
+ case ConnState::terminate:
{
doClose();
break;
@@ -480,7 +495,8 @@ class HttpClient : public std::enable_shared_from_this
}
void sendData(const std::string& data)
{
- if ((state == ConnState::suspended) || (state == ConnState::terminated))
+ if ((state == ConnState::terminate) ||
+ (state == ConnState::terminated) || (state == ConnState::suspended))
{
return;
}
@@ -489,6 +505,15 @@ class HttpClient : public std::enable_shared_from_this
return;
}
+ bool isTerminated()
+ {
+ if (state == ConnState::terminated)
+ {
+ return true;
+ }
+ return false;
+ }
+
void addHeaders(
const std::vector>& httpHeaders)
{
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 8d7067b..79618f6 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -591,6 +591,14 @@ class Subscription : public persistent_data::UserSubscription
return std::nullopt;
}
+ bool isTerminated()
+ {
+ if (conn != nullptr)
+ return conn->isTerminated();
+
+ return false;
+ }
+
private:
std::shared_ptr sseConn = nullptr;
uint64_t eventSeqNum;
@@ -847,6 +855,22 @@ class EventServiceManager
}
}
+ void deleteTerminatedSubcriptions()
+ {
+ boost::container::flat_map>::iterator it =
+ subscriptionsMap.begin();
+ while (it != subscriptionsMap.end())
+ {
+ std::shared_ptr entry = it->second;
+ if (entry->isTerminated())
+ {
+ subscriptionsMap.erase(it);
+ }
+ it++;
+ }
+ }
+
void updateNoOfSubscribersCount()
{
size_t eventLogSubCount = 0;
@@ -881,6 +905,7 @@ class EventServiceManager
std::shared_ptr getSubscription(const std::string& id)
{
+ deleteTerminatedSubcriptions();
auto obj = subscriptionsMap.find(id);
if (obj == subscriptionsMap.end())
{
@@ -971,6 +996,7 @@ class EventServiceManager
bool isSubscriptionExist(const std::string& id)
{
+ deleteTerminatedSubcriptions();
auto obj = subscriptionsMap.find(id);
if (obj == subscriptionsMap.end())
{
@@ -1033,6 +1059,7 @@ class EventServiceManager
size_t getNumberOfSubscriptions()
{
+ deleteTerminatedSubcriptions();
return subscriptionsMap.size();
}
@@ -1049,6 +1076,7 @@ class EventServiceManager
std::vector getAllIDs()
{
+ deleteTerminatedSubcriptions();
std::vector idList;
for (const auto& it : subscriptionsMap)
{
@@ -1059,6 +1087,7 @@ class EventServiceManager
bool isDestinationExist(const std::string& destUrl)
{
+ deleteTerminatedSubcriptions();
for (const auto& it : subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1073,6 +1102,7 @@ class EventServiceManager
void sendTestEventLog()
{
+ deleteTerminatedSubcriptions();
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1100,6 +1130,8 @@ class EventServiceManager
}
eventRecord.push_back(eventMessage);
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1143,6 +1175,8 @@ class EventServiceManager
}
void sendBroadcastMsg(const std::string& broadcastMsg)
{
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1291,6 +1325,8 @@ class EventServiceManager
return;
}
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
--
2.17.1