Skip to content

Commit 571c198

Browse files
feat(topic): increase topic partition (#1601)
close #294
1 parent d756878 commit 571c198

File tree

11 files changed

+147
-0
lines changed

11 files changed

+147
-0
lines changed

client/src/components/Root/Root.jsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class Root extends Component {
4242
buildConfig() {
4343
let config = new Map();
4444
config.cancelToken = this.cancel.token;
45+
config.validateStatus = () => true;
4546

4647
if (localStorage.getItem('jwtToken')) {
4748
config.headers = {};

client/src/containers/Topic/Topic/Topic.jsx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,17 @@ class Topic extends Root {
363363
</Link>
364364
)}
365365

366+
{selectedTab === 'partitions' &&
367+
roles.TOPIC_DATA &&
368+
roles.TOPIC_DATA.includes('CREATE') && (
369+
<Link
370+
to={`/ui/${clusterId}/topic/${topicId}/increasepartition`}
371+
className="btn btn-secondary mr-2"
372+
>
373+
<i className="fa fa-plus" aria-hidden={true} /> Increase Partition
374+
</Link>
375+
)}
376+
366377
{roles.TOPIC_DATA && roles.TOPIC_DATA.includes('CREATE') && (
367378
<Link to={`/ui/${clusterId}/topic/${topicId}/produce`} className="btn btn-primary">
368379
<i className="fa fa-plus" aria-hidden={true} /> Produce to topic

client/src/containers/Topic/Topic/TopicPartitions/TopicIncreaseParition.jsx

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import React from 'react';
2+
import Joi from 'joi-browser';
3+
import { withRouter } from 'react-router-dom';
4+
import Form from '../../../../components/Form/Form';
5+
import Header from '../../../Header';
6+
import { uriTopicIncreasePartition } from '../../../../utils/endpoints';
7+
import { toast } from 'react-toastify';
8+
9+
class TopicIncreasePartition extends Form {
10+
state = {
11+
formData: {
12+
partition: 1
13+
},
14+
selectedCluster: this.props.match.params.clusterId,
15+
selectedTopic: this.props.match.params.topicId,
16+
errors: {}
17+
};
18+
19+
componentDidMount() {
20+
this.getTopicsPartitions();
21+
}
22+
23+
async getTopicsPartitions() {
24+
const { selectedCluster, selectedTopic } = this.state;
25+
26+
let partitions = await this.getApi(uriTopicIncreasePartition(selectedCluster, selectedTopic));
27+
let form = {};
28+
form.partition = partitions.data.length;
29+
this.setState({ formData: form });
30+
}
31+
32+
schema = {
33+
partition: Joi.number().min(1).label('Partition').required()
34+
};
35+
36+
async doSubmit() {
37+
const { formData, selectedCluster, selectedTopic } = this.state;
38+
const partitionData = {
39+
partition: formData.partition
40+
};
41+
42+
this.postApi(uriTopicIncreasePartition(selectedCluster, selectedTopic), partitionData)
43+
.then(() => {
44+
this.props.history.push({
45+
pathname: `/ui/${selectedCluster}/topic`
46+
});
47+
toast.success('Topic partition updated');
48+
})
49+
.catch(error => toast.error(error.data.message));
50+
}
51+
render() {
52+
return (
53+
<div>
54+
<form
55+
encType="multipart/form-data"
56+
className="khq-form khq-form-config"
57+
onSubmit={() => this.doSubmit()}
58+
>
59+
<Header title="Increase topic partition" history={this.props.history} />
60+
{this.renderInput('partition', 'Partition', 'Partition', 'number')}
61+
{this.renderButton(
62+
'Update',
63+
() => {
64+
this.doSubmit();
65+
},
66+
undefined,
67+
'button'
68+
)}
69+
</form>
70+
</div>
71+
);
72+
}
73+
}
74+
75+
export default withRouter(TopicIncreasePartition);

client/src/utils/Routes.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import ConnectCreate from '../containers/Connect/ConnectCreate/ConnectCreate';
1313
import Connect from '../containers/Connect/ConnectDetail/Connect';
1414
import TopicCreate from '../containers/Topic/TopicCreate/TopicCreate';
1515
import TopicProduce from '../containers/Topic/TopicProduce';
16+
import TopicIncreaseParition from '../containers/Topic/Topic/TopicPartitions/TopicIncreaseParition';
1617
import TopicCopy from '../containers/Topic/TopicCopy';
1718
import Loading from '../containers/Loading';
1819
import ConsumerGroupList from '../containers/ConsumerGroup/ConsumerGroupList';
@@ -166,6 +167,14 @@ class Routes extends Root {
166167
/>
167168
)}
168169

170+
{roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && (
171+
<Route
172+
exact
173+
path="/ui/:clusterId/topic/:topicId/increasepartition"
174+
component={TopicIncreaseParition}
175+
/>
176+
)}
177+
169178
{roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && (
170179
<Route exact path="/ui/:clusterId/topic/:topicId/copy" component={TopicCopy} />
171180
)}

client/src/utils/api.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ export const post = (url, body, config) =>
7979
axios
8080
.post(url, body, { ...configs, ...config })
8181
.then(res => {
82+
if (res.status >= 400) {
83+
reject(res);
84+
}
8285
resolve(res);
8386
})
8487
.catch(err => {

client/src/utils/endpoints.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ export const uriTopicsInfo = (clusterId, topicId) => `${apiUrl}/${clusterId}/top
5252

5353
export const uriTopicsCreate = clusterId => `${apiUrl}/${clusterId}/topic`;
5454

55+
/**
 * Build the API URL of a topic's partitions resource.
 *
 * @param {string} clusterId - cluster identifier placed in the path
 * @param {string} topicId - topic identifier placed in the path
 * @returns {string} `${apiUrl}/<clusterId>/topic/<topicId>/partitions`
 */
export const uriTopicIncreasePartition = (clusterId, topicId) => {
  return `${apiUrl}/${clusterId}/topic/${topicId}/partitions`;
};
57+
5558
export const uriTopicsProduce = (clusterId, topicName) =>
5659
`${apiUrl}/${clusterId}/topic/${topicName}/data`;
5760

@@ -380,5 +383,6 @@ export default {
380383
uriLiveTail,
381384
uriTopicDataSearch,
382385
uriTopicDataDelete,
386+
uriTopicIncreasePartition,
383387
uriDeleteGroupOffsets
384388
};

src/main/java/org/akhq/controllers/TopicController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,16 @@ public List<Config> updateConfig(String cluster, String topicName, Map<String, S
322322
return updated;
323323
}
324324

325+
@AKHQSecured(resource = Role.Resource.TOPIC, action = Role.Action.UPDATE)
326+
@Post(value = "api/{cluster}/topic/{topicName}/partitions")
327+
@Operation(tags = {"topic"}, summary = "Increase partition for a topic")
328+
public HttpResponse<?> increasePartition(String cluster, String topicName, Map<String, Integer> config) throws ExecutionException, InterruptedException {
329+
checkIfClusterAndResourceAllowed(cluster, topicName);
330+
this.topicRepository.increasePartition(cluster, topicName, config.get("partition"));
331+
332+
return HttpResponse.accepted();
333+
}
334+
325335
@AKHQSecured(resource = Role.Resource.TOPIC_DATA, action = Role.Action.DELETE)
326336
@Delete("api/{cluster}/topic/{topicName}/data/empty")
327337
@Operation(tags = {"topic data"}, summary = "Empty data from a topic")

src/main/java/org/akhq/modules/AbstractKafkaWrapper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ public void createTopics(String clusterId, String name, int partitions, short re
107107
listTopics = new HashMap<>();
108108
}
109109

110+
public void alterTopicPartition(String clusterId, String name, int partitions) throws ExecutionException {
111+
Map<String, NewPartitions> newPartitionMap = new HashMap<>();
112+
newPartitionMap.put(name, NewPartitions.increaseTo(partitions));
113+
114+
Logger.call(kafkaModule
115+
.getAdminClient(clusterId)
116+
.createPartitions(newPartitionMap).all(),
117+
"Increase Topic partition",
118+
Collections.singletonList(name)
119+
);
120+
}
121+
110122
public void deleteTopics(String clusterId, String name) throws ExecutionException {
111123
Logger.call(kafkaModule.getAdminClient(clusterId)
112124
.deleteTopics(Collections.singleton(name))

src/main/java/org/akhq/repositories/TopicRepository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ public void delete(String clusterId, String name) throws ExecutionException, Int
126126
kafkaWrapper.deleteTopics(clusterId, name);
127127
}
128128

129+
public void increasePartition(String clusterId, String name, int partitions) throws ExecutionException, InterruptedException {
130+
kafkaWrapper.alterTopicPartition(clusterId, name, partitions);
131+
}
132+
129133
@Retryable(
130134
includes = {
131135
UnknownTopicOrPartitionException.class

src/test/java/org/akhq/controllers/TopicControllerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,13 @@ void produceMultipleMessages() {
259259
assertTrue(response.get(2).getValue().contains("key3_{\"test_1\":3}"));
260260
}
261261

262+
@Test
263+
@Order(7)
264+
void increasePartitionApi() {
265+
this.exchange(HttpRequest.POST(CREATE_TOPIC_URL + "/partitions",
266+
ImmutableMap.of("partition", 4)));
267+
}
268+
262269
@Test
263270
@Order(8)
264271
void delete() {

0 commit comments

Comments
 (0)