/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/kernels/stack.h"

#include <atomic>
#include <limits>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

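// A Stack is a per-step resource holding a bounded LIFO sequence of tensors
// of a single element type. All accesses are serialized by a mutex; each
// entry records the allocator attributes it was produced with and whether
// its buffer has been swapped out to host (CPU) memory.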
class Stack : public ResourceBase {
 public:
  static std::atomic<int64> stack_counter;

  struct TensorAndAllocation {
    Tensor tensor;
    AllocatorAttributes alloc_attrs;
    bool swapped_to_cpu;
  };

  Stack(const DataType& elem_type, const string& stack_name, int max_size)
      : elem_type_(elem_type),
        stack_name_(stack_name),
        max_size_(max_size),
        closed_(false) {}

  Status Push(const TensorAndAllocation& value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (max_size_ >= 0 && stack_.size() >= max_size_) {
      return errors::InvalidArgument("Stack[", stack_name_, "] overflowed ",
                                     "its max_size (", max_size_, ")");
    }
    stack_.push_back(value);
    return Status::OK();
  }

  Status Pop(TensorAndAllocation* value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (stack_.empty()) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] is empty when calling Pop().");
    }
    *value = stack_.back();
    stack_.pop_back();
    return Status::OK();
  }

  // We don't swap the first tensor on the stack, nor any subsequent tensors
  // that share a buffer with the first tensor.
  bool IsUsefulToSwap(const Tensor& tensor) const {
    mutex_lock l(mu_);
    if (stack_.empty()) {
      return false;
    }
    const Tensor& first = stack_.front().tensor;
    return !tensor.SharesBufferWith(first);
  }

  void Close() {
    mutex_lock l(mu_);
    stack_.clear();
    closed_ = true;
  }

  DataType ElemType() { return elem_type_; }

  string DebugString() const override {
    mutex_lock l(mu_);
    return strings::StrCat("Stack[", stack_name_, "]");
  }

  const string& stack_name() { return stack_name_; }

 private:
  friend class StackOp;
  mutex* mu() { return &mu_; }

  mutable mutex mu_;
  DataType elem_type_;
  const string stack_name_;
  Tensor handle_;
  int max_size_;
  bool closed_ GUARDED_BY(mu_);
  std::vector<TensorAndAllocation> stack_ GUARDED_BY(mu_);

  Status CheckNotClosed() const EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (closed_) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] has already been closed.");
    }
    return Status::OK();
  }
};

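// Looks up the Stack addressed by input 0 of `ctx`. The handle is either a
// DT_RESOURCE handle or the older ref-typed string handle of shape [2]
// (container, stack name), which is resolved against the per-step resource
// container. On success the caller owns a reference to *stack and must
// unref it.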
Status GetStack(OpKernelContext* ctx, Stack** stack) {
  if (ctx->input_dtype(0) == DT_RESOURCE) {
    return LookupResource(ctx, HandleFromInput(ctx, 0), stack);
  } else {
    Tensor Tstack_handle = ctx->mutable_input(0, false);
    if (Tstack_handle.NumElements() != 2) {
      return errors::InvalidArgument(
          "Stack handle must have two elements, but had shape: ",
          Tstack_handle.shape().DebugString());
    }
    const string& container = Tstack_handle.flat<string>()(0);
    const string& stack_name = Tstack_handle.flat<string>()(1);
    string key = strings::StrCat(container, stack_name);
    ResourceMgr* rm = ctx->resource_manager();
    if (rm == nullptr) {
      return errors::Internal("No resource manager.");
    }
    auto* step_container = ctx->step_container();
    if (step_container == nullptr) {
      return errors::Internal("No step container.");
    }
    TF_RETURN_IF_ERROR(rm->Lookup(step_container->name(), key, stack));
    return Status::OK();
  }
}

std::atomic<int64> Stack::stack_counter{0};

// StackOp

StackOp::StackOp(OpKernelConstruction* context) : OpKernel(context) {
  OP_REQUIRES_OK(context, context->GetAttr("elem_type", &elem_type_));
  OP_REQUIRES_OK(context, context->GetAttr("stack_name", &stack_name_));
  if (stack_name_.empty()) stack_name_ = name();
}

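// Creates a new Stack in the per-step resource container. The optional scalar
// "max_size" input sets the capacity; a negative value means unbounded. The
// output is either a ref-typed 2-element string handle (container, stack
// name) or a scalar DT_RESOURCE handle, depending on the op's expected
// output type.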
void StackOp::Compute(OpKernelContext* ctx) {
  int32 size = std::numeric_limits<int32>::max();
  if (ctx->num_inputs() > 0) {
    const Tensor* tensor_size;
    OP_REQUIRES_OK(ctx, ctx->input("max_size", &tensor_size));

    OP_REQUIRES(
        ctx, TensorShapeUtils::IsScalar(tensor_size->shape()),
        errors::InvalidArgument("Stack size must be a scalar, but had shape: ",
                                tensor_size->shape().DebugString()));

    int32 size_value = tensor_size->scalar<int32>()();
    if (size_value >= 0) {
      size = size_value;
    }
  }

  static const char kContainer[] = "_stacks";
  auto stack_id = Stack::stack_counter.fetch_add(1);
  string stack_name = strings::StrCat(stack_name_, "_", stack_id);
  // Store the handle in a per-step container.
  ResourceMgr* rm = ctx->resource_manager();
  OP_REQUIRES(ctx, rm != nullptr, errors::Internal("No resource manager."));
  string key = strings::StrCat(kContainer, stack_name);
  Stack* stack = new Stack(elem_type_, stack_name, size);
  auto* step_container = ctx->step_container();
  OP_REQUIRES(ctx, step_container != nullptr,
              errors::Internal("No step container."));
  OP_REQUIRES_OK(ctx, rm->Create(step_container->name(), key, stack));
  if (IsRefType(ctx->expected_output_dtype(0))) {
    // Create the stack handle.
    AllocatorAttributes alloc_attr;
    alloc_attr.set_on_host(true);
    OP_REQUIRES_OK(ctx, ctx->allocate_temp(tensorflow::DT_STRING,
                                           tensorflow::TensorShape({2}),
                                           &stack->handle_, alloc_attr));
    auto handle = stack->handle_.flat<string>();
    handle(0) = kContainer;
    handle(1) = std::move(stack_name);
    ctx->set_output_ref(0, stack->mu(), &stack->handle_);
  } else {
    Tensor* handle;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &handle));
    handle->flat<ResourceHandle>()(0) =
        MakePerStepResourceHandle<Stack>(ctx, key);
  }
}

// StackPushOp

StackPushOp::StackPushOp(OpKernelConstruction* context, bool allow_swapping)
    : AsyncOpKernel(context) {
  if (allow_swapping) {
    OP_REQUIRES_OK(context, context->GetAttr("swap_memory", &swap_memory_));
  }
}

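// Pushes the input tensor onto the stack and forwards it to output 0. If
// memory swapping is enabled and the heuristic below fires, the tensor is
// first copied asynchronously from the device to host memory, and the host
// copy is what gets pushed.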
void StackPushOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  if (ctx->input_dtype(1) != stack->ElemType()) {
    ctx->CtxFailure(errors::InvalidArgument("Must have type ",
                                            stack->ElemType(), " but got ",
                                            ctx->input_dtype(1)));
    done();
    return;
  }

  // Push the tensor onto the stack. Swap the tensor to CPU if instructed.
  const Tensor& tensor = ctx->input(1);
  AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
  // For now, we use a simple heuristic for swapping: A GPU tensor is moved
  // to CPU if the tensor has more than kCopyThreshold bytes and the GPU
  // allocator says more than kOccupancy of the memory is in use.
  static constexpr int kCopyThreshold = 2048;
  static constexpr double kOccupancy = 0.7;
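  // For example, with a (purely illustrative) 16 GiB device memory limit and
  // kOccupancy = 0.7, swapping is considered only once more than ~11.2 GiB
  // are in use, and only for tensors larger than kCopyThreshold (2 KiB).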
  if (swap_memory_ && !alloc_attrs.on_host() &&
      tensor.TotalBytes() > kCopyThreshold && stack->IsUsefulToSwap(tensor)) {
    DeviceContext* device_ctxt = ctx->op_device_context();
    auto device = static_cast<tensorflow::Device*>(ctx->device());
    Allocator* allocator = device->GetAllocator(alloc_attrs);
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    if (stats && stats->bytes_limit && *stats->bytes_limit > 0 &&
        stats->bytes_in_use > (*stats->bytes_limit * kOccupancy)) {
      // Asynchronously copy the tensor from GPU to CPU memory.
      // TODO(yuanbyu): Swap the oldest tensor first.
      AllocatorAttributes host_alloc_attrs;
      host_alloc_attrs.set_gpu_compatible(true);
      host_alloc_attrs.set_on_host(true);
      Allocator* cpu_allocator = device->GetAllocator(host_alloc_attrs);
      Tensor* cpu_tensor =
          new Tensor(cpu_allocator, tensor.dtype(), tensor.shape());
      device_ctxt->CopyDeviceTensorToCPU(
          &tensor, "StackPush", device, cpu_tensor,
          [cpu_tensor, stack, ctx, done](const Status& s) {
            ctx->SetStatus(s);
            if (s.ok()) {
              AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
              ctx->SetStatus(stack->Push({*cpu_tensor, alloc_attrs, true}));
            }
            if (ctx->status().ok()) {
              ctx->set_output(0, *cpu_tensor);
            }
            done();
            delete cpu_tensor;
          });
      return;
    }
  }

  // Execute synchronously if not swapped.
  OP_REQUIRES_OK_ASYNC(ctx, stack->Push({tensor, alloc_attrs, false}), done);
  ctx->set_output(0, tensor);
  done();
}

bool StackPushOp::IsExpensive() { return false; }

// StackPopOp

StackPopOp::StackPopOp(OpKernelConstruction* context)
    : AsyncOpKernel(context) {}

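// Pops the top entry off the stack into output 0. If the entry was swapped
// out to host memory on push, it is first copied back to the device
// asynchronously.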
void StackPopOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  // Pop the tensor. Transfer the tensor back to device if it was
  // swapped out to CPU.
  Stack::TensorAndAllocation value;
  OP_REQUIRES_OK_ASYNC(ctx, stack->Pop(&value), done);
  if (value.swapped_to_cpu) {
    // Asynchronously copy the tensor back from CPU to GPU memory.
    DeviceContext* device_ctxt = ctx->op_device_context();
    Device* device = static_cast<Device*>(ctx->device());
    Tensor* cpu_tensor = &value.tensor;
    Allocator* gpu_allocator = device->GetAllocator(value.alloc_attrs);
    Tensor* device_tensor =
        new Tensor(gpu_allocator, cpu_tensor->dtype(), cpu_tensor->shape());
    device_ctxt->CopyCPUTensorToDevice(
        cpu_tensor, device, device_tensor,
        [device_tensor, ctx, done](const Status& s) {
          ctx->SetStatus(s);
          if (s.ok()) {
            ctx->set_output(0, *device_tensor);
          }
          done();
          delete device_tensor;
        });
  } else {
    // Execute synchronously if not swapped.
    ctx->set_output(0, value.tensor);
    done();
  }
}

bool StackPopOp::IsExpensive() { return false; }

// StackCloseOp

StackCloseOp::StackCloseOp(OpKernelConstruction* context) : OpKernel(context) {}

void StackCloseOp::Compute(OpKernelContext* ctx) {
  Stack* stack = nullptr;
  OP_REQUIRES_OK(ctx, GetStack(ctx, &stack));
  core::ScopedUnref unref(stack);
  stack->Close();
}

bool StackCloseOp::IsExpensive() { return false; }

}  // namespace tensorflow