Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmake/Settings.cmake
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
set(COMPILER_FLAGS
-pedantic
-Wall
-Wcast-align
-Wdouble-promotion
-Wextra
-Werror
-Wextra
-Wformat=2
-Wimplicit-fallthrough
-Wmisleading-indentation
-Wno-narrowing
-Wno-deprecated-declarations
-Wno-dangling-else
-Wno-deprecated-declarations
-Wno-narrowing
-Wno-unused-but-set-variable
-Wnon-virtual-dtor
-Wnull-dereference
-Wold-style-cast
-pedantic
-Wpedantic
-Wunused
# FIXME: The following bunch are needed for gcc12 atm.
Expand Down
11 changes: 8 additions & 3 deletions src/etl/ETLService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "etlng/LoadBalancer.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/MonitorProvider.hpp"
#include "etlng/impl/TaskManagerProvider.hpp"
#include "etlng/impl/ext/Cache.hpp"
#include "etlng/impl/ext/Core.hpp"
Expand Down Expand Up @@ -86,13 +87,15 @@ ETLService::makeETLService(
);

auto state = std::make_shared<etl::SystemState>();
state->isStrictReadonly = config.get<bool>("read_only");

auto fetcher = std::make_shared<etl::impl::LedgerFetcher>(backend, balancer);
auto extractor = std::make_shared<etlng::impl::Extractor>(fetcher);
auto publisher = std::make_shared<etlng::impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
auto cacheLoader = std::make_shared<etl::CacheLoader<>>(config, backend, backend->cache());
auto cacheUpdater = std::make_shared<etlng::impl::CacheUpdater>(backend->cache());
auto amendmentBlockHandler = std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx, *state);
auto monitorProvider = std::make_shared<etlng::impl::MonitorProvider>();

auto loader = std::make_shared<etlng::impl::Loader>(
backend,
Expand All @@ -104,7 +107,8 @@ ETLService::makeETLService(
etlng::impl::NFTExt{backend},
etlng::impl::MPTExt{backend}
),
amendmentBlockHandler
amendmentBlockHandler,
state
);

auto taskManagerProvider = std::make_shared<etlng::impl::TaskManagerProvider>(*ledgers, extractor, loader);
Expand All @@ -122,6 +126,7 @@ ETLService::makeETLService(
loader, // loader itself
loader, // initial load observer
taskManagerProvider,
monitorProvider,
state
);
} else {
Expand Down Expand Up @@ -346,7 +351,7 @@ ETLService::doWork()
worker_ = std::thread([this]() {
beast::setCurrentThreadName("ETLService worker");

if (state_.isReadOnly) {
if (state_.isStrictReadonly) {
monitorReadOnly();
} else {
monitor();
Expand All @@ -373,7 +378,7 @@ ETLService::ETLService(
{
startSequence_ = config.maybeValue<uint32_t>("start_sequence");
finishSequence_ = config.maybeValue<uint32_t>("finish_sequence");
state_.isReadOnly = config.get<bool>("read_only");
state_.isStrictReadonly = config.get<bool>("read_only");
extractorThreads_ = config.get<uint32_t>("extractor_threads");

// This should probably be done in the backend factory but we don't have state available until here
Expand Down
2 changes: 1 addition & 1 deletion src/etl/ETLService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class ETLService : public etlng::ETLServiceInterface, ETLServiceTag {

result["etl_sources"] = loadBalancer_->toJson();
result["is_writer"] = static_cast<int>(state_.isWriting);
result["read_only"] = static_cast<int>(state_.isReadOnly);
result["read_only"] = static_cast<int>(state_.isStrictReadonly);
auto last = ledgerPublisher_.getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds());
Expand Down
2 changes: 1 addition & 1 deletion src/etl/SystemState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct SystemState {
* In strict read-only mode, the process will never attempt to become the ETL writer, and will only publish ledgers
* as they are written to the database.
*/
util::prometheus::Bool isReadOnly = PrometheusService::boolMetric(
util::prometheus::Bool isStrictReadonly = PrometheusService::boolMetric(
"read_only",
util::prometheus::Labels{},
"Whether the process is in strict read-only mode"
Expand Down
4 changes: 2 additions & 2 deletions src/etl/impl/LedgerLoader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ class LedgerLoader {
}

prev = cur->key;
static constexpr std::size_t kLOG_INTERVAL = 100000;
if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0)
static constexpr std::size_t kLOG_STRIDE = 100000;
if (numWrites % kLOG_STRIDE == 0 && numWrites != 0)
LOG(log_.info()) << "Wrote " << numWrites << " book successors";
}

Expand Down
75 changes: 59 additions & 16 deletions src/etlng/ETLService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/Registry.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
Expand All @@ -52,6 +52,7 @@
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/log/Logger.hpp"

#include <boost/json/object.hpp>
Expand Down Expand Up @@ -82,6 +83,7 @@ ETLService::ETLService(
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider,
std::shared_ptr<etl::SystemState> state
)
: ctx_(std::move(ctx))
Expand All @@ -96,9 +98,11 @@ ETLService::ETLService(
, loader_(std::move(loader))
, initialLoadObserver_(std::move(initialLoadObserver))
, taskManagerProvider_(std::move(taskManagerProvider))
, monitorProvider_(std::move(monitorProvider))
, state_(std::move(state))
{
LOG(log_.info()) << "Creating ETLng...";
ASSERT(not state_->isWriting, "ETL should never start in writer mode");
LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE");
}

ETLService::~ETLService()
Expand All @@ -112,12 +116,7 @@ ETLService::run()
{
LOG(log_.info()) << "Running ETLng...";

// TODO: write-enabled node should start in readonly and do the 10 second dance to become a writer
mainLoop_.emplace(ctx_.execute([this] {
state_->isWriting =
not state_->isReadOnly; // TODO: this is now needed because we don't have a mechanism for readonly or
// ETL writer node. remove later in favor of real mechanism

auto const rng = loadInitialLedgerIfNeeded();

LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
Expand All @@ -135,9 +134,8 @@ ETLService::run()
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
startMonitor(nextSequence);

// TODO: we only want to run the full ETL task man if we are POSSIBLY a write node
// but definitely not in strict readonly
if (not state_->isReadOnly)
// If we are a writer as the result of loading the initial ledger - start loading
if (state_->isWriting)
startLoading(nextSequence);
}));
}
Expand All @@ -147,6 +145,8 @@ ETLService::stop()
{
LOG(log_.info()) << "Stop called";

if (mainLoop_)
mainLoop_->wait();
if (taskMan_)
taskMan_->stop();
if (monitor_)
Expand All @@ -160,7 +160,7 @@ ETLService::getInfo() const

result["etl_sources"] = balancer_->toJson();
result["is_writer"] = static_cast<int>(state_->isWriting);
result["read_only"] = static_cast<int>(state_->isReadOnly);
result["read_only"] = static_cast<int>(state_->isStrictReadonly);
auto last = publisher_->getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());
Expand Down Expand Up @@ -196,12 +196,19 @@ ETLService::loadInitialLedgerIfNeeded()
{
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (not rng.has_value()) {
ASSERT(
not state_->isStrictReadonly,
"Database is empty but this node is in strict readonly mode. Can't write initial ledger."
);

LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
state_->isWriting = true; // immediately become writer as the db is empty

LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
auto const seq = *mostRecentValidated;
LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... ";
LOG(log_.info()) << "Ledger " << seq << " has been validated. "
<< "Downloading and extracting (takes a while)...";

auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) {
Expand Down Expand Up @@ -238,28 +245,64 @@ ETLService::loadInitialLedgerIfNeeded()
void
ETLService::startMonitor(uint32_t seq)
{
monitor_ = std::make_unique<impl::Monitor>(ctx_, backend_, ledgers_, seq);
monitorSubscription_ = monitor_->subscribe([this](uint32_t seq) {
log_.info() << "MONITOR got new seq from db: " << seq;
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);

monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;

if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}

// FIXME: is this the best way?
if (not state_->isWriting) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
});

cacheUpdater_->update(seq, diff);
backend_->updateRange(seq);
}

publisher_->publish(seq, {});
});

monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
attemptTakeoverWriter();
});

monitor_->run();
}

/**
 * Creates a task manager through the provider and starts it.
 *
 * Asserts that the node is not in strict read-only mode. The monitor must already exist because it is
 * dereferenced when the task manager is created.
 *
 * @param seq Sequence forwarded to the task manager provider
 */
void
ETLService::startLoading(uint32_t seq)
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");

    taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq);

    // The thread count is read from the "extractor_threads" config entry
    auto const extractorThreads = config_.get().get<std::size_t>("extractor_threads");
    taskMan_->run(extractorThreads);
}

/**
 * Switches this node into writer mode and starts loading right after the newest ledger in the database.
 *
 * Asserts that the node is not in strict read-only mode and that the database reports a ledger range.
 */
void
ETLService::attemptTakeoverWriter()
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");

    auto const rng = backend_->hardFetchLedgerRangeNoThrow();
    ASSERT(rng.has_value(), "Ledger range can't be null");

    // Loading resumes at the sequence following the highest one already stored
    auto const firstSeqToLoad = rng->maxSequence + 1;

    state_->isWriting = true;
    LOG(log_.info()) << "Taking over the ETL writer seat";
    startLoading(firstSeqToLoad);
}

/**
 * Leaves writer mode: clears the writing and write-conflict flags and releases the task manager.
 *
 * Asserts that the node is not in strict read-only mode.
 */
void
ETLService::giveUpWriter()
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");

    state_->isWriting = false;
    state_->writeConflict = false;

    // Release the owned task manager instance
    taskMan_.reset();
}

} // namespace etlng
16 changes: 13 additions & 3 deletions src/etlng/ETLService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/Registry.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
Expand All @@ -69,6 +69,7 @@
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -106,12 +107,14 @@ class ETLService : public ETLServiceInterface {
std::shared_ptr<LoaderInterface> loader_;
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider_;
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider_;
std::shared_ptr<etl::SystemState> state_;

std::unique_ptr<MonitorInterface> monitor_;
std::unique_ptr<TaskManagerInterface> taskMan_;

boost::signals2::scoped_connection monitorSubscription_;
boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;

std::optional<util::async::AnyOperation<void>> mainLoop_;

Expand All @@ -131,6 +134,7 @@ class ETLService : public ETLServiceInterface {
* @param loader Interface for loading data
* @param initialLoadObserver The observer for initial data loading
* @param taskManagerProvider The provider of the task manager instance
* @param monitorProvider The provider of the monitor instance
* @param state System state tracking object
*/
ETLService(
Expand All @@ -146,6 +150,7 @@ class ETLService : public ETLServiceInterface {
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider,
std::shared_ptr<etl::SystemState> state
);

Expand Down Expand Up @@ -173,7 +178,6 @@ class ETLService : public ETLServiceInterface {
lastCloseAgeSeconds() const override;

private:
// TODO: this better be std::expected
std::optional<data::LedgerRange>
loadInitialLedgerIfNeeded();

Expand All @@ -182,6 +186,12 @@ class ETLService : public ETLServiceInterface {

/**
 * @brief Create the task manager via the task manager provider and run it.
 *
 * The definition asserts that the node is not in strict read-only mode and runs the task manager with the
 * number of threads given by the "extractor_threads" config value.
 *
 * @param seq Sequence passed to the task manager provider
 */
void
startLoading(uint32_t seq);

/**
 * @brief Mark this node as the writer and start loading from the sequence after the database's max sequence.
 *
 * The definition asserts that the node is not in strict read-only mode and that a ledger range is available.
 */
void
attemptTakeoverWriter();

/**
 * @brief Stop being the writer: reset the writing and write-conflict flags and drop the task manager.
 *
 * The definition asserts that the node is not in strict read-only mode.
 */
void
giveUpWriter();
};

} // namespace etlng
Loading
Loading