From f74393a9bca899e353be3d0e2dc5c224539fe432 Mon Sep 17 00:00:00 2001 From: Sunitha Harish Date: Fri, 19 Feb 2021 13:38:31 +0530 Subject: [PATCH] EventService : Fix retry handling for http-client When the event send/receive is failed, the bmcweb does not handle the failure to tear-down the complete connection and start a fresh The keep-alive header from the event listener is read to update the connection states, so that the connection will be kept alive or closed as per the subscriber's specifications Updated the connection state machine to handle retry logic properly. Avoided multiple simultaneous async calls which crashes the bmcweb. So added connBusy flag which protects simultaneous async calls. Used boost http response parser as parser for producing the response message. Set the parser skip option to handle the empty response message from listening server. Tested by: - Subscribe for the events at BMC using DMTF event listener - Generate an event and see the same is received at the listener's console - Update the listner to change the keep-alive to true/false and observe the http-client connection states at bmcweb - Changed listener client to return non success HTTP status code and observed retry logic gets trigrred in http-client. - Gave wrong fqdn and observed async resolve failure and retry logc. - Stopped listener after connect and verified timeouts on http-client side. Change-Id: Ibb45691f139916ba2954da37beda9d4f91c7cef3 Signed-off-by: Sunitha Harish Signed-off-by: AppaRao Puli --- http/http_client.hpp | 288 ++++++++++-------- .../include/event_service_manager.hpp | 2 +- 2 files changed, 162 insertions(+), 128 deletions(-) diff --git a/http/http_client.hpp b/http/http_client.hpp index 992ac2b..feabbba 100644 --- a/http/http_client.hpp +++ b/http/http_client.hpp @@ -34,22 +34,28 @@ namespace crow { static constexpr uint8_t maxRequestQueueSize = 50; +static constexpr unsigned int httpReadBodyLimit = 8192; enum class ConnState { initialized, resolveInProgress, resolveFailed, + resolved, connectInProgress, connectFailed, connected, sendInProgress, sendFailed, + recvInProgress, recvFailed, idle, - suspended, + closeInProgress, closed, - terminated + suspended, + terminated, + abortConnection, + retry }; class HttpClient : public std::enable_shared_from_this @@ -58,11 +64,13 @@ class HttpClient : public std::enable_shared_from_this crow::async_resolve::Resolver resolver; boost::beast::tcp_stream conn; boost::asio::steady_timer timer; - boost::beast::flat_buffer buffer; + boost::beast::flat_static_buffer buffer; boost::beast::http::request req; - boost::beast::http::response res; - std::vector> headers; - std::queue requestDataQueue; + std::optional< + boost::beast::http::response_parser> + parser; + boost::circular_buffer_space_optimized requestDataQueue{}; + std::vector endPoints; ConnState state; std::string subId; std::string host; @@ -76,12 +84,7 @@ class HttpClient : public std::enable_shared_from_this void doResolve() { - if (state == ConnState::resolveInProgress) - { - return; - } state = ConnState::resolveInProgress; - BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port; auto respHandler = @@ -89,78 +92,56 @@ class HttpClient : public std::enable_shared_from_this const boost::beast::error_code ec, const std::vector& endpointList) { - if (ec) + if (ec || (endpointList.size() == 0)) { BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message(); self->state = ConnState::resolveFailed; - self->checkQueue(); + self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "Resolved"; - self->doConnect(endpointList); + self->endPoints.assign(endpointList.begin(), + endpointList.end()); + self->state = ConnState::resolved; + self->handleConnState(); }; resolver.asyncResolve(host, port, std::move(respHandler)); } - void doConnect( - const std::vector& endpointList) + void doConnect() { - if (state == ConnState::connectInProgress) - { - return; - } state = ConnState::connectInProgress; BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port; conn.expires_after(std::chrono::seconds(30)); conn.async_connect( - endpointList, [self(shared_from_this())]( - const boost::beast::error_code ec, - const boost::asio::ip::tcp::endpoint& endpoint) { + endPoints, [self(shared_from_this())]( + const boost::beast::error_code ec, + const boost::asio::ip::tcp::endpoint& endpoint) { if (ec) { BMCWEB_LOG_ERROR << "Connect " << endpoint << " failed: " << ec.message(); self->state = ConnState::connectFailed; - self->checkQueue(); + self->handleConnState(); return; } - self->state = ConnState::connected; BMCWEB_LOG_DEBUG << "Connected to: " << endpoint; - - self->checkQueue(); + self->state = ConnState::connected; + self->handleConnState(); }); } void sendMessage(const std::string& data) { - if (state == ConnState::sendInProgress) - { - return; - } state = ConnState::sendInProgress; BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port; - req.version(static_cast(11)); // HTTP 1.1 - req.target(uri); - req.method(boost::beast::http::verb::post); - - // Set headers - for (const auto& [key, value] : headers) - { - req.set(key, value); - } - req.set(boost::beast::http::field::host, host); - req.keep_alive(true); - req.body() = data; req.prepare_payload(); - // Set a timeout on the operation - conn.expires_after(std::chrono::seconds(30)); - // Send the HTTP request to the remote host boost::beast::http::async_write( conn, req, @@ -171,7 +152,7 @@ class HttpClient : public std::enable_shared_from_this BMCWEB_LOG_ERROR << "sendMessage() failed: " << ec.message(); self->state = ConnState::sendFailed; - self->checkQueue(); + self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: " @@ -184,9 +165,17 @@ class HttpClient : public std::enable_shared_from_this void recvMessage() { + state = ConnState::recvInProgress; + + parser.emplace(std::piecewise_construct, std::make_tuple()); + parser->body_limit(httpReadBodyLimit); + + // Check only for the response header + parser->skip(true); + // Receive the HTTP response boost::beast::http::async_read( - conn, buffer, res, + conn, buffer, *parser, [self(shared_from_this())](const boost::beast::error_code& ec, const std::size_t& bytesTransferred) { if (ec) @@ -194,30 +183,47 @@ class HttpClient : public std::enable_shared_from_this BMCWEB_LOG_ERROR << "recvMessage() failed: " << ec.message(); self->state = ConnState::recvFailed; - self->checkQueue(); + self->handleConnState(); return; } BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: " << bytesTransferred; - boost::ignore_unused(bytesTransferred); - - // Discard received data. We are not interested. - BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res; + BMCWEB_LOG_DEBUG << "recvMessage() data: " + << self->parser->get(); // Send is successful, Lets remove data from queue // check for next request data in queue. - self->requestDataQueue.pop(); + if (!self->requestDataQueue.empty()) + { + self->requestDataQueue.pop_front(); + } self->state = ConnState::idle; - self->checkQueue(); + + // Keep the connection alive if server supports it + // Else close the connection + BMCWEB_LOG_DEBUG << "recvMessage() keepalive : " + << self->parser->keep_alive(); + if (!self->parser->keep_alive()) + { + // Abort the connection since server is not keep-alive + // enabled + self->state = ConnState::abortConnection; + } + + // Returns ownership of the parsed message + self->parser->release(); + + self->handleConnState(); }); } void doClose() { + state = ConnState::closeInProgress; boost::beast::error_code ec; conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + conn.close(); - state = ConnState::closed; // not_connected happens sometimes so don't bother reporting it. if (ec && ec != boost::beast::errc::not_connected) { @@ -225,112 +231,139 @@ class HttpClient : public std::enable_shared_from_this return; } BMCWEB_LOG_DEBUG << "Connection closed gracefully"; - } - - void checkQueue(const bool newRecord = false) - { - if (requestDataQueue.empty()) + if ((state != ConnState::suspended) && (state != ConnState::terminated)) { - // TODO: Having issue in keeping connection alive. So lets close if - // nothing to be transferred. - doClose(); - - BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n"; - return; + state = ConnState::closed; + handleConnState(); } + } + void waitAndRetry() + { if (retryCount >= maxRetryAttempts) { - BMCWEB_LOG_ERROR << "Maximum number of retries is reached."; + BMCWEB_LOG_ERROR << "Maximum number of retries reached."; // Clear queue. while (!requestDataQueue.empty()) { - requestDataQueue.pop(); + requestDataQueue.pop_front(); } - BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction; + BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction; if (retryPolicyAction == "TerminateAfterRetries") { // TODO: delete subscription state = ConnState::terminated; - return; } if (retryPolicyAction == "SuspendRetries") { state = ConnState::suspended; - return; } - // keep retrying, reset count and continue. + // Reset the retrycount to zero so that client can try connecting + // again if needed retryCount = 0; + handleConnState(); + return; } - if ((state == ConnState::connectFailed) || - (state == ConnState::sendFailed) || - (state == ConnState::recvFailed)) + if (runningTimer) { - if (newRecord) - { - // We are already running async wait and retry. - // Since record is added to queue, it gets the - // turn in FIFO. - return; - } - - if (runningTimer) - { - BMCWEB_LOG_DEBUG << "Retry timer is already running."; - return; - } - runningTimer = true; - - retryCount++; - - BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs - << " seconds. RetryCount = " << retryCount; - timer.expires_after(std::chrono::seconds(retryIntervalSecs)); - timer.async_wait( - [self = shared_from_this()](const boost::system::error_code&) { - self->runningTimer = false; - self->connStateCheck(); - }); + BMCWEB_LOG_DEBUG << "Retry timer is already running."; return; } - // reset retry count. - retryCount = 0; - connStateCheck(); + runningTimer = true; + + retryCount++; + + BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs + << " seconds. RetryCount = " << retryCount; + timer.expires_after(std::chrono::seconds(retryIntervalSecs)); + timer.async_wait( + [self = shared_from_this()](const boost::system::error_code ec) { + if (ec) + { + BMCWEB_LOG_ERROR << "async_wait failed: " << ec.message(); + // Ignore the error and continue the retry loop to attempt + // sending the event as per the retry policy + } + self->runningTimer = false; + // Lets close connection and start from resolve. + self->doClose(); + }); return; } - void connStateCheck() + void handleConnState() { switch (state) { case ConnState::resolveInProgress: case ConnState::connectInProgress: case ConnState::sendInProgress: - case ConnState::suspended: - case ConnState::terminated: - // do nothing + case ConnState::recvInProgress: + case ConnState::closeInProgress: + { + BMCWEB_LOG_DEBUG << "Async operation is already in progress"; break; + } case ConnState::initialized: case ConnState::closed: + { + if (requestDataQueue.empty()) + { + BMCWEB_LOG_DEBUG << "requestDataQueue is empty"; + return; + } + doResolve(); + break; + } + case ConnState::resolved: + { + doConnect(); + break; + } + case ConnState::suspended: + case ConnState::terminated: + { + doClose(); + break; + } + case ConnState::resolveFailed: case ConnState::connectFailed: case ConnState::sendFailed: case ConnState::recvFailed: - case ConnState::resolveFailed: + case ConnState::retry: { - doResolve(); + // In case of failures during connect and handshake + // the retry policy will be applied + waitAndRetry(); break; } case ConnState::connected: case ConnState::idle: { + // State idle means, previous attempt is successful + // State connected means, client connection is established + // successfully + if (requestDataQueue.empty()) + { + BMCWEB_LOG_DEBUG << "requestDataQueue is empty"; + return; + } std::string data = requestDataQueue.front(); sendMessage(data); break; } + case ConnState::abortConnection: + { + // Server did not want to keep alive the session + doClose(); + break; + } + default: + break; } } @@ -339,37 +372,38 @@ class HttpClient : public std::enable_shared_from_this const std::string& destIP, const std::string& destPort, const std::string& destUri) : conn(ioc), - timer(ioc), subId(id), host(destIP), port(destPort), uri(destUri), - retryCount(0), maxRetryAttempts(5), retryIntervalSecs(0), + timer(ioc), req(boost::beast::http::verb::post, destUri, 11), + state(ConnState::initialized), subId(id), host(destIP), port(destPort), + uri(destUri), retryCount(0), maxRetryAttempts(5), retryIntervalSecs(0), retryPolicyAction("TerminateAfterRetries"), runningTimer(false) { - state = ConnState::initialized; + // Set the request header + req.set(boost::beast::http::field::host, host); + req.set(boost::beast::http::field::content_type, "application/json"); + req.keep_alive(true); + + requestDataQueue.set_capacity(maxRequestQueueSize); } void sendData(const std::string& data) { - if (state == ConnState::suspended) + if ((state == ConnState::suspended) || (state == ConnState::terminated)) { return; } - - if (requestDataQueue.size() <= maxRequestQueueSize) - { - requestDataQueue.push(data); - checkQueue(true); - } - else - { - BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data."; - } - + requestDataQueue.push_back(data); + handleConnState(); return; } - void setHeaders( + void addHeaders( const std::vector>& httpHeaders) { - headers = httpHeaders; + // Set custom headers + for (const auto& [key, value] : httpHeaders) + { + req.set(key, value); + } } void setRetryConfig(const uint32_t retryAttempts, diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp index 11190ef..a8f7517 100644 --- a/redfish-core/include/event_service_manager.hpp +++ b/redfish-core/include/event_service_manager.hpp @@ -422,7 +422,7 @@ class Subscription reqHeaders.emplace_back(std::pair(key, val)); } } - conn->setHeaders(reqHeaders); + conn->addHeaders(reqHeaders); conn->sendData(msg); this->eventSeqNum++; } -- 2.25.1