[consensus::marshal] Fix: Responses from broadcast::buffered are not immediately processed #1381
Makes sense to me (after doing a quick scan here)
f339dc1 to 6717430
Logic/architecture looks solid. Left some small questions/nits.
///
/// If the pool is empty, the future will never resolve.
/// Returns `Ok(T)` for successful completion or `Err(Aborted)` for aborted futures.
pub fn next_completed(
nit: next()
next_completed for consistency with futures::Pool above. I did originally consider next() for this, but decided not to for some reason (maybe too close to stream next?)
I think next() in Rust for this is pretty generic (at least I interpret it as the correct method name for something like this...we can change in the future but 🤷 ).
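For readers following the naming thread, here is a rough sketch of the semantics the doc comment describes: an empty pool stays pending, a finished future yields `Ok(T)`, and an aborted one yields `Err(Aborted)`. This is a toy stand-in and not the code in this PR. `AbortablePool`, `Aborter`, `poll_next_completed`, and `poll_once` are hypothetical names, the pool is polled by hand with a no-op waker instead of a real runtime, and only the standard library is used.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Error returned when a future was aborted before completing.
#[derive(Debug, PartialEq)]
pub struct Aborted;

/// Handle that marks its future as aborted (hypothetical name).
pub struct Aborter(Arc<AtomicBool>);

impl Aborter {
    pub fn abort(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

type Entry<T> = (Arc<AtomicBool>, Pin<Box<dyn Future<Output = T>>>);

/// Toy pool of abortable futures (an assumption, not the crate's type).
pub struct AbortablePool<T> {
    entries: Vec<Entry<T>>,
}

impl<T> AbortablePool<T> {
    pub fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// Adds a future and returns a handle that can abort it.
    pub fn push(&mut self, fut: impl Future<Output = T> + 'static) -> Aborter {
        let flag = Arc::new(AtomicBool::new(false));
        self.entries.push((flag.clone(), Box::pin(fut)));
        Aborter(flag)
    }

    /// Checks each entry once. An empty pool always reports `Pending`,
    /// which mirrors "the future will never resolve".
    pub fn poll_next_completed(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, Aborted>> {
        for i in 0..self.entries.len() {
            let (flag, fut) = &mut self.entries[i];
            let outcome = if flag.load(Ordering::SeqCst) {
                Some(Err(Aborted))
            } else {
                match fut.as_mut().poll(cx) {
                    Poll::Ready(value) => Some(Ok(value)),
                    Poll::Pending => None,
                }
            };
            if let Some(result) = outcome {
                // Drop the finished or aborted entry before returning.
                self.entries.swap_remove(i);
                return Poll::Ready(result);
            }
        }
        Poll::Pending
    }
}

/// Waker that does nothing; enough for manual polling in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the pool a single time with the no-op waker.
pub fn poll_once<T>(pool: &mut AbortablePool<T>) -> Poll<Result<T, Aborted>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    pool.poll_next_completed(&mut cx)
}
```

Whichever name wins (next() or next_completed), the behavior being named is the same: callers get one completed or aborted result at a time and must treat an empty pool as "nothing to wait on yet".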
consensus/src/marshal/actor.rs
Outdated
let (tx, rx) = oneshot::channel();
buffer.subscribe_prepared(None, commitment, None, tx).await;
entry.insert(Waiter::new(rx, response));
let hook = waiters.push(async move {
I wonder if it makes sense to make a "wrapper" struct that handles the waiters and listeners?
The logic you introduced here looks solid, but it seems it could be simplified if we introduced one additional util (unless the generics get too bad)?
I agree there could be (?) room for a util here but not yet confident that I have an idea for one that's general enough yet to be useful in multiple places. In the future I think
    (schemes, peers, identity, shares)
}

async fn setup_network_links(oracle: &mut Oracle<P>, peers: &[P], link: Link) {
JFYI I asked Denis to make some of these helpers exported functions in p2p::simulated to avoid further duplication of these utils: #1344 (comment)
Codecov Report❌ Patch coverage is
@@            Coverage Diff             @@
##             main    #1381      +/-   ##
==========================================
+ Coverage   91.34%   91.50%   +0.16%
==========================================
  Files         265      265
  Lines       66706    67023     +317
==========================================
+ Hits        60930    61330     +400
+ Misses       5776     5693      -83
... and 27 files with indirect coverage changes
Fixes #1380