Skip to content

Commit c8480ac

Browse files
authored
[CLN]: remove PrefetchRecordOperator (#5345)
1 parent a0bd1f4 commit c8480ac

File tree

5 files changed

+0
-204
lines changed

5 files changed

+0
-204
lines changed

rust/worker/src/execution/operators/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ pub mod knn_merge;
1717
pub mod knn_projection;
1818
pub mod limit;
1919
pub mod partition_log;
20-
pub mod prefetch_record;
2120
pub mod prefetch_segment;
2221
pub mod projection;
2322
pub mod purge_dirty_log;

rust/worker/src/execution/operators/prefetch_record.rs

Lines changed: 0 additions & 105 deletions
This file was deleted.

rust/worker/src/execution/orchestration/get.rs

Lines changed: 0 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ use crate::execution::operators::{
1717
fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput},
1818
filter::{FilterError, FilterInput, FilterOutput},
1919
limit::{LimitError, LimitInput, LimitOutput},
20-
prefetch_record::{PrefetchRecordError, PrefetchRecordOperator, PrefetchRecordOutput},
2120
prefetch_segment::{
2221
PrefetchSegmentError, PrefetchSegmentInput, PrefetchSegmentOperator, PrefetchSegmentOutput,
2322
},
@@ -343,18 +342,6 @@ impl Handler<TaskResult<LimitOutput, LimitError>> for GetOrchestrator {
343342
offset_ids: output.offset_ids.iter().collect(),
344343
};
345344

346-
// Prefetch records before projection
347-
let prefetch_task = wrap(
348-
Box::new(PrefetchRecordOperator {}),
349-
input.clone(),
350-
ctx.receiver(),
351-
self.context.task_cancellation_token.clone(),
352-
);
353-
354-
if !self.send(prefetch_task, ctx, Some(Span::current())).await {
355-
return;
356-
}
357-
358345
let task = wrap(
359346
Box::new(self.projection.clone()),
360347
input,
@@ -365,19 +352,6 @@ impl Handler<TaskResult<LimitOutput, LimitError>> for GetOrchestrator {
365352
}
366353
}
367354

368-
#[async_trait]
369-
impl Handler<TaskResult<PrefetchRecordOutput, PrefetchRecordError>> for GetOrchestrator {
370-
type Result = ();
371-
372-
async fn handle(
373-
&mut self,
374-
_message: TaskResult<PrefetchRecordOutput, PrefetchRecordError>,
375-
_ctx: &ComponentContext<Self>,
376-
) {
377-
// The output and error from `PrefetchRecordOperator` are ignored
378-
}
379-
}
380-
381355
#[async_trait]
382356
impl Handler<TaskResult<ProjectionOutput, ProjectionError>> for GetOrchestrator {
383357
type Result = ();

rust/worker/src/execution/orchestration/knn.rs

Lines changed: 0 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -15,9 +15,6 @@ use crate::execution::operators::{
1515
knn_log::{KnnLogError, KnnLogInput},
1616
knn_merge::{KnnMergeError, KnnMergeInput, KnnMergeOutput},
1717
knn_projection::{KnnProjectionError, KnnProjectionInput},
18-
prefetch_record::{
19-
PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, PrefetchRecordOutput,
20-
},
2118
};
2219

2320
use super::knn_filter::{KnnError, KnnFilterOutput};
@@ -290,26 +287,6 @@ impl Handler<TaskResult<KnnMergeOutput, KnnMergeError>> for KnnOrchestrator {
290287
None => return,
291288
};
292289

293-
// Prefetch records before projection
294-
let prefetch_task = wrap(
295-
Box::new(PrefetchRecordOperator {}),
296-
PrefetchRecordInput {
297-
logs: self.knn_filter_output.logs.clone(),
298-
blockfile_provider: self.blockfile_provider.clone(),
299-
record_segment: self.knn_filter_output.record_segment.clone(),
300-
offset_ids: output
301-
.distances
302-
.iter()
303-
.map(|record| record.offset_id)
304-
.collect(),
305-
},
306-
ctx.receiver(),
307-
self.context.task_cancellation_token.clone(),
308-
);
309-
// Prefetch span is detached from the orchestrator.
310-
let prefetch_span = tracing::info_span!(parent: None, "Prefetch_record", num_records = output.distances.len());
311-
self.send(prefetch_task, ctx, Some(prefetch_span)).await;
312-
313290
let projection_task = wrap(
314291
Box::new(self.knn_projection.clone()),
315292
KnnProjectionInput {
@@ -325,19 +302,6 @@ impl Handler<TaskResult<KnnMergeOutput, KnnMergeError>> for KnnOrchestrator {
325302
}
326303
}
327304

328-
#[async_trait]
329-
impl Handler<TaskResult<PrefetchRecordOutput, PrefetchRecordError>> for KnnOrchestrator {
330-
type Result = ();
331-
332-
async fn handle(
333-
&mut self,
334-
_message: TaskResult<PrefetchRecordOutput, PrefetchRecordError>,
335-
_ctx: &ComponentContext<Self>,
336-
) {
337-
// The output and error from `PrefetchRecordOperator` are ignored
338-
}
339-
}
340-
341305
#[async_trait]
342306
impl Handler<TaskResult<KnnProjectionOutput, KnnProjectionError>> for KnnOrchestrator {
343307
type Result = ();

rust/worker/src/execution/orchestration/spann_knn.rs

Lines changed: 0 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -22,9 +22,6 @@ use crate::execution::operators::{
2222
knn_log::{KnnLogError, KnnLogInput},
2323
knn_merge::{KnnMergeError, KnnMergeInput, KnnMergeOutput},
2424
knn_projection::{KnnProjectionError, KnnProjectionInput},
25-
prefetch_record::{
26-
PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, PrefetchRecordOutput,
27-
},
2825
spann_bf_pl::{SpannBfPlError, SpannBfPlInput, SpannBfPlOperator, SpannBfPlOutput},
2926
spann_centers_search::{
3027
SpannCentersSearchError, SpannCentersSearchInput, SpannCentersSearchOperator,
@@ -369,26 +366,6 @@ impl Handler<TaskResult<KnnMergeOutput, KnnMergeError>> for SpannKnnOrchestrator
369366
None => return,
370367
};
371368

372-
// Prefetch records before projection
373-
let prefetch_task = wrap(
374-
Box::new(PrefetchRecordOperator {}),
375-
PrefetchRecordInput {
376-
logs: self.knn_filter_output.logs.clone(),
377-
blockfile_provider: self.spann_provider.blockfile_provider.clone(),
378-
record_segment: self.knn_filter_output.record_segment.clone(),
379-
offset_ids: output
380-
.distances
381-
.iter()
382-
.map(|record| record.offset_id)
383-
.collect(),
384-
},
385-
ctx.receiver(),
386-
self.context.task_cancellation_token.clone(),
387-
);
388-
// Prefetch span is detached from the orchestrator.
389-
let prefetch_span = tracing::info_span!(parent: None, "Prefetch_record", num_records = output.distances.len());
390-
self.send(prefetch_task, ctx, Some(prefetch_span)).await;
391-
392369
let projection_task = wrap(
393370
Box::new(self.knn_projection.clone()),
394371
KnnProjectionInput {
@@ -404,19 +381,6 @@ impl Handler<TaskResult<KnnMergeOutput, KnnMergeError>> for SpannKnnOrchestrator
404381
}
405382
}
406383

407-
#[async_trait]
408-
impl Handler<TaskResult<PrefetchRecordOutput, PrefetchRecordError>> for SpannKnnOrchestrator {
409-
type Result = ();
410-
411-
async fn handle(
412-
&mut self,
413-
_message: TaskResult<PrefetchRecordOutput, PrefetchRecordError>,
414-
_ctx: &ComponentContext<Self>,
415-
) {
416-
// The output and error from `PrefetchRecordOperator` are ignored
417-
}
418-
}
419-
420384
#[async_trait]
421385
impl Handler<TaskResult<KnnProjectionOutput, KnnProjectionError>> for SpannKnnOrchestrator {
422386
type Result = ();

0 commit comments

Comments
 (0)