Skip to content

Commit 317b3e2

Browse files
authored
feat: Forwarding metrics (#2128)
Port of #2096 and #2103 into `release/2.4.1`.
2 parents 099052a + 7eff1e6 commit 317b3e2

File tree

3 files changed

+117
-1
lines changed

3 files changed

+117
-1
lines changed

src/etl/LoadBalancer.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
#include "rpc/Errors.hpp"
2929
#include "util/Assert.hpp"
3030
#include "util/CoroutineGroup.hpp"
31+
#include "util/Profiler.hpp"
3132
#include "util/Random.hpp"
3233
#include "util/ResponseExpirationCache.hpp"
3334
#include "util/log/Logger.hpp"
3435
#include "util/newconfig/ArrayView.hpp"
3536
#include "util/newconfig/ConfigDefinition.hpp"
3637
#include "util/newconfig/ObjectView.hpp"
38+
#include "util/prometheus/Label.hpp"
39+
#include "util/prometheus/Prometheus.hpp"
3740

3841
#include <boost/asio/io_context.hpp>
3942
#include <boost/asio/spawn.hpp>
@@ -57,6 +60,7 @@
5760
#include <vector>
5861

5962
using namespace util::config;
63+
using util::prometheus::Labels;
6064

6165
namespace etl {
6266

@@ -83,6 +87,34 @@ LoadBalancer::LoadBalancer(
8387
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
8488
SourceFactory sourceFactory
8589
)
90+
: forwardingCounters_{
91+
.successDuration = PrometheusService::counterInt(
92+
"forwarding_duration_milliseconds_counter",
93+
Labels({util::prometheus::Label{"status", "success"}}),
94+
"The duration of processing successful forwarded requests"
95+
),
96+
.failDuration = PrometheusService::counterInt(
97+
"forwarding_duration_milliseconds_counter",
98+
Labels({util::prometheus::Label{"status", "fail"}}),
99+
"The duration of processing failed forwarded requests"
100+
),
101+
.retries = PrometheusService::counterInt(
102+
"forwarding_retries_counter",
103+
Labels(),
104+
"The number of retries before a forwarded request was successful. Initial attempt excluded"
105+
),
106+
.cacheHit = PrometheusService::counterInt(
107+
"forwarding_cache_hit_counter",
108+
Labels(),
109+
"The number of requests that we served from the cache"
110+
),
111+
.cacheMiss = PrometheusService::counterInt(
112+
"forwarding_cache_miss_counter",
113+
Labels(),
114+
"The number of requests that were not served from the cache"
115+
)
116+
}
117+
86118
{
87119
auto const forwardingCacheTimeout = config.get<float>("forwarding.cache_timeout");
88120
if (forwardingCacheTimeout > 0.f) {
@@ -241,9 +273,11 @@ LoadBalancer::forwardToRippled(
241273
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
242274
if (forwardingCache_) {
243275
if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
276+
++forwardingCounters_.cacheHit.get();
244277
return std::move(cachedResponse).value();
245278
}
246279
}
280+
++forwardingCounters_.cacheMiss.get();
247281

248282
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
249283
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
@@ -255,11 +289,15 @@ LoadBalancer::forwardToRippled(
255289
std::optional<boost::json::object> response;
256290
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
257291
while (numAttempts < sources_.size()) {
258-
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
292+
auto [res, duration] =
293+
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
259294
if (res) {
295+
forwardingCounters_.successDuration.get() += duration;
260296
response = std::move(res).value();
261297
break;
262298
}
299+
forwardingCounters_.failDuration.get() += duration;
300+
++forwardingCounters_.retries.get();
263301
error = std::max(error, res.error()); // Choose the best result between all sources
264302

265303
sourceIdx = (sourceIdx + 1) % sources_.size();

src/etl/LoadBalancer.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "util/ResponseExpirationCache.hpp"
3333
#include "util/log/Logger.hpp"
3434
#include "util/newconfig/ConfigDefinition.hpp"
35+
#include "util/prometheus/Counter.hpp"
3536

3637
#include <boost/asio.hpp>
3738
#include <boost/asio/io_context.hpp>
@@ -44,8 +45,10 @@
4445
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
4546

4647
#include <chrono>
48+
#include <concepts>
4749
#include <cstdint>
4850
#include <expected>
51+
#include <functional>
4952
#include <memory>
5053
#include <optional>
5154
#include <string>
@@ -91,6 +94,14 @@ class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag {
9194
std::uint32_t downloadRanges_ =
9295
kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
9396

97+
// Prometheus counters describing request forwarding. Each member is a non-owning
// reference to a counter obtained from PrometheusService::counterInt in the
// LoadBalancer constructor.
struct ForwardingCounters {
98+
// Accumulated duration of forwarding attempts that returned a response
// (metric "forwarding_duration_milliseconds_counter", label status="success").
std::reference_wrapper<util::prometheus::CounterInt> successDuration;
99+
// Accumulated duration of forwarding attempts that returned an error
// (metric "forwarding_duration_milliseconds_counter", label status="fail").
std::reference_wrapper<util::prometheus::CounterInt> failDuration;
100+
// Incremented once for every failed forwarding attempt (metric "forwarding_retries_counter").
std::reference_wrapper<util::prometheus::CounterInt> retries;
101+
// Incremented when a response is served from the forwarding cache (metric "forwarding_cache_hit_counter").
std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
102+
// Incremented when a request is not answered from the cache, which includes the case
// where no forwarding cache is configured (metric "forwarding_cache_miss_counter").
std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
103+
} forwardingCounters_;
104+
94105
// Using mutex instead of atomic_bool because choosing a new source to
95106
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
96107
util::Mutex<bool> hasForwardingSource_{false};

tests/unit/etl/LoadBalancerTests.cpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "util/newconfig/ConfigFileJson.hpp"
3535
#include "util/newconfig/ConfigValue.hpp"
3636
#include "util/newconfig/Types.hpp"
37+
#include "util/prometheus/Counter.hpp"
3738

3839
#include <boost/asio/io_context.hpp>
3940
#include <boost/asio/spawn.hpp>
@@ -59,6 +60,7 @@
5960
using namespace etl;
6061
using namespace util::config;
6162
using testing::Return;
63+
using namespace util::prometheus;
6264

6365
constexpr static auto const kTWO_SOURCES_LEDGER_RESPONSE = R"({
6466
"etl_sources": [
@@ -641,6 +643,71 @@ TEST_F(LoadBalancerForwardToRippledTests, source0Fails)
641643
});
642644
}
643645

646+
// Fixture for the forwarding-metrics tests: the forwardToRippled fixture combined with
// WithMockPrometheus (presumably the provider of makeMock used by these tests - confirm).
struct LoadBalancerForwardToRippledPrometheusTests : LoadBalancerForwardToRippledTests, WithMockPrometheus {};
647+
648+
// Checks the cache hit/miss and success-duration counters when the same request is
// forwarded four times with a positive forwarding cache timeout configured.
TEST_F(LoadBalancerForwardToRippledPrometheusTests, forwardingCacheEnabled)
649+
{
650+
// A positive cache_timeout is set so that the forwarding cache is expected to be active.
configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}};
651+
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
652+
auto loadBalancer = makeLoadBalancer();
653+
654+
auto const request = boost::json::object{{"command", "server_info"}};
655+
656+
// Mocks are looked up by metric name and label string.
auto& cacheHitCounter = makeMock<util::prometheus::CounterInt>("forwarding_cache_hit_counter", "");
657+
auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
658+
auto& successDurationCounter =
659+
makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");
660+
661+
// Of the four calls below, one miss and three hits are expected; the success duration
// is expected to be recorded once, with any value.
EXPECT_CALL(cacheMissCounter, add(1));
662+
EXPECT_CALL(cacheHitCounter, add(1)).Times(3);
663+
EXPECT_CALL(successDurationCounter, add(testing::_));
664+
665+
// Source 0 is expected to be asked exactly once.
EXPECT_CALL(
666+
sourceFactory_.sourceAt(0),
667+
forwardToRippled(request, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
668+
)
669+
.WillOnce(Return(response_));
670+
671+
runSpawn([&](boost::asio::yield_context yield) {
672+
EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
673+
EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
674+
EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
675+
EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
676+
});
677+
}
678+
679+
// Checks the counters when source 0 returns an error and source 1 then returns the
// response: one cache miss, one retry, one fail-duration and one success-duration update.
TEST_F(LoadBalancerForwardToRippledPrometheusTests, source0Fails)
680+
{
681+
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
682+
auto loadBalancer = makeLoadBalancer();
683+
684+
// Mocks are looked up by metric name and label string.
auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
685+
auto& retriesCounter = makeMock<CounterInt>("forwarding_retries_counter", "");
686+
auto& successDurationCounter =
687+
makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");
688+
auto& failDurationCounter = makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"fail\"}");
689+
690+
// Durations are matched with a wildcard; only the fact that they are recorded is checked.
EXPECT_CALL(cacheMissCounter, add(1));
691+
EXPECT_CALL(retriesCounter, add(1));
692+
EXPECT_CALL(successDurationCounter, add(testing::_));
693+
EXPECT_CALL(failDurationCounter, add(testing::_));
694+
695+
// First source reports a connection error.
EXPECT_CALL(
696+
sourceFactory_.sourceAt(0),
697+
forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
698+
)
699+
.WillOnce(Return(std::unexpected{rpc::ClioError::EtlConnectionError}));
// Second source returns the response.
EXPECT_CALL(
701+
sourceFactory_.sourceAt(1),
702+
forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
703+
)
704+
.WillOnce(Return(response_));
705+
706+
runSpawn([&](boost::asio::yield_context yield) {
707+
EXPECT_EQ(loadBalancer->forwardToRippled(request_, clientIP_, false, yield), response_);
708+
});
709+
}
710+
644711
struct LoadBalancerForwardToRippledErrorTestBundle {
645712
std::string testName;
646713
rpc::ClioError firstSourceError;

0 commit comments

Comments
 (0)