Skip to content

Commit 6b3cb50

Browse files
authored
Use CoordinatorClient for fetching tier lookup (apache#18142)
This patch is part of the effort to phase out `DruidLeaderClient` and utilize `CoordinatorClient` to interact with Coordinators.

This patch does the following:
- Add API `CoordinatorClient.fetchLookupsForTier()`
- Replace use of `DruidLeaderClient` in `LookupReferencesManager` with `CoordinatorClient`
1 parent d4f116c commit 6b3cb50

File tree

6 files changed

+231
-346
lines changed

6 files changed

+231
-346
lines changed

server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.druid.client.BootstrapSegmentsResponse;
2424
import org.apache.druid.client.ImmutableSegmentLoadInfo;
2525
import org.apache.druid.query.SegmentDescriptor;
26+
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
2627
import org.apache.druid.rpc.ServiceRetryPolicy;
2728
import org.apache.druid.segment.metadata.DataSourceInformation;
2829
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -32,6 +33,7 @@
3233

3334
import javax.annotation.Nullable;
3435
import java.util.List;
36+
import java.util.Map;
3537
import java.util.Set;
3638

3739
public interface CoordinatorClient
@@ -94,4 +96,13 @@ public interface CoordinatorClient
9496
* API: {@code GET /druid/coordinator/v1/config}
9597
*/
9698
ListenableFuture<CoordinatorDynamicConfig> getCoordinatorDynamicConfig();
99+
100+
/**
101+
* Gets the lookup configuration for a tier, as a map from lookup name to its {@link LookupExtractorFactoryContainer}.
102+
* <p>
103+
* API: {@code GET /druid/coordinator/v1/lookups/config/<tier>?detailed=true}
104+
*
105+
* @param tier The name of the tier for which the lookup configuration is to be fetched.
106+
*/
107+
ListenableFuture<Map<String, LookupExtractorFactoryContainer>> fetchLookupsForTier(String tier);
97108
}

server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
import org.apache.druid.java.util.common.StringUtils;
3131
import org.apache.druid.java.util.common.jackson.JacksonUtils;
3232
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
33+
import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
3334
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
3435
import org.apache.druid.query.SegmentDescriptor;
36+
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
37+
import org.apache.druid.query.lookup.LookupUtils;
3538
import org.apache.druid.rpc.RequestBuilder;
3639
import org.apache.druid.rpc.ServiceClient;
3740
import org.apache.druid.rpc.ServiceRetryPolicy;
@@ -46,6 +49,7 @@
4649
import javax.annotation.Nullable;
4750
import java.util.ArrayList;
4851
import java.util.List;
52+
import java.util.Map;
4953
import java.util.Set;
5054

5155
public class CoordinatorClientImpl implements CoordinatorClient
@@ -239,4 +243,36 @@ public ListenableFuture<CoordinatorDynamicConfig> getCoordinatorDynamicConfig()
239243
)
240244
);
241245
}
246+
247+
@Override
248+
public ListenableFuture<Map<String, LookupExtractorFactoryContainer>> fetchLookupsForTier(
249+
String tier
250+
)
251+
{
252+
final String path = StringUtils.format(
253+
"/druid/coordinator/v1/lookups/config/%s?detailed=true",
254+
StringUtils.urlEncode(tier)
255+
);
256+
257+
return FutureUtils.transform(
258+
client.asyncRequest(
259+
new RequestBuilder(HttpMethod.GET, path),
260+
new BytesFullResponseHandler()
261+
),
262+
this::extractLookupFactory
263+
);
264+
}
265+
266+
private Map<String, LookupExtractorFactoryContainer> extractLookupFactory(BytesFullResponseHolder holder)
267+
{
268+
Map<String, Object> lookupNameToGenericConfig = JacksonUtils.readValue(
269+
jsonMapper,
270+
holder.getContent(),
271+
new TypeReference<>() {}
272+
);
273+
return LookupUtils.tryConvertObjectMapToLookupConfigMap(
274+
lookupNameToGenericConfig,
275+
jsonMapper
276+
);
277+
}
242278
}

server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.druid.client.BootstrapSegmentsResponse;
2424
import org.apache.druid.client.ImmutableSegmentLoadInfo;
2525
import org.apache.druid.query.SegmentDescriptor;
26+
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
2627
import org.apache.druid.rpc.ServiceRetryPolicy;
2728
import org.apache.druid.segment.metadata.DataSourceInformation;
2829
import org.apache.druid.server.compaction.CompactionStatusResponse;
@@ -32,6 +33,7 @@
3233

3334
import javax.annotation.Nullable;
3435
import java.util.List;
36+
import java.util.Map;
3537
import java.util.Set;
3638

3739
public class NoopCoordinatorClient implements CoordinatorClient
@@ -97,4 +99,12 @@ public ListenableFuture<CoordinatorDynamicConfig> getCoordinatorDynamicConfig()
9799
throw new UnsupportedOperationException();
98100
}
99101

102+
@Override
103+
public ListenableFuture<Map<String, LookupExtractorFactoryContainer>> fetchLookupsForTier(
104+
String tier
105+
)
106+
{
107+
throw new UnsupportedOperationException();
108+
}
109+
100110
}

server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java

Lines changed: 37 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,31 @@
2020
package org.apache.druid.query.lookup;
2121

2222

23-
import com.fasterxml.jackson.core.type.TypeReference;
2423
import com.fasterxml.jackson.databind.ObjectMapper;
2524
import com.google.common.annotations.VisibleForTesting;
2625
import com.google.common.base.Preconditions;
2726
import com.google.common.base.Strings;
27+
import com.google.common.base.Throwables;
2828
import com.google.common.collect.ImmutableList;
2929
import com.google.common.collect.ImmutableMap;
3030
import com.google.inject.Inject;
3131
import org.apache.commons.lang3.mutable.MutableBoolean;
32-
import org.apache.druid.client.coordinator.Coordinator;
32+
import org.apache.druid.client.coordinator.CoordinatorClient;
33+
import org.apache.druid.common.guava.FutureUtils;
3334
import org.apache.druid.concurrent.LifecycleLock;
34-
import org.apache.druid.discovery.DruidLeaderClient;
3535
import org.apache.druid.guice.ManageLifecycle;
3636
import org.apache.druid.guice.annotations.Json;
3737
import org.apache.druid.java.util.common.FileUtils;
38-
import org.apache.druid.java.util.common.IOE;
3938
import org.apache.druid.java.util.common.ISE;
4039
import org.apache.druid.java.util.common.RE;
4140
import org.apache.druid.java.util.common.RetryUtils;
42-
import org.apache.druid.java.util.common.StringUtils;
4341
import org.apache.druid.java.util.common.concurrent.Execs;
4442
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
4543
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
4644
import org.apache.druid.java.util.emitter.EmittingLogger;
47-
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
45+
import org.apache.druid.rpc.HttpResponseException;
4846
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
4947
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
50-
import org.jboss.netty.handler.codec.http.HttpMethod;
5148
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
5249

5350
import javax.annotation.Nullable;
@@ -78,7 +75,7 @@
7875
* This class provides a basic {@link LookupExtractorFactory} references manager. It allows basic operations: fetching,
7976
* listing, adding and deleting of {@link LookupExtractor} objects, and can take periodic snapshots of the loaded lookup
8077
* extractor specifications in order to bootstrap nodes after restart.
81-
*
78+
* <p>
8279
* It also implements {@link LookupExtractorFactoryContainerProvider}, to supply queries and indexing transformations
8380
* with a reference to a {@link LookupExtractorFactoryContainer}. This class is a companion of
8481
* {@link org.apache.druid.server.lookup.cache.LookupCoordinatorManager}, which communicates with
@@ -89,9 +86,6 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
8986
{
9087
private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class);
9188

92-
private static final TypeReference<Map<String, Object>> LOOKUPS_ALL_GENERIC_REFERENCE =
93-
new TypeReference<>() {};
94-
9589
// Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by immutable LookupUpdateState instance.
9690
// Any update to state is done by creating updated LookupUpdateState instance and atomically setting that
9791
// into the ref here.
@@ -111,9 +105,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
111105
//for unit testing only
112106
private final boolean testMode;
113107

114-
private final DruidLeaderClient druidLeaderClient;
115-
116-
private final ObjectMapper jsonMapper;
108+
private final CoordinatorClient coordinatorClient;
117109

118110
private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig;
119111

@@ -125,18 +117,18 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
125117
public LookupReferencesManager(
126118
LookupConfig lookupConfig,
127119
@Json ObjectMapper objectMapper,
128-
@Coordinator DruidLeaderClient druidLeaderClient,
120+
CoordinatorClient coordinatorClient,
129121
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig
130122
)
131123
{
132-
this(lookupConfig, objectMapper, druidLeaderClient, lookupListeningAnnouncerConfig, false);
124+
this(lookupConfig, objectMapper, coordinatorClient, lookupListeningAnnouncerConfig, false);
133125
}
134126

135127
@VisibleForTesting
136128
LookupReferencesManager(
137129
LookupConfig lookupConfig,
138130
ObjectMapper objectMapper,
139-
DruidLeaderClient druidLeaderClient,
131+
CoordinatorClient coordinatorClient,
140132
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
141133
boolean testMode
142134
)
@@ -146,8 +138,7 @@ public LookupReferencesManager(
146138
} else {
147139
this.lookupSnapshotTaker = new LookupSnapshotTaker(objectMapper, lookupConfig.getSnapshotWorkingDir());
148140
}
149-
this.druidLeaderClient = druidLeaderClient;
150-
this.jsonMapper = objectMapper;
141+
this.coordinatorClient = coordinatorClient;
151142
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
152143
this.lookupConfig = lookupConfig;
153144
this.testMode = testMode;
@@ -286,7 +277,11 @@ public void add(String lookupName, LookupExtractorFactoryContainer lookupExtract
286277
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE ||
287278
(lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED
288279
&& !lookupLoadingSpec.getLookupsToLoad().contains(lookupName))) {
289-
LOG.info("Skipping notice to add lookup [%s] since current lookup loading mode [%s] does not allow it.", lookupName, lookupLoadingSpec.getMode());
280+
LOG.info(
281+
"Skipping notice to add lookup[%s] since current lookup loading mode[%s] does not allow it.",
282+
lookupName,
283+
lookupLoadingSpec.getMode()
284+
);
290285
return;
291286
}
292287
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer, lookupConfig.getLookupStartRetries()));
@@ -401,7 +396,8 @@ private void loadLookupsAndInitStateRef()
401396
lookupBeanList = getLookupsList();
402397
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED && lookupBeanList != null) {
403398
lookupBeanList = lookupBeanList.stream()
404-
.filter(lookupBean -> lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName()))
399+
.filter(lookupBean -> lookupLoadingSpec.getLookupsToLoad()
400+
.contains(lookupBean.getName()))
405401
.collect(Collectors.toList());
406402
}
407403
}
@@ -437,7 +433,6 @@ private List<LookupBean> getLookupsList()
437433
* Returns a list of lookups from the coordinator if the coordinator is available. If it's not available, returns null.
438434
*
439435
* @param tier lookup tier name
440-
*
441436
* @return list of LookupBean objects, or null
442437
*/
443438
@Nullable
@@ -476,37 +471,24 @@ private List<LookupBean> getLookupListFromCoordinator(String tier)
476471

477472
@Nullable
478473
private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordinator(String tier)
479-
throws IOException, InterruptedException
480474
{
481-
final StringFullResponseHolder response = fetchLookupsForTier(tier);
482-
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
483-
LOG.warn("No lookups found for tier [%s], response [%s]", tier, response);
484-
return null;
485-
} else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
486-
throw new IOE(
487-
"Error while fetching lookup code from Coordinator with status[%s] and content[%s]",
488-
response.getStatus(),
489-
response.getContent()
490-
);
475+
try {
476+
return FutureUtils.getUnchecked(coordinatorClient.fetchLookupsForTier(tier), true);
491477
}
492-
493-
// Older version of getSpecificTier returns a list of lookup names.
494-
// Lookup loading is performed via snapshot if older version is present.
495-
// This check is only for backward compatibility and should be removed in a future release
496-
if (response.getContent().startsWith("[")) {
497-
LOG.info(
498-
"Failed to retrieve lookup information from coordinator, " +
499-
"because coordinator appears to be running on older Druid version. " +
500-
"Attempting to load lookups using snapshot instead"
501-
);
502-
return null;
503-
} else {
504-
Map<String, Object> lookupNameToGenericConfig =
505-
jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_GENERIC_REFERENCE);
506-
return LookupUtils.tryConvertObjectMapToLookupConfigMap(
507-
lookupNameToGenericConfig,
508-
jsonMapper
509-
);
478+
catch (Exception e) {
479+
Throwable rootCause = Throwables.getRootCause(e);
480+
if (rootCause instanceof HttpResponseException) {
481+
final HttpResponseException httpException = (HttpResponseException) rootCause;
482+
if (httpException.getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
483+
LOG.info(
484+
"No lookups found for tier [%s], status [%s]",
485+
tier,
486+
httpException.getResponse().getStatus()
487+
);
488+
return null;
489+
}
490+
}
491+
throw e;
510492
}
511493
}
512494

@@ -628,15 +610,6 @@ private LookupUpdateState atomicallyUpdateStateRef(Function<LookupUpdateState, L
628610
}
629611
}
630612

631-
private StringFullResponseHolder fetchLookupsForTier(String tier) throws InterruptedException, IOException
632-
{
633-
return druidLeaderClient.go(
634-
druidLeaderClient.makeRequest(
635-
HttpMethod.GET,
636-
StringUtils.format("/druid/coordinator/v1/lookups/config/%s?detailed=true", tier)
637-
)
638-
);
639-
}
640613
private void dropContainer(LookupExtractorFactoryContainer container, String lookupName)
641614
{
642615
if (container != null) {
@@ -651,10 +624,12 @@ private void dropContainer(LookupExtractorFactoryContainer container, String loo
651624
}
652625
}
653626
}
627+
654628
@VisibleForTesting
655629
interface Notice
656630
{
657-
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, LookupReferencesManager manager) throws Exception;
631+
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, LookupReferencesManager manager)
632+
throws Exception;
658633
}
659634

660635
private static class LoadNotice implements Notice
@@ -741,6 +716,7 @@ public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, Looku
741716
}
742717
});
743718
}
719+
744720
@Override
745721
public String toString()
746722
{

0 commit comments

Comments
 (0)