Skip to content

Commit e8e548f

Browse files
Allow Dart to query realtime tasks (apache#18076)
* Wire up injectables
* Refactor to pass query context to slicer
* Modify Dart input slicer
* Add test
* Clean up
* Address review comments
* Fix equals
* Update docs
* Have default segment sources for MSQ task and Dart
1 parent a2d326f commit e8e548f

25 files changed

+428
-112
lines changed

docs/querying/query-context.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters
6666
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
6767
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
6868
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
69-
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly, and Dart will not respect this context parameter.|
69+
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that the broker queries any Historical regardless of clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|
7070

7171
## Parameters by query type
7272

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.druid.msq.exec.ControllerContext;
3535
import org.apache.druid.msq.exec.ControllerMemoryParameters;
3636
import org.apache.druid.msq.exec.MemoryIntrospector;
37+
import org.apache.druid.msq.exec.SegmentSource;
3738
import org.apache.druid.msq.exec.WorkerFailureListener;
3839
import org.apache.druid.msq.exec.WorkerManager;
3940
import org.apache.druid.msq.indexing.IndexerControllerContext;
@@ -77,12 +78,15 @@ public class DartControllerContext implements ControllerContext
7778
*/
7879
public static final int DEFAULT_MAX_NON_LEAF_WORKER_COUNT = 1;
7980

81+
public static final SegmentSource DEFAULT_SEGMENT_SOURCE = SegmentSource.REALTIME;
82+
8083
private final Injector injector;
8184
private final ObjectMapper jsonMapper;
8285
private final DruidNode selfNode;
8386
private final DartWorkerClient workerClient;
8487
private final TimelineServerView serverView;
8588
private final MemoryIntrospector memoryIntrospector;
89+
private final QueryContext context;
8690
private final ServiceMetricEvent.Builder metricBuilder;
8791
private final ServiceEmitter emitter;
8892

@@ -93,7 +97,8 @@ public DartControllerContext(
9397
final DartWorkerClient workerClient,
9498
final MemoryIntrospector memoryIntrospector,
9599
final TimelineServerView serverView,
96-
final ServiceEmitter emitter
100+
final ServiceEmitter emitter,
101+
final QueryContext context
97102
)
98103
{
99104
this.injector = injector;
@@ -102,6 +107,7 @@ public DartControllerContext(
102107
this.workerClient = workerClient;
103108
this.serverView = serverView;
104109
this.memoryIntrospector = memoryIntrospector;
110+
this.context = context;
105111
this.metricBuilder = new ServiceMetricEvent.Builder();
106112
this.emitter = emitter;
107113
}
@@ -180,7 +186,7 @@ public DruidNode selfNode()
180186
@Override
181187
public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
182188
{
183-
return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView);
189+
return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context);
184190
}
185191

186192
@Override

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@
2121

2222
import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
2323
import org.apache.druid.msq.exec.ControllerContext;
24+
import org.apache.druid.query.QueryContext;
2425

2526
/**
2627
* Class for creating {@link ControllerContext} in {@link DartQueryMaker}.
2728
*/
2829
public interface DartControllerContextFactory
2930
{
30-
ControllerContext newContext(String queryId);
31+
ControllerContext newContext(QueryContext queryContext);
3132
}

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
3232
import org.apache.druid.msq.exec.ControllerContext;
3333
import org.apache.druid.msq.exec.MemoryIntrospector;
34+
import org.apache.druid.query.QueryContext;
35+
import org.apache.druid.query.QueryContexts;
3436
import org.apache.druid.rpc.ServiceClientFactory;
3537
import org.apache.druid.server.DruidNode;
3638

@@ -68,16 +70,18 @@ public DartControllerContextFactoryImpl(
6870
}
6971

7072
@Override
71-
public ControllerContext newContext(final String queryId)
73+
public ControllerContext newContext(final QueryContext context)
7274
{
75+
final String queryId = context.getString(QueryContexts.CTX_DART_QUERY_ID);
7376
return new DartControllerContext(
7477
injector,
7578
jsonMapper,
7679
selfNode,
7780
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
7881
memoryIntrospector,
7982
serverView,
80-
emitter
83+
emitter,
84+
context
8185
);
8286
}
8387
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.msq.dart.controller;
21+
22+
import org.apache.druid.msq.dart.worker.DartQueryableSegment;
23+
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
28+
/**
29+
* Represents the set of segments assigned to a particular dart worker, used by {@link DartTableInputSpecSlicer}.
*
* The assignment is a mutable holder for two lists: {@link DartQueryableSegment} entries and
* {@link DataServerRequestDescriptor} entries. Both lists are appended to in place through
* {@link #addSegments} and {@link #addRequest}.
30+
*/
31+
public class DartSegmentAssignment
32+
{
33+
// Segments in this assignment; appended to by addSegments().
private final List<DartQueryableSegment> dartQueryableSegments;
34+
// Request descriptors in this assignment (a list, despite the singular name); appended to by addRequest().
private final List<DataServerRequestDescriptor> dataServerRequestDescriptor;
35+
36+
/**
* Creates an assignment backed by the given lists.
*
* The lists are stored as-is (no defensive copy), so they must be modifiable for
* {@link #addSegments} and {@link #addRequest} to work, and later additions are visible
* through the caller's own references.
*
* @param dartQueryableSegments       initial segment list, stored by reference
* @param dataServerRequestDescriptor initial request descriptor list, stored by reference
*/
public DartSegmentAssignment(
37+
List<DartQueryableSegment> dartQueryableSegments,
38+
List<DataServerRequestDescriptor> dataServerRequestDescriptor
39+
)
40+
{
41+
this.dartQueryableSegments = dartQueryableSegments;
42+
this.dataServerRequestDescriptor = dataServerRequestDescriptor;
43+
}
44+
45+
/**
* Returns a new assignment backed by two fresh, empty {@link ArrayList}s.
*/
public static DartSegmentAssignment empty()
46+
{
47+
return new DartSegmentAssignment(new ArrayList<>(), new ArrayList<>());
48+
}
49+
50+
/**
* Appends one segment to the segment list (the method name is plural, but it takes a single element).
*
* @param segment segment to append
*/
public void addSegments(DartQueryableSegment segment)
51+
{
52+
dartQueryableSegments.add(segment);
53+
}
54+
55+
/**
* Appends one request descriptor to the request descriptor list.
*
* @param requestDescriptor descriptor to append
*/
public void addRequest(DataServerRequestDescriptor requestDescriptor)
56+
{
57+
dataServerRequestDescriptor.add(requestDescriptor);
58+
}
59+
60+
/**
* Returns the internal segment list itself, not a copy.
*/
public List<DartQueryableSegment> getDartQueryableSegments()
61+
{
62+
return dartQueryableSegments;
63+
}
64+
65+
/**
* Returns the internal request descriptor list itself, not a copy.
*/
public List<DataServerRequestDescriptor> getDataServerRequestDescriptor()
66+
{
67+
return dataServerRequestDescriptor;
68+
}
69+
70+
/**
* Returns true only when both the segment list and the request descriptor list are empty.
*/
public boolean isEmpty()
71+
{
72+
return dataServerRequestDescriptor.isEmpty() && dartQueryableSegments.isEmpty();
73+
}
74+
}

0 commit comments

Comments
 (0)