
Conversation

Contributor

@HuaHuaY HuaHuaY commented Feb 25, 2022

What's changed and what's your intention?

Support catalog update by NotificationService.

Note

There is a lock contention problem here. When we create a database and access it immediately, the ObserverManager's update code and the immediate-access code compete for the lock on CatalogCache at the same time.

In the existing code, this may cause the frontend to fail to start, reporting an error that a database named "dev" does not exist. This is because the frontend creates a database named "dev" and then immediately creates a schema under it (although we will move this creation to meta later, it illustrates the contention for the lock).

I moved the lock acquisition in the ObserverManager earlier, but that does not always work.
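
For illustration, a minimal self-contained sketch of the contention, using a stand-in CatalogCache backed by a plain HashSet (the real types live in the frontend crate):

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct CatalogCache {
    databases: HashSet<String>, // stand-in for the real catalog structures
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Mutex::new(CatalogCache::default()));

    // ObserverManager-like task: applies the Create notification from meta.
    let observer_cache = cache.clone();
    let observer = tokio::spawn(async move {
        observer_cache.lock().unwrap().databases.insert("dev".into());
    });

    // Frontend startup path: accesses the database right after creating it.
    // Whether it sees "dev" depends on which side grabs the lock first.
    let found = cache.lock().unwrap().databases.contains("dev");
    println!("dev visible immediately: {found}");

    observer.await.unwrap();
}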

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

related #361

@github-actions github-actions bot added the type/feature Type: New feature. label Feb 25, 2022

codecov bot commented Feb 25, 2022

Codecov Report

Merging #567 (bc5962a) into main (9ddfe63) will increase coverage by 0.27%.
The diff coverage is 82.94%.


@@             Coverage Diff              @@
##               main     #567      +/-   ##
============================================
+ Coverage     71.28%   71.56%   +0.27%     
  Complexity     2706     2706              
============================================
  Files           895      895              
  Lines         51238    51538     +300     
  Branches       1730     1730              
============================================
+ Hits          36526    36881     +355     
+ Misses        13897    13842      -55     
  Partials        815      815              
Flag Coverage Δ
java 59.86% <ø> (ø)
rust 76.56% <82.94%> (+0.35%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
rust/meta/src/cluster/mod.rs 77.88% <ø> (+18.26%) ⬆️
rust/rpc_client/src/meta_client.rs 57.38% <70.83%> (+16.86%) ⬆️
rust/frontend/src/catalog/catalog_service.rs 86.82% <71.69%> (-11.01%) ⬇️
rust/frontend/src/observer/observer_manager.rs 85.18% <82.60%> (+85.18%) ⬆️
rust/frontend/src/test_utils.rs 85.18% <84.21%> (+5.18%) ⬆️
rust/meta/src/manager/catalog.rs 75.62% <88.46%> (+3.97%) ⬆️
rust/frontend/src/catalog/database_catalog.rs 92.30% <90.90%> (-1.81%) ⬇️
rust/frontend/src/scheduler/schedule.rs 89.84% <100.00%> (+7.92%) ⬆️
rust/frontend/src/session.rs 98.43% <100.00%> (+4.68%) ⬆️
rust/meta/src/rpc/server.rs 91.17% <100.00%> (ø)
... and 11 more


Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 9ddfe63...bc5962a.

Collaborator

@fuyufjh fuyufjh left a comment

Generally LGTM

/// `update_database` is called in `start` method.
/// It calls `create_database` and `drop_database` of `CatalogCache`.
fn update_database(&self, operation: Operation, database: Database) {
debug!(
Collaborator

Recommend using info!

Comment on lines 202 to 204
Operation::Delete => catalog_cache_guard
.drop_table(&db_name, &schema_name, &table.table_name)
.unwrap(),
Collaborator

So even for the Delete operation, the meta server needs to send the full description of the object instead of the name only?

Contributor Author

@HuaHuaY HuaHuaY Feb 28, 2022

So even for the Delete operation, the meta server needs to send the full description of the object instead of the name only?

Actually, sending an object is also not easy for create/drop, because we should use ***_ref_id to find the name of the database or schema. It just simplifies the proto file.


let mut observer_manager = ObserverManager::new(meta_client.clone(), host).await;

let catalog_cache = Arc::new(Mutex::new(CatalogCache::new()));
Member

We should fetch the catalog from meta first to initialize CatalogCache, and if the dev database/schema already exists, skip creating them.

Contributor Author

We should fetch the catalog from meta first to initialize CatalogCache, and if the dev database/schema already exists, skip creating them.

I am considering initializing the dev database/schema when the meta server starts. I plan to move the init code to meta later.

Member

Yes, but for dev and other catalogs that already exist in the cluster, there is no create operation when meta starts or the frontend reboots.

Contributor Author

@HuaHuaY HuaHuaY Feb 28, 2022

Yes. We should call the GetCatalog RPC when the frontend starts.


fn create_schema(
pub fn get_database_name(&self, db_id: DatabaseId) -> Option<String> {
Some(self.db_name_by_id.get(&db_id)?.clone())
Member

Suggested change
Some(self.db_name_by_id.get(&db_id)?.clone())
self.db_name_by_id.get(&db_id).cloned()

Contributor Author

Suggested change
Some(self.db_name_by_id.get(&db_id)?.clone())
self.db_name_by_id.get(&db_id).cloned()

self.db_name_by_id.get(&db_id).cloned() will create an Option<&String>, and the &String requires the lifetime of &self to be longer than its own. Then, while the &String exists, we can't get a &mut self to update the schema.

Member

It's cloned instead of clone.

Contributor Author

It's cloned instead of clone.

Sorry, it's my fault. I will change it, thanks.
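
For reference, a minimal self-contained illustration with a stand-in Catalog type, showing that the two forms return the same owned Option<String>:

use std::collections::HashMap;

struct Catalog {
    db_name_by_id: HashMap<u32, String>,
}

impl Catalog {
    // Original form.
    fn get_database_name_v1(&self, db_id: u32) -> Option<String> {
        Some(self.db_name_by_id.get(&db_id)?.clone())
    }

    // Suggested form: `cloned()` turns Option<&String> into Option<String>,
    // so the returned value does not borrow `self`.
    fn get_database_name_v2(&self, db_id: u32) -> Option<String> {
        self.db_name_by_id.get(&db_id).cloned()
    }
}

fn main() {
    let mut db_name_by_id = HashMap::new();
    db_name_by_id.insert(1u32, "dev".to_string());
    let catalog = Catalog { db_name_by_id };
    assert_eq!(catalog.get_database_name_v1(1), catalog.get_database_name_v2(1));
}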

let catalog_cache = Arc::new(Mutex::new(CatalogCache::new()));
let catalog_manager = CatalogConnector::new(meta_client.clone(), catalog_cache.clone());

observer_manager.set_catalog_cache(catalog_cache);
Member

Can we directly pass this catalog_cache to ObserverManager::new? Ditto for WorkerNodeManager.

Member

+1

Contributor

+1

Contributor Author

Can we directly pass this catalog_cache to ObserverManager::new? Ditto for WorkerNodeManager.

I think it is not easy to collect all the members of ObserverManager at construction time. Maybe we will pass a reference to ObserverManager around to set some members in the future. We can change this code later.

Member

@BugenZhao BugenZhao Feb 28, 2022

Why? There must be some way to initialize these managers one by one.

Contributor Author

Ok, I will update the code in the next commit.

.worker_node_manager
.as_ref()
.expect("forget to call set_worker_node_manager before call start");
.expect("forget to call `set_worker_node_manager` before call start");
Member

Seems set_worker_node_manager is unused now?

Contributor Author

Seems set_worker_node_manager is unused now?

Yes. It seems that BatchScheduler, which contains WorkerNodeManager, is not used.

Contributor

@BowenXiao1999 BowenXiao1999 left a comment

Generally LGTM. So one frontend node will have exactly one ObserverManager?

BTW, I suggest adding a tracking issue for this project to make the roadmap clearer.

Contributor Author

HuaHuaY commented Feb 28, 2022

Generally LGTM. So one frontend node will have exactly one ObserverManager?

BTW, I suggest adding a tracking issue for this project to make the roadmap clearer.

Yes. Now there is only one ObserverManager in one frontend node.

Contributor Author

HuaHuaY commented Feb 28, 2022

I added a member state: CatalogState to Database/Schema/Table, which represents the state of the current catalog item: whether it has just been created and is not yet synced, or has already been synced.

For example, when the frontend creates a database, it calls the Create RPC first. Then it adds a database with state CatalogState::Standalone to CatalogCache. At the same time, ObserverManager tries to add a database with state CatalogState::Synchronous to CatalogCache. Both of them call CatalogCache::create_database.
If create_database is called with Standalone first and Synchronous second, it just changes the state of the database from Standalone to Synchronous. If Synchronous comes first and Standalone second, it does nothing.

And there is a problem that I will solve in the future. This state is only designed for the race described above, not for duplicate data. If we create a database in the frontend and the sync finishes, the state of the database is Synchronous. If we then create the same database in the frontend again, create_database is called with Standalone first, does nothing, and does not report an error. I will add a duplicate check in meta in the future.
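
A minimal sketch of the state handling described here, with stand-in types and field names (the real CatalogCache keeps more metadata per database):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum CatalogState {
    Standalone,  // created locally, not yet confirmed by a notification
    Synchronous, // confirmed by an ObserverManager notification
}

#[derive(Default)]
struct CatalogCache {
    databases: HashMap<String, CatalogState>,
}

impl CatalogCache {
    fn create_database(&mut self, name: &str, state: CatalogState) {
        match self.databases.get_mut(name) {
            Some(existing) => {
                // A Standalone entry is upgraded once the Synchronous notification
                // arrives; any other combination is ignored (this also swallows
                // duplicates, the follow-up problem mentioned above).
                if *existing == CatalogState::Standalone && state == CatalogState::Synchronous {
                    *existing = CatalogState::Synchronous;
                }
            }
            None => {
                self.databases.insert(name.to_string(), state);
            }
        }
    }
}

fn main() {
    let mut cache = CatalogCache::default();
    cache.create_database("dev", CatalogState::Standalone);  // local create
    cache.create_database("dev", CatalogState::Synchronous); // notification
    assert_eq!(cache.databases["dev"], CatalogState::Synchronous);
}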

Contributor

I'm wondering whether the state is necessary. I remember that in the previous code, create table would just call the RPC and do nothing in the cache (?), waiting for ObserverManager to update the local database (?). Any reason to introduce this?

Contributor Author

HuaHuaY commented Mar 1, 2022

I'm wondering whether the state is necessary. I remember that in the previous code, create table would just call the RPC and do nothing in the cache (?), waiting for ObserverManager to update the local database (?). Any reason to introduce this?

In the first commit, I deleted the operations that modify CatalogCache. There is a problem: if I create a table and then read it, the created database may not have been updated by ObserverManager in time, because the Create RPC returns when meta has sent the notification successfully, not when the frontend has read the notification. I would get a "database not found" error. So in the second commit, I added back the code that modifies the CatalogCache. Then there were two pieces of code creating the same database, and I had to solve the duplicate-database problem.

Contributor

Or maybe meta should wait until all observer managers have successfully updated? Not sure.

Contributor Author

HuaHuaY commented Mar 1, 2022

Or maybe meta should wait until all observer managers have successfully updated? Not sure.

It's a good idea and it could work. It just requires the frontend to design another RPC message to send to meta.

I discussed this with @yezizp2012. We decided to use the node_id of CreateRequest to filter out the node that sent the message when meta sends notifications. Maybe I should change the member of NotificationManager from DashMap<WorkerKey, Sender<Notification>> to DashMap<WorkerId, Sender<Notification>>, change the design of SubscribeRequest, save the return value of the AddWorkerNode RPC, and so on.
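
A minimal sketch of this filtering idea, with stand-in types and a hypothetical notify_except helper (a plain HashMap stands in for the DashMap):

use std::collections::HashMap;
use tokio::sync::mpsc::Sender;

type WorkerId = u32;
type Notification = String; // stand-in for the real notification payload

struct NotificationManager {
    senders: HashMap<WorkerId, Sender<Notification>>,
}

impl NotificationManager {
    async fn notify_except(&self, origin: WorkerId, notification: Notification) {
        for (worker_id, tx) in &self.senders {
            if *worker_id == origin {
                continue; // the originating frontend updates its cache locally
            }
            // Send errors are ignored in this sketch; see the error-handling
            // discussion further down.
            let _ = tx.send(notification.clone()).await;
        }
    }
}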

@HuaHuaY HuaHuaY force-pushed the zehua/support_catalog_update branch from 6cc324e to 3b780c7 Compare March 2, 2022 02:48
Contributor Author

HuaHuaY commented Mar 2, 2022

After discussions at yesterday's meeting, we decided to go with this option:
When CatalogConnector calls create_database, it first sends a Create RPC to meta. Then it calls catalog_cache.get_database in a loop until the return value is not None, i.e., it waits for ObserverManager to create the database.
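
A minimal sketch of this approach, with stand-in types and the Create RPC elided (the real code lives in CatalogConnector and uses the project's error types):

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Default)]
struct CatalogCache {
    databases: HashSet<String>,
}

impl CatalogCache {
    fn get_database(&self, name: &str) -> Option<String> {
        self.databases.get(name).cloned()
    }
}

async fn create_database(cache: Arc<Mutex<CatalogCache>>, db_name: &str) {
    // 1. Send the Create RPC to meta (elided in this sketch).

    // 2. Wait until ObserverManager has applied the notification and the
    //    database shows up in the local cache.
    while cache.lock().unwrap().get_database(db_name).is_none() {
        tokio::time::sleep(Duration::from_micros(10)).await;
    }
}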

pub async fn new(
client: MetaClient,
addr: SocketAddr,
catalog_cache: Arc<Mutex<CatalogCache>>,
Member

Missing worker node manager parameter.

while self
.catalog_cache
.lock()
.get_table(db_name, schema_name, &table.table_name)
Member

It still looks strange to me to identify a table by its name :(, and it seems there could be race cases.
Considering that the create_table RPC returns a TableId, how about waiting for the occurrence of this id? Note that the id is always guaranteed to be unique.

Contributor Author

@HuaHuaY HuaHuaY Mar 2, 2022

It still looks strange to me to identify a table by its name :(, and it seems there could be race cases.
Considering that the create_table RPC returns a TableId, how about waiting for the occurrence of this id? Note that the id is always guaranteed to be unique.

When would there be race cases? If two frontends create the same table in the same schema and database, I think we should reject it in meta, although this part has not been implemented yet.

Member

I'm not sure about this 🤣. However, I believe looking the table up by its id will be more robust.
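
A minimal sketch of this suggestion, with stand-in types and a hypothetical get_table_by_id lookup; waiting on the unique TableId returned by the create_table RPC avoids relying on names:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

type TableId = u64;

#[derive(Default)]
struct CatalogCache {
    tables_by_id: HashMap<TableId, String>, // id -> table name, as a stand-in
}

impl CatalogCache {
    fn get_table_by_id(&self, id: TableId) -> Option<String> {
        self.tables_by_id.get(&id).cloned()
    }
}

async fn wait_for_table(cache: Arc<Mutex<CatalogCache>>, table_id: TableId) {
    // `table_id` would come from the create_table RPC response.
    while cache.lock().unwrap().get_table_by_id(table_id).is_none() {
        tokio::time::sleep(Duration::from_micros(10)).await;
    }
}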

rx: Streaming<SubscribeResponse>,
worker_node_manager: Option<WorkerNodeManagerRef>,
worker_node_manager: Option<WorkerNodeManagerRef>, /* Option<> will be removed when `WorkerNodeManager` is used in code. */
catalog_cache: Arc<Mutex<CatalogCache>>,
Member

Suggested change
catalog_cache: Arc<Mutex<CatalogCache>>,
catalog_cache: Arc<RwLock<CatalogCache>>,

Suggest using RwLock here.

Contributor Author

Will reading the catalog be more frequent than writing the catalog?

Member

For example, when we're waiting for the table to be created, we'll frequently read the catalog.
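
A minimal sketch with stand-in types, using tokio's RwLock (a sync RwLock would look similar): the busy wait only needs shared read access, while ObserverManager takes the write lock to apply a notification:

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

#[derive(Default)]
struct CatalogCache {
    databases: HashSet<String>,
}

async fn wait_for_database(cache: Arc<RwLock<CatalogCache>>, db_name: &str) {
    // Many readers can hold the lock at once while polling.
    while !cache.read().await.databases.contains(db_name) {
        tokio::time::sleep(Duration::from_micros(10)).await;
    }
}

async fn apply_notification(cache: Arc<RwLock<CatalogCache>>, db_name: String) {
    // Only the writer needs exclusive access.
    cache.write().await.databases.insert(db_name);
}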

&Info::Database(database),
NotificationTarget::Frontend,
)
.await?;
Member

What's the plan for handling a notification error? It seems notify is a try_join_all over notifications to all nodes, and an error on one of them will abort the whole process, which will lead to an inconsistent situation.
Should there be a way to ignore notification errors on some frontends, or a way to record the notification progress for each node with a corresponding retry strategy?

Member

Or just leave a TODO here.

Contributor Author

Ok, I will add a TODO here.

Member

It should be handled when an error occurs, as I listed in the error-handling part of #361. For this PR, you can leave a TODO here.
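
A minimal sketch of the per-node bookkeeping idea, with a hypothetical notify_one helper standing in for the real per-node notification RPC: join_all collects every node's result instead of aborting on the first error, so failures can be logged or retried per node.

use futures::future::join_all;

type WorkerId = u32;

async fn notify_one(worker: WorkerId, info: &str) -> Result<(), String> {
    // Stand-in for the real per-node notification RPC.
    let _ = (worker, info);
    Ok(())
}

async fn notify_all(workers: &[WorkerId], info: &str) -> Vec<(WorkerId, Result<(), String>)> {
    let results = join_all(workers.iter().map(|w| notify_one(*w, info))).await;
    // Callers can retry or ignore the workers whose result is Err.
    workers.iter().copied().zip(results).collect()
}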

.is_none()
{
info!("Wait to create table: {}", table.table_name);
time::sleep(Duration::from_micros(10)).await;
Member

Contributor Author

Suppose there are two frontends, A and B. If A creates a database, not only A's but also B's CatalogCache will be updated by its ObserverManager. Currently, ObserverManager doesn't know which node the notification from meta originated from. I think it would be wrong for B's ObserverManager to also send a message on B's channel.

Member

@BugenZhao BugenZhao Mar 2, 2022

Just wrap changed in the loop as well. :)

@HuaHuaY HuaHuaY force-pushed the zehua/support_catalog_update branch from c3f0c8d to 16798f5 Compare March 3, 2022 02:38
Comment on lines 411 to 412
self.catalog_updated_rx
.to_owned()
Member

We should clone this rx before the RPC call on meta_client, since data received before the clone is always considered seen. We may lose a notification this way.

Contributor Author

Is there documentation about this? Or have I misunderstood the code? I browsed the code of its Clone implementation; it just copies the version value and the pointer to the shared state.

Contributor Author

@HuaHuaY HuaHuaY Mar 3, 2022

Is there documentation about this? Or have I misunderstood the code? I browsed the code of its Clone implementation; it just copies the version value and the pointer to the shared state.

We can't clone it inside the while loop. If there is a message not yet marked as seen before the clone, the program will get stuck here.

.to_owned()
.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Collaborator

The while loop may still miss some updates in some concurrent scenarios, for example

  1. Frontend A creates a database D
  2. Frontend A got updates from catalog_updated_rx
  3. Someone else drops database D immediately
  4. Frontend A continues from step 2 and checks catalog_cache but doesn't find D
  5. Frontend A hangs forever

Since almost all DDL statements need to wait for catalog updates, I think we need some mechanism to let the frontend wait for a specific request_id or catalog_version or catalog_epoch, which would be piggy-backed in the Notification RPC to allow the frontend DDL executor to wait for it.

This is not very easy. We may leave a TODO here and fix it later.
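
A minimal sketch of the catalog-version idea, with hypothetical names (wait_for_version, a u64 version published through a tokio watch channel): the DDL path waits until the locally observed version reaches the one returned by its RPC, so a subsequent drop cannot make it hang.

use tokio::sync::watch;

async fn wait_for_version(
    mut version_rx: watch::Receiver<u64>,
    target_version: u64, // piggy-backed in the Create/Drop RPC response
) -> Result<(), watch::error::RecvError> {
    while *version_rx.borrow() < target_version {
        // Resolves once ObserverManager publishes a newer catalog version;
        // even if the object is dropped right away, the version still advances.
        version_rx.changed().await?;
    }
    Ok(())
}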

Contributor Author

HuaHuaY commented Mar 3, 2022

Now, in unit tests, we do not use RPC. However, the return type of the subscribe RPC in the meta client is tonic::Streaming<>, which tonic converts from a ReceiverStream.

async fn subscribe(&self, request: SubscribeRequest) -> Result<Streaming<SubscribeResponse>> { ... }

It seems that we have no way to manually convert a ReceiverStream into tonic::Streaming<>. So I created a trait NotificationStream, which has only one method, next, and is used as the return type of subscribe. I implemented this trait for tonic::Streaming<> and ReceiverStream respectively.

async fn subscribe(&self, request: SubscribeRequest) -> Result<Box<dyn NotificationStream>> { ... }

#[async_trait::async_trait]
pub trait NotificationStream: Send {
    /// Ok(Some) => receive a `SubscribeResponse`.
    /// Ok(None) => stream terminates.
    /// Err => error happens.
    async fn next(&mut self) -> Result<Option<SubscribeResponse>>;
}

#[async_trait::async_trait]
impl NotificationStream for Streaming<SubscribeResponse> { ... }

#[async_trait::async_trait]
impl NotificationStream for Receiver<std::result::Result<SubscribeResponse, Status>> { ... }
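
For reference, one plausible shape of the two impls, with the response and result types abbreviated to stand-ins (the real code uses the generated SubscribeResponse and the project's Result type):

use tokio::sync::mpsc::Receiver;
use tonic::{Status, Streaming};

type SubscribeResponse = (); // stand-in for the generated proto type
type Result<T> = std::result::Result<T, Status>; // stand-in error type

#[async_trait::async_trait]
pub trait NotificationStream: Send {
    async fn next(&mut self) -> Result<Option<SubscribeResponse>>;
}

#[async_trait::async_trait]
impl NotificationStream for Streaming<SubscribeResponse> {
    async fn next(&mut self) -> Result<Option<SubscribeResponse>> {
        // `Streaming::message` yields the next item from the gRPC stream.
        self.message().await
    }
}

#[async_trait::async_trait]
impl NotificationStream for Receiver<std::result::Result<SubscribeResponse, Status>> {
    async fn next(&mut self) -> Result<Option<SubscribeResponse>> {
        // `Receiver::recv` returns None when the test-side sender is dropped.
        self.recv().await.transpose()
    }
}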

@HuaHuaY HuaHuaY merged commit 01615a2 into main Mar 3, 2022
@HuaHuaY HuaHuaY deleted the zehua/support_catalog_update branch March 3, 2022 09:36
