Skip to content

Commit 85a30e0

Browse files
authored
AuthDO schema migration refinements (#515)
Update AuthDO schema migration logic to deal with down-migrates (i.e. dealing with rolling back a deploy that does a migration). Also address arv's feedback from #491
1 parent 57dc8b9 commit 85a30e0

File tree

2 files changed

+354
-120
lines changed

2 files changed

+354
-120
lines changed

packages/reflect-server/src/server/auth-do.test.ts

Lines changed: 196 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ import {
2626
createAuthAPIHeaders,
2727
} from './auth-api-headers.js';
2828
import {
29-
AUTH_DO_STORAGE_SCHEMA_VERSION,
30-
AUTH_DO_STORAGE_SCHEMA_VERSION_KEY,
29+
STORAGE_SCHEMA_META_KEY,
30+
STORAGE_SCHEMA_VERSION,
31+
STORAGE_SCHEMA_MIN_SAFE_ROLLBACK_VERSION,
3132
AUTH_ROUTES,
3233
BaseAuthDO,
3334
recordConnection,
@@ -56,10 +57,11 @@ let state: TestDurableObjectState;
5657
beforeEach(async () => {
5758
storage = await getMiniflareDurableObjectStorage(authDOID);
5859
await storage.deleteAll();
59-
await storage.put(
60-
AUTH_DO_STORAGE_SCHEMA_VERSION_KEY,
61-
AUTH_DO_STORAGE_SCHEMA_VERSION,
62-
);
60+
await storage.put(STORAGE_SCHEMA_META_KEY, {
61+
version: STORAGE_SCHEMA_VERSION,
62+
maxVersion: STORAGE_SCHEMA_VERSION,
63+
minSafeRollbackVersion: STORAGE_SCHEMA_MIN_SAFE_ROLLBACK_VERSION,
64+
});
6365
state = new TestDurableObjectState(authDOID, storage);
6466
jest.useFakeTimers();
6567
jest.setSystemTime(0);
@@ -78,7 +80,7 @@ async function recordConnectionHelper(
7880
roomID: string,
7981
clientID: string,
8082
) {
81-
recordConnection(
83+
await recordConnection(
8284
{
8385
userID,
8486
roomID,
@@ -89,7 +91,6 @@ async function recordConnectionHelper(
8991
connectTimestamp: 1000,
9092
},
9193
);
92-
await storage.sync();
9394
}
9495

9596
async function storeTestConnectionState() {
@@ -101,6 +102,23 @@ async function storeTestConnectionState() {
101102
await recordConnectionHelper('testUserID3', 'testRoomID3', 'testClientID6');
102103
}
103104

105+
const expectedConnectionKeysForTestConnectionState = [
106+
'connection/testUserID1/testRoomID1/testClientID1/',
107+
'connection/testUserID1/testRoomID1/testClientID2/',
108+
'connection/testUserID1/testRoomID2/testClientID3/',
109+
'connection/testUserID2/testRoomID1/testClientID4/',
110+
'connection/testUserID2/testRoomID3/testClientID5/',
111+
'connection/testUserID3/testRoomID3/testClientID6/',
112+
];
113+
const expectedConnectionsByRoomKeysForTestConnectionState = [
114+
'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/',
115+
'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID2/',
116+
'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID4/',
117+
'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID3/',
118+
'connections_by_room/testRoomID3/connection/testUserID2/testRoomID3/testClientID5/',
119+
'connections_by_room/testRoomID3/connection/testUserID3/testRoomID3/testClientID6/',
120+
];
121+
104122
function createCreateRoomTestFixture() {
105123
const testRoomID = 'testRoomID1';
106124

@@ -2131,16 +2149,7 @@ test('revalidateConnections continues if one roomDO returns an error', async ()
21312149
});
21322150

21332151
test('test migration from schema 0 to schema 1, basic', async () => {
2134-
const {testRequest, testRoomDO, state} = await createCreateRoomTestFixture();
2135-
2136-
const authDO = new BaseAuthDO({
2137-
roomDO: testRoomDO,
2138-
state,
2139-
authHandler: () => Promise.reject('should not be called'),
2140-
authApiKey: TEST_AUTH_API_KEY,
2141-
logSink: new TestLogSink(),
2142-
logLevel: 'debug',
2143-
});
2152+
const {testRoomDO, state} = await createCreateRoomTestFixture();
21442153
await storage.deleteAll();
21452154

21462155
await storage.put('connection/testUserID1/testRoomID1/testClientID1/', {
@@ -2165,87 +2174,204 @@ test('test migration from schema 0 to schema 1, basic', async () => {
21652174
},
21662175
);
21672176

2168-
expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(
2169-
undefined,
2177+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual(undefined);
2178+
expect([
2179+
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
2180+
]).toEqual([]);
2181+
2182+
const ensureStorageSchemaMigratedCalls: Promise<void>[] = [];
2183+
new BaseAuthDO(
2184+
{
2185+
roomDO: testRoomDO,
2186+
state,
2187+
authHandler: () => Promise.reject('should not be called'),
2188+
authApiKey: TEST_AUTH_API_KEY,
2189+
logSink: new TestLogSink(),
2190+
logLevel: 'debug',
2191+
},
2192+
p => {
2193+
ensureStorageSchemaMigratedCalls.push(p);
2194+
return p;
2195+
},
21702196
);
2197+
expect(ensureStorageSchemaMigratedCalls.length).toEqual(1);
2198+
await ensureStorageSchemaMigratedCalls[0];
2199+
2200+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual({
2201+
version: STORAGE_SCHEMA_VERSION,
2202+
maxVersion: STORAGE_SCHEMA_VERSION,
2203+
minSafeRollbackVersion: STORAGE_SCHEMA_MIN_SAFE_ROLLBACK_VERSION,
2204+
});
2205+
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([]);
21712206
expect([
21722207
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
21732208
]).toEqual([]);
2209+
});
21742210

2175-
// Create the room for the first time.
2176-
await authDO.fetch(testRequest);
2211+
test('test migration from schema 0 to schema 1, existing connections by room index entries', async () => {
2212+
await storage.deleteAll();
2213+
await storeTestConnectionState();
21772214

2178-
expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(1);
2215+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual(undefined);
2216+
expect(
2217+
[...(await storage.list({prefix: 'connection/'})).keys()].length,
2218+
).toBeGreaterThan(0);
2219+
expect(
2220+
[...(await storage.list({prefix: 'connections_by_room/'})).keys()].length,
2221+
).toBeGreaterThan(0);
21792222

2180-
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([
2181-
'connection/%2FtestUserID%2F%3F/%2FtestRoomID%2F%3F/%2FtestClientID%2F/',
2182-
'connection/testUserID1/testRoomID1/testClientID1/',
2183-
'connection/testUserID1/testRoomID1/testClientID2/',
2184-
'connection/testUserID1/testRoomID2/testClientID4/',
2185-
'connection/testUserID2/testRoomID1/testClientID3/',
2186-
'connection/testUserID2/testRoomID3/testClientID5/',
2187-
]);
2223+
const ensureStorageSchemaMigratedCalls: Promise<void>[] = [];
2224+
new BaseAuthDO(
2225+
{
2226+
roomDO: createRoomDOThatThrowsIfFetchIsCalled(),
2227+
state,
2228+
authHandler: () => Promise.reject('should not be called'),
2229+
authApiKey: TEST_AUTH_API_KEY,
2230+
logSink: new TestLogSink(),
2231+
logLevel: 'debug',
2232+
},
2233+
p => {
2234+
ensureStorageSchemaMigratedCalls.push(p);
2235+
return p;
2236+
},
2237+
);
2238+
expect(ensureStorageSchemaMigratedCalls.length).toEqual(1);
2239+
await ensureStorageSchemaMigratedCalls[0];
2240+
2241+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual({
2242+
version: STORAGE_SCHEMA_VERSION,
2243+
maxVersion: STORAGE_SCHEMA_VERSION,
2244+
minSafeRollbackVersion: STORAGE_SCHEMA_MIN_SAFE_ROLLBACK_VERSION,
2245+
});
2246+
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([]);
21882247
expect([
21892248
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
2190-
]).toEqual([
2191-
'connections_by_room/%2FtestRoomID%2F%3F/connection/%2FtestUserID%2F%3F/%2FtestRoomID%2F%3F/%2FtestClientID%2F/',
2192-
'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/',
2193-
'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID2/',
2194-
'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID3/',
2195-
'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID4/',
2196-
'connections_by_room/testRoomID3/connection/testUserID2/testRoomID3/testClientID5/',
2197-
]);
2249+
]).toEqual([]);
21982250
});
21992251

22002252
// 3333 is chosen because it is >3 x the limit used to page through the
22012253
// connections and is not a multiple of the limit
22022254
test('test migration from schema 0 to schema 1, 3333 connections', async () => {
2203-
const {testRequest, testRoomDO, state} = await createCreateRoomTestFixture();
2204-
2205-
const authDO = new BaseAuthDO({
2206-
roomDO: testRoomDO,
2207-
state,
2208-
authHandler: () => Promise.reject('should not be called'),
2209-
authApiKey: TEST_AUTH_API_KEY,
2210-
logSink: new TestLogSink(),
2211-
logLevel: 'debug',
2212-
});
22132255
await storage.deleteAll();
22142256

2215-
const expectedConnectionKeys = [];
2216-
const expectedConnectionRoomIndexKeys = [];
22172257
for (let i = 0; i < 3333; i++) {
22182258
const connectionKeyString = `connection/testUserID${i % 10}/testRoomID${
22192259
i % 10
22202260
}/testClientID${i}/`;
22212261
await storage.put(connectionKeyString, {
22222262
connectTimestamp: 1000,
22232263
});
2224-
expectedConnectionKeys.push(connectionKeyString);
2225-
expectedConnectionRoomIndexKeys.push(
2226-
`connections_by_room/testRoomID${i % 10}/connection/testUserID${
2227-
i % 10
2228-
}/testRoomID${i % 10}/testClientID${i}/`,
2229-
);
22302264
}
2231-
expectedConnectionKeys.sort();
2232-
expectedConnectionRoomIndexKeys.sort();
2233-
expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(
2234-
undefined,
2235-
);
2265+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual(undefined);
22362266
expect([
22372267
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
22382268
]).toEqual([]);
22392269

2240-
// Create the room for the first time.
2241-
await authDO.fetch(testRequest);
2242-
2243-
expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(1);
2244-
2245-
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual(
2246-
expectedConnectionKeys,
2270+
const ensureStorageSchemaMigratedCalls: Promise<void>[] = [];
2271+
new BaseAuthDO(
2272+
{
2273+
roomDO: createRoomDOThatThrowsIfFetchIsCalled(),
2274+
state,
2275+
authHandler: () => Promise.reject('should not be called'),
2276+
authApiKey: TEST_AUTH_API_KEY,
2277+
logSink: new TestLogSink(),
2278+
logLevel: 'debug',
2279+
},
2280+
p => {
2281+
ensureStorageSchemaMigratedCalls.push(p);
2282+
return p;
2283+
},
22472284
);
2285+
expect(ensureStorageSchemaMigratedCalls.length).toEqual(1);
2286+
await ensureStorageSchemaMigratedCalls[0];
2287+
2288+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual({
2289+
version: STORAGE_SCHEMA_VERSION,
2290+
maxVersion: STORAGE_SCHEMA_VERSION,
2291+
minSafeRollbackVersion: STORAGE_SCHEMA_MIN_SAFE_ROLLBACK_VERSION,
2292+
});
2293+
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([]);
22482294
expect([
22492295
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
2250-
]).toEqual(expectedConnectionRoomIndexKeys);
2296+
]).toEqual([]);
2297+
});
2298+
2299+
describe('test down migrate', () => {
2300+
const testWithMinSafeRollbackVersion = async (
2301+
version: number,
2302+
minSafeRollbackVersion: number,
2303+
expectedErrorMessage?: string,
2304+
) => {
2305+
await storage.deleteAll();
2306+
2307+
await storage.put(STORAGE_SCHEMA_META_KEY, {
2308+
version,
2309+
maxVersion: version,
2310+
minSafeRollbackVersion,
2311+
});
2312+
await storeTestConnectionState();
2313+
2314+
const ensureStorageSchemaMigratedCalls: Promise<void>[] = [];
2315+
const ensureStorageSchemaMigratedCallErrorMessages: string[] = [];
2316+
new BaseAuthDO(
2317+
{
2318+
roomDO: createRoomDOThatThrowsIfFetchIsCalled(),
2319+
state,
2320+
authHandler: () => Promise.reject('should not be called'),
2321+
authApiKey: TEST_AUTH_API_KEY,
2322+
logSink: new TestLogSink(),
2323+
logLevel: 'debug',
2324+
},
2325+
p => {
2326+
const catchErrors = async () => {
2327+
try {
2328+
await p;
2329+
} catch (e) {
2330+
ensureStorageSchemaMigratedCallErrorMessages.push(
2331+
(e as Error).message,
2332+
);
2333+
}
2334+
};
2335+
const wrappedP = catchErrors();
2336+
ensureStorageSchemaMigratedCalls.push(wrappedP);
2337+
return wrappedP;
2338+
},
2339+
);
2340+
expect(ensureStorageSchemaMigratedCalls.length).toEqual(1);
2341+
await ensureStorageSchemaMigratedCalls[0];
2342+
if (expectedErrorMessage) {
2343+
expect(ensureStorageSchemaMigratedCallErrorMessages).toEqual([
2344+
expectedErrorMessage,
2345+
]);
2346+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual({
2347+
version,
2348+
maxVersion: version,
2349+
minSafeRollbackVersion,
2350+
});
2351+
} else {
2352+
expect(ensureStorageSchemaMigratedCallErrorMessages.length).toEqual(0);
2353+
expect(await storage.get(STORAGE_SCHEMA_META_KEY)).toEqual({
2354+
version: STORAGE_SCHEMA_VERSION,
2355+
maxVersion: version,
2356+
minSafeRollbackVersion,
2357+
});
2358+
}
2359+
expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual(
2360+
expectedConnectionKeysForTestConnectionState,
2361+
);
2362+
expect([
2363+
...(await storage.list({prefix: 'connections_by_room/'})).keys(),
2364+
]).toEqual(expectedConnectionsByRoomKeysForTestConnectionState);
2365+
};
2366+
2367+
test('from 2 with minSafeRollbackVersion 0', () =>
2368+
testWithMinSafeRollbackVersion(2, 0));
2369+
test('from 2 with minSafeRollbackVersion 1', () =>
2370+
testWithMinSafeRollbackVersion(2, 1));
2371+
test('from 3 with minSafeRollbackVersion 2', () =>
2372+
testWithMinSafeRollbackVersion(
2373+
3,
2374+
2,
2375+
'Cannot safely migrate to schema version 1, schema is currently version 3, min safe rollback version is 2',
2376+
));
22512377
});

0 commit comments

Comments
 (0)