Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions crates/floresta-compact-filters/src/flat_filters_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,32 @@ use crate::IteratableFilterStoreError;

pub struct FiltersIterator {
reader: BufReader<File>,
current: u32,
}

impl Iterator for FiltersIterator {
type Item = (u32, crate::bip158::BlockFilter);

fn next(&mut self) -> Option<Self::Item> {
let mut buf = [0; 4];

self.reader.read_exact(&mut buf).ok()?;
let height = u32::from_le_bytes(buf);

self.reader.read_exact(&mut buf).ok()?;
let length = u32::from_le_bytes(buf);

debug_assert!(
length < 1_000_000,
"filter for block {} has length {}",
self.current,
length
height,
length,
);

let mut buf = vec![0_u8; length as usize];
self.reader.read_exact(&mut buf).ok()?;
let filter = crate::bip158::BlockFilter::new(&buf);
self.current += 1;

Some((self.current - 1, filter))
Some((height, filter))
}
}

Expand Down Expand Up @@ -95,7 +97,7 @@ impl IntoIterator for FlatFiltersStore {
let mut inner = self.0.lock().unwrap();
inner.file.seek(SeekFrom::Start(4)).unwrap();
let reader = BufReader::new(inner.file.try_clone().unwrap());
FiltersIterator { reader, current: 0 }
FiltersIterator { reader }
}
}

Expand Down Expand Up @@ -124,21 +126,24 @@ impl IteratableFilterStore for FlatFiltersStore {
let new_file = File::open(inner.path.clone())?;
let mut reader = BufReader::new(new_file);
reader.seek(SeekFrom::Start(4))?;
Ok(FiltersIterator { reader, current: 1 })
Ok(FiltersIterator { reader })
}

fn put_filter(
&self,
block_filter: crate::bip158::BlockFilter,
height: u32,
) -> Result<(), IteratableFilterStoreError> {
let length = block_filter.content.len() as u32;

if length > 1_000_000 {
return Err(IteratableFilterStoreError::FilterTooLarge);
}

let mut inner = self.0.lock()?;

inner.file.seek(SeekFrom::End(0))?;
inner.file.write_all(&height.to_le_bytes())?;
inner.file.write_all(&length.to_le_bytes())?;
inner.file.write_all(&block_filter.content)?;

Expand Down Expand Up @@ -166,7 +171,7 @@ mod tests {

let filter = BlockFilter::new(&[10, 11, 12, 13]);
store
.put_filter(filter.clone())
.put_filter(filter.clone(), 1)
.expect("could not put filter");

let mut iter = store.iter().expect("could not get iterator");
Expand Down
1 change: 1 addition & 0 deletions crates/floresta-compact-filters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub trait IteratableFilterStore:
fn put_filter(
&self,
block_filter: bip158::BlockFilter,
height: u32,
) -> Result<(), IteratableFilterStoreError>;
/// Persists the height of the last filter we have
fn set_height(&self, height: u32) -> Result<(), IteratableFilterStoreError>;
Expand Down
3 changes: 2 additions & 1 deletion crates/floresta-compact-filters/src/network_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ impl<Storage: IteratableFilterStore> NetworkFilters<Storage> {
pub fn push_filter(
&self,
filter: crate::BlockFilter,
height: u32,
) -> Result<(), IteratableFilterStoreError> {
self.filters.put_filter(filter)
self.filters.put_filter(filter, height)
}

pub fn get_height(&self) -> Result<u32, IteratableFilterStoreError> {
Expand Down
13 changes: 12 additions & 1 deletion crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,18 @@ where
}

for request in failed {
match request {
InflightRequests::Headers => {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
let new_sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
self.1.sync_peer = new_sync_peer;
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
.await?;
self.inflight
.insert(InflightRequests::Headers, (new_sync_peer, Instant::now()));
}
_ => {}
}
self.inflight.remove(&request);
}

Expand Down Expand Up @@ -676,7 +688,6 @@ where
if !self.peer_ids.is_empty() {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
self.1.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();

try_and_log!(
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
.await
Expand Down
2 changes: 2 additions & 0 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Main file for this blockchain. A node is the central task that runs and handles important
//! events, such as new blocks, peer connection/disconnection, new addresses, etc.
//! A node should not care about peer-specific messages, peers'll handle things like pings.
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::IpAddr;
Expand Down Expand Up @@ -134,6 +135,7 @@ impl Default for RunningNode {
requests: Mutex::new(Vec::new()),
}),
last_invs: HashMap::default(),
inflight_filters: BTreeMap::new(),
}
}
}
Expand Down
37 changes: 20 additions & 17 deletions crates/floresta-wire/src/p2p_wire/running_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
/// After a node catches-up with the network, we can start listening for new blocks, handing any
/// request our user might make and keep our peers alive. This mode requires way less bandwidth and
/// CPU to run, being bound by the number of blocks found in a given period.
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,6 +19,7 @@ use floresta_chain::pruned_utreexo::UpdatableChainstate;
use floresta_chain::BlockValidationErrors;
use floresta_chain::BlockchainError;
use floresta_chain::UtreexoBlock;
use floresta_compact_filters::BlockFilter;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -59,6 +61,7 @@ pub struct RunningNode {
/// in a timely manner, but keep the ones that notified us of a new blocks the fastest.
/// We also keep the moment we received the first inv message
pub(crate) last_invs: HashMap<BlockHash, (Instant, Vec<PeerId>)>,
pub(crate) inflight_filters: BTreeMap<u32, BlockFilter>,
}

impl NodeContext for RunningNode {
Expand Down Expand Up @@ -455,7 +458,6 @@ where
ASSUME_STALE,
RunningNode
);

try_and_log!(self.request_rescan_block().await);
try_and_log!(self.download_filters().await);

Expand Down Expand Up @@ -497,7 +499,7 @@ where
} else {
user_height as u32
};

height -= 1;
filters.save_height(height)?;
}

Expand All @@ -506,10 +508,10 @@ where
}

info!("Downloading filters from height {}", filters.get_height()?);
let stop = if height + 1000 > best_height {
let stop = if height + 500 > best_height {
best_height
} else {
height + 1000
height + 500
};

let stop_hash = self.chain.get_block_hash(stop)?;
Expand Down Expand Up @@ -904,28 +906,29 @@ where
PeerMessages::BlockFilter((hash, filter)) => {
debug!("Got a block filter for block {hash} from peer {peer}");

if let Some(filters) = self.block_filters.as_ref() {
let current_height = filters.get_height()?;
if let Some(filters) = self.0.block_filters.as_ref() {
let mut current_height = filters.get_height()?;
let Some(this_height) = self.chain.get_block_height(&hash)? else {
warn!("Filter for block {} received, but we don't have it", hash);
return Ok(());
};

// we expect to receive them in order
if current_height + 1 != this_height {
warn!(
"Expected filter for height {}, got filter for height {}",
current_height + 1,
this_height
);
self.increase_banscore(peer, 10).await?;
self.1.inflight_filters.insert(this_height, filter);
return Ok(());
}

filters.save_height(current_height + 1)?;
filters.push_filter(filter)?;
filters.push_filter(filter, current_height + 1)?;
current_height += 1;

while let Some(filter) = self.1.inflight_filters.remove(&(current_height)) {
filters.push_filter(filter, current_height)?;
current_height += 1;
}

filters.save_height(current_height)?;

if self.last_filter == hash {
if self.last_filter == hash && self.1.inflight_filters.is_empty() {
self.inflight.remove(&InflightRequests::GetFilters);
self.download_filters().await?;
}
Expand Down