Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions src/meta/src/rpc/election_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ pub struct ElectionMember {
#[async_trait::async_trait]
pub trait ElectionClient: Send + Sync + 'static {
fn id(&self) -> MetaResult<String>;
async fn run_once(&self, ttl: i64, stop: watch::Receiver<()>) -> MetaResult<()>;
fn subscribe(&self) -> watch::Receiver<bool>;
async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>;
fn subscribe(&self) -> Receiver<bool>;
async fn leader(&self) -> MetaResult<Option<ElectionMember>>;
async fn get_members(&self) -> MetaResult<Vec<ElectionMember>>;
async fn is_leader(&self) -> bool;
}

pub struct EtcdElectionClient {
client: Client,
id: String,
is_leader_sender: watch::Sender<bool>,
endpoints: Vec<String>,
options: Option<ConnectOptions>,
}

#[async_trait::async_trait]
Expand All @@ -54,7 +55,7 @@ impl ElectionClient for EtcdElectionClient {
}

async fn leader(&self) -> MetaResult<Option<ElectionMember>> {
let mut election_client = self.client.election_client();
let mut election_client = self.client().await?.election_client();
let leader = election_client.leader(META_ELECTION_KEY).await;

let leader = match leader {
Expand All @@ -72,8 +73,9 @@ impl ElectionClient for EtcdElectionClient {
}

async fn run_once(&self, ttl: i64, stop: watch::Receiver<()>) -> MetaResult<()> {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
let client = self.client().await?;
let mut lease_client = client.lease_client();
let mut election_client = client.election_client();
let mut stop = stop;

tracing::info!("client {} start election", self.id);
Expand Down Expand Up @@ -124,7 +126,7 @@ impl ElectionClient for EtcdElectionClient {

let (keep_alive_fail_tx, mut keep_alive_fail_rx) = oneshot::channel();

let mut lease_client = self.client.lease_client();
let mut lease_client = client.lease_client();

let mut stop_ = stop.clone();

Expand Down Expand Up @@ -246,7 +248,7 @@ impl ElectionClient for EtcdElectionClient {
}

async fn get_members(&self) -> MetaResult<Vec<ElectionMember>> {
let mut client = self.client.kv_client();
let mut client = self.client().await?.kv_client();
let keys = client
.get(META_ELECTION_KEY, Some(GetOptions::new().with_prefix()))
.await?;
Expand Down Expand Up @@ -287,19 +289,19 @@ impl ElectionClient for EtcdElectionClient {
}

impl EtcdElectionClient {
pub(crate) async fn new(
endpoints: Vec<String>,
options: Option<ConnectOptions>,
id: String,
) -> MetaResult<Self> {
let client = Client::connect(&endpoints, options.clone()).await?;

pub(crate) fn new(endpoints: Vec<String>, options: Option<ConnectOptions>, id: String) -> Self {
let (sender, _) = watch::channel(false);
Ok(Self {
client,
Self {
endpoints,
options,
id,
is_leader_sender: sender,
})
}
}

/// Opens a new etcd connection from the stored `endpoints` and `options`.
///
/// Nothing is cached: every call performs a fresh `Client::connect`, so each
/// caller gets its own `Client`.
///
/// # Errors
///
/// Propagates the error returned by `Client::connect`, converted into the
/// `MetaResult` error type by `?`.
async fn client(&self) -> MetaResult<Client> {
    // Borrow the endpoint list rather than cloning the whole `Vec<String>` on
    // each connection attempt; only the connect options are taken by value.
    let client = Client::connect(&self.endpoints, self.options.clone()).await?;
    Ok(client)
}
}

Expand Down Expand Up @@ -330,15 +332,11 @@ mod tests {
let (stop_sender, stop_receiver) = watch::channel(());
clients.push((
stop_sender,
Arc::new(
EtcdElectionClient::new(
vec!["localhost:2388".to_string()],
None,
format!("client_{}", i).to_string(),
)
.await
.unwrap(),
),
Arc::new(EtcdElectionClient::new(
vec!["localhost:2388".to_string()],
None,
format!("client_{}", i).to_string(),
)),
));
}

Expand Down
13 changes: 5 additions & 8 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,11 @@ pub async fn rpc_serve(
.map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?;
let meta_store = Arc::new(EtcdMetaStore::new(client));

let election_client = Arc::new(
EtcdElectionClient::new(
endpoints,
Some(options),
address_info.advertise_addr.clone(),
)
.await?,
);
let election_client = Arc::new(EtcdElectionClient::new(
endpoints,
Some(options),
address_info.advertise_addr.clone(),
));

rpc_serve_with_store(
meta_store,
Expand Down