From 5d13aa636aba6c68ab5a74e589cac35f4e925129 Mon Sep 17 00:00:00 2001
From: P Dheeraj Srujan Kumar
Date: Tue, 12 Oct 2021 08:19:51 +0000
Subject: [PATCH] Delete/Remove Terminated Event Subscription(s)
Added functionality to delete/remove event subscription(s) which are
configured to Terminate after retries.
Currently, when an Event is subscribed with Retry Policy as
"TerminateAfterRetries", the state of the connection is set to
"Terminated" after retrying, but the Subscription is not removed.
This commit adds the functionality to detect terminated connection and
remove the respective subscription.
Tested:
- Created a Subscription with
DeliveryRetryPolicy: "TerminateAfterRetries"
- Received Events successfully on Event listener
- Once the Event listener was stopped, the Subscription was
removed/deleted after retries.
Change-Id: If447acb2db74fb29a5d1cfe6194b77cda82bc8a1
Signed-off-by: P Dheeraj Srujan Kumar
---
http/http_client.hpp | 48 +++++++++++++++----
.../include/event_service_manager.hpp | 36 ++++++++++++++
2 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/http/http_client.hpp b/http/http_client.hpp
index 58b5402..745eeb6 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -56,6 +56,8 @@ enum class ConnState
closeInProgress,
closed,
suspended,
+ terminate,
+ terminateInProgress,
terminated,
abortConnection,
retry
@@ -290,7 +292,14 @@ class HttpClient : public std::enable_shared_from_this
}
void doClose()
{
- state = ConnState::closeInProgress;
+ if (state == ConnState::terminate)
+ {
+ state = ConnState::terminateInProgress;
+ }
+ else if (state != ConnState::suspended)
+ {
+ state = ConnState::closeInProgress;
+ }
// Set the timeout on the tcp stream socket for the async operation
conn.expires_after(std::chrono::seconds(30));
@@ -320,8 +329,11 @@ class HttpClient : public std::enable_shared_from_this
}
self->conn.close();
- if ((self->state != ConnState::suspended) &&
- (self->state != ConnState::terminated))
+ if (self->state == ConnState::terminateInProgress)
+ {
+ self->state = ConnState::terminated;
+ }
+ else if (self->state == ConnState::closeInProgress)
{
self->state = ConnState::closed;
self->handleConnState();
@@ -343,8 +355,11 @@ class HttpClient : public std::enable_shared_from_this
}
conn.close();
- if ((state != ConnState::suspended) &&
- (state != ConnState::terminated))
+ if (state == ConnState::terminateInProgress)
+ {
+ state = ConnState::terminated;
+ }
+ else if (state == ConnState::closeInProgress)
{
state = ConnState::closed;
handleConnState();
@@ -367,8 +382,7 @@ class HttpClient : public std::enable_shared_from_this
BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction;
if (retryPolicyAction == "TerminateAfterRetries")
{
- // TODO: delete subscription
- state = ConnState::terminated;
+ state = ConnState::terminate;
}
if (retryPolicyAction == "SuspendRetries")
{
@@ -425,6 +439,7 @@ class HttpClient : public std::enable_shared_from_this
case ConnState::sendInProgress:
case ConnState::recvInProgress:
case ConnState::closeInProgress:
+ case ConnState::terminateInProgress:
{
BMCWEB_LOG_DEBUG << "Async operation is already in progress";
break;
@@ -441,11 +456,16 @@ class HttpClient : public std::enable_shared_from_this
break;
}
case ConnState::suspended:
- case ConnState::terminated:
+ case ConnState::terminate:
{
doClose();
break;
}
+ case ConnState::terminated:
+ {
+ BMCWEB_LOG_DEBUG << "Connection Terminated";
+ break;
+ }
case ConnState::resolveFailed:
case ConnState::connectFailed:
case ConnState::handshakeFailed:
@@ -505,7 +525,8 @@ class HttpClient : public std::enable_shared_from_this
void sendData(const std::string& data)
{
- if ((state == ConnState::suspended) || (state == ConnState::terminated))
+ if ((state == ConnState::terminate) ||
+ (state == ConnState::terminated) || (state == ConnState::suspended))
{
return;
}
@@ -523,6 +544,15 @@ class HttpClient : public std::enable_shared_from_this
return;
}
+ bool isTerminated()
+ {
+ if (state == ConnState::terminated)
+ {
+ return true;
+ }
+ return false;
+ }
+
void setHeaders(const boost::beast::http::fields& httpHeaders)
{
// Set custom headers
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 69db652..5d71c63 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -588,6 +588,14 @@ class Subscription : public persistent_data::UserSubscription
return std::nullopt;
}
+ bool isTerminated()
+ {
+ if (conn != nullptr)
+ return conn->isTerminated();
+
+ return false;
+ }
+
private:
std::shared_ptr sseConn = nullptr;
uint64_t eventSeqNum;
@@ -843,6 +851,22 @@ class EventServiceManager
}
}
+ void deleteTerminatedSubcriptions()
+ {
+ boost::container::flat_map>::iterator it =
+ subscriptionsMap.begin();
+ while (it != subscriptionsMap.end())
+ {
+ std::shared_ptr entry = it->second;
+ if (entry->isTerminated())
+ {
+ subscriptionsMap.erase(it);
+ }
+ it++;
+ }
+ }
+
void updateNoOfSubscribersCount()
{
size_t eventLogSubCount = 0;
@@ -877,6 +901,7 @@ class EventServiceManager
std::shared_ptr getSubscription(const std::string& id)
{
+ deleteTerminatedSubcriptions();
auto obj = subscriptionsMap.find(id);
if (obj == subscriptionsMap.end())
{
@@ -968,6 +993,7 @@ class EventServiceManager
bool isSubscriptionExist(const std::string& id)
{
+ deleteTerminatedSubcriptions();
auto obj = subscriptionsMap.find(id);
if (obj == subscriptionsMap.end())
{
@@ -1030,6 +1056,7 @@ class EventServiceManager
size_t getNumberOfSubscriptions()
{
+ deleteTerminatedSubcriptions();
return subscriptionsMap.size();
}
@@ -1046,6 +1073,7 @@ class EventServiceManager
std::vector getAllIDs()
{
+ deleteTerminatedSubcriptions();
std::vector idList;
for (const auto& it : subscriptionsMap)
{
@@ -1056,6 +1084,7 @@ class EventServiceManager
bool isDestinationExist(const std::string& destUrl)
{
+ deleteTerminatedSubcriptions();
for (const auto& it : subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1070,6 +1099,7 @@ class EventServiceManager
void sendTestEventLog()
{
+ deleteTerminatedSubcriptions();
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1097,6 +1127,8 @@ class EventServiceManager
}
eventRecord.push_back(eventMessage);
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1140,6 +1172,8 @@ class EventServiceManager
}
void sendBroadcastMsg(const std::string& broadcastMsg)
{
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
@@ -1254,6 +1288,8 @@ class EventServiceManager
return;
}
+ deleteTerminatedSubcriptions();
+
for (const auto& it : this->subscriptionsMap)
{
std::shared_ptr entry = it.second;
--
2.17.1