Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 108 additions & 38 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! events, such as new blocks, peer connection/disconnection, new addresses, etc.
//! A node should not care about peer-specific messages, peers'll handle things like pings.
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::net::IpAddr;
use std::net::SocketAddr;
Expand Down Expand Up @@ -128,6 +127,7 @@ pub struct LocalPeerView {
_last_message: Instant,
feeler: bool,
height: u32,
banscore: u32,
}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -173,6 +173,7 @@ pub struct NodeCommon<Chain: BlockchainInterface + UpdatableChainstate> {
mempool: Arc<RwLock<Mempool>>,
datadir: String,
address_man: AddressMan,
max_banscore: u32,
socks5: Option<Socks5StreamBuilder>,
}

Expand All @@ -195,7 +196,6 @@ impl<T, Chain: BlockchainInterface + UpdatableChainstate> DerefMut for UtreexoNo
enum PeerStatus {
Awaiting,
Ready,
ShutingDown,
}
impl<T, Chain> UtreexoNode<T, Chain>
where
Expand All @@ -209,6 +209,7 @@ where
network: Network,
datadir: String,
proxy: Option<SocketAddr>,
max_banscore: Option<u32>,
) -> Self {
let (node_tx, node_rx) = channel::unbounded();
let socks5 = proxy.map(Socks5StreamBuilder::new);
Expand Down Expand Up @@ -236,10 +237,12 @@ where
last_send_addresses: Instant::now(),
datadir,
socks5,
max_banscore: max_banscore.unwrap_or(50),
},
T::default(),
)
}

fn get_peer_info(&self, peer: &u32) -> Option<PeerInfo> {
let peer = self.peers.get(peer)?;
Some(PeerInfo {
Expand Down Expand Up @@ -384,6 +387,7 @@ where

Ok((proof, hashes, inputs))
}

async fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> {
if let Some(peer) = &self.peers.get(&peer_id) {
if peer.state == PeerStatus::Ready {
Expand All @@ -395,6 +399,32 @@ where
}
Ok(())
}

/// Increses the "banscore" of a peer.
///
/// This is a always increasing number that, if reaches our `max_banscore` setting,
/// will cause our peer to be banned for one BANTIME.
/// The amount of each increment is given by factor, and it's callibrated for each misbehaving
/// action that a peer may incur in.
async fn increase_banscore(&mut self, peer_id: u32, factor: u32) -> Result<(), WireError> {
let Some(peer) = self.0.peers.get_mut(&peer_id) else {
return Ok(());
};
peer.banscore += factor;

// This peer is misbehaving too often, ban it
if peer.banscore >= self.0.max_banscore {
warn!("banning peer {} for misbehaving", peer_id);
let _ = peer.channel.send(NodeRequest::Shutdown).await;
self.0.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(RunningNode::BAN_TIME),
);
}

Ok(())
}

async fn check_for_timeout(&mut self) -> Result<(), WireError> {
let mut timed_out = Vec::new();
for request in self.inflight.keys() {
Expand All @@ -403,47 +433,83 @@ where
timed_out.push(request.clone());
}
}
let mut removed_peers = HashSet::new();
let mut to_request = Vec::new();
let mut rescan_blocks = Vec::new();

for request in timed_out {
let Some((peer, _)) = self.inflight.remove(&request) else {
continue;
};

// Punning this peer for taking too long to respond
self.increase_banscore(peer, 2).await?;

match request {
InflightRequests::Blocks(block) => to_request.push(block),
InflightRequests::RescanBlock(block) => rescan_blocks.push(block),
InflightRequests::Blocks(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], true)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight
.insert(InflightRequests::Blocks(block), (peer, Instant::now()));
}
InflightRequests::RescanBlock(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], false)),
ServiceFlags::NONE,
)
.await?;
self.inflight
.insert(InflightRequests::RescanBlock(block), (peer, Instant::now()));
}
InflightRequests::Headers => {
self.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
let peer = self
.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
.await?;
self.last_headers_request = Instant::now();
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
}
InflightRequests::UserRequest(_) => {}
InflightRequests::UserRequest(req) => match req {
UserRequest::Block(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], true)),
ServiceFlags::NONE,
)
.await?;
self.inflight
.insert(InflightRequests::UserRequest(req), (peer, Instant::now()));
}
UserRequest::MempoolTransaction(txid) => {
let peer = self
.send_to_random_peer(
NodeRequest::MempoolTransaction(txid),
ServiceFlags::NONE,
)
.await?;
self.inflight
.insert(InflightRequests::UserRequest(req), (peer, Instant::now()));
}
UserRequest::UtreexoBlock(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], true)),
ServiceFlags::NONE,
)
.await?;
self.inflight
.insert(InflightRequests::UserRequest(req), (peer, Instant::now()));
}
_ => {}
},
InflightRequests::Connect(peer) => {
self.send_to_peer(peer, NodeRequest::Shutdown).await?
}
}

if !removed_peers.contains(&peer) {
self.send_to_peer(peer, NodeRequest::Shutdown).await?;
removed_peers.insert(peer);
if let Some(peer) = self.peers.get_mut(&peer) {
info!("Peer {} timed out request, shuting down", peer.address);
peer.state = PeerStatus::ShutingDown;
}
}
}
for hash in rescan_blocks {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![hash], false)),
ServiceFlags::NONE,
)
.await?;
self.inflight
.insert(InflightRequests::RescanBlock(hash), (peer, Instant::now()));
}
self.request_blocks(to_request).await?;

Ok(())
}
#[inline]
Expand Down Expand Up @@ -747,6 +813,7 @@ where
feeler,
address_id: peer_id as u32,
height: 0,
banscore: 0,
},
);

Expand Down Expand Up @@ -841,6 +908,7 @@ where
if self.state == NodeState::WaitingPeer {
try_and_log!(self.maybe_open_connection().await);
}

self.last_tip_update = Instant::now();

// If we don't have any peers, then we can't do anything
Expand Down Expand Up @@ -1147,20 +1215,14 @@ where

// Jobs that don't need a connected peer

// Check whether we are in a stale tip
periodic_job!(
self.check_for_stale_tip().await,
self.last_tip_update,
ASSUME_STALE,
RunningNode
);
// Save our peers db
periodic_job!(
self.save_peers(),
self.last_peer_db_dump,
PEER_DB_DUMP_INTERVAL,
RunningNode
);

// Rework our address database
periodic_job!(
self.address_man.rearrange_buckets(),
Expand All @@ -1169,6 +1231,7 @@ where
RunningNode,
true
);

// Perhaps we need more connections
periodic_job!(
self.maybe_open_connection().await,
Expand All @@ -1188,6 +1251,13 @@ where
if self.state == NodeState::WaitingPeer {
continue;
}
// Check whether we are in a stale tip
periodic_job!(
self.check_for_stale_tip().await,
self.last_tip_update,
ASSUME_STALE,
RunningNode
);
// Check if we haven't missed any block
periodic_job!(
self.ask_missed_block().await,
Expand Down Expand Up @@ -1349,8 +1419,8 @@ where
"Peer {peer} sent us block {} which we didn't request",
block.block.block_hash()
);
self.send_to_peer(peer, NodeRequest::Shutdown).await?;
return Err(WireError::PeerMisbehaving);
self.increase_banscore(peer, 5).await?;
return Ok(());
}

let validation_index = self.chain.get_validation_index()?;
Expand Down
7 changes: 6 additions & 1 deletion crates/floresta-wire/src/p2p_wire/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,17 @@ impl Debug for Peer<TcpStream> {
Ok(())
}
}

type Result<T> = std::result::Result<T, PeerError>;

impl<T: Transport> Peer<T> {
pub async fn read_loop(mut self) -> Result<()> {
let err = self.peer_loop_inner().await;
warn!("Peer connection loop closed: {err:?}");

if let Err(err) = err {
warn!("Peer {} connection loop closed: {err:?}", self.id);
}

self.send_to_node(PeerMessages::Disconnected(self.address_id))
.await;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/floresta/examples/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async fn main() {
Network::Bitcoin,
DATA_DIR.into(),
None,
None,
);
// A handle is a simple way to interact with the node. It implements a queue of requests
// that will be processed by the node.
Expand Down
1 change: 1 addition & 0 deletions florestad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ fn run_with_ctx(ctx: Ctx) {
get_net(&ctx.network).into(),
data_dir,
ctx.proxy.map(|x| x.parse().expect("Invalid proxy address")),
None,
);

// ZMQ
Expand Down