KastleX is a REST interface to a Kafka cluster, powered by Brod and the Phoenix framework.
See also Kastle.
To fetch dependencies and start the server:
mix deps.get
mix phx.server
To start the server with an interactive shell:
iex --sname kastlex -S mix phx.server
To start an interactive shell with all application paths loaded, without starting any application:
iex -S mix shell
By default KastleX connects to Kafka at localhost:9092 and to ZooKeeper at localhost:2181.
The default HTTP port is 8092.
To run the tests, bring up the test environment first:
docker-compose -f docker-compose-test.yml up -d
# give it ~10 seconds
# force create __consumer_offsets
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kastlex
# Ctrl+C to terminate consumer process
mix test
KastleX expects to find Kafka on localhost:9092 and ZooKeeper on localhost:2181.
It also needs a kastlex topic with a single partition, and the
auto.create.topics.enable parameter in Kafka's server.properties set
to false.
- Bring the environment up:
  docker-compose up
- Start a consumer group:
  kafkacat -b localhost:9092 -G test-consumer-group test-topic
- Start producing to test-topic, the topic that group consumes:
  LC_CTYPE=C watch -n 1 "cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1 | kafkacat -P -b localhost:9092 -t test-topic -p 0"
- Open Prometheus at http://localhost:9090 and verify that the metrics were collected.
POST /api/v1/messages/:topic
POST /api/v1/messages/:topic/:partition
With the first form, KastleX picks a random partition to produce the message to.
Use Content-Type: application/binary.
The message key is supplied as the query parameter key.
The message value is the request body.
A successful response has:
- HTTP status 204
- HTTP header "x-kafka-partition" set to the partition the message was produced to
- HTTP header "x-message-offset" set to the offset the message was persisted at
- Empty HTTP body
cURL example (-d implies POST):
curl -s -i -H "Content-Type: application/binary" localhost:8092/api/v1/messages/foo -d bar
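For clients not written in shell, the same request can be built programmatically. The sketch below uses only Python's standard library (urllib); the helper names build_produce_request and produced_at are hypothetical, not part of KastleX. It builds the request without sending it, so the URL layout (optional partition segment, key as a query parameter, raw value as the body) is easy to inspect.

```python
from typing import Optional
from urllib.parse import quote, urlencode
from urllib.request import Request


def build_produce_request(base_url: str, topic: str, value: bytes,
                          key: Optional[str] = None,
                          partition: Optional[int] = None) -> Request:
    """Build (but do not send) a KastleX produce request."""
    # Without a partition segment, KastleX picks a random partition.
    path = f"/api/v1/messages/{quote(topic, safe='')}"
    if partition is not None:
        path += f"/{partition}"
    # The message key travels as the `key` query parameter.
    query = f"?{urlencode({'key': key})}" if key is not None else ""
    return Request(
        base_url.rstrip("/") + path + query,
        data=value,  # the message value is the raw request body
        headers={"Content-Type": "application/binary"},
        method="POST",
    )


def produced_at(headers) -> tuple:
    """Extract (partition, offset) from a successful (204) response's headers."""
    return int(headers["x-kafka-partition"]), int(headers["x-message-offset"])
```

To send it, pass the request to urllib.request.urlopen and hand the response headers to produced_at; the 204 response itself has an empty body.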
GET /api/v1/messages/:topic/:partition
GET /api/v1/messages/:topic/:partition/:offset
Response body:
{
  "size": null,
  "messages": [
    {
      "value": "test-data",
      "ts_type": "create",
      "ts": 1536157879011,
      "offset": 45,
      "key": null,
      "headers": {
        "foo": "1",
        "baz": "null",
        "bar": "s"
      }
    }
  ],
  "highWmOffset": 46,
  "error_code": "no_error"
}
GET /api/v2/messages/:topic/:partition
GET /api/v2/messages/:topic/:partition/:offset
Response header:
x-high-wm-offset: 46
Response body:
[
  {
    "value": "test-data",
    "ts_type": "create",
    "ts": 1536157879011,
    "offset": 45,
    "key": null,
    "headers": {
      "foo": "1",
      "baz": "null",
      "bar": "s"
    }
  }
]
If offset is not given the last message is fetched by default.
Optional parameters:
- max_wait_time: maximum time in ms to wait for the response, default 1000
- min_bytes: minimum bytes to accumulate in the response, default 1
- max_bytes: maximum bytes to fetch, default 100 kB
offset can be an exact offset, last, latest, earliest, or a negative number.
The default is last.
A negative offset is treated as relative to latest,
i.e. -1 is the same as last.
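The offset rules above can be summarized as a small resolver. This is a sketch under one assumption: that latest means the high watermark (the offset the next message will receive), so last, the newest stored message, is latest - 1. That matches the sample responses, where highWmOffset is 46 and the last message has offset 45. resolve_offset is a hypothetical client-side helper; KastleX resolves offsets itself.

```python
def resolve_offset(spec, high_wm_offset: int, earliest_offset: int = 0) -> int:
    """Map an offset spec, as accepted by the fetch endpoints, to an absolute offset.

    Assumes `latest` is the high watermark, so `last` is one less.
    """
    if spec in (None, "last"):  # `last` is the default
        return high_wm_offset - 1
    if spec == "latest":
        return high_wm_offset
    if spec == "earliest":
        # The log start offset is not derivable from the high watermark,
        # so the caller must supply it.
        return earliest_offset
    offset = int(spec)
    # Negative values are relative to `latest`: -1 == last.
    return high_wm_offset + offset if offset < 0 else offset
```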
With the Accept: application/json header, the response includes all
of the messages returned from Kafka, with their metadata.
With the Accept: application/binary header, KastleX returns the message value
in the HTTP response body; the message offset, headers, ts_type, and ts are put into HTTP headers.
An example from cURL output:
$ curl -v -H "Accept: application/binary" localhost:8092/api/v2/messages/kastlex/0
< x-request-id: 2l8sgeskov545ica5c000005
< content-type: application/binary; charset=utf-8
< x-high-wm-offset: 654
< x-message-headers: {"foo":"1","baz":"null","bar":"s"}
< x-message-offset: 653
< x-message-ts: 1536228953642
< x-message-ts-type: :create
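A client using Accept: application/binary gets the message metadata only through these headers. The hypothetical helper below gathers them into one dict, assuming the header names shown in the sample output; x-message-headers is decoded as JSON because it carries a JSON object. It works with a plain dict or with the headers object of a urllib response.

```python
import json


def parse_binary_fetch_headers(headers) -> dict:
    """Collect message metadata from an `Accept: application/binary` fetch response."""
    return {
        "offset": int(headers["x-message-offset"]),
        "high_wm_offset": int(headers["x-high-wm-offset"]),
        "ts": int(headers["x-message-ts"]),
        # Kept verbatim; in the sample output it has a leading colon (":create").
        "ts_type": headers["x-message-ts-type"],
        # Kafka message headers arrive as a JSON object in a single HTTP header.
        "headers": json.loads(headers.get("x-message-headers", "{}")),
    }
```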
GET /api/v1/offsets/:topic/:partition
{"offset": 20}
Optional parameters:
at: point of interest: latest, earliest, or a number; default latest
GET /api/v1/consumers
["console-consumer-25992"]
GET /api/v1/consumers/:group_id
{
  "protocol": "range",
  "partitions": [
    {
      "topic": "kastlex",
      "partition": 0,
      "offset": 20,
      "metadata_encoding": "text",
      "metadata": "",
      "high_wm_offset": 20,
      "expire_time": 1473714215481,
      "commit_time": 1473627815481
    }
  ],
  "members": [
    {
      "subscription": {
        "version": 0,
        "user_data_encoding": "text",
        "user_data": "",
        "topics": [
          "kastlex"
        ]
      },
      "session_timeout": 30000,
      "member_id": "consumer-1-ea5aa1bc-6b14-488f-88f1-26edb2261786",
      "client_id": "consumer-1",
      "client_host": "/127.0.0.1",
      "assignment": {
        "version": 0,
        "user_data_encoding": "text",
        "user_data": "",
        "topic_partitions": {
          "kastlex": [
            0
          ]
        }
      }
    }
  ],
  "leader": "consumer-1-ea5aa1bc-6b14-488f-88f1-26edb2261786",
  "group_id": "console-consumer-66960",
  "generation_id": 1
}
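The partitions list pairs each committed offset with the partition's high watermark, which is enough to estimate consumer lag. The sketch below assumes the usual Kafka convention that the committed offset is the next offset the group will read, so lag is high_wm_offset - offset; consumer_lag is a hypothetical client-side helper, not a KastleX endpoint.

```python
def consumer_lag(group: dict) -> dict:
    """Per-partition lag for a group description from /api/v1/consumers/:group_id.

    Lag is high_wm_offset - committed offset, clamped at 0.
    """
    return {
        (p["topic"], p["partition"]): max(p["high_wm_offset"] - p["offset"], 0)
        for p in group.get("partitions", [])
    }
```

For the sample group above, the only partition has offset 20 and high_wm_offset 20, so its lag is 0.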
GET /api/v1/topics
["kastlex"]
GET /api/v1/topics/:topic
{"topic":"kastlex","partitions":[{"replicas":[0],"partition":0,"leader":0,"isr":[0]}],"config":{}}
GET /api/v1/brokers
[{"port":9092,"id":0,"host":"localhost","endpoints":["PLAINTEXT://127.0.0.1:9092"]}]
GET /api/v1/brokers/:broker_id
{"port":9092,"id":0,"host":"localhost","endpoints":["PLAINTEXT://127.0.0.1:9092"]}
(Yes, this one looks a bit silly)
GET /api/v1/urp (under-replicated partitions)
GET /api/v1/urp/:topic
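In Kafka, a partition is under-replicated when its in-sync replica set (isr) is smaller than its replica list. As an illustration only, the same check can be applied client-side to the GET /api/v1/topics/:topic response shape shown above; under_replicated is a hypothetical helper, and KastleX's own /urp endpoints may derive their result differently.

```python
def under_replicated(topic_meta: dict) -> list:
    """Partition ids whose in-sync replica set is smaller than the replica set."""
    return sorted(
        p["partition"]
        for p in topic_meta["partitions"]
        if len(p["isr"]) < len(p["replicas"])
    )
```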
Authentication is a courtesy of Guardian.
There are two files: permissions.yml configures permissions for different actions, and passwd.yml holds each user's password hash.
Example permissions.yml:
anonymous:
  list_topics: true
  show_topic: all
  list_brokers: true
  show_broker: all
  show_offsets: all
  fetch: all
  list_urps: true
  show_urps: all
  list_groups: true
  show_group: all
user1:
  produce:
    - kastlex
admin:
  reload: true
  revoke: true
The anonymous user can do pretty much everything except write data to Kafka.
user1 can write to topic kastlex.
admin can reload permissions and revoke tokens.
all means access to all topics; replace it with a list of specific topics when applicable (see, for example, user1.produce).
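The rules above can be read as: true grants an action that takes no topic, all grants it for every topic, and a list grants it only for the listed topics. The sketch below encodes that reading over permissions.yml already parsed into a dict (for example with a YAML library). is_allowed is hypothetical, and it deliberately does not fall back from a named user to anonymous, since it is not stated whether KastleX does.

```python
def is_allowed(permissions: dict, user: str, action: str, topic=None) -> bool:
    """Evaluate a permissions.yml-style mapping for one user and action.

    Assumed semantics: True grants a topic-less action, "all" grants the action
    for every topic, a list grants it only for the listed topics.
    """
    rule = (permissions.get(user) or {}).get(action)
    if rule is True or rule == "all":
        return True
    if isinstance(rule, list):
        return topic in rule
    # Missing user, missing action, or any other value: deny.
    return False
```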
Example passwd.yml:
user1:
  password_hash: "$2b$12$3iR64t7Sm.cAHtZs5jkxZehdWQ7knmN/NxmK.X7NBUHfiIAxT4T9y"
admin:
  password_hash: "$2b$12$gp5pJc/AGclJradJC9DuHe6xJoIe5HOwtAUGe2z7QFeAjvw1eZUKW"
Here we specify a password hash for each user.
user1 has password user1, admin has password admin. Simple.
To generate a password hash:
mix hashpw difficult2guess
To obtain a token, users need to log in (submit a form with two fields, username and password):
curl localhost:8092/login --data "username=user1&password=user1"
{"token":"<JWT>"}
To get the token directly into a shell variable (requires jq):
JWT=$(curl -s localhost:8092/login --data "username=user1&password=user1" | jq -r .token)
Then submit authenticated requests with curl:
curl -H "Authorization: Bearer $JWT" localhost:8092/admin/reload
curl -H "Authorization: Bearer $JWT" localhost:8092/api/v1/messages/kastlex/0 -H "Content-type: application/binary" -d 1
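The same login-then-call flow can be sketched in Python with only urllib from the standard library. The helpers login_request, token_from_response and authorized are hypothetical names; they build requests without sending them. curl --data sends an application/x-www-form-urlencoded body, which login_request reproduces.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def login_request(base_url: str, username: str, password: str) -> Request:
    """Form-encoded POST to /login, like `curl --data "username=...&password=..."`."""
    body = urlencode({"username": username, "password": password}).encode()
    return Request(
        base_url.rstrip("/") + "/login",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def token_from_response(body: bytes) -> str:
    """Equivalent of `jq -r .token` on the login response body."""
    return json.loads(body)["token"]


def authorized(req: Request, token: str) -> Request:
    """Attach the token as a Bearer credential and return the same request."""
    req.add_header("Authorization", f"Bearer {token}")
    return req
```

Send the login request with urllib.request.urlopen, pass the response body to token_from_response, then wrap each API request with authorized before sending it.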
KastleX can run in two modes: either tokens are issued with a relatively short lifespan and the administrator has no control over them, or tokens are long-lived but persisted, so they can be revoked on demand.
KastleX uses a compacted topic (cleanup.policy=compact) with a single partition in Kafka as the token storage. To enable token storage, add the following hook to the Guardian application config:
hooks: Kastlex.TokenStorage
Then add the configuration for TokenStorage:
config :kastlex, Kastlex.TokenStorage,
  topic: "_kastlex_tokens"
Alternatively, set the following environment variable:
KASTLEX_ENABLE_TOKEN_STORAGE=1
These two variables change the default storage topic name and the default TTL:
KASTLEX_TOKEN_STORAGE_TOPIC=_kastlex_tokens
KASTLEX_TOKEN_TTL_SECONDS=315360000
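With long-lived tokens it can help to check what lifetime a token was actually issued with. A JWT is three base64url segments, and the middle one is a JSON claims object; the sketch below decodes it without verifying the signature, so it is for inspection only. It assumes the token carries the standard iat and exp claims; jwt_claims and lifetime_seconds are hypothetical helpers. For reference, the default KASTLEX_TOKEN_TTL_SECONDS of 315360000 is 3650 days, roughly 10 years.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode a JWT's payload WITHOUT verifying its signature (inspection only)."""
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))


def lifetime_seconds(token: str) -> int:
    """Issued lifetime of the token, assuming standard `iat` and `exp` claims."""
    claims = jwt_claims(token)
    return claims["exp"] - claims["iat"]
```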
Administrator can use the following API endpoint to revoke a token:
DELETE /admin/tokens/:username
The corresponding permission item in permissions.yml is 'revoke'.
To generate the key files referenced by KASTLEX_JWK_FILE and KASTLEX_SECRET_KEY_BASE:
openssl ecparam -name secp521r1 -genkey -noout -out jwk.pem
printf "%s" "$(openssl rand -base64 32 | tr -d =)" > secret_base.key
Environment variables:
KASTLEX_SECRET_KEY_BASE=/path/to/secret_base.key
KASTLEX_JWK_FILE=/path/to/jwk.pem
KASTLEX_PERMISSIONS_FILE_PATH=/path/to/permissions.yml
KASTLEX_PASSWD_FILE_PATH=/path/to/passwd.yml
KASTLEX_KAFKA_CLUSTER=kafka-host1:9092,kafka-host2:9092
KASTLEX_HTTP_PORT=8092
Variables are given with their default values, except for KASTLEX_USE_HTTPS, which is disabled by default.
So if you just set KASTLEX_USE_HTTPS=true, KastleX will accept TLS connections on port 8093 and use the certificates in /etc/kastlex/ssl.
KASTLEX_USE_HTTPS=true
KASTLEX_HTTPS_PORT=8093
KASTLEX_CERTFILE=/etc/kastlex/ssl/server.crt
KASTLEX_KEYFILE=/etc/kastlex/ssl/server.key
KASTLEX_CACERTFILE=/etc/kastlex/ssl/ca-cert.crt
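The HTTPS defaults described above can be restated as a small resolver, handy as a checklist when writing deployment config. This is a hypothetical sketch, not how KastleX reads its environment; in particular, treating only the string true (case-insensitively) as enabling HTTPS is an assumption.

```python
def https_settings(env: dict) -> dict:
    """Hypothetical resolver for the HTTPS listener from KASTLEX_* variables.

    Returns an empty dict when HTTPS is disabled (the default).
    """
    if env.get("KASTLEX_USE_HTTPS", "").strip().lower() != "true":
        return {}
    ssl_dir = "/etc/kastlex/ssl"
    return {
        "port": int(env.get("KASTLEX_HTTPS_PORT", "8093")),
        "certfile": env.get("KASTLEX_CERTFILE", f"{ssl_dir}/server.crt"),
        "keyfile": env.get("KASTLEX_KEYFILE", f"{ssl_dir}/server.key"),
        "cacertfile": env.get("KASTLEX_CACERTFILE", f"{ssl_dir}/ca-cert.crt"),
    }
```

Call it as https_settings(dict(os.environ)) to see which values a given environment would leave at their defaults.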
KASTLEX_KAFKA_USE_SSL=false
KASTLEX_KAFKA_CACERTFILE=/path/to/cacertfile
KASTLEX_KAFKA_CERTFILE=/path/to/certfile
KASTLEX_KAFKA_KEYFILE=/path/to/keyfile
KASTLEX_KAFKA_SASL_FILE=/path/to/file/with/sasl/credentials
KASTLEX_PRODUCER_REQUIRED_ACKS=
KASTLEX_PRODUCER_ACK_TIMEOUT=
KASTLEX_PRODUCER_PARTITION_BUFFER_LIMIT=
KASTLEX_PRODUCER_PARTITION_ONWIRE_LIMIT=
KASTLEX_PRODUCER_MAX_BATCH_SIZE=
KASTLEX_PRODUCER_MAX_RETRIES=
KASTLEX_PRODUCER_RETRY_BACKOFF_MS=
KASTLEX_PRODUCER_MAX_LINGER_MS=
KASTLEX_PRODUCER_MAX_LINGER_COUNT=
The file with SASL credentials is a plain-text, YAML-like file with username, password, and mechanism keys.
mechanism is optional and can be plain (default), scram_sha_256, or scram_sha_512.
For example:
username: user
password: s3cr3t
mechanism: scram_sha_512
If KASTLEX_KAFKA_SASL_FILE is set and the file exists, KastleX uses SASL authentication when connecting to Kafka.
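A parser for the credentials file format described above might look like this. It is a sketch that assumes one key: value pair per line with no quoting or nesting; KastleX's own reader may accept more of YAML. parse_sasl_file and VALID_MECHANISMS are hypothetical names.

```python
VALID_MECHANISMS = {"plain", "scram_sha_256", "scram_sha_512"}


def parse_sasl_file(text: str) -> dict:
    """Parse a simple `key: value` SASL credentials file."""
    creds = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue  # skip blank lines
        # Split on the first colon only, so passwords may contain ':'.
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"malformed line: {line!r}")
        creds[key.strip()] = value.strip()
    missing = {"username", "password"} - set(creds)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    creds.setdefault("mechanism", "plain")  # mechanism is optional
    if creds["mechanism"] not in VALID_MECHANISMS:
        raise ValueError(f"unsupported mechanism: {creds['mechanism']}")
    return creds
```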
To build and run a production release:
MIX_ENV=prod mix compile
MIX_ENV=prod mix release
rel/kastlex/bin/kastlex console