Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/floresta-cli/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ pub trait FlorestaRPC {
/// Gets information about the peers we're connected with
///
/// This method returns information about the peers we're connected with. This includes
/// the peer's IP address, the peer's version, the peer's user agent, and the peer's
/// current height.
/// the peer's IP address, the peer's version, the peer's user agent, the transport protocol
/// and the peer's current height.
fn get_peer_info(&self) -> Result<Vec<PeerInfo>>;
/// Returns a block, given a block hash
///
Expand Down
2 changes: 2 additions & 0 deletions crates/floresta-cli/src/rpc_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ pub struct PeerInfo {
///
/// Can be either Ready, Connecting or Banned
pub state: String,
/// The transport protocol used with peer.
pub transport_protocol: String,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
11 changes: 9 additions & 2 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use super::peer::Version;
use super::running_node::RunningNode;
use super::socks::Socks5StreamBuilder;
use super::transport;
use super::transport::TransportProtocol;
use super::UtreexoNodeConfig;
use crate::node_context::PeerId;

Expand Down Expand Up @@ -133,6 +134,7 @@ pub struct LocalPeerView {
pub(crate) kind: ConnectionKind,
pub(crate) height: u32,
pub(crate) banscore: u32,
pub(crate) transport_protocol: TransportProtocol,
}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -615,6 +617,7 @@ where
services: peer.services.to_string(),
user_agent: peer.user_agent.clone(),
initial_height: peer.height,
transport_protocol: peer.transport_protocol,
})
}

Expand Down Expand Up @@ -787,6 +790,7 @@ where
peer_data.services = version.services;
peer_data.user_agent.clone_from(&version.user_agent);
peer_data.height = version.blocks;
peer_data.transport_protocol = version.transport_protocol;

// If this peer doesn't have basic services, we disconnect it
if let ConnectionKind::Regular(needs) = version.kind {
Expand Down Expand Up @@ -1185,7 +1189,7 @@ where
) -> Result<(), WireError> {
let address = (address.get_net_address(), address.get_port());

let (transport_reader, transport_writer) =
let (transport_reader, transport_writer, transport_protocol) =
transport::connect(address, network, allow_v1_fallback).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
Expand All @@ -1209,6 +1213,7 @@ where
transport_writer,
user_agent,
cancellation_sender,
transport_protocol,
)
.await;

Expand All @@ -1229,7 +1234,7 @@ where
user_agent: String,
allow_v1_fallback: bool,
) -> Result<(), WireError> {
let (transport_reader, transport_writer) =
let (transport_reader, transport_writer, transport_protocol) =
transport::connect_proxy(proxy, address, network, allow_v1_fallback).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
Expand All @@ -1252,6 +1257,7 @@ where
transport_writer,
user_agent,
cancellation_sender,
transport_protocol,
)
.await;
Ok(())
Expand Down Expand Up @@ -1323,6 +1329,7 @@ where
address_id: peer_id as u32,
height: 0,
banscore: 0,
transport_protocol: TransportProtocol::V1, // Default to V1, will be updated when peer is ready.
},
);

Expand Down
2 changes: 2 additions & 0 deletions crates/floresta-wire/src/p2p_wire/node_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use serde::Serialize;

use super::node::ConnectionKind;
use super::node::PeerStatus;
use super::transport::TransportProtocol;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UserRequest {
Expand All @@ -29,6 +30,7 @@ pub struct PeerInfo {
pub initial_height: u32,
pub state: PeerStatus,
pub kind: ConnectionKind,
pub transport_protocol: TransportProtocol,
}

#[derive(Debug, Clone)]
Expand Down
6 changes: 6 additions & 0 deletions crates/floresta-wire/src/p2p_wire/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::mempool::Mempool;
use super::node::NodeNotification;
use super::node::NodeRequest;
use super::transport::TransportError;
use super::transport::TransportProtocol;
use super::transport::WriteTransport;
use crate::node::ConnectionKind;
use crate::p2p_wire::transport::ReadTransport;
Expand Down Expand Up @@ -114,6 +115,7 @@ pub struct Peer<T: AsyncWrite + Unpin + Send + Sync> {
writer: WriteTransport<T>,
our_user_agent: String,
cancellation_sender: tokio::sync::oneshot::Sender<()>,
transport_protocol: TransportProtocol,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -492,6 +494,7 @@ impl<T: AsyncWrite + Unpin + Send + Sync> Peer<T> {
address_id: self.address_id,
services: self.services,
kind: self.kind,
transport_protocol: self.transport_protocol,
}))
.await;
}
Expand Down Expand Up @@ -550,6 +553,7 @@ impl<T: AsyncWrite + Unpin + Send + Sync> Peer<T> {
writer: WriteTransport<W>,
our_user_agent: String,
cancellation_sender: tokio::sync::oneshot::Sender<()>,
transport_protocol: TransportProtocol,
) {
let peer = Peer {
address_id,
Expand All @@ -574,6 +578,7 @@ impl<T: AsyncWrite + Unpin + Send + Sync> Peer<T> {
writer,
our_user_agent,
cancellation_sender,
transport_protocol,
};

spawn(peer.read_loop());
Expand Down Expand Up @@ -670,6 +675,7 @@ pub struct Version {
pub address_id: usize,
pub services: ServiceFlags,
pub kind: ConnectionKind,
pub transport_protocol: TransportProtocol,
}
/// Messages passed from different modules to the main node to process. They should minimal
/// and only if it requires global states, everything else should be handled by the module
Expand Down
3 changes: 3 additions & 0 deletions crates/floresta-wire/src/p2p_wire/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::node::PeerStatus;
use crate::p2p_wire::node::ConnectionKind;
use crate::p2p_wire::peer::PeerMessages;
use crate::p2p_wire::peer::Version;
use crate::p2p_wire::transport::TransportProtocol;
use crate::UtreexoNodeConfig;

/// A list of headers, used to represent the collection of headers.
Expand Down Expand Up @@ -101,6 +102,7 @@ impl TestPeer {
| ServiceFlags::COMPACT_FILTERS
| ServiceFlags::from(1 << 25),
kind: ConnectionKind::Regular(UTREEXO.into()),
transport_protocol: TransportProtocol::V2,
};

self.node_tx
Expand Down Expand Up @@ -167,6 +169,7 @@ pub fn create_peer(
banscore: 0,
address_id: 0,
_last_message: Instant::now(),
transport_protocol: TransportProtocol::V2,
}
}

Expand Down
109 changes: 81 additions & 28 deletions crates/floresta-wire/src/p2p_wire/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use bitcoin::p2p::message::RawNetworkMessage;
use bitcoin::p2p::Magic;
use bitcoin::Network;
use floresta_chain::UtreexoBlock;
use log::debug;
use log::info;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
Expand All @@ -35,7 +39,8 @@ use crate::address_man::LocalAddress;

type TcpReadTransport = ReadTransport<ReadHalf<TcpStream>>;
type TcpWriteTransport = WriteTransport<WriteHalf<TcpStream>>;
type TransportResult = Result<(TcpReadTransport, TcpWriteTransport), TransportError>;
type TransportResult =
Result<(TcpReadTransport, TcpWriteTransport, TransportProtocol), TransportError>;

#[derive(Error, Debug)]
pub enum TransportError {
Expand Down Expand Up @@ -67,6 +72,15 @@ pub enum WriteTransport<W: AsyncWrite + Unpin + Send + Sync> {
V1(W, Network),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// Bitcoin nodes can communicate using different transport layer protocols.
pub enum TransportProtocol {
/// Encrypted V2 protocol defined in BIP-324.
V2,
/// Original unencrypted V1 protocol.
V1,
}

struct V1MessageHeader {
_magic: Magic,
_command: [u8; 12],
Expand Down Expand Up @@ -134,13 +148,21 @@ async fn try_connection<A: ToSocketAddrs>(
) -> TransportResult {
let tcp_stream = TcpStream::connect(address).await?;
tcp_stream.set_nodelay(true)?;
let peer_addr = match tcp_stream.peer_addr() {
Ok(addr) => addr.to_string(),
Err(_) => String::from("unknown peer"),
};
let (mut reader, mut writer) = tokio::io::split(tcp_stream);

match force_v1 {
true => Ok((
ReadTransport::V1(reader),
WriteTransport::V1(writer, network),
)),
true => {
info!("Using V1 protocol for connection to {}", peer_addr);
Ok((
ReadTransport::V1(reader),
WriteTransport::V1(writer, network),
TransportProtocol::V1,
))
}
false => match AsyncProtocol::new(
network,
Role::Initiator,
Expand All @@ -152,13 +174,24 @@ async fn try_connection<A: ToSocketAddrs>(
.await
{
Ok(protocol) => {
info!(
"Successfully established V2 protocol connection to {}",
peer_addr
);
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader, reader_protocol),
WriteTransport::V2(writer, writer_protocol),
TransportProtocol::V2,
))
}
Err(e) => Err(TransportError::Protocol(e)),
Err(e) => {
debug!(
"Failed to establish V2 protocol connection to {}: {:?}",
peer_addr, e
);
Err(TransportError::Protocol(e))
}
},
}
}
Expand Down Expand Up @@ -226,29 +259,49 @@ async fn try_proxy_connection<A: ToSocketAddrs>(
let (mut reader, mut writer) = tokio::io::split(stream);

match force_v1 {
true => Ok((
ReadTransport::V1(reader),
WriteTransport::V1(writer, network),
)),
false => match AsyncProtocol::new(
network,
Role::Initiator,
None,
None,
&mut reader,
&mut writer,
)
.await
{
Ok(protocol) => {
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader, reader_protocol),
WriteTransport::V2(writer, writer_protocol),
))
true => {
info!(
"Using V1 protocol for proxy connection to {:?}",
target_addr
);
Ok((
ReadTransport::V1(reader),
WriteTransport::V1(writer, network),
TransportProtocol::V1,
))
}
false => {
match AsyncProtocol::new(
network,
Role::Initiator,
None,
None,
&mut reader,
&mut writer,
)
.await
{
Ok(protocol) => {
info!(
"Successfully established V2 protocol proxy connection to {:?}",
target_addr
);
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader, reader_protocol),
WriteTransport::V2(writer, writer_protocol),
TransportProtocol::V2,
))
}
Err(e) => {
debug!(
"Failed to establish V2 protocol proxy connection to {:?}: {:?}",
target_addr, e
);
Err(TransportError::Protocol(e))
}
}
Err(e) => Err(TransportError::Protocol(e)),
},
}
}
}

Expand Down