KafkaDataset
Features
KafkaDataset supports subscribing to multiple partitions and consumes Kafka messages in chronological order.
KafkaDataset supports saving and restoring its state (the consumption offsets), so that consumption can resume from a checkpoint.
Interface
API Description
class KafkaDataset(dataset_ops.Dataset):
  def __init__(
      self,
      topics,
      servers="localhost",
      group="",
      eof=False,
      timeout=1000,
      config_global=None,
      config_topic=None,
      message_key=False,
  )
Parameters
topics: A tf.string tensor containing one or more subscriptions, in the format of [topic:partition:offset:length]; by default, length is -1 (unlimited).
servers: A list of bootstrap servers.
group: The consumer group id.
eof: If True, the kafka reader will stop on EOF.
timeout: The timeout value for the Kafka Consumer to wait (in milliseconds).
config_global: A tf.string tensor containing global configuration properties in [Key=Value] format, e.g. ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]; please refer to 'Global configuration properties' in the librdkafka doc (see the sketch after this list).
config_topic: A tf.string tensor containing topic configuration properties in [Key=Value] format, e.g. ["auto.offset.reset=earliest"]; please refer to 'Topic configuration properties' in the librdkafka doc.
message_key: If True, the dataset will output both the message value and key.
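The usage example below leaves config_global, config_topic, and message_key at their defaults. A minimal sketch of passing them, reusing the same test topic as below (the property values are illustrative; the property names come from librdkafka's configuration docs):

import tensorflow as tf

# Hypothetical configuration; global properties go to config_global,
# topic properties to config_topic.
dataset = tf.data.KafkaDataset(
    topics=["test_1_partition:0:0:-1"],  # topic:partition:offset:length
    group="test_group1",
    config_global=["enable.auto.commit=false",
                   "heartbeat.interval.ms=2000"],
    config_topic=["auto.offset.reset=earliest"],
    message_key=True,  # also output the message key along with the value
)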
Usage Example
import tensorflow as tf
from tensorflow.python.data.ops import iterator_ops

kafka_dataset = tf.data.KafkaDataset(topics=["test_1_partition:0:0:-1"],
                                     group="test_group1",
                                     timeout=100,
                                     eof=False)
iterator = iterator_ops.Iterator.from_structure(kafka_dataset.output_types)
init_op = iterator.make_initializer(kafka_dataset)
get_next = iterator.get_next()

# Register the iterator as a saveable object so that its state (the
# consumption offsets) is written into the checkpoint.
saveable_obj = tf.data.experimental.make_saveable_from_iterator(iterator)
tf.add_to_collection(tf.GraphKeys.SAVEABLE_OBJECTS, saveable_obj)
saver = tf.train.Saver()

with tf.Session() as sess:
  sess.run(init_op)
  for i in range(100):
    print("Data", sess.run(get_next))
  saver.save(sess, "ckpt/1")
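To resume consumption from the saved offsets, a minimal sketch of the restore side, assuming the checkpoint "ckpt/1" written above and the same graph (init_op, get_next, saver):

with tf.Session() as sess:
  sess.run(init_op)              # create the iterator state first
  saver.restore(sess, "ckpt/1")  # overwrite it with the saved offsets
  # Consumption continues from the checkpointed offsets, not offset 0.
  print("Resumed", sess.run(get_next))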
KafkaGroupIODataset
Features
KafkaGroupIODataset supports subscribing to multiple partitions and consumes Kafka messages in chronological order.
KafkaGroupIODataset supports load balancing within a consumer group, as sketched below.
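Load balancing here means that workers joining with the same group_id are each assigned a disjoint subset of the topic's partitions by the Kafka broker. A minimal sketch, assuming a multi-partition topic named topic1 and the import path used in the usage example further below (launch one copy of the script per worker):

# worker.py - hypothetical sketch; run one copy per worker.
from tensorflow.python.data.ops import readers

# Every worker joins the same consumer group, so the broker splits the
# partitions of topic1 among the workers (Kafka group rebalancing).
dataset = readers.KafkaGroupIODataset(
    topics=["topic1"],
    group_id="cgbalance",    # identical on every worker
    servers="localhost:9092",
    stream_timeout=3000,
)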
Interface
API Description
class KafkaGroupIODataset(dataset_ops.Dataset):
  def __init__(
      self,
      topics,
      group_id,
      servers,
      stream_timeout=0,
      message_poll_timeout=10000,
      configuration=None,
      internal=True,
  )
Parameters
topics: A tf.string tensor containing topic names in [topic] format. For example: ["topic1", "topic2"].
group_id: The id of the consumer group. For example: cgstream.
servers: An optional list of bootstrap servers. For example: localhost:9092.
stream_timeout: An optional timeout duration (in milliseconds) to block until new messages from kafka are fetched. By default it is set to 0 milliseconds and doesn't block for new messages. To block indefinitely, set it to -1.
message_poll_timeout: An optional timeout duration (in milliseconds) after which the kafka consumer throws a timeout error while fetching a single message. This value also represents the interval at which the kafka topic(s) are polled for new messages while using the stream_timeout.
configuration: An optional tf.string tensor containing configurations in [Key=Value] format (see the sketch after this list).
  Global configuration: please refer to 'Global configuration properties' in the librdkafka doc. Examples include ["enable.auto.commit=false", "heartbeat.interval.ms=2000"].
  Topic configuration: please refer to 'Topic configuration properties' in the librdkafka doc. Note that all topic configurations should be prefixed with "conf.topic.". Examples include ["conf.topic.auto.offset.reset=earliest"].
  Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal: Whether the dataset is being created from within the named scope. Default: True.
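A minimal sketch of a configuration list that mixes global and topic properties, following the prefix rule above (the specific values are illustrative):

# Global properties are passed as-is; topic properties carry the
# "conf.topic." prefix so they are routed to the topic configuration.
configuration = [
    "session.timeout.ms=7000",                # global property
    "enable.auto.commit=false",               # global property
    "conf.topic.auto.offset.reset=earliest",  # topic property
]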
Usage Example
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.data.ops import readers
import tensorflow as tf

def make_initializable_iterator(ds):
  r"""Wrapper of make_initializable_iterator."""
  if hasattr(dataset_ops, 'make_initializable_iterator'):
    return dataset_ops.make_initializable_iterator(ds)
  return ds.make_initializable_iterator()

dataset = readers.KafkaGroupIODataset(
    topics=["topic1", "topic2"],
    group_id="cgstream",
    servers="localhost:9092",
    stream_timeout=3000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
        "enable.auto.commit=true",
    ],
)

# Create an iterator from the dataset, plus a feedable string handle so
# the same get_next op can be driven by different iterators.
train_iterator = make_initializable_iterator(dataset)
handle = array_ops.placeholder(dtypes.string, shape=[])
iterator = iterator_ops.Iterator.from_string_handle(
    handle, train_iterator.output_types, train_iterator.output_shapes,
    train_iterator.output_classes)
next_elements = iterator.get_next()

with tf.Session() as sess:
  train_handle = sess.run(train_iterator.string_handle())
  sess.run(train_iterator.initializer)
  for _ in range(100):
    x = sess.run(next_elements, feed_dict={handle: train_handle})
    print(x)
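The example above stops waiting for new messages after stream_timeout=3000 milliseconds. For a long-running streaming job, a minimal sketch of the variation with indefinite blocking (everything else is as above):

# Hypothetical variation: stream_timeout=-1 blocks indefinitely, so the
# consumer keeps polling (every message_poll_timeout ms) instead of
# ending the sequence when the topic is temporarily idle.
streaming_dataset = readers.KafkaGroupIODataset(
    topics=["topic1"],
    group_id="cgstream",
    servers="localhost:9092",
    stream_timeout=-1,
)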