Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/zql/src/zql/ast2/ast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
 * As in SQL you can have multiple orderings. We don't currently
 * support ordering on anything other than the root query.
 *
 * Note that `field` is a bare column name rather than a path: ordering is
 * only supported on root-level rows today. If ordering by fields of related
 * rows is ever supported, this will need to become a path.
 */
export type OrderPart = [field: string, direction: 'asc' | 'desc'];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be worth some commentary on why OrderPart is not a path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we only support ordering by root level rows. Is that what you had in mind or something else?

// A complete ordering: the first part that differs between two rows decides.
export type Ordering = OrderPart[];

// Operators usable in simple (single-field) conditions.
export type SimpleOperator = EqualityOps | OrderOps | InOps | LikeOps;
export type EqualityOps = '=' | '!=';
export type OrderOps = '<' | '>' | '<=' | '>=';
export type InOps = 'IN' | 'NOT IN';
export type LikeOps = 'LIKE' | 'NOT LIKE' | 'ILIKE' | 'NOT ILIKE';
17 changes: 17 additions & 0 deletions packages/zql/src/zql/ivm2/capture-output.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type {Input, Output} from './operator.js';
import type {Change, TreeDiff} from './tree-diff.js';

/**
* A simple output that consumes and stores all pushed changes.
*/
/**
 * A simple output that consumes and stores all pushed changes.
 */
export class CaptureOutput implements Output {
  readonly changes: Change[] = [];

  // Append every change from the pushed diff; the source input is ignored.
  push(_source: Input, diff: TreeDiff) {
    for (const change of diff.changes) {
      this.changes.push(change);
    }
  }

  // Drop everything captured so far, keeping the same array instance so any
  // external references to `changes` remain valid.
  reset() {
    this.changes.splice(0);
  }
}
60 changes: 60 additions & 0 deletions packages/zql/src/zql/ivm2/change-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {expect, test} from 'vitest';
import {ChangeStream} from './change-stream.js';

// Exercises 'needy' mode: a needy ChangeStream must throw if a consumer
// abandons iteration before the stream is fully drained.
test('needy', () => {
  const arr = [1, 2, 3];
  // Each case consumes a fixed number of elements before breaking out of the
  // loop. Consuming "4" from a 3-element array means the loop runs to natural
  // completion (next() reports done before the break fires), so no error is
  // expected even in needy mode. The cases are repeated to check that fresh
  // streams behave identically.
  const cases: {
    numToConsume: number;
    expectError: boolean;
  }[] = [
    {numToConsume: 1, expectError: true},
    {numToConsume: 2, expectError: true},
    {numToConsume: 3, expectError: true},
    {numToConsume: 4, expectError: false},
    {numToConsume: 1, expectError: true},
    {numToConsume: 2, expectError: true},
    {numToConsume: 3, expectError: true},
    {numToConsume: 4, expectError: false},
  ];

  // Wrapping in a delegating generator verifies that abandoning the outer
  // iteration still propagates return() down to the ChangeStream.
  const gen = function* (it: Iterable<number>) {
    for (const x of it) {
      yield x;
    }
  };

  for (const mode of ['normal', 'needy'] as const) {
    for (const wrapInGenerator of [false, true]) {
      for (const c of cases) {
        const {numToConsume, expectError} = c;
        const f = () => {
          let count = 0;
          let it: Iterable<number> = new ChangeStream(arr, mode);
          if (wrapInGenerator) {
            it = gen(it);
          }
          // Breaking out of for..of triggers the iterator's return(), which
          // is where needy mode enforces full consumption.
          for (const _ of it) {
            if (++count === numToConsume) {
              break;
            }
          }
        };

        // Only needy streams abandoned before exhaustion should throw.
        if (mode === 'needy' && expectError) {
          expect(f, JSON.stringify({mode, wrapInGenerator, c})).toThrow(
            'NeedyIterator was not fully consumed!',
          );
        } else {
          expect(f, JSON.stringify({mode, wrapInGenerator, c})).not.toThrow();
        }
      }
    }
  }
});

// A ChangeStream is single-use: once drained, iterating again yields nothing.
test('once', () => {
  const source = [1, 2, 3];
  const stream = new ChangeStream(source);
  // First pass drains the stream completely.
  expect(Array.from(stream)).toEqual(source);
  // Second pass resumes the same exhausted iterator and produces nothing.
  expect(Array.from(stream)).toEqual([]);
});
64 changes: 64 additions & 0 deletions packages/zql/src/zql/ivm2/change-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
export type Mode = 'normal' | 'needy';

/**
* ChangeStream wraps another iterator and enforces some special semantics:
*
* 1. The iterator is only callable once. Our change streams can only be used
* once because they are coupled to the state of the datastore, which would
* need to be reset if we wanted to iterate again. We want to enforce that
* they can only be iterated once.
* 2. If the iterator contains data that was push()'d, it must be fully
* consumed. Otherwise, the resulting state of the query will be incomplete.
*
* For an example of (2), consider a query like:
*
* z.issue.select().orderBy('id').limit(10);
*
* On the first pull, the ChangeStream we receive will be `normal`. We can stop
* consuming it at any point because it is sorted. When we've received 10 rows,
* we can stop consuming it and the query will be complete.
*
* Now consider that someone pushes two changes into the pipeline. These changes
* will not be sorted and so we must consume them all to ensure that the query
* results are correct.
*/
export class ChangeStream<T> implements Iterable<T> {
readonly #iterator: Iterator<T>;
readonly #mode: Mode;

#done: boolean;

constructor(iterable: Iterable<T>, mode: Mode = 'normal') {
this.#iterator = iterable[Symbol.iterator]();
this.#mode = mode;
this.#done = false;
}

[Symbol.iterator]() {
return this;
}

next() {
const result = this.#iterator.next();
this.#done = result.done ?? false;
return result;
}

throw() {
if (this.#iterator.throw) {
return this.#iterator.throw();
}
return {done: true, value: undefined} as const;
}

return() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please implement throw as well so the base iterator can be cleaned up on exceptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you'll need to call this.#iterator.return?.() as well to ensure any resources (like the SQLite statement) are freed / reset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good catch. thank you!

this.#iterator.return?.();

if (!this.#done && this.#mode === 'needy') {
throw new Error('NeedyIterator was not fully consumed!');
}

this.#done = true;
return {done: true, value: undefined} as const;
}
}
148 changes: 148 additions & 0 deletions packages/zql/src/zql/ivm2/data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import {compareUTF8} from 'compare-utf8';
import {
assert,
assertBoolean,
assertNumber,
assertString,
} from 'shared/src/asserts.js';
import type {Ordering} from '../ast2/ast.js';

/**
 * The data types that Zero can represent are limited by two things:
 *
 * 1. The underlying Replicache sync layer currently can only represent JSON
 *    types. This could possibly be expanded in the future, but we do want to be
 *    careful of adding encoding overhead. By using JSON, we are taking
 *    advantage of IndexedDB’s fast native JSValue [de]serialization which has
 *    historically been a perf advantage for us.
 *
 * 2. IDs in Zero need to be comparable because we use them for sorting and row
 *    identity. We could expand the set of allowed value types (to include,
 *    e.g., Objects) but we would then need to restrict IDs to only comparable
 *    types.
 *
 * These two facts leave us with the following allowed types. Zero's replication
 * layer must convert other types into these for tables to be used with Zero.
 *
 * TODO: Add support for undefined to support optimistic mutations on client
 * that omit fields.
 */
export type Value = null | boolean | number | string;

/**
 * A Row is represented as a JS Object.
 *
 * We do everything in IVM as loosely typed values because these pipelines are
 * going to be constructed at runtime by other code, so type-safety can't buy us
 * anything.
 *
 * Also since the calling code on the client ultimately wants objects to work
 * with we end up with a lot less copies by using objects throughout.
 */
export type Row = Record<string, Value>;

/**
 * Zero requires that all synced tables have a unique primary key. Primary keys
 * composed of multiple columns are supported (and required, due to junction
 * tables). Rows from the same source having the same ID are considered to be
 * the same row, without comparing other fields.
 *
 * The code that vends these IDs must return the columns in some consistent
 * order over the lifetime of the process. This avoids the sort having to be
 * done at the time of comparison.
 *
 * TODO: Microbenchmark this approach against the version where we put an ID
 * symbol on each object. Benchmark maintaining some sorted list of rows.
 */
export type ID = Value[];

/**
 * For performance reasons (to avoid expanding every single row with a new
 * object/array having the ID fields) we provide access to the identity of a row
 * externally, with a separate function when necessary.
 */
export type GetID = (row: Row) => ID;

/**
 * Check two IDs for equality. This function considers any two IDs with the same
 * components equal, even if they are from different tables.
 */
export function idEquals(id1: ID, id2: ID): boolean {
  // Component-wise strict equality; a length mismatch short-circuits.
  return (
    id1.length === id2.length && id1.every((value, i) => value === id2[i])
  );
}

/**
 * Compare two values. The values must be of the same type. This function
 * throws at runtime if the types differ.
 * @returns < 0 if a < b, 0 if a === b, > 0 if a > b
 */
export function compareValues(a: Value, b: Value): number {
  // Identical values (including null === null) compare equal.
  if (a === b) {
    return 0;
  }

  // null sorts before every non-null value.
  if (a === null) {
    return -1;
  }
  if (b === null) {
    return 1;
  }

  switch (typeof a) {
    case 'boolean':
      assertBoolean(b);
      // a !== b was established above, so if a is true then b is false:
      // false sorts before true.
      return a ? 1 : -1;
    case 'number':
      assertNumber(b);
      return a - b;
    case 'string':
      assertString(b);
      // We compare all strings in Zero as UTF-8. This is the default on SQLite
      // and we need to match it. See:
      // https://blog.replicache.dev/blog/replicache-11-adventures-in-text-encoding.
      //
      // TODO: We could change this since SQLite supports UTF-16. Microbenchmark
      // to see if there's a big win.
      //
      // https://www.sqlite.org/c3ref/create_collation.html
      return compareUTF8(a, b);
    default:
      throw new Error(`Unsupported type: ${a}`);
  }
}

/**
 * Compare two IDs. This function throws at runtime if the IDs have different
 * lengths, or if the types of the components don't match.
 * @returns < 0 if a < b, 0 if a === b, > 0 if a > b
 */
export function compareIDs(a: ID, b: ID): number {
  assert(a.length === b.length, 'Mismatched ID lengths');
  // Lexicographic comparison: the first unequal component decides.
  for (const [i, component] of a.entries()) {
    const cmp = compareValues(component, b[i]);
    if (cmp !== 0) {
      return cmp;
    }
  }
  return 0;
}

export type Comparator = (r1: Row, r2: Row) => number;

/**
 * Builds a row comparator from an ordering specification. Fields are compared
 * in order; the first field whose values differ decides the result, honoring
 * that field's 'asc'/'desc' direction. Rows equal on every ordered field
 * compare as 0.
 */
export function makeComparator(order: Ordering): Comparator {
  return (r1, r2) => {
    for (const [field, direction] of order) {
      const cmp = compareValues(r1[field], r2[field]);
      if (cmp === 0) {
        continue;
      }
      return direction === 'desc' ? -cmp : cmp;
    }
    return 0;
  };
}
77 changes: 77 additions & 0 deletions packages/zql/src/zql/ivm2/filter-operator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import {expect, test} from 'vitest';
import {MemoryInput} from './memory-input.js';
import {FilterOperator} from './filter-operator.js';
import {CaptureOutput} from './capture-output.js';
import {everything} from './operator.js';
import type {SimpleOperator} from '../ast2/ast.js';
import type {Value} from './data.js';

test('basics', () => {
  // Build a two-stage pipeline: MemoryInput -> FilterOperator -> CaptureOutput,
  // filtering on f = 1.
  const mem = new MemoryInput([['id', 'asc']]);
  const filter = new FilterOperator(mem, {
    field: 'f',
    op: '=',
    value: 1,
  });
  mem.setOutput(filter);

  const capture = new CaptureOutput();
  filter.setOutput(capture);

  // Push three adds and one remove; only rows with f = 1 should pass through.
  mem.push([
    {type: 'add', row: {id: 1, f: 1}},
    {type: 'add', row: {id: 2, f: 2}},
    {type: 'add', row: {id: 3, f: 1}},
    {type: 'remove', row: {id: 1, f: 1}},
  ]);

  // The output should only have received the changes that match the filter.
  expect(capture.changes).toEqual([
    {type: 'add', row: {id: 1, f: 1}},
    {type: 'add', row: {id: 3, f: 1}},
    {type: 'remove', row: {id: 1, f: 1}},
  ]);

  // Pulling should only return rows that currently match filter.
  expect([...filter.pull(everything).diff.changes]).toEqual([
    {type: 'add', row: {id: 3, f: 1}},
  ]);

  // But if we look at mem, we see the non-matching rows are still there.
  expect([...mem.pull(everything).diff.changes]).toEqual([
    {type: 'add', row: {id: 2, f: 2}},
    {type: 'add', row: {id: 3, f: 1}},
  ]);
});

test('operators', () => {
  // TODO(aa): This requires some thought on how to exhaustively test.
  // fast-check to the rescue?

  // Builds a fresh MemoryInput -> FilterOperator -> CaptureOutput pipeline
  // filtering on `f <op> value`.
  const pipe = (op: SimpleOperator, value: Value) => {
    const mem = new MemoryInput([['id', 'asc']]);
    const filter = new FilterOperator(mem, {field: 'f', op, value});
    mem.setOutput(filter);
    const capture = new CaptureOutput();
    filter.setOutput(capture);
    return {mem, filter, capture};
  };

  // equals: for every pair of values, the row passes the filter iff the
  // filter value strictly equals the row value. Checked in both directions
  // to verify symmetry.
  const vals = [null, true, false, -1, 0, 1, 3.14, '', 'a', 'b'];
  for (const v1 of vals) {
    for (const v2 of vals) {
      const {mem: m1, capture: c1} = pipe('=', v1);
      m1.push([{type: 'add', row: {id: 1, f: v2}}]);
      expect([...c1.changes]).toEqual(
        v1 === v2 ? [{type: 'add', row: {id: 1, f: v1}}] : [],
      );

      // reverse
      const {mem: m2, capture: c2} = pipe('=', v2);
      m2.push([{type: 'add', row: {id: 1, f: v1}}]);
      expect([...c2.changes]).toEqual(
        v1 === v2 ? [{type: 'add', row: {id: 1, f: v2}}] : [],
      );
    }
  }
});
Loading