/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/kernels/stack.h"

#include <limits.h>
#include <atomic>
#include <limits>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

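// A per-step resource that holds a stack of tensors. Push and Pop are
// serialized by a mutex; Push fails once max_size elements are stored (when
// max_size is non-negative), and both fail after Close() has been called.
// Each entry records the tensor, its allocator attributes, and whether
// StackPushOp swapped it out to host (CPU) memory.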
class Stack : public ResourceBase {
 public:
  static std::atomic<int64> stack_counter;

  struct TensorAndAllocation {
    Tensor tensor;
    AllocatorAttributes alloc_attrs;
    bool swapped_to_cpu;
  };

  Stack(const DataType& elem_type, const string& stack_name, int max_size)
      : elem_type_(elem_type),
        stack_name_(stack_name),
        max_size_(max_size),
        closed_(false) {}

  Status Push(const TensorAndAllocation& value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (max_size_ >= 0 && stack_.size() >= max_size_) {
      return errors::InvalidArgument("Stack[", stack_name_, "] overflowed ",
                                     "its max_size (", max_size_, ")");
    }
    stack_.push_back(value);
    return Status::OK();
  }

  Status Pop(TensorAndAllocation* value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (stack_.empty()) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] is empty when calling Pop().");
    }
    *value = stack_.back();
    stack_.pop_back();
    return Status::OK();
  }

  // We don't swap the first tensor on the stack and any subsequent tensors
  // that share the buffer with the first tensor.
  bool IsUsefulToSwap(const Tensor& tensor) const {
    mutex_lock l(mu_);
    if (stack_.empty()) {
      return false;
    }
    const Tensor& first = stack_.front().tensor;
    return !tensor.SharesBufferWith(first);
  }

  void Close() {
    mutex_lock l(mu_);
    stack_.clear();
    closed_ = true;
  }

  DataType ElemType() { return elem_type_; }

  string DebugString() const override {
    mutex_lock l(mu_);
    return strings::StrCat("Stack[", stack_name_, "]");
  }

  const string& stack_name() { return stack_name_; }

 private:
  friend class StackOp;
  mutex* mu() { return &mu_; }

  mutable mutex mu_;
  DataType elem_type_;
  const string stack_name_;
  Tensor handle_;
  int max_size_;
  bool closed_ GUARDED_BY(mu_);
  std::vector<TensorAndAllocation> stack_ GUARDED_BY(mu_);

  Status CheckNotClosed() const EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (closed_) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] has already been closed.");
    }
    return Status::OK();
  }
};

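// Resolves the Stack resource referenced by the op's first input. The handle
// is either a DT_RESOURCE handle, which is looked up directly, or a legacy
// ref-typed string tensor of shape [2] holding {container, stack_name}, which
// is resolved through the per-step container of the resource manager.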
Status GetStack(OpKernelContext* ctx, Stack** stack) {
  if (ctx->input_dtype(0) == DT_RESOURCE) {
    return LookupResource(ctx, HandleFromInput(ctx, 0), stack);
  } else {
    Tensor Tstack_handle = ctx->mutable_input(0, false);
    if (Tstack_handle.NumElements() != 2) {
      return errors::InvalidArgument(
          "Stack handle must have two elements, but had shape: ",
          Tstack_handle.shape().DebugString());
    }
    const string& container = Tstack_handle.flat<string>()(0);
    const string& stack_name = Tstack_handle.flat<string>()(1);
    string key = strings::StrCat(container, stack_name);
    ResourceMgr* rm = ctx->resource_manager();
    if (rm == nullptr) {
      return errors::Internal("No resource manager.");
    }
    auto* step_container = ctx->step_container();
    if (step_container == nullptr) {
      return errors::Internal("No step container.");
    }
    TF_RETURN_IF_ERROR(rm->Lookup(step_container->name(), key, stack));
    return Status::OK();
  }
}

std::atomic<int64> Stack::stack_counter{0};

// StackOp

StackOp::StackOp(OpKernelConstruction* context) : OpKernel(context) {
  OP_REQUIRES_OK(context, context->GetAttr("elem_type", &elem_type_));
  OP_REQUIRES_OK(context, context->GetAttr("stack_name", &stack_name_));
  if (stack_name_.empty()) stack_name_ = name();
}

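// Creates a new Stack with a process-unique name in the current step
// container and outputs a handle to it: either a ref-typed string handle of
// shape [2] ({container, stack_name}) or a scalar DT_RESOURCE handle,
// depending on the op's expected output type.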
void StackOp::Compute(OpKernelContext* ctx) {
  int32 size = std::numeric_limits<int32>::max();
  if (ctx->num_inputs() > 0) {
    const Tensor* tensor_size;
    OP_REQUIRES_OK(ctx, ctx->input("max_size", &tensor_size));

    OP_REQUIRES(
        ctx, TensorShapeUtils::IsScalar(tensor_size->shape()),
        errors::InvalidArgument("Stack size must be a scalar, but had shape: ",
                                tensor_size->shape().DebugString()));

    int32 size_value = tensor_size->scalar<int32>()();
    if (size_value >= 0) {
      size = size_value;
    }
  }

  static const char kContainer[] = "_stacks";
  auto stack_id = Stack::stack_counter.fetch_add(1);
  string stack_name = strings::StrCat(stack_name_, "_", stack_id);
  // Store the handle in a per-step container.
  ResourceMgr* rm = ctx->resource_manager();
  OP_REQUIRES(ctx, rm != nullptr, errors::Internal("No resource manager."));
  string key = strings::StrCat(kContainer, stack_name);
  Stack* stack = new Stack(elem_type_, stack_name, size);
  auto* step_container = ctx->step_container();
  OP_REQUIRES(ctx, step_container != nullptr,
              errors::Internal("No step container."));
  OP_REQUIRES_OK(ctx, rm->Create(step_container->name(), key, stack));
  if (IsRefType(ctx->expected_output_dtype(0))) {
    // Create the stack handle.
    AllocatorAttributes alloc_attr;
    alloc_attr.set_on_host(true);
    OP_REQUIRES_OK(ctx, ctx->allocate_temp(tensorflow::DT_STRING,
                                           tensorflow::TensorShape({2}),
                                           &stack->handle_, alloc_attr));
    auto handle = stack->handle_.flat<string>();
    handle(0) = kContainer;
    handle(1) = std::move(stack_name);
    ctx->set_output_ref(0, stack->mu(), &stack->handle_);
  } else {
    Tensor* handle;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &handle));
    handle->flat<ResourceHandle>()(0) =
        MakePerStepResourceHandle<Stack>(ctx, key);
  }
}

// StackPushOp

StackPushOp::StackPushOp(OpKernelConstruction* context, bool allow_swapping)
    : AsyncOpKernel(context) {
  if (allow_swapping) {
    OP_REQUIRES_OK(context, context->GetAttr("swap_memory", &swap_memory_));
  }
}

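// Pushes the input tensor onto the stack and forwards it to output 0. When
// memory swapping is enabled and the occupancy heuristic below fires, the
// tensor is first copied asynchronously to host memory and the swapped copy
// is what gets stored (and output); otherwise the push completes
// synchronously.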
void StackPushOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  if (ctx->input_dtype(1) != stack->ElemType()) {
    ctx->CtxFailure(errors::InvalidArgument("Must have type ",
                                            stack->ElemType(), " but got ",
                                            ctx->input_dtype(1)));
    done();
    return;
  }

  // Push the tensor onto the stack. Swap the tensor to CPU if instructed.
  const Tensor& tensor = ctx->input(1);
  AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
  // For now, we use a simple heuristic for swapping: A GPU tensor is moved
  // to CPU if the tensor has more than kCopyThreshold bytes and the GPU
  // allocator says more than kOccupancy of the memory is in use.
  static constexpr int kCopyThreshold = 2048;
  static constexpr double kOccupancy = 0.7;
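  // For example, on a device whose allocator reports a 16 GiB bytes_limit,
  // a >2048-byte tensor is swapped only once bytes_in_use exceeds
  // 0.7 * 16 GiB ~= 11.2 GiB.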
  if (swap_memory_ && !alloc_attrs.on_host() &&
      tensor.TotalBytes() > kCopyThreshold && stack->IsUsefulToSwap(tensor)) {
    DeviceContext* device_ctxt = ctx->op_device_context();
    auto device = static_cast<tensorflow::Device*>(ctx->device());
    Allocator* allocator = device->GetAllocator(alloc_attrs);
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    // Only swap when the allocator reports a positive byte limit and current
    // usage exceeds the occupancy threshold. Check that bytes_limit is set
    // before dereferencing the optional.
    if (stats && stats->bytes_limit && *stats->bytes_limit > 0 &&
        stats->bytes_in_use > (*stats->bytes_limit * kOccupancy)) {
      // Asynchronously copy the tensor from GPU to CPU memory.
      // TODO(yuanbyu): Swap the oldest tensor first.
      AllocatorAttributes host_alloc_attrs;
      host_alloc_attrs.set_gpu_compatible(true);
      host_alloc_attrs.set_on_host(true);
      Allocator* cpu_allocator = device->GetAllocator(host_alloc_attrs);
      Tensor* cpu_tensor =
          new Tensor(cpu_allocator, tensor.dtype(), tensor.shape());
      device_ctxt->CopyDeviceTensorToCPU(
          &tensor, "StackPush", device, cpu_tensor,
          [cpu_tensor, stack, ctx, done](const Status& s) {
            ctx->SetStatus(s);
            if (s.ok()) {
              AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
              ctx->SetStatus(stack->Push({*cpu_tensor, alloc_attrs, true}));
            }
            if (ctx->status().ok()) {
              ctx->set_output(0, *cpu_tensor);
            }
            done();
            delete cpu_tensor;
          });
      return;
    }
  }

  // Execute synchronously if not swapped.
  OP_REQUIRES_OK_ASYNC(ctx, stack->Push({tensor, alloc_attrs, false}), done);
  ctx->set_output(0, tensor);
  done();
}

bool StackPushOp::IsExpensive() { return false; }

// StackPopOp

StackPopOp::StackPopOp(OpKernelConstruction* context)
    : AsyncOpKernel(context) {}

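// Pops the top entry off the stack. If the tensor was swapped out to host
// memory by StackPushOp, it is copied back to the device asynchronously
// before being emitted as output 0; otherwise the pop completes
// synchronously.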
void StackPopOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  // Pop the tensor. Transfer the tensor back to device if it was
  // swapped out to CPU.
  Stack::TensorAndAllocation value;
  OP_REQUIRES_OK_ASYNC(ctx, stack->Pop(&value), done);
  if (value.swapped_to_cpu) {
    // Asynchronously copy the tensor back from CPU to GPU memory.
    DeviceContext* device_ctxt = ctx->op_device_context();
    Device* device = static_cast<Device*>(ctx->device());
    Tensor* cpu_tensor = &value.tensor;
    Allocator* gpu_allocator = device->GetAllocator(value.alloc_attrs);
    Tensor* device_tensor =
        new Tensor(gpu_allocator, cpu_tensor->dtype(), cpu_tensor->shape());
    device_ctxt->CopyCPUTensorToDevice(
        cpu_tensor, device, device_tensor,
        [device_tensor, ctx, done](const Status& s) {
          ctx->SetStatus(s);
          if (s.ok()) {
            ctx->set_output(0, *device_tensor);
          }
          done();
          delete device_tensor;
        });
  } else {
    // Execute synchronously if not swapped.
    ctx->set_output(0, value.tensor);
    done();
  }
}

bool StackPopOp::IsExpensive() { return false; }

// StackCloseOp

StackCloseOp::StackCloseOp(OpKernelConstruction* context) : OpKernel(context) {}

void StackCloseOp::Compute(OpKernelContext* ctx) {
  Stack* stack = nullptr;
  OP_REQUIRES_OK(ctx, GetStack(ctx, &stack));
  core::ScopedUnref unref(stack);
  stack->Close();
}

bool StackCloseOp::IsExpensive() { return false; }

}  // namespace tensorflow