/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/hlo_runner.h"

#include <string>
#include <utility>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/layout_util.h"
#include "tensorflow/compiler/xla/service/hlo_module_group.h"
#include "tensorflow/compiler/xla/service/hlo_parser.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/core/common_runtime/eigen_thread_pool.h"
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace xla {

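// Parses the given HLO text into an HloModule whose config carries the
// provided debug options.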
/*static*/ StatusOr<std::unique_ptr<HloModule>>
HloRunner::CreateModuleFromString(const absl::string_view hlo_string,
                                  const DebugOptions& debug_options) {
  HloModuleConfig config;
  config.set_debug_options(debug_options);
  return ParseHloString(hlo_string, config);
}

namespace {

// Creates an HloModule from the given proto.
StatusOr<std::unique_ptr<HloModule>> HloProtoToModule(
    const HloProto& proto, const DebugOptions& debug_options) {
  TF_ASSIGN_OR_RETURN(HloModuleConfig config,
                      HloModule::CreateModuleConfigFromProto(proto.hlo_module(),
                                                             debug_options));
  TF_ASSIGN_OR_RETURN(auto module,
                      HloModule::CreateFromProto(proto.hlo_module(), config));
  return std::move(module);
}

}  // namespace

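// Reads an HloProto in binary proto format from `filename` and converts it
// into an HloModule.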
/*static*/ StatusOr<std::unique_ptr<HloModule>>
HloRunner::ReadModuleFromBinaryProtoFile(const std::string& filename,
                                         const DebugOptions& debug_options) {
  HloProto proto;
  TF_RETURN_IF_ERROR(tensorflow::ReadBinaryProto(tensorflow::Env::Default(),
                                                 filename, &proto));
  return HloProtoToModule(proto, debug_options);
}

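// Same as ReadModuleFromBinaryProtoFile, but expects the proto in text format.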
/*static*/ StatusOr<std::unique_ptr<HloModule>>
HloRunner::ReadModuleFromTextProtoFile(const std::string& filename,
                                       const DebugOptions& debug_options) {
  HloProto proto;
  TF_RETURN_IF_ERROR(
      tensorflow::ReadTextProto(tensorflow::Env::Default(), filename, &proto));
  return HloProtoToModule(proto, debug_options);
}

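// Reads HLO text from `filename` and parses it into an HloModule.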
/*static*/ StatusOr<std::unique_ptr<HloModule>>
HloRunner::ReadModuleFromHloTextFile(const std::string& filename,
                                     const DebugOptions& debug_options) {
  string hlo_string;
  TF_RETURN_IF_ERROR(tensorflow::ReadFileToString(tensorflow::Env::Default(),
                                                  filename, &hlo_string));
  HloModuleConfig config;
  config.set_debug_options(debug_options);
  return ParseHloString(hlo_string, config);
}

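// Builds a backend for the requested platform; the backend provides the
// stream executors and transfer manager used by the run methods below.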
HloRunner::HloRunner(se::Platform* platform) {
  BackendOptions backend_options;
  backend_options.set_platform(platform);
  backend_ = Backend::CreateBackend(backend_options).ConsumeValueOrDie();
  VLOG(1) << "Created HloRunner for platform: " << platform->Name();
}

HloRunner::~HloRunner() {}

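// Allocates a device buffer shaped like `literal` on the default device and
// copies the literal's contents into it.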
StatusOr<ScopedShapedBuffer> HloRunner::TransferLiteralToDevice(
    const Literal& literal) {
  TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                      backend().transfer_manager()->AllocateScopedShapedBuffer(
                          literal.shape(), backend().memory_allocator(),
                          backend().default_device_ordinal()));
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
      stream.get(), literal, buffer));
  return std::move(buffer);
}

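// Transfers each literal to the default device; the two overloads differ only
// in whether the literals arrive as a span of pointers or a span of values.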
StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    const absl::Span<const Literal* const> literals) {
  std::vector<ScopedShapedBuffer> buffers;
  for (const Literal* literal : literals) {
    CHECK(literal != nullptr);
    TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                        TransferLiteralToDevice(*literal));
    buffers.push_back(std::move(buffer));
  }
  return std::move(buffers);
}

StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    const absl::Span<const Literal> literals) {
  std::vector<const Literal*> literal_pointers;
  literal_pointers.reserve(literals.size());
  for (const auto& literal : literals) {
    literal_pointers.push_back(&literal);
  }
  return TransferLiteralsToDevice(literal_pointers);
}

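// Copies the contents of a device buffer back to the host as a Literal.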
StatusOr<Literal> HloRunner::TransferLiteralFromDevice(
    const ShapedBuffer& buffer) {
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  return backend().transfer_manager()->TransferLiteralFromDevice(stream.get(),
                                                                 buffer);
}

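// The Execute overloads transfer the argument literals to the device, run the
// module (or an already-compiled executable), and transfer the result back to
// the host as a Literal.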
StatusOr<Literal> HloRunner::Execute(
    std::unique_ptr<HloModule> module,
    const absl::Span<const Literal* const> arguments, bool run_hlo_passes,
    ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ScopedShapedBuffer result,
                      ExecuteWithDeviceBuffers(
                          /*module=*/std::move(module),
                          /*arguments=*/argument_buffers,
                          /*run_hlo_passes=*/run_hlo_passes,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result);
}

StatusOr<Literal> HloRunner::Execute(std::unique_ptr<HloModule> module,
                                     const absl::Span<const Literal> arguments,
                                     bool run_hlo_passes,
                                     ExecutionProfile* profile) {
  // Construct a vector of plain pointers for the arguments.
  std::vector<const Literal*> argument_pointers;
  argument_pointers.reserve(arguments.size());
  for (const auto& argument : arguments) {
    argument_pointers.push_back(&argument);
  }
  return Execute(
      /*module=*/std::move(module),
      /*arguments=*/argument_pointers,
      /*run_hlo_passes=*/run_hlo_passes,
      /*profile=*/profile);
}

StatusOr<Literal> HloRunner::Execute(
    std::unique_ptr<Executable> executable,
    const absl::Span<const Literal* const> arguments,
    ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ScopedShapedBuffer result,
                      ExecuteWithDeviceBuffers(
                          /*executable=*/executable.get(),
                          /*arguments=*/argument_buffers,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result);
}

StatusOr<Literal> HloRunner::Execute(std::unique_ptr<Executable> executable,
                                     const absl::Span<const Literal> arguments,
                                     ExecutionProfile* profile) {
  // Construct a vector of plain pointers for the arguments.
  std::vector<const Literal*> argument_pointers;
  argument_pointers.reserve(arguments.size());
  for (const auto& argument : arguments) {
    argument_pointers.push_back(&argument);
  }
  return Execute(
      /*executable=*/std::move(executable),
      /*arguments=*/argument_pointers,
      /*profile=*/profile);
}

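// The ExecuteWithDeviceBuffers overloads run on arguments that already live on
// the device and return the result as a device buffer, leaving host transfers
// to the caller.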
StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
    std::unique_ptr<HloModule> module,
    const absl::Span<const ShapedBuffer* const> arguments, bool run_hlo_passes,
    ExecutionProfile* profile) {
  // Get service run options.
  se::Stream stream(backend().default_stream_executor());
  stream.Init();
  ServiceExecutableRunOptions service_run_options =
      GetServiceRunOptionsForDevice(backend().default_device_ordinal(), &stream,
                                    nullptr);

  TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
                      CreateExecutable(std::move(module), run_hlo_passes));
  TF_ASSIGN_OR_RETURN(
      ScopedShapedBuffer retval,
      executable->ExecuteOnStreamWrapper(&service_run_options,
                                         /*profile=*/profile, arguments));
  TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
  return std::move(retval);
}

StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
    std::unique_ptr<HloModule> module,
    const absl::Span<const ScopedShapedBuffer> arguments, bool run_hlo_passes,
    ExecutionProfile* profile) {
  std::vector<const ShapedBuffer*> argument_pointers;
  argument_pointers.reserve(arguments.size());
  for (const auto& argument : arguments) {
    argument_pointers.push_back(&argument);
  }
  return ExecuteWithDeviceBuffers(
      /*module=*/std::move(module),
      /*arguments=*/argument_pointers,
      /*run_hlo_passes=*/run_hlo_passes,
      /*profile=*/profile);
}

StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
    Executable* executable,
    const absl::Span<const ShapedBuffer* const> arguments,
    ExecutionProfile* profile) {
  // Get service run options.
  se::Stream stream(backend().default_stream_executor());
  stream.Init();
  ServiceExecutableRunOptions service_run_options =
      GetServiceRunOptionsForDevice(backend().default_device_ordinal(), &stream,
                                    nullptr);

  TF_ASSIGN_OR_RETURN(
      ScopedShapedBuffer retval,
      executable->ExecuteOnStreamWrapper(&service_run_options,
                                         /*profile=*/profile, arguments));
  TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
  return std::move(retval);
}

StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
    Executable* executable,
    const absl::Span<const ScopedShapedBuffer> arguments,
    ExecutionProfile* profile) {
  std::vector<const ShapedBuffer*> argument_pointers;
  argument_pointers.reserve(arguments.size());
  for (const auto& argument : arguments) {
    argument_pointers.push_back(&argument);
  }
  return ExecuteWithDeviceBuffers(
      /*executable=*/executable,
      /*arguments=*/argument_pointers,
      /*profile=*/profile);
}

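// Compiles the module once and runs it on `options.num_replicas` devices
// chosen by `device_assignment`, optionally servicing infeed and outfeed on
// background threads; returns one result literal per replica.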
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module, const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment, bool use_threads) {
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<Executable> executable,
      CreateExecutable(std::move(module), options.run_hlo_passes));
  std::vector<std::unique_ptr<se::Stream>> streams;
  std::vector<ServiceExecutableRunOptions> service_run_options;

  std::vector<ScopedShapedBuffer> argument_buffers;
  // This reserve() call is necessary for correctness, because
  // argument_buffer_ptrs contains pointers into the elements of
  // argument_buffers.
  argument_buffers.reserve(options.num_replicas * options.arguments.size());

  // Plus one so we can safely get &argument_buffer_ptrs[0] in case there are
  // no arguments.
  std::vector<const ShapedBuffer*> argument_buffer_ptrs(
      options.num_replicas * options.arguments.size() + 1);
  std::vector<absl::Span<const ShapedBuffer* const>> argument_buffer_slices;
  int64 index = 0;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    int64 device = (*device_assignment)(i, 0);
    TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor,
                        backend().stream_executor(device));
    streams.push_back(absl::make_unique<se::Stream>(executor));
    streams.back()->Init();
    service_run_options.emplace_back(GetServiceRunOptionsForDevice(
        device, streams.back().get(), device_assignment));

    // Copy arguments to device.
    for (const Literal* argument : options.arguments) {
      TF_ASSIGN_OR_RETURN(
          ScopedShapedBuffer argument_buffer,
          backend().transfer_manager()->AllocateScopedShapedBuffer(
              argument->shape(), backend().memory_allocator(), device));
      TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
          streams.back().get(), *argument, argument_buffer));
      argument_buffers.push_back(std::move(argument_buffer));
      argument_buffer_ptrs[index++] = &argument_buffers.back();
    }
    argument_buffer_slices.emplace_back(
        &argument_buffer_ptrs[index - options.arguments.size()],
        options.arguments.size());
  }

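  // If an infeed literal or an outfeed shape is provided, spawn one helper
  // thread per replica to keep feeding the infeed and draining the outfeed
  // while the computation runs.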
  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  int64 num_threads = (options.infeed != nullptr) ? options.num_replicas : 0;
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    num_threads += options.num_replicas;
  }
  if (num_threads > 0) {
    pool = absl::make_unique<tensorflow::thread::ThreadPool>(
        tensorflow::Env::Default(), "infeed_outfeed",
        /*num_threads=*/num_threads);
  }
  if (options.infeed != nullptr) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting infeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralToInfeed(
              executor, *options.infeed));
          if (step % 100 == 0) {
            VLOG(1) << "Infeed step " << step;
          }
        }
      });
    }
  }
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting outfeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          Literal literal;
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralFromOutfeed(
              executor, options.outfeed_shape, &literal));
          if (options.outfeed_values != nullptr) {
            options.outfeed_values->push_back(std::move(literal));
          }
          if (step % 100 == 0) {
            VLOG(1) << "Outfeed step " << step;
          }
        }
      });
    }
  }

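  // Run all replicas: either let the executable schedule them across the
  // streams itself, or (with use_threads) launch one host thread per replica
  // and call ExecuteOnStream from each.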
  LOG(INFO) << "Replicated execution started";
  std::vector<ScopedShapedBuffer> results;
  if (!use_threads) {
    TF_ASSIGN_OR_RETURN(results,
                        executable->ExecuteOnStreams(service_run_options,
                                                     argument_buffer_slices));
  } else {
    tensorflow::mutex mutex;
    std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
        options.num_replicas);
    {
      LOG(INFO) << "Creating thread pool for " << options.num_replicas
                << " replicas";
      tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(),
                                          "replicas", options.num_replicas);
      for (int64 i = 0; i < options.num_replicas; ++i) {
        pool.Schedule([&, i] {
          auto result = executable->ExecuteOnStream(
              &service_run_options[i], argument_buffer_slices[i], nullptr);
          tensorflow::mutex_lock lock(mutex);
          thread_results[i] = std::move(result);
        });
      }

      // Note: the thread pool destructor guarantees it completes all work
      // before we leave this scope.
    }
    for (auto& thread_result : thread_results) {
      if (!thread_result.ok()) {
        return thread_result.status();
      }
      results.push_back(std::move(thread_result).ValueOrDie());
    }
  }
  LOG(INFO) << "Replicated execution terminated";

  std::vector<Literal> exec_results;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    TF_RETURN_IF_ERROR(streams[i]->BlockHostUntilDone());
    TF_ASSIGN_OR_RETURN(Literal literal,
                        backend().transfer_manager()->TransferLiteralFromDevice(
                            streams[i].get(), results[i]));
    exec_results.push_back(std::move(literal));
  }
  return std::move(exec_results);
}

StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module, const ReplicatedExecuteOptions& options,
    bool use_threads) {
  TF_ASSIGN_OR_RETURN(
      DeviceAssignment device_assignment,
      backend().computation_placer()->AssignDevices(options.num_replicas, 1));
  return ExecuteReplicated(std::move(module), options, &device_assignment,
                           use_threads);
}

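// Compiles the module into an Executable for the default stream executor,
// running the HLO optimization pipeline first only if `run_hlo_passes` is
// true.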
StatusOr<std::unique_ptr<Executable>> HloRunner::CreateExecutable(
    std::unique_ptr<HloModule> module, bool run_hlo_passes) {
  if (run_hlo_passes) {
    auto module_group = absl::make_unique<HloModuleGroup>(std::move(module));
    TF_ASSIGN_OR_RETURN(
        auto executables,
        backend().compiler()->Compile(std::move(module_group),
                                      {{backend().default_stream_executor()}},
                                      backend().memory_allocator()));
    return std::move(executables[0]);
  }
  return backend().compiler()->RunBackend(std::move(module),
                                          backend().default_stream_executor(),
                                          backend().memory_allocator());
}

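// Assembles the ExecutableRunOptions (device ordinal, stream, allocator,
// intra-op thread pool, and optional device assignment) used when executing
// on the given device.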
ServiceExecutableRunOptions HloRunner::GetServiceRunOptionsForDevice(
    int64 device, se::Stream* stream, DeviceAssignment* device_assignment) {
  ExecutableRunOptions run_options;
  run_options.set_device_ordinal(device);
  run_options.set_stream(stream);
  run_options.set_allocator(backend().memory_allocator());
  run_options.set_intra_op_thread_pool(
      backend().eigen_intra_op_thread_pool_device());
  if (device_assignment != nullptr) {
    run_options.set_device_assignment(device_assignment);
  }
  return ServiceExecutableRunOptions(run_options, backend().StreamBorrower());
}

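// Returns the backend, lazily creating the default backend on first use if
// none has been set yet.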
Backend& HloRunner::backend() {
  if (!backend_) {
    backend_ = Backend::CreateDefaultBackend().ConsumeValueOrDie();
    VLOG(1) << "Executing on platform " << backend().platform()->Name();
  }
  return *backend_;
}

const Backend& HloRunner::backend() const {
  return const_cast<HloRunner*>(this)->backend();
}

}  // namespace xla