Document updated on Mar 5, 2026
Kafka Driver for the Asynchronous Agent
The Kafka driver for async agents allows KrakenD to consume Kafka queues autonomously. Routines listening to Kafka topics react to new events and push data to your backends.
This driver differs from the Kafka backend consumer. Unlike endpoints, async agents do not require a client request to trigger an action. Instead, the agent subscribes to the topics and fires an action when an event is delivered.
Driver Configuration
Place the Kafka driver inside the extra_config of the async component to subscribe to one or more topics:
{
"async/kafka": {
"cluster": {
"brokers": [
"localhost:9092"
],
"client_id": "cid_stocksconsumer"
},
"group": {
"group_id": "my_group_id",
"isolation_level": "read_commited"
},
"key_meta": "Message-Id"
}
}
KrakenD consumes messages following the “latest” offset policy, meaning it processes only messages produced after startup.
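For context, this is a sketch of where the driver sits inside a complete agent definition. The surrounding async_agent fields (name, consumer, backend) follow the general async agent configuration, and the host, topic, and worker values are illustrative placeholders, not values from this document:
{
  "version": 3,
  "async_agent": [
    {
      "name": "stocks-consumer",
      "consumer": {
        "topic": "stocks",
        "workers": 5
      },
      "backend": [
        {
          "host": ["http://127.0.0.1:8080"],
          "url_pattern": "/process"
        }
      ],
      "extra_config": {
        "async/kafka": {
          "cluster": {
            "brokers": ["localhost:9092"],
            "client_id": "cid_stocksconsumer"
          },
          "group": {
            "group_id": "my_group_id"
          }
        }
      }
    }
  ]
}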
The Kafka driver configuration accepts the same fields as the reader field of the backend Kafka consumer (backend/pubsub/kafka/subscriber), except for the topics field, which is defined in the general async agent configuration:
Fields of Kafka Reader
The cluster field describes details about the connection to the Kafka cluster. It is the same configuration used for the Kafka PubSub publisher and subscriber:
Fields of Kafka Cluster Connection
Duration fields accept units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).

brokers* (array of strings): The list of Kafka brokers to connect to. Required.
channel_buffer_size (number): The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput. Defaults to 256.
client_id (string): A name to give to the client establishing the connection. Defaults to "KrakenD v[X].[Y].[Z]".
client_tls (object): Enables specific TLS connection options when connecting to the Kafka brokers. Supports all options under TLS client settings.
dial_timeout (string): Dial timeout for establishing new connections. Defaults to "30s".
keep_alive (string): Time to maintain an idle connection open. Defaults to "0s".
metadata_retry_backoff (string): The time between attempts to reconnect when disconnected. Defaults to "250ms".
metadata_retry_max (number): When a disconnection happens, the client needs to refresh its metadata to know the current state of the Kafka cluster; effectively, the number of attempts to reconnect. Defaults to 3.
rack_id (string): A name to identify the rack we are connecting from. Defaults to "".
read_timeout (string): Maximum time allowed to read from the broker. Defaults to "30s".
sasl (object): Enables authentication to the Kafka service using a user and password.
write_timeout (string): Maximum time allowed to write to the broker. Defaults to "30s".
metadata_retry_max and metadata_retry_backoff play a very important role in reconnection when you have network issues. By default, their values only allow recovery from connections lost for less than 750ms (3 retries with a 250ms backoff). We strongly advise increasing the number of retries (and the backoff, e.g., to 1s) if you want to protect yourself from longer disconnection times.

The cluster field includes a client_tls field that allows you to authenticate with the Kafka service, as described in Service Client TLS Settings.
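For instance, the following cluster fragment (values are illustrative, not defaults) extends the tolerated disconnection window to roughly 10 seconds by combining 10 retries with a 1-second backoff:
{
  "async/kafka": {
    "cluster": {
      "brokers": ["localhost:9092"],
      "metadata_retry_max": 10,
      "metadata_retry_backoff": "1s"
    }
  }
}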
You can also use Kafka user and password authentication with the SASL config:
{
"cluster": {
"brokers": [
"localhost:9092"
],
"sasl": {
"user": "johnsmith",
"password": "myp4ssword"
}
}
}
Fields of Kafka SASL
auth_identity (string): An optional authorization identity (authzid) to use for SASL/PLAIN authentication (if different from user), when an authenticated user is permitted to act as the presented alternative user. See RFC 4616 for details.
azure_event_hub (boolean): Kafka > 1.x should use SASL V1, except on Azure Event Hub, which uses V0. Defaults to false.
disable_hanshake (boolean): Whether or not to send the Kafka SASL handshake first. You should only set this to false if you are using a non-Kafka SASL proxy. Defaults to true.
mechanism (string): Name of the enabled SASL mechanism. Possible values are: "PLAIN" and "OAUTHBEARER". Defaults to "PLAIN".
password (string): Password for SASL/PLAIN authentication.
scram_auth_id (string): Authorization id (authzid) used for SASL/SCRAM authentication.
user (string): Authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication.
The group field has details about the Kafka consumer group behaviour:
Fields of Kafka Consumer Details
Duration fields accept units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), or h (hours).

fetch_default (number): The default number of message bytes to fetch from the broker in each request (default 1MB). This should be larger than the majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar to the JVM's fetch.message.max.bytes.
heartbeat_interval (string): Expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the session timeout, but typically no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Defaults to "3s".
id (string): Name of the consumer group to use.
instance_id (string): Support for KIP-345.
isolation_level (string): Supports 2 modes: "read_uncommited" consumes and returns all messages in the message channel, while "read_commited" hides messages that are part of an aborted transaction. Possible values are: "read_commited" and "read_uncommited". Defaults to "read_commited".
rebalance_strategies (array): Priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader. Options are: range, roundrobin, and sticky. Defaults to ["range"].
rebalance_timeout (string): Maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, the worker is removed from the group, which causes offset commit failures. Defaults to "60s".
session_timeout (string): Timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before this timeout expires, the broker removes the consumer from the group and initiates a rebalance. The value must be within the allowable range configured in the broker by group.min.session.timeout.ms and group.max.session.timeout.ms. Defaults to "10s".
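As a sketch, the following group fragment (values are illustrative, not defaults) pairs a heartbeat interval of one third of the session timeout and opts into sticky rebalancing, falling back to range when not all members support it:
{
  "async/kafka": {
    "group": {
      "group_id": "my_group_id",
      "session_timeout": "30s",
      "heartbeat_interval": "10s",
      "rebalance_strategies": ["sticky", "range"]
    }
  }
}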
ACK Behaviour in Kafka
Kafka has no per-message ACK: acknowledging a Kafka message commits the client’s partition offset.
KrakenD ACKs every message it reads, even when the pipeline response ends in a non-successful status code.
