Commit 65f2d92

fix(zero-cache): improve IVM time-slicing to avoid I/O starvation (#4989)
Improve IVM time-slicing to run slices in separate iterations of the Node.js event loop, allowing I/O processing to happen between slices and preventing connection failures due to unresponsive pings.

The included simulator illustrates the behavior of the two time-slicing implementations based on a single simulated "pipeline" which:

* fetches ~250K rows from an SQLite db
* yields every 50K rows (roughly 500ms)
* measures the delay between a per-second ping and its received response:

<img width="362" height="267" alt="Screenshot 2025-10-09 at 11 10 04" src="https://github.com/user-attachments/assets/215051e2-417f-4c53-8b08-a87c957815e0" />

### Previous implementation

The previous `setTimeout()`-based implementation, while achieving fairness across pipelines, was problematic in that the slices of all pipelines were run in a single iteration of the event loop (specifically, the [pending callbacks phase](https://nodejs.org/en/learn/asynchronous-work/event-loop-timers-and-nexttick#pending-callbacks)), thereby introducing an I/O processing delay proportional to the number of pipelines. For example, with 20 pipelines (e.g. ViewSyncers) in the process, the ping delay goes up to 20 × 500ms = 10sec.

<img width="376" height="898" alt="Screenshot 2025-10-09 at 11 09 23" src="https://github.com/user-attachments/assets/03178883-d382-4470-afbd-29abe7c24fb0" />

### New implementation

The new time-slicing implementation leverages a queue (implemented with a `Lock`) to:

* run a single time slice per iteration of the event loop,
* use `setImmediate()` to run logic after I/O has been processed, and
* schedule the next slice by calling `setImmediate()` from within that phase.

```ts
const queue = new Lock();

function yieldProcess() {
  // previously ...
  // return new Promise(resolve => setTimeout(resolve, 0));

  // now ...
  return queue.withLock(() => new Promise(setImmediate));
}
```

As explained in the [specification of setImmediate()](https://nodejs.org/api/timers.html#setimmediatecallback-args):

> If an immediate timer is queued from inside an executing callback, that timer will not be triggered until the next event loop iteration.

The resulting behavior achieves the same fairness and overall completion time, but allows I/O to be processed between slices, capping the ping delay to the duration of a single time slice:

<img width="379" height="1120" alt="Screenshot 2025-10-09 at 11 13 52" src="https://github.com/user-attachments/assets/ba0d9097-5797-487d-b4f4-9878b2e5c146" />

### Miscellaneous

Also consolidated the timer and yielding code, and made it so that we yield before the first time slice (e.g. so that we don't run into the same problem when a bunch of ViewSyncers start processing a long advancement).
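To make the mechanism concrete, here is a small self-contained sketch (not part of the commit) of several pipelines sharing the lock-based yield. It assumes an ES-module context with top-level `await` and a trivial "slice" of work per iteration:

```ts
import {Lock} from '@rocicorp/lock';

const queue = new Lock();

// One time slice per event-loop iteration: the Lock serializes slices across
// pipelines, and setImmediate() defers each slice until after pending I/O
// callbacks have run.
const yieldProcess = () => queue.withLock(() => new Promise(setImmediate));

async function pipeline(id: number, slices: number) {
  for (let i = 0; i < slices; i++) {
    // ...one slice of synchronous work would go here...
    await yieldProcess();
  }
  console.log(`pipeline ${id} done`);
}

// The pipelines interleave fairly, yet I/O (timers, sockets, pings) is never
// blocked for longer than a single slice.
await Promise.all([pipeline(1, 5), pipeline(2, 5), pipeline(3, 5)]);
```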
1 parent ceecd46 commit 65f2d92

File tree: 7 files changed (+237 −27 lines)

package-lock.json

Lines changed: 27 additions & 0 deletions
(Generated file; diff not rendered.)

packages/zero-cache/src/services/view-syncer/pipeline-driver.test.ts

Lines changed: 6 additions & 6 deletions
```diff
@@ -3,6 +3,10 @@ import {afterEach, beforeEach, describe, expect, test} from 'vitest';
 import {testLogConfig} from '../../../../otel/src/test-log-config.ts';
 import {createSilentLogContext} from '../../../../shared/src/logging-test-utils.ts';
 import type {AST} from '../../../../zero-protocol/src/ast.ts';
+import {
+  CREATE_STORAGE_TABLE,
+  DatabaseStorage,
+} from '../../../../zqlite/src/database-storage.ts';
 import type {Database as DB} from '../../../../zqlite/src/db.ts';
 import {Database} from '../../../../zqlite/src/db.ts';
 import {InspectorDelegate} from '../../server/inspector-delegate.ts';
@@ -15,13 +19,9 @@ import {
   type FakeReplicator,
 } from '../replicator/test-utils.ts';
 import {getMutationResultsQuery} from './cvr.ts';
-import {
-  CREATE_STORAGE_TABLE,
-  DatabaseStorage,
-} from '../../../../zqlite/src/database-storage.ts';
 import {PipelineDriver} from './pipeline-driver.ts';
 import {ResetPipelinesSignal, Snapshotter} from './snapshotter.ts';
-import {Timer} from './view-syncer.ts';
+import {TimeSliceTimer} from './view-syncer.ts';
 
 describe('view-syncer/pipeline-driver', () => {
   let dbFile: DbFile;
@@ -317,7 +317,7 @@ describe('view-syncer/pipeline-driver', () => {
   );
 
   function startTimer() {
-    return new Timer().start();
+    return new TimeSliceTimer().startWithoutYielding();
   }
 
   function changes() {
```
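A minimal usage sketch of the renamed API (illustrative only; `example` is not a function in the codebase): `start()` is now asynchronous because it yields to the time-slice queue before the first slice, while tests use the synchronous `startWithoutYielding()`.

```ts
import {TimeSliceTimer} from './view-syncer.ts';

async function example() {
  // Production paths yield to the time-slice queue before the first slice,
  // so start() returns a Promise that resolves to the started timer.
  const timer = await new TimeSliceTimer().start();

  // Tests start the first lap immediately, with no yield.
  const syncTimer = new TimeSliceTimer().startWithoutYielding();

  return [timer, syncTimer];
}
```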

packages/zero-cache/src/services/view-syncer/view-syncer.ts

Lines changed: 49 additions & 21 deletions
```diff
@@ -1073,7 +1073,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
       transformationHash,
       transformedAst,
     } of transformedQueries) {
-      const timer = new Timer();
+      const timer = new TimeSliceTimer();
       let count = 0;
       await startAsyncSpan(
         tracer,
@@ -1086,13 +1086,11 @@
             transformationHash,
             queryID,
             transformedAst,
-            timer.start(),
+            await timer.start(),
           )) {
             if (++count % TIME_SLICE_CHECK_SIZE === 0) {
               if (timer.elapsedLap() > TIME_SLICE_MS) {
-                timer.stopLap();
-                await yieldProcess(this.#setTimeout);
-                timer.startLap();
+                await timer.yieldProcess();
               }
             }
           }
@@ -1457,13 +1455,17 @@
     }
 
     let totalProcessTime = 0;
-    const timer = new Timer();
+    const timer = new TimeSliceTimer();
     const pipelines = this.#pipelines;
     const hydrations = this.#hydrations;
     const hydrationTime = this.#hydrationTime;
     // eslint-disable-next-line @typescript-eslint/no-this-alias
     const self = this;
 
+    // yield at the very beginning so that the first time slice
+    // is properly processed by the time-slice queue.
+    await yieldProcess();
+
     function* generateRowChanges(slowHydrateThreshold: number) {
       for (const q of addQueries) {
         lc = lc
@@ -1475,7 +1477,7 @@
           q.transformationHash,
           q.id,
           q.ast,
-          timer.start(),
+          timer.startWithoutYielding(),
         );
         const elapsed = timer.stop();
         totalProcessTime += elapsed;
@@ -1637,7 +1639,7 @@
 
   #processChanges(
     lc: LogContext,
-    timer: Timer,
+    timer: TimeSliceTimer,
     changes: Iterable<RowChange>,
     updater: CVRQueryDrivenUpdater,
     pokers: PokeHandler,
@@ -1716,9 +1718,7 @@
 
         if (rows.size % TIME_SLICE_CHECK_SIZE === 0) {
           if (timer.elapsedLap() > TIME_SLICE_MS) {
-            timer.stopLap();
-            await yieldProcess(this.#setTimeout);
-            timer.startLap();
+            await timer.yieldProcess();
           }
         }
       }
@@ -1749,7 +1749,7 @@
     );
     const start = performance.now();
 
-    const timer = new Timer();
+    const timer = new TimeSliceTimer();
     const {version, numChanges, changes} = this.#pipelines.advance(timer);
     lc = lc.withContext('newVersion', version);
 
@@ -1774,7 +1774,7 @@
     try {
       await this.#processChanges(
         lc,
-        timer.start(),
+        await timer.start(),
         changes,
         updater,
         pokers,
@@ -1873,8 +1873,23 @@ function createHashToIDs(cvr: CVRSnapshot) {
   return hashToIDs;
 }
 
-function yieldProcess(setTimeoutFn: SetTimeout) {
-  return new Promise(resolve => setTimeoutFn(resolve, 0));
+// A global Lock acts as a queue to run a single IVM time slice per iteration
+// of the node event loop, thus bounding I/O delay to the duration of a single
+// time slice.
+//
+// Refresher:
+// https://nodejs.org/en/learn/asynchronous-work/event-loop-timers-and-nexttick#phases-overview
+//
+// Note that recursive use of setImmediate() (i.e. calling setImmediate() from
+// within a setImmediate() callback), results in enqueuing the latter
+// callback in the *next* event loop iteration, as documented in:
+// https://nodejs.org/api/timers.html#setimmediatecallback-args
+//
+// This effectively achieves the desired one-per-event-loop-iteration behavior.
+const timeSliceQueue = new Lock();
+
+function yieldProcess() {
+  return timeSliceQueue.withLock(() => new Promise(setImmediate));
 }
 
 function contentsAndVersion(row: Row) {
@@ -2002,17 +2017,30 @@ function hasExpiredQueries(cvr: CVRSnapshot): boolean {
   return false;
 }
 
-export class Timer {
+export class TimeSliceTimer {
   #total = 0;
   #start = 0;
 
-  start() {
+  async start() {
+    // yield at the very beginning so that the first time slice
+    // is properly processed by the time-slice queue.
+    await yieldProcess();
+    return this.startWithoutYielding();
+  }
+
+  startWithoutYielding() {
     this.#total = 0;
-    this.startLap();
+    this.#startLap();
     return this;
   }
 
-  startLap() {
+  async yieldProcess() {
+    this.#stopLap();
+    await yieldProcess();
+    this.#startLap();
+  }
+
+  #startLap() {
     assert(this.#start === 0, 'already running');
     this.#start = performance.now();
   }
@@ -2022,15 +2050,15 @@ export class Timer {
     return performance.now() - this.#start;
   }
 
-  stopLap() {
+  #stopLap() {
     assert(this.#start !== 0, 'not running');
     this.#total += performance.now() - this.#start;
     this.#start = 0;
   }
 
   /** @returns the total elapsed time */
   stop(): number {
-    this.stopLap();
+    this.#stopLap();
     return this.#total;
   }
```
Lines changed: 23 additions & 0 deletions
New file:

```json
{
  "name": "sqlite-io-yield-simulator",
  "private": true,
  "version": "0.0.0",
  "type": "module",
  "scripts": {
    "go": "tsx src/main.ts",
    "check-types": "tsc",
    "format": "prettier --write .",
    "check-format": "prettier --check .",
    "lint": "oxlint --type-aware src/"
  },
  "dependencies": {
    "@rocicorp/lock": "^1.0.4",
    "@rocicorp/logger": "^5.4.0",
    "ws": "^8.18.1",
    "zqlite": "0.0.0"
  },
  "devDependencies": {
    "@rocicorp/prettier-config": "^0.3.0"
  },
  "prettier": "@rocicorp/prettier-config"
}
```
Lines changed: 88 additions & 0 deletions
New file:

```ts
import {Lock} from '@rocicorp/lock';
import {consoleLogSink, LogContext} from '@rocicorp/logger';
import {fork} from 'node:child_process';
import {WebSocketServer} from 'ws';
import {randInt} from '../../../packages/shared/src/rand.ts';
import {Database, Statement} from '../../../packages/zqlite/src/db.ts';

const lc = new LogContext('info', {}, consoleLogSink);

const queue = new Lock();

function yieldProcess() {
  // The old yield implementation ends up scheduling time slices
  // for all pipelines in the same iteration of the event loop,
  // potentially starving the I/O and incurring a delay proportional
  // to the number of pipelines.
  //
  // return new Promise(resolve => setTimeout(resolve, 0));

  // This yield implementation runs one time slice per event loop,
  // allowing I/O to be processed between slices, capping the
  // ping delay to the duration of a single time slice.
  return queue.withLock(() => new Promise(setImmediate));
}

class YieldingPipeline {
  readonly lc: LogContext;
  readonly stmt: Statement;

  constructor(id: number) {
    this.lc = lc.withContext('id', id);
    const db = new Database(
      this.lc,
      process.env.DB_FILE ?? '/tmp/zbugs-sync-replica.db',
      undefined,
      Number.MAX_SAFE_INTEGER, // Don't log slow queries
    );
    this.stmt = db.prepare('SELECT * FROM comment');
  }

  async run() {
    const start = performance.now();
    let lapStart = start;
    let i = 0;
    for (const _ of this.stmt.iterate()) {
      if (i++ % 500000 === 0) {
        const lapElapsed = performance.now() - lapStart;
        this.lc.debug?.(`Yielding at ${i} rows ${lapElapsed.toFixed(2)} ms`);
        await yieldProcess();
        lapStart = performance.now();
      }
    }
    const elapsed = performance.now() - start;
    this.lc.info?.(`Finished fetching ${i} rows in ${elapsed.toFixed(2)} ms`);
  }
}

async function runPipelines(numPipelines: number) {
  const pipelines: YieldingPipeline[] = Array.from(
    {length: numPipelines},
    (_, i) => new YieldingPipeline(i),
  );

  await Promise.all(pipelines.map(p => p.run()));
}

const port = randInt(10000, 32767);
const wss = new WebSocketServer({port});
lc.debug?.(`Running server on port ${port}`);
wss.on('connection', ws => {
  lc.debug?.(`Received client connection`);
  ws.on('message', (data: Buffer) => {
    const {ping} = JSON.parse(data.toString()) as {ping: number};
    const pong = Date.now() - ping;
    ws.send(JSON.stringify({ping, pong}));
  });
});

fork(new URL('./pinger.ts', import.meta.url), [String(port)]);

const argv = process.argv.slice(2);
const numPipelines = argv.length ? parseInt(argv[0]) : 20;

lc.info?.(`Simulating ${numPipelines} pipeline(s) ...`);
const start = performance.now();
await runPipelines(numPipelines);
const elapsed = performance.now() - start;
lc.info?.(`Finished ${elapsed.toFixed(2)} ms`);
```
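The forked `pinger.ts` is not shown in this view. The following is a hypothetical reconstruction inferred from the `{ping, pong}` protocol handled by the WebSocket server above; the actual file may differ:

```ts
// pinger.ts (hypothetical sketch): send a ping every second and log how long
// the simulator took to respond, i.e. the observed I/O delay.
import WebSocket from 'ws';

const port = process.argv[2];
const ws = new WebSocket(`ws://localhost:${port}`);

ws.on('open', () => {
  setInterval(() => ws.send(JSON.stringify({ping: Date.now()})), 1000);
});

ws.on('message', (data: Buffer) => {
  const {ping, pong} = JSON.parse(data.toString()) as {
    ping: number;
    pong: number;
  };
  // `pong` is the server-side delay; also report the full round trip.
  console.log(
    `ping delay: server ${pong} ms, round-trip ${Date.now() - ping} ms`,
  );
});
```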
