Document updated on Mar 5, 2026
Kafka PubSub with Extended Connectivity
Kafka PubSub enables KrakenD to publish messages to and consume messages from Kafka topics directly from backend configurations. Use this component when you need to secure your Kafka connections with mutual TLS (mTLS) or SASL authentication. It provides two backend namespaces: backend/pubsub/publisher/kafka for publishing and backend/pubsub/subscriber/kafka for subscribing.
Publisher Configuration
Add the backend/pubsub/publisher/kafka namespace under a backend’s extra_config to publish messages to a Kafka topic. KrakenD converts the backend’s input_headers into message metadata.
The following configuration enables publishing to the orderplacement topic with mTLS authentication:
{
  "backend/pubsub/publisher/kafka": {
    "success_status_code": 201,
    "writer": {
      "topic": "orderplacement",
      "key_meta": "X-My-Key",
      "cluster": {
        "brokers": ["localhost:49092"],
        "client_id": "krakend_async_agent",
        "client_tls": {
          "allow_insecure_connections": false,
          "ca_certs": ["./config/ca/cacert.pem"],
          "client_certs": [
            {
              "certificate": "./config/certs/client/client.signed.pem",
              "private_key": "./config/certs/client/client.key"
            }
          ]
        }
      },
      "producer": {
        "idempotent": true
      }
    }
  }
}
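For the publisher to attach metadata, the headers you want converted must be declared in the endpoint's input_headers so they reach the backend. The fragment below is a hypothetical endpoint wiring (the endpoint path, host, and url_pattern are placeholders): the header named in key_meta (X-My-Key) becomes the Kafka message key, and the extra_config would contain the publisher namespace shown above.

```json
{
  "endpoint": "/orders",
  "method": "POST",
  "input_headers": ["X-My-Key"],
  "backend": [
    {
      "host": ["http://localhost"],
      "url_pattern": "/",
      "extra_config": {
        "backend/pubsub/publisher/kafka": {}
      }
    }
  ]
}
```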
The top-level fields for backend/pubsub/publisher/kafka are:
Fields of PubSub Kafka Publisher EE
- success_status_code (number): HTTP status code to return for a successful write to the queue. Defaults to 200.
- writer* (object): Defines how to write messages to a Kafka topic.
The writer field contains the configuration for writing to a Kafka topic:
Fields of Kafka Writer
The cluster field describes details about the connection to the Kafka cluster. It is the same configuration used for the Kafka PubSub subscriber and the Kafka async agent:
Fields of Kafka Cluster Connection
- brokers* (array of strings)
- channel_buffer_size (number): Number of events to buffer in internal and external channels. This lets the producer and consumer keep processing some messages in the background while user code is working, greatly improving throughput. Defaults to 256.
- client_id (string): A name given to the client establishing the connection. Defaults to "KrakenD v[X].[Y].[Z]".
- client_tls (object): Enables specific TLS connection options when connecting to the Kafka brokers. Supports all options under TLS client settings.
- dial_timeout (string): Dial timeout for establishing new connections. Defaults to "30s".
- keep_alive (string): Time to keep an idle connection open. Defaults to "0s".
- metadata_retry_backoff (string): Time between attempts to reconnect when disconnected. Defaults to "250ms".
- metadata_retry_max (number): When a disconnection happens, the client must refresh its metadata to learn the current state of the Kafka cluster; this is effectively the number of reconnection attempts. Defaults to 3.
- rack_id (string): A name identifying the rack the client connects from. Defaults to "".
- read_timeout (string): Maximum time allowed to read from the broker. Defaults to "30s".
- sasl (object): Enables authentication to the Kafka service using user and password.
- write_timeout (string): Maximum time allowed to write to the broker. Defaults to "30s".
Duration fields accept the units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), and h (hours).
metadata_retry_max and metadata_retry_backoff play a very important role in reconnection when you have network issues. By default, their values allow recovery only from connections lost for less than 750ms (3 retries with a 250ms backoff). We strongly advise increasing the number of retries (and the backoff, e.g. to 1s) if you want to protect yourself against longer disconnection times.
The cluster field includes a client_tls field that allows you to authenticate with the Kafka service, as described in Service Client TLS Settings.
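Following that advice, a cluster block tuned to survive disconnections of roughly up to 10 seconds (10 retries with a 1s backoff; the broker address is illustrative) could look like this:

```json
{
  "cluster": {
    "brokers": ["localhost:49092"],
    "metadata_retry_max": 10,
    "metadata_retry_backoff": "1s"
  }
}
```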
You can also use user and password authentication with the SASL config:
Fields of Kafka SASL
- auth_identity (string): An optional authorization identity (authzid) to use for SASL/PLAIN authentication (if different from user), when an authenticated user is permitted to act as the presented alternative user. See RFC 4616 for details.
- azure_event_hub (boolean): Kafka > 1.x should use SASL V1, except on Azure Event Hub, which uses V0. Defaults to false.
- disable_hanshake (boolean): Whether to send the Kafka SASL handshake first. Set this to false only if you are using a non-Kafka SASL proxy. Defaults to true.
- mechanism: Name of the enabled SASL mechanism. Possible values are "PLAIN" and "OAUTHBEARER". Defaults to "PLAIN".
- password (string): Password for SASL/PLAIN authentication.
- scram_auth_id (string): Authzid used for SASL/SCRAM authentication.
- user (string): Authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication.
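As a sketch, a cluster block authenticating with SASL/PLAIN could look like the following (the broker address and credentials are placeholders):

```json
{
  "cluster": {
    "brokers": ["localhost:49092"],
    "sasl": {
      "mechanism": "PLAIN",
      "user": "krakend",
      "password": "someSecretPassword"
    }
  }
}
```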
The producer field has details about how to write messages:
Fields of Kafka Producer Details
- compression_codec: Type of compression to use on messages (defaults to no compression). Similar to the compression.codec setting of the JVM producer. Possible values are "none", "gzip", "snappy", "lz4", and "zstd". Defaults to "none".
- compression_level (string): Level of compression to use on messages. The meaning depends on the compression type used and defaults to the codec's default compression level.
- idempotent (boolean): If enabled, the producer ensures that exactly one copy of each message is written. Defaults to false.
- max_message_bytes (number): Maximum permitted size of a message. Should be set equal to or smaller than the broker's message.max.bytes.
- partitioner: Selects the behaviour for choosing the partition to send messages to (similar to the partitioner.class setting of the JVM producer). The options are:
  - sarama: DEPRECATED. If the message key is nil, a random partition is chosen. Otherwise, the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
  - standard: like sarama, except that it handles absolute values in the same way as the reference Java implementation. sarama was supposed to do that but had a mistake, and now there are people depending on both behaviours; this will be consolidated on the next major version bump.
  - random: chooses a random partition each time.
  - roundrobin: walks through the available partitions one at a time.
  Possible values are "sarama", "standard", "random", and "roundrobin". Defaults to "standard".
- required_acks (string): Level of acknowledgement reliability needed from the broker. Equivalent to the request.required.acks setting of the JVM producer. Can be a positive number (as a string), or one of the following values: no_response (no required acks), wait_for_local (waits only for the local commit to succeed before responding), wait_for_all (waits for all in-sync replicas to commit before responding). Defaults to "wait_for_local".
- required_acks_timeout (string): Maximum duration the broker waits for the receipt of the number of required_acks. Only relevant when required_acks is set to wait_for_all or to a number > 1. Only supports millisecond resolution; nanoseconds are truncated. Equivalent to the JVM producer's request.timeout.ms setting. Defaults to "10s".
- retry_backoff (string): How long to wait for the cluster to settle between retries (similar to the retry.backoff.ms setting of the JVM producer). Defaults to "100ms".
- retry_max (number): Total number of times to retry sending a message. Similar to the message.send.max.retries setting of the JVM producer. Defaults to 3.
Duration fields accept the units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), and h (hours).
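Putting a few of these fields together, a producer block favouring delivery guarantees over latency could look like this sketch (the values are illustrative, not recommendations):

```json
{
  "producer": {
    "idempotent": true,
    "compression_codec": "gzip",
    "required_acks": "wait_for_all",
    "required_acks_timeout": "15s",
    "retry_max": 5,
    "retry_backoff": "200ms"
  }
}
```

wait_for_all is the natural companion of idempotent here, since idempotent writes typically need acknowledgement from all in-sync replicas.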
OpenTelemetry
Metrics
When OpenTelemetry is enabled, KrakenD reports the following metrics for all messages:
- messaging.write.body.size: histogram of body sizes in bytes (does not include metadata).
- messaging.write.body.duration: histogram of the time taken to write a message.
- messaging.write.failure.count: histogram of the time taken to fail to write a message.
All metrics include the following attributes:
- kind: identifies the messaging system (in this case kafka).
- topic: the topic the message is written to.
Traces
KrakenD adds information about the write process as a new span if a trace is active, with the following fields:
- kind: identifies the messaging system (in this case kafka).
- topic: the topic the message is written to.
- messaging.write.body.size: size of the message body.
- messaging.write.body.duration: time taken to write the message in seconds.
Subscriber Configuration
Add the backend/pubsub/subscriber/kafka namespace under a backend’s extra_config to consume messages from a Kafka topic. It contains a reader field with the same configuration as the Kafka async agent driver.
The following configuration subscribes to the orderplacement topic with mTLS authentication:
{
  "backend/pubsub/subscriber/kafka": {
    "reader": {
      "topics": ["orderplacement"],
      "key_meta": "X-Idempotency-Key",
      "cluster": {
        "brokers": ["localhost:49092"],
        "client_id": "krakend_async_agent",
        "client_tls": {
          "allow_insecure_connections": false,
          "ca_certs": ["./config/ca/cacert.pem"],
          "client_certs": [
            {
              "certificate": "./config/certs/client/client.signed.pem",
              "private_key": "./config/certs/client/client.key"
            }
          ]
        }
      },
      "group": {
        "id": "k_endpoint_read",
        "isolation_level": "read_commited"
      }
    }
  }
}
Fields of PubSub Kafka Subscriber EE
- reader* (object): Defines how to read messages from a Kafka topic.
KrakenD consumes messages following the “latest” offset policy, meaning it processes only messages produced after KrakenD starts.
The reader field defines how to read from a Kafka cluster:
Fields of Kafka Reader
The cluster field describes details about the connection to the Kafka cluster. It is the same configuration used for the Kafka PubSub publisher and the Kafka async agent:
Fields of Kafka Cluster Connection
- brokers* (array of strings)
- channel_buffer_size (number): Number of events to buffer in internal and external channels. This lets the producer and consumer keep processing some messages in the background while user code is working, greatly improving throughput. Defaults to 256.
- client_id (string): A name given to the client establishing the connection. Defaults to "KrakenD v[X].[Y].[Z]".
- client_tls (object): Enables specific TLS connection options when connecting to the Kafka brokers. Supports all options under TLS client settings.
- dial_timeout (string): Dial timeout for establishing new connections. Defaults to "30s".
- keep_alive (string): Time to keep an idle connection open. Defaults to "0s".
- metadata_retry_backoff (string): Time between attempts to reconnect when disconnected. Defaults to "250ms".
- metadata_retry_max (number): When a disconnection happens, the client must refresh its metadata to learn the current state of the Kafka cluster; this is effectively the number of reconnection attempts. Defaults to 3.
- rack_id (string): A name identifying the rack the client connects from. Defaults to "".
- read_timeout (string): Maximum time allowed to read from the broker. Defaults to "30s".
- sasl (object): Enables authentication to the Kafka service using user and password.
- write_timeout (string): Maximum time allowed to write to the broker. Defaults to "30s".
Duration fields accept the units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), and h (hours).
metadata_retry_max and metadata_retry_backoff play a very important role in reconnection when you have network issues. By default, their values allow recovery only from connections lost for less than 750ms (3 retries with a 250ms backoff). We strongly advise increasing the number of retries (and the backoff, e.g. to 1s) if you want to protect yourself against longer disconnection times.
The cluster field includes a client_tls field that allows you to authenticate with the Kafka service, as described in Service Client TLS Settings.
You can also use user and password authentication with the SASL config:
Fields of Kafka SASL
- auth_identity (string): An optional authorization identity (authzid) to use for SASL/PLAIN authentication (if different from user), when an authenticated user is permitted to act as the presented alternative user. See RFC 4616 for details.
- azure_event_hub (boolean): Kafka > 1.x should use SASL V1, except on Azure Event Hub, which uses V0. Defaults to false.
- disable_hanshake (boolean): Whether to send the Kafka SASL handshake first. Set this to false only if you are using a non-Kafka SASL proxy. Defaults to true.
- mechanism: Name of the enabled SASL mechanism. Possible values are "PLAIN" and "OAUTHBEARER". Defaults to "PLAIN".
- password (string): Password for SASL/PLAIN authentication.
- scram_auth_id (string): Authzid used for SASL/SCRAM authentication.
- user (string): Authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication.
The group field has details about the Kafka consumer group:
Fields of Kafka Consumer Details
- fetch_default (number): Default number of message bytes to fetch from the broker in each request (default 1MB). Should be larger than the majority of your messages, or else the consumer will spend a lot of time negotiating sizes instead of actually consuming. Similar to the JVM's fetch.message.max.bytes.
- heartbeat_interval (string): Expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats ensure that the consumer's session stays active and facilitate rebalancing when new consumers join or leave the group. The value must be lower than session_timeout, and typically no higher than 1/3 of it. It can be adjusted even lower to control the expected time for normal rebalances. Defaults to "3s".
- id (string): Name of the consumer group to use.
- instance_id (string): Supports static group membership (KIP-345).
- isolation_level: Supports two modes: read_uncommited consumes and returns all messages in the message channel, while read_commited hides messages that are part of an aborted transaction. Possible values are "read_commited" and "read_uncommited". Defaults to "read_commited".
- rebalance_strategies (array): Priority-ordered list of client-side consumer group balancing strategies offered to the coordinator. The first strategy that all group members support is chosen by the leader. Options are range, roundrobin, and sticky. Defaults to ["range"].
- rebalance_timeout (string): Maximum allowed time for each worker to join the group once a rebalance has begun. This is essentially a limit on the time needed for all tasks to flush pending data and commit offsets. If the timeout is exceeded, the worker is removed from the group, causing offset commit failures. Defaults to "60s".
- session_timeout (string): Timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If the broker receives no heartbeats before this timeout expires, it removes the consumer from the group and initiates a rebalance. Note that the value must be within the range allowed by the broker configuration (group.min.session.timeout.ms and group.max.session.timeout.ms). Defaults to "10s".
Duration fields accept the units ns (nanoseconds), us or µs (microseconds), ms (milliseconds), s (seconds), m (minutes), and h (hours).
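For example, a group block with an explicit isolation level and a longer failure-detection window could look like this sketch (the values are illustrative):

```json
{
  "group": {
    "id": "k_endpoint_read",
    "isolation_level": "read_commited",
    "session_timeout": "30s",
    "heartbeat_interval": "10s",
    "rebalance_strategies": ["roundrobin"]
  }
}
```

Keeping heartbeat_interval at about one third of session_timeout follows the guidance above.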
OpenTelemetry
Metrics
When OpenTelemetry is enabled, KrakenD reports the following metrics for all messages read:
- messaging.read.body.size: histogram of body sizes in bytes (does not include metadata).
- messaging.read.body.duration: histogram of the time taken to read a message.
- messaging.read.ack.duration: histogram of the time taken to acknowledge the message.
- messaging.read.failure.count: histogram of the time taken to fail reading a message.
All metrics include the following attributes:
- kind: identifies the messaging system (for example kafka).
- topic: the topic the message is read from.
- origin: always set to subscriber, to differentiate reading metrics from those of an async agent (which can take longer as it waits for new events).
Traces
KrakenD adds information about the read process as a new span if a trace is active, with the following fields:
- kind: identifies the messaging system (for example kafka).
- topic: the topic the message is read from.
- messaging.read.body.size: size of the read message body.
- messaging.read.body.duration: time taken to read a message in seconds.
ACK Behaviour in Kafka
Kafka has no per-message ACK: acknowledging a Kafka message commits the client’s partition offset.
KrakenD ACKs every message read, even when the response of the pipeline results in a non-successful status code.
