Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions p2p/src/authenticated/discovery/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,31 @@ impl<E: Spawner + Clock + ReasonablyRealtime + Network + Rng + CryptoRng + Metri
return;
}
};

// Attempt to claim the connection
//
// Reserve also checks if the peer is authorized.
let peer = incoming.peer();
let Some(reservation) = tracker.listen(peer.clone()).await else {
debug!("unable to reserve connection to peer");
debug!(?peer, ?address, "verified handshake");

// Check if the peer is listenable
if !tracker.listenable(peer.clone()).await {
debug!(?peer, ?address, "peer not listenable");
return;
};
}

// Perform handshake
let stream = match Connection::upgrade_listener(context, incoming).await {
Ok(connection) => connection,
Err(err) => {
debug!(?err, "failed to upgrade connection");
debug!(?err, ?peer, ?address, "failed to upgrade connection");
return;
}
};
debug!(?peer, ?address, "upgraded connection");
debug!(?peer, ?address, "completed handshake");

// Attempt to claim the connection
let Some(reservation) = tracker.listen(peer.clone()).await else {
debug!(?peer, ?address, "unable to reserve connection to peer");
return;
};
debug!(?peer, ?address, "reserved connection");

// Start peer to handle messages
supervisor.spawn(stream, reservation).await;
Expand Down
43 changes: 43 additions & 0 deletions p2p/src/authenticated/discovery/actors/tracker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: Signer> Actor<E, C>
} => {
let _ = reservation.send(self.directory.dial(&public_key));
}
Message::Listenable {
public_key,
responder,
} => {
let _ = responder.send(self.directory.listenable(&public_key));
}
Message::Listen {
public_key,
reservation,
Expand Down Expand Up @@ -854,6 +860,39 @@ mod tests {
});
}

#[test]
fn test_listenable() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // The signer for seed 0 is handed to the config, so its key is "self".
        let (own_signer, own_pk) = new_signer_and_pk(0);
        let (_registered_signer, registered_pk) = new_signer_and_pk(1);
        let (_unregistered_signer, unregistered_pk) = new_signer_and_pk(2);
        let cfg_initial = default_test_config(own_signer, Vec::new());
        let TestHarness {
            mut mailbox,
            mut oracle,
            ..
        } = setup_actor(context.clone(), cfg_initial);

        // Nothing has been registered yet, so no key may be listenable.
        for pk in [
            own_pk.clone(),
            registered_pk.clone(),
            unregistered_pk.clone(),
        ] {
            assert!(!mailbox.listenable(pk).await);
        }

        // Register a peer set holding ourselves and one remote peer, then
        // give the actor a moment to process the registration.
        let peer_set = vec![own_pk.clone(), registered_pk.clone()];
        oracle.register(0, peer_set).await;
        context.sleep(Duration::from_millis(10)).await;

        // Our own key is never listenable, even when registered.
        assert!(!mailbox.listenable(own_pk).await);
        // A registered remote peer is listenable.
        assert!(mailbox.listenable(registered_pk).await);
        // A key outside the registered set stays non-listenable.
        assert!(!mailbox.listenable(unregistered_pk).await);
    });
}

#[test]
fn test_listen() {
let executor = deterministic::Runner::default();
Expand All @@ -873,9 +912,13 @@ mod tests {
oracle.register(0, vec![peer_pk.clone()]).await;
context.sleep(Duration::from_millis(10)).await; // Allow register to process

assert!(mailbox.listenable(peer_pk.clone()).await);

let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());

assert!(!mailbox.listenable(peer_pk.clone()).await);

let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());

Expand Down
31 changes: 18 additions & 13 deletions p2p/src/authenticated/discovery/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,6 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: PublicKey> Directory
self.rate_limiter.shrink_to_fit();
}

/// Lists the public keys of every tracked peer whose record reports itself
/// as dialable, in ascending key order.
///
/// NOTE(review): per the original description, "dialable" means an
/// unconnected peer for which a socket is known; that rule lives in the
/// record type and is not visible here.
pub fn dialable(&self) -> Vec<C> {
    let mut candidates = Vec::new();
    for (key, record) in self.peers.iter() {
        if record.dialable() {
            candidates.push(key.clone());
        }
    }
    // Sort so callers get a deterministic ordering.
    candidates.sort();
    candidates
}

/// Attempt to reserve a peer for the dialer.
///
/// Returns `Some` on success, `None` otherwise.
Expand Down Expand Up @@ -310,6 +297,24 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: PublicKey> Directory
self.peers.get(peer).is_some_and(|r| r.allowed())
}

/// Lists the public keys of every tracked peer whose record reports itself
/// as dialable, in ascending key order.
///
/// NOTE(review): per the original description, "dialable" means an
/// unconnected peer for which a socket is known; that rule lives in the
/// record type and is not visible here.
pub fn dialable(&self) -> Vec<C> {
    let mut candidates = Vec::new();
    for (key, record) in self.peers.iter() {
        if record.dialable() {
            candidates.push(key.clone());
        }
    }
    // Sort so callers get a deterministic ordering.
    candidates.sort();
    candidates
}

/// Reports whether `peer` is currently listenable.
///
/// Returns `false` when the peer has no record in the directory; otherwise
/// defers to the record's own `listenable()` check.
pub fn listenable(&self, peer: &C) -> bool {
    match self.peers.get(peer) {
        Some(record) => record.listenable(),
        None => false,
    }
}

// --------- Helpers ----------

/// Attempt to reserve a peer.
Expand Down
21 changes: 21 additions & 0 deletions p2p/src/authenticated/discovery/actors/tracker/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ pub enum Message<E: Spawner + Metrics, C: PublicKey> {
},

// ---------- Used by listener ----------
/// Check if we should listen to a peer.
Listenable {
/// The public key of the peer to check.
public_key: C,

/// The sender to respond with the listenable status.
responder: oneshot::Sender<bool>,
},

/// Request a reservation for a particular peer.
///
/// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
Expand Down Expand Up @@ -165,6 +174,18 @@ impl<E: Spawner + Metrics, C: PublicKey> Mailbox<Message<E, C>> {
rx.await.unwrap()
}

/// Ask the tracker whether `public_key` is listenable and wait for its reply.
///
/// Sends a `Listenable` message carrying a oneshot responder and returns the
/// boolean the tracker answers with.
///
/// # Panics
///
/// Panics if the message cannot be sent or if the responder is dropped
/// before a reply arrives.
pub async fn listenable(&mut self, public_key: C) -> bool {
    let (responder, response) = oneshot::channel();
    let request = Message::Listenable {
        public_key,
        responder,
    };
    self.send(request).await.unwrap();
    response.await.unwrap()
}

/// Send a `Listen` message to the tracker.
pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
let (tx, rx) = oneshot::channel();
Expand Down
9 changes: 9 additions & 0 deletions p2p/src/authenticated/discovery/actors/tracker/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ impl<C: PublicKey> Record<C> {
)
}

/// Reports whether an inbound connection from this peer may be accepted.
///
/// Both conditions must hold:
/// - `allowed()` returns `true` for this record
/// - the record's status is `Status::Inert`
///
/// NOTE(review): `Inert` presumably means neither reserved nor connected;
/// confirm against the `Status` definition.
pub fn listenable(&self) -> bool {
    if !self.allowed() {
        return false;
    }
    self.status == Status::Inert
}

/// Return the socket of the peer, if known.
pub fn socket(&self) -> Option<SocketAddr> {
match &self.address {
Expand Down
26 changes: 16 additions & 10 deletions p2p/src/authenticated/lookup/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,35 @@ impl<E: Spawner + Clock + ReasonablyRealtime + Network + Rng + CryptoRng + Metri
let incoming = match IncomingConnection::verify(&context, stream_cfg, sink, stream).await {
Ok(partial) => partial,
Err(err) => {
debug!(?err, "failed to verify incoming handshake");
debug!(?err, ?address, "failed to verify incoming handshake");
return;
}
};

// Attempt to claim the connection
//
// Reserve also checks if the peer is authorized.
let peer = incoming.peer();
let Some(reservation) = tracker.listen(peer.clone()).await else {
debug!("unable to reserve connection to peer");
debug!(?peer, ?address, "verified handshake");

// Check if the peer is listenable
if !tracker.listenable(peer.clone()).await {
debug!(?peer, ?address, "peer not listenable");
return;
};
}

// Perform handshake
let stream = match Connection::upgrade_listener(context, incoming).await {
Ok(connection) => connection,
Err(err) => {
debug!(?err, "failed to upgrade connection");
debug!(?err, ?peer, ?address, "failed to upgrade connection");
return;
}
};
debug!(?peer, ?address, "upgraded connection");
debug!(?peer, ?address, "completed handshake");

// Attempt to claim the connection
let Some(reservation) = tracker.listen(peer.clone()).await else {
debug!(?peer, ?address, "unable to reserve connection to peer");
return;
};
debug!(?peer, ?address, "reserved connection");

// Start peer to handle messages
supervisor.spawn(stream, reservation).await;
Expand Down
48 changes: 48 additions & 0 deletions p2p/src/authenticated/lookup/actors/tracker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: Signer> Actor<E, C>
} => {
let _ = reservation.send(self.directory.dial(&public_key));
}
Message::Listenable {
public_key,
responder,
} => {
let _ = responder.send(self.directory.listenable(&public_key));
}
Message::Listen {
public_key,
reservation,
Expand Down Expand Up @@ -288,6 +294,44 @@ mod tests {
});
}

#[test]
fn test_listenable() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // The signer for seed 1 is handed to the config, so its key is "self".
        let (own_signer, own_pk) = new_signer_and_pk(1);
        let own_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001);
        let (_registered_signer, registered_pk) = new_signer_and_pk(2);
        let registered_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1002);
        let (_unregistered_signer, unregistered_pk) = new_signer_and_pk(3);
        let cfg_initial = default_test_config(own_signer);
        let TestHarness {
            mut mailbox,
            mut oracle,
            ..
        } = setup_actor(context.clone(), cfg_initial);

        // Nothing has been registered yet, so no key may be listenable.
        for pk in [
            own_pk.clone(),
            registered_pk.clone(),
            unregistered_pk.clone(),
        ] {
            assert!(!mailbox.listenable(pk).await);
        }

        // Register a peer set holding ourselves and one remote peer, then
        // give the actor a moment to process the registration.
        let peer_set = vec![
            (own_pk.clone(), own_addr),
            (registered_pk.clone(), registered_addr),
        ];
        oracle.register(0, peer_set).await;
        context.sleep(Duration::from_millis(10)).await;

        // Our own key is never listenable, even when registered.
        assert!(!mailbox.listenable(own_pk).await);
        // A registered remote peer is listenable.
        assert!(mailbox.listenable(registered_pk).await);
        // A key outside the registered set stays non-listenable.
        assert!(!mailbox.listenable(unregistered_pk).await);
    });
}

#[test]
fn test_listen() {
let executor = deterministic::Runner::default();
Expand All @@ -308,9 +352,13 @@ mod tests {
oracle.register(0, vec![(peer_pk.clone(), peer_addr)]).await;
context.sleep(Duration::from_millis(10)).await; // Allow register to process

assert!(mailbox.listenable(peer_pk.clone()).await);

let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());

assert!(!mailbox.listenable(peer_pk.clone()).await);

let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());

Expand Down
33 changes: 20 additions & 13 deletions p2p/src/authenticated/lookup/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,6 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: PublicKey> Directory
deleted_peers
}

/// Lists the public keys of every tracked peer whose record reports itself
/// as dialable under the directory's `allow_private_ips` setting, in
/// ascending key order.
///
/// NOTE(review): per the original description, "dialable" means an
/// unconnected peer for which a socket is known; that rule lives in the
/// record type and is not visible here.
pub fn dialable(&self) -> Vec<C> {
    let mut candidates = Vec::new();
    for (key, record) in self.peers.iter() {
        if record.dialable(self.allow_private_ips) {
            candidates.push(key.clone());
        }
    }
    // Sort so callers get a deterministic ordering.
    candidates.sort();
    candidates
}

/// Attempt to reserve a peer for the dialer.
///
/// Returns `Some` on success, `None` otherwise.
Expand Down Expand Up @@ -200,6 +187,26 @@ impl<E: Spawner + Rng + Clock + GClock + RuntimeMetrics, C: PublicKey> Directory
.is_some_and(|r| r.allowed(self.allow_private_ips))
}

/// Lists the public keys of every tracked peer whose record reports itself
/// as dialable under the directory's `allow_private_ips` setting, in
/// ascending key order.
///
/// NOTE(review): per the original description, "dialable" means an
/// unconnected peer for which a socket is known; that rule lives in the
/// record type and is not visible here.
pub fn dialable(&self) -> Vec<C> {
    let mut candidates = Vec::new();
    for (key, record) in self.peers.iter() {
        if record.dialable(self.allow_private_ips) {
            candidates.push(key.clone());
        }
    }
    // Sort so callers get a deterministic ordering.
    candidates.sort();
    candidates
}

/// Reports whether `peer` is currently listenable.
///
/// Returns `false` when the peer has no record in the directory; otherwise
/// defers to the record's `listenable` check, passing along the directory's
/// `allow_private_ips` setting.
pub fn listenable(&self, peer: &C) -> bool {
    match self.peers.get(peer) {
        Some(record) => record.listenable(self.allow_private_ips),
        None => false,
    }
}

// --------- Helpers ----------

/// Attempt to reserve a peer.
Expand Down
Loading
Loading