Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/etl/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ LoadBalancer::getETLState() noexcept
void
LoadBalancer::chooseForwardingSource()
{
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
hasForwardingSource_ = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {
Expand Down
2 changes: 1 addition & 1 deletion src/etl/impl/GrpcSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
namespace etl::impl {

GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
: log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
{
try {
boost::asio::io_context ctx;
Expand Down
22 changes: 14 additions & 8 deletions src/etl/impl/SubscriptionSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource(
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
Expand Down Expand Up @@ -133,6 +133,7 @@ void
SubscriptionSource::setForwarding(bool isForwarding)
{
isForwarding_ = isForwarding;
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
}

std::chrono::steady_clock::time_point
Expand Down Expand Up @@ -168,6 +169,7 @@ SubscriptionSource::subscribe()
wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";

auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
Expand Down Expand Up @@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers));
}
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;

} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) {
ledgerIndex = object.at(JS(ledger_index)).as_int64();
}
Expand All @@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message)
// 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
Expand All @@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message)

return std::nullopt;
} catch (std::exception const& e) {
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
LOG(log_.error()) << "Exception in handleMessage: " << e.what();
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
}
}
Expand All @@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
isForwarding_ = false;
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
}

if (wsConnection_ != nullptr) {
auto const err = wsConnection_->close(yield);
if (err) {
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
}
wsConnection_->close(yield);
wsConnection_.reset();
}

Expand Down