Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ use std::time::Instant;

use bitcoin::block::Header;
use bitcoin::consensus::deserialize;
use bitcoin::p2p::message_blockdata::Inventory;
use bitcoin::p2p::ServiceFlags;
use bitcoin::BlockHash;
use floresta_chain::pruned_utreexo::BlockchainInterface;
use floresta_chain::pruned_utreexo::UpdatableChainstate;
use floresta_chain::UtreexoBlock;
use floresta_common::service_flags;
use log::debug;
use log::info;
use log::warn;
use rustreexo::accumulator::node_hash::BitcoinNodeHash;
Expand All @@ -66,6 +68,7 @@ use tokio::time::timeout;

use super::error::WireError;
use super::node::PeerStatus;
use super::node_interface::UserRequest;
use super::peer::PeerMessages;
use crate::address_man::AddressState;
use crate::node::periodic_job;
Expand All @@ -76,6 +79,7 @@ use crate::node::NodeRequest;
use crate::node::UtreexoNode;
use crate::node_context::NodeContext;
use crate::node_context::PeerId;
use crate::node_interface::NodeResponse;

#[derive(Debug, Default, Clone)]
/// A p2p driver that attempts to connect with multiple peers, ask which chain they are following
Expand Down Expand Up @@ -645,6 +649,11 @@ where
.insert(InflightRequests::Headers, (new_sync_peer, Instant::now()));
}
}

if let InflightRequests::UserRequest(req) = request {
self.user_requests.send_answer(req, None);
}

self.inflight.remove(&request);
}

Expand Down Expand Up @@ -690,6 +699,8 @@ where
try_and_log!(self.handle_notification(notification).await);
}

try_and_log!(self.handle_user_request().await);

// Checks if we need to open a new connection
periodic_job!(
self.maybe_open_connection(ServiceFlags::NONE).await,
Expand Down Expand Up @@ -853,6 +864,45 @@ where
self.address_man.push_addresses(&addresses);
}

PeerMessages::Block(block) => {
if self.check_is_user_block_and_reply(block).await?.is_some() {
log::error!("peer {peer} sent us a block we didn't request");
self.increase_banscore(peer, 5).await?;
}
}

PeerMessages::NotFound(inv) => match inv {
Inventory::Error => {}
Inventory::Block(block)
| Inventory::WitnessBlock(block)
| Inventory::CompactBlock(block) => {
self.user_requests
.send_answer(UserRequest::Block(block), None);
}

Inventory::WitnessTransaction(tx) | Inventory::Transaction(tx) => {
self.user_requests
.send_answer(UserRequest::MempoolTransaction(tx), None);
}
_ => {}
},

PeerMessages::Transaction(tx) => {
debug!("saw a mempool transaction with txid={}", tx.compute_txid());
self.user_requests.send_answer(
UserRequest::MempoolTransaction(tx.compute_txid()),
Some(NodeResponse::MempoolTransaction(tx)),
);
}

PeerMessages::UtreexoState(_) => {
warn!(
"Utreexo state received from peer {}, but we didn't ask",
peer
);
self.increase_banscore(peer, 5).await?;
}

_ => {}
}
Ok(())
Expand Down
158 changes: 155 additions & 3 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use super::mempool::Mempool;
use super::mempool::MempoolProof;
use super::node_context::NodeContext;
use super::node_interface::NodeInterface;
use super::node_interface::NodeResponse;
use super::node_interface::PeerInfo;
use super::node_interface::UserRequest;
use super::peer::create_actors;
Expand Down Expand Up @@ -145,9 +146,6 @@ impl Default for RunningNode {
fn default() -> Self {
RunningNode {
last_address_rearrange: Instant::now(),
user_requests: Arc::new(NodeInterface {
requests: std::sync::Mutex::new(Vec::new()),
}),
last_invs: HashMap::default(),
inflight_filters: BTreeMap::new(),
}
Expand Down Expand Up @@ -195,6 +193,9 @@ pub struct NodeCommon<Chain: BlockchainInterface + UpdatableChainstate> {
pub(crate) config: UtreexoNodeConfig,
pub(crate) datadir: String,
pub(crate) network: Network,

// 7. Stuff used by the node handle
pub(crate) user_requests: Arc<NodeInterface>,
}

pub struct UtreexoNode<Chain: BlockchainInterface + UpdatableChainstate, Context> {
Expand Down Expand Up @@ -276,11 +277,162 @@ where
max_banscore: config.max_banscore,
fixed_peer,
config,
user_requests: Arc::new(NodeInterface {
requests: std::sync::Mutex::new(Vec::new()),
}),
},
context: T::default(),
})
}

/// Hands out a shared handle to this node's [`NodeInterface`].
///
/// The returned value is another `Arc` pointing at the very same interface the node
/// itself holds, so every handle obtained here observes the same request queue. Only a
/// shared reference to the node is needed to obtain one.
pub fn get_handle(&self) -> Arc<NodeInterface> {
    // Bumps the reference count; the interface itself is not copied.
    Arc::clone(&self.user_requests)
}

/// Checks if we have a request made through the user interface and handles it
///
/// This function is called by the main loop of the node, and it checks if we have any
/// requests made by the user interface. If we do, it will handle them and send the
/// response back to the user.
///
/// See the [`NodeInterface`] struct for more information on how to make requests.
pub(crate) async fn handle_user_request(&mut self) -> Result<(), WireError> {
let requests = self
.user_requests
.requests
.lock()
.map_err(|_| WireError::PoisonedLock)?
.iter()
.filter(|req| {
!self
.inflight
.contains_key(&InflightRequests::UserRequest(req.req))
})
.map(|req| req.req)
.collect();

self.perform_user_request(requests).await;

Ok(())
}

/// Answers a `GetPeerInfo` user request.
///
/// Walks over every id in `self.peer_ids`, asks `get_peer_info` for each of them and
/// keeps only the lookups that yielded a value. The resulting list is sent back as a
/// [`NodeResponse::GetPeerInfo`].
fn handle_get_peer_info(&self) {
    // Lookups that come back empty are silently skipped
    let peers = self
        .peer_ids
        .iter()
        .flat_map(|peer| self.get_peer_info(peer))
        .collect();

    let answer = NodeResponse::GetPeerInfo(peers);
    self.user_requests
        .send_answer(UserRequest::GetPeerInfo, Some(answer));
}

/// Carries out a batch of user requests.
///
/// These requests come from a consumer of `floresta-wire` through the [`NodeInterface`].
/// Each one is handled in one of two ways:
///
/// - `GetPeerInfo` and `Connect` are served right here and answered immediately;
/// - `Block`, `UtreexoBlock` and `MempoolTransaction` are turned into a [`NodeRequest`],
///   sent to a random peer and, if sending succeeded, recorded in `self.inflight`
///   together with the chosen peer and the current time.
///
/// Processing stops as soon as `self.inflight` holds
/// `RunningNode::MAX_INFLIGHT_REQUESTS` entries or more; the remaining requests of the
/// batch are not touched by this call.
async fn perform_user_request(&mut self, user_req: Vec<UserRequest>) {
    for user_req in user_req {
        debug!("Performing user request {user_req:?}");

        let inflight_is_full = self.inflight.len() >= RunningNode::MAX_INFLIGHT_REQUESTS;
        if inflight_is_full {
            return;
        }

        let wire_request = match user_req {
            // Served from local state, no peer involved
            UserRequest::GetPeerInfo => {
                self.handle_get_peer_info();
                continue;
            }

            UserRequest::Connect((addr, port)) => {
                let address = LocalAddress::new(
                    match addr {
                        IpAddr::V4(ip) => AddrV2::Ipv4(ip),
                        IpAddr::V6(ip) => AddrV2::Ipv6(ip),
                    },
                    0,
                    AddressState::NeverTried,
                    0.into(),
                    port,
                    self.peer_id_count as usize,
                );

                let kind = ConnectionKind::Regular(ServiceFlags::NONE);
                self.open_connection(kind, 0, address).await;
                self.peer_id_count += 1;

                // NOTE(review): the answer is `true` unconditionally; the outcome of
                // `open_connection` is not inspected here.
                let answer = NodeResponse::Connect(true);
                self.user_requests
                    .send_answer(UserRequest::Connect((addr, port)), Some(answer));
                continue;
            }

            // Everything below has to be fetched from a peer
            UserRequest::Block(block) => NodeRequest::GetBlock((vec![block], false)),
            UserRequest::UtreexoBlock(block) => NodeRequest::GetBlock((vec![block], true)),
            UserRequest::MempoolTransaction(txid) => NodeRequest::MempoolTransaction(txid),
        };

        // Only requests that actually went out are tracked as inflight
        if let Ok(peer) = self
            .send_to_random_peer(wire_request, ServiceFlags::NONE)
            .await
        {
            let key = InflightRequests::UserRequest(user_req);
            self.inflight.insert(key, (peer, Instant::now()));
        }
    }
}

/// Checks whether a received block answers a request made through the user interface
/// and, if so, forwards it to the user.
///
/// User block requests are tracked in `self.inflight` under two different keys,
/// depending on what was asked for: `UserRequest::Block(hash)` for a plain block and
/// `UserRequest::UtreexoBlock(hash)` for a block with utreexo data. Both keys are
/// looked up here, so that a reply to either kind of request is recognized:
///
/// - a pending `Block` request is always consumed by a block with that hash;
/// - a pending `UtreexoBlock` request is only consumed when the block actually carries
///   `udata`. Otherwise it is left inflight, since the reply can't satisfy it.
///
/// # Returns
///
/// - `Ok(None)` if the block was requested by the user and an answer was sent;
/// - `Ok(Some(block))` if no user request matches. The block is handed back to the
///   caller untouched, which avoids cloning it.
pub(crate) async fn check_is_user_block_and_reply(
    &mut self,
    block: UtreexoBlock,
) -> Result<Option<UtreexoBlock>, WireError> {
    let block_hash = block.block.block_hash();
    let has_proof = block.udata.is_some();

    let plain_requested = self
        .inflight
        .remove(&InflightRequests::UserRequest(UserRequest::Block(
            block_hash,
        )))
        .is_some();

    // A block without udata can't answer a utreexo request, so don't consume it
    let utreexo_requested = has_proof
        && self
            .inflight
            .remove(&InflightRequests::UserRequest(UserRequest::UtreexoBlock(
                block_hash,
            )))
            .is_some();

    if !plain_requested && !utreexo_requested {
        return Ok(Some(block));
    }

    debug!("answering user request for block {}", block_hash);

    if has_proof {
        // Whoever asked for the plain block still gets it, even though the peer sent
        // us the proof along. This is the only path that needs to copy the block.
        if plain_requested {
            self.user_requests.send_answer(
                UserRequest::Block(block_hash),
                Some(NodeResponse::Block(block.block.clone())),
            );
        }

        self.user_requests.send_answer(
            UserRequest::UtreexoBlock(block_hash),
            Some(NodeResponse::UtreexoBlock(block)),
        );

        return Ok(None);
    }

    self.user_requests.send_answer(
        UserRequest::Block(block_hash),
        Some(NodeResponse::Block(block.block)),
    );

    Ok(None)
}

fn get_port(network: Network) -> u16 {
match network {
Network::Bitcoin => 8333,
Expand Down
Loading