From d0cb5aa17c382900437812d2316883a39e030f97 Mon Sep 17 00:00:00 2001 From: Gregory Baker Date: Fri, 5 May 2023 19:22:12 -0700 Subject: [PATCH 1/4] fix: enable connection revalidationt to scale to more connections than can fit in memory at once --- .../reflect-server/src/server/auth-do.test.ts | 372 +++++++----------- packages/reflect-server/src/server/auth-do.ts | 267 ++++++++++--- 2 files changed, 358 insertions(+), 281 deletions(-) diff --git a/packages/reflect-server/src/server/auth-do.test.ts b/packages/reflect-server/src/server/auth-do.test.ts index 5c25aeeaf7..9bb123e2f8 100644 --- a/packages/reflect-server/src/server/auth-do.test.ts +++ b/packages/reflect-server/src/server/auth-do.test.ts @@ -25,7 +25,14 @@ import { AUTH_API_KEY_HEADER_NAME, createAuthAPIHeaders, } from './auth-api-headers.js'; -import {AUTH_ROUTES, BaseAuthDO, ConnectionRecord} from './auth-do.js'; +import { + AUTH_DO_STORAGE_SCHEMA_VERSION, + AUTH_DO_STORAGE_SCHEMA_VERSION_KEY, + AUTH_ROUTES, + BaseAuthDO, + ConnectionRecord, + recordConnection, +} from './auth-do.js'; import {AuthHandler, USER_DATA_HEADER_NAME} from './auth.js'; import { TestDurableObjectId, @@ -44,8 +51,17 @@ import { const TEST_AUTH_API_KEY = 'TEST_REFLECT_AUTH_API_KEY_TEST'; const {authDO} = getMiniflareBindings(); const authDOID = authDO.idFromName('auth'); +let storage: DurableObjectStorage; +let state: TestDurableObjectState; -beforeEach(() => { +beforeEach(async () => { + storage = await getMiniflareDurableObjectStorage(authDOID); + await storage.deleteAll(); + await storage.put( + AUTH_DO_STORAGE_SCHEMA_VERSION_KEY, + AUTH_DO_STORAGE_SCHEMA_VERSION, + ); + state = new TestDurableObjectState(authDOID, storage); jest.useFakeTimers(); jest.setSystemTime(0); }); @@ -58,7 +74,7 @@ function isAuthRequest(request: Request) { return request.url.indexOf('/api/auth/') !== -1; } -async function createCreateRoomTestFixture() { +function createCreateRoomTestFixture() { const testRoomID = 'testRoomID1'; const testRequest = newCreateRoomRequest( @@ -67,9 +83,6 @@ async function createCreateRoomTestFixture() { testRoomID, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - const roomDOcreateRoomCounts = new Map< string, // objectIDString number @@ -986,9 +999,6 @@ describe("connect will implicitly create a room that doesn't exist", () => { mocket, encodedTestAuth, } = createConnectTestFixture({jurisdiction}); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); const logSink = new TestLogSink(); const authDO = new BaseAuthDO({ roomDO: testRoomDO, @@ -1031,9 +1041,6 @@ test('connect calls authHandler and sends resolved UserData in header to Room DO mocket, encodedTestAuth, } = createConnectTestFixture(); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); const logSink = new TestLogSink(); const authDO = new BaseAuthDO({ roomDO: testRoomDO, @@ -1079,9 +1086,6 @@ describe('connect with undefined authHandler sends UserData with url param userI testAuth: tTestAuth, encodedTestAuth: tEncodedTestAuth, }); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); const logSink = new TestLogSink(); const authDO = new BaseAuthDO({ roomDO: testRoomDO, @@ -1117,9 +1121,6 @@ test('connect wont connect to a room that is closed', async () => { jest.useRealTimers(); const {testUserID, testRoomID, testRequest, testRoomDO} = createConnectTestFixture(); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); const logSink = new TestLogSink(); const [, serverWS] = mockWebSocketPair(); const authDO = new BaseAuthDO({ @@ -1165,8 +1166,6 @@ test('connect percent escapes components of the connection key', async () => { testClientID: '/testClientID/&', }); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); const authDO = new BaseAuthDO({ roomDO: testRoomDO, state, @@ -1218,9 +1217,10 @@ describe('connect pipes 401 over ws without calling Room DO if', () => { }, ); const [clientWS, serverWS] = mockWebSocketPair(); + const authDO = new BaseAuthDO({ roomDO: createRoomDOThatThrowsIfFetchIsCalled(), - state: {id: authDOID} as DurableObjectState, + state, authHandler, authApiKey: TEST_AUTH_API_KEY, logSink: new TestLogSink(), @@ -1293,9 +1293,10 @@ describe('connect sends InvalidConnectionRequest over ws without calling Room DO headers, }, ); + const authDO = new BaseAuthDO({ roomDO: createRoomDOThatThrowsIfFetchIsCalled(), - state: {id: authDOID} as DurableObjectState, + state, // eslint-disable-next-line require-await authHandler: () => Promise.reject(new Error('Unexpected call to authHandler')), @@ -1347,9 +1348,10 @@ test('connect sends over InvalidConnectionRequest over ws without calling Room D }, ); const [clientWS, serverWS] = mockWebSocketPair(); + const authDO = new BaseAuthDO({ roomDO: createRoomDOThatThrowsIfFetchIsCalled(), - state: {id: authDOID} as DurableObjectState, + state, // eslint-disable-next-line require-await authHandler: async (auth, roomID) => { expect(auth).toEqual(testAuth); @@ -1397,9 +1399,10 @@ describe('connect sends VersionNotSupported error over ws if path is for unsuppo }, ); const [clientWS, serverWS] = mockWebSocketPair(); + const authDO = new BaseAuthDO({ roomDO: createRoomDOThatThrowsIfFetchIsCalled(), - state: {id: authDOID} as DurableObjectState, + state, // eslint-disable-next-line require-await authHandler: () => Promise.reject(new Error('Unexpected call to authHandler')), @@ -1443,23 +1446,7 @@ test('authInvalidateForUser when requests to roomDOs are successful', async () = }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID3/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID4/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID3/testClientID5/', { - connectTimestamp: 1000, - }); + await storeTestConnectionState(); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { ...createTestDurableObjectNamespace(), @@ -1520,8 +1507,6 @@ test('authInvalidateForUser when connection ids have chars that need to be perce }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); await storage.put( 'connection/%2FtestUserID%2F%3F/testRoomID1/%2FtestClientID%2F/', { @@ -1600,20 +1585,7 @@ test('authInvalidateForUser when any request to roomDOs returns error response', }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); + await storeTestConnectionState(); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -1681,9 +1653,6 @@ test('authInvalidateForRoom when request to roomDO is successful', async () => { }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - let roomDORequestCount = 0; let gotObjectId: DurableObjectId | undefined; const testRoomDO: DurableObjectNamespace = { @@ -1722,6 +1691,76 @@ test('authInvalidateForRoom when request to roomDO is successful', async () => { expect(response.status).toEqual(200); }); +async function storeTestConnectionState() { + recordConnection( + { + userID: 'testUserID1', + roomID: 'testRoomID1', + clientID: 'testClientID1', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + recordConnection( + { + userID: 'testUserID1', + roomID: 'testRoomID1', + clientID: 'testClientID2', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + recordConnection( + { + userID: 'testUserID1', + roomID: 'testRoomID2', + clientID: 'testClientID4', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + recordConnection( + { + userID: 'testUserID2', + roomID: 'testRoomID1', + clientID: 'testClientID3', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + recordConnection( + { + userID: 'testUserID2', + roomID: 'testRoomID3', + clientID: 'testClientID5', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + recordConnection( + { + userID: 'testUserID3', + roomID: 'testRoomID3', + clientID: 'testClientID6', + }, + storage, + { + connectTimestamp: 1000, + }, + ); + await storage.sync(); +} + async function connectAndTestThatRoomGotCreated( authDO: BaseAuthDO, testRequest: Request, @@ -1782,9 +1821,6 @@ test('authInvalidateForRoom when request to roomDO returns error response', asyn }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - let roomDORequestCount = 0; let gotObjectId: DurableObjectId | undefined; const testRoomDO: DurableObjectNamespace = { @@ -1841,29 +1877,7 @@ test('authInvalidateAll when requests to roomDOs are successful', async () => { }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put( - 'connection/%2FtestUserID%2F%3F/testRoomID/%2FtestClientID%2F/', - { - connectTimestamp: 1000, - }, - ); + await storeTestConnectionState(); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -1897,16 +1911,14 @@ test('authInvalidateAll when requests to roomDOs are successful', async () => { await createRoom(authDO, 'testRoomID1'); await createRoom(authDO, 'testRoomID2'); await createRoom(authDO, 'testRoomID3'); - await createRoom(authDO, 'testRoomID'); const response = await authDO.fetch(testRequest); expect(response.status).toEqual(200); - expect(roomDORequestCountsByRoomID.size).toEqual(4); + expect(roomDORequestCountsByRoomID.size).toEqual(3); expect(roomDORequestCountsByRoomID.get('testRoomID1')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID2')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID3')).toEqual(1); - expect(roomDORequestCountsByRoomID.get('testRoomID')).toEqual(1); }); async function expectForwardedAuthInvalidateRequest( @@ -1934,29 +1946,7 @@ test('authInvalidateAll when any request to roomDOs returns error response', asy }, ); - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put( - 'connection/%2FtestUserID%2F%3F/testRoomID/%2FtestClientID%2F/', - { - connectTimestamp: 1000, - }, - ); + await storeTestConnectionState(); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -1995,22 +1985,22 @@ test('authInvalidateAll when any request to roomDOs returns error response', asy await createRoom(authDO, 'testRoomID1'); await createRoom(authDO, 'testRoomID2'); await createRoom(authDO, 'testRoomID3'); - await createRoom(authDO, 'testRoomID'); const response = await authDO.fetch(testRequest); expect(response.status).toEqual(500); - expect(roomDORequestCountsByRoomID.size).toEqual(4); + expect(roomDORequestCountsByRoomID.size).toEqual(3); expect(roomDORequestCountsByRoomID.get('testRoomID1')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID2')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID3')).toEqual(1); - expect(roomDORequestCountsByRoomID.get('testRoomID')).toEqual(1); expect(await response.text()).toEqual( 'Test authInvalidateAll Internal Server Error Msg', ); }); -async function createRevalidateConnectionsTestFixture() { +async function createRevalidateConnectionsTestFixture({ + roomDOIDWithErrorResponse, +}: {roomDOIDWithErrorResponse?: string} = {}) { const testRequest = new Request( `https://test.roci.dev/api/auth/v0/revalidateConnections`, { @@ -2018,27 +2008,7 @@ async function createRevalidateConnectionsTestFixture() { method: 'post', }, ); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID3/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); + await storeTestConnectionState(); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -2057,18 +2027,26 @@ async function createRevalidateConnectionsTestFixture() { expect(request.url).toEqual( 'https://unused-reflect-room-do.dev/api/auth/v0/connections', ); + if (roomDOIDWithErrorResponse === roomID) { + return new Response( + 'Test revalidateConnections Internal Server Error Msg', + { + status: 500, + }, + ); + } switch (roomID) { case 'testRoomID1': return new Response( JSON.stringify([ {userID: 'testUserID1', clientID: 'testClientID1'}, - {userID: 'testUserID2', clientID: 'testClientID1'}, + {userID: 'testUserID2', clientID: 'testClientID3'}, ]), ); case 'testRoomID2': return new Response( JSON.stringify([ - {userID: 'testUserID1', clientID: 'testClientID1'}, + {userID: 'testUserID1', clientID: 'testClientID4'}, ]), ); case 'testRoomID3': @@ -2106,10 +2084,18 @@ test('revalidateConnections', async () => { expect(roomDORequestCountsByRoomID.get('testRoomID2')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID3')).toEqual(1); + console.log([...(await storage.list({prefix: 'connection/'})).keys()]); expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ 'connection/testUserID1/testRoomID1/testClientID1/', - 'connection/testUserID1/testRoomID2/testClientID1/', - 'connection/testUserID2/testRoomID1/testClientID1/', + 'connection/testUserID1/testRoomID2/testClientID4/', + 'connection/testUserID2/testRoomID1/testClientID3/', + ]); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([ + 'connection_room/testRoomID1/testUserID1/testClientID1/', + 'connection_room/testRoomID1/testUserID2/testClientID3/', + 'connection_room/testRoomID2/testUserID1/testClientID4/', ]); }); @@ -2130,94 +2116,24 @@ test('revalidateConnections continues if one storage delete throws an error', as expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ 'connection/testUserID1/testRoomID1/testClientID1/', 'connection/testUserID1/testRoomID1/testClientID2/', - 'connection/testUserID1/testRoomID2/testClientID1/', - 'connection/testUserID2/testRoomID1/testClientID1/', + 'connection/testUserID1/testRoomID2/testClientID4/', + 'connection/testUserID2/testRoomID1/testClientID3/', + ]); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([ + 'connection_room/testRoomID1/testUserID1/testClientID1/', + 'connection_room/testRoomID1/testUserID1/testClientID2/', + 'connection_room/testRoomID1/testUserID2/testClientID3/', + 'connection_room/testRoomID2/testUserID1/testClientID4/', ]); }); test('revalidateConnections continues if one roomDO returns an error', async () => { - const testRequest = new Request( - `https://test.roci.dev/api/auth/v0/revalidateConnections`, - { - headers: createAuthAPIHeaders(TEST_AUTH_API_KEY), - method: 'post', - }, - ); - - const storage = await getMiniflareDurableObjectStorage(authDOID); - const state = new TestDurableObjectState(authDOID, storage); - await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID1/testRoomID2/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID2/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - await storage.put('connection/testUserID3/testRoomID3/testClientID1/', { - connectTimestamp: 1000, - }); - - const roomDORequestCountsByRoomID = new Map(); - const testRoomDO: DurableObjectNamespace = { - ...createTestDurableObjectNamespace(), - get: (id: DurableObjectId) => - new TestDurableObjectStub(id, async (request: Request) => { - if (isAuthRequest(request)) { - const {roomID} = (await getRoomRecordByObjectID( - storage, - id, - )) as RoomRecord; - roomDORequestCountsByRoomID.set( - roomID, - (roomDORequestCountsByRoomID.get(roomID) || 0) + 1, - ); - expect(request.url).toEqual( - 'https://unused-reflect-room-do.dev/api/auth/v0/connections', - ); - switch (roomID) { - case 'testRoomID1': - return new Response( - 'Test revalidateConnections Internal Server Error Msg', - { - status: 500, - }, - ); - case 'testRoomID2': - return new Response( - JSON.stringify([ - {userID: 'testUserID1', clientID: 'testClientID1'}, - ]), - ); - case 'testRoomID3': - return new Response(JSON.stringify([])); - default: - throw new Error(`Unexpected roomID ${roomID}`); - } - } - return new Response('ok', {status: 200}); - }), - }; - - const authDO = new BaseAuthDO({ - roomDO: testRoomDO, - state, - authHandler: () => - Promise.reject(new Error('Unexpected call to authHandler')), - authApiKey: TEST_AUTH_API_KEY, - logSink: new TestLogSink(), - logLevel: 'debug', - }); - await createRoom(authDO, 'testRoomID1'); - await createRoom(authDO, 'testRoomID2'); - await createRoom(authDO, 'testRoomID3'); + const {authDO, testRequest, roomDORequestCountsByRoomID, storage} = + await createRevalidateConnectionsTestFixture({ + roomDOIDWithErrorResponse: 'testRoomID1', + }); const response = await authDO.fetch(testRequest); expect(response.status).toEqual(200); @@ -2228,7 +2144,9 @@ test('revalidateConnections continues if one roomDO returns an error', async () expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ 'connection/testUserID1/testRoomID1/testClientID1/', 'connection/testUserID1/testRoomID1/testClientID2/', - 'connection/testUserID1/testRoomID2/testClientID1/', - 'connection/testUserID2/testRoomID1/testClientID1/', + 'connection/testUserID1/testRoomID2/testClientID4/', + 'connection/testUserID2/testRoomID1/testClientID3/', ]); }); + +// TODO test migration diff --git a/packages/reflect-server/src/server/auth-do.ts b/packages/reflect-server/src/server/auth-do.ts index fba778d080..26c81f3839 100644 --- a/packages/reflect-server/src/server/auth-do.ts +++ b/packages/reflect-server/src/server/auth-do.ts @@ -9,6 +9,7 @@ import { invalidateForUserRequestSchema, } from 'reflect-protocol'; import {assert} from 'shared/asserts.js'; +import {must} from 'shared/must.js'; import * as valita from 'shared/valita.js'; import {DurableStorage} from '../storage/durable-storage.js'; import {encodeHeaderValue} from '../util/headers.js'; @@ -95,6 +96,10 @@ export const AUTH_ROUTES = { ...AUTH_ROUTES_AUTHED_BY_AUTH_HANDLER, } as const; +export const AUTH_DO_STORAGE_SCHEMA_VERSION_KEY = + 'auth_do_storage_schema_version'; +export const AUTH_DO_STORAGE_SCHEMA_VERSION = 1; + export class BaseAuthDO implements DurableObject { private readonly _router = new Router(); private readonly _roomDO: DurableObjectNamespace; @@ -142,6 +147,9 @@ export class BaseAuthDO implements DurableObject { const lc = addRequestIDFromHeadersOrRandomID(this._lc, request); lc.debug?.('Handling request:', request.url); try { + console.log(1); + await maybeMigrateStorageSchema(this._state.storage, lc); + console.log(2); const resp = await this._router.dispatch(request, {lc}); lc.debug?.(`Returning response: ${resp.status} ${resp.statusText}`); return resp; @@ -359,7 +367,7 @@ export class BaseAuthDO implements DurableObject { ); return closeWithErrorLocal('VersionNotSupported', 'unsupported version'); } - + console.log(3); const {searchParams} = new URL(url); // TODO apparently many of these checks are not tested :( const clientID = searchParams.get('clientID'); @@ -488,15 +496,17 @@ export class BaseAuthDO implements DurableObject { const roomObjectID = this._roomDO.idFromString(roomRecord.objectIDString); // Record the connection in DO storage - const connectionKey = connectionKeyToString({ - userID: userData.userID, - roomID, - clientID, - }); - const connectionRecord: ConnectionRecord = { - connectTimestamp: Date.now(), - }; - await this._state.storage.put(connectionKey, connectionRecord); + recordConnection( + { + userID: userData.userID, + roomID, + clientID, + }, + this._state.storage, + { + connectTimestamp: Date.now(), + }, + ); // Forward the request to the Room Durable Object... const stub = this._roomDO.get(roomObjectID); @@ -607,33 +617,19 @@ export class BaseAuthDO implements DurableObject { private _authRevalidateConnections = post( this._requireAPIKey(async ctx => { const {lc} = ctx; - const connectionRecords = await this._state.storage.list({ - prefix: CONNECTION_KEY_PREFIX, - }); - const connectionKeyStringsByRoomID = new Map>(); - for (const keyString of connectionRecords.keys()) { - const connectionKey = connectionKeyFromString(keyString); - if (!connectionKey) { - lc.error?.('Failed to parse connection key', keyString); - continue; - } - const {roomID} = connectionKey; - let keyStringSet = connectionKeyStringsByRoomID.get(roomID); - if (!keyStringSet) { - keyStringSet = new Set(); - connectionKeyStringsByRoomID.set(roomID, keyStringSet); - } - keyStringSet.add(keyString); - } - lc.info?.( - `Revalidating ${connectionRecords.size} ConnectionRecords across ${connectionKeyStringsByRoomID.size} rooms.`, + lc.info?.('Revalidating connections.'); + const connectionsByRoomGenerator = createConnectionsByRoomGenerator( + this._state.storage, + lc, ); + let connectionCount = 0; + let revalidatedCount = 0; let deleteCount = 0; - for (const [ - roomID, - connectionKeyStringsForRoomID, - ] of connectionKeyStringsByRoomID) { - lc.debug?.(`revalidating connections for ${roomID} waiting for lock.`); + for await (const {roomID, connectionKeys} of connectionsByRoomGenerator) { + connectionCount += connectionKeys.length; + lc.info?.( + `Revalidating ${connectionKeys.length} connections for room ${roomID}.`, + ); await this._authLock.withWrite(async () => { lc.debug?.('got lock.'); const roomObjectID = await this._roomRecordLock.withRead(() => @@ -653,7 +649,7 @@ export class BaseAuthDO implements DurableObject { }, ), ); - let connectionsResponse: ConnectionsResponse | undefined; + let connectionsResponse: ConnectionsResponse; try { const responseJSON = valita.parse( await response.json(), @@ -662,33 +658,48 @@ export class BaseAuthDO implements DurableObject { connectionsResponse = responseJSON; } catch (e) { lc.error?.( - `Bad ${ROOM_ROUTES.authConnections} response from roomDO`, + `Bad ${ROOM_ROUTES.authConnections} response from roomDO ${roomID}`, e, ); + return; } - if (connectionsResponse) { - const openConnectionKeyStrings = new Set( - connectionsResponse.map(({userID, clientID}) => - connectionKeyToString({ - roomID, - userID, - clientID, - }), - ), + const openConnectionKeyStrings = new Set( + connectionsResponse.map(({userID, clientID}) => + connectionKeyToString({ + roomID, + userID, + clientID, + }), + ), + ); + const toDelete: [ConnectionKey, string][] = connectionKeys + .map((key): [ConnectionKey, string] => [ + key, + connectionKeyToString(key), + ]) + .filter( + ([_, keyString]) => !openConnectionKeyStrings.has(keyString), ); - const keysToDelete: string[] = [ - ...connectionKeyStringsForRoomID, - ].filter(keyString => !openConnectionKeyStrings.has(keyString)); - try { - deleteCount += await this._state.storage.delete(keysToDelete); - } catch (e) { - lc.info?.('Failed to delete connections for roomID', roomID); + try { + for (const [keyToDelete] of toDelete) { + deleteConnection(keyToDelete, this._state.storage); } + await this._state.storage.sync(); + } catch (e) { + lc.info?.('Failed to delete connections for roomID', roomID); + return; } + revalidatedCount += connectionKeys.length; + deleteCount += toDelete.length; + lc.info?.( + `Revalidated ${connectionKeys.length} connections for room ${roomID}, deleted ${deleteCount} connections.`, + ); }); } lc.info?.( - `Revalidated ${connectionRecords.size} ConnectionRecords, deleted ${deleteCount} ConnectionRecords.`, + `Revalidated ${revalidatedCount} connections, deleted ${deleteCount} connections. Failed to revalidate ${ + connectionCount - revalidatedCount + } connections.`, ); return new Response('Complete', {status: 200}); }), @@ -756,6 +767,7 @@ export class BaseAuthDO implements DurableObject { } const CONNECTION_KEY_PREFIX = 'connection/'; +const CONNECTION_ROOM_INDEX_PREFIX = 'connection_room/'; function createWSAndCloseWithError( lc: LogContext, @@ -800,6 +812,16 @@ function getConnectionKeyStringUserPrefix(userID: string): string { return `${CONNECTION_KEY_PREFIX}${encodeURIComponent(userID)}/`; } +function connectionKeyToConnectionRoomIndexString(key: ConnectionKey): string { + return `${CONNECTION_ROOM_INDEX_PREFIX}${encodeURIComponent( + key.roomID, + )}/${encodeURIComponent(key.userID)}/${encodeURIComponent(key.clientID)}/`; +} + +function getConnectionRoomIndexPrefix(roomID: string): string { + return `${CONNECTION_ROOM_INDEX_PREFIX}${encodeURIComponent(roomID)}/`; +} + export function connectionKeyFromString( key: string, ): ConnectionKey | undefined { @@ -816,3 +838,140 @@ export function connectionKeyFromString( clientID: decodeURIComponent(parts[3]), }; } + +export function connectionKeyFromRoomIndexString( + key: string, +): ConnectionKey | undefined { + if (!key.startsWith(CONNECTION_ROOM_INDEX_PREFIX)) { + return undefined; + } + const parts = key.split('/'); + if (parts.length !== 5 || parts[4] !== '') { + return undefined; + } + return { + userID: decodeURIComponent(parts[2]), + roomID: decodeURIComponent(parts[1]), + clientID: decodeURIComponent(parts[3]), + }; +} + +async function getConnectionKeysForRoomID( + roomID: string, + storage: DurableObjectStorage, +): Promise { + const connectionKeys = []; + for (const key of ( + await storage.list({ + prefix: getConnectionRoomIndexPrefix(roomID), + }) + ).keys()) { + const connectionKey = connectionKeyFromRoomIndexString(key); + if (connectionKey) { + connectionKeys.push(connectionKey); + } + } + return connectionKeys; +} + +async function* createConnectionsByRoomGenerator( + storage: DurableObjectStorage, + lc: LogContext, +) { + let lastKey = ''; + while (true) { + const nextRoomListResult = await storage.list({ + startAfter: lastKey, + prefix: CONNECTION_ROOM_INDEX_PREFIX, + limit: 1, + }); + if (nextRoomListResult.size === 0) { + return; + } + const firstRoomIndexString: string = nextRoomListResult.keys().next().value; + const connectionKey = + connectionKeyFromRoomIndexString(firstRoomIndexString); + if (!connectionKey) { + lc.error?.( + 'Failed to parse connection room index key', + firstRoomIndexString, + ); + lastKey = firstRoomIndexString; + continue; + } + const {roomID} = connectionKey; + const connectionKeys = await getConnectionKeysForRoomID(roomID, storage); + yield {roomID, connectionKeys}; + lastKey = connectionKeyToConnectionRoomIndexString( + connectionKeys.length > 0 + ? connectionKeys[connectionKeys.length - 1] + : connectionKey, + ); + } +} + +export function recordConnection( + connectionKey: ConnectionKey, + storage: DurableObjectStorage, + record: ConnectionRecord, +) { + const connectionKeyString = connectionKeyToString(connectionKey); + const connectionRoomIndexString = + connectionKeyToConnectionRoomIndexString(connectionKey); + // no await to ensure the two puts are coalesced and done atomically + void storage.put(connectionKeyString, record); + void storage.put(connectionRoomIndexString, {}); +} + +function deleteConnection( + connectionKey: ConnectionKey, + storage: DurableObjectStorage, +) { + const connectionKeyString = connectionKeyToString(connectionKey); + const connectionRoomIndexString = + connectionKeyToConnectionRoomIndexString(connectionKey); + // no await to ensure the two deletes are coalesced and done atomically + void storage.delete(connectionKeyString); + void storage.delete(connectionRoomIndexString); +} + +async function maybeMigrateStorageSchema( + storage: DurableObjectStorage, + lc: LogContext, +) { + const storageSchemaVersion = + (await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)) ?? 0; + if (storageSchemaVersion >= AUTH_DO_STORAGE_SCHEMA_VERSION) { + return; + } + if (storageSchemaVersion === 0) { + lc.info?.( + 'Migrating from storage schema version 0 to storage schema version 1.', + ); + let lastKey = ''; + let done = false; + while (done) { + const connectionsListResult = await storage.list({ + startAfter: lastKey, + prefix: CONNECTION_KEY_PREFIX, + limit: 1000, + }); + for (const connectionKeyString of connectionsListResult.keys()) { + const connectionKey = must( + connectionKeyFromString(connectionKeyString), + ); + void storage.put( + connectionKeyToConnectionRoomIndexString(connectionKey), + {}, + ); + lastKey = connectionKeyString; + } + done = connectionsListResult.size === 0; + } + void storage.put(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY, 1); + await storage.sync(); + lc.info?.( + 'Successfully migrated from storage schema version 0 to storage schema version 1.', + ); + } +} From b01666d7122272f8347722f83a48244e3f56984c Mon Sep 17 00:00:00 2001 From: Gregory Baker Date: Sat, 6 May 2023 09:45:43 -0700 Subject: [PATCH 2/4] finish tests --- .../reflect-server/src/server/auth-do.test.ts | 308 ++++++++++++------ packages/reflect-server/src/server/auth-do.ts | 5 +- 2 files changed, 202 insertions(+), 111 deletions(-) diff --git a/packages/reflect-server/src/server/auth-do.test.ts b/packages/reflect-server/src/server/auth-do.test.ts index 9bb123e2f8..c25b1e593c 100644 --- a/packages/reflect-server/src/server/auth-do.test.ts +++ b/packages/reflect-server/src/server/auth-do.test.ts @@ -30,7 +30,6 @@ import { AUTH_DO_STORAGE_SCHEMA_VERSION_KEY, AUTH_ROUTES, BaseAuthDO, - ConnectionRecord, recordConnection, } from './auth-do.js'; import {AuthHandler, USER_DATA_HEADER_NAME} from './auth.js'; @@ -74,6 +73,34 @@ function isAuthRequest(request: Request) { return request.url.indexOf('/api/auth/') !== -1; } +async function recordConnectionHelper( + userID: string, + roomID: string, + clientID: string, +) { + recordConnection( + { + userID, + roomID, + clientID, + }, + storage, + { + connectTimestamp: 1000, + }, + ); + await storage.sync(); +} + +async function storeTestConnectionState() { + await recordConnectionHelper('testUserID1', 'testRoomID1', 'testClientID1'); + await recordConnectionHelper('testUserID1', 'testRoomID1', 'testClientID2'); + await recordConnectionHelper('testUserID1', 'testRoomID2', 'testClientID3'); + await recordConnectionHelper('testUserID2', 'testRoomID1', 'testClientID4'); + await recordConnectionHelper('testUserID2', 'testRoomID3', 'testClientID5'); + await recordConnectionHelper('testUserID3', 'testRoomID3', 'testClientID6'); +} + function createCreateRoomTestFixture() { const testRoomID = 'testRoomID1'; @@ -1190,12 +1217,22 @@ test('connect percent escapes components of the connection key', async () => { expect(response.headers.get('Sec-WebSocket-Protocol')).toEqual( encodedTestAuth, ); - expect((await storage.list({prefix: 'connection/'})).size).toEqual(1); - const connectionRecord = (await storage.get( - 'connection/%2FtestUserID%2F%3F/testRoomID/%2FtestClientID%2F/', - )) as ConnectionRecord; - expect(connectionRecord).toBeDefined(); - expect(connectionRecord.connectTimestamp).toEqual(testTime); + expect(await storage.list({prefix: 'connection/'})).toEqual( + new Map([ + [ + 'connection/%2FtestUserID%2F%3F/testRoomID/%2FtestClientID%2F/', + {connectTimestamp: testTime}, + ], + ]), + ); + expect(await storage.list({prefix: 'connection_room/'})).toEqual( + new Map([ + [ + 'connection_room/testRoomID/%2FtestUserID%2F%3F/%2FtestClientID%2F/', + {}, + ], + ]), + ); }); describe('connect pipes 401 over ws without calling Room DO if', () => { @@ -1507,27 +1544,22 @@ test('authInvalidateForUser when connection ids have chars that need to be perce }, ); - await storage.put( - 'connection/%2FtestUserID%2F%3F/testRoomID1/%2FtestClientID%2F/', - { - connectTimestamp: 1000, - }, + await recordConnectionHelper( + '/testUserID/?', + 'testRoomID1', + '/testClientID1/&', ); - await storage.put( - 'connection/%2FtestUserID%2F%3F/testRoomID1/%2FtestClientID2%2F/', - { - connectTimestamp: 1000, - }, + await recordConnectionHelper( + '/testUserID/?', + 'testRoomID1', + '/testClientID2/&', ); - await storage.put( - 'connection/%2FtestUserID%2F%3F/testRoomID2/%2FtestClientID%2F/', - { - connectTimestamp: 1000, - }, + await recordConnectionHelper( + '/testUserID/?', + 'testRoomID2', + '/testClientID3/&', ); - await storage.put('connection/testUserID2/testRoomID1/testClientID1/', { - connectTimestamp: 1000, - }); + await recordConnectionHelper('testUserID2', 'testRoomID1', 'testClientID1'); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -1586,6 +1618,7 @@ test('authInvalidateForUser when any request to roomDOs returns error response', ); await storeTestConnectionState(); + await recordConnectionHelper('testUserID1', 'testRoomID3', 'testClientID6'); const roomDORequestCountsByRoomID = new Map(); const testRoomDO: DurableObjectNamespace = { @@ -1691,76 +1724,6 @@ test('authInvalidateForRoom when request to roomDO is successful', async () => { expect(response.status).toEqual(200); }); -async function storeTestConnectionState() { - recordConnection( - { - userID: 'testUserID1', - roomID: 'testRoomID1', - clientID: 'testClientID1', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - recordConnection( - { - userID: 'testUserID1', - roomID: 'testRoomID1', - clientID: 'testClientID2', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - recordConnection( - { - userID: 'testUserID1', - roomID: 'testRoomID2', - clientID: 'testClientID4', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - recordConnection( - { - userID: 'testUserID2', - roomID: 'testRoomID1', - clientID: 'testClientID3', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - recordConnection( - { - userID: 'testUserID2', - roomID: 'testRoomID3', - clientID: 'testClientID5', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - recordConnection( - { - userID: 'testUserID3', - roomID: 'testRoomID3', - clientID: 'testClientID6', - }, - storage, - { - connectTimestamp: 1000, - }, - ); - await storage.sync(); -} - async function connectAndTestThatRoomGotCreated( authDO: BaseAuthDO, testRequest: Request, @@ -1789,12 +1752,18 @@ async function connectAndTestThatRoomGotCreated( )) as Record | undefined; assert(connectionRecord); expect(connectionRecord.connectTimestamp).toEqual(testTime); + expect(await storage.list({prefix: 'connection_room/'})).toEqual( + new Map([ + [`connection_room/testRoomID1/${testUserID}/testClientID1/`, {}], + ]), + ); } else { expect((await storage.list({prefix: 'connection/'})).size).toEqual(0); const connectionRecord = await storage.get( `connection/${testUserID}/testRoomID1/testClientID1/`, ); expect(connectionRecord).toBeUndefined(); + expect((await storage.list({prefix: 'connection_room/'})).size).toEqual(0); } } @@ -2040,13 +2009,13 @@ async function createRevalidateConnectionsTestFixture({ return new Response( JSON.stringify([ {userID: 'testUserID1', clientID: 'testClientID1'}, - {userID: 'testUserID2', clientID: 'testClientID3'}, + {userID: 'testUserID2', clientID: 'testClientID4'}, ]), ); case 'testRoomID2': return new Response( JSON.stringify([ - {userID: 'testUserID1', clientID: 'testClientID4'}, + {userID: 'testUserID1', clientID: 'testClientID3'}, ]), ); case 'testRoomID3': @@ -2084,18 +2053,17 @@ test('revalidateConnections', async () => { expect(roomDORequestCountsByRoomID.get('testRoomID2')).toEqual(1); expect(roomDORequestCountsByRoomID.get('testRoomID3')).toEqual(1); - console.log([...(await storage.list({prefix: 'connection/'})).keys()]); expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ 'connection/testUserID1/testRoomID1/testClientID1/', - 'connection/testUserID1/testRoomID2/testClientID4/', - 'connection/testUserID2/testRoomID1/testClientID3/', + 'connection/testUserID1/testRoomID2/testClientID3/', + 'connection/testUserID2/testRoomID1/testClientID4/', ]); expect([ ...(await storage.list({prefix: 'connection_room/'})).keys(), ]).toEqual([ 'connection_room/testRoomID1/testUserID1/testClientID1/', - 'connection_room/testRoomID1/testUserID2/testClientID3/', - 'connection_room/testRoomID2/testUserID1/testClientID4/', + 'connection_room/testRoomID1/testUserID2/testClientID4/', + 'connection_room/testRoomID2/testUserID1/testClientID3/', ]); }); @@ -2116,16 +2084,16 @@ test('revalidateConnections continues if one storage delete throws an error', as expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ 'connection/testUserID1/testRoomID1/testClientID1/', 'connection/testUserID1/testRoomID1/testClientID2/', - 'connection/testUserID1/testRoomID2/testClientID4/', - 'connection/testUserID2/testRoomID1/testClientID3/', + 'connection/testUserID1/testRoomID2/testClientID3/', + 'connection/testUserID2/testRoomID1/testClientID4/', ]); expect([ ...(await storage.list({prefix: 'connection_room/'})).keys(), ]).toEqual([ 'connection_room/testRoomID1/testUserID1/testClientID1/', 'connection_room/testRoomID1/testUserID1/testClientID2/', - 'connection_room/testRoomID1/testUserID2/testClientID3/', - 'connection_room/testRoomID2/testUserID1/testClientID4/', + 'connection_room/testRoomID1/testUserID2/testClientID4/', + 'connection_room/testRoomID2/testUserID1/testClientID3/', ]); }); @@ -2142,11 +2110,137 @@ test('revalidateConnections continues if one roomDO returns an error', async () expect(roomDORequestCountsByRoomID.get('testRoomID3')).toEqual(1); expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ + 'connection/testUserID1/testRoomID1/testClientID1/', + 'connection/testUserID1/testRoomID1/testClientID2/', + 'connection/testUserID1/testRoomID2/testClientID3/', + 'connection/testUserID2/testRoomID1/testClientID4/', + ]); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([ + 'connection_room/testRoomID1/testUserID1/testClientID1/', + 'connection_room/testRoomID1/testUserID1/testClientID2/', + 'connection_room/testRoomID1/testUserID2/testClientID4/', + 'connection_room/testRoomID2/testUserID1/testClientID3/', + ]); +}); + +test('test migration from schema 0 to schema 1, basic', async () => { + const {testRequest, testRoomDO, state} = await createCreateRoomTestFixture(); + + const authDO = new BaseAuthDO({ + roomDO: testRoomDO, + state, + authHandler: () => Promise.reject('should not be called'), + authApiKey: TEST_AUTH_API_KEY, + logSink: new TestLogSink(), + logLevel: 'debug', + }); + await storage.deleteAll(); + + await storage.put('connection/testUserID1/testRoomID1/testClientID1/', { + connectTimestamp: 1000, + }); + await storage.put('connection/testUserID1/testRoomID1/testClientID2/', { + connectTimestamp: 1000, + }); + await storage.put('connection/testUserID2/testRoomID1/testClientID3/', { + connectTimestamp: 1000, + }); + await storage.put('connection/testUserID1/testRoomID2/testClientID4/', { + connectTimestamp: 1000, + }); + await storage.put('connection/testUserID2/testRoomID3/testClientID5/', { + connectTimestamp: 1000, + }); + await storage.put( + 'connection/%2FtestUserID%2F%3F/%2FtestRoomID%2F%3F/%2FtestClientID%2F/', + { + connectTimestamp: 1000, + }, + ); + + expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual( + undefined, + ); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([]); + + // Create the room for the first time. + await authDO.fetch(testRequest); + + expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(1); + + expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual([ + 'connection/%2FtestUserID%2F%3F/%2FtestRoomID%2F%3F/%2FtestClientID%2F/', 'connection/testUserID1/testRoomID1/testClientID1/', 'connection/testUserID1/testRoomID1/testClientID2/', 'connection/testUserID1/testRoomID2/testClientID4/', 'connection/testUserID2/testRoomID1/testClientID3/', + 'connection/testUserID2/testRoomID3/testClientID5/', + ]); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([ + 'connection_room/%2FtestRoomID%2F%3F/%2FtestUserID%2F%3F/%2FtestClientID%2F/', + 'connection_room/testRoomID1/testUserID1/testClientID1/', + 'connection_room/testRoomID1/testUserID1/testClientID2/', + 'connection_room/testRoomID1/testUserID2/testClientID3/', + 'connection_room/testRoomID2/testUserID1/testClientID4/', + 'connection_room/testRoomID3/testUserID2/testClientID5/', ]); }); -// TODO test migration +// 3333 is chosen is it is >3 x the limit used to page through the connections +// an is not a multiple of the limit +test('test migration from schema 0 to schema 1, 3333 connections', async () => { + const {testRequest, testRoomDO, state} = await createCreateRoomTestFixture(); + + const authDO = new BaseAuthDO({ + roomDO: testRoomDO, + state, + authHandler: () => Promise.reject('should not be called'), + authApiKey: TEST_AUTH_API_KEY, + logSink: new TestLogSink(), + logLevel: 'debug', + }); + await storage.deleteAll(); + + const expectedConnectionKeys = []; + const expectedConnectionRoomIndexKeys = []; + for (let i = 0; i < 3333; i++) { + const connectionKeyString = `connection/testUserID${i % 10}/testRoomID${ + i % 10 + }/testClientID${i}/`; + await storage.put(connectionKeyString, { + connectTimestamp: 1000, + }); + expectedConnectionKeys.push(connectionKeyString); + expectedConnectionRoomIndexKeys.push( + `connection_room/testRoomID${i % 10}/testUserID${ + i % 10 + }/testClientID${i}/`, + ); + } + expectedConnectionKeys.sort(); + expectedConnectionRoomIndexKeys.sort(); + expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual( + undefined, + ); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual([]); + + // Create the room for the first time. + await authDO.fetch(testRequest); + + expect(await storage.get(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY)).toEqual(1); + + expect([...(await storage.list({prefix: 'connection/'})).keys()]).toEqual( + expectedConnectionKeys, + ); + expect([ + ...(await storage.list({prefix: 'connection_room/'})).keys(), + ]).toEqual(expectedConnectionRoomIndexKeys); +}); diff --git a/packages/reflect-server/src/server/auth-do.ts b/packages/reflect-server/src/server/auth-do.ts index 26c81f3839..5e2d3bae01 100644 --- a/packages/reflect-server/src/server/auth-do.ts +++ b/packages/reflect-server/src/server/auth-do.ts @@ -147,9 +147,7 @@ export class BaseAuthDO implements DurableObject { const lc = addRequestIDFromHeadersOrRandomID(this._lc, request); lc.debug?.('Handling request:', request.url); try { - console.log(1); await maybeMigrateStorageSchema(this._state.storage, lc); - console.log(2); const resp = await this._router.dispatch(request, {lc}); lc.debug?.(`Returning response: ${resp.status} ${resp.statusText}`); return resp; @@ -367,7 +365,6 @@ export class BaseAuthDO implements DurableObject { ); return closeWithErrorLocal('VersionNotSupported', 'unsupported version'); } - console.log(3); const {searchParams} = new URL(url); // TODO apparently many of these checks are not tested :( const clientID = searchParams.get('clientID'); @@ -950,7 +947,7 @@ async function maybeMigrateStorageSchema( ); let lastKey = ''; let done = false; - while (done) { + while (!done) { const connectionsListResult = await storage.list({ startAfter: lastKey, prefix: CONNECTION_KEY_PREFIX, From 3197949b6c6a64ca974f610759ee21ec3e19a684 Mon Sep 17 00:00:00 2001 From: Gregory Baker Date: Sat, 6 May 2023 12:30:36 -0700 Subject: [PATCH 3/4] make InvalidateAll scale as well --- packages/reflect-server/src/server/auth-do.ts | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/packages/reflect-server/src/server/auth-do.ts b/packages/reflect-server/src/server/auth-do.ts index 5e2d3bae01..25c11f8af9 100644 --- a/packages/reflect-server/src/server/auth-do.ts +++ b/packages/reflect-server/src/server/auth-do.ts @@ -595,18 +595,18 @@ export class BaseAuthDO implements DurableObject { this._requireAPIKey((ctx, req) => { const {lc} = ctx; lc.debug?.(`authInvalidateAll waiting for lock.`); - return this._authLock.withWrite(async () => { + return this._authLock.withWrite(() => { lc.debug?.('got lock.'); - const connectionKeys = ( - await this._state.storage.list({ - prefix: CONNECTION_KEY_PREFIX, - }) - ).keys(); // The request to the Room DOs must be completed inside the write lock // to avoid races with connect requests. - return this._forwardInvalidateRequest(lc, 'authInvalidateAll', req, [ - ...connectionKeys, - ]); + return this._forwardInvalidateRequest( + lc, + 'authInvalidateAll', + req, + // Use async generator because the full list of connections + // may exceed the DO's memory limits. + createConnectionKeyStringsGenerator(this._state.storage), + ); }); }), ); @@ -706,19 +706,15 @@ export class BaseAuthDO implements DurableObject { lc: LogContext, invalidateRequestName: string, request: Request, - connectionKeyStrings: string[], + connectionKeyStrings: Iterable | AsyncGenerator, ): Promise { - const connectionKeys = connectionKeyStrings.map(keyString => { - const connectionKey = connectionKeyFromString(keyString); - if (!connectionKey) { - lc.error?.('Failed to parse connection key', keyString); - } - return connectionKey; - }); const roomIDSet = new Set(); - for (const connectionKey of connectionKeys) { + for await (const keyString of connectionKeyStrings) { + const connectionKey = connectionKeyFromString(keyString); if (connectionKey) { roomIDSet.add(connectionKey.roomID); + } else { + lc.error?.('Failed to parse connection key', keyString); } } @@ -885,7 +881,7 @@ async function* createConnectionsByRoomGenerator( if (nextRoomListResult.size === 0) { return; } - const firstRoomIndexString: string = nextRoomListResult.keys().next().value; + const firstRoomIndexString: string = [...nextRoomListResult.keys()][0]; const connectionKey = connectionKeyFromRoomIndexString(firstRoomIndexString); if (!connectionKey) { @@ -907,6 +903,25 @@ async function* createConnectionsByRoomGenerator( } } +async function* createConnectionKeyStringsGenerator( + storage: DurableObjectStorage, +) { + let lastKey = ''; + let done = false; + while (!done) { + const connectionsListResult = await storage.list({ + startAfter: lastKey, + prefix: CONNECTION_KEY_PREFIX, + limit: 1000, + }); + for (const connectionKeyString of connectionsListResult.keys()) { + yield connectionKeyString; + lastKey = connectionKeyString; + } + done = connectionsListResult.size === 0; + } +} + export function recordConnection( connectionKey: ConnectionKey, storage: DurableObjectStorage, @@ -945,25 +960,14 @@ async function maybeMigrateStorageSchema( lc.info?.( 'Migrating from storage schema version 0 to storage schema version 1.', ); - let lastKey = ''; - let done = false; - while (!done) { - const connectionsListResult = await storage.list({ - startAfter: lastKey, - prefix: CONNECTION_KEY_PREFIX, - limit: 1000, - }); - for (const connectionKeyString of connectionsListResult.keys()) { - const connectionKey = must( - connectionKeyFromString(connectionKeyString), - ); - void storage.put( - connectionKeyToConnectionRoomIndexString(connectionKey), - {}, - ); - lastKey = connectionKeyString; - } - done = connectionsListResult.size === 0; + const connectionKeyStringsGenerator = + createConnectionKeyStringsGenerator(storage); + for await (const connectionKeyString of connectionKeyStringsGenerator) { + const connectionKey = must(connectionKeyFromString(connectionKeyString)); + void storage.put( + connectionKeyToConnectionRoomIndexString(connectionKey), + {}, + ); } void storage.put(AUTH_DO_STORAGE_SCHEMA_VERSION_KEY, 1); await storage.sync(); From ede896d9c987deb2f66c2d5518385670d787c477 Mon Sep 17 00:00:00 2001 From: Gregory Baker Date: Mon, 8 May 2023 10:21:02 -0700 Subject: [PATCH 4/4] feedback --- .../reflect-server/src/server/auth-do.test.ts | 71 ++++++++++--------- packages/reflect-server/src/server/auth-do.ts | 47 ++++++------ 2 files changed, 62 insertions(+), 56 deletions(-) diff --git a/packages/reflect-server/src/server/auth-do.test.ts b/packages/reflect-server/src/server/auth-do.test.ts index c25b1e593c..1ec9bc01a5 100644 --- a/packages/reflect-server/src/server/auth-do.test.ts +++ b/packages/reflect-server/src/server/auth-do.test.ts @@ -1225,10 +1225,10 @@ test('connect percent escapes components of the connection key', async () => { ], ]), ); - expect(await storage.list({prefix: 'connection_room/'})).toEqual( + expect(await storage.list({prefix: 'connections_by_room/'})).toEqual( new Map([ [ - 'connection_room/testRoomID/%2FtestUserID%2F%3F/%2FtestClientID%2F/', + 'connections_by_room/testRoomID/connection/%2FtestUserID%2F%3F/testRoomID/%2FtestClientID%2F/', {}, ], ]), @@ -1752,9 +1752,12 @@ async function connectAndTestThatRoomGotCreated( )) as Record | undefined; assert(connectionRecord); expect(connectionRecord.connectTimestamp).toEqual(testTime); - expect(await storage.list({prefix: 'connection_room/'})).toEqual( + expect(await storage.list({prefix: 'connections_by_room/'})).toEqual( new Map([ - [`connection_room/testRoomID1/${testUserID}/testClientID1/`, {}], + [ + `connections_by_room/testRoomID1/connection/${testUserID}/testRoomID1/testClientID1/`, + {}, + ], ]), ); } else { @@ -1763,7 +1766,9 @@ async function connectAndTestThatRoomGotCreated( `connection/${testUserID}/testRoomID1/testClientID1/`, ); expect(connectionRecord).toBeUndefined(); - expect((await storage.list({prefix: 'connection_room/'})).size).toEqual(0); + expect((await storage.list({prefix: 'connections_by_room/'})).size).toEqual( + 0, + ); } } @@ -2059,11 +2064,11 @@ test('revalidateConnections', async () => { 'connection/testUserID2/testRoomID1/testClientID4/', ]); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([ - 'connection_room/testRoomID1/testUserID1/testClientID1/', - 'connection_room/testRoomID1/testUserID2/testClientID4/', - 'connection_room/testRoomID2/testUserID1/testClientID3/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/', + 'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID4/', + 'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID3/', ]); }); @@ -2088,12 +2093,12 @@ test('revalidateConnections continues if one storage delete throws an error', as 'connection/testUserID2/testRoomID1/testClientID4/', ]); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([ - 'connection_room/testRoomID1/testUserID1/testClientID1/', - 'connection_room/testRoomID1/testUserID1/testClientID2/', - 'connection_room/testRoomID1/testUserID2/testClientID4/', - 'connection_room/testRoomID2/testUserID1/testClientID3/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID2/', + 'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID4/', + 'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID3/', ]); }); @@ -2116,12 +2121,12 @@ test('revalidateConnections continues if one roomDO returns an error', async () 'connection/testUserID2/testRoomID1/testClientID4/', ]); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([ - 'connection_room/testRoomID1/testUserID1/testClientID1/', - 'connection_room/testRoomID1/testUserID1/testClientID2/', - 'connection_room/testRoomID1/testUserID2/testClientID4/', - 'connection_room/testRoomID2/testUserID1/testClientID3/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID2/', + 'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID4/', + 'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID3/', ]); }); @@ -2164,7 +2169,7 @@ test('test migration from schema 0 to schema 1, basic', async () => { undefined, ); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([]); // Create the room for the first time. @@ -2181,19 +2186,19 @@ test('test migration from schema 0 to schema 1, basic', async () => { 'connection/testUserID2/testRoomID3/testClientID5/', ]); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([ - 'connection_room/%2FtestRoomID%2F%3F/%2FtestUserID%2F%3F/%2FtestClientID%2F/', - 'connection_room/testRoomID1/testUserID1/testClientID1/', - 'connection_room/testRoomID1/testUserID1/testClientID2/', - 'connection_room/testRoomID1/testUserID2/testClientID3/', - 'connection_room/testRoomID2/testUserID1/testClientID4/', - 'connection_room/testRoomID3/testUserID2/testClientID5/', + 'connections_by_room/%2FtestRoomID%2F%3F/connection/%2FtestUserID%2F%3F/%2FtestRoomID%2F%3F/%2FtestClientID%2F/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID1/', + 'connections_by_room/testRoomID1/connection/testUserID1/testRoomID1/testClientID2/', + 'connections_by_room/testRoomID1/connection/testUserID2/testRoomID1/testClientID3/', + 'connections_by_room/testRoomID2/connection/testUserID1/testRoomID2/testClientID4/', + 'connections_by_room/testRoomID3/connection/testUserID2/testRoomID3/testClientID5/', ]); }); -// 3333 is chosen is it is >3 x the limit used to page through the connections -// an is not a multiple of the limit +// 3333 is chosen because it is >3 x the limit used to page through the +// connections and is not a multiple of the limit test('test migration from schema 0 to schema 1, 3333 connections', async () => { const {testRequest, testRoomDO, state} = await createCreateRoomTestFixture(); @@ -2218,9 +2223,9 @@ test('test migration from schema 0 to schema 1, 3333 connections', async () => { }); expectedConnectionKeys.push(connectionKeyString); expectedConnectionRoomIndexKeys.push( - `connection_room/testRoomID${i % 10}/testUserID${ + `connections_by_room/testRoomID${i % 10}/connection/testUserID${ i % 10 - }/testClientID${i}/`, + }/testRoomID${i % 10}/testClientID${i}/`, ); } expectedConnectionKeys.sort(); @@ -2229,7 +2234,7 @@ test('test migration from schema 0 to schema 1, 3333 connections', async () => { undefined, ); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual([]); // Create the room for the first time. @@ -2241,6 +2246,6 @@ test('test migration from schema 0 to schema 1, 3333 connections', async () => { expectedConnectionKeys, ); expect([ - ...(await storage.list({prefix: 'connection_room/'})).keys(), + ...(await storage.list({prefix: 'connections_by_room/'})).keys(), ]).toEqual(expectedConnectionRoomIndexKeys); }); diff --git a/packages/reflect-server/src/server/auth-do.ts b/packages/reflect-server/src/server/auth-do.ts index 25c11f8af9..3368ee15ef 100644 --- a/packages/reflect-server/src/server/auth-do.ts +++ b/packages/reflect-server/src/server/auth-do.ts @@ -96,10 +96,6 @@ export const AUTH_ROUTES = { ...AUTH_ROUTES_AUTHED_BY_AUTH_HANDLER, } as const; -export const AUTH_DO_STORAGE_SCHEMA_VERSION_KEY = - 'auth_do_storage_schema_version'; -export const AUTH_DO_STORAGE_SCHEMA_VERSION = 1; - export class BaseAuthDO implements DurableObject { private readonly _router = new Router(); private readonly _roomDO: DurableObjectNamespace; @@ -147,7 +143,7 @@ export class BaseAuthDO implements DurableObject { const lc = addRequestIDFromHeadersOrRandomID(this._lc, request); lc.debug?.('Handling request:', request.url); try { - await maybeMigrateStorageSchema(this._state.storage, lc); + await ensureStorageSchemaMigrated(this._state.storage, lc); const resp = await this._router.dispatch(request, {lc}); lc.debug?.(`Returning response: ${resp.status} ${resp.statusText}`); return resp; @@ -760,7 +756,7 @@ export class BaseAuthDO implements DurableObject { } const CONNECTION_KEY_PREFIX = 'connection/'; -const CONNECTION_ROOM_INDEX_PREFIX = 'connection_room/'; +const CONNECTIONS_BY_ROOM_INDEX_PREFIX = 'connections_by_room/'; function createWSAndCloseWithError( lc: LogContext, @@ -796,9 +792,9 @@ function createWSAndCloseWithError( } function connectionKeyToString(key: ConnectionKey): string { - return `${CONNECTION_KEY_PREFIX}${encodeURIComponent( - key.userID, - )}/${encodeURIComponent(key.roomID)}/${encodeURIComponent(key.clientID)}/`; + return `${getConnectionKeyStringUserPrefix(key.userID)}${encodeURIComponent( + key.roomID, + )}/${encodeURIComponent(key.clientID)}/`; } function getConnectionKeyStringUserPrefix(userID: string): string { @@ -806,13 +802,13 @@ function getConnectionKeyStringUserPrefix(userID: string): string { } function connectionKeyToConnectionRoomIndexString(key: ConnectionKey): string { - return `${CONNECTION_ROOM_INDEX_PREFIX}${encodeURIComponent( - key.roomID, - )}/${encodeURIComponent(key.userID)}/${encodeURIComponent(key.clientID)}/`; + return `${getConnectionRoomIndexPrefix(key.roomID)}${connectionKeyToString( + key, + )}`; } function getConnectionRoomIndexPrefix(roomID: string): string { - return `${CONNECTION_ROOM_INDEX_PREFIX}${encodeURIComponent(roomID)}/`; + return `${CONNECTIONS_BY_ROOM_INDEX_PREFIX}${encodeURIComponent(roomID)}/`; } export function connectionKeyFromString( @@ -835,18 +831,19 @@ export function connectionKeyFromString( export function connectionKeyFromRoomIndexString( key: string, ): ConnectionKey | undefined { - if (!key.startsWith(CONNECTION_ROOM_INDEX_PREFIX)) { + if (!key.startsWith(CONNECTIONS_BY_ROOM_INDEX_PREFIX)) { return undefined; } - const parts = key.split('/'); - if (parts.length !== 5 || parts[4] !== '') { + const indexOfFirstSlashAfterPrefix = key.indexOf( + '/', + CONNECTIONS_BY_ROOM_INDEX_PREFIX.length, + ); + if (indexOfFirstSlashAfterPrefix === -1) { return undefined; } - return { - userID: decodeURIComponent(parts[2]), - roomID: decodeURIComponent(parts[1]), - clientID: decodeURIComponent(parts[3]), - }; + return connectionKeyFromString( + key.substring(indexOfFirstSlashAfterPrefix + 1), + ); } async function getConnectionKeysForRoomID( @@ -875,7 +872,7 @@ async function* createConnectionsByRoomGenerator( while (true) { const nextRoomListResult = await storage.list({ startAfter: lastKey, - prefix: CONNECTION_ROOM_INDEX_PREFIX, + prefix: CONNECTIONS_BY_ROOM_INDEX_PREFIX, limit: 1, }); if (nextRoomListResult.size === 0) { @@ -947,7 +944,11 @@ function deleteConnection( void storage.delete(connectionRoomIndexString); } -async function maybeMigrateStorageSchema( +export const AUTH_DO_STORAGE_SCHEMA_VERSION_KEY = + 'auth_do_storage_schema_version'; +export const AUTH_DO_STORAGE_SCHEMA_VERSION = 1; + +async function ensureStorageSchemaMigrated( storage: DurableObjectStorage, lc: LogContext, ) {