/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/graph_view.h"

#include <atomic>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/node_def_util.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/graph/edgeset.h"
#include "tensorflow/core/graph/graph.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/util/device_name_utils.h"

namespace tensorflow {

string NodeItem::DebugString() const {
  string ret = strings::StrCat("{name:'", kernel->name(), "' id:", node_id);
  if (is_source) {
    strings::StrAppend(&ret, " source}");
  } else {
    strings::StrAppend(&ret, " def:{", SummarizeNodeDef(kernel->def()), "}}");
  }
  return ret;
}

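// NodeItem instances are placement-constructed inside the flat buffer
// `space_`, so each item's destructor is invoked explicitly here before the
// underlying buffers are released.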
GraphView::~GraphView() {
  static_assert(std::is_trivially_destructible<AllocatorAttributes>::value,
                "Update code if AllocatorAttributes gains a destructor");
  static_assert(std::is_trivially_destructible<EdgeInfo>::value,
                "Update code if EdgeInfo gains a destructor");
  for (int i = 0; i < num_nodes_; i++) {
    NodeItem* n = node(i);
    if (n != nullptr) {
      n->NodeItem::~NodeItem();
      // Memory for "n" itself is held in space_ & gets cleaned up below.
    }
  }
  delete[] node_offsets_;
  delete[] space_;
}

namespace {
typedef std::tuple<int32, int32> OutputAndControlEdges;

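// Counts the data and control output edges of `n`, ignoring edges into the
// sink node, and returns them as (num_output_edges, num_output_control_edges).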
OutputAndControlEdges CountOutputEdges(const Node* n) {
  DCHECK_LE(n->out_edges().size(), kint32max);
  int32 num_output_edges = 0;
  int32 num_output_control_edges = 0;
  for (auto e : n->out_edges()) {
    if (IsSink(e->dst())) continue;
    if (e->IsControlEdge()) {
      ++num_output_control_edges;
    } else {
      ++num_output_edges;
    }
  }
  return OutputAndControlEdges(num_output_edges, num_output_control_edges);
}
}  // namespace

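// Returns the number of bytes needed to hold the NodeItem for `n` plus its
// variable-length trailing arrays (edge info, allocator attributes, forwarding
// hints, and input/output types), rounded up to a multiple of kItemAlignment.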
size_t GraphView::NodeItemBytes(const Node* n) {
  int32 num_output_edges;
  int32 num_output_control_edges;
  std::tie(num_output_edges, num_output_control_edges) = CountOutputEdges(n);
  const int num_inputs = n->num_inputs();
  const int num_outputs = n->num_outputs();

  // Compute number of bytes needed for NodeItem and variable length data.
  // We do not subtract sizeof(var) since num_inputs/num_outputs might
  // both be zero.
  const size_t raw_bytes =
      sizeof(NodeItem)                             // Fixed
      + num_output_edges * sizeof(EdgeInfo)        // output_edges[...]
      + num_output_control_edges *                 //
            sizeof(ControlEdgeInfo)                // output_control_edges[...]
      + num_outputs * sizeof(AllocatorAttributes)  // output_attr[...]
      + num_outputs * sizeof(int)                  // forward_from[num_outputs]
      + num_inputs * sizeof(uint8)                 // input_type[num_inputs]
      + num_outputs * sizeof(uint8);               // output_type[num_outputs]
  static constexpr size_t kItemAlignment = sizeof(NodeItem*);
  static_assert(kItemAlignment % alignof(NodeItem) == 0,
                "NodeItem must be aligned with kItemAlignment");
  static_assert(kItemAlignment % alignof(EdgeInfo) == 0,
                "EdgeInfo must be aligned with kItemAlignment");
  static_assert(kItemAlignment % alignof(ControlEdgeInfo) == 0,
                "ControlEdgeInfo must be aligned with kItemAlignment");
  static_assert(kItemAlignment % alignof(AllocatorAttributes) == 0,
                "AllocatorAttributes must be aligned with kItemAlignment");
  static_assert(sizeof(NodeItem) % alignof(EdgeInfo) == 0,
                "NodeItem must be aligned with EdgeInfo");
  static_assert(sizeof(NodeItem) % alignof(AllocatorAttributes) == 0,
                "NodeItem must be aligned with AllocatorAttributes");
  static_assert(sizeof(EdgeInfo) % alignof(AllocatorAttributes) == 0,
                "EdgeInfo must be aligned with AllocatorAttributes");
  const size_t bytes =
      ((raw_bytes + kItemAlignment - 1) / kItemAlignment) * kItemAlignment;
  return bytes;
}

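// Placement-constructs the NodeItem for `n` at `ptr` (which must lie within
// `space_` and be kItemAlignment-aligned), fills in its edge, type, and
// allocator metadata, records the item's 32-bit offset in `node_offsets_`,
// and returns `ptr` advanced past the item.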
char* GraphView::InitializeNode(char* ptr, const Node* n) {
  const int id = n->id();
  CHECK(node_offsets_[id] == kuint32max);  // Initial value in constructor

  const size_t bytes = NodeItemBytes(n);
  constexpr size_t kItemAlignment = sizeof(NodeItem*);
  CHECK_EQ(reinterpret_cast<uintptr_t>(ptr) % kItemAlignment, 0);
  NodeItem* item = reinterpret_cast<NodeItem*>(ptr);

  // We store a 32-bit offset relative to the beginning of space_, so that we
  // only need an array of 32-bit values to map from node id to the NodeItem*,
  // (versus 64 bits on most machines if we just stored an array of NodeItem*
  // pointers). Casting to int64 is needed on 32bit CPU to avoid comparing
  // values as "int" vs "size_t" in CHECK_LE.
  CHECK_LE(static_cast<int64>(ptr - space_), kuint32max);
  const uint32 offset = static_cast<uint32>(ptr - space_);
  node_offsets_[id] = offset;
  ptr += bytes;

  int32 num_output_edges;
  int32 num_output_control_edges;
  std::tie(num_output_edges, num_output_control_edges) = CountOutputEdges(n);
  const int num_inputs = n->num_inputs();
  const int num_outputs = n->num_outputs();

  new (item) NodeItem();
  item->num_inputs = num_inputs;
  item->num_outputs = num_outputs;
  item->num_output_edges = num_output_edges;
  item->num_output_control_edges = num_output_control_edges;

  // Fill output edges.
  // Keep track of the last EdgeInfo in the EdgeInfo array that references
  // a given output slot. For all but the last, we need to do a copy of the
  // Tensor when propagating results downstream in the graph, but for the
  // last one, we can just do a move of the Tensor object to propagate it.
  gtl::InlinedVector<EdgeInfo*, 4> last_indices(num_outputs, nullptr);
  EdgeInfo* dst_edge = item->output_edge_base();
  for (auto e : n->out_edges()) {
    if (e->IsControlEdge()) continue;
    dst_edge->dst_id = e->dst()->id();
    CHECK_LE(e->src_output(), 0x3FFFFFFF);  // Must fit in 31 bits
    dst_edge->output_slot = e->src_output();
    dst_edge->is_last = false;
    const int output_slot = dst_edge->output_slot;
    if (output_slot >= 0) {
      last_indices[output_slot] = dst_edge;
    }
    // NOTE: The `input_slot` will be rewritten to the frame-wide offset later
    // in `ExecutorImpl::Initialize()`.
    dst_edge->input_slot = e->dst_input();
    dst_edge++;
  }
  for (EdgeInfo* edge_info : last_indices) {
    if (edge_info != nullptr) {
      edge_info->is_last = true;
    }
  }
  ControlEdgeInfo* dst_control_edge = item->output_control_edge_base();
  for (auto e : n->out_edges()) {
    if (!e->IsControlEdge() || IsSink(e->dst())) continue;
    dst_control_edge->dst_id = e->dst()->id();
    dst_control_edge++;
  }

  AllocatorAttributes* output_attrs = item->output_attr_base();
  for (int i = 0; i < num_outputs; i++) {
    new (&output_attrs[i]) AllocatorAttributes();
  }

  DCHECK_LT(DataType_MAX, 255);  // Must fit in uint8
  uint8* input_types = item->input_type_base();
  item->is_any_input_ref_typed = false;
  for (int i = 0; i < num_inputs; i++) {
    input_types[i] = static_cast<uint8>(n->input_type(i));
    DCHECK_EQ(item->input_type(i), n->input_type(i));
    item->is_any_input_ref_typed |= IsRefType(n->input_type(i));
  }

  // Check ScopedAllocatorAttrs and forward_from. Also assign output_types.
  {
    std::vector<int> forward_input;
    Status fwd_status =
        GetNodeAttr(n->attrs(), "_forward_input", &forward_input);
    std::vector<int> scoped_allocator_attrs;
    Status sa_status =
        GetNodeAttr(n->attrs(), "_scoped_allocator", &scoped_allocator_attrs);

    int* forward_from = item->forward_from_base();
    uint8* output_types = item->output_type_base();
    for (int i = 0; i < num_outputs; ++i) {
      output_types[i] = static_cast<uint8>(n->output_type(i));
      DCHECK_EQ(item->output_type(i), n->output_type(i));

      forward_from[i] = OpKernelContext::Params::kNoReservation;
      if (sa_status.ok()) {
        for (int j = 0; j < scoped_allocator_attrs.size(); j += 2) {
          if (scoped_allocator_attrs[j] == i) {
            // This output slot must be explicitly allocated from a
            // ScopedAllocator.
            forward_from[i] = OpKernelContext::Params::kNeverForward;
            DCHECK_EQ(output_attrs[i].scope_id, 0);
            output_attrs[i].scope_id = scoped_allocator_attrs[j + 1];
          }
        }
      }
      if (fwd_status.ok() &&
          forward_from[i] == OpKernelContext::Params::kNoReservation) {
        DCHECK_EQ(forward_input.size() % 2, 0);
        for (int j = 0; j < forward_input.size(); j += 2) {
          if (forward_input[j + 1] == i) {
            DCHECK_EQ(forward_from[i], OpKernelContext::Params::kNoReservation);
            forward_from[i] = forward_input[j];
            break;
          }
        }
      }
    }
  }

  return ptr;
}

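// Builds the per-node metadata in two passes: the first pass sums the byte
// requirements of every node, then each NodeItem is placement-constructed
// contiguously into the single `space_` buffer by InitializeNode().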
Status GraphView::Initialize(const Graph* g) {
  CHECK(node_offsets_ == nullptr);
  const int num_nodes = g->num_node_ids();
  num_nodes_ = num_nodes;
  size_t total_bytes = 0;
  for (const Node* n : g->nodes()) {
    if (n->out_edges().size() > kint32max) {
      return errors::InvalidArgument(
          "The executor cannot handle nodes with more than ", kint32max,
          " output edges. Node ", n->name(), " had ", n->out_edges().size(),
          " output edges.");
    }
    total_bytes += NodeItemBytes(n);
  }

  node_offsets_ = new uint32[num_nodes];
  for (int i = 0; i < num_nodes; i++) {
    node_offsets_[i] = kuint32max;
  }

  space_ = new char[total_bytes];  // NodeItem objects are allocated here
  char* ptr = space_;
  for (const Node* n : g->nodes()) {
    ptr = InitializeNode(ptr, n);
  }
  CHECK_EQ(ptr, space_ + total_bytes);
  return Status::OK();
}

namespace {
// If a Node has been marked to use a ScopedAllocator x for output i, then
// sc_attr will contain the subsequence (i, x) at an even offset. This function
// extracts and transfers that ScopedAllocator id to alloc_attr. For now, we
// only allow one ScopedAllocator use per Node.
bool ExtractScopedAllocatorAttr(const std::vector<int>& sc_attr,
                                int output_index,
                                AllocatorAttributes* alloc_attr) {
  DCHECK_LE(2, sc_attr.size());
  for (int i = 0; i < sc_attr.size(); i += 2) {
    if (sc_attr[i] == output_index) {
      CHECK_EQ(alloc_attr->scope_id, 0);
      alloc_attr->scope_id = sc_attr[i + 1];
      return true;
    }
  }
  return false;
}
}  // namespace

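// For each ScopedAllocator node, walks its control edges to the "use
// instance" nodes, applies each use's "_scoped_allocator" scope_id to the
// corresponding output slot of that use node, and merges the remaining
// allocator attributes back onto the ScopedAllocator node's own output.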
void GraphView::SetScopedAllocatorAttrs(
    const std::vector<const Node*>& sa_nodes) {
  for (const Node* sa : sa_nodes) {
    NodeItem* sa_item = node(sa->id());
    AllocatorAttributes* sa_attrs = sa_item->output_attr_base();
    // Control edges out of the ScopedAllocator should be use instances, but
    // may include a few other nodes.
    for (const auto& e : sa->out_edges()) {
      if (IsSink(e->dst()) || !e->IsControlEdge()) {
        continue;
      }
      Node* use_node = e->dst();
      NodeItem* item = node(use_node->id());
      AllocatorAttributes* use_attrs = item->output_attr_base();
      std::vector<int> scoped_allocator_attrs;
      Status s = GetNodeAttr(use_node->attrs(), "_scoped_allocator",
                             &scoped_allocator_attrs);
      if (!s.ok()) {
        VLOG(2) << "Failed to find expected ScopedAllocator attr on "
                << use_node->name();
        continue;
      }
      // There can be more than one output using ScopedAllocation, but this
      // analysis assumes they use the same ScopedAllocator.
      for (const auto& e : use_node->out_edges()) {
        if (IsSink(e->dst()) || !e->IsControlEdge()) {
          AllocatorAttributes attr;
          if (ExtractScopedAllocatorAttr(scoped_allocator_attrs,
                                         e->src_output(), &attr)) {
            // Set the scope_id on this use instance node.
            (use_attrs + e->src_output())->Merge(attr);
            // Propagate the other attributes of this node back to the SA node.
            attr = *(use_attrs + e->src_output());
            attr.scope_id = 0;
            sa_attrs->Merge(attr);
          }
        }
      }
    }
  }
}

namespace {
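// Infers allocation attributes for the value flowing along the edge n->dst:
// the value is marked nic_compatible when the corresponding Recv/Send crosses
// address spaces (i.e. it is an endpoint of an RPC), and gpu_compatible when
// it is the host-side endpoint of a local device<->host copy. Collective ops
// are always marked nic_compatible.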
Status InferAllocAttr(const Node* n, const Node* dst,
                      const DeviceNameUtils::ParsedName& local_dev_name,
                      AllocatorAttributes* attr) {
  Status s;
  // Note that it's possible for *n to be a Recv and *dst to be a Send,
  // so these two cases are not mutually exclusive.
  if (IsRecv(n)) {
    string src_name;
    s = GetNodeAttr(n->attrs(), "send_device", &src_name);
    if (!s.ok()) return s;
    DeviceNameUtils::ParsedName parsed_src_name;
    if (!DeviceNameUtils::ParseFullName(src_name, &parsed_src_name)) {
      s = errors::Internal("Bad send_device attr '", src_name, "' in node ",
                           n->name());
      return s;
    }
    if (!DeviceNameUtils::IsSameAddressSpace(parsed_src_name, local_dev_name)) {
      // Value is going to be the sink of an RPC.
      attr->set_nic_compatible(true);
      VLOG(2) << "node " << n->name() << " is the sink of an RPC in";
    } else if ((local_dev_name.type == "CPU" || n->IsHostRecv()) &&
               parsed_src_name.type != "CPU") {
      // Value is going to be the sink of a local DMA from GPU to CPU (or
      // other types of accelerators).
      attr->set_gpu_compatible(true);
      VLOG(2) << "node " << n->name() << " is the sink of a gpu->cpu copy";
    } else {
      VLOG(2) << "default alloc case local type " << local_dev_name.type
              << " remote type " << parsed_src_name.type;
    }
  }
  if (IsSend(dst)) {
    string dst_name;
    s = GetNodeAttr(dst->attrs(), "recv_device", &dst_name);
    if (!s.ok()) return s;
    DeviceNameUtils::ParsedName parsed_dst_name;
    if (!DeviceNameUtils::ParseFullName(dst_name, &parsed_dst_name)) {
      s = errors::Internal("Bad recv_device attr '", dst_name, "' in node ",
                           n->name());
      return s;
    }
    if (!DeviceNameUtils::IsSameAddressSpace(parsed_dst_name, local_dev_name)) {
      // Value is going to be the source of an RPC.
      attr->set_nic_compatible(true);
      VLOG(2) << "node " << n->name() << " is the source of an RPC out";
    } else if ((local_dev_name.type == "CPU" || dst->IsHostSend()) &&
               parsed_dst_name.type != "CPU") {
      // Value is going to be the source of a local DMA from CPU to GPU (or
      // other types of accelerators).
      // Note that this does not cover the case where the allocation of the
      // output tensor is not generated by the src: n.
      attr->set_gpu_compatible(true);
      VLOG(2) << "node " << n->name() << " is the source of a cpu->gpu copy";
    } else {
      VLOG(2) << "default alloc case local type " << local_dev_name.type
              << " remote type " << parsed_dst_name.type;
    }
  }
  if (n->IsCollective()) {
    // We'll make the sweeping assumption that any collective op is going
    // to be involved in network i/o.
    attr->set_nic_compatible(true);
  }
  return s;
}
}  // namespace

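// Assigns allocator attributes to every node's outputs: attributes inferred
// from Send/Recv edges are merged into the corresponding output slots, outputs
// the kernel declares as HOST_MEMORY are forced on_host, and ScopedAllocator
// instances are processed last via SetScopedAllocatorAttrs().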
Status GraphView::SetAllocAttrs(const Graph* g, const Device* device) {
  Status s;
  DeviceNameUtils::ParsedName local_dev_name = device->parsed_name();

  std::vector<const Node*> scoped_allocator_instances;
  for (const Node* n : g->nodes()) {
    NodeItem* item = node(n->id());
    AllocatorAttributes* attrs = item->output_attr_base();
    if (IsScopedAllocator(n)) {
      scoped_allocator_instances.push_back(n);
    }

    // Examine the out edges of each node looking for special use
    // cases that may affect memory allocation attributes.
    for (const auto& e : n->out_edges()) {
      if (!e->IsControlEdge()) {
        AllocatorAttributes attr;
        s = InferAllocAttr(n, e->dst(), local_dev_name, &attr);
        if (!s.ok()) return s;
        if (attr.value != 0 || attr.scope_id != 0) {
          attrs[e->src_output()].Merge(attr);
        }
      }
    }

    for (int out = 0; out < n->num_outputs(); out++) {
      const OpKernel* op_kernel = item->kernel;
      DCHECK_LT(out, op_kernel->output_memory_types().size());
      bool on_host = op_kernel->output_memory_types()[out] == HOST_MEMORY;
      if (on_host) {
        AllocatorAttributes h;
        h.set_on_host(on_host);
        attrs[out].Merge(h);
      }
    }
  }
  SetScopedAllocatorAttrs(scoped_allocator_instances);
  return s;
}

}  // namespace tensorflow