KafkaDataset

Description

  1. KafkaDataset supports configuring multiple partitions and consumes kafka messages in time sequence.

  2. KafkaDataset supports saving/restoring state information.

User API

class KafkaDataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        servers="localhost",
        group="",
        eof=False,
        timeout=1000,
        config_global=None,
        config_topic=None,
        message_key=False,
)
  • topics: A tf.string tensor containing one or more subscriptions, in the format of [topic:partition:offset:length], by default length is -1 for unlimited.

  • servers: A list of bootstrap servers.

  • group: The consumer group id.

  • eof: If True, the kafka reader will stop on EOF.

  • timeout: The timeout value for the Kafka Consumer to wait (in millisecond).

  • config_global: A tf.string tensor containing global configuration properties in [Key=Value] format, eg. [“enable.auto.commit=false”, “heartbeat.interval.ms=2000”], please refer to ‘Global configuration properties’ in librdkafka doc.

  • config_topic: A tf.string tensor containing topic configuration properties in [Key=Value] format, eg. [“auto.offset.reset=earliest”], please refer to ‘Topic configuration properties’ in librdkafka doc.

  • message_key: If True, the kafka will output both message value and key.

Examples

import tensorflow as tf
from tensorflow.python.data.ops import iterator_ops

kafka_dataset = tf.data.KafkaDataset(topics=["test_1_partition:0:0:-1"],
                                     group="test_group1",
                                     timeout=100,
                                     eof=False)
iterator = iterator_ops.Iterator.from_structure(batch_dataset.output_types)
init_op = iterator.make_initializer(kafka_dataset)
get_next = iterator.get_next()
saveable_obj = tf.data.experimental.make_saveable_from_iterator(iterator)
tf.add_to_collection(tf.GraphKeys.SAVEABLE_OBJECTS, saveable_obj)
saver=tf.train.Saver()
with tf.Session() as sess:
  sess.run(init_op)
  for i in range(100):
    print("Data", sess.run(get_next))
  saver.save(sess, "ckpt/1")

KafkaGroupIODataset

Description

  1. KafkaGroupIODataset supports configuring multiple partitions and consumes kafka messages in time sequence.

  2. KafkaGroupIODataset supports load balancing within the consumer group.

User API

class KafkaGroupIODataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        group_id,
        servers,
        stream_timeout=0,
        message_poll_timeout=10000,
        configuration=None,
        internal=True,
)
  • topics: A tf.string tensor containing topic names in [topic] format. For example: [“topic1”, “topic2”].

  • group_id: The id of the consumer group. For example: cgstream.

  • servers: An optional list of bootstrap servers. For example: localhost:9092.

  • stream_timeout: An optional timeout duration (in milliseconds) to block until the new messages from kafka are fetched. By default it is set to 0 milliseconds and doesn’t block for new messages. To block indefinitely, set it to -1.

  • message_poll_timeout: An optional timeout duration (in milliseconds) after which the kafka consumer throws a timeout error while fetching a single message. This value also represents the intervals at which the kafka topic(s) are polled for new messages while using the stream_timeout.

  • configuration: An optional tf.string tensor containing configurations in [Key=Value] format.

    • Global configuration: please refer to ‘Global configuration properties’ in librdkafka doc. Examples include [“enable.auto.commit=false”, “heartbeat.interval.ms=2000”]

    • Topic configuration: please refer to ‘Topic configuration properties’ in librdkafka doc. Note all topic configurations should be prefixed with conf.topic.. Examples include [“conf.topic.auto.offset.reset=earliest”]

    • Referencehttps://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

  • internal: Whether the dataset is being created from within the named scope. Default: True.

Examples