Skip to content

Commit 40ce202

Browse files
authored
Bump kafka client dep to 3.9.1 and mitigate CVE-2025-27817 (apache#18178)
* Bump kafka client dep to 3.9.1 and mitigate CVE-2025-27817 * update tests and dependencies * remove unused throws from method signature * checkstyle fixes
1 parent aff9637 commit 40ce202

File tree

8 files changed

+106
-3
lines changed

8 files changed

+106
-3
lines changed

extensions-core/kafka-extraction-namespace/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,10 @@
142142
<version>${scala.library.version}</version>
143143
<scope>test</scope>
144144
</dependency>
145+
<dependency>
146+
<groupId>org.hamcrest</groupId>
147+
<artifactId>hamcrest-core</artifactId>
148+
<scope>test</scope>
149+
</dependency>
145150
</dependencies>
146151
</project>

extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@
6464
@JsonTypeName("kafka")
6565
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
6666
{
67+
// by default, we reject all URLs for OAuthBearer authentication
68+
// CVE ref: https://www.cve.org/CVERecord?id=CVE-2025-27817
69+
// Upgrade kafka dependencies to 4.x to remove the need for this static block
70+
static {
71+
final String allowedSaslOauthbearerUrlsConfig = "org.apache.kafka.sasl.oauthbearer.allowed.urls";
72+
String allowedUrlsProp = System.getProperty(allowedSaslOauthbearerUrlsConfig);
73+
if (allowedUrlsProp == null) {
74+
System.setProperty(allowedSaslOauthbearerUrlsConfig, "notallowed");
75+
}
76+
}
77+
6778
private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
6879
private final ListeningExecutorService executorService;
6980
private final AtomicLong doubleEventCount = new AtomicLong(0L);

extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@
4040
import org.apache.kafka.clients.producer.KafkaProducer;
4141
import org.apache.kafka.clients.producer.Producer;
4242
import org.apache.kafka.clients.producer.ProducerRecord;
43+
import org.apache.kafka.common.KafkaException;
4344
import org.apache.kafka.common.serialization.ByteArraySerializer;
4445
import org.apache.kafka.common.utils.Time;
46+
import org.hamcrest.CoreMatchers;
47+
import org.hamcrest.MatcherAssert;
4548
import org.junit.After;
4649
import org.junit.Assert;
4750
import org.junit.Before;
@@ -59,6 +62,8 @@
5962
import java.util.Properties;
6063
import java.util.concurrent.ThreadLocalRandom;
6164

65+
import static org.junit.Assert.assertThrows;
66+
6267
/**
6368
*
6469
*/
@@ -221,6 +226,39 @@ private void checkServer() throws Exception
221226
}
222227
}
223228

229+
@Test
230+
public void test_defaultRejectAllUrlsForSaslOauthBearerUrlConsumerProperty()
231+
{
232+
Map<String, String> properties = getConsumerProperties();
233+
properties.put("sasl.mechanism", "OAUTHBEARER");
234+
properties.put("security.protocol", "SASL_SSL");
235+
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
236+
properties.put("sasl.login.callback.handler.class", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
237+
238+
properties.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:8080/token");
239+
KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
240+
null,
241+
TOPIC_NAME,
242+
properties
243+
);
244+
MatcherAssert.assertThat(
245+
assertThrows(KafkaException.class, factory::getConsumer),
246+
CoreMatchers.instanceOf(KafkaException.class)
247+
);
248+
249+
properties.remove("sasl.oauthbearer.token.endpoint.url");
250+
properties.put("sasl.oauthbearer.jwks.endpoint.url", "http://localhost:8080/jwks");
251+
factory = new KafkaLookupExtractorFactory(
252+
null,
253+
TOPIC_NAME,
254+
properties
255+
);
256+
MatcherAssert.assertThat(
257+
assertThrows(KafkaException.class, factory::getConsumer),
258+
CoreMatchers.instanceOf(KafkaException.class)
259+
);
260+
}
261+
224262
@Test(timeout = 60_000L)
225263
public void testSimpleLookup() throws Exception
226264
{

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@
6363

6464
public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
6565
{
66+
// by default, we reject all URLs for OAuthBearer authentication
67+
// CVE ref: https://www.cve.org/CVERecord?id=CVE-2025-27817
68+
// Upgrade kafka dependencies to 4.x to remove the need for this static block
69+
static {
70+
final String allowedSaslOauthbearerUrlsConfig = "org.apache.kafka.sasl.oauthbearer.allowed.urls";
71+
String allowedUrlsProp = System.getProperty(allowedSaslOauthbearerUrlsConfig);
72+
if (allowedUrlsProp == null) {
73+
System.setProperty(allowedSaslOauthbearerUrlsConfig, "notallowed");
74+
}
75+
}
76+
6677
private final KafkaConsumer<byte[], byte[]> consumer;
6778
private final KafkaConsumerMonitor monitor;
6879
private boolean closed;

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@
4141
import org.apache.kafka.clients.consumer.KafkaConsumer;
4242
import org.apache.kafka.clients.producer.KafkaProducer;
4343
import org.apache.kafka.clients.producer.ProducerRecord;
44+
import org.apache.kafka.common.KafkaException;
4445
import org.apache.kafka.common.MetricName;
4546
import org.apache.kafka.common.metrics.KafkaMetric;
4647
import org.apache.kafka.common.serialization.Deserializer;
48+
import org.hamcrest.CoreMatchers;
49+
import org.hamcrest.MatcherAssert;
4750
import org.junit.AfterClass;
4851
import org.junit.Assert;
4952
import org.junit.Before;
@@ -61,6 +64,8 @@
6164
import java.util.regex.Pattern;
6265
import java.util.stream.Collectors;
6366

67+
import static org.junit.Assert.assertThrows;
68+
6469
public class KafkaRecordSupplierTest
6570
{
6671

@@ -252,6 +257,38 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException
252257
recordSupplier.close();
253258
}
254259

260+
@Test
261+
public void test_defaultRejectAllUrlsForSaslOauthBearerUrlConsumerProperty() throws ExecutionException, InterruptedException
262+
{
263+
// Insert data
264+
insertData();
265+
266+
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
267+
StreamPartition.of(TOPIC, PARTITION_0),
268+
StreamPartition.of(TOPIC, PARTITION_1)
269+
);
270+
271+
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
272+
properties.put("sasl.mechanism", "OAUTHBEARER");
273+
properties.put("security.protocol", "SASL_SSL");
274+
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
275+
properties.put("sasl.login.callback.handler.class", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
276+
277+
properties.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:8080/token");
278+
279+
MatcherAssert.assertThat(
280+
assertThrows(KafkaException.class, () -> new KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
281+
CoreMatchers.instanceOf(KafkaException.class)
282+
);
283+
284+
properties.remove("sasl.oauthbearer.token.endpoint.url");
285+
properties.put("sasl.oauthbearer.jwks.endpoint.url", "http://localhost:8080/jwks");
286+
MatcherAssert.assertThat(
287+
assertThrows(KafkaException.class, () -> new KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
288+
CoreMatchers.instanceOf(KafkaException.class)
289+
);
290+
}
291+
255292
@Test
256293
public void testMultiTopicSupplierSetup() throws ExecutionException, InterruptedException
257294
{

integration-tests-ex/image/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ Reference: https://dzone.com/articles/build-docker-image-from-maven
204204
<MARIADB_VERSION>${mariadb.version}</MARIADB_VERSION>
205205
<MYSQL_IMAGE_VERSION>${mysql.image.version}</MYSQL_IMAGE_VERSION>
206206
<CONFLUENT_VERSION>${confluent-version}</CONFLUENT_VERSION>
207-
<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
207+
<!-- bitnami currently does not offer 3.9.1 image -->
208+
<KAFKA_VERSION>4.0.0</KAFKA_VERSION>
208209
<ZK_VERSION>${zookeeper.version}</ZK_VERSION>
209210
<HADOOP_VERSION>${hadoop.compile.version}</HADOOP_VERSION>
210211
<DRUID_VERSION>${project.version}</DRUID_VERSION>

licenses.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3228,7 +3228,7 @@ libraries:
32283228
---
32293229

32303230
name: Apache Kafka
3231-
version: 3.9.0
3231+
version: 3.9.1
32323232
license_category: binary
32333233
module: extensions/druid-kafka-indexing-service
32343234
license_name: Apache License version 2.0

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
7676
<aether.version>0.9.0.M2</aether.version>
7777
<apache.curator.version>5.8.0</apache.curator.version>
78-
<apache.kafka.version>3.9.0</apache.kafka.version>
78+
<apache.kafka.version>3.9.1</apache.kafka.version>
7979
<!-- when updating apache ranger, verify the usage of aws-bundle-sdk vs aws-logs-sdk
8080
and update as needed in extensions-core/druid-ranger-security/pm.xml -->
8181
<apache.ranger.version>2.4.0</apache.ranger.version>

0 commit comments

Comments
 (0)