Skip to content

Commit da76907

Browse files
authored
feat: server info cache (#1671)
fix: #1181
1 parent 1b42466 commit da76907

14 files changed

+700
-267
lines changed

docs/examples/config/example-config.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
"cache_timeout": 0.250, // in seconds, could be 0, which means no cache
4040
"request_timeout": 10.0 // time for Clio to wait for rippled to reply on a forwarded request (default is 10 seconds)
4141
},
42+
"rpc": {
43+
"cache_timeout": 0.5 // in seconds, could be 0, which means no cache for rpc
44+
},
4245
"dos_guard": {
4346
// Comma-separated list of IPs to exclude from rate limiting
4447
"whitelist": [

src/app/ClioApplication.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,14 @@ ClioApplication::run()
121121
auto const handlerProvider = std::make_shared<rpc::impl::ProductionHandlerProvider const>(
122122
config_, backend, subscriptions, balancer, etl, amendmentCenter, counters
123123
);
124+
125+
using RPCEngineType = rpc::RPCEngine<etl::LoadBalancer, rpc::Counters>;
124126
auto const rpcEngine =
125-
rpc::RPCEngine::make_RPCEngine(backend, balancer, dosGuard, workQueue, counters, handlerProvider);
127+
RPCEngineType::make_RPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
126128

127129
// Init the web server
128130
auto handler =
129-
std::make_shared<web::RPCServerHandler<rpc::RPCEngine, etl::ETLService>>(config_, backend, rpcEngine, etl);
131+
std::make_shared<web::RPCServerHandler<RPCEngineType, etl::ETLService>>(config_, backend, rpcEngine, etl);
130132
auto const httpServer = web::make_HttpServer(config_, ioc, dosGuard, handler);
131133

132134
// Blocks until stopped.

src/etl/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ target_sources(
1111
NFTHelpers.cpp
1212
Source.cpp
1313
impl/AmendmentBlockHandler.cpp
14-
impl/ForwardingCache.cpp
1514
impl/ForwardingSource.cpp
1615
impl/GrpcSource.cpp
1716
impl/SubscriptionSource.cpp

src/etl/LoadBalancer.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
#include "rpc/Errors.hpp"
2828
#include "util/Assert.hpp"
2929
#include "util/Random.hpp"
30+
#include "util/ResponseExpirationCache.hpp"
3031
#include "util/log/Logger.hpp"
3132

3233
#include <boost/asio/io_context.hpp>
3334
#include <boost/asio/spawn.hpp>
3435
#include <boost/json/array.hpp>
3536
#include <boost/json/object.hpp>
3637
#include <boost/json/value.hpp>
38+
#include <boost/json/value_to.hpp>
3739
#include <fmt/core.h>
3840

3941
#include <algorithm>
@@ -79,7 +81,10 @@ LoadBalancer::LoadBalancer(
7981
{
8082
auto const forwardingCacheTimeout = config.valueOr<float>("forwarding.cache_timeout", 0.f);
8183
if (forwardingCacheTimeout > 0.f) {
82-
forwardingCache_ = impl::ForwardingCache{Config::toMilliseconds(forwardingCacheTimeout)};
84+
forwardingCache_ = util::ResponseExpirationCache{
85+
Config::toMilliseconds(forwardingCacheTimeout),
86+
{"server_info", "server_state", "server_definitions", "fee", "ledger_closed"}
87+
};
8388
}
8489

8590
static constexpr std::uint32_t MAX_DOWNLOAD = 256;
@@ -224,8 +229,12 @@ LoadBalancer::forwardToRippled(
224229
boost::asio::yield_context yield
225230
)
226231
{
232+
if (not request.contains("command"))
233+
return std::unexpected{rpc::ClioError::rpcCOMMAND_IS_MISSING};
234+
235+
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
227236
if (forwardingCache_) {
228-
if (auto cachedResponse = forwardingCache_->get(request); cachedResponse) {
237+
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
229238
return std::move(cachedResponse).value();
230239
}
231240
}
@@ -253,7 +262,7 @@ LoadBalancer::forwardToRippled(
253262

254263
if (response) {
255264
if (forwardingCache_ and not response->contains("error"))
256-
forwardingCache_->put(request, *response);
265+
forwardingCache_->put(cmd, *response);
257266
return std::move(response).value();
258267
}
259268

src/etl/LoadBalancer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
#include "etl/ETLState.hpp"
2424
#include "etl/NetworkValidatedLedgersInterface.hpp"
2525
#include "etl/Source.hpp"
26-
#include "etl/impl/ForwardingCache.hpp"
2726
#include "feed/SubscriptionManagerInterface.hpp"
2827
#include "util/Mutex.hpp"
28+
#include "util/ResponseExpirationCache.hpp"
2929
#include "util/config/Config.hpp"
3030
#include "util/log/Logger.hpp"
3131

@@ -68,7 +68,7 @@ class LoadBalancer {
6868

6969
util::Logger log_{"ETL"};
7070
// Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache
71-
std::optional<impl::ForwardingCache> forwardingCache_;
71+
std::optional<util::ResponseExpirationCache> forwardingCache_;
7272
std::optional<std::string> forwardingXUserValue_;
7373

7474
std::vector<SourcePtr> sources_;

src/rpc/RPCEngine.hpp

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,22 @@
2020
#pragma once
2121

2222
#include "data/BackendInterface.hpp"
23-
#include "rpc/Counters.hpp"
2423
#include "rpc/Errors.hpp"
2524
#include "rpc/RPCHelpers.hpp"
2625
#include "rpc/WorkQueue.hpp"
2726
#include "rpc/common/HandlerProvider.hpp"
2827
#include "rpc/common/Types.hpp"
2928
#include "rpc/common/impl/ForwardingProxy.hpp"
29+
#include "util/ResponseExpirationCache.hpp"
3030
#include "util/log/Logger.hpp"
3131
#include "web/Context.hpp"
3232
#include "web/dosguard/DOSGuardInterface.hpp"
3333

3434
#include <boost/asio/spawn.hpp>
35+
#include <boost/iterator/transform_iterator.hpp>
3536
#include <boost/json.hpp>
3637
#include <fmt/core.h>
38+
#include <fmt/format.h>
3739
#include <xrpl/protocol/ErrorCodes.h>
3840

3941
#include <chrono>
@@ -42,14 +44,9 @@
4244
#include <memory>
4345
#include <optional>
4446
#include <string>
47+
#include <unordered_set>
4548
#include <utility>
4649

47-
// forward declarations
48-
namespace etl {
49-
class LoadBalancer;
50-
class ETLService;
51-
} // namespace etl
52-
5350
/**
5451
* @brief This namespace contains all the RPC logic and handlers.
5552
*/
@@ -58,23 +55,27 @@ namespace rpc {
5855
/**
5956
* @brief The RPC engine that ties all RPC-related functionality together.
6057
*/
58+
template <typename LoadBalancerType, typename CountersType>
6159
class RPCEngine {
6260
util::Logger perfLog_{"Performance"};
6361
util::Logger log_{"RPC"};
6462

6563
std::shared_ptr<BackendInterface> backend_;
6664
std::reference_wrapper<web::dosguard::DOSGuardInterface const> dosGuard_;
6765
std::reference_wrapper<WorkQueue> workQueue_;
68-
std::reference_wrapper<Counters> counters_;
66+
std::reference_wrapper<CountersType> counters_;
6967

7068
std::shared_ptr<HandlerProvider const> handlerProvider_;
7169

72-
impl::ForwardingProxy<etl::LoadBalancer, Counters, HandlerProvider> forwardingProxy_;
70+
impl::ForwardingProxy<LoadBalancerType, CountersType, HandlerProvider> forwardingProxy_;
71+
72+
std::optional<util::ResponseExpirationCache> responseCache_;
7373

7474
public:
7575
/**
7676
* @brief Construct a new RPCEngine object
7777
*
78+
* @param config The config to use
7879
* @param backend The backend to use
7980
* @param balancer The load balancer to use
8081
* @param dosGuard The DOS guard to use
@@ -83,11 +84,12 @@ class RPCEngine {
8384
* @param handlerProvider The handler provider to use
8485
*/
8586
RPCEngine(
87+
util::Config const& config,
8688
std::shared_ptr<BackendInterface> const& backend,
87-
std::shared_ptr<etl::LoadBalancer> const& balancer,
89+
std::shared_ptr<LoadBalancerType> const& balancer,
8890
web::dosguard::DOSGuardInterface const& dosGuard,
8991
WorkQueue& workQueue,
90-
Counters& counters,
92+
CountersType& counters,
9193
std::shared_ptr<HandlerProvider const> const& handlerProvider
9294
)
9395
: backend_{backend}
@@ -97,11 +99,22 @@ class RPCEngine {
9799
, handlerProvider_{handlerProvider}
98100
, forwardingProxy_{balancer, counters, handlerProvider}
99101
{
102+
// Let the main thread catch the exception if the config value has the wrong type
103+
auto const cacheTimeout = config.valueOr<float>("rpc.cache_timeout", 0.f);
104+
105+
if (cacheTimeout > 0.f) {
106+
LOG(log_.info()) << fmt::format("Init RPC Cache, timeout: {} seconds", cacheTimeout);
107+
108+
responseCache_.emplace(
109+
util::Config::toMilliseconds(cacheTimeout), std::unordered_set<std::string>{"server_info"}
110+
);
111+
}
100112
}
101113

102114
/**
103115
* @brief Factory function to create a new instance of the RPC engine.
104116
*
117+
* @param config The config to use
105118
* @param backend The backend to use
106119
* @param balancer The load balancer to use
107120
* @param dosGuard The DOS guard to use
@@ -112,15 +125,16 @@ class RPCEngine {
112125
*/
113126
static std::shared_ptr<RPCEngine>
114127
make_RPCEngine(
128+
util::Config const& config,
115129
std::shared_ptr<BackendInterface> const& backend,
116-
std::shared_ptr<etl::LoadBalancer> const& balancer,
130+
std::shared_ptr<LoadBalancerType> const& balancer,
117131
web::dosguard::DOSGuardInterface const& dosGuard,
118132
WorkQueue& workQueue,
119-
Counters& counters,
133+
CountersType& counters,
120134
std::shared_ptr<HandlerProvider const> const& handlerProvider
121135
)
122136
{
123-
return std::make_shared<RPCEngine>(backend, balancer, dosGuard, workQueue, counters, handlerProvider);
137+
return std::make_shared<RPCEngine>(config, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
124138
}
125139

126140
/**
@@ -140,6 +154,11 @@ class RPCEngine {
140154
return forwardingProxy_.forward(ctx);
141155
}
142156

157+
if (not ctx.isAdmin and responseCache_) {
158+
if (auto res = responseCache_->get(ctx.method); res.has_value())
159+
return Result{std::move(res).value()};
160+
}
161+
143162
if (backend_->isTooBusy()) {
144163
LOG(log_.error()) << "Database is too busy. Rejecting request";
145164
notifyTooBusy(); // TODO: should we add ctx.method if we have it?
@@ -160,8 +179,11 @@ class RPCEngine {
160179

161180
LOG(perfLog_.debug()) << ctx.tag() << " finish executing rpc `" << ctx.method << '`';
162181

163-
if (not v)
182+
if (not v) {
164183
notifyErrored(ctx.method);
184+
} else if (not ctx.isAdmin and responseCache_) {
185+
responseCache_->put(ctx.method, v.result->as_object());
186+
}
165187

166188
return Result{std::move(v)};
167189
} catch (data::DatabaseTimeout const& t) {

src/util/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ target_sources(
1919
requests/Types.cpp
2020
requests/WsConnection.cpp
2121
requests/impl/SslContext.cpp
22+
ResponseExpirationCache.cpp
2223
SignalsHandler.cpp
2324
Taggable.cpp
2425
TerminationHandler.cpp
Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,88 +17,55 @@
1717
*/
1818
//==============================================================================
1919

20-
#include "etl/impl/ForwardingCache.hpp"
20+
#include "util/ResponseExpirationCache.hpp"
2121

2222
#include "util/Assert.hpp"
2323

2424
#include <boost/json/object.hpp>
25-
#include <boost/json/value_to.hpp>
2625

2726
#include <chrono>
2827
#include <mutex>
2928
#include <optional>
3029
#include <shared_mutex>
3130
#include <string>
32-
#include <unordered_set>
33-
#include <utility>
3431

35-
namespace etl::impl {
36-
37-
namespace {
38-
39-
std::optional<std::string>
40-
getCommand(boost::json::object const& request)
41-
{
42-
if (not request.contains("command")) {
43-
return std::nullopt;
44-
}
45-
return boost::json::value_to<std::string>(request.at("command"));
46-
}
47-
48-
} // namespace
32+
namespace util {
4933

5034
void
51-
CacheEntry::put(boost::json::object response)
35+
ResponseExpirationCache::Entry::put(boost::json::object response)
5236
{
5337
response_ = std::move(response);
5438
lastUpdated_ = std::chrono::steady_clock::now();
5539
}
5640

5741
std::optional<boost::json::object>
58-
CacheEntry::get() const
42+
ResponseExpirationCache::Entry::get() const
5943
{
6044
return response_;
6145
}
6246

6347
std::chrono::steady_clock::time_point
64-
CacheEntry::lastUpdated() const
48+
ResponseExpirationCache::Entry::lastUpdated() const
6549
{
6650
return lastUpdated_;
6751
}
6852

6953
void
70-
CacheEntry::invalidate()
54+
ResponseExpirationCache::Entry::invalidate()
7155
{
7256
response_.reset();
7357
}
7458

75-
std::unordered_set<std::string> const
76-
ForwardingCache::CACHEABLE_COMMANDS{"server_info", "server_state", "server_definitions", "fee", "ledger_closed"};
77-
78-
ForwardingCache::ForwardingCache(std::chrono::steady_clock::duration const cacheTimeout) : cacheTimeout_{cacheTimeout}
79-
{
80-
for (auto const& command : CACHEABLE_COMMANDS) {
81-
cache_.emplace(command, CacheEntry{});
82-
}
83-
}
84-
8559
bool
86-
ForwardingCache::shouldCache(boost::json::object const& request)
60+
ResponseExpirationCache::shouldCache(std::string const& cmd)
8761
{
88-
auto const command = getCommand(request);
89-
return command.has_value() and CACHEABLE_COMMANDS.contains(*command);
62+
return cache_.contains(cmd);
9063
}
9164

9265
std::optional<boost::json::object>
93-
ForwardingCache::get(boost::json::object const& request) const
66+
ResponseExpirationCache::get(std::string const& cmd) const
9467
{
95-
auto const command = getCommand(request);
96-
97-
if (not command.has_value()) {
98-
return std::nullopt;
99-
}
100-
101-
auto it = cache_.find(*command);
68+
auto it = cache_.find(cmd);
10269
if (it == cache_.end())
10370
return std::nullopt;
10471

@@ -110,25 +77,24 @@ ForwardingCache::get(boost::json::object const& request) const
11077
}
11178

11279
void
113-
ForwardingCache::put(boost::json::object const& request, boost::json::object const& response)
80+
ResponseExpirationCache::put(std::string const& cmd, boost::json::object const& response)
11481
{
115-
auto const command = getCommand(request);
116-
if (not command.has_value() or not shouldCache(request))
82+
if (not shouldCache(cmd))
11783
return;
11884

119-
ASSERT(cache_.contains(*command), "Command is not in the cache: {}", *command);
85+
ASSERT(cache_.contains(cmd), "Command is not in the cache: {}", cmd);
12086

121-
auto entry = cache_[*command].lock<std::unique_lock>();
87+
auto entry = cache_[cmd].lock<std::unique_lock>();
12288
entry->put(response);
12389
}
12490

12591
void
126-
ForwardingCache::invalidate()
92+
ResponseExpirationCache::invalidate()
12793
{
12894
for (auto& [_, entry] : cache_) {
12995
auto entryLock = entry.lock<std::unique_lock>();
13096
entryLock->invalidate();
13197
}
13298
}
13399

134-
} // namespace etl::impl
100+
} // namespace util

0 commit comments

Comments
 (0)