feat: support catalog update #567
Conversation
Codecov Report
```diff
@@             Coverage Diff              @@
##               main     #567      +/-   ##
============================================
+ Coverage     71.28%   71.56%    +0.27%
  Complexity     2706     2706
============================================
  Files           895      895
  Lines         51238    51538      +300
  Branches       1730     1730
============================================
+ Hits          36526    36881      +355
+ Misses        13897    13842       -55
  Partials        815      815
```
fuyufjh
left a comment
Generally LGTM
```rust
/// `update_database` is called in `start` method.
/// It calls `create_database` and `drop_database` of `CatalogCache`.
fn update_database(&self, operation: Operation, database: Database) {
    debug!(
```
Recommend using `info!`.
```rust
Operation::Delete => catalog_cache_guard
    .drop_table(&db_name, &schema_name, &table.table_name)
    .unwrap(),
```
So even for the `Delete` operation, the meta server needs to send the full description of the object instead of the name only?
> So even for the `Delete` operation, the meta server needs to send the full description of the object instead of the name only?
Actually, sending only the name is also not easy for create/drop, because we would have to use `***_ref_id` to find the name of the database or schema. Sending the full object just keeps the proto file simple.
rust/frontend/src/session.rs (outdated)
```rust
let mut observer_manager = ObserverManager::new(meta_client.clone(), host).await;

let catalog_cache = Arc::new(Mutex::new(CatalogCache::new()));
```
We should fetch the catalog from meta first to init `CatalogCache`, and if the dev database/schema already exist, skip creating them.
> We should fetch the catalog from meta first to init `CatalogCache`, and if the dev database/schema already exist, skip creating them.

I'm considering initializing the dev database/schema when the meta server starts. I plan to move the init code to meta later.
Yes, but for dev and other catalog entries that already exist in our cluster, there is no create operation when meta starts or a frontend reboots.
Yes. We should call the GetCatalog RPC when the frontend starts.
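As a rough illustration of that startup order, here is a minimal sketch (the `CatalogSnapshot` type and `needs_dev_database` helper are hypothetical, not the PR's actual API): seed the cache from a full snapshot fetched via the GetCatalog RPC, then create "dev" only if it is missing.

```rust
use std::collections::HashSet;

// Hypothetical stand-ins; the real MetaClient / CatalogCache live in this PR.
struct CatalogSnapshot {
    databases: Vec<String>,
}

struct CatalogCache {
    databases: HashSet<String>,
}

impl CatalogCache {
    // Seed the local cache from a full snapshot fetched at frontend startup.
    fn from_snapshot(snapshot: CatalogSnapshot) -> Self {
        Self {
            databases: snapshot.databases.into_iter().collect(),
        }
    }

    // Only create "dev" afterwards if the cluster does not already have it.
    fn needs_dev_database(&self) -> bool {
        !self.databases.contains("dev")
    }
}
```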
```rust
fn create_schema(
pub fn get_database_name(&self, db_id: DatabaseId) -> Option<String> {
    Some(self.db_name_by_id.get(&db_id)?.clone())
```
Suggested change:
```diff
- Some(self.db_name_by_id.get(&db_id)?.clone())
+ self.db_name_by_id.get(&db_id).cloned()
```
> Suggested change:
> - Some(self.db_name_by_id.get(&db_id)?.clone())
> + self.db_name_by_id.get(&db_id).cloned()
`self.db_name_by_id.get(&db_id).cloned()` will create an `Option<&String>`, and a `&String` requires the lifetime of `&self` to be longer than itself. Then, while the `&String` exists, we can't get a `&mut self` to update the schema.
It's `cloned` instead of `clone`.
> It's `cloned` instead of `clone`.

Sorry, it's my fault. I will change it, thanks.
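For reference, a standalone sketch (not the PR's actual `CatalogCache`; the `Cache` type here is made up) showing that `get(..).cloned()` yields an owned `Option<String>`, so no borrow of `self` escapes the method and a later `&mut self` call still compiles:

```rust
use std::collections::HashMap;

struct Cache {
    db_name_by_id: HashMap<u32, String>,
}

impl Cache {
    // `cloned()` maps Option<&String> -> Option<String>; the returned value
    // owns its data, so no borrow of `self` outlives this call.
    fn get_database_name(&self, db_id: u32) -> Option<String> {
        self.db_name_by_id.get(&db_id).cloned()
    }

    fn insert(&mut self, db_id: u32, name: String) {
        self.db_name_by_id.insert(db_id, name);
    }
}

fn main() {
    let mut cache = Cache { db_name_by_id: HashMap::new() };
    cache.insert(1, "dev".to_string());
    let name = cache.get_database_name(1); // owned Option<String>
    cache.insert(2, "test".to_string());   // taking &mut self afterwards is fine
    println!("{:?}", name);
}
```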
rust/frontend/src/session.rs (outdated)
```rust
let catalog_cache = Arc::new(Mutex::new(CatalogCache::new()));
let catalog_manager = CatalogConnector::new(meta_client.clone(), catalog_cache.clone());

observer_manager.set_catalog_cache(catalog_cache);
```
Can we directly pass this `catalog_cache` to `ObserverManager::new`? Ditto for `WorkerNodeManager`.
+1
+1
> Can we directly pass this `catalog_cache` to `ObserverManager::new`? Ditto for `WorkerNodeManager`.

I think it is not easy to collect all members of `ObserverManager`. Maybe we will pass a reference to `ObserverManager` around to set some members in the future. We can change this code later.
Why? There must be some way to initialize these managers one by one.
Ok, I will update the code in the next commit.
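A hedged sketch of what that constructor injection might look like (placeholder types; the real `ObserverManager` has more fields and an async `new` that also takes the meta client and address):

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins for the real types in this PR.
struct CatalogCache;
struct WorkerNodeManager;

struct ObserverManager {
    catalog_cache: Arc<Mutex<CatalogCache>>,
    worker_node_manager: Arc<WorkerNodeManager>,
}

impl ObserverManager {
    // Every dependency is passed at construction time, so `start` never has
    // to `expect()` that some setter was called beforehand.
    fn new(
        catalog_cache: Arc<Mutex<CatalogCache>>,
        worker_node_manager: Arc<WorkerNodeManager>,
    ) -> Self {
        Self {
            catalog_cache,
            worker_node_manager,
        }
    }
}
```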
```diff
 .worker_node_manager
     .as_ref()
-    .expect("forget to call set_worker_node_manager before call start");
+    .expect("forget to call `set_worker_node_manager` before call start");
```
Seems `set_worker_node_manager` is unused now?
> Seems `set_worker_node_manager` is unused now?

Yes. It seems that `BatchScheduler`, which contains `WorkerNodeManager`, is not used.
BowenXiao1999
left a comment
Generally LGTM. So one frontend node will have exactly one observe mgr?
BTW, I suggest adding a tracking issue for this project, to make the roadmap clearer.
Yes. Now there is only one `ObserverManager` per frontend node.

I add a member … For example, when the frontend creates a database, it calls … And there is a problem that I will solve in the future. This state is only designed for the contention case, not to duplicate data. If we create a database in the frontend and finish the sync, the state of the database is …
I'm thinking about whether the state is necessary. I remember that in the previous code, create table would just call the RPC and do nothing in the cache (?), waiting for the ObserverManager to update the local database (?). Any reason to introduce this?
In the first commit, I deleted the operation which changes …

Or maybe meta should wait until all observer managers successfully update? Not sure.

It's a good idea and it can work. It just requires the frontend to design another RPC message to send to meta. I discussed this with @yezizp2012. We decided to use …
After discussions at yesterday's meeting, we decided to go with this option: …
```rust
pub async fn new(
    client: MetaClient,
    addr: SocketAddr,
    catalog_cache: Arc<Mutex<CatalogCache>>,
```
Missing worker node manager parameter.
```rust
while self
    .catalog_cache
    .lock()
    .get_table(db_name, schema_name, &table.table_name)
```
It still looks strange to me to identify a table by its name :( and it seems there could be race cases.
Considering that the `create_table` RPC returns a `TableId`, how about waiting for the occurrence of this id? Note that the id is always guaranteed to be unique.
> It still looks strange to me to identify a table by its name :( and it seems there could be race cases. Considering that the `create_table` RPC returns a `TableId`, how about waiting for the occurrence of this id? Note that the id is always guaranteed to be unique.

When will there be race cases? If two frontends create the same table in the same schema and database, I think we should reject it in meta, although this part has not been implemented.
I'm not sure about this 🤣. However, I believe looking up the table id will be more robust.
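For illustration, a minimal sketch of waiting on the id returned by the `create_table` RPC instead of the name (the `TableId` alias and `table_ids` field are simplifications, not the PR's real layout):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::Duration;

type TableId = u32; // stand-in for the real TableId type

// Simplified view of the cache: just the set of table ids it knows about.
struct CatalogCache {
    table_ids: HashSet<TableId>,
}

// Wait until the id returned by the create_table RPC shows up in the cache.
// Ids are unique, so there is no ambiguity even if two frontends race on the
// same table name.
async fn wait_for_table(cache: Arc<Mutex<CatalogCache>>, table_id: TableId) {
    while !cache.lock().unwrap().table_ids.contains(&table_id) {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
```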
```diff
 rx: Streaming<SubscribeResponse>,
-worker_node_manager: Option<WorkerNodeManagerRef>,
+worker_node_manager: Option<WorkerNodeManagerRef>, /* Option<> will be removed when `WorkerNodeManager` is used in code. */
 catalog_cache: Arc<Mutex<CatalogCache>>,
```
Suggested change:
```diff
- catalog_cache: Arc<Mutex<CatalogCache>>,
+ catalog_cache: Arc<RwLock<CatalogCache>>,
```
Suggest using `RwLock` here.
Will reading the catalog be more frequent than writing the catalog?
For example, when we're waiting for the table to be created, we'll frequently read the catalog.
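A small sketch of the trade-off, assuming `tokio::sync::RwLock` (the PR may end up with a different lock type): many concurrent readers can poll the cache while a single writer applies a notification exclusively.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct CatalogCache {
    version: u64, // simplified stand-in for the real cache contents
}

async fn reader(cache: Arc<RwLock<CatalogCache>>) -> u64 {
    // Multiple readers can hold the lock at the same time, which matters when
    // DDL statements poll the cache while waiting for an update.
    cache.read().await.version
}

async fn writer(cache: Arc<RwLock<CatalogCache>>) {
    // Writers (e.g. the ObserverManager applying a notification) take
    // exclusive access only for the duration of the update.
    cache.write().await.version += 1;
}
```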
```rust
        &Info::Database(database),
        NotificationTarget::Frontend,
    )
    .await?;
```
What's the plan for handling the notification error? It seems `notify` is a `try_join_all` over notifications to all nodes, and an error on one of them will abort the whole process, which will lead to an inconsistent situation.
Should there be a way to ignore notification errors on some frontends, or a way to record the notification progress for each node with a corresponding retry strategy?
Or just leave a TODO here.
Ok, I will add a TODO here.
It should be handled when the error occurs, as I listed in the error handling part of #361. For this PR, you can leave a TODO here.
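One possible shape for that TODO, sketched with `futures::future::join_all` so a failure on one frontend does not cancel the others; `notify_node` is a hypothetical stand-in for the real per-node RPC, and the actual retry/bookkeeping strategy is still open:

```rust
use futures::future::join_all;

// Hypothetical stand-in for the real per-frontend notification RPC.
async fn notify_node(addr: &str) -> Result<(), String> {
    println!("notifying {addr}");
    Ok(())
}

// Collect every node's result instead of aborting on the first error
// (try_join_all would cancel the remaining notifications), so failures can
// be logged and retried per node without leaving the others inconsistent.
async fn notify_all(addrs: &[String]) -> Vec<(String, Result<(), String>)> {
    let results = join_all(addrs.iter().map(|a| notify_node(a))).await;
    addrs.iter().cloned().zip(results).collect()
}
```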
```rust
    .is_none()
{
    info!("Wait to create table: {}", table.table_name);
    time::sleep(Duration::from_micros(10)).await;
```
How about using a watch channel for this case? A reference might be:
https://github.com/singularity-data/risingwave-dev/blob/3725111b990b99e5dbc64b43a75e1cc779b33849/rust/storage/src/hummock/local_version_manager.rs#L151-L172
Suppose that there are two frontends A and B. If A creates a database, not only A's but also B's `CatalogCache` will be updated by the `ObserverManager`. Currently the `ObserverManager` doesn't know which frontend initiated the notification from meta. I think if B's `ObserverManager` also sends a message on B's channel, it's wrong.
Just wrap `changed` in the loop as well. :)
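A sketch of the suggested pattern with `tokio::sync::watch`, with `changed()` wrapped in the loop so wake-ups caused by other frontends' updates are harmless (the cache is simplified to a `HashSet` of table names here):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

// Wait until `table_name` appears in the cache. Every catalog update bumps
// the watch channel, no matter which frontend triggered it; a waiter whose
// table is not there yet simply re-checks and goes back to sleep.
async fn wait_for_table(
    cache: Arc<Mutex<HashSet<String>>>,
    mut rx: watch::Receiver<()>,
    table_name: &str,
) -> Result<(), watch::error::RecvError> {
    while !cache.lock().unwrap().contains(table_name) {
        rx.changed().await?;
    }
    Ok(())
}
```

The sender side would presumably be the `ObserverManager` calling `send(())` on the corresponding `watch::Sender` after each cache update.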
```rust
self.catalog_updated_rx
    .to_owned()
```
We should clone this rx before the RPC from `meta_client`, since the data before cloning is always considered to be seen. We may lose notifications this way.
Is there documentation about this? Or do I misunderstand the code? I browsed the code of its `Clone` implementation; it just copies the version value and the pointer to the shared state.
> Is there documentation about this? Or do I misunderstand the code? I browsed the code of its `Clone` implementation; it just copies the version value and the pointer to the shared state.

We can't clone inside the while loop. If there is an unseen message before the clone, the program will be stuck here.
```rust
    .to_owned()
    .changed()
    .await
    .map_err(|e| RwError::from(InternalError(e.to_string())))?;
```
The while loop may still miss some updates in some concurrent scenarios, for example:
1. Frontend A creates a database `D`.
2. Frontend A gets an update from `catalog_updated_rx`.
3. Someone else drops database `D` immediately.
4. Frontend A continues from step 2 and checks `catalog_cache` but doesn't find `D`.
5. Frontend A hangs forever.
Since almost all DDL statements need to wait for catalog updates, I think we need some mechanism to let the frontend wait for a specific request_id or catalog_version or catalog_epoch, which would be piggy-backed in the Notification RPC to allow frontend DDL executor to wait for it.
This is not very easy. We may leave a TODO here and fix it later.
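A hedged sketch of the catalog-version idea (names such as `wait_for_version` are hypothetical): the DDL RPC would return the version at which its change becomes visible, and the frontend waits until its local version catches up rather than checking for the object's existence.

```rust
use tokio::sync::watch;

// The meta server would piggy-back a monotonically increasing catalog version
// on every notification; the create/drop RPC returns the version at which its
// own change becomes visible.
async fn wait_for_version(
    mut version_rx: watch::Receiver<u64>,
    target: u64,
) -> Result<(), watch::error::RecvError> {
    // Waiting for "my change was applied" instead of "the object currently
    // exists" sidesteps the create-then-immediately-dropped race above.
    while *version_rx.borrow() < target {
        version_rx.changed().await?;
    }
    Ok(())
}
```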
Now in unit tests, we will not use RPC. However, the return type of `subscribe` is:

```rust
async fn subscribe(&self, request: SubscribeRequest) -> Result<Streaming<SubscribeResponse>> { ... }
```

It seems that we have no way to manually construct a `Streaming<SubscribeResponse>` from a local channel in unit tests, so the signature is changed to return a trait object:

```rust
async fn subscribe(&self, request: SubscribeRequest) -> Result<Box<dyn NotificationStream>> { ... }

#[async_trait::async_trait]
pub trait NotificationStream: Send {
    /// Ok(Some) => receive a `SubscribeResponse`.
    /// Ok(None) => stream terminates.
    /// Err => error happens.
    async fn next(&mut self) -> Result<Option<SubscribeResponse>>;
}

#[async_trait::async_trait]
impl NotificationStream for Streaming<SubscribeResponse> { ... }

#[async_trait::async_trait]
impl NotificationStream for Receiver<std::result::Result<SubscribeResponse, Status>> { ... }
```
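For illustration, a sketch of what the `Receiver` impl might look like, assuming a `tokio::sync::mpsc::Receiver` and a simplified error type (the real code uses the project's `Result`/`RwError`):

```rust
use tokio::sync::mpsc::Receiver;
use tonic::Status;

// Simplified stand-ins for the PR's real types and error handling.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub struct SubscribeResponse;

#[async_trait::async_trait]
pub trait NotificationStream: Send {
    async fn next(&mut self) -> Result<Option<SubscribeResponse>>;
}

#[async_trait::async_trait]
impl NotificationStream for Receiver<std::result::Result<SubscribeResponse, Status>> {
    async fn next(&mut self) -> Result<Option<SubscribeResponse>> {
        match self.recv().await {
            Some(Ok(resp)) => Ok(Some(resp)),        // got a notification
            Some(Err(status)) => Err(status.into()), // propagate the RPC error
            None => Ok(None),                        // channel closed => stream ends
        }
    }
}
```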
What's changed and what's your intention?

Support catalog update by `NotificationService`.

Note

There is a lock contention problem here. When we create a database and access it immediately, the `ObserverManager` update code and the code doing the immediate access compete for the lock on `CatalogCache` at the same time. In the existing code, this may cause the frontend to fail to start, reporting an error that a database named "dev" does not exist. This is because the frontend creates a database named "dev" and a schema under this database right after startup (although we will move these creations to meta later, it reflects this contention for the lock).

I moved the code for acquiring the lock in the `ObserverManager` earlier, but it doesn't always work.

Checklist
Refer to a related PR or issue link (optional)
related #361