The confluent_kafka API

A reliable, performant and feature rich Python client for Apache Kafka v0.8 and above.

Consumer

class confluent_kafka.Consumer

A high-level Apache Kafka Consumer

Consumer(config)

Create a new Consumer instance using the provided configuration dict (including properties and callback functions). See Configuration for more information.

Parameters:config (dict) – Configuration properties. At a minimum, group.id must be set and bootstrap.servers should be set.
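
For example, a minimal sketch of constructing a Consumer (the broker address and group id below are hypothetical):

from confluent_kafka import Consumer

# Minimal consumer configuration: group.id is required and
# bootstrap.servers points the client at the cluster.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # assumed broker address
    'group.id': 'example-group',            # hypothetical group id
    'auto.offset.reset': 'earliest',
})
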
assign()
assign(partitions)

Set the consumer partition assignment to the provided list of TopicPartition and start consuming.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming.
Raises:RuntimeError if called on a closed consumer
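
As a sketch, manually assigning two partitions of a hypothetical topic, starting one from the beginning of the log:

from confluent_kafka import TopicPartition, OFFSET_BEGINNING

# Assign partitions 0 and 1 of a hypothetical topic; partition 0
# starts from the oldest available offset.
consumer.assign([
    TopicPartition('mytopic', 0, OFFSET_BEGINNING),
    TopicPartition('mytopic', 1),
])
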
assignment()

Returns the current partition assignment.

Returns:List of assigned topic+partitions.
Return type:list(TopicPartition)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
close()

Close down and terminate the Kafka Consumer.

Actions performed:

  • Stops consuming
  • Commits offsets - except if the consumer property ‘enable.auto.commit’ is set to False
  • Leaves the consumer group
Return type:None
Raises:RuntimeError if called on a closed consumer
commit()
commit([message=None][, offsets=None][, asynchronous=True])

Commit a message or a list of offsets.

message and offsets are mutually exclusive; if neither is set, the current partition assignment’s offsets are used instead. The consumer relies on your use of this method if you have set ‘enable.auto.commit’ to False.

Parameters:
  • message (confluent_kafka.Message) – Commit message’s offset+1.
  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
  • asynchronous (bool) – Asynchronous commit, return None immediately. If False the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .error field of each partition will need to be checked for success.
Return type:None|list(TopicPartition)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
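
As a sketch, a synchronous commit that checks per-partition results (msg is assumed to come from a prior poll()):

# Blocking commit of msg's offset+1; the committed offsets are
# returned and each partition's .error field must be checked.
offsets = consumer.commit(message=msg, asynchronous=False)
for tp in offsets:
    if tp.error is not None:
        print(f'Commit failed for {tp.topic} [{tp.partition}]: {tp.error}')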

committed()
committed(partitions[, timeout=None])

Retrieve committed offsets for the list of partitions.

Parameters:
  • partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
  • timeout (float) – Request timeout. (Seconds)
Returns:List of topic+partitions with offset and possibly error set.
Return type:list(TopicPartition)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer

consume()
consume([num_messages=1][, timeout=-1])

Consume messages, call callbacks and return a list of messages (possibly empty on timeout).

The application must check each returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and events or errors (see error().code() for specifics).

Parameters:
  • num_messages (int) – Maximum number of messages to return (default: 1).
  • timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
Returns:A list of Message objects (possibly empty on timeout)
Return type:list(Message)

Raises:
  • RuntimeError – if called on a closed consumer
  • KafkaError – in case of internal error
  • ValueError – if num_messages is greater than 1,000,000
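
A brief sketch of batch consumption (process() is a hypothetical application function):

# Fetch up to 100 messages, waiting at most one second.
msgs = consumer.consume(num_messages=100, timeout=1.0)
for msg in msgs:
    if msg.error() is not None:
        print(f'Event/error: {msg.error()}')  # see error().code() for specifics
        continue
    process(msg.value())  # hypothetical per-message handler
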
consumer_group_metadata()
consumer_group_metadata()
Returns:the consumer’s current group metadata. This object should be passed to the transactional producer’s send_offsets_to_transaction() API.
get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])

Retrieve low and high offsets for partition.

Parameters:
  • partition (TopicPartition) – Topic+partition to return offsets for.
  • timeout (float) – Request timeout (when cached=False). (Seconds)
  • cached (bool) – Instead of querying the broker, use cached information. Cached values: the low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Returns:Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
Return type:tuple(int,int)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
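
For example, a sketch that estimates consumer lag for a hypothetical, actively consumed partition using the watermarks and the consumer's position:

from confluent_kafka import TopicPartition

tp = TopicPartition('mytopic', 0)  # hypothetical topic+partition
low, high = consumer.get_watermark_offsets(tp, timeout=5.0)

# position() returns the offset of the last consumed message + 1;
# lag is the distance from there to the high watermark.
pos = consumer.position([tp])[0].offset
lag = high - pos if pos >= 0 else high - low
print(f'low={low} high={high} lag={lag}')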

list_topics()
list_topics([topic=None][, timeout=-1])

Request Metadata from cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Parameters:
  • topic (str) – If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.
  • timeout (float) – Maximum response time before timing out, or -1 for infinite timeout.
Return type:ClusterMetadata
Raises:KafkaException

offsets_for_times()
offsets_for_times(partitions[, timeout=None])

offsets_for_times looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.

Parameters:
  • partitions (list(TopicPartition)) – List of topic+partitions with timestamps in the TopicPartition.offset field.
  • timeout (float) – Request timeout. (Seconds)
Returns:List of topic+partitions with offset field set and possibly error set.
Return type:list(TopicPartition)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
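
As a sketch, resolving offsets for a wall-clock time and seeking there (assumes the partitions are actively assigned; topic name hypothetical):

from confluent_kafka import TopicPartition

# The lookup timestamp is passed in the offset field, in milliseconds.
ts_ms = 1609459200000  # example: 2021-01-01T00:00:00Z
query = [TopicPartition('mytopic', p, ts_ms) for p in (0, 1)]

for tp in consumer.offsets_for_times(query, timeout=5.0):
    if tp.offset >= 0:       # -1 means the timestamp is past the last message
        consumer.seek(tp)
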
pause()
pause(partitions)

Pause consumption for the provided list of partitions.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions to pause.
Return type:None
Raises:KafkaException
poll()
poll([timeout=None])

Consume a single message, call callbacks and return events.

The application must check the returned Message object’s Message.error() method to distinguish between a proper message (error() returns None) and an event or error (see error().code() for specifics).

Parameters:timeout (float) – Maximum time to block waiting for message, event or callback. (Seconds)
Returns:A Message object or None on timeout
Return type:Message or None
Raises:RuntimeError if called on a closed consumer
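
The canonical poll loop, sketched (the subscription is assumed to have been set up beforehand):

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:               # no message within the timeout
            continue
        if msg.error() is not None:   # event or error, not a proper message
            print(f'Event/error: {msg.error()}')
            continue
        print(f'{msg.topic()} [{msg.partition()}] @ {msg.offset()}: {msg.value()}')
finally:
    consumer.close()
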
position()
position(partitions)

Retrieve current positions (offsets) for the list of partitions.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Returns:List of topic+partitions with offset and possibly error set.
Return type:list(TopicPartition)
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
resume()
resume(partitions)

Resume consumption for the provided list of partitions.

Parameters:partitions (list(TopicPartition)) – List of topic+partitions to resume.
Return type:None
Raises:KafkaException
seek()
seek(partition)

Set the consume position for partition to offset. The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).

seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()); to set the starting offset of a partition not being consumed, instead pass the offset in an assign() call.

Parameters:partition (TopicPartition) – Topic+partition+offset to seek to.
Raises:KafkaException
store_offsets()
store_offsets([message=None][, offsets=None])

Store offsets for a message or a list of offsets.

message and offsets are mutually exclusive. The stored offsets will be committed according to ‘auto.commit.interval.ms’ or manual offset-less commit(). Note that ‘enable.auto.offset.store’ must be set to False when using this API.

Parameters:
  • message (confluent_kafka.Message) – Store message’s offset+1.
  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to store.
Return type:None
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
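
A sketch of at-least-once processing with manually stored offsets (broker address, group id and process() are hypothetical):

from confluent_kafka import Consumer

# enable.auto.offset.store must be False when using store_offsets();
# auto-commit then periodically commits only what has been stored.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'enable.auto.offset.store': False,
})
consumer.subscribe(['mytopic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error() is not None:
        continue
    process(msg)                         # hypothetical handler
    consumer.store_offsets(message=msg)  # store only after processing succeeds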

subscribe()
subscribe(topics[, on_assign=None][, on_revoke=None])

Set subscription to the supplied list of topics. This replaces a previous subscription.

Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters:
  • topics (list(str)) – List of topics (strings) to subscribe to.
  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
Raises:KafkaException
Raises:RuntimeError if called on a closed consumer

on_assign(consumer, partitions)
on_revoke(consumer, partitions)
Parameters:
  • consumer (Consumer) – Consumer instance.
  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
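
A sketch of subscribing with rebalance callbacks (topic name hypothetical):

def on_assign(consumer, partitions):
    # Called on completion of a successful partition re-assignment;
    # offsets could be customized here (e.g. restored from an external store).
    print('Assigned:', partitions)

def on_revoke(consumer, partitions):
    # Called at the start of a rebalance; a natural place to commit or
    # flush offsets to a custom store before ownership is lost.
    print('Revoked:', partitions)

consumer.subscribe(['mytopic'], on_assign=on_assign, on_revoke=on_revoke)
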
unassign()

Removes the current partition assignment and stops consuming.

Raises:KafkaException
Raises:RuntimeError if called on a closed consumer
unsubscribe()

Remove current subscription.

Raises:KafkaException
Raises:RuntimeError if called on a closed consumer

Producer

class confluent_kafka.Producer

Asynchronous Kafka Producer

Producer(config)

Create a new Producer instance using the provided configuration dict.

Parameters:config (dict) – Configuration properties. At a minimum bootstrap.servers should be set.

len()
Returns:Number of messages and Kafka protocol requests waiting to be delivered to broker.
Return type:int
abort_transaction()
abort_transaction([timeout])

Aborts the current transaction. This function should also be used to recover from non-fatal abortable transaction errors when KafkaError.txn_requires_abort() is True.

Any outstanding messages will be purged and fail with _PURGE_INFLIGHT or _PURGE_QUEUE.

Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.

Note: Will automatically call purge() and flush() to ensure all queued and in-flight messages are purged before attempting to abort the transaction.

Parameters:timeout (float) – The maximum amount of time to block waiting for transaction to abort in seconds.
Raises:KafkaError: Use exc.args[0].retriable() to check if the operation may be retried. Treat any other error as a fatal error.
begin_transaction()
begin_transaction()

Begin a new transaction.

init_transactions() must have been called successfully (once) before this function is called.

Any messages produced, or offsets sent to a transaction, after the successful return of this function will be part of the transaction and committed or aborted atomically.

Complete the transaction by calling commit_transaction() or abort the transaction by calling abort_transaction().

Raises:KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.
commit_transaction()
commit_transaction([timeout])

Commit the current transaction. Any outstanding messages will be flushed (delivered) before actually committing the transaction.

If any of the outstanding messages fail permanently the current transaction will enter the abortable error state and this function will return an abortable error; in this case the application must call abort_transaction() before attempting a new transaction with begin_transaction().

Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.

Note: Will automatically call flush() to ensure all queued messages are delivered before attempting to commit the transaction. Delivery reports and other callbacks may thus be triggered from this method.

Parameters:timeout (float) – The amount of time to block in seconds.
Raises:KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction(), after which a new transaction may be started with begin_transaction(). Treat any other error as a fatal error.
flush()
flush([timeout])
Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.
Parameters:timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)
Returns:Number of messages still in queue.

Note

See poll() for a description on what callbacks may be triggered.

init_transactions()

Initializes transactions for the producer instance.

This function ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the transactional.id is configured.

If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction’s completion.

When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.

Upon successful return from this function the application has to perform at least one of the following operations within transactional.timeout.ms to avoid timing out the transaction on the broker:

  • produce() (et al.)
  • send_offsets_to_transaction()
  • commit_transaction()
  • abort_transaction()

Parameters:timeout (float) – Maximum time to block in seconds.
Raises:KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.
list_topics()
list_topics([topic=None][, timeout=-1])

Request Metadata from cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Parameters:
  • topic (str) – If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.
  • timeout (float) – Maximum response time before timing out, or -1 for infinite timeout.
Return type:ClusterMetadata
Raises:KafkaException

poll()
poll([timeout])

Polls the producer for events and calls the corresponding callbacks (if registered).

Callbacks: on_delivery callbacks from produce()

Parameters:timeout (float) – Maximum time to block waiting for events. (Seconds)
Returns:Number of events processed (callbacks served)
Return type:int
produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])

Produce a message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.

Parameters:
  • topic (str) – Topic to produce message to
  • value (str|bytes) – Message payload
  • key (str|bytes) – Message key
  • partition (int) – Partition to produce to, else uses the configured built-in partitioner.
  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
  • timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
  • headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
Return type:None

Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
  • KafkaException – for other errors, see exception code
  • NotImplementedError – if timestamp is specified without underlying library support.
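
As a sketch, producing with a delivery report callback (broker address and topic hypothetical):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # assumed broker

def delivery_report(err, msg):
    # Served from poll()/flush() once per message with the final outcome.
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce('mytopic', value=b'payload', key=b'key', on_delivery=delivery_report)
producer.poll(0)   # serve any delivery callbacks that are already due
producer.flush()   # block until all outstanding messages are delivered
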
send_offsets_to_transaction()
send_offsets_to_transaction(positions, group_metadata[, timeout])

Sends a list of topic partition offsets to the consumer group coordinator for group_metadata and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.

The offsets should be the next message your application will consume, i.e., the last processed message’s offset + 1 for each partition. Either track the offsets manually during processing or use consumer.position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.

Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().

Note: The consumer must disable auto commits (set enable.auto.commit to false on the consumer).

Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in offsets will be ignored. If there are no valid offsets in offsets the function will return successfully and no action will be taken.

Parameters:
  • positions (list(TopicPartition)) – current consumer/processing position (offsets) for the list of partitions.
  • group_metadata (object) – consumer group metadata retrieved from the input consumer’s consumer_group_metadata().
  • timeout (float) – Amount of time to block in seconds.
Raises:KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction(), after which a new transaction may be started with begin_transaction(). Treat any other error as a fatal error.
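
Putting the transactional APIs together, a minimal consume-transform-produce sketch (broker, topics, ids and transform() are hypothetical; one transaction per message is shown for simplicity):

from confluent_kafka import Consumer, Producer

consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'etl-group',
                     'enable.auto.commit': False})   # required for transactions
producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'transactional.id': 'etl-txn-1'})

consumer.subscribe(['input-topic'])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error() is not None:
        continue
    producer.begin_transaction()
    producer.produce('output-topic', value=transform(msg.value()))
    # Commit the consumer's positions atomically with the produced message.
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata())
    producer.commit_transaction()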

AdminClient

Kafka Admin client: create, view, alter, delete topics and resources.

class confluent_kafka.admin.AdminClient(conf)

AdminClient provides admin operations for Kafka brokers, topics, groups, and other resource types supported by the broker.

The Admin API methods are asynchronous and return a dict of concurrent.futures.Future objects keyed by the entity. The entity is a topic name for create_topics(), delete_topics(), create_partitions(), and a ConfigResource for alter_configs() and describe_configs().

All the futures for a single API call will currently finish/fail at the same time (backed by the same protocol request), but this might change in future versions of the client.

See examples/adminapi.py for example usage.

For more information see the Java Admin API documentation: https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/clients/admin/package-frame.html

Requires broker version v0.11.0.0 or later.

alter_configs(resources, **kwargs)

Update configuration values for the specified resources. Updates are not transactional so they may succeed for a subset of the provided resources while the others fail. The configuration for a particular resource is updated atomically, replacing the specified values while reverting unspecified configuration entries to their default values.

The future result() value is None.

Warning:

alter_configs() will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration for the resource back to their default values.

Warning:

Multiple resources and resource types may be specified, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.

Parameters:
  • resources (list(ConfigResource)) – Resources to update configuration for.
  • request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0.
  • validate_only (bool) – Tell broker to only validate the request, without altering the configuration. Default: False
Returns:a dict of futures for each resource, keyed by the ConfigResource.
Return type:dict(<ConfigResource, future>)
Raises:
  • KafkaException – Operation failed locally or on broker.
  • TypeError – Invalid input.
  • ValueError – Invalid input.
create_partitions(new_partitions, **kwargs)

Create additional partitions for the given topics.

The future result() value is None.

Parameters:
  • new_partitions (list(NewPartitions)) – New partitions to be created.
  • operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
  • request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
  • validate_only (bool) – Tell broker to only validate the request, without creating the partitions. Default: False
Returns:a dict of futures for each topic, keyed by the topic name.
Return type:dict(<topic_name, future>)
Raises:
  • KafkaException – Operation failed locally or on broker.
  • TypeError – Invalid input.
  • ValueError – Invalid input.
create_topics(new_topics, **kwargs)

Create new topics in cluster.

The future result() value is None.

Parameters:
  • new_topics (list(NewTopic)) – New topics to be created.
  • operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
  • request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
  • validate_only (bool) – Tell broker to only validate the request, without creating the topic. Default: False
Returns:a dict of futures for each topic, keyed by the topic name.
Return type:dict(<topic_name, future>)
Raises:
  • KafkaException – Operation failed locally or on broker.
  • TypeError – Invalid input.
  • ValueError – Invalid input.
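
For example, a sketch of creating topics and waiting on the returned futures (broker address and topic names hypothetical):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # assumed broker

# One future per requested topic, keyed by topic name.
futures = admin.create_topics([
    NewTopic('topic-a', num_partitions=3, replication_factor=1),
    NewTopic('topic-b', num_partitions=1, replication_factor=1),
])

for topic, future in futures.items():
    try:
        future.result()   # the result is None on success
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')
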
delete_topics(topics, **kwargs)

Delete topics.

The future result() value is None.

Parameters:
  • topics (list(str)) – Topics to mark for deletion.
  • operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate in the cluster. A value of 0 returns immediately. Default: 0
  • request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns:a dict of futures for each topic, keyed by the topic name.
Return type:dict(<topic_name, future>)
Raises:
  • KafkaException – Operation failed locally or on broker.
  • TypeError – Invalid input.
  • ValueError – Invalid input.
describe_configs(resources, **kwargs)

Get configuration for the specified resources.

The future result() value is a dict(<configname, ConfigEntry>).

Warning:

Multiple resources and resource types may be requested, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.

Parameters:
  • resources (list(ConfigResource)) – Resources to get configuration for.
  • request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
  • validate_only (bool) – Tell broker to only validate the request. Default: False
Returns:a dict of futures for each resource, keyed by the ConfigResource.
Return type:dict(<ConfigResource, future>)
Raises:
  • KafkaException – Operation failed locally or on broker.
  • TypeError – Invalid input.
  • ValueError – Invalid input.
class confluent_kafka.admin.BrokerMetadata

BrokerMetadata contains information about a Kafka broker.

This class is typically not user instantiated.

Variables:
  • id (int) – Broker id.
  • host (str) – Broker hostname.
  • port (int) – Broker port.
class confluent_kafka.admin.ClusterMetadata

ClusterMetadata as returned by list_topics() contains information about the Kafka cluster, brokers, and topics.

This class is typically not user instantiated.

Variables:
  • cluster_id (str) – Cluster id string, if supported by broker, else None.
  • controller_id (int) – Current controller broker id, or -1.
  • brokers (dict) – Map of brokers indexed by the int broker id. Value is BrokerMetadata object.
  • topics (dict) – Map of topics indexed by the topic name. Value is TopicMetadata object.
  • orig_broker_id (int) – The broker this metadata originated from.
  • orig_broker_name (str) – Broker name/address this metadata originated from.
class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])

ConfigEntry is returned by describe_configs() for each configuration entry for the specified resource.

This class is typically not user instantiated.

Variables:
  • name (str) – Configuration property name.
  • value (str) – Configuration value (or None if not set or is_sensitive==True).
  • source (ConfigSource) – Configuration source.
  • is_read_only (bool) – Indicates if configuration property is read-only.
  • is_default (bool) – Indicates if configuration property is using its default value.
  • is_sensitive (bool) – Indicates if configuration property value contains sensitive information (such as security settings), in which case .value is None.
  • is_synonym (bool) – Indicates if configuration property is a synonym for the parent configuration entry.
  • synonyms (list) – A ConfigEntry list of synonyms and alternate sources for this configuration property.
class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None)

Class representing resources that have configs.

Instantiate with a resource type and a resource name.

class Type

ConfigResource.Type depicts the type of a Kafka resource.

ANY = 1

Match any resource, used for lookups.

BROKER = 4

Broker resource. Resource name is broker id

GROUP = 3

Group resource. Resource name is group.id

TOPIC = 2

Topic resource. Resource name is topic name

UNKNOWN = 0

Resource type is not known or not set.

set_config(name, value, overwrite=True)

Set/Overwrite configuration entry

Any configuration properties that are not included will be reverted to their default values. As a workaround use describe_configs() to retrieve the current configuration and overwrite the settings you want to change.

Parameters:
  • name (str) – Configuration property name
  • value (str) – Configuration value
  • overwrite (bool) – If True overwrite entry if already exists (default). If False do nothing if entry already exists.
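
A sketch of altering a topic configuration with set_config() and alter_configs() (broker address, topic name and values hypothetical):

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # assumed broker

# alter_configs() replaces the resource's full configuration, so any
# entries not set here revert to their default values.
resource = ConfigResource(ConfigResource.Type.TOPIC, 'mytopic')
resource.set_config('retention.ms', '86400000')

futures = admin.alter_configs([resource])
futures[resource].result()   # None on success, raises on failure
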
class confluent_kafka.admin.ConfigSource

Config sources returned in ConfigEntry by describe_configs().

DEFAULT_CONFIG = 5
DYNAMIC_BROKER_CONFIG = 2
DYNAMIC_DEFAULT_BROKER_CONFIG = 3
DYNAMIC_TOPIC_CONFIG = 1
STATIC_BROKER_CONFIG = 4
UNKNOWN_CONFIG = 0
class confluent_kafka.admin.PartitionMetadata

PartitionMetadata contains information about a Kafka partition.

This class is typically not user instantiated.

Variables:
  • id (int) – Partition id.
  • leader (int) – Current leader broker for this partition, or -1.
  • replicas (list(int)) – List of replica broker ids for this partition.
  • isrs (list(int)) – List of in-sync-replica broker ids for this partition.
  • error (KafkaError) – Partition error, or None. Value is a KafkaError object.
Warning:

Depending on cluster state the broker ids referenced in leader, replicas and isrs may temporarily not be reported in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict.

class confluent_kafka.admin.TopicMetadata

TopicMetadata contains information about a Kafka topic.

This class is typically not user instantiated.

Variables:
  • topic (str) – Topic name.
  • partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
  • error (KafkaError) – Topic error, or None. Value is a KafkaError object.

Avro

Avro schema registry module: deals with encoding and decoding of messages with Avro schemas.

class confluent_kafka.avro.AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None)

Kafka Consumer client that performs Avro schema decoding and deserialization of messages.

The constructor takes the following parameters:

Parameters:
  • config (dict) – Config parameters containing the schema registry url (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers etc.)
  • reader_key_schema (schema) – a reader schema for the message key
  • reader_value_schema (schema) – a reader schema for the message value
Raises:

ValueError – For invalid configurations

poll(timeout=None)

This is an overridden method from the confluent_kafka.Consumer class. It handles message deserialization using the Avro schema.

Parameters:timeout (float) – Poll timeout in seconds (default: indefinite)
Returns:message object with deserialized key and value as dict objects
Return type:Message
class confluent_kafka.avro.AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None)

Kafka Producer client that performs Avro schema encoding of messages, handling schema registration and message serialization.

The constructor takes the following parameters:

Parameters:
  • config (dict) – Config parameters containing the schema registry url (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers etc.).
  • default_key_schema (str) – Optional default avro schema for key
  • default_value_schema (str) – Optional default avro schema for value
produce(**kwargs)

Asynchronously send a message to Kafka, encoding it with the specified or default Avro schema.

Parameters:
  • topic (str) – topic name
  • value (object) – An object to serialize
  • value_schema (str) – Avro schema for value
  • key (object) – An object to serialize
  • key_schema (str) – Avro schema for key

Plus any other parameters accepted by confluent_kafka.Producer.produce

Raises:
  • SerializerError – On serialization failure
  • BufferError – If producer queue is full.
  • KafkaException – For other produce failures.
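
A sketch of producing an Avro-encoded message (schemas, topic, broker and registry URL hypothetical):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.loads('''{"type": "record", "name": "User",
                              "fields": [{"name": "name", "type": "string"}]}''')
key_schema = avro.loads('{"type": "string"}')

producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',           # assumed broker
    'schema.registry.url': 'http://localhost:8081',  # assumed registry
}, default_key_schema=key_schema, default_value_schema=value_schema)

producer.produce(topic='users', key='user1', value={'name': 'Alice'})
producer.flush()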

Supporting Classes

Message

class confluent_kafka.Message

The Message object represents either a single consumed or produced message, or an event (error() is not None).

An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.

This class is not user-instantiable.

len()
Returns:Message value (payload) size in bytes
Return type:int
error()

The message object is also used to propagate errors and events; an application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).

Return type:None or KafkaError
headers()

Retrieve the headers set on a message. Each header is a key-value pair. Please note that header keys are ordered and can repeat.

Returns:list of two-tuples, one (key, value) pair for each header.
Return type:[(str, bytes),..] or None.
key()
Returns:message key or None if not available.
Return type:str|bytes or None
offset()
Returns:message offset or None if not available.
Return type:int or None
partition()
Returns:partition number or None if not available.
Return type:int or None
set_headers()

Set the field ‘Message.headers’ to a new value.

Parameters:value (object) – Message.headers.
Returns:None.
Return type:None
set_key()

Set the field ‘Message.key’ to a new value.

Parameters:value (object) – Message.key.
Returns:None.
Return type:None
set_value()

Set the field ‘Message.value’ to a new value.

Parameters:value (object) – Message.value.
Returns:None.
Return type:None
timestamp()

Retrieve timestamp type and timestamp from message. The timestamp type is one of:

  • TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.
  • TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).
  • TIMESTAMP_LOG_APPEND_TIME - Broker receive time.

The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.

The timestamp is the number of milliseconds since the epoch (UTC).

Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.

Returns:tuple of message timestamp type, and timestamp.
Return type:(int, int)
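
For example, a sketch of reading a consumed message's timestamp (msg assumed from a prior poll()):

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

tstype, ts = msg.timestamp()
if tstype == TIMESTAMP_NOT_AVAILABLE:
    print('No timestamp available for this message')
else:
    print(f'Timestamp type {tstype}: {ts} ms since epoch')
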
topic()
Returns:topic name or None if not available.
Return type:str or None
value()
Returns:message value (payload) or None if not available.
Return type:str|bytes or None

TopicPartition

class confluent_kafka.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it.

It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

TopicPartition(topic[, partition][, offset])

Instantiate a TopicPartition object.

Parameters:
  • topic (string) – Topic name
  • partition (int) – Partition id
  • offset (int) – Initial partition offset
Return type:

TopicPartition

error
Attribute error:Indicates an error (with KafkaError) unless None.
offset
Attribute offset:Offset (long)

Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID

partition
Attribute partition:Partition number (int)
topic
Attribute topic:Topic name (string)

KafkaError

class confluent_kafka.KafkaError

Kafka error and event object

The KafkaError class serves multiple purposes:

  • Propagation of errors
  • Propagation of events
  • Exceptions
Args:
  • error_code (KafkaError) – Error code indicating the type of error.
  • reason (str) – Alternative message to describe the error.
  • fatal (bool) – Set to true if a fatal error.
  • retriable (bool) – Set to true if operation is retriable.
  • txn_requires_abort (bool) – Set to true if this is an abortable transaction error.

Error and event constants:

Constant Description
_BAD_MSG Local: Bad message format
_BAD_COMPRESSION Local: Invalid compressed data
_DESTROY Local: Broker handle destroyed
_FAIL Local: Communication failure with broker
_TRANSPORT Local: Broker transport failure
_CRIT_SYS_RESOURCE Local: Critical system resource failure
_RESOLVE Local: Host resolution failure
_MSG_TIMED_OUT Local: Message timed out
_PARTITION_EOF Broker: No more messages
_UNKNOWN_PARTITION Local: Unknown partition
_FS Local: File or filesystem error
_UNKNOWN_TOPIC Local: Unknown topic
_ALL_BROKERS_DOWN Local: All broker connections are down
_INVALID_ARG Local: Invalid argument or configuration
_TIMED_OUT Local: Timed out
_QUEUE_FULL Local: Queue full
_ISR_INSUFF Local: ISR count insufficient
_NODE_UPDATE Local: Broker node update
_SSL Local: SSL error
_WAIT_COORD Local: Waiting for coordinator
_UNKNOWN_GROUP Local: Unknown group
_IN_PROGRESS Local: Operation in progress
_PREV_IN_PROGRESS Local: Previous operation in progress
_EXISTING_SUBSCRIPTION Local: Existing subscription
_ASSIGN_PARTITIONS Local: Assign partitions
_REVOKE_PARTITIONS Local: Revoke partitions
_CONFLICT Local: Conflicting use
_STATE Local: Erroneous state
_UNKNOWN_PROTOCOL Local: Unknown protocol
_NOT_IMPLEMENTED Local: Not implemented
_AUTHENTICATION Local: Authentication failure
_NO_OFFSET Local: No offset stored
_OUTDATED Local: Outdated
_TIMED_OUT_QUEUE Local: Timed out in queue
_UNSUPPORTED_FEATURE Local: Required feature not supported by broker
_WAIT_CACHE Local: Awaiting cache update
_INTR Local: Operation interrupted
_KEY_SERIALIZATION Local: Key serialization error
_VALUE_SERIALIZATION Local: Value serialization error
_KEY_DESERIALIZATION Local: Key deserialization error
_VALUE_DESERIALIZATION Local: Value deserialization error
_PARTIAL Local: Partial response
_READ_ONLY Local: Read-only object
_NOENT Local: No such entry
_UNDERFLOW Local: Read underflow
_INVALID_TYPE Local: Invalid type
_RETRY Local: Retry operation
_PURGE_QUEUE Local: Purged in queue
_PURGE_INFLIGHT Local: Purged in flight
_FATAL Local: Fatal error
_INCONSISTENT Local: Inconsistent state
_GAPLESS_GUARANTEE Local: Gap-less ordering would not be guaranteed if proceeding
_MAX_POLL_EXCEEDED Local: Maximum application poll interval (max.poll.interval.ms) exceeded
_UNKNOWN_BROKER Local: Unknown broker
_NOT_CONFIGURED Local: Functionality not configured
_FENCED Local: This instance has been fenced by a newer instance
_APPLICATION Local: Application generated error
UNKNOWN Unknown broker error
NO_ERROR Success
OFFSET_OUT_OF_RANGE Broker: Offset out of range
INVALID_MSG Broker: Invalid message
UNKNOWN_TOPIC_OR_PART Broker: Unknown topic or partition
INVALID_MSG_SIZE Broker: Invalid message size
LEADER_NOT_AVAILABLE Broker: Leader not available
NOT_LEADER_FOR_PARTITION Broker: Not leader for partition
REQUEST_TIMED_OUT Broker: Request timed out
BROKER_NOT_AVAILABLE Broker: Broker not available
REPLICA_NOT_AVAILABLE Broker: Replica not available
MSG_SIZE_TOO_LARGE Broker: Message size too large
STALE_CTRL_EPOCH Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE Broker: Offset metadata string too large
NETWORK_EXCEPTION Broker: Broker disconnected before response received
COORDINATOR_LOAD_IN_PROGRESS Broker: Coordinator load in progress
COORDINATOR_NOT_AVAILABLE Broker: Coordinator not available
NOT_COORDINATOR Broker: Not coordinator
TOPIC_EXCEPTION Broker: Invalid topic
RECORD_LIST_TOO_LARGE Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS Broker: Invalid required acks value
ILLEGAL_GENERATION Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL Broker: Inconsistent group protocol
INVALID_GROUP_ID Broker: Invalid group.id
UNKNOWN_MEMBER_ID Broker: Unknown member
INVALID_SESSION_TIMEOUT Broker: Invalid session timeout
REBALANCE_IN_PROGRESS Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED Broker: Cluster authorization failed
INVALID_TIMESTAMP Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION Broker: API version not supported
TOPIC_ALREADY_EXISTS Broker: Topic already exists
INVALID_PARTITIONS Broker: Invalid number of partitions
INVALID_REPLICATION_FACTOR Broker: Invalid replication factor
INVALID_REPLICA_ASSIGNMENT Broker: Invalid replica assignment
INVALID_CONFIG Broker: Configuration is invalid
NOT_CONTROLLER Broker: Not controller for cluster
INVALID_REQUEST Broker: Invalid request
UNSUPPORTED_FOR_MESSAGE_FORMAT Broker: Message format on broker does not support request
POLICY_VIOLATION Broker: Policy violation
OUT_OF_ORDER_SEQUENCE_NUMBER Broker: Broker received an out of order sequence number
DUPLICATE_SEQUENCE_NUMBER Broker: Broker received a duplicate sequence number
INVALID_PRODUCER_EPOCH Broker: Producer attempted an operation with an old epoch
INVALID_TXN_STATE Broker: Producer attempted a transactional operation in an invalid state
INVALID_PRODUCER_ID_MAPPING Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
INVALID_TRANSACTION_TIMEOUT Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction.timeout.ms
CONCURRENT_TRANSACTIONS Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
TRANSACTION_COORDINATOR_FENCED Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
TRANSACTIONAL_ID_AUTHORIZATION_FAILED Broker: Transactional Id authorization failed
SECURITY_DISABLED Broker: Security features are disabled
OPERATION_NOT_ATTEMPTED Broker: Operation not attempted
KAFKA_STORAGE_ERROR Broker: Disk error when trying to access log file on disk
LOG_DIR_NOT_FOUND Broker: The user-specified log directory is not found in the broker config
SASL_AUTHENTICATION_FAILED Broker: SASL Authentication failed
UNKNOWN_PRODUCER_ID Broker: Unknown Producer Id
REASSIGNMENT_IN_PROGRESS Broker: Partition reassignment is in progress
DELEGATION_TOKEN_AUTH_DISABLED Broker: Delegation Token feature is not enabled
DELEGATION_TOKEN_NOT_FOUND Broker: Delegation Token is not found on server
DELEGATION_TOKEN_OWNER_MISMATCH Broker: Specified Principal is not valid Owner/Renewer
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED Broker: Delegation Token requests are not allowed on this connection
DELEGATION_TOKEN_AUTHORIZATION_FAILED Broker: Delegation Token authorization failed
DELEGATION_TOKEN_EXPIRED Broker: Delegation Token is expired
INVALID_PRINCIPAL_TYPE Broker: Supplied principalType is not supported
NON_EMPTY_GROUP Broker: The group is not empty
GROUP_ID_NOT_FOUND Broker: The group id does not exist
FETCH_SESSION_ID_NOT_FOUND Broker: The fetch session ID was not found
INVALID_FETCH_SESSION_EPOCH Broker: The fetch session epoch is invalid
LISTENER_NOT_FOUND Broker: No matching listener
TOPIC_DELETION_DISABLED Broker: Topic deletion is disabled
FENCED_LEADER_EPOCH Broker: Leader epoch is older than broker epoch
UNKNOWN_LEADER_EPOCH Broker: Leader epoch is newer than broker epoch
UNSUPPORTED_COMPRESSION_TYPE Broker: Unsupported compression type
STALE_BROKER_EPOCH Broker: Broker epoch has changed
OFFSET_NOT_AVAILABLE Broker: Leader high watermark is not caught up
MEMBER_ID_REQUIRED Broker: Group member needs a valid member ID
PREFERRED_LEADER_NOT_AVAILABLE Broker: Preferred leader was not available
GROUP_MAX_SIZE_REACHED Broker: Consumer group has reached maximum size
FENCED_INSTANCE_ID Broker: Static consumer fenced by other consumer with same group.instance.id
code()

Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.

Returns:error/event code
Return type:int
fatal()
Returns:True if this is a fatal error, else False.
Return type:bool
name()

Returns the enum name for error/event.

Returns:error/event enum name string
Return type:str
retriable()
Returns:True if the operation that failed may be retried, else False.
Return type:bool
str()

Returns the human-readable error/event string.

Returns:error/event message string
Return type:str
txn_requires_abort()
Returns:True if the error is an abortable transaction error, in which case the application must abort the current transaction with abort_transaction() and start a new transaction with begin_transaction() if it wishes to proceed with transactional operations. This will only return True for errors from the transactional producer API.
Return type:bool

KafkaException

class confluent_kafka.KafkaException

Kafka exception that wraps the KafkaError class.

Use exception.args[0] to extract the KafkaError object.

Offset

Logical offset constants:

  • OFFSET_BEGINNING - Beginning of partition (oldest offset)
  • OFFSET_END - End of partition (next offset)
  • OFFSET_STORED - Use stored/committed offset
  • OFFSET_INVALID - Invalid/Default offset

ThrottleEvent

class confluent_kafka.ThrottleEvent(broker_name, broker_id, throttle_time)

ThrottleEvent contains details about a throttled request. Set up a throttle callback by setting the throttle_cb configuration property to a callable that takes a ThrottleEvent object as its only argument. The callback will be triggered from poll(), consume() or flush() when a request has been throttled by the broker.

This class is typically not user instantiated.

Variables:
  • broker_name (str) – The hostname of the broker which throttled the request
  • broker_id (int) – The broker id
  • throttle_time (float) – The amount of time (in seconds) the broker throttled (delayed) the request
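
A sketch of registering a throttle callback (broker address hypothetical):

from confluent_kafka import Producer

def on_throttle(event):
    # event is a confluent_kafka.ThrottleEvent
    print(f'Broker {event.broker_name} ({event.broker_id}) '
          f'throttled the request for {event.throttle_time}s')

producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'throttle_cb': on_throttle})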

Configuration

Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.:

conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup', 'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'auto.offset.reset': 'earliest'}
consumer = confluent_kafka.Consumer(conf)

The supported configuration values are dictated by the underlying librdkafka C library. For the full range of configuration properties please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

The Python bindings also provide some additional configuration properties:

  • default.topic.config: value is a dict of client topic-level configuration properties that are applied to all used topics for the instance. DEPRECATED: topic configuration should now be specified in the global top-level configuration.

  • error_cb(kafka.KafkaError): Callback for generic/global error events, these errors are typically to be considered informational since the client will automatically try to recover. This callback is served upon calling client.poll() or producer.flush().

  • throttle_cb(confluent_kafka.ThrottleEvent): Callback for throttled request reporting. This callback is served upon calling client.poll() or producer.flush().

  • stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() or flush() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data. This callback is served upon calling client.poll() or producer.flush(). See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.

  • on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function. Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set. This callback is served upon calling producer.poll() or producer.flush().

  • on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll(). Is not triggered for synchronous commits. Callback arguments: KafkaError is the commit error, or None on success. list(TopicPartition) is the list of partitions with their committed offsets or per-partition errors.

  • logger=logging.Handler kwarg: forward logs from the Kafka client to the provided logging.Handler instance. To avoid spontaneous calls from non-Python threads the log messages will only be forwarded when client.poll() or producer.flush() are called. For example:

    mylogger = logging.getLogger()
    mylogger.addHandler(logging.StreamHandler())
    producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'}, logger=mylogger)