• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Kafka Dataset."""
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20from tensorflow.contrib.kafka.python.ops import gen_dataset_ops
21from tensorflow.contrib.kafka.python.ops import kafka_op_loader  # pylint: disable=unused-import
22from tensorflow.python.data.ops import dataset_ops
23from tensorflow.python.data.util import structure
24from tensorflow.python.framework import dtypes
25from tensorflow.python.framework import ops
26from tensorflow.python.util import deprecation
27
28
29class KafkaDataset(dataset_ops.DatasetSource):
30  """A Kafka Dataset that consumes the message.
31  """
32
33  @deprecation.deprecated(
34      None,
35      "tf.contrib.kafka will be removed in 2.0, the support for Apache Kafka "
36      "will continue to be provided through the tensorflow/io GitHub project.")
37  def __init__(self,
38               topics,
39               servers="localhost",
40               group="",
41               eof=False,
42               timeout=1000):
43    """Create a KafkaReader.
44
45    Args:
46      topics: A `tf.string` tensor containing one or more subscriptions,
47              in the format of [topic:partition:offset:length],
48              by default length is -1 for unlimited.
49      servers: A list of bootstrap servers.
50      group: The consumer group id.
51      eof: If True, the kafka reader will stop on EOF.
52      timeout: The timeout value for the Kafka Consumer to wait
53               (in millisecond).
54    """
55    self._topics = ops.convert_to_tensor(
56        topics, dtype=dtypes.string, name="topics")
57    self._servers = ops.convert_to_tensor(
58        servers, dtype=dtypes.string, name="servers")
59    self._group = ops.convert_to_tensor(
60        group, dtype=dtypes.string, name="group")
61    self._eof = ops.convert_to_tensor(eof, dtype=dtypes.bool, name="eof")
62    self._timeout = ops.convert_to_tensor(
63        timeout, dtype=dtypes.int64, name="timeout")
64
65    super(KafkaDataset, self).__init__(self._as_variant_tensor())
66
67  def _as_variant_tensor(self):
68    return gen_dataset_ops.kafka_dataset(self._topics, self._servers,
69                                         self._group, self._eof, self._timeout)
70
71  @property
72  def _element_structure(self):
73    return structure.TensorStructure(dtypes.string, [])
74