Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ CatalogException Catalog::CreateMissingEntryException(CatalogEntryRetriever &ret
auto &db_manager = DatabaseManager::Get(context);
auto databases = db_manager.GetDatabases(context, max_schema_count);

for (auto &database : databases) {
for (const auto &database : databases) {
if (unseen_schemas.size() >= max_schema_count) {
break;
}
Expand Down
19 changes: 17 additions & 2 deletions src/include/duckdb/common/serializer/deserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,32 @@ class Deserializer {
}

template <class FUNC>
void ReadList(const field_id_t field_id, const char *tag, FUNC func) {
OnPropertyBegin(field_id, tag);
void ReadListInternal(FUNC func) {
auto size = OnListBegin();
List list {*this};
for (idx_t i = 0; i < size; i++) {
func(list, i);
}
OnListEnd();
}

template <class FUNC>
void ReadList(const field_id_t field_id, const char *tag, FUNC func) {
OnPropertyBegin(field_id, tag);
ReadListInternal(func);
OnPropertyEnd();
}

template <class FUNC>
void ReadOptionalList(const field_id_t field_id, const char *tag, FUNC func) {
if (!OnOptionalPropertyBegin(field_id, tag)) {
OnOptionalPropertyEnd(false);
return;
}
ReadListInternal(func);
OnOptionalPropertyEnd(true);
}

template <class FUNC>
void ReadObject(const field_id_t field_id, const char *tag, FUNC func) {
OnPropertyBegin(field_id, tag);
Expand Down
63 changes: 43 additions & 20 deletions src/storage/wal_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ReplayState {
optional_ptr<TableCatalogEntry> current_table;
MetaBlockPointer checkpoint_id;
idx_t wal_version = 1;
optional_idx expected_checkpoint_id;

struct ReplayIndexInfo {
ReplayIndexInfo(TableIndexList &index_list, unique_ptr<Index> index, const string &table_schema,
Expand Down Expand Up @@ -192,10 +193,12 @@ class WriteAheadLogDeserializer {
return false;
}

bool DeserializeOnly() {
bool DeserializeOnly() const {
return deserialize_only;
}

static void ThrowVersionError(idx_t checkpoint_iteration, idx_t expected_checkpoint_iteration);

protected:
void ReplayEntry(WALType wal_type);

Expand Down Expand Up @@ -246,6 +249,7 @@ class WriteAheadLogDeserializer {
MemoryStream stream;
BinaryDeserializer deserializer;
bool deserialize_only;
optional_idx expected_checkpoint_id;
};

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -311,6 +315,11 @@ unique_ptr<WriteAheadLog> WriteAheadLog::ReplayInternal(AttachedDatabase &databa
return nullptr;
}
}
if (checkpoint_state.expected_checkpoint_id.IsValid()) {
// we expected a checkpoint id - but no checkpoint has happened - abort!
auto expected_id = checkpoint_state.expected_checkpoint_id.GetIndex();
WriteAheadLogDeserializer::ThrowVersionError(expected_id - 1, expected_id);
}

// we need to recover from the WAL: actually set up the replay state
ReplayState state(database, *con.context);
Expand Down Expand Up @@ -454,31 +463,45 @@ void WriteAheadLogDeserializer::ReplayEntry(WALType entry_type) {
//===--------------------------------------------------------------------===//
// Replay Version
//===--------------------------------------------------------------------===//
void WriteAheadLogDeserializer::ThrowVersionError(idx_t checkpoint_iteration, idx_t expected_checkpoint_iteration) {
string relation = checkpoint_iteration < expected_checkpoint_iteration ? "an older" : "a newer";
throw IOException("This WAL was created for this database file, but the WAL checkpoint iteration does not "
"match the database file. "
"That means the WAL was created for %s version of this database. File checkpoint "
"iteration: %d, WAL checkpoint iteration: %d",
relation, expected_checkpoint_iteration, checkpoint_iteration);
}

void WriteAheadLogDeserializer::ReplayVersion() {
state.wal_version = deserializer.ReadProperty<idx_t>(101, "version");

auto &single_file_block_manager = db.GetStorageManager().GetBlockManager().Cast<SingleFileBlockManager>();
auto file_version_number = single_file_block_manager.GetVersionNumber();
if (file_version_number > 66) {
data_t db_identifier[MainHeader::DB_IDENTIFIER_LEN];
deserializer.ReadList(102, "db_identifier", [&](Deserializer::List &list, idx_t i) {
db_identifier[i] = list.ReadElement<uint8_t>();
});
auto expected_db_identifier = single_file_block_manager.GetDBIdentifier();
if (!MainHeader::CompareDBIdentifiers(db_identifier, expected_db_identifier)) {
throw IOException("WAL does not match database file.");
}
data_t db_identifier[MainHeader::DB_IDENTIFIER_LEN];
bool is_set = false;
deserializer.ReadOptionalList(102, "db_identifier", [&](Deserializer::List &list, idx_t i) {
db_identifier[i] = list.ReadElement<uint8_t>();
is_set = true;
});
auto checkpoint_iteration = deserializer.ReadPropertyWithDefault<optional_idx>(103, "checkpoint_iteration");
if (!is_set || !checkpoint_iteration.IsValid()) {
return;
}
auto expected_db_identifier = single_file_block_manager.GetDBIdentifier();
if (!MainHeader::CompareDBIdentifiers(db_identifier, expected_db_identifier)) {
throw IOException("WAL does not match database file.");
}

auto expected_checkpoint_iteration = single_file_block_manager.GetCheckpointIteration();
auto checkpoint_iteration = deserializer.ReadProperty<uint64_t>(103, "checkpoint_iteration");
if (expected_checkpoint_iteration != checkpoint_iteration) {
string relation = checkpoint_iteration < expected_checkpoint_iteration ? "older" : "newer";
throw IOException("This WAL was created for this database file, but the WAL checkpoint iteration does not "
"match the database file. "
"That means the WAL was created for a %s version of this database. File checkpoint "
"iteration: %d, WAL checkpoint iteration: %d",
relation, expected_checkpoint_iteration, checkpoint_iteration);
auto wal_checkpoint_iteration = checkpoint_iteration.GetIndex();
auto expected_checkpoint_iteration = single_file_block_manager.GetCheckpointIteration();
if (expected_checkpoint_iteration != wal_checkpoint_iteration) {
if (wal_checkpoint_iteration + 1 == expected_checkpoint_iteration) {
// this iteration is exactly one lower than the expected iteration
// this can happen if we aborted AFTER checkpointing the file, but BEFORE truncating the WAL
// expect this situation to occur - we will throw an error if it does not later on
state.expected_checkpoint_id = expected_checkpoint_iteration;
return;
}
ThrowVersionError(wal_checkpoint_iteration, expected_checkpoint_iteration);
}
}

Expand Down
6 changes: 2 additions & 4 deletions test/persistence/test_file_matches_wal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,12 @@ TEST_CASE("Test replaying mismatching WAL files", "[persistence][.]") {
result = con.Query("ATTACH '" + too_old_path_file + "';");
REQUIRE(result->HasError());
string error_msg = result->GetError();
REQUIRE(StringUtil::Contains(error_msg, "That means the WAL was created for a older version of this database. File "
"checkpoint iteration: 1, WAL checkpoint iteration: 0"));
REQUIRE(StringUtil::Contains(error_msg, "older"));

result = con.Query("ATTACH '" + too_new_path_file + "';");
REQUIRE(result->HasError());
error_msg = result->GetError();
REQUIRE(StringUtil::Contains(error_msg, "That means the WAL was created for a newer version of this database. File "
"checkpoint iteration: 0, WAL checkpoint iteration: 1"));
REQUIRE(StringUtil::Contains(error_msg, "newer"));

// Create and initialize a different file.
string other_db_path = test_dir + "/my_other_db.db";
Expand Down
36 changes: 36 additions & 0 deletions test/sql/storage/wal/wal_promote_version.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# name: test/sql/storage/wal/wal_promote_version.test
# description: WAL promote version
# group: [wal]

statement ok
PRAGMA disable_checkpoint_on_shutdown;

statement ok
SET checkpoint_threshold='1TB'

statement ok
ATTACH '__TEST_DIR__/wal_promote.db';

statement ok
CREATE TABLE wal_promote.T AS (FROM range(10));

statement ok
DETACH wal_promote;

statement ok
ATTACH '__TEST_DIR__/wal_promote.db' (STORAGE_VERSION 'latest');

statement ok
INSERT INTO wal_promote.T VALUES (42);

statement ok
DETACH wal_promote;

statement ok
ATTACH '__TEST_DIR__/wal_promote.db' (STORAGE_VERSION 'latest');

statement ok
INSERT INTO wal_promote.T VALUES (42);

statement ok
DETACH wal_promote;
Loading