2020package org .apache .druid .query .lookup ;
2121
2222
23- import com .fasterxml .jackson .core .type .TypeReference ;
2423import com .fasterxml .jackson .databind .ObjectMapper ;
2524import com .google .common .annotations .VisibleForTesting ;
2625import com .google .common .base .Preconditions ;
2726import com .google .common .base .Strings ;
27+ import com .google .common .base .Throwables ;
2828import com .google .common .collect .ImmutableList ;
2929import com .google .common .collect .ImmutableMap ;
3030import com .google .inject .Inject ;
3131import org .apache .commons .lang3 .mutable .MutableBoolean ;
32- import org .apache .druid .client .coordinator .Coordinator ;
32+ import org .apache .druid .client .coordinator .CoordinatorClient ;
33+ import org .apache .druid .common .guava .FutureUtils ;
3334import org .apache .druid .concurrent .LifecycleLock ;
34- import org .apache .druid .discovery .DruidLeaderClient ;
3535import org .apache .druid .guice .ManageLifecycle ;
3636import org .apache .druid .guice .annotations .Json ;
3737import org .apache .druid .java .util .common .FileUtils ;
38- import org .apache .druid .java .util .common .IOE ;
3938import org .apache .druid .java .util .common .ISE ;
4039import org .apache .druid .java .util .common .RE ;
4140import org .apache .druid .java .util .common .RetryUtils ;
42- import org .apache .druid .java .util .common .StringUtils ;
4341import org .apache .druid .java .util .common .concurrent .Execs ;
4442import org .apache .druid .java .util .common .lifecycle .LifecycleStart ;
4543import org .apache .druid .java .util .common .lifecycle .LifecycleStop ;
4644import org .apache .druid .java .util .emitter .EmittingLogger ;
47- import org .apache .druid .java . util . http . client . response . StringFullResponseHolder ;
45+ import org .apache .druid .rpc . HttpResponseException ;
4846import org .apache .druid .server .lookup .cache .LookupLoadingSpec ;
4947import org .apache .druid .server .metrics .DataSourceTaskIdHolder ;
50- import org .jboss .netty .handler .codec .http .HttpMethod ;
5148import org .jboss .netty .handler .codec .http .HttpResponseStatus ;
5249
5350import javax .annotation .Nullable ;
7875 * This class provide a basic {@link LookupExtractorFactory} references manager. It allows basic operations fetching,
7976 * listing, adding and deleting of {@link LookupExtractor} objects, and can take periodic snap shot of the loaded lookup
8077 * extractor specifications in order to bootstrap nodes after restart.
81- *
78+ * <p>
8279 * It also implements {@link LookupExtractorFactoryContainerProvider}, to supply queries and indexing transformations
8380 * with a reference to a {@link LookupExtractorFactoryContainer}. This class is a companion of
8481 * {@link org.apache.druid.server.lookup.cache.LookupCoordinatorManager}, which communicates with
@@ -89,9 +86,6 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
8986{
9087 private static final EmittingLogger LOG = new EmittingLogger (LookupReferencesManager .class );
9188
92- private static final TypeReference <Map <String , Object >> LOOKUPS_ALL_GENERIC_REFERENCE =
93- new TypeReference <>() {};
94-
9589 // Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by immutable LookupUpdateState instance.
9690 // Any update to state is done by creating updated LookupUpdateState instance and atomically setting that
9791 // into the ref here.
@@ -111,9 +105,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
111105 //for unit testing only
112106 private final boolean testMode ;
113107
114- private final DruidLeaderClient druidLeaderClient ;
115-
116- private final ObjectMapper jsonMapper ;
108+ private final CoordinatorClient coordinatorClient ;
117109
118110 private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig ;
119111
@@ -125,18 +117,18 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
125117 public LookupReferencesManager (
126118 LookupConfig lookupConfig ,
127119 @ Json ObjectMapper objectMapper ,
128- @ Coordinator DruidLeaderClient druidLeaderClient ,
120+ CoordinatorClient coordinatorClient ,
129121 LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig
130122 )
131123 {
132- this (lookupConfig , objectMapper , druidLeaderClient , lookupListeningAnnouncerConfig , false );
124+ this (lookupConfig , objectMapper , coordinatorClient , lookupListeningAnnouncerConfig , false );
133125 }
134126
135127 @ VisibleForTesting
136128 LookupReferencesManager (
137129 LookupConfig lookupConfig ,
138130 ObjectMapper objectMapper ,
139- DruidLeaderClient druidLeaderClient ,
131+ CoordinatorClient coordinatorClient ,
140132 LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig ,
141133 boolean testMode
142134 )
@@ -146,8 +138,7 @@ public LookupReferencesManager(
146138 } else {
147139 this .lookupSnapshotTaker = new LookupSnapshotTaker (objectMapper , lookupConfig .getSnapshotWorkingDir ());
148140 }
149- this .druidLeaderClient = druidLeaderClient ;
150- this .jsonMapper = objectMapper ;
141+ this .coordinatorClient = coordinatorClient ;
151142 this .lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig ;
152143 this .lookupConfig = lookupConfig ;
153144 this .testMode = testMode ;
@@ -286,7 +277,11 @@ public void add(String lookupName, LookupExtractorFactoryContainer lookupExtract
286277 if (lookupLoadingSpec .getMode () == LookupLoadingSpec .Mode .NONE ||
287278 (lookupLoadingSpec .getMode () == LookupLoadingSpec .Mode .ONLY_REQUIRED
288279 && !lookupLoadingSpec .getLookupsToLoad ().contains (lookupName ))) {
289- LOG .info ("Skipping notice to add lookup [%s] since current lookup loading mode [%s] does not allow it." , lookupName , lookupLoadingSpec .getMode ());
280+ LOG .info (
281+ "Skipping notice to add lookup[%s] since current lookup loading mode[%s] does not allow it." ,
282+ lookupName ,
283+ lookupLoadingSpec .getMode ()
284+ );
290285 return ;
291286 }
292287 addNotice (new LoadNotice (lookupName , lookupExtractorFactoryContainer , lookupConfig .getLookupStartRetries ()));
@@ -401,7 +396,8 @@ private void loadLookupsAndInitStateRef()
401396 lookupBeanList = getLookupsList ();
402397 if (lookupLoadingSpec .getMode () == LookupLoadingSpec .Mode .ONLY_REQUIRED && lookupBeanList != null ) {
403398 lookupBeanList = lookupBeanList .stream ()
404- .filter (lookupBean -> lookupLoadingSpec .getLookupsToLoad ().contains (lookupBean .getName ()))
399+ .filter (lookupBean -> lookupLoadingSpec .getLookupsToLoad ()
400+ .contains (lookupBean .getName ()))
405401 .collect (Collectors .toList ());
406402 }
407403 }
@@ -437,7 +433,6 @@ private List<LookupBean> getLookupsList()
437433 * Returns a list of lookups from the coordinator if the coordinator is available. If it's not available, returns null.
438434 *
439435 * @param tier lookup tier name
440- *
441436 * @return list of LookupBean objects, or null
442437 */
443438 @ Nullable
@@ -476,37 +471,24 @@ private List<LookupBean> getLookupListFromCoordinator(String tier)
476471
477472 @ Nullable
478473 private Map <String , LookupExtractorFactoryContainer > tryGetLookupListFromCoordinator (String tier )
479- throws IOException , InterruptedException
480474 {
481- final StringFullResponseHolder response = fetchLookupsForTier (tier );
482- if (response .getStatus ().equals (HttpResponseStatus .NOT_FOUND )) {
483- LOG .warn ("No lookups found for tier [%s], response [%s]" , tier , response );
484- return null ;
485- } else if (!response .getStatus ().equals (HttpResponseStatus .OK )) {
486- throw new IOE (
487- "Error while fetching lookup code from Coordinator with status[%s] and content[%s]" ,
488- response .getStatus (),
489- response .getContent ()
490- );
475+ try {
476+ return FutureUtils .getUnchecked (coordinatorClient .fetchLookupsForTier (tier ), true );
491477 }
492-
493- // Older version of getSpecificTier returns a list of lookup names.
494- // Lookup loading is performed via snapshot if older version is present.
495- // This check is only for backward compatibility and should be removed in a future release
496- if (response .getContent ().startsWith ("[" )) {
497- LOG .info (
498- "Failed to retrieve lookup information from coordinator, " +
499- "because coordinator appears to be running on older Druid version. " +
500- "Attempting to load lookups using snapshot instead"
501- );
502- return null ;
503- } else {
504- Map <String , Object > lookupNameToGenericConfig =
505- jsonMapper .readValue (response .getContent (), LOOKUPS_ALL_GENERIC_REFERENCE );
506- return LookupUtils .tryConvertObjectMapToLookupConfigMap (
507- lookupNameToGenericConfig ,
508- jsonMapper
509- );
478+ catch (Exception e ) {
479+ Throwable rootCause = Throwables .getRootCause (e );
480+ if (rootCause instanceof HttpResponseException ) {
481+ final HttpResponseException httpException = (HttpResponseException ) rootCause ;
482+ if (httpException .getResponse ().getStatus ().equals (HttpResponseStatus .NOT_FOUND )) {
483+ LOG .info (
484+ "No lookups found for tier [%s], status [%s]" ,
485+ tier ,
486+ httpException .getResponse ().getStatus ()
487+ );
488+ return null ;
489+ }
490+ }
491+ throw e ;
510492 }
511493 }
512494
@@ -628,15 +610,6 @@ private LookupUpdateState atomicallyUpdateStateRef(Function<LookupUpdateState, L
628610 }
629611 }
630612
631- private StringFullResponseHolder fetchLookupsForTier (String tier ) throws InterruptedException , IOException
632- {
633- return druidLeaderClient .go (
634- druidLeaderClient .makeRequest (
635- HttpMethod .GET ,
636- StringUtils .format ("/druid/coordinator/v1/lookups/config/%s?detailed=true" , tier )
637- )
638- );
639- }
640613 private void dropContainer (LookupExtractorFactoryContainer container , String lookupName )
641614 {
642615 if (container != null ) {
@@ -651,10 +624,12 @@ private void dropContainer(LookupExtractorFactoryContainer container, String loo
651624 }
652625 }
653626 }
627+
654628 @ VisibleForTesting
655629 interface Notice
656630 {
657- void handle (Map <String , LookupExtractorFactoryContainer > lookupMap , LookupReferencesManager manager ) throws Exception ;
631+ void handle (Map <String , LookupExtractorFactoryContainer > lookupMap , LookupReferencesManager manager )
632+ throws Exception ;
658633 }
659634
660635 private static class LoadNotice implements Notice
@@ -741,6 +716,7 @@ public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, Looku
741716 }
742717 });
743718 }
719+
744720 @ Override
745721 public String toString ()
746722 {
0 commit comments