1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/kernels/queue_op.h"
17 #include "tensorflow/core/framework/op_kernel.h"
18 #include "tensorflow/core/framework/queue_interface.h"
19 #include "tensorflow/core/framework/tensor.h"
20 #include "tensorflow/core/framework/tensor_shape.h"
21 #include "tensorflow/core/framework/types.h"
22 #include "tensorflow/core/lib/core/errors.h"
23 #include "tensorflow/core/platform/macros.h"
24 #include "tensorflow/core/platform/types.h"
25
26 namespace tensorflow {
27
QueueOp(OpKernelConstruction * context)28 QueueOp::QueueOp(OpKernelConstruction* context) : ResourceOpKernel(context) {
29 OP_REQUIRES_OK(context, context->GetAttr("capacity", &capacity_));
30 if (capacity_ < 0) {
31 capacity_ = QueueBase::kUnbounded;
32 }
33 OP_REQUIRES_OK(context,
34 context->GetAttr("component_types", &component_types_));
35 }
36
Compute(OpKernelContext * context)37 void QueueOp::Compute(OpKernelContext* context) {
38 ResourceOpKernel<QueueInterface>::Compute(context);
39 mutex_lock l(mu_);
40 if (resource_ && context->track_allocations()) {
41 context->record_persistent_memory_allocation(resource_->MemoryUsed());
42 }
43 }
44
VerifyResource(QueueInterface * queue)45 Status QueueOp::VerifyResource(QueueInterface* queue) {
46 return queue->MatchesNodeDef(def());
47 }
48
49
QueueOpKernel(OpKernelConstruction * context)50 QueueOpKernel::QueueOpKernel(OpKernelConstruction* context)
51 : AsyncOpKernel(context) {}
52
ComputeAsync(OpKernelContext * ctx,DoneCallback callback)53 void QueueOpKernel::ComputeAsync(OpKernelContext* ctx, DoneCallback callback) {
54 QueueInterface* queue;
55 if (ctx->input_dtype(0) == DT_RESOURCE) {
56 OP_REQUIRES_OK_ASYNC(
57 ctx, LookupResource(ctx, HandleFromInput(ctx, 0), &queue), callback);
58 } else {
59 OP_REQUIRES_OK_ASYNC(ctx, GetResourceFromContext(ctx, "handle", &queue),
60 callback);
61 }
62 ComputeAsync(ctx, queue, [callback, queue]() {
63 queue->Unref();
64 callback();
65 });
66 }
67
QueueAccessOpKernel(OpKernelConstruction * context)68 QueueAccessOpKernel::QueueAccessOpKernel(OpKernelConstruction* context)
69 : QueueOpKernel(context) {
70 OP_REQUIRES_OK(context, context->GetAttr("timeout_ms", &timeout_));
71 // TODO(keveman): Enable timeout.
72 OP_REQUIRES(context, timeout_ == -1,
73 errors::InvalidArgument("Timeout not supported yet."));
74 }
75
76 // Defines an EnqueueOp, the execution of which enqueues a tuple of
77 // tensors in the given Queue.
78 //
79 // The op has 1 + k inputs, where k is the number of components in the
80 // tuples stored in the given Queue:
81 // - Input 0: queue handle.
82 // - Input 1: 0th element of the tuple.
83 // - ...
// - Input k: (k-1)th element of the tuple.
EnqueueOp(OpKernelConstruction * context)85 EnqueueOp::EnqueueOp(OpKernelConstruction* context)
86 : QueueAccessOpKernel(context) {}
87
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)88 void EnqueueOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
89 DoneCallback callback) {
90 DataTypeVector expected_inputs;
91 if (ctx->input_dtype(0) == DT_RESOURCE) {
92 expected_inputs.push_back(DT_RESOURCE);
93 } else {
94 expected_inputs.push_back(DT_STRING_REF);
95 }
96 for (DataType dt : queue->component_dtypes()) {
97 expected_inputs.push_back(dt);
98 }
99 OP_REQUIRES_OK_ASYNC(ctx, ctx->MatchSignature(expected_inputs, {}), callback);
100
101 QueueInterface::Tuple tuple;
102 OpInputList components;
103 OP_REQUIRES_OK_ASYNC(ctx, ctx->input_list("components", &components),
104 callback);
105 for (const Tensor& Tcomponent : components) {
106 tuple.push_back(Tcomponent);
107 }
108
109 OP_REQUIRES_OK_ASYNC(ctx, queue->ValidateTuple(tuple), callback);
110 queue->TryEnqueue(tuple, ctx, callback);
111 }
112
113 // Defines an EnqueueManyOp, the execution of which slices each
114 // component of a tuple of tensors along the 0th dimension, and
115 // enqueues tuples of slices in the given Queue.
116 //
117 // The op has 1 + k inputs, where k is the number of components in the
118 // tuples stored in the given Queue:
119 // - Input 0: queue handle.
120 // - Input 1: 0th element of the tuple.
121 // - ...
// - Input k: (k-1)th element of the tuple.
123 //
124 // N.B. All tuple components must have the same size in the 0th
125 // dimension.
EnqueueManyOp(OpKernelConstruction * context)126 EnqueueManyOp::EnqueueManyOp(OpKernelConstruction* context)
127 : QueueAccessOpKernel(context) {}
128
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)129 void EnqueueManyOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
130 DoneCallback callback) {
131 DataTypeVector expected_inputs;
132 if (ctx->input_dtype(0) == DT_RESOURCE) {
133 expected_inputs.push_back(DT_RESOURCE);
134 } else {
135 expected_inputs.push_back(DT_STRING_REF);
136 }
137 for (DataType dt : queue->component_dtypes()) {
138 expected_inputs.push_back(dt);
139 }
140 OP_REQUIRES_OK_ASYNC(ctx, ctx->MatchSignature(expected_inputs, {}), callback);
141
142 QueueInterface::Tuple tuple;
143 OpInputList components;
144 OP_REQUIRES_OK_ASYNC(ctx, ctx->input_list("components", &components),
145 callback);
146 for (const Tensor& Tcomponent : components) {
147 tuple.push_back(Tcomponent);
148 }
149
150 OP_REQUIRES_OK_ASYNC(ctx, queue->ValidateManyTuple(tuple), callback);
151 queue->TryEnqueueMany(tuple, ctx, callback);
152 }
153
154 EnqueueManyOp::~EnqueueManyOp() = default;
155
156 // Defines a DequeueOp, the execution of which dequeues a tuple of
157 // tensors from the given Queue.
158 //
159 // The op has one input, which is the handle of the appropriate
160 // Queue. The op has k outputs, where k is the number of components in
161 // the tuples stored in the given Queue, and output i is the ith
162 // component of the dequeued tuple.
DequeueOp(OpKernelConstruction * context)163 DequeueOp::DequeueOp(OpKernelConstruction* context)
164 : QueueAccessOpKernel(context) {}
165
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)166 void DequeueOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
167 DoneCallback callback) {
168 if (ctx->input_dtype(0) == DT_RESOURCE) {
169 OP_REQUIRES_OK_ASYNC(
170 ctx, ctx->MatchSignature({DT_RESOURCE}, queue->component_dtypes()),
171 callback);
172 } else {
173 OP_REQUIRES_OK_ASYNC(
174 ctx, ctx->MatchSignature({DT_STRING_REF}, queue->component_dtypes()),
175 callback);
176 }
177
178 queue->TryDequeue(ctx, [ctx, callback](const QueueInterface::Tuple& tuple) {
179 if (!ctx->status().ok()) {
180 callback();
181 return;
182 }
183 OpOutputList output_components;
184 OP_REQUIRES_OK_ASYNC(
185 ctx, ctx->output_list("components", &output_components), callback);
186 for (int i = 0; i < ctx->num_outputs(); ++i) {
187 output_components.set(i, tuple[i]);
188 }
189 callback();
190 });
191 }
192
193 DequeueOp::~DequeueOp() = default;
194
195 // Defines a DequeueManyOp, the execution of which concatenates the
196 // requested number of elements from the given Queue along the 0th
197 // dimension, and emits the result as a single tuple of tensors.
198 //
199 // The op has two inputs:
200 // - Input 0: the handle to a queue.
201 // - Input 1: the number of elements to dequeue.
202 //
203 // The op has k outputs, where k is the number of components in the
204 // tuples stored in the given Queue, and output i is the ith component
205 // of the dequeued tuple.
DequeueManyOp(OpKernelConstruction * context)206 DequeueManyOp::DequeueManyOp(OpKernelConstruction* context)
207 : QueueAccessOpKernel(context) {}
208
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)209 void DequeueManyOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
210 DoneCallback callback) {
211 const Tensor& Tnum_elements = ctx->input(1);
212 int32 num_elements = Tnum_elements.flat<int32>()(0);
213
214 OP_REQUIRES_ASYNC(ctx, num_elements >= 0,
215 errors::InvalidArgument("DequeueManyOp requested ",
216 num_elements, " < 0 elements"),
217 callback);
218
219 if (ctx->input_dtype(0) == DT_RESOURCE) {
220 OP_REQUIRES_OK_ASYNC(
221 ctx,
222 ctx->MatchSignature({DT_RESOURCE, DT_INT32}, queue->component_dtypes()),
223 callback);
224 } else {
225 OP_REQUIRES_OK_ASYNC(ctx,
226 ctx->MatchSignature({DT_STRING_REF, DT_INT32},
227 queue->component_dtypes()),
228 callback);
229 }
230
231 queue->TryDequeueMany(
232 num_elements, ctx, false /* allow_small_batch */,
233 [ctx, callback](const QueueInterface::Tuple& tuple) {
234 if (!ctx->status().ok()) {
235 callback();
236 return;
237 }
238 OpOutputList output_components;
239 OP_REQUIRES_OK_ASYNC(
240 ctx, ctx->output_list("components", &output_components), callback);
241 for (int i = 0; i < ctx->num_outputs(); ++i) {
242 output_components.set(i, tuple[i]);
243 }
244 callback();
245 });
246 }
247
248 DequeueManyOp::~DequeueManyOp() = default;
249
250 // Defines a DequeueUpToOp, the execution of which concatenates the
251 // requested number of elements from the given Queue along the 0th
252 // dimension, and emits the result as a single tuple of tensors.
253 //
// The difference between this op and DequeueMany is the handling when
// the Queue is closed. While the DequeueMany op will return an error
// when there are fewer than num_elements elements left in the closed
// queue, this op will return between 1 and
// min(num_elements, elements_remaining_in_queue) elements, and will not
// block. If there are no elements left, then the standard DequeueMany
// error is returned.
261 //
262 // This op only works if the underlying Queue implementation accepts
263 // the allow_small_batch = true parameter to TryDequeueMany.
264 // If it does not, an errors::Unimplemented exception is returned.
265 //
266 // The op has two inputs:
267 // - Input 0: the handle to a queue.
268 // - Input 1: the number of elements to dequeue.
269 //
270 // The op has k outputs, where k is the number of components in the
271 // tuples stored in the given Queue, and output i is the ith component
272 // of the dequeued tuple.
273 //
// This op always passes allow_small_batch = true to TryDequeueMany. If
// the Queue supports it, this causes the queue to return smaller
// (possibly zero length) batches when it is closed, up to however
// many elements are available when the op executes. In this case,
// the Queue does not block when closed.
DequeueUpToOp(OpKernelConstruction * context)279 DequeueUpToOp::DequeueUpToOp(OpKernelConstruction* context)
280 : QueueAccessOpKernel(context) {}
281
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)282 void DequeueUpToOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
283 DoneCallback callback) {
284 const Tensor& Tnum_elements = ctx->input(1);
285 int32 num_elements = Tnum_elements.flat<int32>()(0);
286
287 OP_REQUIRES_ASYNC(ctx, num_elements >= 0,
288 errors::InvalidArgument("DequeueUpToOp requested ",
289 num_elements, " < 0 elements"),
290 callback);
291
292 if (ctx->input_dtype(0) == DT_RESOURCE) {
293 OP_REQUIRES_OK_ASYNC(
294 ctx,
295 ctx->MatchSignature({DT_RESOURCE, DT_INT32}, queue->component_dtypes()),
296 callback);
297 } else {
298 OP_REQUIRES_OK_ASYNC(ctx,
299 ctx->MatchSignature({DT_STRING_REF, DT_INT32},
300 queue->component_dtypes()),
301 callback);
302 }
303
304 queue->TryDequeueMany(
305 num_elements, ctx, true /* allow_small_batch */,
306 [ctx, callback](const QueueInterface::Tuple& tuple) {
307 if (!ctx->status().ok()) {
308 callback();
309 return;
310 }
311 OpOutputList output_components;
312 OP_REQUIRES_OK_ASYNC(
313 ctx, ctx->output_list("components", &output_components), callback);
314 for (int i = 0; i < ctx->num_outputs(); ++i) {
315 output_components.set(i, tuple[i]);
316 }
317 callback();
318 });
319 }
320
321 DequeueUpToOp::~DequeueUpToOp() = default;
322
323 // Defines a QueueCloseOp, which closes the given Queue. Closing a
324 // Queue signals that no more elements will be enqueued in it.
325 //
326 // The op has one input, which is the handle of the appropriate Queue.
QueueCloseOp(OpKernelConstruction * context)327 QueueCloseOp::QueueCloseOp(OpKernelConstruction* context)
328 : QueueOpKernel(context) {
329 OP_REQUIRES_OK(context, context->GetAttr("cancel_pending_enqueues",
330 &cancel_pending_enqueues_));
331 }
332
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)333 void QueueCloseOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
334 DoneCallback callback) {
335 queue->Close(ctx, cancel_pending_enqueues_, callback);
336 }
337
338 // Defines a QueueSizeOp, which computes the number of elements in the
339 // given Queue, and emits it as an output tensor.
340 //
341 // The op has one input, which is the handle of the appropriate Queue;
342 // and one output, which is a single-element tensor containing the current
343 // size of that Queue.
QueueSizeOp(OpKernelConstruction * context)344 QueueSizeOp::QueueSizeOp(OpKernelConstruction* context)
345 : QueueOpKernel(context) {}
346
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)347 void QueueSizeOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
348 DoneCallback callback) {
349 Tensor* Tqueue_size = nullptr;
350 OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &Tqueue_size));
351 Tqueue_size->flat<int32>().setConstant(queue->size());
352 callback();
353 }
354
QueueIsClosedOp(OpKernelConstruction * context)355 QueueIsClosedOp::QueueIsClosedOp(OpKernelConstruction* context)
356 : QueueOpKernel(context) {}
357
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)358 void QueueIsClosedOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
359 DoneCallback callback) {
360 Tensor* Tqueue_is_closed = nullptr;
361 OP_REQUIRES_OK(ctx,
362 ctx->allocate_output(0, TensorShape({}), &Tqueue_is_closed));
363 Tqueue_is_closed->flat<bool>().setConstant(queue->is_closed());
364 callback();
365 }
366
367 } // namespace tensorflow
368