/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/single_threaded_executor.h"

#include "absl/container/flat_hash_map.h"
#include "absl/memory/memory.h"
#include "tensorflow/core/common_runtime/entry.h"
#include "tensorflow/core/common_runtime/executor.h"
#include "tensorflow/core/common_runtime/executor_factory.h"
#include "tensorflow/core/common_runtime/renamed_device.h"
#include "tensorflow/core/graph/algorithm.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/cleanup.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/macros.h"

namespace tensorflow {

Status ValidateOp(const Node& n) {
  for (DataType dt : n.output_types()) {
    if (IsRefType(dt)) {
      return errors::Unimplemented(
          "Single-threaded executor does not support reference-typed "
          "edges.  But saw type ",
          DataTypeString(dt), " in outputs of node ", n.name());
    }
  }
  if (n.IsControlFlow()) {
    return errors::FailedPrecondition(
        "Single-threaded executor does not support low level control flow, "
        "but saw control flow node ",
        n.name(),
        ".  Perhaps your graph contains old-style control flow primitives? "
        "Try using tf.compat.v1.enable_control_flow_v2().");
  }
  if (n.IsSend() || n.IsHostSend() || n.IsRecv() || n.IsHostRecv()) {
    return errors::Unimplemented(
        "Single-threaded executor does not support partitioned graphs.  "
        "But saw send/recv node ",
        n.name());
  }
  if (n.IsCollective()) {
    return errors::Unimplemented(
        "Single-threaded executor does not support collective ops.  But "
        "saw collective node ",
        n.name());
  }
  return Status::OK();
}
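
// NOTE: In practice these checks mean that this executor only accepts plain
// dataflow graphs: graphs that use v1 control flow (Switch/Merge/Enter/Exit/
// NextIteration), reference-typed variables, partitioned Send/Recv pairs, or
// collective ops cannot be run with it.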

namespace {

typedef gtl::InlinedVector<TensorValue, 4> TensorValueVec;
typedef gtl::InlinedVector<AllocatorAttributes, 4> AllocatorAttributeVec;

static const string& kSingleThreadedExecutor =
    *new string("SINGLE_THREADED_EXECUTOR");
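// NOTE: The executor type name above is heap-allocated and intentionally never
// freed, a common idiom for string constants that must remain valid during
// static destruction.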

class SingleThreadedExecutorImpl : public Executor {
 public:
  explicit SingleThreadedExecutorImpl(const LocalExecutorParams& params)
      : params_(params) {}

  ~SingleThreadedExecutorImpl() override {
    for (const KernelState& kernel_state : kernels_) {
      params_.delete_kernel(kernel_state.kernel);
    }
    for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
      params_.delete_kernel(kernel_state.kernel);
    }
  }

  Status Initialize(const Graph& graph) {
    // Topologically sort `graph` to get a sequence of OpKernels.
    std::vector<Node*> ordered_nodes;
    ordered_nodes.reserve(graph.num_nodes());
    GetReversePostOrder(graph, &ordered_nodes);
    int ordered_nodes_size = ordered_nodes.size();
    if (ordered_nodes_size != graph.num_nodes()) {
      return errors::InvalidArgument("Graph had ", graph.num_nodes(),
                                     " nodes but reverse post-order had ",
                                     ordered_nodes.size());
    }

    kernels_.reserve(ordered_nodes.size());
    std::vector<Node*> nodes_with_kernels;
    std::vector<Node*> nodes_with_const_tensor_kernels;
    nodes_with_kernels.reserve(ordered_nodes.size());

    std::map<size_t, Node*> arg_index_to_node_map;
    absl::flat_hash_map<Node*, size_t> node_to_index_map;

    // Create the kernel and input-related structures for each node in `graph`.
    for (Node* n : ordered_nodes) {
      TF_RETURN_IF_ERROR(ValidateOp(*n));
      if (n->IsArg()) {
        int32_t arg_index;
        TF_RETURN_IF_ERROR(GetNodeAttr(n->attrs(), "index", &arg_index));
        if (arg_index < 0) {
          return errors::InvalidArgument("Invalid argument index ", arg_index,
                                         " in node ", n->name());
        }
        arg_index_to_node_map[arg_index] = n;
        // We do not create a kernel for Arg nodes, and instead inline the
        // argument handling directly in the executor code.
        continue;
      }

      OpKernel* kernel;
      TF_RETURN_IF_ERROR(params_.create_kernel(n->properties(), &kernel));

      const Tensor* const_tensor;
      if (n->num_outputs() == 1 && (const_tensor = kernel->const_tensor())) {
        // Nodes that produce a single constant tensor are handled specially:
        // we evaluate the tensor once, and propagate it to its consumers as
        // a `const Tensor*`, to avoid refcount manipulation.
        const size_t kernel_index = const_tensor_kernels_.size();
        const_tensor_kernels_.push_back({});
        nodes_with_const_tensor_kernels.push_back(n);
        ConstTensorKernelState& kernel_state =
            const_tensor_kernels_[kernel_index];
        kernel_state.kernel = kernel;
        kernel_state.const_tensor = *const_tensor;
      } else {
        const size_t kernel_index = kernels_.size();
        kernels_.push_back({});
        nodes_with_kernels.push_back(n);
        KernelState& kernel_state = kernels_[kernel_index];
        kernel_state.kernel = kernel;
        kernel_state.num_inputs = n->num_inputs();
        kernel_state.num_outputs = n->num_outputs();
        node_to_index_map[n] = kernel_index;
        if (kernel_index == 0) {
          kernel_state.input_start_index = 0;
        } else {
          const KernelState& previous_kernel_state = kernels_[kernel_index - 1];
          kernel_state.input_start_index =
              previous_kernel_state.input_start_index +
              previous_kernel_state.num_inputs;
        }
      }
    }

    // Build the mapping from each Arg node output to the input slot for the
    // corresponding destination node.
    if (!arg_index_to_node_map.empty()) {
      const size_t num_args = arg_index_to_node_map.rbegin()->first + 1;
      arg_output_locations_.resize(num_args);
      for (const auto& arg_index_node_pair : arg_index_to_node_map) {
        const size_t arg_index = arg_index_node_pair.first;
        const Node* arg_node = arg_index_node_pair.second;
        arg_output_locations_[arg_index].reserve(arg_node->out_edges().size());
        for (const Edge* e : arg_node->out_edges()) {
          if (e->src_output() == Graph::kControlSlot) {
            continue;
          } else if (e->src_output() != 0) {
            return errors::Internal("Invalid output index ", e->src_output(),
                                    " from argument node ", arg_index);
          }
          arg_output_locations_[arg_index].push_back(
              kernels_[node_to_index_map[e->dst()]].input_start_index +
              e->dst_input());
        }
      }
    }
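
    // Purely illustrative example (not derived from any particular graph): if
    // the argument with index 0 feeds input 0 of a kernel whose
    // `input_start_index` is 5 and input 1 of a kernel whose
    // `input_start_index` is 7, then `arg_output_locations_[0] == {5, 8}`.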

    // Build the mapping from each const tensor kernel to the input slot for the
    // corresponding destination node.
    for (size_t i = 0; i < const_tensor_kernels_.size(); ++i) {
      Node* n = nodes_with_const_tensor_kernels[i];
      ConstTensorKernelState& kernel_state = const_tensor_kernels_[i];
      for (const Edge* e : n->out_edges()) {
        if (e->src_output() == Graph::kControlSlot) {
          continue;
        } else if (e->src_output() != 0) {
          return errors::Internal("Invalid output index ", e->src_output(),
                                  " from node ", n->DebugString());
        }
        kernel_state.output_locations.push_back(
            kernels_[node_to_index_map[e->dst()]].input_start_index +
            e->dst_input());
      }

      bool on_host =
          kernel_state.kernel->output_memory_types()[0] == HOST_MEMORY;
      kernel_state.output_alloc_attr.set_on_host(on_host);
    }

    // Build the mapping from each node output to the input slot for the
    // corresponding destination node.
    for (size_t i = 0; i < kernels_.size(); ++i) {
      Node* n = nodes_with_kernels[i];
      KernelState& kernel_state = kernels_[i];
      kernel_state.output_locations.resize(kernel_state.num_outputs);
      for (const Edge* e : n->out_edges()) {
        if (!e->IsControlEdge()) {
          kernel_state.output_locations[e->src_output()].push_back(
              kernels_[node_to_index_map[e->dst()]].input_start_index +
              e->dst_input());
        }
      }

      // Compute allocator attributes for each node output, and corresponding
      // node input.
      kernel_state.output_alloc_attrs.resize(kernel_state.num_outputs);
      AllocatorAttributes* attrs = kernel_state.output_alloc_attrs.data();

      OpKernel* op_kernel = kernel_state.kernel;
      for (int out = 0; out < n->num_outputs(); out++) {
        DCHECK_LT(out, op_kernel->output_memory_types().size());
        bool on_host = op_kernel->output_memory_types()[out] == HOST_MEMORY;
        if (on_host) {
          AllocatorAttributes h;
          h.set_on_host(on_host);
          attrs[out].Merge(h);
        }
      }
    }

    if (!kernels_.empty()) {
      const KernelState& last_kernel_state = kernels_.back();
      total_num_inputs_ =
          last_kernel_state.input_start_index + last_kernel_state.num_inputs;
      input_alloc_attrs_.resize(total_num_inputs_);
      for (size_t i = 0; i < kernels_.size(); ++i) {
        for (size_t j = 0; j < kernels_[i].output_locations.size(); ++j) {
          for (size_t output_location : kernels_[i].output_locations[j]) {
            input_alloc_attrs_[output_location] =
                kernels_[i].output_alloc_attrs[j];
          }
        }
      }
    } else {
      total_num_inputs_ = 0;
    }
    return Status::OK();
  }

  Status Run(const Args& args) override {
    // The inputs to each kernel are stored contiguously in `inputs`.
    //
    // We use `kernels_[i].input_start_index` and `kernels_[i].num_inputs` to
    // determine the range of elements in this vector that correspond to
    // the inputs of `kernels_[i]`.
    //
    // This vector has the following layout:
    //
    // * Kernel 0, input 0.
    // * Kernel 0, input 1.
    // * ...
    // * Kernel 0, input `kernels_[0].num_inputs - 1`.
    // * Kernel 1, input 0.
    // * ...
    // * Kernel 1, input `kernels_[1].num_inputs - 1`.
    // * ...
    // * Kernel `kernels_.size() - 1`, input 0.
    // * ...
    // * Kernel `kernels_.size() - 1`, input `kernels_.back().num_inputs - 1`.
    //
    // Note that kernels with zero inputs do not correspond to any elements in
    // this vector.
    //
    // We use `ManualConstructor<Tensor>` to avoid the overhead of
    // default-constructing an invalid `Tensor` for each slot at the beginning
    // of execution:
    // * Elements are initialized when the outputs of a kernel execution are
    //   propagated to the inputs of kernels that depend on them.
    // * The elements corresponding to the inputs for kernel `i` are destroyed
    //   after kernel `i` executes.
    // * In an error case (see below), we use the connectivity information in
    //   `KernelState::output_locations` to determine which locations have been
    //   initialized, and manually destroy them.
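    //
    // Illustrative example (hypothetical kernel sizes, not taken from a real
    // graph): with three kernels that have 2, 0, and 3 inputs respectively,
    // `total_num_inputs_ == 5`; kernel 0's inputs occupy slots [0, 2), kernel
    // 1 occupies no slots, and kernel 2's inputs occupy slots [2, 5).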
    std::vector<Entry> inputs(total_num_inputs_);

    // TODO(mrry): Can we avoid copying into these vectors? Consider modifying
    // OpKernelContext to take the TensorValueVec as a pointer into `inputs`.
    TensorValueVec node_inputs;
    AllocatorAttributeVec input_alloc_attrs;

    // Override intra op thread pool if requested.
    Device* device = params_.device;
    std::unique_ptr<Device> user_device;
    if (args.user_intra_op_threadpool != nullptr) {
      user_device = RenamedDevice::NewRenamedDevice(
          device->name(), device, /*owns_underlying=*/false,
          /*isolate_session_state=*/false, args.user_intra_op_threadpool);
      device = user_device.get();
    }

    // Prepare the parameters that will be the same for all kernels.
    OpKernelContext::Params params;
    params.step_id = args.step_id;
    params.device = device;
    params.log_memory = false;  // TODO(mrry): Too severe?
    params.rendezvous = args.rendezvous;
    params.session_state = args.session_state;
    params.session_metadata = params_.session_metadata;
    params.tensor_store = args.tensor_store;
    params.cancellation_manager = args.cancellation_manager;
    params.call_frame = args.call_frame;
    params.function_library = params_.function_library;
    params.resource_manager = device->resource_manager();
    params.step_container = args.step_container;
    params.collective_executor = args.collective_executor;
    params.slice_reader_cache = nullptr;  // TODO(mrry): Too severe?
    params.inputs = &node_inputs;
    params.input_alloc_attrs = &input_alloc_attrs;

    Args::Runner runner_copy = args.runner;
    params.runner = &runner_copy;
    params.run_all_kernels_inline = args.run_all_kernels_inline;
    params.stats_collector = args.stats_collector;
    params.executor_type = &kSingleThreadedExecutor;

    // NOTE(mrry): We are assuming that the graph is loopless and condless.
    params.frame_iter = FrameAndIter(0, 0);
    params.is_input_dead = false;

    device->TryGetDeviceContext(&params.op_device_context).IgnoreError();
    auto context_cleanup = gtl::MakeCleanup([&params] {
      if (params.op_device_context != nullptr) {
        params.op_device_context->Unref();
      }
    });
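    // NOTE: `gtl::MakeCleanup()` returns a scope guard that runs the lambda
    // above when `context_cleanup` goes out of scope, so the device context is
    // unreffed on every exit path from this function, including the early
    // returns below.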

    // TODO(mrry): Consider implementing forwarding.
    params.forward_from_array = nullptr;

    const size_t received_args =
        args.call_frame ? args.call_frame->num_args() : 0;
    if (TF_PREDICT_FALSE(arg_output_locations_.size() > received_args)) {
      return errors::InvalidArgument("Expected ", arg_output_locations_.size(),
                                     " arguments, but only received ",
                                     received_args, ".");
    }

    // ArgOp is a relatively expensive OpKernel due to the Tensor
    // allocations that it performs. Therefore we specialize its implementation
    // and forward arguments directly to the inputs of kernels that consume
    // them.
    for (size_t i = 0; i < arg_output_locations_.size(); ++i) {
      const size_t num_destinations = arg_output_locations_[i].size();
      if (num_destinations > 0) {
        if (args.call_frame->CanConsumeArg(i)) {
          // The first destination input can consume the argument.
          Entry& first_input = inputs[arg_output_locations_[i][0]];
          first_input.state = Entry::State::HAS_VALUE;
          first_input.val.Init();
          args.call_frame->ConsumeArg(i, first_input.val.get());
          // All subsequent destination inputs get a shallow copy of the first
          // destination input.
          //
          // NOTE: If we had metadata about which kernels might attempt to
          // forward their input, we could arrange the kernel order so that
          // one of those kernels was executed last.
          for (size_t j = 1; j < num_destinations; ++j) {
            Entry& input = inputs[arg_output_locations_[i][j]];
            input.state = Entry::State::HAS_VALUE;
            input.val.Init(*first_input.val);
          }
        } else {
          const Tensor* arg;
          TF_RETURN_IF_ERROR(args.call_frame->GetArg(i, &arg));
          for (size_t j = 0; j < num_destinations; ++j) {
            Entry& input = inputs[arg_output_locations_[i][j]];
            // NOTE: We must make at least one shallow copy of the argument
            // tensor that remains live until all consuming kernels have
            // executed, to keep the reference count > 1, and inhibit buffer
            // forwarding. For simplicity, we shallow copy into the input entry
            // for each consuming kernel.
            input.state = Entry::State::HAS_VALUE;
            input.val.Init(*arg);
          }
        }
      }
    }

    // Kernels that return a constant value (e.g. ConstOp) are relatively
    // expensive due to the Tensor allocations that they perform. Therefore we
    // specialize their implementation and forward their constant value directly
    // to the inputs of kernels that consume them.
    for (const ConstTensorKernelState& kernel_state : const_tensor_kernels_) {
      for (size_t i = 0; i < kernel_state.output_locations.size(); ++i) {
        Entry& input = inputs[kernel_state.output_locations[i]];
        input.state = Entry::State::HAS_CONST_TENSOR;
        input.const_tensor = &kernel_state.const_tensor;
      }
    }
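    // NOTE: Because these entries are in the `HAS_CONST_TENSOR` state, the
    // per-kernel input-preparation loop below passes the cached tensor through
    // as a pointer, without copying it or touching its reference count.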

    // Execute the kernels one-at-a-time in topological order.
    for (size_t i = 0; i < kernels_.size(); ++i) {
      const KernelState& kernel_state = kernels_[i];

      // Prepare the per-kernel parameters.
      const size_t input_start_index = kernel_state.input_start_index;
      const size_t num_inputs = kernel_state.num_inputs;
      const size_t num_outputs = kernel_state.num_outputs;

      node_inputs.clear();
      node_inputs.resize(num_inputs);
      input_alloc_attrs.clear();
      input_alloc_attrs.resize(num_inputs);
      for (size_t j = 0; j < num_inputs; ++j) {
        Entry& input = inputs[input_start_index + j];
        switch (input.state) {
          case Entry::State::HAS_CONST_TENSOR:
            // NOTE(mrry): This `const_cast` is necessary because `TensorValue`
            // stores a non-const `Tensor*`, and relies on the `OpKernelContext`
            // accessors making dynamic checks that prevent using an immutable
            // tensor as a mutable tensor.
            node_inputs[j].tensor = const_cast<Tensor*>(input.const_tensor);
            break;
          case Entry::State::HAS_VALUE:
            node_inputs[j].tensor = input.val.get();
            break;
          default:
            DCHECK(false) << "Input did not have a valid value.";
        }
        input_alloc_attrs[j] = input_alloc_attrs_[input_start_index + j];
      }
      params.op_kernel = kernel_state.kernel;
      params.output_attr_array = kernel_state.output_alloc_attrs.data();
      OpKernelContext ctx(&params, num_outputs);

      // Actually execute the kernel.
      device->Compute(kernel_state.kernel, &ctx);
      TF_RETURN_IF_ERROR(ctx.status());

      // Free the inputs to the current kernel.
      for (size_t j = 0; j < num_inputs; ++j) {
        inputs[input_start_index + j].ClearVal();
      }

      // Forward the outputs of the kernel to the inputs of subsequent kernels.
      for (size_t j = 0; j < num_outputs; ++j) {
        TensorValue val = ctx.release_output(j);
        const size_t num_destinations = kernel_state.output_locations[j].size();
        if (num_destinations > 0) {
          // TODO(mrry): Consider flattening the `output_locations` vector
          // to improve the cache-friendliness of this loop.
          for (size_t k = 0; k < num_destinations - 1; ++k) {
            // TODO(mrry): Validate that the types match the expected values or
            // ensure that the necessary validation has already happened.
            Entry& input = inputs[kernel_state.output_locations[j][k]];
            input.state = Entry::State::HAS_VALUE;
            input.val.Init(*val.tensor);
          }
          // Move the output tensor to its last consumer to avoid the cost of
          // copying it.
          Entry& input =
              inputs[kernel_state.output_locations[j][num_destinations - 1]];
          input.state = Entry::State::HAS_VALUE;
          input.val.Init(std::move(*val.tensor));
        }
        delete val.tensor;
      }
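      // NOTE: `ctx.release_output()` transfers ownership of the heap-allocated
      // output `Tensor` object to this loop; once its buffer has been copied
      // or moved into the destination entries (or if it has no consumers), the
      // temporary `Tensor` object itself is deleted above.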
    }
    return Status::OK();
  }

  // When asynchronous execution is requested, schedule the entire execution as
  // a single closure on `args.runner`: callers may not expect expensive work
  // to be performed in the calling thread, even though the execution itself is
  // single-threaded.
  //
  // This also avoids stack-overflow issues with functional control flow.
  void RunAsync(const Args& args, DoneCallback done) override {
    args.runner([this, args, done]() { done(Run(args)); });
  }

 private:
  const LocalExecutorParams params_;

  // All following members are read-only after Initialize().

  // The sum of the number of inputs for each node in the graph. This determines
  // the length of the flat `inputs` vector. See comment at the beginning of
  // `Run()` for details.
  size_t total_num_inputs_;

  // Represents cached graph structure state for each kernel.
  struct KernelState {
    // The kernel object. Not owned.
    //
    // This pointer is managed by `params_.create_kernel()` and
    // `params_.delete_kernel()`.
    OpKernel* kernel;

    // These fields determine the range of elements in `inputs` that corresponds
    // to the inputs of `kernel`.
    size_t input_start_index;
    size_t num_inputs;

    size_t num_outputs;

    // For the `j`th output of `kernel`, `output_locations[j]` contains the
    // locations in the flat `inputs` vector to which that output must be
    // copied. See comment at the beginning of `Run()` for details.
    std::vector<std::vector<size_t>>
        output_locations;  // Length = `num_outputs`.

    // Memory space information for each output of `kernel`.
    std::vector<AllocatorAttributes>
        output_alloc_attrs;  // Length = `num_outputs`.
  };
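
  // Purely illustrative example of `KernelState::output_locations`
  // (hypothetical indices): if output 0 of a kernel feeds input 1 of a kernel
  // whose `input_start_index` is 6 and input 0 of a kernel whose
  // `input_start_index` is 9, then `output_locations[0] == {7, 9}`.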
  std::vector<KernelState> kernels_;

  // For the `i`th argument, `arg_output_locations_[i]` contains the locations
  // in the flat `inputs` vector to which that argument must be copied.
  std::vector<std::vector<size_t>>
      arg_output_locations_;  // Length = `num_args`.

  // Represents cached graph structure state for each kernel that produces
  // a single constant-valued tensor.
  struct ConstTensorKernelState {
    // The kernel object. Not owned.
    //
    // This pointer is managed by `params_.create_kernel()` and
    // `params_.delete_kernel()`.
    OpKernel* kernel;

    // The cached value of `kernel->const_tensor()`.
    //
    // NOTE: We keep a `Tensor` rather than a `const Tensor*` here in order to
    // keep the reference count on the underlying buffer above 1. Otherwise, a
    // kernel could interpret the input as a forwardable tensor, and mutate the
    // underlying constant tensor.
    Tensor const_tensor;

    // For the single output of `kernel`, `output_locations` contains the
    // locations in the flat `inputs` vector to which that output must be
    // copied. See comment at the beginning of `Run()` for details.
    std::vector<size_t> output_locations;  // Length = number of destinations.

    // Memory space information for the single output of `kernel`.
    AllocatorAttributes output_alloc_attr;
  };
  std::vector<ConstTensorKernelState> const_tensor_kernels_;

  // Memory space information for each input. This information is stored in the
  // same order as the flat `inputs` vector. See comment at the beginning of
  // `Run()` for details.
  std::vector<AllocatorAttributes>
      input_alloc_attrs_;  // Length = `total_num_inputs_`.
};

class SingleThreadedExecutorRegistrar {
 public:
  SingleThreadedExecutorRegistrar() {
    ExecutorFactory::Register(kSingleThreadedExecutor, new Factory());
  }

 private:
  class Factory : public ExecutorFactory {
    Status NewExecutor(const LocalExecutorParams& params, const Graph& graph,
                       std::unique_ptr<Executor>* out_executor) override {
      Executor* ret;
      TF_RETURN_IF_ERROR(NewSingleThreadedExecutor(params, graph, &ret));
      out_executor->reset(ret);
      return Status::OK();
    }
  };
};
static SingleThreadedExecutorRegistrar registrar;
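
// NOTE: The registration above makes this executor available under the name
// "SINGLE_THREADED_EXECUTOR". As an illustrative (not authoritative) usage
// sketch, a caller instantiating a function through the FunctionLibraryRuntime
// can request it by setting the `executor_type` field of
// `FunctionLibraryRuntime::InstantiateOptions` to that string before calling
// `Instantiate()`.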

}  // namespace

Status NewSingleThreadedExecutor(const LocalExecutorParams& params,
                                 const Graph& graph, Executor** executor) {
  auto impl = absl::make_unique<SingleThreadedExecutorImpl>(params);
  TF_RETURN_IF_ERROR(impl->Initialize(graph));
  *executor = impl.release();
  return Status::OK();
}

}  // namespace tensorflow