@@ -5,6 +5,8 @@ import type {
5
5
AST ,
6
6
Condition ,
7
7
Conjunction ,
8
+ CorrelatedSubQuery ,
9
+ CorrelatedSubQueryCondition ,
8
10
Disjunction ,
9
11
LiteralValue ,
10
12
Ordering ,
@@ -14,6 +16,7 @@ import type {
14
16
} from '../../../zero-protocol/src/ast.js' ;
15
17
import type { Row } from '../../../zero-protocol/src/data.js' ;
16
18
import type { PrimaryKey } from '../../../zero-protocol/src/primary-key.js' ;
19
+ import { Exists } from '../ivm/exists.js' ;
17
20
import { FanIn } from '../ivm/fan-in.js' ;
18
21
import { FanOut } from '../ivm/fan-out.js' ;
19
22
import { Filter } from '../ivm/filter.js' ;
@@ -104,15 +107,25 @@ export function bindStaticParameters(
104
107
} ;
105
108
106
109
function bindCondition ( condition : Condition ) : Condition {
107
- return condition . type === 'simple'
108
- ? {
109
- ...condition ,
110
- value : bindValue ( condition . value ) ,
111
- }
112
- : {
113
- ...condition ,
114
- conditions : condition . conditions . map ( bindCondition ) ,
115
- } ;
110
+ if ( condition . type === 'simple' ) {
111
+ return {
112
+ ...condition ,
113
+ value : bindValue ( condition . value ) ,
114
+ } ;
115
+ }
116
+ if ( condition . type === 'subquery' ) {
117
+ return {
118
+ ...condition ,
119
+ related : {
120
+ ...condition . related ,
121
+ subquery : visit ( condition . related . subquery ) ,
122
+ } ,
123
+ } ;
124
+ }
125
+ return {
126
+ ...condition ,
127
+ conditions : condition . conditions . map ( bindCondition ) ,
128
+ } ;
116
129
}
117
130
118
131
const bindValue = ( value : ValuePosition ) : LiteralValue => {
@@ -157,8 +170,20 @@ function buildPipelineInternal(
157
170
end = new Skip ( end , ast . start ) ;
158
171
}
159
172
173
+ const correlatedSubQueryConditions = gatherCorrelatedSubQueryConditions (
174
+ ast . where ,
175
+ ) ;
176
+
177
+ for ( const csqc of correlatedSubQueryConditions ) {
178
+ let csq = csqc . related ;
179
+ if ( csqc . condition . type === 'exists' ) {
180
+ csq = { ...csq , subquery : { ...csq . subquery , limit : EXISTS_LIMIT } } ;
181
+ }
182
+ end = applyCorrelatedSubQuery ( csq , delegate , staticQueryParameters , end ) ;
183
+ }
184
+
160
185
if ( ast . where ) {
161
- end = applyWhere ( end , ast . where , appliedFilters ) ;
186
+ end = applyWhere ( end , ast . where , appliedFilters , delegate ) ;
162
187
}
163
188
164
189
if ( ast . limit ) {
@@ -167,28 +192,38 @@ function buildPipelineInternal(
167
192
168
193
if ( ast . related ) {
169
194
for ( const sq of ast . related ) {
170
- assert ( sq . subquery . alias , 'Subquery must have an alias' ) ;
171
- const child = buildPipelineInternal (
172
- sq . subquery ,
173
- delegate ,
174
- staticQueryParameters ,
175
- sq . correlation . childField ,
176
- ) ;
177
- end = new Join ( {
178
- parent : end ,
179
- child,
180
- storage : delegate . createStorage ( ) ,
181
- parentKey : sq . correlation . parentField ,
182
- childKey : sq . correlation . childField ,
183
- relationshipName : sq . subquery . alias ,
184
- hidden : sq . hidden ?? false ,
185
- } ) ;
195
+ end = applyCorrelatedSubQuery ( sq , delegate , staticQueryParameters , end ) ;
186
196
}
187
197
}
188
198
189
199
return end ;
190
200
}
191
201
202
+ function applyCorrelatedSubQuery (
203
+ sq : CorrelatedSubQuery ,
204
+ delegate : BuilderDelegate ,
205
+ staticQueryParameters : StaticQueryParameters | undefined ,
206
+ end : Input ,
207
+ ) {
208
+ assert ( sq . subquery . alias , 'Subquery must have an alias' ) ;
209
+ const child = buildPipelineInternal (
210
+ sq . subquery ,
211
+ delegate ,
212
+ staticQueryParameters ,
213
+ sq . correlation . childField ,
214
+ ) ;
215
+ end = new Join ( {
216
+ parent : end ,
217
+ child,
218
+ storage : delegate . createStorage ( ) ,
219
+ parentKey : sq . correlation . parentField ,
220
+ childKey : sq . correlation . childField ,
221
+ relationshipName : sq . subquery . alias ,
222
+ hidden : sq . hidden ?? false ,
223
+ } ) ;
224
+ return end ;
225
+ }
226
+
192
227
function applyWhere (
193
228
input : Input ,
194
229
condition : Condition ,
@@ -197,12 +232,15 @@ function applyWhere(
197
232
// Or we do the union of queries approach and retain this `appliedFilters` and `sourceConnect` behavior.
198
233
// Downside of that being unbounded memory usage.
199
234
appliedFilters : boolean ,
235
+ delegate : BuilderDelegate ,
200
236
) : Input {
201
237
switch ( condition . type ) {
202
238
case 'and' :
203
- return applyAnd ( input , condition , appliedFilters ) ;
239
+ return applyAnd ( input , condition , appliedFilters , delegate ) ;
204
240
case 'or' :
205
- return applyOr ( input , condition , appliedFilters ) ;
241
+ return applyOr ( input , condition , appliedFilters , delegate ) ;
242
+ case 'subquery' :
243
+ return applyCorrelatedSubqueryCondition ( input , condition , delegate ) ;
206
244
default :
207
245
return applySimpleCondition ( input , condition , appliedFilters ) ;
208
246
}
@@ -212,9 +250,10 @@ function applyAnd(
212
250
input : Input ,
213
251
condition : Conjunction ,
214
252
appliedFilters : boolean ,
253
+ delegate : BuilderDelegate ,
215
254
) {
216
255
for ( const subCondition of condition . conditions ) {
217
- input = applyWhere ( input , subCondition , appliedFilters ) ;
256
+ input = applyWhere ( input , subCondition , appliedFilters , delegate ) ;
218
257
}
219
258
return input ;
220
259
}
@@ -223,11 +262,12 @@ function applyOr(
223
262
input : Input ,
224
263
condition : Disjunction ,
225
264
appliedFilters : boolean ,
265
+ delegate : BuilderDelegate ,
226
266
) : Input {
227
267
const fanOut = new FanOut ( input ) ;
228
268
const branches : Input [ ] = [ ] ;
229
269
for ( const subCondition of condition . conditions ) {
230
- branches . push ( applyWhere ( fanOut , subCondition , appliedFilters ) ) ;
270
+ branches . push ( applyWhere ( fanOut , subCondition , appliedFilters , delegate ) ) ;
231
271
}
232
272
assert ( branches . length > 0 , 'Or condition must have at least one branch' ) ;
233
273
return new FanIn ( fanOut , branches as [ Input , ...Input [ ] ] ) ;
@@ -264,3 +304,37 @@ export function assertOrderingIncludesPK(
264
304
) ;
265
305
}
266
306
}
307
+ function applyCorrelatedSubqueryCondition (
308
+ input : Input ,
309
+ condition : CorrelatedSubQueryCondition ,
310
+ delegate : BuilderDelegate ,
311
+ ) : Input {
312
+ assert ( condition . condition . type === 'exists' ) ;
313
+ return new Exists (
314
+ input ,
315
+ delegate . createStorage ( ) ,
316
+ must ( condition . related . subquery . alias ) ,
317
+ ) ;
318
+ }
319
+
320
+ function gatherCorrelatedSubQueryConditions ( condition : Condition | undefined ) {
321
+ const correlatedSubQueryConditions : CorrelatedSubQueryCondition [ ] = [ ] ;
322
+ const gather = ( condition : Condition ) => {
323
+ if ( condition . type === 'subquery' ) {
324
+ correlatedSubQueryConditions . push ( condition ) ;
325
+ return ;
326
+ }
327
+ if ( condition . type === 'and' || condition . type === 'or' ) {
328
+ for ( const c of condition . conditions ) {
329
+ gather ( c ) ;
330
+ }
331
+ return ;
332
+ }
333
+ } ;
334
+ if ( condition ) {
335
+ gather ( condition ) ;
336
+ }
337
+ return correlatedSubQueryConditions ;
338
+ }
339
+
340
+ const EXISTS_LIMIT = 5 ;
0 commit comments