KafkaDataset
Description
KafkaDataset can be configured with multiple partitions and consumes Kafka messages in chronological order.
KafkaDataset supports saving and restoring its state information.
User API
class KafkaDataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        servers="localhost",
        group="",
        eof=False,
        timeout=1000,
        config_global=None,
        config_topic=None,
        message_key=False,
    )
topics
: A tf.string tensor containing one or more subscriptions, each in the format [topic:partition:offset:length]. length defaults to -1, meaning unlimited.
servers
: A list of bootstrap servers.
group
: The consumer group id.
eof
: If True, the Kafka reader stops on EOF.
timeout
: The time the Kafka consumer waits, in milliseconds.
config_global
: A tf.string tensor containing global configuration properties in [Key=Value] format, e.g. ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]. See "Global configuration properties" in the librdkafka documentation.
config_topic
: A tf.string tensor containing topic configuration properties in [Key=Value] format, e.g. ["auto.offset.reset=earliest"]. See "Topic configuration properties" in the librdkafka documentation.
message_key
: If True, the dataset outputs both the message value and the message key.
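The subscription strings passed to topics are easy to mistype by hand. The following is a minimal sketch of a helper that builds them, assuming only the topic:partition:offset:length format described above. The name make_subscription and the topic test_3_partition are hypothetical and are not part of the KafkaDataset API.

```python
def make_subscription(topic, partition=0, offset=0, length=-1):
    """Build one 'topic:partition:offset:length' subscription string.

    Hypothetical helper for illustration; not part of the KafkaDataset API.
    A length of -1 means unlimited, as described for `topics`.
    """
    if ":" in topic:
        # ':' is the field separator, so it cannot appear in the topic name.
        raise ValueError("topic name must not contain ':'")
    return "{}:{}:{}:{}".format(topic, int(partition), int(offset), int(length))


# One subscription per partition of a hypothetical three-partition topic.
topics = [make_subscription("test_3_partition", partition=p) for p in range(3)]
print(topics)
```

The resulting list can be passed as the topics argument in place of hand-written strings.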
Examples
import tensorflow as tf
from tensorflow.python.data.ops import iterator_ops

# Subscribe to partition 0 of "test_1_partition" from offset 0, with no length limit.
kafka_dataset = tf.data.KafkaDataset(topics=["test_1_partition:0:0:-1"],
                                     group="test_group1",
                                     timeout=100,
                                     eof=False)

# Build a reinitializable iterator from the dataset's element types.
iterator = iterator_ops.Iterator.from_structure(kafka_dataset.output_types)
init_op = iterator.make_initializer(kafka_dataset)
get_next = iterator.get_next()

# Register the iterator state so that the Saver includes it in checkpoints.
saveable_obj = tf.data.experimental.make_saveable_from_iterator(iterator)
tf.add_to_collection(tf.GraphKeys.SAVEABLE_OBJECTS, saveable_obj)
saver = tf.train.Saver()

with tf.Session() as sess:
    sess.run(init_op)
    for i in range(100):
        print("Data", sess.run(get_next))
    # Write the checkpoint, including the iterator state.
    saver.save(sess, "ckpt/1")
KafkaGroupIODataset
Description
KafkaGroupIODataset can be configured with multiple partitions and consumes Kafka messages in chronological order.
KafkaGroupIODataset supports load balancing within the consumer group.
User API
class KafkaGroupIODataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        group_id,
        servers,
        stream_timeout=0,
        message_poll_timeout=10000,
        configuration=None,
        internal=True,
    )
topics
: A tf.string tensor containing topic names in [topic] format. For example: ["topic1", "topic2"].
group_id
: The id of the consumer group. For example: cgstream.
servers
: An optional list of bootstrap servers. For example: localhost:9092.
stream_timeout
: An optional timeout (in milliseconds) to block while waiting for new messages from Kafka. The default is 0 milliseconds, which does not block for new messages. To block indefinitely, set it to -1.
message_poll_timeout
: An optional timeout (in milliseconds) after which the Kafka consumer raises a timeout error while fetching a single message. This value is also the interval at which the Kafka topic(s) are polled for new messages when stream_timeout is used.
configuration
: An optional tf.string tensor containing configurations in [Key=Value] format.
Global configuration
: See "Global configuration properties" in the librdkafka documentation. Examples include ["enable.auto.commit=false", "heartbeat.interval.ms=2000"].
Topic configuration
: See "Topic configuration properties" in the librdkafka documentation. Note that all topic configurations must be prefixed with "conf.topic.". Examples include ["conf.topic.auto.offset.reset=earliest"].
Reference
: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal
: Whether the dataset is being created from within the named scope. Default: True.
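Because configuration mixes global and topic-level properties in one list, the conf.topic. prefix is easy to forget. The following is a minimal sketch of a helper that assembles the list, assuming only the [Key=Value] format and the prefix rule described above. The name build_configuration is hypothetical and is not part of the KafkaGroupIODataset API.

```python
def build_configuration(global_conf=None, topic_conf=None):
    """Return a list of 'Key=Value' strings for the `configuration` argument.

    Hypothetical helper for illustration. Topic-level properties receive the
    'conf.topic.' prefix; global properties are emitted unchanged.
    """
    entries = []
    for key, value in (global_conf or {}).items():
        entries.append("{}={}".format(key, value))
    for key, value in (topic_conf or {}).items():
        entries.append("conf.topic.{}={}".format(key, value))
    return entries


# Booleans are given as lowercase strings, because Python's False would be
# formatted as "False" rather than "false".
configuration = build_configuration(
    global_conf={"enable.auto.commit": "false", "heartbeat.interval.ms": 2000},
    topic_conf={"auto.offset.reset": "earliest"},
)
print(configuration)
```

The resulting list can be passed as the configuration argument.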
Examples