Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option java_multiple_files = true;
option java_package = "com.risingwave.proto.metanode";
option optimize_for = SPEED;

import "catalog.proto";
import "common.proto";
import "plan.proto";
import "stream_plan.proto";
Expand Down Expand Up @@ -245,9 +246,14 @@ message SubscribeResponse {
Operation operation = 2;
oneof info {
common.WorkerNode node = 3;
// TODO: remove below fields, using new message defined in catalog.
Database database = 4;
Schema schema = 5;
Table table = 6;
catalog.Database database_v2 = 7;
catalog.Schema schema_v2 = 8;
catalog.Table table_v2 = 9;
catalog.Source source = 10;
}
}

Expand Down
1 change: 1 addition & 0 deletions rust/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl ObserverManager {
self.update_table(operation, table)
.unwrap_or_else(|e| error!("{}", e.to_string()));
}
Some(_) => todo!(),
None => (),
}
}
Expand Down
6 changes: 3 additions & 3 deletions rust/meta/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub type ParallelUnitId = u32;
pub type NodeLocations = HashMap<NodeId, WorkerNode>;
pub type StoredClusterManagerRef<S> = Arc<StoredClusterManager<S>>;

const DEFAULT_WORKNODE_PARALLEL_DEGREE: usize = 8;
const DEFAULT_WORK_NODE_PARALLEL_DEGREE: usize = 8;

/// [`StoredClusterManager`] manager cluster/worker meta data in [`MetaStore`].
pub struct StoredClusterManager<S> {
Expand Down Expand Up @@ -107,10 +107,10 @@ where
let start_id = self
.id_gen_manager_ref
.generate_interval::<{ IdCategory::ParallelUnit }>(
DEFAULT_WORKNODE_PARALLEL_DEGREE as i32,
DEFAULT_WORK_NODE_PARALLEL_DEGREE as i32,
)
.await? as usize;
let parallel_units = (start_id..start_id + DEFAULT_WORKNODE_PARALLEL_DEGREE)
let parallel_units = (start_id..start_id + DEFAULT_WORK_NODE_PARALLEL_DEGREE)
.map(|id| ParallelUnit { id: id as u32 })
.collect_vec();

Expand Down
67 changes: 58 additions & 9 deletions rust/meta/src/manager/catalog_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use risingwave_common::catalog::CatalogVersion;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::catalog::{Database, Schema, Source, Table};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::Mutex;

use crate::manager::NotificationManagerRef;
use crate::manager::{NotificationManagerRef, NotificationTarget};
use crate::model::{CatalogVersionGenerator, MetadataModel};
use crate::storage::MetaStore;

Expand Down Expand Up @@ -53,7 +54,13 @@ where
database.insert(&*self.meta_store_ref).await?;
core.add_database(database);

// TODO: Notify frontends to create database.
self.nm
.notify(
Operation::Add,
&Info::DatabaseV2(database.to_owned()),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -70,7 +77,13 @@ where
Database::delete(&*self.meta_store_ref, &database_id).await?;
core.drop_database(&database);

// TODO: Notify frontends to drop database.
self.nm
.notify(
Operation::Delete,
&Info::DatabaseV2(database),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -86,7 +99,13 @@ where
schema.insert(&*self.meta_store_ref).await?;
core.add_schema(schema);

// TODO: Notify frontends to create schema.
self.nm
.notify(
Operation::Add,
&Info::SchemaV2(schema.to_owned()),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -103,7 +122,13 @@ where
Schema::delete(&*self.meta_store_ref, &schema_id).await?;
core.drop_schema(&schema);

// TODO: Notify frontends to drop schema.
self.nm
.notify(
Operation::Delete,
&Info::SchemaV2(schema),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -119,7 +144,13 @@ where
table.insert(&*self.meta_store_ref).await?;
core.add_table(table);

// TODO: Notify frontends to create schema.
self.nm
.notify(
Operation::Add,
&Info::TableV2(table.to_owned()),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -136,7 +167,13 @@ where
Table::delete(&*self.meta_store_ref, &table_id).await?;
core.drop_table(&table);

// TODO: Notify frontends to drop table.
self.nm
.notify(
Operation::Delete,
&Info::TableV2(table),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -152,7 +189,13 @@ where
source.insert(&*self.meta_store_ref).await?;
core.add_source(source);

// TODO: Notify frontends to create source.
self.nm
.notify(
Operation::Add,
&Info::Source(source.to_owned()),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand All @@ -169,7 +212,13 @@ where
Source::delete(&*self.meta_store_ref, &source_id).await?;
core.drop_source(&source);

// TODO: Notify frontends to drop source.
self.nm
.notify(
Operation::Delete,
&Info::Source(source),
NotificationTarget::Frontend,
)
.await?;
Ok(version)
} else {
Err(RwError::from(InternalError(
Expand Down
3 changes: 3 additions & 0 deletions rust/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub mod stream_service;
#[rustfmt::skip]
pub mod hummock;

#[rustfmt::skip]
#[path = "catalog.serde.rs"]
pub mod catalog_serde;
#[rustfmt::skip]
#[path = "common.serde.rs"]
pub mod common_serde;
Expand Down