/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<HloModule> hlo_module, const string& entry_function_name,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so it needs to be
  // guarded by the mutex.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
  VLOG(1) << "compute_function_ at address "
          << reinterpret_cast<void*>(compute_function_);
}

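// Builds the buffer table consumed by the JITed entry function: one entry per
// BufferAllocation in the buffer assignment. Entry-parameter allocations
// alias the caller-provided argument buffers, constant and thread-local
// allocations are left empty, and everything else is allocated here. The
// returned pair holds unowning views of every buffer alongside owning handles
// for the buffers this call actually allocated.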
StatusOr<std::pair<std::vector<se::DeviceMemoryBase>,
                   std::vector<OwningDeviceMemory>>>
CpuExecutable::CreateBufferTable(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    absl::Span<const ShapedBuffer* const> arguments) {
  std::vector<se::DeviceMemoryBase> unowning_buffers(
      assignment_->Allocations().size());
  std::vector<OwningDeviceMemory> owning_buffers(
      assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      unowning_buffers[i] = arguments[allocation.parameter_number()]->buffer(
          allocation.param_shape_index());
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_constant()) {
      VLOG(3) << "allocation #" << i << " is a constant";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!owning_buffers[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN(owning_buffers[i], memory_allocator->Allocate(
                                                 device_ordinal, buffer_size));
      unowning_buffers[i] = owning_buffers[i].AsDeviceMemoryBase();

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << owning_buffers[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED(owning_buffers[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return {{std::move(unowning_buffers), std::move(owning_buffers)}};
}

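// Invokes the JITed entry function on the prepared buffer table and records
// wall time (and, when hlo profiling is enabled, cycle counts) into the
// execution profile.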
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    absl::Span<const se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** buffer_table, int64* profile_counters)
  //
  // result: Points at the result.
  // run_options: the ExecutableRunOptions object.
  // args_array: null
  // buffer_table: An array of pointers, containing pointers to temporary
  //   buffers required by the executable and pointers to entry computation
  //   parameters.
  // profile_counters: the array of HLO profile counters, or null when hlo
  //   profiling is disabled.
  //

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  size_t profile_counters_size =
      hlo_execution_profile ? hlo_execution_profile->profile_counters().size()
                            : 0;
  int64* profile_counters =
      hlo_execution_profile
          ? hlo_execution_profile->mutable_profile_counters()->data()
          : nullptr;

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << absl::StrFormat(
        "  func(void* result, void* params[null], void* buffer_table[%u], "
        "uint64 profile_counters[%u])",
        buffer_pointers.size(), profile_counters_size);
    VLOG(3) << absl::StrFormat("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      absl::StrAppend(out, absl::StrFormat("%p", p));
    };
    VLOG(3) << "    params = nullptr";
    VLOG(3) << absl::StrFormat(
        "    buffer_table = [%s]",
        absl::StrJoin(buffer_pointers, ", ", ptr_printer));
    VLOG(3) << absl::StrFormat("    profile_counters = %p", profile_counters);
  }

  compute_function_(result_buffer, run_options, nullptr,
                    buffer_pointers.data(), profile_counters);

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
    // If hlo profiling was disabled then the cycle count is left empty.
    if (hlo_execution_profile) {
      execution_profile_.set_compute_cycle_count(
          hlo_execution_profile->total_cycles_executed(
              *module().entry_computation()));
    }
  }

  return Status::OK();
}

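// Wraps the buffers that make up the result into a ScopedShapedBuffer for the
// caller. For each leaf of the result shape, the unique allocation slice that
// defines it is looked up in the buffer assignment and ownership of that
// buffer is moved out of `buffers` into the result. Leaves that alias an
// entry computation parameter are left for the caller to fill in and are only
// checked against the module's input/output alias config.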
StatusOr<ScopedShapedBuffer> CpuExecutable::CreateResultShapedBuffer(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<OwningDeviceMemory> buffers) {
  se::Stream* stream = run_options->stream();
  ScopedShapedBuffer result_buffer(
      /*on_host_shape=*/result_shape(),
      /*on_device_shape=*/result_shape(), run_options->allocator(),
      stream->parent()->device_ordinal());
  const HloInputOutputAliasConfig& input_output_alias =
      module().input_output_alias_config();

  // Move the OwningDeviceMemory values which contain the array(s) of the
  // result into their respective locations in the ScopedShapedBuffer which is
  // returned to the caller.
  TF_RETURN_IF_ERROR(result_buffer.buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);
        // The points-to set is unambiguous so the set should be a
        // singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as
        // a tuple element. The source instruction should have a
        // non-parameter buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        const BufferAllocation::Index buffer_index = slice.index();
        OwningDeviceMemory& buffer = buffers[buffer_index];
        if (!slice.allocation()->is_entry_computation_parameter()) {
          // If the buffer coming out of the result is from a parameter, the
          // owning buffer will be null, which means the caller aliased some
          // parameter buffer to an output one (via the
          // HloInputOutputAliasConfig API). In that case, the caller will
          // receive a partially complete ScopedShapedBuffer, which they will
          // have to fill in on return. Unfortunately the interface to the
          // execute APIs is ShapedBuffer-pointer based, which assumes caller
          // ownership, and hence a buffer coming from there cannot be part of
          // the new ScopedShapedBuffer we create for the result (which
          // assumes ownership).
          *device_memory = buffer.Forget();
        } else {
          auto output_alias = input_output_alias.GetAliasedOutput(
              slice.allocation()->parameter_number(),
              slice.allocation()->param_shape_index());
          CHECK(output_alias)
              << "Output buffer is coming from parameter "
              << slice.allocation()->parameter_number() << " at index "
              << slice.allocation()->param_shape_index()
              << ", but no alias exists";
          CHECK_EQ(*output_alias, index);
        }
        return Status::OK();
      }));
  return std::move(result_buffer);
}

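// Synchronous entry point: runs the shared async implementation, then blocks
// until the host stream has drained so the result is ready on return.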
StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  TF_ASSIGN_OR_RETURN(
      auto result,
      ExecuteAsyncOnStreamImpl(run_options, arguments, hlo_execution_profile));
  TF_RETURN_IF_ERROR(run_options->stream()->BlockHostUntilDone());
  return std::move(result);
}

StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }
  return ExecuteAsyncOnStreamImpl(run_options, arguments, nullptr);
}

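// Shared implementation behind the synchronous and asynchronous entry points:
// builds the buffer table, assembles the result ScopedShapedBuffer, and then
// enqueues the actual computation onto the host stream without blocking. The
// enqueued task keeps the temporary buffers alive until the computation
// finishes.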
StatusOr<ScopedShapedBuffer> CpuExecutable::ExecuteAsyncOnStreamImpl(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  auto* host_stream = dynamic_cast<se::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<OwningDeviceMemory> owning_buffers;
  std::vector<se::DeviceMemoryBase> unowning_buffers;
  TF_ASSIGN_OR_RETURN(
      std::tie(unowning_buffers, owning_buffers),
      CreateBufferTable(memory_allocator, stream->parent()->device_ordinal(),
                        arguments));

  TF_ASSIGN_OR_RETURN(
      ScopedShapedBuffer result,
      CreateResultShapedBuffer(run_options, absl::MakeSpan(owning_buffers)));

  // At this point, `unowning_buffers` contains unowning pointers to all of our
  // buffers, and `owning_buffers` contains owning pointers to the
  // non-live-out buffers. Enqueue a task which keeps alive the non-live-out
  // buffers.
  //
  // Logically we want this lambda to capture `owning_buffers` by move, but
  // ultimately our functor needs to be wrapped in a std::function, and that
  // requires the functor to be copyable. Thus we perpetrate the hack of
  // capturing the buffers "by shared pointer".
  //
  // We also need to change the types of some of the variables we capture:
  // run_options needs to change from a pointer to a value type. We use a
  // struct instead of a lambda to make this explicit.
  struct AsyncRunTask {
    CpuExecutable* executable;
    ServiceExecutableRunOptions run_options;
    std::vector<se::DeviceMemoryBase> unowning_buffers;
    std::shared_ptr<std::vector<OwningDeviceMemory>> buffers;
    HloExecutionProfile* hlo_execution_profile;

    void operator()() {
      // Failing a CHECK here is not great, but I don't see an obvious way to
      // return a failed Status asynchronously.
      TF_CHECK_OK(executable->ExecuteComputeFunction(
          &run_options.run_options(), unowning_buffers,
          hlo_execution_profile));
    }
  };
  host_stream->EnqueueTask(
      AsyncRunTask{this, *run_options, std::move(unowning_buffers),
                   std::make_shared<std::vector<OwningDeviceMemory>>(
                       std::move(owning_buffers)),
                   hlo_execution_profile});

  return std::move(result);
}

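// Returns the number of bytes a buffer of the given shape occupies for the
// CPU backend. Opaque shapes are represented as host pointers.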
/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
  if (shape.IsOpaque()) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

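// Returns the points-to set of the entry computation's root instruction,
// which names the logical buffer(s) that make up each piece of the result.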
const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla