Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
3dbc2dd
WIP implement sync dynamic target
danlaine Jul 16, 2025
c1542a1
verify db state in test
danlaine Jul 16, 2025
5915100
test cleanup
danlaine Jul 16, 2025
85fed9b
test cleanup
danlaine Jul 16, 2025
d47f296
appease clippy
danlaine Jul 16, 2025
6e23cfb
remove unneeded SyncTargetUpdate type
danlaine Jul 16, 2025
67c121f
nit
danlaine Jul 16, 2025
621745c
nit
danlaine Jul 16, 2025
e2bd239
re-initialize log with init_sync if its size is <= new lower bound
danlaine Jul 16, 2025
5118061
unbounded sender --> sender
danlaine Jul 16, 2025
0464106
comment
danlaine Jul 16, 2025
2ac23f3
comments
danlaine Jul 16, 2025
2d4f364
WIP add test_target_update_during_sync
danlaine Jul 16, 2025
9b46572
update test_target_update_during_sync
danlaine Jul 16, 2025
f645757
naming nit
danlaine Jul 16, 2025
624920a
test cleanup
danlaine Jul 16, 2025
d7f09af
remove useless comment
danlaine Jul 16, 2025
a2a8d4f
nit remove #[cfg(test)]
danlaine Jul 16, 2025
0f82a2f
add test clauses
danlaine Jul 17, 2025
d488d9e
move update_receiver to sync Config
danlaine Jul 17, 2025
97282b4
nits
danlaine Jul 17, 2025
8cbee02
match nit
danlaine Jul 17, 2025
7dbac40
move log
danlaine Jul 17, 2025
0ec1e71
fix incorrect setting of pinned nodes
danlaine Jul 17, 2025
db82eec
failing test
patrick-ogrady Jul 17, 2025
6b73352
update test bounds to test regression for bug where we don't set pinn…
danlaine Jul 17, 2025
7be0cce
cleanup test
patrick-ogrady Jul 17, 2025
1cd7416
nit
patrick-ogrady Jul 17, 2025
3ef4ee6
Merge branch 'danlaine/sync-dynamic-root' into create-failing-test-2
danlaine Jul 17, 2025
a69a595
remove incorrect assertion; update test
danlaine Jul 18, 2025
b46db51
update SyncTarget comment
danlaine Jul 18, 2025
ddfb65e
Merge branch 'create-failing-test' into danlaine/sync-dynamic-root
danlaine Jul 18, 2025
8067762
Merge remote-tracking branch 'origin/main' into danlaine/sync-dynamic…
danlaine Jul 18, 2025
af1b4ba
use table test for test_target_update_during_sync
danlaine Jul 18, 2025
3c7b7db
rename hash to digest
danlaine Jul 21, 2025
b75c2b9
nits
danlaine Jul 21, 2025
0281d49
appease clippy
danlaine Jul 21, 2025
8199147
rename root_digest to root
danlaine Jul 21, 2025
fc981e3
rename root_digest to root
danlaine Jul 21, 2025
cf1289d
Merge remote-tracking branch 'origin/main' into danlaine/sync-rename-…
danlaine Jul 21, 2025
7a28c7c
fix bad merge
danlaine Jul 21, 2025
c90cd02
nits
danlaine Jul 21, 2025
722a198
nit: got_digest --> got_root
danlaine Jul 21, 2025
e788686
Merge remote-tracking branch 'origin/main' into danlaine/sync-rename-…
danlaine Jul 21, 2025
9bfcab1
digest --> root
danlaine Jul 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions examples/sync/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct Config {

#[derive(Debug)]
struct ServerMetadata {
target_hash: Digest,
root: Digest,
oldest_retained_loc: u64,
latest_op_loc: u64,
}
Expand All @@ -50,7 +50,7 @@ where

let metadata = resolver.get_server_metadata().await?;
let metadata = ServerMetadata {
target_hash: metadata.target_hash,
root: metadata.root,
oldest_retained_loc: metadata.oldest_retained_loc,
latest_op_loc: metadata.latest_op_loc,
};
Expand All @@ -74,7 +74,7 @@ where

// Get server metadata to determine sync parameters
let ServerMetadata {
target_hash,
root,
oldest_retained_loc,
latest_op_loc,
} = get_server_metadata(&resolver).await?;
Expand Down Expand Up @@ -102,7 +102,7 @@ where
db_config,
fetch_batch_size: NonZeroU64::new(config.batch_size).unwrap(),
target: SyncTarget {
hash: target_hash,
root,
lower_bound_ops: oldest_retained_loc,
upper_bound_ops: latest_op_loc,
},
Expand All @@ -123,23 +123,26 @@ where
info!("Beginning sync operation...");
let database = sync::sync(sync_config).await?;

// Get the root hash of the synced database
// Get the root digest of the synced database
let mut hasher = Standard::new();
let root_hash = database.root(&mut hasher);
let got_root = database.root(&mut hasher);

// Verify the hash matches the target hash.
if root_hash != target_hash {
return Err(format!("Synced database root hash does not match target hash: {root_hash:?} != {target_hash:?}").into());
// Verify the digest matches the target digest.
if got_root != root {
return Err(format!(
"Synced database root digest does not match target root digest: {got_root:?} != {root:?}"
)
.into());
}

let root_hash_hex = root_hash
let root_hex = got_root
.as_ref()
.iter()
.map(|b| format!("{b:02x}"))
.collect::<String>();
info!(
database_ops = database.op_count(),
root_hash = %root_hash_hex,
root = %root_hex,
"✅ Sync completed successfully"
);

Expand Down
10 changes: 5 additions & 5 deletions examples/sync/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ where
let oldest_retained_loc = database.oldest_retained_loc().unwrap_or(0);
let latest_op_loc = database.op_count().saturating_sub(1);

let target_hash = {
let root = {
let mut hasher = Standard::new();
database.root(&mut hasher)
};

drop(database);

let response = GetServerMetadataResponse {
target_hash,
root,
oldest_retained_loc,
latest_op_loc,
};
Expand Down Expand Up @@ -399,16 +399,16 @@ fn main() {

// Display database state
let mut hasher = Standard::new();
let root_hash = database.root(&mut hasher);
let root_hash_hex = root_hash
let root = database.root(&mut hasher);
let root_hex = root
.as_ref()
.iter()
.map(|b| format!("{b:02x}"))
.collect::<String>();

info!(
op_count = database.op_count(),
root_hash = %root_hash_hex,
root = %root_hex,
"Database ready"
);

Expand Down
16 changes: 8 additions & 8 deletions examples/sync/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! logic for safe network communication.
//!
//! The protocol supports:
//! - Getting server metadata (database size, target hash, operation bounds)
//! - Getting server metadata (database size, root digest, operation bounds)
//! - Fetching operations with cryptographic proofs
//! - Error handling

Expand All @@ -32,7 +32,7 @@ pub enum Message {
GetOperationsRequest(GetOperationsRequest),
/// Response with operations and proof.
GetOperationsResponse(GetOperationsResponse),
/// Request server metadata (target hash, bounds, etc.).
/// Request server metadata (target root digest, bounds, etc.).
GetServerMetadataRequest,
/// Response with server metadata.
GetServerMetadataResponse(GetServerMetadataResponse),
Expand Down Expand Up @@ -67,8 +67,8 @@ pub struct GetOperationsResponse {
/// Response with server metadata.
#[derive(Debug, Clone)]
pub struct GetServerMetadataResponse {
/// Target hash of the database.
pub target_hash: Digest,
/// Target root digest of the database.
pub root: Digest,
/// Oldest retained operation location.
pub oldest_retained_loc: u64,
/// Latest operation location.
Expand Down Expand Up @@ -233,15 +233,15 @@ impl Read for GetOperationsResponse {

impl Write for GetServerMetadataResponse {
fn write(&self, buf: &mut impl BufMut) {
self.target_hash.write(buf);
self.root.write(buf);
self.oldest_retained_loc.write(buf);
self.latest_op_loc.write(buf);
}
}

impl EncodeSize for GetServerMetadataResponse {
fn encode_size(&self) -> usize {
self.target_hash.encode_size()
self.root.encode_size()
+ self.oldest_retained_loc.encode_size()
+ self.latest_op_loc.encode_size()
}
Expand All @@ -251,11 +251,11 @@ impl Read for GetServerMetadataResponse {
type Cfg = ();

fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
let target_hash = Digest::read(buf)?;
let root = Digest::read(buf)?;
let oldest_retained_loc = u64::read(buf)?;
let latest_op_loc = u64::read(buf)?;
Ok(Self {
target_hash,
root,
oldest_retained_loc,
latest_op_loc,
})
Expand Down
2 changes: 1 addition & 1 deletion examples/sync/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
.map_err(|e| ResolverError::DeserializationError(e.to_string()))
}

/// Get server metadata (target hash and bounds)
/// Get server metadata (target root digest and bounds)
pub async fn get_server_metadata(&self) -> Result<GetServerMetadataResponse, ResolverError> {
match self.send_request(Message::GetServerMetadataRequest).await? {
Message::GetServerMetadataResponse(response) => {
Expand Down
20 changes: 10 additions & 10 deletions storage/src/adb/any/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher, T: Translato
proof: &Proof<H::Digest>,
start_loc: u64,
ops: &[Operation<K, V>],
root_digest: &H::Digest,
root: &H::Digest,
) -> bool {
let start_pos = leaf_num_to_pos(start_loc);

Expand All @@ -696,7 +696,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher, T: Translato
.map(|op| Any::<E, _, _, _, T>::op_digest(hasher, op))
.collect::<Vec<_>>();

proof.verify_range_inclusion(hasher, &digests, start_pos, root_digest)
proof.verify_range_inclusion(hasher, &digests, start_pos, root)
}

/// Commit any pending operations to the db, ensuring they are persisted to disk & recoverable
Expand Down Expand Up @@ -1216,10 +1216,10 @@ pub(super) mod test {
assert_eq!(db.snapshot.items(), 857);

// Close & reopen the db, making sure the re-opened db has exactly the same state.
let root_digest = db.root(&mut hasher);
let root = db.root(&mut hasher);
db.close().await.unwrap();
let mut db = open_db(context.clone()).await;
assert_eq!(root_digest, db.root(&mut hasher));
assert_eq!(root, db.root(&mut hasher));
assert_eq!(db.op_count(), 2336);
assert_eq!(db.inactivity_floor_loc, 1478);
assert_eq!(db.snapshot.items(), 857);
Expand Down Expand Up @@ -1400,10 +1400,10 @@ pub(super) mod test {
assert!(db.get(&k).await.unwrap().is_none());

// Close & reopen the db, making sure the re-opened db has exactly the same state.
let root_digest = db.root(&mut hasher);
let root = db.root(&mut hasher);
db.close().await.unwrap();
let db = open_db(context.clone()).await;
assert_eq!(root_digest, db.root(&mut hasher));
assert_eq!(root, db.root(&mut hasher));
assert!(db.get(&k).await.unwrap().is_none());

db.destroy().await.unwrap();
Expand Down Expand Up @@ -1655,7 +1655,7 @@ pub(super) mod test {
source_db.log.size().await.unwrap()
);

// Verify the root hash matches the target
// Verify the root digest matches the target
assert_eq!(db.root(&mut hasher), target_hash);

// Verify state matches the source operations
Expand Down Expand Up @@ -1854,7 +1854,7 @@ pub(super) mod test {
leaf_num_to_pos(sync_lower_bound)
);

// Verify the root hash matches the target
// Verify the root digest matches the target
assert_eq!(sync_db.root(&mut hasher), target_hash);

// Verify state matches the source operations
Expand Down Expand Up @@ -2235,7 +2235,7 @@ pub(super) mod test {
));
}

// Changing the root hash should cause verification to fail
// Changing the root digest should cause verification to fail
{
assert!(!AnyTest::verify_proof(
&mut hasher,
Expand Down Expand Up @@ -2292,7 +2292,7 @@ pub(super) mod test {
// Final commit to establish the inactivity floor
db.commit().await.unwrap();

// Get the root hash
// Get the root digest
let original_root = db.root(&mut hasher);

// Verify the pruning boundary is correct
Expand Down
Loading
Loading