Skip to content

Commit 021a03a

Browse files
authored
[LI-FIXUP] Populate the error fields of the LiCombinedControlResponse properly (linkedin#421)
TICKET = N/A LI_DESCRIPTION = This reverts commit a2ac1c2 (linkedin#408) The original commit is incorrect and is a backward incompatible schema change on the LiCombinedControlResponse.json. This PR reverts the original schema change, and addresses the original problem properly: 1. when the LiCombinedControl request version is below 1, the response populates the LeaderAndIsrPartitionErrors field with the error code. When the version is at or greather than 1, it populates the LeaderAndIsrTopics field. 2. When the LiCombinedControl request version is below 1, the StopReplicaPartitionErrors field of the LiCombinedControlResponse should be populated according to the StopReplicaPartitionStates of the LiCombinedControlRequest. When the version is at or greather than 1, the StopReplicaPartitionErrors field should be populated according to the StopReplicaTopicStates of the LiCombinedControlRequest. EXIT_CRITERIA = The same as the LiCombinedControlRequest feature.
1 parent a95d648 commit 021a03a

File tree

3 files changed

+90
-13
lines changed

3 files changed

+90
-13
lines changed

clients/src/main/java/org/apache/kafka/common/requests/LiCombinedControlRequest.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -254,26 +254,52 @@ public LiCombinedControlResponse getErrorResponse(int throttleTimeMs, Throwable
254254
// below we populate the error code to all the error fields and the partition error fields
255255
// 1. populate LeaderAndIsr error code
256256
responseData.setLeaderAndIsrErrorCode(error.code());
257-
List<LiCombinedControlResponseData.LeaderAndIsrPartitionError> leaderAndIsrPartitionErrors = new ArrayList<>();
258-
for (LiCombinedControlRequestData.LeaderAndIsrPartitionState partition : leaderAndIsrPartitionStates()) {
259-
leaderAndIsrPartitionErrors.add(new LiCombinedControlResponseData.LeaderAndIsrPartitionError()
260-
.setTopicName(partition.topicName())
261-
.setPartitionIndex(partition.partitionIndex())
262-
.setErrorCode(error.code()));
257+
if (version() < 1) {
258+
List<LiCombinedControlResponseData.LeaderAndIsrPartitionError> leaderAndIsrPartitionErrors = new ArrayList<>();
259+
for (LiCombinedControlRequestData.LeaderAndIsrPartitionState partition : leaderAndIsrPartitionStates()) {
260+
leaderAndIsrPartitionErrors.add(new LiCombinedControlResponseData.LeaderAndIsrPartitionError()
261+
.setTopicName(partition.topicName())
262+
.setPartitionIndex(partition.partitionIndex())
263+
.setErrorCode(error.code()));
264+
}
265+
responseData.setLeaderAndIsrPartitionErrors(leaderAndIsrPartitionErrors);
266+
} else {
267+
for (LiCombinedControlRequestData.LeaderAndIsrTopicState topicState : data.leaderAndIsrTopicStates()) {
268+
List<LiCombinedControlResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
269+
for (LiCombinedControlRequestData.LeaderAndIsrPartitionState partition: topicState.partitionStates()) {
270+
partitions.add(new LiCombinedControlResponseData.LeaderAndIsrPartitionError()
271+
.setPartitionIndex(partition.partitionIndex())
272+
.setErrorCode(error.code()));
273+
}
274+
275+
responseData.leaderAndIsrTopics().add(new LiCombinedControlResponseData.LeaderAndIsrTopicError()
276+
.setTopicId(topicState.topicId())
277+
.setPartitionErrors(partitions));
278+
}
263279
}
264-
responseData.setLeaderAndIsrPartitionErrors(leaderAndIsrPartitionErrors);
265280

266281
// 2. populate the UpdateMetadata error code
267282
responseData.setUpdateMetadataErrorCode(error.code());
268283

269284
// 3. populate the StopReplica error code
270285
responseData.setStopReplicaErrorCode(error.code());
271286
List<LiCombinedControlResponseData.StopReplicaPartitionError> stopReplicaPartitions = new ArrayList<>();
272-
for (LiCombinedControlRequestData.StopReplicaPartitionState tp : stopReplicaPartitions()) {
273-
stopReplicaPartitions.add(new LiCombinedControlResponseData.StopReplicaPartitionError()
274-
.setTopicName(tp.topicName())
275-
.setPartitionIndex(tp.partitionIndex())
276-
.setErrorCode(error.code()));
287+
if (version() < 1) {
288+
for (LiCombinedControlRequestData.StopReplicaPartitionState tp : stopReplicaPartitions()) {
289+
stopReplicaPartitions.add(new LiCombinedControlResponseData.StopReplicaPartitionError()
290+
.setTopicName(tp.topicName())
291+
.setPartitionIndex(tp.partitionIndex())
292+
.setErrorCode(error.code()));
293+
}
294+
} else {
295+
for (LiCombinedControlRequestData.StopReplicaTopicState topicState : stopReplicaTopicStates()) {
296+
for (LiCombinedControlRequestData.StopReplicaPartitionState partition : topicState.partitionStates()) {
297+
stopReplicaPartitions.add(new LiCombinedControlResponseData.StopReplicaPartitionError()
298+
.setTopicName(topicState.topicName())
299+
.setPartitionIndex(partition.partitionIndex())
300+
.setErrorCode(error.code()));
301+
}
302+
}
277303
}
278304
responseData.setStopReplicaPartitionErrors(stopReplicaPartitions);
279305

clients/src/main/resources/common/message/LiCombinedControlResponse.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
// fields from the LeaderAndIsr response
3030
{ "name": "LeaderAndIsrErrorCode", "type": "int16", "versions": "0+",
3131
"about": "The error code, or 0 if there was no error." },
32-
{ "name": "LeaderAndIsrPartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
32+
{ "name": "LeaderAndIsrPartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0",
3333
"about": "Each partition."},
3434
{ "name": "LeaderAndIsrTopics", "type": "[]LeaderAndIsrTopicError", "versions": "1+",
3535
"about": "Each topic", "fields": [

core/src/test/scala/integration/kafka/api/LiCombinedControlRequestTest.scala

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@ import kafka.metrics.KafkaYammerMetrics
2323
import kafka.server.{KafkaConfig, KafkaServer}
2424
import kafka.utils.{Logging, TestUtils}
2525
import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig}
26+
import org.apache.kafka.common.errors.StaleBrokerEpochException
27+
import org.apache.kafka.common.{Node, Uuid}
28+
import org.apache.kafka.common.message.LiCombinedControlRequestData
2629
import org.apache.kafka.common.network.ListenerName
30+
import org.apache.kafka.common.protocol.Errors
31+
import org.apache.kafka.common.requests.LiCombinedControlRequest
2732
import org.apache.kafka.common.security.auth.SecurityProtocol
2833
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
2934
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
3035

3136
import java.util.Properties
37+
import java.util
3238
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
3339

3440
/**
@@ -84,6 +90,51 @@ class LiCombinedControlRequestTest extends KafkaServerTestHarness with Logging
8490
assertEquals(combinedRequestsSent2, combinedRequestsSent3)
8591
}
8692

93+
@Test
94+
def testLiCombinedControlResponseV1(): Unit = {
95+
val topic1Uuid = Uuid.randomUuid()
96+
val leaderAndIsrPartitionStates = new util.ArrayList[LiCombinedControlRequestData.LeaderAndIsrPartitionState]()
97+
leaderAndIsrPartitionStates.add(new LiCombinedControlRequestData.LeaderAndIsrPartitionState().setTopicName("topic1")
98+
.setPartitionIndex(1))
99+
100+
val topic2Uuid = Uuid.randomUuid()
101+
val updateMetadataPartitionStates = new util.ArrayList[LiCombinedControlRequestData.UpdateMetadataPartitionState]()
102+
updateMetadataPartitionStates.add(new LiCombinedControlRequestData.UpdateMetadataPartitionState().setTopicName("topic2")
103+
.setPartitionIndex(2))
104+
105+
val topic3Uuid = Uuid.randomUuid()
106+
val stopReplicaPartitionStates = new util.ArrayList[LiCombinedControlRequestData.StopReplicaPartitionState]()
107+
stopReplicaPartitionStates.add(new LiCombinedControlRequestData.StopReplicaPartitionState().setTopicName("topic3")
108+
.setPartitionIndex(3))
109+
val topicIds = new util.HashMap[String, Uuid]()
110+
topicIds.put("topic1", topic1Uuid)
111+
topicIds.put("topic2", topic2Uuid)
112+
topicIds.put("topic3", topic3Uuid)
113+
114+
val liCombinedControlRequest = new LiCombinedControlRequest.Builder(1, 0, 0, leaderAndIsrPartitionStates,
115+
new util.ArrayList[Node](), updateMetadataPartitionStates, new util.ArrayList[LiCombinedControlRequestData.UpdateMetadataBroker](),
116+
stopReplicaPartitionStates, topicIds).build()
117+
118+
val errorResponse = liCombinedControlRequest.getErrorResponse(0, new StaleBrokerEpochException("stale broker"))
119+
120+
val expectedError = Errors.STALE_BROKER_EPOCH.code
121+
// the per partition error should only be used for version 0, which means it should be empty for in the v1 case
122+
assertTrue(errorResponse.leaderAndIsrPartitionErrors().isEmpty)
123+
// check the topic level error is set
124+
assertEquals(expectedError, errorResponse.leaderAndIsrErrorCode())
125+
assertEquals(1, errorResponse.leaderAndIsrTopicErrors().size())
126+
assertEquals(1, errorResponse.leaderAndIsrTopicErrors().find(topic1Uuid).partitionErrors().get(0).partitionIndex())
127+
128+
assertEquals(expectedError, errorResponse.updateMetadataErrorCode())
129+
130+
assertEquals(expectedError, errorResponse.stopReplicaErrorCode())
131+
assertEquals(1, errorResponse.stopReplicaPartitionErrors().size())
132+
val stopReplicaPartitionError = errorResponse.stopReplicaPartitionErrors().get(0)
133+
assertEquals(expectedError, stopReplicaPartitionError.errorCode())
134+
assertEquals("topic3", stopReplicaPartitionError.topicName())
135+
assertEquals(3, stopReplicaPartitionError.partitionIndex())
136+
}
137+
87138
private def createAdminClient(): Admin = {
88139
val config = new Properties()
89140
val bootstrapServers = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

0 commit comments

Comments
 (0)