From ea83e542da3778d56975d9544cb1f04814d18aa2 Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Wed, 14 Aug 2024 19:19:04 -0300 Subject: [PATCH] Wire: handle block filters out-of-order Right now, if we receive filters out of order, we'll consider that misbehaving. But due to some weird connectivity or even how the remote node processes things, we may get those in a random order. This commit fixes that by holding the filters we've got in a BTree and only processing them after all the previous filters already got processed. --- .../src/flat_filters_store.rs | 21 +++++++---- crates/floresta-compact-filters/src/lib.rs | 1 + .../src/network_filters.rs | 3 +- .../src/p2p_wire/chain_selector.rs | 13 ++++++- crates/floresta-wire/src/p2p_wire/node.rs | 2 + .../src/p2p_wire/running_node.rs | 37 ++++++++++--------- 6 files changed, 50 insertions(+), 27 deletions(-) diff --git a/crates/floresta-compact-filters/src/flat_filters_store.rs b/crates/floresta-compact-filters/src/flat_filters_store.rs index aa9ed1dbd..7c39c7e13 100644 --- a/crates/floresta-compact-filters/src/flat_filters_store.rs +++ b/crates/floresta-compact-filters/src/flat_filters_store.rs @@ -15,7 +15,6 @@ use crate::IteratableFilterStoreError; pub struct FiltersIterator { reader: BufReader, - current: u32, } impl Iterator for FiltersIterator { @@ -23,22 +22,25 @@ impl Iterator for FiltersIterator { fn next(&mut self) -> Option { let mut buf = [0; 4]; + + self.reader.read_exact(&mut buf).ok()?; + let height = u32::from_le_bytes(buf); + self.reader.read_exact(&mut buf).ok()?; let length = u32::from_le_bytes(buf); debug_assert!( length < 1_000_000, "filter for block {} has length {}", - self.current, - length + height, + length, ); let mut buf = vec![0_u8; length as usize]; self.reader.read_exact(&mut buf).ok()?; let filter = crate::bip158::BlockFilter::new(&buf); - self.current += 1; - Some((self.current - 1, filter)) + Some((height, filter)) } } @@ -95,7 +97,7 @@ impl IntoIterator for FlatFiltersStore { let 
mut inner = self.0.lock().unwrap(); inner.file.seek(SeekFrom::Start(4)).unwrap(); let reader = BufReader::new(inner.file.try_clone().unwrap()); - FiltersIterator { reader, current: 0 } + FiltersIterator { reader } } } @@ -124,21 +126,24 @@ impl IteratableFilterStore for FlatFiltersStore { let new_file = File::open(inner.path.clone())?; let mut reader = BufReader::new(new_file); reader.seek(SeekFrom::Start(4))?; - Ok(FiltersIterator { reader, current: 1 }) + Ok(FiltersIterator { reader }) } fn put_filter( &self, block_filter: crate::bip158::BlockFilter, + height: u32, ) -> Result<(), IteratableFilterStoreError> { let length = block_filter.content.len() as u32; if length > 1_000_000 { return Err(IteratableFilterStoreError::FilterTooLarge); } + let mut inner = self.0.lock()?; inner.file.seek(SeekFrom::End(0))?; + inner.file.write_all(&height.to_le_bytes())?; inner.file.write_all(&length.to_le_bytes())?; inner.file.write_all(&block_filter.content)?; @@ -166,7 +171,7 @@ mod tests { let filter = BlockFilter::new(&[10, 11, 12, 13]); store - .put_filter(filter.clone()) + .put_filter(filter.clone(), 1) .expect("could not put filter"); let mut iter = store.iter().expect("could not get iterator"); diff --git a/crates/floresta-compact-filters/src/lib.rs b/crates/floresta-compact-filters/src/lib.rs index b7d21615c..8d063a36b 100644 --- a/crates/floresta-compact-filters/src/lib.rs +++ b/crates/floresta-compact-filters/src/lib.rs @@ -97,6 +97,7 @@ pub trait IteratableFilterStore: fn put_filter( &self, block_filter: bip158::BlockFilter, + height: u32, ) -> Result<(), IteratableFilterStoreError>; /// Persists the height of the last filter we have fn set_height(&self, height: u32) -> Result<(), IteratableFilterStoreError>; diff --git a/crates/floresta-compact-filters/src/network_filters.rs b/crates/floresta-compact-filters/src/network_filters.rs index d8c5e8245..1d14a0bd0 100644 --- a/crates/floresta-compact-filters/src/network_filters.rs +++ 
b/crates/floresta-compact-filters/src/network_filters.rs @@ -41,8 +41,9 @@ impl NetworkFilters { pub fn push_filter( &self, filter: crate::BlockFilter, + height: u32, ) -> Result<(), IteratableFilterStoreError> { - self.filters.put_filter(filter) + self.filters.put_filter(filter, height) } pub fn get_height(&self) -> Result { diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs index eb21f5f0e..7bb647bfb 100644 --- a/crates/floresta-wire/src/p2p_wire/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -630,6 +630,18 @@ where } for request in failed { + match request { + InflightRequests::Headers => { + let new_sync_peer = rand::random::() % self.peer_ids.len(); + let new_sync_peer = *self.peer_ids.get(new_sync_peer).unwrap(); + self.1.sync_peer = new_sync_peer; + self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer) + .await?; + self.inflight + .insert(InflightRequests::Headers, (new_sync_peer, Instant::now())); + } + _ => {} + } self.inflight.remove(&request); } @@ -676,7 +688,6 @@ where if !self.peer_ids.is_empty() { let new_sync_peer = rand::random::() % self.peer_ids.len(); self.1.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap(); - try_and_log!( self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer) .await diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 5c4ed9722..411a0071d 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -1,6 +1,7 @@ //! Main file for this blockchain. A node is the central task that runs and handles important //! events, such as new blocks, peer connection/disconnection, new addresses, etc. //! A node should not care about peer-specific messages, peers'll handle things like pings. 
+use std::collections::BTreeMap; use std::collections::HashMap; use std::fmt::Debug; use std::net::IpAddr; @@ -134,6 +135,7 @@ impl Default for RunningNode { requests: Mutex::new(Vec::new()), }), last_invs: HashMap::default(), + inflight_filters: BTreeMap::new(), } } } diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs index f7bad30a6..e93c3863a 100644 --- a/crates/floresta-wire/src/p2p_wire/running_node.rs +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -1,7 +1,8 @@ -use std::collections::HashMap; /// After a node catches-up with the network, we can start listening for new blocks, handing any /// request our user might make and keep our peers alive. This mode requires way less bandwidth and /// CPU to run, being bound by the number of blocks found in a given period. +use std::collections::BTreeMap; +use std::collections::HashMap; use std::net::IpAddr; use std::sync::Arc; use std::time::Duration; @@ -18,6 +19,7 @@ use floresta_chain::pruned_utreexo::UpdatableChainstate; use floresta_chain::BlockValidationErrors; use floresta_chain::BlockchainError; use floresta_chain::UtreexoBlock; +use floresta_compact_filters::BlockFilter; use log::debug; use log::error; use log::info; @@ -59,6 +61,7 @@ pub struct RunningNode { /// in a timely manner, but keep the ones that notified us of a new blocks the fastest. 
/// We also keep the moment we received the first inv message pub(crate) last_invs: HashMap)>, + pub(crate) inflight_filters: BTreeMap, } impl NodeContext for RunningNode { @@ -455,7 +458,6 @@ where ASSUME_STALE, RunningNode ); - try_and_log!(self.request_rescan_block().await); try_and_log!(self.download_filters().await); @@ -497,7 +499,7 @@ where } else { user_height as u32 }; - + height -= 1; filters.save_height(height)?; } @@ -506,10 +508,10 @@ where } info!("Downloading filters from height {}", filters.get_height()?); - let stop = if height + 1000 > best_height { + let stop = if height + 500 > best_height { best_height } else { - height + 1000 + height + 500 }; let stop_hash = self.chain.get_block_hash(stop)?; @@ -904,28 +906,29 @@ where PeerMessages::BlockFilter((hash, filter)) => { debug!("Got a block filter for block {hash} from peer {peer}"); - if let Some(filters) = self.block_filters.as_ref() { - let current_height = filters.get_height()?; + if let Some(filters) = self.0.block_filters.as_ref() { + let mut current_height = filters.get_height()?; let Some(this_height) = self.chain.get_block_height(&hash)? 
else { warn!("Filter for block {} received, but we don't have it", hash); return Ok(()); }; - // we expect to receive them in order if current_height + 1 != this_height { - warn!( - "Expected filter for height {}, got filter for height {}", - current_height + 1, - this_height - ); - self.increase_banscore(peer, 10).await?; + self.1.inflight_filters.insert(this_height, filter); return Ok(()); } - filters.save_height(current_height + 1)?; - filters.push_filter(filter)?; + filters.push_filter(filter, current_height + 1)?; + current_height += 1; + + while let Some(filter) = self.1.inflight_filters.remove(&(current_height)) { + filters.push_filter(filter, current_height)?; + current_height += 1; + } + + filters.save_height(current_height)?; - if self.last_filter == hash { + if self.last_filter == hash && self.1.inflight_filters.is_empty() { self.inflight.remove(&InflightRequests::GetFilters); self.download_filters().await?; }