/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/hlo_runner.h"

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/layout_util.h"
#include "tensorflow/compiler/xla/service/executable.h"
#include "tensorflow/compiler/xla/service/hlo_module_group.h"
#include "tensorflow/compiler/xla/service/hlo_parser.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/shape.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace xla {

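// Constructs an HloRunner whose Backend targets the given platform and uses
// the requested number of intra-op parallelism threads.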
HloRunner::HloRunner(se::Platform* platform, int intra_op_parallelism_threads) {
  BackendOptions backend_options;
  backend_options.set_platform(platform);
  backend_options.set_intra_op_parallelism_threads(
      intra_op_parallelism_threads);
  backend_ = Backend::CreateBackend(backend_options).ConsumeValueOrDie();
  VLOG(1) << "Created HloRunner for platform: " << platform->Name();
}

HloRunner::~HloRunner() {}

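// Allocates a device buffer matching the literal's shape on the default
// device, then copies the literal's contents into it over a borrowed stream.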
StatusOr<ScopedShapedBuffer> HloRunner::TransferLiteralToDevice(
    const Literal& literal) {
  TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                      backend().transfer_manager()->AllocateScopedShapedBuffer(
                          literal.shape(), backend().memory_allocator(),
                          backend().default_device_ordinal()));
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
      stream.get(), literal, buffer));
  return std::move(buffer);
}

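// Transfers a sequence of literals to the default device; all pointers must
// be non-null.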
StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    absl::Span<const Literal* const> literals) {
  std::vector<ScopedShapedBuffer> buffers;
  for (const Literal* literal : literals) {
    CHECK(literal != nullptr);
    TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                        TransferLiteralToDevice(*literal));
    buffers.push_back(std::move(buffer));
  }
  return std::move(buffers);
}

StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    absl::Span<const Literal> literals) {
  std::vector<const Literal*> literal_pointers;
  literal_pointers.reserve(literals.size());
  for (const auto& literal : literals) {
    literal_pointers.push_back(&literal);
  }
  return TransferLiteralsToDevice(literal_pointers);
}

StatusOr<Literal> HloRunner::TransferLiteralFromDevice(
    const ShapedBuffer& buffer) {
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  return backend().transfer_manager()->TransferLiteralFromDevice(stream.get(),
                                                                 buffer);
}

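// Compiles and runs the module on the default device: the argument literals
// are copied to freshly allocated device buffers, the (optionally optimized)
// executable runs, and the result is copied back to the host as a Literal.
// A minimal usage sketch (illustrative only; assumes an HloModule `module`
// and host literals `lit0`/`lit1`):
//
//   HloRunner runner(PlatformUtil::GetDefaultPlatform().ValueOrDie());
//   StatusOr<Literal> result = runner.Execute(
//       std::move(module), {&lit0, &lit1}, /*run_hlo_passes=*/true,
//       /*profile=*/nullptr);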
StatusOr<Literal> HloRunner::Execute(std::unique_ptr<HloModule> module,
                                     absl::Span<const Literal* const> arguments,
                                     bool run_hlo_passes,
                                     ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ExecutionOutput result,
                      ExecuteWithDeviceBuffers(
                          /*module=*/std::move(module),
                          /*arguments=*/argument_buffers,
                          /*run_hlo_passes=*/run_hlo_passes,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result.Result());
}

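// Like Execute, but takes a pre-compiled executable instead of compiling the
// module itself.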
StatusOr<Literal> HloRunner::ExecuteWithExecutable(
    std::unique_ptr<Executable> executable,
    absl::Span<const Literal* const> arguments, ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ExecutionOutput result,
                      ExecuteWithDeviceBuffers(
                          /*executable=*/executable.get(),
                          /*arguments=*/argument_buffers,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result.Result());
}

// Converts the owned input buffers into a vector of ExecutionInputs. Buffers
// that the module's alias config marks as aliased to an output are stored as
// owned OwningDeviceMemory (so the executable may donate them to the
// computation); all other buffers are stored unowned.
static std::vector<ExecutionInput> ExecutionInputsFromScopedShapedBuffers(
    absl::Span<ScopedShapedBuffer const> inputs,
    HloInputOutputAliasConfig alias_config, int device_ordinal,
    se::DeviceMemoryAllocator* allocator) {
  std::vector<ExecutionInput> execution_inputs;
  execution_inputs.reserve(inputs.size());

  for (int param_num = 0; param_num < inputs.size(); param_num++) {
    const ScopedShapedBuffer& input_buffer = inputs[param_num];
    ShapeTree<MaybeOwningDeviceMemory> buffer_tree(
        input_buffer.on_device_shape());

    input_buffer.buffers().ForEachElement(
        [&](const ShapeIndex& index,
            const se::DeviceMemoryBase& execution_input_buffer) {
          if (alias_config.ParameterHasAlias(param_num, index)) {
            // Store owned.
            *buffer_tree.mutable_element(index) = se::OwningDeviceMemory{
                execution_input_buffer, device_ordinal, allocator};
          } else {
            // Store unowned.
            *buffer_tree.mutable_element(index) = execution_input_buffer;
          }
        });
    execution_inputs.emplace_back(std::move(buffer_tree));
  }
  return execution_inputs;
}

StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
    std::unique_ptr<HloModule> module,
    absl::Span<ScopedShapedBuffer const> arguments, bool run_hlo_passes,
    ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
                      CreateExecutable(std::move(module), run_hlo_passes));
  return ExecuteWithDeviceBuffers(executable.get(), arguments, profile);
}

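// Runs an executable against arguments that already live on the device,
// blocking until the stream completes. Aliased input buffers may be donated
// to the computation (see ExecutionInputsFromScopedShapedBuffers above).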
StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
    Executable* executable, absl::Span<ScopedShapedBuffer const> arguments,
    ExecutionProfile* profile) {
  // Get service run options.
  se::Stream stream(backend().default_stream_executor());
  stream.Init();
  ServiceExecutableRunOptions service_run_options =
      GetServiceRunOptionsForDevice(backend().default_device_ordinal(), &stream,
                                    nullptr, RunId());
  service_run_options.mutable_run_options()->set_execution_profile(profile);

  std::vector<ExecutionInput> execution_arguments =
      ExecutionInputsFromScopedShapedBuffers(
          arguments, executable->module().input_output_alias_config(),
          stream.parent()->device_ordinal(), stream.parent()->GetAllocator());

  TF_ASSIGN_OR_RETURN(
      ExecutionOutput retval,
      executable->ExecuteOnStreamWrapper(&service_run_options,
                                         std::move(execution_arguments)));
  TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
  return std::move(retval);
}

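// Compiles the module (honoring options.run_hlo_passes) and executes it once
// per replica under the given device assignment.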
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module, const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment) {
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<Executable> executable,
      CreateExecutable(std::move(module), options.run_hlo_passes));
  return ExecuteReplicated(executable.get(), options, device_assignment);
}

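// Shared driver for the ExecuteReplicated overloads. Creates one stream and
// one set of device-resident arguments per replica, optionally spawns
// infeed/outfeed threads, invokes execution_helper to run all replicas, and
// finally transfers every replica's result back to the host.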
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicatedImpl(
    std::function<StatusOr<std::vector<ScopedShapedBuffer>>(
        const std::vector<ServiceExecutableRunOptions>&,
        const std::vector<absl::Span<const ShapedBuffer* const>>&)>
        execution_helper,
    std::function<int64(int64)> argument_count_provider,
    std::function<const Literal*(int64, int64)> argument_provider,
    const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment) {
  std::vector<std::unique_ptr<se::Stream>> streams;
  std::vector<ServiceExecutableRunOptions> service_run_options;

  std::vector<ScopedShapedBuffer> argument_buffers;
  // This reserve() call is necessary for correctness, because
  // argument_buffer_ptrs contains pointers into the elements of
  // argument_buffers.
  const int64 total_argument_count = [&]() {
    int64 total = 0;
    for (int64 i = 0; i < options.num_replicas; ++i) {
      total += argument_count_provider(i);
    }
    return total;
  }();
  argument_buffers.reserve(total_argument_count);

  // Plus one so we can safely get &argument_buffer_ptrs[0] in case there are
  // no arguments.
  std::vector<const ShapedBuffer*> argument_buffer_ptrs(total_argument_count +
                                                        1);
  std::vector<absl::Span<const ShapedBuffer* const>> argument_buffer_slices;
  int64 index = 0;
  RunId run_id;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    int64 device = (*device_assignment)(i, 0);
    TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor,
                        backend().stream_executor(device));
    streams.push_back(absl::make_unique<se::Stream>(executor));
    streams.back()->Init();
    service_run_options.emplace_back(GetServiceRunOptionsForDevice(
        device, streams.back().get(), device_assignment, run_id));

    // Copy arguments to device.
    const int64 argument_count = argument_count_provider(i);
    for (int64 arg_index = 0; arg_index < argument_count; arg_index++) {
      const Literal* const argument = argument_provider(i, arg_index);
      TF_RET_CHECK(argument != nullptr);
      TF_ASSIGN_OR_RETURN(
          ScopedShapedBuffer argument_buffer,
          backend().transfer_manager()->AllocateScopedShapedBuffer(
              argument->shape(), backend().memory_allocator(), device));
      TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
          streams.back().get(), *argument, argument_buffer));
      argument_buffers.push_back(std::move(argument_buffer));
      argument_buffer_ptrs[index++] = &argument_buffers.back();
    }
    argument_buffer_slices.emplace_back(
        &argument_buffer_ptrs[index - argument_count], argument_count);
  }

  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  int64 num_threads = (options.infeed != nullptr) ? options.num_replicas : 0;
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    num_threads += options.num_replicas;
  }
  if (num_threads > 0) {
    pool = absl::make_unique<tensorflow::thread::ThreadPool>(
        tensorflow::Env::Default(), "infeed_outfeed",
        /*num_threads=*/num_threads);
  }
  if (options.infeed != nullptr) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting infeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralToInfeed(
              executor, *options.infeed));
          if (step % 100 == 0) {
            VLOG(1) << "Infeed step " << step;
          }
        }
      });
    }
  }
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting outfeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          Literal literal(options.outfeed_shape);
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralFromOutfeed(
              executor, &literal));
          if (options.outfeed_values != nullptr) {
            options.outfeed_values->push_back(std::move(literal));
          }
          if (step % 100 == 0) {
            VLOG(1) << "Outfeed step " << step;
          }
        }
      });
    }
  }

  LOG(INFO) << "Replicated execution started";
  TF_ASSIGN_OR_RETURN(
      std::vector<ScopedShapedBuffer> results,
      execution_helper(service_run_options, argument_buffer_slices));
  LOG(INFO) << "Replicated execution terminated";

  std::vector<Literal> exec_results;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    TF_RETURN_IF_ERROR(streams[i]->BlockHostUntilDone());
    TF_ASSIGN_OR_RETURN(Literal literal,
                        backend().transfer_manager()->TransferLiteralFromDevice(
                            streams[i].get(), results[i]));
    exec_results.push_back(std::move(literal));
  }
  return std::move(exec_results);
}

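// Runs the given executable once per replica. When options.use_threads is
// false all replicas are launched through Executable::ExecuteOnStreams;
// otherwise each replica calls ExecuteOnStream from its own thread.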
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    Executable* executable, const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment, ExecutionProfile* profile) {
  return ExecuteReplicatedImpl(
      [&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
          const std::vector<absl::Span<const ShapedBuffer* const>>&
              argument_buffer_slices)
          -> StatusOr<std::vector<ScopedShapedBuffer>> {
        std::vector<ScopedShapedBuffer> results;
        if (!options.use_threads) {
          TF_ASSIGN_OR_RETURN(
              results, executable->ExecuteOnStreams(service_run_options,
                                                    argument_buffer_slices));
        } else {
          tensorflow::mutex mutex;
          std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
              options.num_replicas);
          {
            LOG(INFO) << "Creating thread pool for " << options.num_replicas
                      << " replicas";
            tensorflow::thread::ThreadPool pool(
                tensorflow::Env::Default(), "replicas", options.num_replicas);
            for (int64 i = 0; i < options.num_replicas; ++i) {
              pool.Schedule([&, i] {
                auto result = executable->ExecuteOnStream(
                    &service_run_options[i], argument_buffer_slices[i],
                    nullptr);
                tensorflow::mutex_lock lock(mutex);
                thread_results[i] = std::move(result);
              });
            }

            // Note: the thread pool destructor guarantees it completes all
            // work before we leave this scope.
          }
          for (auto& thread_result : thread_results) {
            if (!thread_result.ok()) {
              return thread_result.status();
            }
            results.push_back(std::move(thread_result).ValueOrDie());
          }
        }
        return results;
      },
      [&](int64 replica) { return options.arguments.size(); },
      [&](int64 replica, int64 index) { return options.arguments[index]; },
      options, device_assignment);
}

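// Replicated execution in which each replica may run a different executable
// and receive a different argument list; requires options.use_threads.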
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::function<Executable*(int64)> executable_provider,
    std::function<int64(int64)> argument_count_provider,
    std::function<const Literal*(int64, int64)> argument_provider,
    const ReplicatedExecuteOptions& options) {
  TF_ASSIGN_OR_RETURN(
      DeviceAssignment device_assignment,
      backend().computation_placer()->AssignDevices(options.num_replicas, 1));
  return ExecuteReplicatedImpl(
      [&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
          const std::vector<absl::Span<const ShapedBuffer* const>>&
              argument_buffer_slices)
          -> StatusOr<std::vector<ScopedShapedBuffer>> {
        TF_RET_CHECK(options.use_threads);
        std::vector<ScopedShapedBuffer> results;
        tensorflow::mutex mutex;
        std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
            options.num_replicas);
        {
          LOG(INFO) << "Creating thread pool for " << options.num_replicas
                    << " replicas";
          tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(),
                                              "replicas", options.num_replicas);
          for (int64 i = 0; i < options.num_replicas; ++i) {
            for (const auto& arg : argument_buffer_slices[i]) {
              TF_RET_CHECK(arg != nullptr);
            }
            pool.Schedule([&, i] {
              auto result = executable_provider(i)->ExecuteOnStream(
                  &service_run_options[i], argument_buffer_slices[i], nullptr);
              tensorflow::mutex_lock lock(mutex);
              thread_results[i] = std::move(result);
            });
          }

          // Note: the thread pool destructor guarantees it completes all work
          // before we leave this scope.
        }
        for (auto& thread_result : thread_results) {
          if (!thread_result.ok()) {
            return thread_result.status();
          }
          results.push_back(std::move(thread_result).ValueOrDie());
        }
        return results;
      },
      argument_count_provider, argument_provider, options, &device_assignment);
}

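// Convenience overload that builds a default device assignment. Each replica
// receives the same argument list, options.arguments. A minimal usage sketch
// (illustrative only; assumes host literals `lit0`/`lit1` and an existing
// `runner`):
//
//   HloRunner::ReplicatedExecuteOptions opts;
//   opts.num_replicas = 2;
//   opts.arguments = {&lit0, &lit1};
//   opts.run_hlo_passes = true;
//   StatusOr<std::vector<Literal>> results =
//       runner.ExecuteReplicated(std::move(module), opts);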
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module,
    const ReplicatedExecuteOptions& options) {
  TF_ASSIGN_OR_RETURN(
      DeviceAssignment device_assignment,
      backend().computation_placer()->AssignDevices(options.num_replicas, 1));
  return ExecuteReplicated(std::move(module), options, &device_assignment);
}

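// Compiles the module into an Executable. When run_hlo_passes is true the
// full compilation pipeline (HLO optimization passes plus backend code
// generation) runs; otherwise only the backend step runs on the module as-is.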
StatusOr<std::unique_ptr<Executable>> HloRunner::CreateExecutable(
    std::unique_ptr<HloModule> module, bool run_hlo_passes) {
  if (run_hlo_passes) {
    auto module_group = absl::make_unique<HloModuleGroup>(std::move(module));
    TF_ASSIGN_OR_RETURN(
        auto executables,
        backend().compiler()->Compile(std::move(module_group),
                                      {{backend().default_stream_executor()}},
                                      backend().memory_allocator()));
    return std::move(executables[0]);
  }
  return backend().compiler()->RunBackend(std::move(module),
                                          backend().default_stream_executor(),
                                          backend().memory_allocator());
}

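// Assembles the ExecutableRunOptions (stream, allocator, intra-op thread
// pool, device assignment, run id) for one device and wraps them in
// ServiceExecutableRunOptions.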
ServiceExecutableRunOptions HloRunner::GetServiceRunOptionsForDevice(
    int64 device, se::Stream* stream, DeviceAssignment* device_assignment,
    RunId run_id) {
  ExecutableRunOptions run_options;
  run_options.set_device_ordinal(device);
  run_options.set_stream(stream);
  run_options.set_allocator(backend().memory_allocator());
  run_options.set_intra_op_thread_pool(
      backend().eigen_intra_op_thread_pool_device());
  if (device_assignment != nullptr) {
    run_options.set_device_assignment(device_assignment);
  }
  run_options.set_run_id(run_id);
  return ServiceExecutableRunOptions(run_options, backend().StreamBorrower());
}

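// Lazily creates the default backend on first use if none was set.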
Backend& HloRunner::backend() {
  if (!backend_) {
    backend_ = Backend::CreateDefaultBackend().ConsumeValueOrDie();
    VLOG(1) << "Executing on platform " << backend().platform()->Name();
  }
  return *backend_;
}

const Backend& HloRunner::backend() const {
  return const_cast<HloRunner*>(this)->backend();
}

}  // namespace xla