KafkaDataset

Features

  1. KafkaDataset supports configuring multiple partitions and consumes Kafka messages in time order

  2. KafkaDataset supports saving and restoring its consumption state

Interface

API Description

class KafkaDataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        servers="localhost",
        group="",
        eof=False,
        timeout=1000,
        config_global=None,
        config_topic=None,
        message_key=False,
    )

Parameters

  • topics: A tf.string tensor containing one or more subscriptions, in the format [topic:partition:offset:length]; by default length is -1, meaning unlimited (see the examples after this list).

  • servers: A list of bootstrap servers.

  • group: The consumer group id.

  • eof: If True, the kafka reader will stop on EOF.

  • timeout: The timeout value for the Kafka consumer to wait (in milliseconds).

  • config_global: A tf.string tensor containing global configuration properties in [Key=Value] format, e.g. ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]; please refer to 'Global configuration properties' in the librdkafka documentation.

  • config_topic: A tf.string tensor containing topic configuration properties in [Key=Value] format, e.g. ["auto.offset.reset=earliest"]; please refer to 'Topic configuration properties' in the librdkafka documentation.

  • message_key: If True, the dataset will output both the message value and the message key.
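
To make the subscription format concrete, a few illustrative strings (the topic name "clicks" is hypothetical):

# Partition 0 of "clicks", starting from offset 0, unlimited length:
topics = ["clicks:0:0:-1"]
# Partitions 0 and 1 of "clicks", starting at offset 500, reading at
# most 1000 messages from each partition:
topics = ["clicks:0:500:1000", "clicks:1:500:1000"]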

Usage Example

import tensorflow as tf
from tensorflow.python.data.ops import iterator_ops
 
kafka_dataset = tf.data.KafkaDataset(topics=["test_1_partition:0:0:-1"],
                                     group="test_group1",
                                     timeout=100,
                                     eof=False)
iterator = iterator_ops.Iterator.from_structure(kafka_dataset.output_types)
init_op = iterator.make_initializer(kafka_dataset)
get_next = iterator.get_next()
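# Make the iterator state saveable so that the consumption offsets are
# written into the checkpoint (this is what enables save/restore).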
saveable_obj = tf.data.experimental.make_saveable_from_iterator(iterator)
tf.add_to_collection(tf.GraphKeys.SAVEABLE_OBJECTS, saveable_obj)
saver = tf.train.Saver()
with tf.Session() as sess:
  sess.run(init_op)
  for i in range(100):
    print("Data", sess.run(get_next))
  saver.save(sess, "ckpt/1")
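
Because the iterator state was registered as a saveable object, a later run can resume consumption from the saved offsets. A minimal restore sketch, assuming the same graph definition as above and the checkpoint "ckpt/1" it produced:

with tf.Session() as sess:
  sess.run(init_op)
  # Restoring overwrites the freshly initialized iterator state with the
  # consumption offsets stored in the checkpoint.
  saver.restore(sess, "ckpt/1")
  for i in range(100):
    print("Data", sess.run(get_next))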

KafkaGroupIODataset

Features

  1. KafkaGroupIODataset supports configuring multiple partitions and consumes Kafka messages in time order

  2. KafkaGroupIODataset supports load balancing within a consumer group

Interface

API Description

class KafkaGroupIODataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        group_id,
        servers,
        stream_timeout=0,
        message_poll_timeout=10000,
        configuration=None,
        internal=True,
    )

Parameters

  • topics: A tf.string tensor containing topic names in [topic] format. For example: ["topic1", "topic2"].

  • group_id: The id of the consumer group. For example: cgstream.

  • servers: An optional list of bootstrap servers. For example: localhost:9092.

  • stream_timeout: An optional timeout duration (in milliseconds) to block until the new messages from kafka are fetched. By default it is set to 0 milliseconds and doesn't block for new messages. To block indefinitely, set it to -1.

  • message_poll_timeout: An optional timeout duration (in milliseconds) after which the kafka consumer throws a timeout error while fetching a single message. This value also represents the intervals at which the kafka topic(s) are polled for new messages while using the stream_timeout.

  • configuration: An optional tf.string tensor containing configurations in [Key=Value] format (see the illustration after this list).

    • Global configuration: please refer to 'Global configuration properties' in the librdkafka documentation. Examples include ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]

    • Topic configuration: please refer to 'Topic configuration properties' in the librdkafka documentation. Note that all topic configurations should be prefixed with "conf.topic.". Examples include ["conf.topic.auto.offset.reset=earliest"]

    • Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

  • internal: Whether the dataset is being created from within the named scope. Default: True.
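
To make the configuration format concrete, a short illustration mixing global and topic-scoped properties (the values shown are examples only):

configuration = [
    "enable.auto.commit=false",               # global property, no prefix
    "heartbeat.interval.ms=2000",             # global property, no prefix
    "conf.topic.auto.offset.reset=earliest",  # topic property, "conf.topic." prefix
]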

Usage Example

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.data.ops import readers
import tensorflow as tf


def make_initializable_iterator(ds):
  """Wrapper around make_initializable_iterator, compatible across TF versions."""
  if hasattr(dataset_ops, 'make_initializable_iterator'):
    return dataset_ops.make_initializable_iterator(ds)
  return ds.make_initializable_iterator()
 
dataset = readers.KafkaGroupIODataset(
        topics=["topic1", "topic2"],
        group_id="cgstream",
        servers="localhost:9092",
        stream_timeout=3000,
        configuration=[
            "session.timeout.ms=7000",
            "max.poll.interval.ms=8000",
            "auto.offset.reset=earliest",
            "enable.auto.commit=true",
        ],
    )

# create the iterators from the dataset
train_iterator = make_initializable_iterator(dataset)
handle = array_ops.placeholder(dtypes.string, shape=[])

iterator = iterator_ops.Iterator.from_string_handle(
    handle, train_iterator.output_types, train_iterator.output_shapes,
    train_iterator.output_classes)
next_elements = iterator.get_next()

with tf.Session() as sess:
  train_handle = sess.run(train_iterator.string_handle())
  sess.run([train_iterator.initializer])
  for _ in range(100):
    x = sess.run(next_elements, feed_dict={handle: train_handle})
    print(x)
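
Load balancing within a consumer group (feature 2 above) requires no extra API calls: starting the same script in a second process with the same group_id triggers a Kafka group rebalance, and the members of the group split the partitions of the subscribed topics between them. A sketch of the second consumer, identical to the first apart from the process it runs in (the variable name is illustrative):

# Second consumer process: same topics, same group_id.
# Kafka rebalances the "cgstream" group so that each member is assigned
# a disjoint subset of the partitions of topic1/topic2.
dataset_worker2 = readers.KafkaGroupIODataset(
    topics=["topic1", "topic2"],
    group_id="cgstream",   # same group id => partitions split across members
    servers="localhost:9092",
    stream_timeout=3000,
    configuration=["session.timeout.ms=7000"],
)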