/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
#define TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_

#include <deque>

#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/queue_interface.h"
#include "tensorflow/core/framework/resource_op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/queue_base.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

// Defines a QueueOp, an abstract class for Queue construction ops.
34 class QueueOp : public ResourceOpKernel<QueueInterface> { 35 public: 36 QueueOp(OpKernelConstruction* context); 37 38 void Compute(OpKernelContext* context) override; 39 40 protected: 41 // Variables accessible by subclasses 42 int32 capacity_; 43 DataTypeVector component_types_; 44 45 private: 46 Status VerifyResource(QueueInterface* queue) override; 47 }; 48 49 class TypedQueueOp : public QueueOp { 50 public: 51 using QueueOp::QueueOp; 52 53 protected: 54 template <typename TypedQueue> CreateTypedQueue(TypedQueue * queue,QueueInterface ** ret)55 Status CreateTypedQueue(TypedQueue* queue, QueueInterface** ret) { 56 if (queue == nullptr) { 57 return errors::ResourceExhausted("Failed to allocate queue."); 58 } 59 *ret = queue; 60 return queue->Initialize(); 61 } 62 }; 63 64 // Queue manipulator kernels 65 66 class QueueOpKernel : public AsyncOpKernel { 67 public: 68 explicit QueueOpKernel(OpKernelConstruction* context); 69 70 void ComputeAsync(OpKernelContext* ctx, DoneCallback callback) final; 71 72 protected: 73 virtual void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 74 DoneCallback callback) = 0; 75 }; 76 77 class QueueAccessOpKernel : public QueueOpKernel { 78 public: 79 explicit QueueAccessOpKernel(OpKernelConstruction* context); 80 81 protected: 82 int64 timeout_; 83 }; 84 85 // Defines an EnqueueOp, the execution of which enqueues a tuple of 86 // tensors in the given Queue. 87 // 88 // The op has 1 + k inputs, where k is the number of components in the 89 // tuples stored in the given Queue: 90 // - Input 0: queue handle. 91 // - Input 1: 0th element of the tuple. 92 // - ... 93 // - Input (1+k): kth element of the tuple. 
94 class EnqueueOp : public QueueAccessOpKernel { 95 public: 96 explicit EnqueueOp(OpKernelConstruction* context); 97 98 protected: 99 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 100 DoneCallback callback) override; 101 102 private: 103 TF_DISALLOW_COPY_AND_ASSIGN(EnqueueOp); 104 }; 105 106 // Defines an EnqueueManyOp, the execution of which slices each 107 // component of a tuple of tensors along the 0th dimension, and 108 // enqueues tuples of slices in the given Queue. 109 // 110 // The op has 1 + k inputs, where k is the number of components in the 111 // tuples stored in the given Queue: 112 // - Input 0: queue handle. 113 // - Input 1: 0th element of the tuple. 114 // - ... 115 // - Input (1+k): kth element of the tuple. 116 // 117 // N.B. All tuple components must have the same size in the 0th 118 // dimension. 119 class EnqueueManyOp : public QueueAccessOpKernel { 120 public: 121 explicit EnqueueManyOp(OpKernelConstruction* context); 122 123 protected: 124 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 125 DoneCallback callback) override; 126 127 ~EnqueueManyOp() override; 128 129 private: 130 TF_DISALLOW_COPY_AND_ASSIGN(EnqueueManyOp); 131 }; 132 133 // Defines a DequeueOp, the execution of which dequeues a tuple of 134 // tensors from the given Queue. 135 // 136 // The op has one input, which is the handle of the appropriate 137 // Queue. The op has k outputs, where k is the number of components in 138 // the tuples stored in the given Queue, and output i is the ith 139 // component of the dequeued tuple. 
140 class DequeueOp : public QueueAccessOpKernel { 141 public: 142 explicit DequeueOp(OpKernelConstruction* context); 143 144 protected: 145 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 146 DoneCallback callback) override; 147 148 ~DequeueOp() override; 149 150 private: 151 TF_DISALLOW_COPY_AND_ASSIGN(DequeueOp); 152 }; 153 154 // Defines a DequeueManyOp, the execution of which concatenates the 155 // requested number of elements from the given Queue along the 0th 156 // dimension, and emits the result as a single tuple of tensors. 157 // 158 // The op has two inputs: 159 // - Input 0: the handle to a queue. 160 // - Input 1: the number of elements to dequeue. 161 // 162 // The op has k outputs, where k is the number of components in the 163 // tuples stored in the given Queue, and output i is the ith component 164 // of the dequeued tuple. 165 class DequeueManyOp : public QueueAccessOpKernel { 166 public: 167 explicit DequeueManyOp(OpKernelConstruction* context); 168 169 protected: 170 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 171 DoneCallback callback) override; 172 173 ~DequeueManyOp() override; 174 175 private: 176 TF_DISALLOW_COPY_AND_ASSIGN(DequeueManyOp); 177 }; 178 179 // Defines a DequeueUpToOp, the execution of which concatenates the 180 // requested number of elements from the given Queue along the 0th 181 // dimension, and emits the result as a single tuple of tensors. 182 // 183 // The difference between this op and DequeueMany is the handling when 184 // the Queue is closed. While the DequeueMany op will return if there 185 // an error when there are less than num_elements elements left in the 186 // closed queue, this op will return between 1 and 187 // min(num_elements, elements_remaining_in_queue), and will not block. 188 // If there are no elements left, then the standard DequeueMany error 189 // is returned. 
190 // 191 // This op only works if the underlying Queue implementation accepts 192 // the allow_small_batch = true parameter to TryDequeueMany. 193 // If it does not, an errors::Unimplemented exception is returned. 194 // 195 // The op has two inputs: 196 // - Input 0: the handle to a queue. 197 // - Input 1: the number of elements to dequeue. 198 // 199 // The op has k outputs, where k is the number of components in the 200 // tuples stored in the given Queue, and output i is the ith component 201 // of the dequeued tuple. 202 // 203 // The op has one attribute: allow_small_batch. If the Queue supports 204 // it, setting this to true causes the queue to return smaller 205 // (possibly zero length) batches when it is closed, up to however 206 // many elements are available when the op executes. In this case, 207 // the Queue does not block when closed. 208 class DequeueUpToOp : public QueueAccessOpKernel { 209 public: 210 explicit DequeueUpToOp(OpKernelConstruction* context); 211 212 protected: 213 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 214 DoneCallback callback) override; 215 216 ~DequeueUpToOp() override; 217 218 private: 219 TF_DISALLOW_COPY_AND_ASSIGN(DequeueUpToOp); 220 }; 221 222 // Defines a QueueCloseOp, which closes the given Queue. Closing a 223 // Queue signals that no more elements will be enqueued in it. 224 // 225 // The op has one input, which is the handle of the appropriate Queue. 226 class QueueCloseOp : public QueueOpKernel { 227 public: 228 explicit QueueCloseOp(OpKernelConstruction* context); 229 230 protected: 231 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 232 DoneCallback callback) override; 233 234 private: 235 bool cancel_pending_enqueues_; 236 TF_DISALLOW_COPY_AND_ASSIGN(QueueCloseOp); 237 }; 238 239 // Defines a QueueSizeOp, which computes the number of elements in the 240 // given Queue, and emits it as an output tensor. 
241 // 242 // The op has one input, which is the handle of the appropriate Queue; 243 // and one output, which is a single-element tensor containing the current 244 // size of that Queue. 245 class QueueSizeOp : public QueueOpKernel { 246 public: 247 explicit QueueSizeOp(OpKernelConstruction* context); 248 249 protected: 250 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 251 DoneCallback callback) override; 252 253 private: 254 TF_DISALLOW_COPY_AND_ASSIGN(QueueSizeOp); 255 }; 256 257 class QueueIsClosedOp : public QueueOpKernel { 258 public: 259 explicit QueueIsClosedOp(OpKernelConstruction* context); 260 261 protected: 262 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, 263 DoneCallback callback) override; 264 265 private: 266 TF_DISALLOW_COPY_AND_ASSIGN(QueueIsClosedOp); 267 }; 268 269 } // namespace tensorflow 270 271 #endif // TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ 272