# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Kafka Dataset."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.contrib.kafka.python.ops import gen_dataset_ops
from tensorflow.contrib.kafka.python.ops import kafka_op_loader  # pylint: disable=unused-import
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.util import structure
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.util import deprecation


class KafkaDataset(dataset_ops.DatasetSource):
  """A dataset source that consumes messages from Kafka.

  Each element is declared as a scalar `tf.string` (see `_element_structure`).
  """

  @deprecation.deprecated(
      None,
      "tf.contrib.kafka will be removed in 2.0, the support for Apache Kafka "
      "will continue to be provided through the tensorflow/io GitHub project.")
  def __init__(self,
               topics,
               servers="localhost",
               group="",
               eof=False,
               timeout=1000):
    """Creates a `KafkaDataset`.

    Every argument is converted to a tensor and stored on the instance before
    the underlying dataset op is built.

    Args:
      topics: A `tf.string` tensor holding one or more subscriptions, each
        written as [topic:partition:offset:length]. The length defaults to
        -1, meaning unlimited.
      servers: A list of bootstrap servers (converted to a `tf.string`
        tensor).
      group: The consumer group id.
      eof: If True, the kafka reader will stop on EOF.
      timeout: How long the Kafka Consumer waits, in milliseconds.
    """
    as_tensor = ops.convert_to_tensor

    # Conversion order is kept as topics, servers, group, eof, timeout so the
    # ops are added to the graph in the same sequence as before.
    self._topics = as_tensor(topics, dtype=dtypes.string, name="topics")
    self._servers = as_tensor(servers, dtype=dtypes.string, name="servers")
    self._group = as_tensor(group, dtype=dtypes.string, name="group")
    self._eof = as_tensor(eof, dtype=dtypes.bool, name="eof")
    self._timeout = as_tensor(timeout, dtype=dtypes.int64, name="timeout")

    # The variant tensor can only be built once all inputs above are stored.
    variant_tensor = self._as_variant_tensor()
    super(KafkaDataset, self).__init__(variant_tensor)

  def _as_variant_tensor(self):
    """Builds the Kafka dataset op from the stored input tensors."""
    # Passed positionally, in the same order the original call used.
    op_inputs = (self._topics, self._servers, self._group, self._eof,
                 self._timeout)
    return gen_dataset_ops.kafka_dataset(*op_inputs)

  @property
  def _element_structure(self):
    """Structure of one element: a `tf.string` tensor with shape `[]`."""
    scalar_shape = []
    return structure.TensorStructure(dtypes.string, scalar_shape)