Skip to content

kawamuray/kmql

Repository files navigation

kmql

Kafka Management with sQL

kmql is a command that allows you to query Apache Kafka cluster's metadata using SQL.

The problem

Kafka provides a bunch of commands to inspect a cluster's state such as kafka-topics.sh, kafka-configs.sh, kafka-acls.sh, and more. These scripts is a sufficient option when a human operator inspects relatively simpler information. However when we want to 1. parse result output by a program or 2. query relatively complex information as some are shown in Examples, it becomes a mess of scripts that deals with non-machine friendly output formats and involving to execute multiple commands aggregating their results. Alternatively, you may use AdminClient but that always involves writing a Java program and compile it which is not swift enough to operate clusters reliably and efficiently.

Install

  1. Download and unzip latest binary from Releases
  2. Copy kmql-$VERSION to the place where you like

Usage

Starting interactive console:

kmql --bootstrap-servers="YOUR CLUSTER's bootstrap.servers"
query> SELECT * FROM replicas LIMIT 3
╔═══════════╤═══════════╤═══════════╤═══════════╤═════════════════════╤════════════╗
║ TOPIC     │ PARTITION │ BROKER_ID │ IS_LEADER │ IS_PREFERRED_LEADER │ IS_IN_SYNC ║
╠═══════════╪═══════════╪═══════════╪═══════════╪═════════════════════╪════════════╣
║ topic-xyz │ 0         │ 1         │ truetruetrue       ║
╟───────────┼───────────┼───────────┼───────────┼─────────────────────┼────────────╢
║ topic-xyz │ 0         │ 3         │ falsefalsetrue       ║
╟───────────┼───────────┼───────────┼───────────┼─────────────────────┼────────────╢
║ topic-xyz │ 0         │ 2         │ falsefalsetrue       ║
╚═══════════╧═══════════╧═══════════╧═══════════╧═════════════════════╧════════════╝

Execute single query and print output:

kmql --bootstrap-servers="YOUR CLUSTER's bootstrap.servers" -e "SELECT * FROM replicas LIMIT 3"
(same as the above)

Execute single query and get output as JSON:

kmql --bootstrap-servers="YOUR CLUSTER's bootstrap.servers" -e "SELECT * FROM replicas LIMIT 3" --format=json | jq .
[
  {
    "TOPIC": "topic-xyz",
    "PARTITION": 0,
    "BROKER_ID": 1,
    "IS_LEADER": true,
    "IS_PREFERRED_LEADER": true,
    "IS_IN_SYNC": true
  },
...
]

Execute single query and get output as SSV (space-separated values):

kmql --bootstrap-servers="YOUR CLUSTER's bootstrap.servers" -e "SELECT * FROM replicas LIMIT 3" --format=ssv
# TOPIC PARTITION BROKER_ID IS_LEADER IS_PREFERRED_LEADER IS_IN_SYNC
topic-xyz 0 1 true true true
topic-xyz 0 3 false false true
topic-xyz 0 2 false false true

To see all available tables and their schema:

kmql --bootstrap-servers="YOUR CLUSTER's bootstrap.servers" --init-all
query> SHOW TABLES;
query> SHOW COLUMNS FROM table_name;

Supported Tables

  • replicas - all replicas, topics, partitions, assigned broker, ISR status, and etc.
  • brokers - all brokers in the cluster, including hostname, listening port, rack, and controllership.
  • logdirs - "logdirs" that every broker has 1 or more and store topic data, including per-topic, per-partition, filesystem path, size, and etc.
  • configs - static/dynamic configurations that applies for brokers and topics with its name, value and configuraiton source. (e.g, min.insync.replicas, retention.ms)
  • consumers - all consumer groups, including their coordinator broker, group state, host and topic/partitions assignment.

Query Examples

# Dump replica info for the specific topic
SELECT * FROM replicas WHERE topic = 'topic-name'

# Topics which has more than 100 partitions, sorted in descending order by number of partitions
SELECT topic, COUNT(DISTINCT partition) AS partitions FROM replicas GROUP BY topic HAVING partitions > 100 ORDER BY partitions DESC

# Topic/partitions which its leader is assigned to broker 1
SELECT topic, partition FROM replicas WHERE broker_id = 1 AND is_leader

# Topic partitions that has an out-of-sync replica
SELECT DISTINCT topic, partition FROM replicas WHERE NOT is_in_sync

# Replicas that are on non-preferred leader broker
SELECT * FROM replicas WHERE is_leader AND NOT is_preferred_leader

# Topics that are configured to enable message down conversion
SELECT name FROM configs WHERE resource_type = 'topic' AND key = 'message.downconversion.enable' AND value = 'true'

# List broker hostnames that are consuming their log directories over 10TB
SELECT host FROM brokers RIGHT JOIN logdirs ON id = broker_id GROUP BY (id) HAVING SUM(size) > 10000000000000

# Partitions that has ISRs below the min.insync.replicas
SELECT topic, partition, isr, value AS minISR
FROM configs
RIGHT JOIN (
  SELECT topic, partition, COUNT(*) as isr
  FROM replicas
  WHERE is_in_sync
  GROUP BY (topic, partition)
) ON name = topic
WHERE key = 'min.insync.replicas' AND isr < CAST(value AS INT)

How it works

It obtains information from Kafka cluster using AdminClient and feeds it into H2 in-memory database. This stupid approach works very well for:

  • providing fully SQL compliant query support
  • caching obtained metadata for arbitrary duration

License

Apache License Version 2.0. See LICENSE for more detail.

About

Query Kafka cluster's metadata using SQL

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 2

  •  
  •