/* // Copyright (c) 2020 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. */ #pragma once #include "async_resolve.hpp" #include "http_body.hpp" #include "http_response.hpp" #include "logging.hpp" #include "ssl_key_handler.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace crow { // With Redfish Aggregation it is assumed we will connect to another // instance of BMCWeb which can handle 100 simultaneous connections. 
constexpr size_t maxPoolSize = 20;

// Upper bound on requests buffered per pool once every connection is busy
constexpr size_t maxRequestQueueSize = 500;

// Default response body limit in bytes (see ConnectionPolicy::requestByteLimit)
constexpr unsigned int httpReadBodyLimit = 131072;

// Capacity in bytes of each connection's static read buffer
constexpr unsigned int httpReadBufferSize = 4096;

// Lifecycle states of a single client connection
enum class ConnState
{
    initialized,
    resolveInProgress,
    resolveFailed,
    connectInProgress,
    connectFailed,
    connected,
    handshakeInProgress,
    handshakeFailed,
    sendInProgress,
    sendFailed,
    recvInProgress,
    recvFailed,
    idle,
    closed,
    suspended,
    terminated,
    abortConnection,
    sslInitFailed,
    retry
};

/**
 * @brief Default response code check used by ConnectionPolicy.
 *
 * @param[in] respCode  HTTP status code of the received response
 *
 * @return success for any 2XX code, result_out_of_range otherwise
 */
static inline boost::system::error_code
    defaultRetryHandler(unsigned int respCode)
{
    // As a default, assume 200X is alright
    BMCWEB_LOG_DEBUG("Using default check for response code validity");
    if ((respCode < 200) || (respCode >= 300))
    {
        return boost::system::errc::make_error_code(
            boost::system::errc::result_out_of_range);
    }

    // Return 0 if the response code is valid
    return boost::system::errc::make_error_code(boost::system::errc::success);
}

// We need to allow retry information to be set before a message has been
// sent and a connection pool has been created
struct ConnectionPolicy
{
    // Number of retries attempted before retryPolicyAction is applied
    uint32_t maxRetryAttempts = 5;

    // the max size of requests in bytes.  0 for unlimited
    boost::optional<uint64_t> requestByteLimit = httpReadBodyLimit;

    // Maximum number of connections a pool may open to its destination
    size_t maxConnections = 1;

    // "TerminateAfterRetries" and "SuspendRetries" are the values acted on
    // by ConnectionInfo::waitAndRetry()
    std::string retryPolicyAction = "TerminateAfterRetries";

    // Delay between two retry attempts
    std::chrono::seconds retryIntervalSecs = std::chrono::seconds(0);

    // Returns a non-zero error_code when the response code should be treated
    // as a failure and retried
    std::function<boost::system::error_code(unsigned int respCode)>
        invalidResp = defaultRetryHandler;
};

// A request waiting in a pool's queue together with its completion callback
// NOTE(review): the body type bmcweb::HttpBody is inferred from the
// "http_body.hpp" include and the body().str() accessors used below - confirm
struct PendingRequest
{
    boost::beast::http::request<bmcweb::HttpBody> req;
    std::function<void(bool, uint32_t, Response&)> callback;
    PendingRequest(
        boost::beast::http::request<bmcweb::HttpBody>&& reqIn,
        const std::function<void(bool, uint32_t, Response&)>& callbackIn) :
        req(std::move(reqIn)),
        callback(callbackIn)
    {}
};

namespace http = boost::beast::http;

/**
 * @brief A single connection to a remote host.
 *
 * Drives one request at a time through resolve -> connect -> (TLS handshake)
 * -> write -> read, applying the retry policy on failure. The completion
 * callback receives (keepAlive, connId, response).
 */
class ConnectionInfo : public std::enable_shared_from_this<ConnectionInfo>
{
  private:
    ConnState state = ConnState::initialized;
    uint32_t retryCount = 0;
    std::string subId;
    std::shared_ptr<ConnectionPolicy> connPolicy;
    boost::urls::url host;
    uint32_t connId;

    // Data buffers
    http::request<bmcweb::HttpBody> req;
    using parser_type = http::response_parser<bmcweb::HttpBody>;
    std::optional<parser_type> parser;
    boost::beast::flat_static_buffer<httpReadBufferSize> buffer;
    Response res;

    // Async callables
    std::function<void(bool, uint32_t, Response&)> callback;

    boost::asio::io_context& ioc;

    // NOTE(review): the compile-time condition selecting the D-Bus based
    // resolver is an assumption - confirm against the build configuration
    using Resolver = std::conditional_t<BMCWEB_DNS_RESOLVER == "systemd-dbus",
                                        async_resolve::Resolver,
                                        boost::asio::ip::tcp::resolver>;
    Resolver resolver;

    boost::asio::ip::tcp::socket conn;
    std::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>>
        sslConn;

    boost::asio::steady_timer timer;

    friend class ConnectionPool;

    // Start asynchronous name resolution of the destination host
    void doResolve()
    {
        state = ConnState::resolveInProgress;
        BMCWEB_LOG_DEBUG("Trying to resolve: {}, id: {}", host, connId);

        resolver.async_resolve(host.encoded_host_address(), host.port(),
                               std::bind_front(&ConnectionInfo::afterResolve,
                                               this, shared_from_this()));
    }

    // Resolve completion: retry on failure, otherwise start connecting under
    // a 30 second timeout
    void afterResolve(const std::shared_ptr<ConnectionInfo>& /*self*/,
                      const boost::system::error_code& ec,
                      const Resolver::results_type& endpointList)
    {
        if (ec || (endpointList.empty()))
        {
            BMCWEB_LOG_ERROR("Resolve failed: {} {}", ec.message(), host);
            state = ConnState::resolveFailed;
            waitAndRetry();
            return;
        }
        BMCWEB_LOG_DEBUG("Resolved {}, id: {}", host, connId);
        state = ConnState::connectInProgress;

        BMCWEB_LOG_DEBUG("Trying to connect to: {}, id: {}", host, connId);

        timer.expires_after(std::chrono::seconds(30));
        timer.async_wait(std::bind_front(onTimeout, weak_from_this()));

        boost::asio::async_connect(
            conn, endpointList,
            std::bind_front(&ConnectionInfo::afterConnect, this,
                            shared_from_this()));
    }

    // Connect completion: continue with the TLS handshake when an SSL stream
    // exists, otherwise send the request directly
    void afterConnect(const std::shared_ptr<ConnectionInfo>& /*self*/,
                      const boost::beast::error_code& ec,
                      const boost::asio::ip::tcp::endpoint& endpoint)
    {
        // The operation already timed out.  We don't want to continue down
        // this branch
        if (ec && ec == boost::asio::error::operation_aborted)
        {
            return;
        }

        timer.cancel();
        if (ec)
        {
            BMCWEB_LOG_ERROR("Connect {}:{}, id: {} failed: {}",
                             endpoint.address().to_string(), endpoint.port(),
                             connId, ec.message());
            state = ConnState::connectFailed;
            waitAndRetry();
            return;
        }
        BMCWEB_LOG_DEBUG("Connected to: {}:{}, id: {}",
                         endpoint.address().to_string(), endpoint.port(),
                         connId);
        if (sslConn)
        {
            doSslHandshake();
            return;
        }
        state = ConnState::connected;
        sendMessage();
    }

    // Start the client side TLS handshake under a 30 second timeout
    void doSslHandshake()
    {
        if (!sslConn)
        {
            return;
        }
        state = ConnState::handshakeInProgress;
        timer.expires_after(std::chrono::seconds(30));
        timer.async_wait(std::bind_front(onTimeout, weak_from_this()));
        sslConn->async_handshake(
            boost::asio::ssl::stream_base::client,
            std::bind_front(&ConnectionInfo::afterSslHandshake, this,
                            shared_from_this()));
    }

    // Handshake completion: retry on failure, otherwise send the request
    void afterSslHandshake(const std::shared_ptr<ConnectionInfo>& /*self*/,
                           const boost::beast::error_code& ec)
    {
        // The operation already timed out.  We don't want to continue down
        // this branch
        if (ec && ec == boost::asio::error::operation_aborted)
        {
            return;
        }

        timer.cancel();
        if (ec)
        {
            BMCWEB_LOG_ERROR("SSL Handshake failed - id: {} error: {}", connId,
                             ec.message());
            state = ConnState::handshakeFailed;
            waitAndRetry();
            return;
        }
        BMCWEB_LOG_DEBUG("SSL Handshake successful - id: {}", connId);
        state = ConnState::connected;
        sendMessage();
    }

    // Write the pending request to the (TLS or plain) stream. The request is
    // moved into the message generator, so `req` must be refilled before the
    // next send.
    void sendMessage()
    {
        state = ConnState::sendInProgress;

        // Set a timeout on the operation
        timer.expires_after(std::chrono::seconds(30));
        timer.async_wait(std::bind_front(onTimeout, weak_from_this()));

        boost::beast::http::message_generator messageGenerator(std::move(req));
        // Send the HTTP request to the remote host
        if (sslConn)
        {
            boost::beast::async_write(
                *sslConn, std::move(messageGenerator),
                std::bind_front(&ConnectionInfo::afterWrite, this,
                                shared_from_this()));
        }
        else
        {
            boost::beast::async_write(
                conn, std::move(messageGenerator),
                std::bind_front(&ConnectionInfo::afterWrite, this,
                                shared_from_this()));
        }
    }

    // Write completion: retry on failure, otherwise start reading the reply
    void afterWrite(const std::shared_ptr<ConnectionInfo>& /*self*/,
                    const boost::beast::error_code& ec, size_t bytesTransferred)
    {
        // The operation already timed out.  We don't want to continue down
        // this branch
        if (ec && ec == boost::asio::error::operation_aborted)
        {
            return;
        }

        timer.cancel();
        if (ec)
        {
            BMCWEB_LOG_ERROR("sendMessage() failed: {} {}", ec.message(), host);
            state = ConnState::sendFailed;
            waitAndRetry();
            return;
        }
        BMCWEB_LOG_DEBUG("sendMessage() bytes transferred: {}",
                         bytesTransferred);

        recvMessage();
    }

    // Create a fresh parser limited to the policy's body size and start
    // reading the response under a 30 second timeout
    void recvMessage()
    {
        state = ConnState::recvInProgress;

        parser_type& thisParser = parser.emplace(std::piecewise_construct,
                                                 std::make_tuple());

        thisParser.body_limit(connPolicy->requestByteLimit);

        timer.expires_after(std::chrono::seconds(30));
        timer.async_wait(std::bind_front(onTimeout, weak_from_this()));

        // Receive the HTTP response
        if (sslConn)
        {
            boost::beast::http::async_read(
                *sslConn, buffer, thisParser,
                std::bind_front(&ConnectionInfo::afterRead, this,
                                shared_from_this()));
        }
        else
        {
            boost::beast::http::async_read(
                conn, buffer, thisParser,
                std::bind_front(&ConnectionInfo::afterRead, this,
                                shared_from_this()));
        }
    }

    // Read completion: validate the response against the retry policy and
    // hand it to the callback on success
    void afterRead(const std::shared_ptr<ConnectionInfo>& /*self*/,
                   const boost::beast::error_code& ec,
                   const std::size_t& bytesTransferred)
    {
        // The operation already timed out.  We don't want to continue down
        // this branch
        if (ec && ec == boost::asio::error::operation_aborted)
        {
            return;
        }

        timer.cancel();
        if (ec && ec != boost::asio::ssl::error::stream_truncated)
        {
            BMCWEB_LOG_ERROR("recvMessage() failed: {} from {}", ec.message(),
                             host);
            state = ConnState::recvFailed;
            waitAndRetry();
            return;
        }
        BMCWEB_LOG_DEBUG("recvMessage() bytes transferred: {}",
                         bytesTransferred);
        if (!parser)
        {
            return;
        }
        BMCWEB_LOG_DEBUG("recvMessage() data: {}", parser->get().body().str());

        unsigned int respCode = parser->get().result_int();
        BMCWEB_LOG_DEBUG("recvMessage() Header Response Code: {}", respCode);

        // Handle the case of stream_truncated.  Some servers close the ssl
        // connection uncleanly, so check to see if we got a full response
        // before we handle this as an error.
        if (!parser->is_done())
        {
            state = ConnState::recvFailed;
            waitAndRetry();
            return;
        }

        // Make sure the received response code is valid as defined by
        // the associated retry policy
        if (connPolicy->invalidResp(respCode))
        {
            // The listener failed to receive the Sent-Event
            BMCWEB_LOG_ERROR(
                "recvMessage() Listener Failed to "
                "receive Sent-Event. Header Response Code: {} from {}",
                respCode, host);
            state = ConnState::recvFailed;
            waitAndRetry();
            return;
        }
        // Send is successful
        // Reset the counter just in case this was after retrying
        retryCount = 0;

        // Keep the connection alive if server supports it
        // Else close the connection
        // Read the flag before the message is released from the parser
        const bool keepAlive = parser->keep_alive();
        BMCWEB_LOG_DEBUG("recvMessage() keepalive : {}", keepAlive);

        // Copy the response into a Response object so that it can be
        // processed by the callback function.
        res.response = parser->release();
        callback(keepAlive, connId, res);
        res.clear();
    }

    // Expiry handler shared by the connect/handshake/send/receive timeouts.
    // Holds only a weak reference so a pending timer does not keep the
    // connection alive.
    static void onTimeout(const std::weak_ptr<ConnectionInfo>& weakSelf,
                          const boost::system::error_code& ec)
    {
        if (ec == boost::asio::error::operation_aborted)
        {
            BMCWEB_LOG_DEBUG(
                "async_wait failed since the operation is aborted");
            return;
        }
        if (ec)
        {
            BMCWEB_LOG_ERROR("async_wait failed: {}", ec.message());
            // If the timer fails, we need to close the socket anyway, same
            // as if it expired.
        }
        std::shared_ptr<ConnectionInfo> self = weakSelf.lock();
        if (self == nullptr)
        {
            return;
        }
        self->waitAndRetry();
    }

    // Either schedule another attempt after retryIntervalSecs, or, once the
    // retries are exhausted (or SSL setup failed), apply the retry policy
    // action and report a 502 through the callback
    void waitAndRetry()
    {
        if ((retryCount >= connPolicy->maxRetryAttempts) ||
            (state == ConnState::sslInitFailed))
        {
            BMCWEB_LOG_ERROR("Maximum number of retries reached. {}", host);
            BMCWEB_LOG_DEBUG("Retry policy: {}", connPolicy->retryPolicyAction);

            if (connPolicy->retryPolicyAction == "TerminateAfterRetries")
            {
                // TODO: delete subscription
                state = ConnState::terminated;
            }
            if (connPolicy->retryPolicyAction == "SuspendRetries")
            {
                state = ConnState::suspended;
            }

            // We want to return a 502 to indicate there was an error with
            // the external server
            res.result(boost::beast::http::status::bad_gateway);

            // The callback can still be empty here: initializeConnection()
            // reaches this path from the constructor when SSL setup fails,
            // before any request has been assigned.  Invoking an empty
            // std::function throws std::bad_function_call, so only report
            // when a handler is present.
            if (callback)
            {
                callback(false, connId, res);
            }
            res.clear();

            // Reset the retrycount to zero so that client can try
            // connecting again if needed
            retryCount = 0;
            return;
        }

        retryCount++;

        BMCWEB_LOG_DEBUG("Attempt retry after {} seconds. RetryCount = {}",
                         connPolicy->retryIntervalSecs.count(), retryCount);
        timer.expires_after(connPolicy->retryIntervalSecs);
        timer.async_wait(std::bind_front(&ConnectionInfo::onTimerDone, this,
                                         shared_from_this()));
    }

    // Retry delay elapsed: tear the connection down and start over
    void onTimerDone(const std::shared_ptr<ConnectionInfo>& /*self*/,
                     const boost::system::error_code& ec)
    {
        if (ec == boost::asio::error::operation_aborted)
        {
            BMCWEB_LOG_DEBUG(
                "async_wait failed since the operation is aborted{}",
                ec.message());
        }
        else if (ec)
        {
            BMCWEB_LOG_ERROR("async_wait failed: {}", ec.message());
            // Ignore the error and continue the retry loop to attempt
            // sending the event as per the retry policy
        }

        // Let's close the connection and restart from resolve.
        shutdownConn(true);
    }

    // Re-create the socket (and SSL stream for https) and resolve again
    void restartConnection()
    {
        BMCWEB_LOG_DEBUG("{}, id: {} restartConnection", host,
                         std::to_string(connId));
        initializeConnection(host.scheme() == "https");
        doResolve();
    }

    // Shut down and close the TCP socket; optionally restart the connection
    // to resend the pending data
    void shutdownConn(bool retry)
    {
        boost::beast::error_code ec;
        conn.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
        conn.close();

        // not_connected happens sometimes so don't bother reporting it.
        if (ec && ec != boost::beast::errc::not_connected)
        {
            BMCWEB_LOG_ERROR("{}, id: {} shutdown failed: {}", host, connId,
                             ec.message());
        }
        else
        {
            BMCWEB_LOG_DEBUG("{}, id: {} closed gracefully", host, connId);
        }

        if (retry)
        {
            // Now let's try to resend the data
            state = ConnState::retry;
            restartConnection();
        }
        else
        {
            state = ConnState::closed;
        }
    }

    // Close the connection.  Plain sockets are closed synchronously; TLS
    // streams are shut down asynchronously first and the socket is closed
    // from afterSslShutdown().
    void doClose(bool retry = false)
    {
        if (!sslConn)
        {
            shutdownConn(retry);
            return;
        }

        sslConn->async_shutdown(
            std::bind_front(&ConnectionInfo::afterSslShutdown, this,
                            shared_from_this(), retry));
    }

    // TLS shutdown completion: log the outcome and close the socket
    void afterSslShutdown(const std::shared_ptr<ConnectionInfo>& /*self*/,
                          bool retry, const boost::system::error_code& ec)
    {
        if (ec)
        {
            BMCWEB_LOG_ERROR("{}, id: {} shutdown failed: {}", host, connId,
                             ec.message());
        }
        else
        {
            BMCWEB_LOG_DEBUG("{}, id: {} closed gracefully", host, connId);
        }
        shutdownConn(retry);
    }

    // Set the SNI hostname on the SSL handle when the host is a DNS name
    void setCipherSuiteTLSext()
    {
        if (!sslConn)
        {
            return;
        }

        if (host.host_type() != boost::urls::host_type::name)
        {
            // Avoid setting SNI hostname if its IP address
            return;
        }
        // Create a null terminated string for SSL
        std::string hostname(host.encoded_host_address());
        // NOTE: The SSL_set_tlsext_host_name is defined in tlsv1.h header
        // file but its having old style casting (name is cast to void*).
        // Since bmcweb compiler treats all old-style-cast as error, its
        // causing the build failure. So replaced the same macro inline and
        // did corrected the code by doing static_cast to void*. This has to
        // be fixed in openssl library in long run. Set SNI Hostname (many
        // hosts need this to handshake successfully)
        if (SSL_ctrl(sslConn->native_handle(), SSL_CTRL_SET_TLSEXT_HOSTNAME,
                     TLSEXT_NAMETYPE_host_name,
                     static_cast<void*>(hostname.data())) == 0)

        {
            boost::beast::error_code ec{static_cast<int>(::ERR_get_error()),
                                        boost::asio::error::get_ssl_category()};

            BMCWEB_LOG_ERROR("SSL_set_tlsext_host_name {}, id: {} failed: {}",
                             host, connId, ec.message());
            // Set state as sslInit failed so that we close the connection
            // and take appropriate action as per retry configuration.
            state = ConnState::sslInitFailed;
            waitAndRetry();
            return;
        }
    }

    // Create a fresh socket and, when `ssl` is true, a TLS stream on top of it
    void initializeConnection(bool ssl)
    {
        conn = boost::asio::ip::tcp::socket(ioc);
        if (ssl)
        {
            std::optional<boost::asio::ssl::context> sslCtx =
                ensuressl::getSSLClientContext();

            if (!sslCtx)
            {
                BMCWEB_LOG_ERROR("prepareSSLContext failed - {}, id: {}", host,
                                 connId);
                // Don't retry if failure occurs while preparing SSL context
                // such as certificate is invalid or set cipher failure or
                // set host name failure etc... Setting conn state to
                // sslInitFailed and connection state will be transitioned
                // to next state depending on retry policy set by
                // subscription.
                state = ConnState::sslInitFailed;
                waitAndRetry();
                return;
            }
            sslConn.emplace(conn, *sslCtx);
            setCipherSuiteTLSext();
        }
    }

  public:
    /**
     * @brief Create a connection for `hostIn`; https URLs get a TLS stream.
     *
     * @param[in] iocIn         io_context used for all asynchronous work
     * @param[in] idIn          identifier stored as subId
     * @param[in] connPolicyIn  retry/limit policy shared with the pool
     * @param[in] hostIn        destination URL
     * @param[in] connIdIn      index of this connection inside its pool
     */
    explicit ConnectionInfo(
        boost::asio::io_context& iocIn, const std::string& idIn,
        const std::shared_ptr<ConnectionPolicy>& connPolicyIn,
        const boost::urls::url_view_base& hostIn, unsigned int connIdIn) :
        subId(idIn),
        connPolicy(connPolicyIn), host(hostIn), connId(connIdIn), ioc(iocIn),
        resolver(iocIn), conn(iocIn), timer(iocIn)
    {
        initializeConnection(host.scheme() == "https");
    }
};

/**
 * @brief Set of connections to one destination plus a queue of requests that
 *        are waiting for a free connection.
 */
class ConnectionPool : public std::enable_shared_from_this<ConnectionPool>
{
  private:
    boost::asio::io_context& ioc;
    std::string id;
    std::shared_ptr<ConnectionPolicy> connPolicy;
    boost::urls::url destIP;
    std::vector<std::shared_ptr<ConnectionInfo>> connections;
    boost::container::devector<PendingRequest> requestQueue;

    friend class HttpClient;

    // Configure a connections's request, callback, and retry info in
    // preparation to begin sending the request
    void setConnProps(ConnectionInfo& conn)
    {
        if (requestQueue.empty())
        {
            BMCWEB_LOG_ERROR(
                "setConnProps() should not have been called when requestQueue is empty");
            return;
        }

        PendingRequest& nextReq = requestQueue.front();
        conn.req = std::move(nextReq.req);
        conn.callback = std::move(nextReq.callback);

        BMCWEB_LOG_DEBUG("Setting properties for connection {}, id: {}",
                         conn.host, conn.connId);

        // We can remove the request from the queue at this point
        requestQueue.pop_front();
    }

    // Gets called as part of callback after request is sent
    // Reuses the connection if there are any requests waiting to be sent
    // Otherwise closes the connection if it is not a keep-alive
    void sendNext(bool keepAlive, uint32_t connId)
    {
        auto conn = connections[connId];

        // Allow the connection's handler to be deleted
        // This is needed because of Redfish Aggregation passing an
        // AsyncResponse shared_ptr to this callback
        conn->callback = nullptr;

        // Reuse the connection to send the next request in the queue
        if (!requestQueue.empty())
        {
            BMCWEB_LOG_DEBUG(
                "{} requests remaining in queue for {}, reusing connection {}",
                requestQueue.size(), destIP, connId);

            setConnProps(*conn);

            if (keepAlive)
            {
                conn->sendMessage();
            }
            else
            {
                // Server is not keep-alive enabled so we need to close the
                // connection and then start over from resolve.  Closing a
                // TLS stream is asynchronous, so let the close path restart
                // the connection once the shutdown has finished; starting
                // the resolve right away would let the late shutdown close
                // the socket underneath the new request.
                conn->doClose(true);
            }
            return;
        }

        // No more messages to send so close the connection if necessary
        if (keepAlive)
        {
            conn->state = ConnState::idle;
        }
        else
        {
            // Abort the connection since server is not keep-alive enabled
            conn->state = ConnState::abortConnection;
            conn->doClose();
        }
    }

    // Build the request and dispatch it: reuse a free connection, open a new
    // one while below maxConnections, queue it while the queue has room, or
    // answer with a 429 dummy response when nothing else is possible
    void sendData(std::string&& data, const boost::urls::url_view_base& destUri,
                  const boost::beast::http::fields& httpHeader,
                  const boost::beast::http::verb verb,
                  const std::function<void(Response&)>& resHandler)
    {
        // Construct the request to be sent
        boost::beast::http::request<bmcweb::HttpBody> thisReq(
            verb, destUri.encoded_target(), 11, "", httpHeader);
        thisReq.set(boost::beast::http::field::host,
                    destUri.encoded_host_address());
        thisReq.keep_alive(true);
        thisReq.body().str() = std::move(data);
        thisReq.prepare_payload();
        auto cb = std::bind_front(&ConnectionPool::afterSendData,
                                  weak_from_this(), resHandler);
        // Reuse an existing connection if one is available
        for (unsigned int i = 0; i < connections.size(); i++)
        {
            const auto& conn = connections[i];
            if ((conn->state == ConnState::idle) ||
                (conn->state == ConnState::initialized) ||
                (conn->state == ConnState::closed))
            {
                conn->req = std::move(thisReq);
                conn->callback = std::move(cb);
                std::string commonMsg = std::format("{} from pool {}", i, id);

                if (conn->state == ConnState::idle)
                {
                    BMCWEB_LOG_DEBUG("Grabbing idle connection {}", commonMsg);
                    conn->sendMessage();
                }
                else
                {
                    BMCWEB_LOG_DEBUG("Reusing existing connection {}",
                                     commonMsg);
                    conn->doResolve();
                }
                return;
            }
        }

        // All connections in use so create a new connection or add request
        // to the queue
        if (connections.size() < connPolicy->maxConnections)
        {
            BMCWEB_LOG_DEBUG("Adding new connection to pool {}", id);
            auto conn = addConnection();
            conn->req = std::move(thisReq);
            conn->callback = std::move(cb);
            conn->doResolve();
        }
        else if (requestQueue.size() < maxRequestQueueSize)
        {
            BMCWEB_LOG_DEBUG("Max pool size reached. Adding data to queue {}",
                             id);
            requestQueue.emplace_back(std::move(thisReq), std::move(cb));
        }
        else
        {
            // If we can't buffer the request then we should let the
            // callback handle a 429 Too Many Requests dummy response
            BMCWEB_LOG_ERROR("{} request queue full.  Dropping request.", id);
            Response dummyRes;
            dummyRes.result(boost::beast::http::status::too_many_requests);
            resHandler(dummyRes);
        }
    }

    // Callback to be called once the request has been sent
    static void afterSendData(const std::weak_ptr<ConnectionPool>& weakSelf,
                              const std::function<void(Response&)>& resHandler,
                              bool keepAlive, uint32_t connId, Response& res)
    {
        // Allow provided callback to perform additional processing of the
        // request
        resHandler(res);

        // If requests remain in the queue then we want to reuse this
        // connection to send the next request
        std::shared_ptr<ConnectionPool> self = weakSelf.lock();
        if (!self)
        {
            BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
                                logPtr(self.get()));
            return;
        }

        self->sendNext(keepAlive, connId);
    }

    // Append a new connection whose id is its index in `connections`
    std::shared_ptr<ConnectionInfo>& addConnection()
    {
        unsigned int newId = static_cast<unsigned int>(connections.size());

        auto& ret = connections.emplace_back(std::make_shared<ConnectionInfo>(
            ioc, id, connPolicy, destIP, newId));

        BMCWEB_LOG_DEBUG("Added connection {} to pool {}",
                         connections.size() - 1, id);

        return ret;
    }

  public:
    /**
     * @brief Create a pool for `destIPIn` holding one initial connection.
     *
     * @param[in] iocIn         io_context used for all asynchronous work
     * @param[in] idIn          pool identifier used in log messages
     * @param[in] connPolicyIn  retry/limit policy applied to every connection
     * @param[in] destIPIn      destination URL
     */
    explicit ConnectionPool(
        boost::asio::io_context& iocIn, const std::string& idIn,
        const std::shared_ptr<ConnectionPolicy>& connPolicyIn,
        const boost::urls::url_view_base& destIPIn) :
        ioc(iocIn),
        id(idIn), connPolicy(connPolicyIn), destIP(destIPIn)
    {
        BMCWEB_LOG_DEBUG("Initializing connection pool for {}", id);

        // Initialize the pool with a single connection
        addConnection();
    }
};

/**
 * @brief Entry point for sending HTTP requests; keeps one ConnectionPool per
 *        "scheme://host:port" destination.
 */
class HttpClient
{
  private:
    std::unordered_map<std::string, std::shared_ptr<ConnectionPool>>
        connectionPools;
    boost::asio::io_context& ioc;
    std::shared_ptr<ConnectionPolicy> connPolicy;

    // Used as a dummy callback by sendData() in order to call
    // sendDataWithCallback()
    static void genericResHandler(const Response& res)
    {
        BMCWEB_LOG_DEBUG("Response handled with return code: {}",
                         res.resultInt());
    }

  public:
    HttpClient() = delete;
    explicit HttpClient(boost::asio::io_context& iocIn,
                        const std::shared_ptr<ConnectionPolicy>& connPolicyIn) :
        ioc(iocIn),
        connPolicy(connPolicyIn)
    {}

    HttpClient(const HttpClient&) = delete;
    HttpClient& operator=(const HttpClient&) = delete;
    HttpClient(HttpClient&&) = delete;
    HttpClient& operator=(HttpClient&&) = delete;
    ~HttpClient() = default;

    // Send a request to destIP where additional processing of the
    // result is not required
    void sendData(std::string&& data, const boost::urls::url_view_base& destUri,
                  const boost::beast::http::fields& httpHeader,
                  const boost::beast::http::verb verb)
    {
        const std::function<void(Response&)> cb = genericResHandler;
        sendDataWithCallback(std::move(data), destUri, httpHeader, verb, cb);
    }

    // Send request to destIP and use the provided callback to
    // handle the response
    void sendDataWithCallback(std::string&& data,
                              const boost::urls::url_view_base& destUrl,
                              const boost::beast::http::fields& httpHeader,
                              const boost::beast::http::verb verb,
                              const std::function<void(Response&)>& resHandler)
    {
        std::string clientKey = std::format("{}://{}", destUrl.scheme(),
                                            destUrl.encoded_host_and_port());
        auto pool = connectionPools.try_emplace(clientKey);
        if (pool.first->second == nullptr)
        {
            pool.first->second = std::make_shared<ConnectionPool>(
                ioc, clientKey, connPolicy, destUrl);
        }
        // Send the data using either the existing connection pool or the
        // newly created connection pool
        pool.first->second->sendData(std::move(data), destUrl, httpHeader,
                                     verb, resHandler);
    }
};
} // namespace crow