Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/floresta-wire/src/p2p_wire/running_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ where
continue;
};

// Punnishing this peer for taking too long to respond
self.increase_banscore(peer, 2).await?;
if !matches!(request, InflightRequests::Connect(_)) {
// Punnishing this peer for taking too long to respond
self.increase_banscore(peer, 2).await?;
}

match request {
InflightRequests::UtreexoState(_) => {}
Expand Down
48 changes: 33 additions & 15 deletions crates/floresta-wire/src/p2p_wire/sync_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use floresta_chain::BlockchainError;
use log::debug;
use log::error;
use log::info;
use log::warn;

use super::error::WireError;
use super::peer::PeerMessages;
Expand Down Expand Up @@ -49,6 +50,23 @@ where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
async fn get_blocks_to_download(&mut self) {
let mut blocks = Vec::with_capacity(10);
for _ in 0..10 {
let next_block = self.1.last_block_requested + 1;
let next_block = self.chain.get_block_hash(next_block);
match next_block {
Ok(next_block) => {
blocks.push(next_block);
self.1.last_block_requested += 1;
}
Err(_) => {
break;
}
}
}
try_and_log!(self.request_blocks(blocks).await);
}
pub async fn run(&mut self, kill_signal: Arc<RwLock<bool>>, done_cb: impl FnOnce(&Chain)) {
info!("Starting sync node");
self.1.last_block_requested = self.chain.get_validation_index().unwrap();
Expand Down Expand Up @@ -78,22 +96,12 @@ where
continue;
}

if self.inflight.len() < SyncNode::MAX_INFLIGHT_REQUESTS {
let mut blocks = Vec::with_capacity(100);
for _ in 0..100 {
let next_block = self.1.last_block_requested + 1;
let next_block = self.chain.get_block_hash(next_block);
match next_block {
Ok(next_block) => {
blocks.push(next_block);
self.1.last_block_requested += 1;
}
Err(_) => {
break;
}
}
if self.chain.get_validation_index().unwrap() + 10 > self.1.last_block_requested {
if self.inflight.len() > 10 {
continue;
}
try_and_log!(self.request_blocks(blocks).await);
info!("Requesting blocks from {}", self.1.last_block_requested);
self.get_blocks_to_download().await;
}
}

Expand Down Expand Up @@ -202,6 +210,10 @@ where
debug!("accepted block {}", block.block.block_hash());
}

if self.inflight.len() < 4 {
self.get_blocks_to_download().await;
}

Ok(())
}

Expand All @@ -218,6 +230,12 @@ where
}
PeerMessages::Disconnected(idx) => {
try_and_log!(self.handle_disconnection(peer, idx));
if !self.has_utreexo_peers() {
warn!("No utreexo peers connected, trying to create a new one");
try_and_log!(self.maybe_open_connection().await);
self.1.last_block_requested = self.chain.get_validation_index().unwrap();
self.inflight.clear();
}
}
_ => {}
},
Expand Down