News KrakenD EE v2.13: AWS Bedrock, AI Dashboard, Kafka, and Plugin Extensibility

Document updated on Mar 5, 2026

Kafka Driver for the Asynchronous Agent

The Kafka driver for async agents lets KrakenD consume Kafka topics autonomously. Routines listening to the configured topics react to new events and push the data to your backends.

This driver differs from the Kafka backend consumer. Unlike endpoints, async agents do not require a client request to trigger an action. Instead, the agent subscribes to the topics and fires an action when an event is delivered.

Driver Configuration

Place the Kafka driver inside the extra_config of the async component to subscribe to one or more topics:

{
    "async/kafka": {
        "cluster": {
            "brokers": [
                "localhost:9092"
            ],
            "client_id": "cid_stocksconsumer"
        },
        "group": {
            "group_id": "my_group_id",
            "isolation_level": "read_commited"
        },
        "key_meta": "Message-Id"
    }
}

KrakenD consumes messages following the “latest” offset policy, meaning it processes only messages produced after startup.

The Kafka driver configuration accepts the same fields as the reader field of the Kafka backend consumer (backend/pubsub/kafka/subscriber), except for the topics field, which is defined in the general async agent configuration:
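To put the driver in context, a complete async agent could look like the sketch below. The agent-level fields shown here (name, consumer, backend) are assumptions based on the general async agent configuration and are illustrative, not part of this driver reference:

```json
{
    "async_agent": [
        {
            "name": "stocks-consumer",
            "consumer": {
                "topic": "stocks",
                "workers": 5
            },
            "backend": [
                {
                    "host": ["http://backend:8080"],
                    "url_pattern": "/ingest"
                }
            ],
            "extra_config": {
                "async/kafka": {
                    "cluster": {
                        "brokers": ["localhost:9092"],
                        "client_id": "cid_stocksconsumer"
                    },
                    "group": {
                        "group_id": "my_group_id"
                    }
                }
            }
        }
    ]
}
```

Notice that the topic is declared at the agent level, while the connection details live inside the async/kafka driver block.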

Fields of Kafka Reader
* required fields

cluster * object
Settings to establish the connection to the Kafka cluster
group object
Details about the consumer group
key_meta string
Name of the header where the Kafka message key value is written

The cluster field describes details about the connection to the Kafka cluster. It is the same configuration used for the Kafka PubSub publisher and subscriber:

Fields of Kafka Cluster Connection
* required fields

brokers * array of strings
channel_buffer_size number
The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput
Defaults to 256
client_id string
A name to give to the client establishing the connection
Defaults to "KrakenD v[X].[Y].[Z]"
client_tls object
Enables specific TLS connection options when connecting to the Kafka brokers. Supports all options under TLS client settings.
dial_timeout string
Dial timeout for establishing new connections
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "30s"
keep_alive string
Time to maintain an idle connection open
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "0s"
metadata_retry_backoff string
The time between attempts to reconnect when disconnected
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "250ms"
metadata_retry_max number
When a disconnection happens, the client needs to refresh its metadata to know the current state of the Kafka cluster (effectively, the number of reconnection attempts)
Defaults to 3
rack_id string
A name to identify the rack we are connecting from
Defaults to ""
read_timeout string
Maximum time allowed to read from broker
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "30s"
sasl object
Enables authentication to the Kafka service using a user and password
write_timeout string
Maximum time allowed to write to broker
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "30s"
Reconnection parameters
The metadata_retry_max and metadata_retry_backoff fields play a very important role in reconnection when you have network issues. With the default values, the client can only recover from disconnections shorter than 750ms (3 retries with a 250ms backoff). We strongly advise incrementing the number of retries (and increasing the backoff to 1s) if you want to protect yourself from longer disconnection times.
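Following that advice, a more resilient cluster block could look like the sketch below (the values are illustrative, not a recommendation for every workload):

```json
{
    "cluster": {
        "brokers": ["localhost:9092"],
        "metadata_retry_max": 10,
        "metadata_retry_backoff": "1s"
    }
}
```

With 10 retries and a 1s backoff, the client tolerates roughly 10 seconds of disconnection before giving up on the metadata refresh.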

The cluster field includes a client_tls field that allows you to authenticate with the Kafka service, as described in Service Client TLS Settings.
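As a minimal sketch, assuming the options documented under TLS client settings (allow_insecure_connections is used here as an example of such an option), a cluster block enabling TLS could look like:

```json
{
    "cluster": {
        "brokers": ["kafka.example.com:9093"],
        "client_tls": {
            "allow_insecure_connections": false
        }
    }
}
```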

You can also use Kafka user and password authentication with the SASL config:

{
    "cluster": {
        "brokers": [
            "localhost:9092"
        ],
        "sasl": {
            "user": "johnsmith",
            "password": "myp4ssword"
        }
    }
}
Fields of Kafka SASL
* required fields

auth_identity string
An optional authorization identity (authzid) to use for SASL/PLAIN authentication (if different from user), for when an authenticated user is permitted to act as the presented alternative user. See RFC 4616 for details
azure_event_hub boolean
Kafka > 1.x should use SASL V1, except on Azure EventHub which uses V0
Defaults to false
disable_hanshake boolean
Whether to send the Kafka SASL handshake first. Set it to false only when you use a non-Kafka SASL proxy
Defaults to true
mechanism
Name of the enabled SASL mechanism
Possible values are: "PLAIN" , "OAUTHBEARER"
Defaults to "PLAIN"
password string
Password for SASL/PLAIN authentication
scram_auth_id string
Authz id used for SASL/SCRAM authentication
user string
Authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication
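As an example of the azure_event_hub flag, connecting to Azure Event Hubs over its Kafka-compatible endpoint typically uses SASL/PLAIN with the literal user $ConnectionString. The snippet below is a sketch under that assumption; the connection string is a placeholder you must replace with your own:

```json
{
    "sasl": {
        "mechanism": "PLAIN",
        "azure_event_hub": true,
        "user": "$ConnectionString",
        "password": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=mysecret"
    }
}
```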

The group field has details about the Kafka consumer group behaviour:

Fields of Kafka Consumer Details
* required fields

fetch_default number
The default number of message bytes to fetch from the broker in each request (default 1MB). This should be larger than the majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar to the JVM’s fetch.message.max.bytes
heartbeat_interval string
Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "3s"
id string
Name of the consumer group to use
instance_id string
Support KIP-345
isolation_level
Supports two modes: read_uncommited consumes and returns all messages in the message channel, while read_commited hides the messages that are part of an aborted transaction
Possible values are: "read_commited" , "read_uncommited"
Defaults to "read_commited"
rebalance_strategies array
Priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader. Options are: range, roundrobin, and sticky
Defaults to ["range"]
rebalance_timeout string
Maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "60s"
session_timeout string
Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms
Specify units using ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).
Defaults to "10s"
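Putting the timing rules above together (heartbeat_interval should be no higher than one third of session_timeout, and session_timeout must fall within the broker's group.min.session.timeout.ms and group.max.session.timeout.ms range), a tuned group block could look like the following sketch; the values are illustrative:

```json
{
    "group": {
        "group_id": "my_group_id",
        "session_timeout": "30s",
        "heartbeat_interval": "10s",
        "rebalance_strategies": ["roundrobin"]
    }
}
```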

ACK Behaviour in Kafka

Kafka has no per-message ACK: acknowledging a Kafka message commits the client’s partition offset.

KrakenD ACKs every message it reads, even when the pipeline response ends in a non-successful status code.
