From 7913d8a3eb320de0e19a067fb46ff666792d5602 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 3 Mar 2022 16:00:04 +0800 Subject: [PATCH 1/5] feat(meta): use etcd as MetaStore backend Signed-off-by: TennyZhuang --- rust/Cargo.lock | 17 ++ rust/meta/Cargo.toml | 2 + rust/meta/src/lib.rs | 2 + rust/meta/src/storage/etcd_meta_store.rs | 212 +++++++++++++++++++++++ rust/meta/src/storage/meta_store.rs | 7 +- rust/meta/src/storage/mod.rs | 1 + rust/meta/src/storage/tests.rs | 7 +- 7 files changed, 245 insertions(+), 3 deletions(-) create mode 100644 rust/meta/src/storage/etcd_meta_store.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b406e1df211d8..7f2d12b7bb29c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1451,6 +1451,21 @@ dependencies = [ "version_check", ] +[[package]] +name = "etcd-client" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d267091320c099e5b53e81261e7fc01c5280d308da0444f7cd4ee94696569820" +dependencies = [ + "http", + "prost 0.9.0", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower-service", +] + [[package]] name = "event-listener" version = "2.5.2" @@ -3880,6 +3895,7 @@ name = "risingwave_meta" version = "0.1.3" dependencies = [ "anyhow", + "assert_matches", "async-recursion", "async-stream", "async-trait", @@ -3892,6 +3908,7 @@ dependencies = [ "crc32fast", "dashmap 5.1.0", "either", + "etcd-client", "futures 0.3.21", "hex", "hyper", diff --git a/rust/meta/Cargo.toml b/rust/meta/Cargo.toml index 472765d035bb0..a243d5fc38001 100644 --- a/rust/meta/Cargo.toml +++ b/rust/meta/Cargo.toml @@ -17,6 +17,7 @@ clap = { version = "3", features = ["derive"] } crc32fast = "1" dashmap = "5" either = "1" +etcd-client = "0.8" futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = "0.4" hyper = "0.14" @@ -58,6 +59,7 @@ tracing-subscriber = "0.3" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] +assert_matches = "1" rand = "0.8" [[bin]] diff --git a/rust/meta/src/lib.rs b/rust/meta/src/lib.rs index 5428ff37eaf10..f6bbea6b6d2c0 100644 --- a/rust/meta/src/lib.rs +++ b/rust/meta/src/lib.rs @@ -11,6 +11,8 @@ #![feature(binary_heap_drain_sorted)] #![feature(const_fn_trait_bound)] #![feature(option_result_contains)] +#![feature(let_chains)] +#![feature(type_alias_impl_trait)] mod barrier; pub mod cluster; diff --git a/rust/meta/src/storage/etcd_meta_store.rs b/rust/meta/src/storage/etcd_meta_store.rs new file mode 100644 index 0000000000000..45c0ea059db9b --- /dev/null +++ b/rust/meta/src/storage/etcd_meta_store.rs @@ -0,0 +1,212 @@ +use std::sync::atomic::{self, AtomicI64}; + +use anyhow; +use async_trait::async_trait; +use etcd_client::{Client, Error as EtcdError, GetOptions, KvClient, Txn, Compare, CompareOp, TxnOp}; +use futures::Future; +use tokio::sync::Mutex; + +use super::{Error, Key, MetaStore, Result, Snapshot, Transaction, Value}; + +impl From for Error { + fn from(err: EtcdError) -> Self { + Error::Internal(anyhow::Error::new(err)) + } +} + +const REVISION_UNINITIALIZED: i64 = -1; + +pub struct EtcdMetaStore { + client: Client, +} +pub struct EtcdSnapshot { + client: KvClient, + revision: AtomicI64, + init_lock: Mutex<()>, +} + +// TODO: we can refine the key encoding before release. +fn encode_etcd_key(cf: &str, key: &[u8]) -> Vec { + let mut encoded_key = Vec::with_capacity(key.len() + cf.len() + 1); + encoded_key.extend_from_slice(cf.as_bytes()); + encoded_key.push(b'/'); + encoded_key.extend_from_slice(key); + encoded_key +} + +impl EtcdSnapshot { + async fn view_inner( + &self, + view: V + ) -> Result { + loop { + let revision = self.revision.load(atomic::Ordering::Relaxed); + if revision != REVISION_UNINITIALIZED { + // Fast and likely path. + let (_, output) = view.view(self.client.clone(), revision).await?; + return Ok(output); + } else { + // Slow path + let _g = self.init_lock.lock().await; + let revision = self.revision.load(atomic::Ordering::Relaxed); + if revision != REVISION_UNINITIALIZED { + // Double check failed, release the lock. + continue; + } + let (new_revision, output) = view.view(self.client.clone(), revision).await?; + self.revision.store(new_revision, atomic::Ordering::Release); + return Ok(output); + } + } + } +} + +trait SnapshotViewer { + type Output; + type OutputFuture<'a>: Future> + 'a + where + Self: 'a; + + fn view(&self, client: KvClient, revision: i64) -> Self::OutputFuture<'_>; +} + +struct GetViewer { + key: Vec, +} + +impl SnapshotViewer for GetViewer { + type Output = Vec; + type OutputFuture<'a> = impl Future> + 'a; + + fn view(&self, mut client: KvClient, revision: i64) -> Self::OutputFuture<'_> { + async move { + let res = client + .get( + self.key.clone(), + Some(GetOptions::default().with_revision(revision)), + ) + .await?; + let new_revision = if let Some(header) = res.header() { + header.revision() + } else { + return Err(Error::Internal(anyhow::anyhow!( + "Etcd response missing header" + ))); + }; + let value = res + .kvs() + .first() + .map(|kv| kv.value().to_vec()) + .ok_or_else(|| Error::ItemNotFound(hex::encode(self.key.clone())))?; + Ok((new_revision, value)) + } + } +} + +struct ListViewer { + key: Vec, +} + +impl SnapshotViewer for ListViewer { + type Output = Vec>; + type OutputFuture<'a> = impl Future> + 'a; + + fn view(&self, mut client: KvClient, revision: i64) -> Self::OutputFuture<'_> { + async move { + let res = client + .get( + self.key.clone(), + Some(GetOptions::default().with_revision(revision).with_prefix()), + ) + .await?; + let new_revision = if let Some(header) = res.header() { + header.revision() + } else { + return Err(Error::Internal(anyhow::anyhow!( + "Etcd response missing header" + ))); + }; + let value = res + .kvs() + .iter() + .map(|kv| kv.value().to_vec()) + .collect(); + Ok((new_revision, value)) + } + } +} + +#[async_trait] +impl Snapshot for EtcdSnapshot { + async fn list_cf(&self, cf: &str) -> Result>> { + let view = ListViewer { key: encode_etcd_key(cf, &[]) }; + self.view_inner(view).await + } + + async fn get_cf(&self, cf: &str, key: &[u8]) -> Result> { + let view = GetViewer { key: encode_etcd_key(cf, key) }; + self.view_inner(view).await + } +} + +#[async_trait] +impl MetaStore for EtcdMetaStore { + type Snapshot = EtcdSnapshot; + + fn snapshot(&self) -> Self::Snapshot { + EtcdSnapshot { + client: self.client.kv_client(), + revision: AtomicI64::new(REVISION_UNINITIALIZED), + init_lock: Default::default(), + } + } + + async fn put_cf(&self, cf: &str, key: Key, value: Value) -> Result<()> { + self.client + .kv_client() + .put(encode_etcd_key(cf, &key), value, None) + .await?; + Ok(()) + } + + async fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { + self.client + .kv_client() + .delete(encode_etcd_key(cf, &key), None) + .await?; + Ok(()) + } + + async fn txn(&self, trx: Transaction) -> Result<()> { + let (preconditions, operations) = trx.into_parts(); + let when = preconditions.into_iter().map(|cond| { + match cond { + super::Precondition::KeyExists { cf, key } => { + Compare::value( + encode_etcd_key(&cf, &key), + CompareOp::Equal, + vec![], + ) + } + } + }).collect::>(); + + let then = operations.into_iter().map(|op| { + match op { + super::Operation::Put { cf, key, value } => { + let key = encode_etcd_key(&cf, &key); + let value = value.to_vec(); + TxnOp::put(key, value, None) + } + super::Operation::Delete { cf, key } => { + let key = encode_etcd_key(&cf, &key); + TxnOp::delete(key, None) + } + } + }).collect::>(); + + let etcd_txn = Txn::new().when(when).and_then(then); + self.client.kv_client().txn(etcd_txn).await?; + Ok(()) + } +} diff --git a/rust/meta/src/storage/meta_store.rs b/rust/meta/src/storage/meta_store.rs index 14c63e57899ab..57c98e303a353 100644 --- a/rust/meta/src/storage/meta_store.rs +++ b/rust/meta/src/storage/meta_store.rs @@ -40,10 +40,11 @@ pub trait MetaStore: Sync + Send + 'static { } // Error of metastore -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub enum Error { ItemNotFound(String), TransactionAbort(), + Internal(anyhow::Error), } pub type Result = std::result::Result; @@ -55,6 +56,10 @@ impl From for RwError { Error::TransactionAbort() => { RwError::from(ErrorCode::InternalError("transaction aborted".to_owned())) } + Error::Internal(e) => RwError::from(ErrorCode::InternalError(format!( + "meta internal error: {}", + e + ))), } } } diff --git a/rust/meta/src/storage/mod.rs b/rust/meta/src/storage/mod.rs index 3eb319be90493..97d0c1f79b786 100644 --- a/rust/meta/src/storage/mod.rs +++ b/rust/meta/src/storage/mod.rs @@ -1,3 +1,4 @@ +mod etcd_meta_store; mod mem_meta_store; mod meta_store; #[cfg(test)] diff --git a/rust/meta/src/storage/tests.rs b/rust/meta/src/storage/tests.rs index 08cefc9040a93..3689b19071470 100644 --- a/rust/meta/src/storage/tests.rs +++ b/rust/meta/src/storage/tests.rs @@ -1,3 +1,4 @@ +use assert_matches::assert_matches; use async_trait::async_trait; use itertools::Itertools; use risingwave_common::error::Result; @@ -155,8 +156,10 @@ async fn test_meta_store_transaction(meta_store: &S) -> Result<()> let mut trx = Transaction::default(); trx.check_exists(cf.to_owned(), kvs[4].0.to_owned()); trx.put(cf.to_owned(), kvs[3].0.to_owned(), kvs[3].1.to_owned()); - let expected = Error::TransactionAbort(); - assert_eq!(meta_store.txn(trx).await.unwrap_err(), expected); + assert_matches!( + meta_store.txn(trx).await.unwrap_err(), + Error::TransactionAbort() + ); assert_eq!(0, meta_store.list_cf(cf).await.unwrap().len()); Ok(()) From b9cdc1916726538f76630c7f46155082ba39f145 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 3 Mar 2022 17:13:21 +0800 Subject: [PATCH 2/5] add entry Signed-off-by: TennyZhuang --- rust/meta/src/bin/meta_node.rs | 4 +- rust/meta/src/rpc/server.rs | 39 +++++++++++++---- rust/meta/src/storage/etcd_meta_store.rs | 54 +++++++++++++----------- rust/meta/src/storage/meta_store.rs | 2 +- rust/meta/src/storage/mod.rs | 1 + rust/meta/src/test_utils.rs | 4 +- 6 files changed, 67 insertions(+), 37 deletions(-) diff --git a/rust/meta/src/bin/meta_node.rs b/rust/meta/src/bin/meta_node.rs index fe549f09699de..87910bba7e22f 100644 --- a/rust/meta/src/bin/meta_node.rs +++ b/rust/meta/src/bin/meta_node.rs @@ -64,6 +64,8 @@ async fn main() { info!("Starting meta server at {}", addr); let (join_handle, _shutdown_send) = - rpc_serve(addr, prometheus_addr, dashboard_addr, MetaStoreBackend::Mem).await; + rpc_serve(addr, prometheus_addr, dashboard_addr, MetaStoreBackend::Mem) + .await + .unwrap(); join_handle.await.unwrap(); } diff --git a/rust/meta/src/rpc/server.rs b/rust/meta/src/rpc/server.rs index 42731e3c2864d..96b1b782ced9f 100644 --- a/rust/meta/src/rpc/server.rs +++ b/rust/meta/src/rpc/server.rs @@ -1,6 +1,9 @@ use std::net::SocketAddr; use std::sync::Arc; +use etcd_client::Client as EtcdClient; +use risingwave_common::error::ErrorCode::InternalError; +use risingwave_common::error::{Result, RwError}; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; use risingwave_pb::meta::catalog_service_server::CatalogServiceServer; use risingwave_pb::meta::cluster_service_server::ClusterServiceServer; @@ -26,10 +29,11 @@ use crate::rpc::service::epoch_service::EpochServiceImpl; use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl; use crate::rpc::service::hummock_service::HummockServiceImpl; use crate::rpc::service::stream_service::StreamServiceImpl; -use crate::storage::MemStore; +use crate::storage::{EtcdMetaStore, MemStore, MetaStore}; use crate::stream::{FragmentManager, StreamManager}; pub enum MetaStoreBackend { + Etcd { endpoints: Vec }, Mem, } @@ -38,14 +42,31 @@ pub async fn rpc_serve( prometheus_addr: Option, dashboard_addr: Option, meta_store_backend: MetaStoreBackend, +) -> Result<(JoinHandle<()>, UnboundedSender<()>)> { + Ok(match meta_store_backend { + MetaStoreBackend::Etcd { endpoints } => { + let client = EtcdClient::connect(endpoints, None).await.map_err(|e| { + RwError::from(InternalError(format!("failed to connect etcd {}", e))) + })?; + let meta_store_ref = Arc::new(EtcdMetaStore::new(client)); + rpc_serve_with_store(addr, prometheus_addr, dashboard_addr, meta_store_ref).await + } + MetaStoreBackend::Mem => { + let meta_store_ref = Arc::new(MemStore::default()); + rpc_serve_with_store(addr, prometheus_addr, dashboard_addr, meta_store_ref).await + } + }) +} + +pub async fn rpc_serve_with_store( + addr: SocketAddr, + prometheus_addr: Option, + dashboard_addr: Option, + meta_store_ref: Arc, ) -> (JoinHandle<()>, UnboundedSender<()>) { let listener = TcpListener::bind(addr).await.unwrap(); - let meta_store_ref = match meta_store_backend { - MetaStoreBackend::Mem => Arc::new(MemStore::default()), - }; let epoch_generator_ref = Arc::new(MemEpochGenerator::new()); - let env = - MetaSrvEnv::::new(meta_store_ref.clone(), epoch_generator_ref.clone()).await; + let env = MetaSrvEnv::::new(meta_store_ref.clone(), epoch_generator_ref.clone()).await; let fragment_manager = Arc::new(FragmentManager::new(meta_store_ref.clone()).await.unwrap()); let hummock_manager = Arc::new(hummock::HummockManager::new(env.clone()).await.unwrap()); @@ -105,9 +126,9 @@ pub async fn rpc_serve( let epoch_srv = EpochServiceImpl::new(epoch_generator_ref.clone()); let heartbeat_srv = HeartbeatServiceImpl::new(); - let catalog_srv = CatalogServiceImpl::::new(env.clone(), catalog_manager_ref); - let cluster_srv = ClusterServiceImpl::::new(cluster_manager.clone()); - let stream_srv = StreamServiceImpl::::new( + let catalog_srv = CatalogServiceImpl::::new(env.clone(), catalog_manager_ref); + let cluster_srv = ClusterServiceImpl::::new(cluster_manager.clone()); + let stream_srv = StreamServiceImpl::::new( stream_manager_ref, fragment_manager.clone(), cluster_manager, diff --git a/rust/meta/src/storage/etcd_meta_store.rs b/rust/meta/src/storage/etcd_meta_store.rs index 45c0ea059db9b..e3c3edb449f3d 100644 --- a/rust/meta/src/storage/etcd_meta_store.rs +++ b/rust/meta/src/storage/etcd_meta_store.rs @@ -2,7 +2,9 @@ use std::sync::atomic::{self, AtomicI64}; use anyhow; use async_trait::async_trait; -use etcd_client::{Client, Error as EtcdError, GetOptions, KvClient, Txn, Compare, CompareOp, TxnOp}; +use etcd_client::{ + Client, Compare, CompareOp, Error as EtcdError, GetOptions, KvClient, Txn, TxnOp, +}; use futures::Future; use tokio::sync::Mutex; @@ -16,6 +18,7 @@ impl From for Error { const REVISION_UNINITIALIZED: i64 = -1; +#[derive(Clone)] pub struct EtcdMetaStore { client: Client, } @@ -35,10 +38,7 @@ fn encode_etcd_key(cf: &str, key: &[u8]) -> Vec { } impl EtcdSnapshot { - async fn view_inner( - &self, - view: V - ) -> Result { + async fn view_inner(&self, view: V) -> Result { loop { let revision = self.revision.load(atomic::Ordering::Relaxed); if revision != REVISION_UNINITIALIZED { @@ -126,11 +126,7 @@ impl SnapshotViewer for ListViewer { "Etcd response missing header" ))); }; - let value = res - .kvs() - .iter() - .map(|kv| kv.value().to_vec()) - .collect(); + let value = res.kvs().iter().map(|kv| kv.value().to_vec()).collect(); Ok((new_revision, value)) } } @@ -139,16 +135,26 @@ impl SnapshotViewer for ListViewer { #[async_trait] impl Snapshot for EtcdSnapshot { async fn list_cf(&self, cf: &str) -> Result>> { - let view = ListViewer { key: encode_etcd_key(cf, &[]) }; + let view = ListViewer { + key: encode_etcd_key(cf, &[]), + }; self.view_inner(view).await } async fn get_cf(&self, cf: &str, key: &[u8]) -> Result> { - let view = GetViewer { key: encode_etcd_key(cf, key) }; + let view = GetViewer { + key: encode_etcd_key(cf, key), + }; self.view_inner(view).await } } +impl EtcdMetaStore { + pub fn new(client: Client) -> Self { + Self { client } + } +} + #[async_trait] impl MetaStore for EtcdMetaStore { type Snapshot = EtcdSnapshot; @@ -179,20 +185,18 @@ impl MetaStore for EtcdMetaStore { async fn txn(&self, trx: Transaction) -> Result<()> { let (preconditions, operations) = trx.into_parts(); - let when = preconditions.into_iter().map(|cond| { - match cond { + let when = preconditions + .into_iter() + .map(|cond| match cond { super::Precondition::KeyExists { cf, key } => { - Compare::value( - encode_etcd_key(&cf, &key), - CompareOp::Equal, - vec![], - ) + Compare::value(encode_etcd_key(&cf, &key), CompareOp::Equal, vec![]) } - } - }).collect::>(); + }) + .collect::>(); - let then = operations.into_iter().map(|op| { - match op { + let then = operations + .into_iter() + .map(|op| match op { super::Operation::Put { cf, key, value } => { let key = encode_etcd_key(&cf, &key); let value = value.to_vec(); @@ -202,8 +206,8 @@ impl MetaStore for EtcdMetaStore { let key = encode_etcd_key(&cf, &key); TxnOp::delete(key, None) } - } - }).collect::>(); + }) + .collect::>(); let etcd_txn = Txn::new().when(when).and_then(then); self.client.kv_client().txn(etcd_txn).await?; diff --git a/rust/meta/src/storage/meta_store.rs b/rust/meta/src/storage/meta_store.rs index 57c98e303a353..9b1cefd518dbe 100644 --- a/rust/meta/src/storage/meta_store.rs +++ b/rust/meta/src/storage/meta_store.rs @@ -17,7 +17,7 @@ pub trait Snapshot: Sync + Send + 'static { /// `MetaStore` defines the functions used to operate metadata. #[async_trait] -pub trait MetaStore: Sync + Send + 'static { +pub trait MetaStore: Clone + Sync + Send + 'static { type Snapshot: Snapshot; fn snapshot(&self) -> Self::Snapshot; diff --git a/rust/meta/src/storage/mod.rs b/rust/meta/src/storage/mod.rs index 97d0c1f79b786..5f3070c22f7be 100644 --- a/rust/meta/src/storage/mod.rs +++ b/rust/meta/src/storage/mod.rs @@ -9,6 +9,7 @@ pub type ColumnFamily = String; pub type Key = Vec; pub type Value = Vec; +pub use etcd_meta_store::*; pub use mem_meta_store::*; pub use meta_store::*; pub use transaction::*; diff --git a/rust/meta/src/test_utils.rs b/rust/meta/src/test_utils.rs index 03149578b9fa2..ecb8284b13b71 100644 --- a/rust/meta/src/test_utils.rs +++ b/rust/meta/src/test_utils.rs @@ -18,7 +18,9 @@ impl LocalMeta { pub async fn start(port: u16) -> Self { let addr = Self::meta_addr_inner(port).parse().unwrap(); let (join_handle, shutdown_sender) = - crate::rpc::server::rpc_serve(addr, None, None, MetaStoreBackend::Mem).await; + crate::rpc::server::rpc_serve(addr, None, None, MetaStoreBackend::Mem) + .await + .unwrap(); Self { port, join_handle, From 8b92cd70550c51243146c430c81b04f30b4c11fe Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 3 Mar 2022 17:13:49 +0800 Subject: [PATCH 3/5] fix clippy Signed-off-by: TennyZhuang --- rust/meta/src/storage/etcd_meta_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/meta/src/storage/etcd_meta_store.rs b/rust/meta/src/storage/etcd_meta_store.rs index e3c3edb449f3d..7f0555efe187f 100644 --- a/rust/meta/src/storage/etcd_meta_store.rs +++ b/rust/meta/src/storage/etcd_meta_store.rs @@ -178,7 +178,7 @@ impl MetaStore for EtcdMetaStore { async fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { self.client .kv_client() - .delete(encode_etcd_key(cf, &key), None) + .delete(encode_etcd_key(cf, key), None) .await?; Ok(()) } From 1bdadb3f7bf467ba4558da3150b6186271672acc Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 3 Mar 2022 20:38:15 +0800 Subject: [PATCH 4/5] add main entry Signed-off-by: TennyZhuang --- rust/meta/src/bin/meta_node.rs | 24 ++++++++++++++++++++---- rust/meta/src/rpc/server.rs | 15 +++++++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/rust/meta/src/bin/meta_node.rs b/rust/meta/src/bin/meta_node.rs index 87910bba7e22f..345c37fff65c5 100644 --- a/rust/meta/src/bin/meta_node.rs +++ b/rust/meta/src/bin/meta_node.rs @@ -16,6 +16,12 @@ struct Opts { #[clap(long)] prometheus_host: Option, + + #[clap(long, default_value_t = String::from(""))] + backend: String, + + #[clap(long, default_value_t = String::from(""))] + etcd_endpoints: String, } /// Configure log targets for all `RisingWave` crates. When new crates are added and TRACE level @@ -61,11 +67,21 @@ async fn main() { let addr = opts.host.parse().unwrap(); let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap()); + let backend = match opts.backend.as_str() { + "etcd" => MetaStoreBackend::Etcd { + endpoints: opts + .etcd_endpoints + .split(',') + .map(|x| x.to_string()) + .collect(), + }, + "mem" | "" => MetaStoreBackend::Mem, + _ => panic!("unknown backend"), + }; info!("Starting meta server at {}", addr); - let (join_handle, _shutdown_send) = - rpc_serve(addr, prometheus_addr, dashboard_addr, MetaStoreBackend::Mem) - .await - .unwrap(); + let (join_handle, _shutdown_send) = rpc_serve(addr, prometheus_addr, dashboard_addr, backend) + .await + .unwrap(); join_handle.await.unwrap(); } diff --git a/rust/meta/src/rpc/server.rs b/rust/meta/src/rpc/server.rs index f59c20922f6c9..284c3a7f9d530 100644 --- a/rust/meta/src/rpc/server.rs +++ b/rust/meta/src/rpc/server.rs @@ -1,7 +1,8 @@ use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; -use etcd_client::Client as EtcdClient; +use etcd_client::{Client as EtcdClient, ConnectOptions}; use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::{Result, RwError}; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; @@ -45,9 +46,15 @@ pub async fn rpc_serve( ) -> Result<(JoinHandle<()>, UnboundedSender<()>)> { Ok(match meta_store_backend { MetaStoreBackend::Etcd { endpoints } => { - let client = EtcdClient::connect(endpoints, None).await.map_err(|e| { - RwError::from(InternalError(format!("failed to connect etcd {}", e))) - })?; + let client = EtcdClient::connect( + endpoints, + Some( + ConnectOptions::default() + .with_keep_alive(Duration::from_secs(3), Duration::from_secs(5)), + ), + ) + .await + .map_err(|e| RwError::from(InternalError(format!("failed to connect etcd {}", e))))?; let meta_store_ref = Arc::new(EtcdMetaStore::new(client)); rpc_serve_with_store(addr, prometheus_addr, dashboard_addr, meta_store_ref).await } From 575ae97e1215eb7a41245cb902be37a3faefcb7c Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 4 Mar 2022 11:39:58 +0800 Subject: [PATCH 5/5] use Acquire Signed-off-by: TennyZhuang --- rust/meta/src/storage/etcd_meta_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/meta/src/storage/etcd_meta_store.rs b/rust/meta/src/storage/etcd_meta_store.rs index 7f0555efe187f..e440caa1f2a35 100644 --- a/rust/meta/src/storage/etcd_meta_store.rs +++ b/rust/meta/src/storage/etcd_meta_store.rs @@ -48,7 +48,7 @@ impl EtcdSnapshot { } else { // Slow path let _g = self.init_lock.lock().await; - let revision = self.revision.load(atomic::Ordering::Relaxed); + let revision = self.revision.load(atomic::Ordering::Acquire); if revision != REVISION_UNINITIALIZED { // Double check failed, release the lock. continue;