Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ message Dispatcher {
// A StreamActor is a running fragment of the overall stream graph,
message StreamActor {
uint32 actor_id = 1;
StreamNode nodes = 2;
Dispatcher dispatcher = 3;
uint32 fragment_id = 2;
StreamNode nodes = 3;
Dispatcher dispatcher = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 4;
repeated uint32 downstream_actor_id = 5;
}
1 change: 1 addition & 0 deletions rust/meta/src/stream/graph/stream_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl StreamActorBuilder {
pub fn build(&self) -> StreamActor {
StreamActor {
actor_id: self.actor_id,
fragment_id: self.fragment_id,
nodes: Some(self.nodes.deref().clone()),
dispatcher: self.dispatcher.clone(),
downstream_actor_id: self.downstream_actors.iter().copied().collect(),
Expand Down
6 changes: 3 additions & 3 deletions rust/storage/src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ impl<S: StateStore> Keyspace<S> {
///
/// Note: when using shared keyspace, be caution to scan the keyspace since states of other
/// executors might be scanned as well.
pub fn shared_executor_root(store: S, operator_id: u32) -> Self {
pub fn shared_executor_root(store: S, operator_id: u64) -> Self {
let mut root = Self {
store,
prefix: Vec::with_capacity(5),
prefix: Vec::with_capacity(9),
};
root.push(Segment::root(b's'));
root.push(Segment::u32(operator_id));
root.push(Segment::u64(operator_id));
root
}

Expand Down
20 changes: 15 additions & 5 deletions rust/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl StreamManagerCore {
/// Create a chain(tree) of nodes, with given `store`.
fn create_nodes_inner(
&mut self,
fragment_id: u32,
actor_id: u32,
node: &stream_plan::StreamNode,
input_pos: usize,
Expand All @@ -353,7 +354,14 @@ impl StreamManagerCore {
.iter()
.enumerate()
.map(|(input_pos, input)| {
self.create_nodes_inner(actor_id, input, input_pos, env.clone(), store.clone())
self.create_nodes_inner(
fragment_id,
actor_id,
input,
input_pos,
env.clone(),
store.clone(),
)
})
.try_collect()?;

Expand All @@ -369,7 +377,7 @@ impl StreamManagerCore {
// We assume that the operator_id of different instances from the same RelNode will be the
// same.
let executor_id = ((actor_id as u64) << 32) + node.get_operator_id();
let operator_id = node.get_operator_id().try_into().unwrap();
let operator_id = ((fragment_id as u64) << 32) + node.get_operator_id();

let executor: Result<Box<dyn Executor>> = match node.get_node()? {
SourceNode(node) => {
Expand Down Expand Up @@ -551,7 +559,7 @@ impl StreamManagerCore {
executor_id,
condition,
)) as Box<dyn Executor>, )*
_ => todo!("Join type {:?} not inplemented", typ),
_ => todo!("Join type {:?} not implemented", typ),
}
}
}
Expand Down Expand Up @@ -662,12 +670,13 @@ impl StreamManagerCore {
/// Create a chain(tree) of nodes and return the head executor.
fn create_nodes(
&mut self,
fragment_id: u32,
actor_id: u32,
node: &stream_plan::StreamNode,
env: StreamTaskEnv,
) -> Result<Box<dyn Executor>> {
dispatch_state_store!(self.state_store.clone(), store, {
self.create_nodes_inner(actor_id, node, 0, env, store)
self.create_nodes_inner(fragment_id, actor_id, node, 0, env, store)
})
}

Expand Down Expand Up @@ -807,7 +816,8 @@ impl StreamManagerCore {
for actor_id in actors {
let actor_id = *actor_id;
let actor = self.actors.remove(&actor_id).unwrap();
let executor = self.create_nodes(actor_id, actor.get_nodes()?, env.clone())?;
let executor =
self.create_nodes(actor.fragment_id, actor_id, actor.get_nodes()?, env.clone())?;
let dispatcher = self.create_dispatcher(
executor,
actor.get_dispatcher()?,
Expand Down
1 change: 1 addition & 0 deletions rust/stream/src/task/test_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async fn test_stream_mv_proto() {
};
let actor_proto = StreamActor {
actor_id: 1,
fragment_id: 1,
nodes: Some(mview_proto),
dispatcher: Some(Dispatcher {
r#type: DispatcherType::Simple as i32,
Expand Down
5 changes: 5 additions & 0 deletions rust/stream/src/task/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async fn test_stream_proto() {
// create 0 -> (1) -> 3
StreamActor {
actor_id: 1,
fragment_id: 1,
nodes: Some(StreamNode {
node: Some(Node::ProjectNode(ProjectNode::default())),
input: vec![StreamNode {
Expand Down Expand Up @@ -85,6 +86,7 @@ async fn test_stream_proto() {
// create 1 -> (3) -> 7, 11
StreamActor {
actor_id: 3,
fragment_id: 1,
nodes: Some(StreamNode {
node: Some(Node::ProjectNode(ProjectNode::default())),
input: vec![StreamNode {
Expand Down Expand Up @@ -114,6 +116,7 @@ async fn test_stream_proto() {
// create 3 -> (7) -> 13
StreamActor {
actor_id: 7,
fragment_id: 2,
nodes: Some(StreamNode {
node: Some(Node::ProjectNode(ProjectNode::default())),
input: vec![StreamNode {
Expand Down Expand Up @@ -143,6 +146,7 @@ async fn test_stream_proto() {
// create 3 -> (11) -> 13
StreamActor {
actor_id: 11,
fragment_id: 2,
nodes: Some(StreamNode {
node: Some(Node::ProjectNode(ProjectNode::default())),
input: vec![StreamNode {
Expand Down Expand Up @@ -172,6 +176,7 @@ async fn test_stream_proto() {
// create 7, 11 -> (13) -> 233
StreamActor {
actor_id: 13,
fragment_id: 3,
nodes: Some(StreamNode {
node: Some(Node::ProjectNode(ProjectNode::default())),
input: vec![StreamNode {
Expand Down