1 change: 1 addition & 0 deletions proto/stream_service.proto
@@ -63,6 +63,7 @@ message InjectBarrierRequest {
message InjectBarrierResponse {
string request_id = 1;
common.Status status = 2;
repeated uint32 failed_actor_ids = 3;
}

// Before starting streaming, the leader node broadcasts the actor-host table to the workers that need it.
4 changes: 3 additions & 1 deletion rust/compute/src/rpc/service/stream_service.rs
@@ -110,14 +110,16 @@ impl StreamService for StreamServiceImpl {
let barrier =
Barrier::from_protobuf(req.get_barrier().map_err(tonic_err)?).map_err(tonic_err)?;

self.mgr
let failed_actor_ids = self
.mgr
.send_and_collect_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await
.map_err(|e| e.to_grpc_status())?;

Ok(Response::new(InjectBarrierResponse {
request_id: req.request_id,
status: None,
failed_actor_ids: failed_actor_ids.into_iter().collect(),
}))
}
}
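For orientation, here is a minimal sketch (not part of this diff) of how a caller could fold the new `failed_actor_ids` field of `InjectBarrierResponse` into a blacklist. The struct and the helper `record_failed_actors` below are hand-written stand-ins for illustration, not the prost-generated type or any real API.

```rust
use std::collections::HashSet;

// Hand-written stand-in for the generated `InjectBarrierResponse`;
// field layout follows proto/stream_service.proto above.
struct InjectBarrierResponse {
    request_id: String,
    failed_actor_ids: Vec<u32>,
}

/// Merge the failed actors reported by one worker node into a global blacklist.
fn record_failed_actors(resp: &InjectBarrierResponse, blacklist: &mut HashSet<u32>) {
    blacklist.extend(resp.failed_actor_ids.iter().copied());
}

fn main() {
    let resp = InjectBarrierResponse {
        request_id: "r-1".to_string(),
        failed_actor_ids: vec![3, 7],
    };
    let mut blacklist = HashSet::new();
    record_failed_actors(&resp, &mut blacklist);
    println!("request {}: blacklisted actors {:?}", resp.request_id, blacklist);
    assert!(blacklist.contains(&3) && blacklist.contains(&7));
}
```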
49 changes: 33 additions & 16 deletions rust/meta/src/barrier/mod.rs
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::iter::once;
use std::sync::Arc;
use std::time::Duration;
@@ -21,6 +21,7 @@ use self::info::BarrierActorInfo;
use crate::cluster::{StoredClusterManager, StoredClusterManagerRef};
use crate::hummock::HummockManager;
use crate::manager::{EpochGeneratorRef, MetaSrvEnv, StreamClientsRef, INVALID_EPOCH};
use crate::model::ActorId;
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;
@@ -181,7 +182,10 @@ where
let mut min_interval = tokio::time::interval(Self::INTERVAL);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

// TODO: these states should be persisted!
let mut ignored_actors = HashSet::<ActorId>::new();
let mut prev_epoch = INVALID_EPOCH;

loop {
tokio::select! {
// Wait for the minimal interval,
@@ -214,8 +218,14 @@
let new_epoch = self.epoch_generator.generate()?.into_inner();

let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| {
let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec();
let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec();
let actor_ids_to_send = info
.actor_ids_to_send(node_id)
.filter(|id| !ignored_actors.contains(id))
.collect_vec();
let actor_ids_to_collect = info
.actor_ids_to_collect(node_id)
.filter(|id| !ignored_actors.contains(id))
.collect_vec();

if actor_ids_to_send.is_empty() || actor_ids_to_collect.is_empty() {
// No need to send barrier for this node.
@@ -254,33 +264,40 @@ where
// worker node has not collected this barrier, it might need to retry.
// 3. BarrierManager should distinguish failures and let `StreamManager`
// cancel related jobs.
client.inject_barrier(request).await.to_rw_result()?;
let response = client.inject_barrier(request).await.to_rw_result()?;
let failed_actors = response.into_inner().failed_actor_ids;

Ok::<_, RwError>(())
Ok::<_, RwError>(failed_actors)
}
.into()
}
});

let mut notifiers = notifiers;
notifiers.iter_mut().for_each(Notifier::notify_to_send);

// Wait all barriers collected.
let timer = self.metrics.barrier_latency.start_timer();
// Wait for all barriers to be collected.
let collect_result = try_join_all(collect_futures).await;
let collect_result: Result<Vec<Vec<ActorId>>> = try_join_all(collect_futures).await;
timer.observe_duration();

// Commit Hummock epoch.
if prev_epoch != INVALID_EPOCH {
match collect_result {
Ok(_) => {
self.hummock_manager.commit_epoch(prev_epoch).await?;
}
Err(err) => {
self.hummock_manager.abort_epoch(prev_epoch).await?;
return Err(err);
}
match &collect_result {
Ok(_) => self.hummock_manager.commit_epoch(prev_epoch).await?,
Err(_) => self.hummock_manager.abort_epoch(prev_epoch).await?,
};
}
prev_epoch = new_epoch;
command_context.post_collect().await?; // do some post stuffs

// Record failed actors.
// TODO: these states should be persisted in actor's status.
let failed_actors = collect_result?.into_iter().flatten();
ignored_actors.extend(failed_actors);

// Do some post-collect work.
command_context.post_collect().await?;

notifiers.iter_mut().for_each(Notifier::notify_collected);
}
}
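The core bookkeeping added to the barrier loop above is: filter blacklisted actors out of every send/collect set, then extend the blacklist with whatever each worker reports back. Below is a condensed sketch of just that logic, assuming each response yields a `Vec<ActorId>` of failures; the helpers `filter_ignored` and `record_failures` are illustrative names, not functions in this codebase.

```rust
use std::collections::HashSet;

type ActorId = u32;

/// Drop actors already known to have failed, mirroring the `.filter(...)` calls
/// on `actor_ids_to_send` / `actor_ids_to_collect` in the diff above.
fn filter_ignored(actor_ids: Vec<ActorId>, ignored: &HashSet<ActorId>) -> Vec<ActorId> {
    actor_ids.into_iter().filter(|id| !ignored.contains(id)).collect()
}

/// Record the failures reported by all workers, mirroring
/// `ignored_actors.extend(collect_result?.into_iter().flatten())`.
fn record_failures(per_node: Vec<Vec<ActorId>>, ignored: &mut HashSet<ActorId>) {
    ignored.extend(per_node.into_iter().flatten());
}

fn main() {
    let mut ignored = HashSet::new();
    record_failures(vec![vec![1], vec![], vec![4, 5]], &mut ignored);
    let to_send = filter_ignored(vec![1, 2, 3, 4], &ignored);
    assert_eq!(to_send, vec![2, 3]);
}
```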
1 change: 1 addition & 0 deletions rust/meta/src/stream/stream_manager.rs
@@ -338,6 +338,7 @@ mod tests {
Ok(Response::new(InjectBarrierResponse {
request_id: "".to_string(),
status: None,
failed_actor_ids: Default::default(),
}))
}
}
107 changes: 45 additions & 62 deletions rust/stream/src/task/barrier_manager.rs
@@ -11,20 +11,25 @@ use crate::executor::*;
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

pub type FailedActors = HashSet<u32>;

struct ManagedBarrierState {
epoch: u64,

/// Notify that the collection is finished.
collect_notifier: oneshot::Sender<()>,
collect_notifier: oneshot::Sender<FailedActors>,

/// Actor ids remaining to be collected.
remaining_actors: HashSet<u32>,

/// Actor ids that are reported failed.
failed_actors: FailedActors,
}

impl ManagedBarrierState {
fn notify(self) {
// Notify about barrier finishing.
if self.collect_notifier.send(()).is_err() {
if self.collect_notifier.send(self.failed_actors).is_err() {
warn!(
"failed to notify barrier collection with epoch {}: rx is dropped",
self.epoch
@@ -60,9 +65,6 @@ pub struct LocalBarrierManager {

/// Last epoch of barriers.
last_epoch: Option<u64>,

/// Blacklist of failed senders.
failed_senders: HashSet<u32>,
}

impl Default for LocalBarrierManager {
@@ -78,7 +80,6 @@ impl LocalBarrierManager {
span: tracing::Span::none(),
state,
last_epoch: None,
failed_senders: HashSet::new(),
}
}

@@ -101,83 +102,55 @@ impl LocalBarrierManager {
/// When an actor is cancelled or aborted, it should call this method to withdraw itself.
/// For example, a source actor might be cancelled because of a Kafka stream timeout,
/// which would cause a chain of actors (itself and its downstream actors) to abort.
/// TODO(#742): Should let global stream manager know the cancelled actors and decide what
/// to do.
// TODO: this is a workaround. When an actor (especially the source) encounters a problem, it
// should first report the situation to the meta service but not fail itself, and then let the
// meta decide whether to send the stop barrier to gracefully exit the actor.
pub fn withdraw_actor(&mut self, actor_id: u32) {
debug!("withdraw actor: {}", actor_id);
// Add it to local black list.
self.failed_senders.insert(actor_id);

if let BarrierState::Managed(managed_state) = &mut self.state {
let mut need_to_notify: bool = false;

// If the barrier state has been initialized, which means `send_barrier` has been
// called.
if let Some(barrier_state) = managed_state.as_mut() {
if barrier_state.remaining_actors.remove(&actor_id) {
// If the actor is the last one to be withdrawn.
if barrier_state.remaining_actors.is_empty() {
need_to_notify = true;
}
}
}

if need_to_notify {
// Reset this round of barrier and notify local stream manager.
let state = managed_state.take().unwrap();
state.notify();
}
// If the barrier state has been initialized, which means `send_barrier` has been
// called.
if let BarrierState::Managed(Some(state)) = &mut self.state {
// Add it to the local blacklist.
state.failed_actors.insert(actor_id);
state.remaining_actors.remove(&actor_id);

self.may_notify_finishing();
}
}

/// Broadcast a barrier to all senders. Returns a receiver which will get notified when this
/// barrier is finished, in managed mode.
/// Returns `Ok(None)` if the receiver is not available.
/// TODO: async collect barrier flush state from hummock.
pub fn send_barrier(
&mut self,
barrier: &Barrier,
actor_ids_to_send: impl IntoIterator<Item = u32>,
actor_ids_to_collect: impl IntoIterator<Item = u32>,
) -> Result<Option<oneshot::Receiver<()>>> {
) -> Result<Option<oneshot::Receiver<FailedActors>>> {
let to_send = {
let mut to_send: HashSet<u32> = actor_ids_to_send.into_iter().collect();
// For failed actors that have been locally observed
// and not tracked by global stream manager, untrack them.
// TODO(#742): Global stream manager should know failed actors and untrack them.
to_send.retain(|actor_id| !self.failed_senders.contains(actor_id));

match &self.state {
BarrierState::Local => {
if to_send.is_empty() {
to_send = self.senders.keys().cloned().collect()
}
}
BarrierState::Managed(_) => {
// If there's no actor to send, return.
if to_send.is_empty() {
return Ok(None);
}
// There must be some actors to send to.
assert!(!to_send.is_empty());
}
}
to_send
};
let mut to_collect: HashSet<u32> = actor_ids_to_collect.into_iter().collect();

let to_collect: HashSet<u32> = actor_ids_to_collect.into_iter().collect();
trace!(
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
barrier,
to_send,
to_collect
);
// For failed actors that have been locally observed
// and not tracked by global stream manager, untrack them.
// TODO(#742): Global stream manager should know failed actors and untrack them.
to_collect.retain(|actor_id| !self.failed_senders.contains(actor_id));

info!(
"send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}",
barrier, to_send, to_collect
);

let rx = match &mut self.state {
BarrierState::Local => None,
@@ -193,6 +166,7 @@ impl LocalBarrierManager {
epoch: barrier.epoch.curr,
collect_notifier: tx,
remaining_actors: to_collect,
failed_actors: Default::default(),
});

Some(rx)
@@ -205,15 +179,15 @@ impl LocalBarrierManager {
.get(&actor_id)
.unwrap_or_else(|| panic!("sender for actor {} does not exist", actor_id));

match sender.send(Message::Barrier(barrier.clone())) {
Ok(it) => it,
Err(_) => {
// Fail to send barrier to this actor (e.g. Kafka source timeout).
error!("[LocalBarrierManager] Send barrier to actor {} failed. Make sure target actor is still alive.", actor_id);
// TODO: this is a workaround.
if let Err(err) = sender.send(Message::Barrier(barrier.clone())) {
// Failed to send the barrier to this actor (e.g. Kafka source timeout).
error!(
"Send barrier to actor {} failed: {}. Make sure target actor is still alive.",
actor_id, err
);

// // Remove this actor from the list of senders.
// self.senders.remove(&actor_id);
}
self.withdraw_actor(actor_id);
}
}

@@ -259,16 +233,25 @@ impl LocalBarrierManager {
.collect_vec()
);

if state.remaining_actors.is_empty() {
let state = managed_state.take().unwrap();
self.may_notify_finishing();
}
}

Ok(())
}

fn may_notify_finishing(&mut self) {
match &mut self.state {
BarrierState::Local => {}
BarrierState::Managed(state) => {
if state.as_mut().unwrap().remaining_actors.is_empty() {
let state = state.take().unwrap();
self.last_epoch = Some(state.epoch);
// Notify about barrier finishing.
state.notify();
}
}
}

Ok(())
}

/// Returns whether [`BarrierState`] is `Local`.
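The notification contract between `send_barrier` and its caller is a `oneshot` channel that now carries the set of failed actors instead of `()`. The snippet below is a minimal, self-contained illustration of that channel contract only, not the real manager; it assumes tokio with the `macros`, `rt`, and `sync` features enabled.

```rust
use std::collections::HashSet;
use tokio::sync::oneshot;

type FailedActors = HashSet<u32>;

#[tokio::main]
async fn main() {
    // Caller side: `send_barrier` hands back the receiver.
    let (tx, rx) = oneshot::channel::<FailedActors>();

    // Collector side: once `remaining_actors` is empty, the accumulated
    // `failed_actors` set is sent exactly once (see `ManagedBarrierState::notify`).
    let mut failed = FailedActors::new();
    failed.insert(3); // e.g. a source actor withdrawn after a Kafka timeout
    tx.send(failed).expect("receiver dropped");

    // Caller side: equivalent to the `rx.await` in `collect_barrier` below.
    let failed = rx.await.expect("sender dropped");
    assert!(failed.contains(&3));
}
```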
21 changes: 8 additions & 13 deletions rust/stream/src/task/stream_manager.rs
@@ -20,6 +20,7 @@ use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreI
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use super::FailedActors;
use crate::executor::*;
use crate::task::{
ConsumableChannelPair, SharedContext, StreamEnvironment, UpDownActorIds,
@@ -105,10 +106,12 @@ impl StreamManager {
barrier: &Barrier,
actor_ids_to_send: impl IntoIterator<Item = u32>,
actor_ids_to_collect: impl IntoIterator<Item = u32>,
) -> Result<Option<oneshot::Receiver<()>>> {
) -> Result<oneshot::Receiver<FailedActors>> {
let core = self.core.lock().unwrap();
let mut barrier_manager = core.context.lock_barrier_manager();
let rx = barrier_manager.send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect)?;
let rx = barrier_manager
.send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect)?
.expect("no rx for local mode");
Ok(rx)
}

@@ -118,19 +121,11 @@
barrier: &Barrier,
actor_ids_to_send: impl IntoIterator<Item = u32>,
actor_ids_to_collect: impl IntoIterator<Item = u32>,
) -> Result<()> {
) -> Result<FailedActors> {
let rx = self.send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect)?;

// Wait for all actors to finish this barrier.
match rx {
Some(rx) => {
info!("Awaiting rx.");
rx.await.unwrap();
}
None => {
error!("Failed to send barrier as rx is not valid.");
}
}
let failed_actors = rx.await.unwrap();

// Sync states from the shared buffer to S3 before telling the meta service we're done.
dispatch_state_store!(self.state_store(), store, {
Expand All @@ -145,7 +140,7 @@ impl StreamManager {
}
});

Ok(())
Ok(failed_actors)
}

/// Broadcast a barrier to all senders. Returns immediately, and caller won't be notified when
Expand Down