/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/executable.h"

#include <memory>

#include "absl/strings/str_format.h"
#include "tensorflow/compiler/xla/debug_options_flags.h"
#include "tensorflow/compiler/xla/service/dump.h"
#include "tensorflow/compiler/xla/service/hlo_graph_dumper.h"
#include "tensorflow/compiler/xla/service/maybe_owning_device_memory.h"
#include "tensorflow/compiler/xla/status.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/proto_serialization.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/stream_executor/device_description.h"

namespace xla {

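// For indices whose buffers remain owned by the caller, make sure the buffer
// tree does not free them: the first Release() removes the
// MaybeOwningDeviceMemory from the tree, and the second Release() on the
// resulting OwningDeviceMemory relinquishes ownership without deallocating.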
ExecutionInput::~ExecutionInput() {
  for (auto& index : unowned_indices_) {
    auto buffer = buffers_.mutable_element(index)->Release();
    if (buffer) {
      buffer->Release();
    }
  }
}

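// Records the shape observed at runtime for an input with dynamic dimensions.
// The runtime shape must fit within the bounds of the declared input shape.
// A hypothetical usage sketch (names are illustrative only):
//
//   ExecutionInput input(bounded_input_shape);
//   TF_RETURN_IF_ERROR(input.SetDynamicShape(runtime_shape));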
Status ExecutionInput::SetDynamicShape(Shape dynamic_shape) {
  const Shape& input_shape = shape();
  // DynamicShapeIsCompatible(dynamic, bounded) checks that the dynamic shape
  // fits within the bounds of the declared input shape.
  if (!ShapeUtil::DynamicShapeIsCompatible(dynamic_shape, input_shape)) {
    return tensorflow::errors::InvalidArgument(
        "Cannot set dynamic shape: ", input_shape.DebugString(), " vs. ",
        dynamic_shape.DebugString());
  }
  dynamic_shape_ = std::make_unique<Shape>(std::move(dynamic_shape));
  return OkStatus();
}

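// Stores `buffer` at `index` but marks the index as unowned, so the
// destructor will not free the underlying device memory.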
void ExecutionInput::SetUnownedBuffer(const ShapeIndex& index,
                                      MaybeOwningDeviceMemory buffer) {
  *buffers_.mutable_element(index) = std::move(buffer);
  unowned_indices_.insert(index);
}

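// Builds a non-owning ShapedBuffer view over these buffers; ownership stays
// with the ExecutionInput. Fails if any owning buffer was allocated by a
// different allocator or on a different device than the ones requested.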
StatusOr<ShapedBuffer> ExecutionInput::ToShapedBuffer(
    se::DeviceMemoryAllocator* allocator, int device_ordinal) const {
  const Shape& input_shape = shape();
  ShapedBuffer shaped_buffer(input_shape, device_ordinal);
  for (const auto& index_buffer : Buffers()) {
    const tensorflow::se::OwningDeviceMemory* mem =
        index_buffer.second.AsOwningDeviceMemory();
    if (mem != nullptr && (mem->allocator() != allocator ||
                           mem->device_ordinal() != device_ordinal)) {
      return tensorflow::errors::InvalidArgument(
          "Device buffer at index ", index_buffer.first.ToString(),
          " has mismatching allocator/device");
    }
    shaped_buffer.set_buffer(index_buffer.second.AsDeviceMemoryBase(),
                             index_buffer.first);
  }
  return std::move(shaped_buffer);
}

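// Synchronous wrapper around ExecuteAsyncOnStream: enqueue the execution,
// then block until the stream is done. The host blocks even when the launch
// fails, and the launch error, if any, takes precedence over the blocking
// error.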
StatusOr<ScopedShapedBuffer> Executable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  StatusOr<ScopedShapedBuffer> result =
      ExecuteAsyncOnStream(run_options, arguments, hlo_execution_profile);
  Status blocking_status = run_options->stream()->BlockHostUntilDone();
  TF_RETURN_IF_ERROR(result.status());
  TF_RETURN_IF_ERROR(blocking_status);
  return result;
}

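// Wraps the device buffers of `shaped_buffer` in an ExecutionInput without
// taking ownership of them.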
static ExecutionInput MakeMaybeOwningDeviceMemoryTree(
    const ShapedBuffer& shaped_buffer) {
  ExecutionInput result(shaped_buffer.on_device_shape());
  shaped_buffer.buffers().ForEachElement(
      [&](const ShapeIndex& index, const se::DeviceMemoryBase& mem) {
        result.SetBuffer(index, MaybeOwningDeviceMemory(mem));
      });
  return result;
}

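// Adapter overload: wraps the ShapedBuffer arguments as unowned
// ExecutionInputs and forwards to the ExecutionInput-based overload.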
StatusOr<ScopedShapedBuffer> Executable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  std::vector<ExecutionInput> args;
  args.reserve(arguments.size());
  for (const ShapedBuffer* arg : arguments) {
    args.emplace_back(MakeMaybeOwningDeviceMemoryTree(*arg));
  }
  TF_ASSIGN_OR_RETURN(ExecutionOutput out,
                      ExecuteAsyncOnStream(run_options, std::move(args),
                                           hlo_execution_profile));
  return out.ConsumeResult();
}

StatusOr<ExecutionOutput> Executable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    std::vector<ExecutionInput> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  StatusOr<ExecutionOutput> result = ExecuteAsyncOnStream(
      run_options, std::move(arguments), hlo_execution_profile);
  Status blocking_status = run_options->stream()->BlockHostUntilDone();
  TF_RETURN_IF_ERROR(result.status());
  TF_RETURN_IF_ERROR(blocking_status);
  return result;
}

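// Launches one execution per stream, with one argument span per stream, and
// blocks until all of them complete. The launched executions may communicate
// with each other, so none is blocked on individually until all are running.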
StatusOr<std::vector<ScopedShapedBuffer>> Executable::ExecuteOnStreams(
    absl::Span<const ServiceExecutableRunOptions> run_options,
    absl::Span<const absl::Span<const ShapedBuffer* const>> arguments) {
  TF_RET_CHECK(run_options.size() == arguments.size());

  std::vector<ScopedShapedBuffer> return_values;
  return_values.reserve(run_options.size());

  if (run_options.size() == 1) {
    TF_ASSIGN_OR_RETURN(auto rv,
                        ExecuteOnStream(&run_options[0], arguments[0],
                                        /*hlo_execution_profile=*/nullptr));
    return_values.push_back(std::move(rv));
    return std::move(return_values);
  }

  for (size_t i = 0; i < run_options.size(); ++i) {
    // We cannot BlockHostUntilDone() on the already-launched executions in
    // case of error, since if the executions communicate, the initially
    // launched executions may never complete if not all executions are
    // running.
    TF_ASSIGN_OR_RETURN(
        auto rv, ExecuteAsyncOnStream(&run_options[i], arguments[i],
                                      /*hlo_execution_profile=*/nullptr));
    return_values.push_back(std::move(rv));
  }
  for (const auto& options : run_options) {
    TF_RET_CHECK(options.stream() != nullptr);
    TF_RETURN_IF_ERROR(options.stream()->BlockHostUntilDone());
  }
  return std::move(return_values);
}

StatusOr<ScopedShapedBuffer> Executable::ExecuteOnStreamWrapper(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments) {
  StatusOr<ScopedShapedBuffer> result =
      ExecuteAsyncOnStreamWrapper(run_options, arguments);
  Status block_status = run_options->stream()->BlockHostUntilDone();
  TF_RETURN_IF_ERROR(result.status());
  TF_RETURN_IF_ERROR(block_status);
  return result;
}

StatusOr<ExecutionOutput> Executable::ExecuteOnStreamWrapper(
    const ServiceExecutableRunOptions* run_options,
    std::vector<ExecutionInput> arguments) {
  StatusOr<ExecutionOutput> result =
      ExecuteAsyncOnStreamWrapper(run_options, std::move(arguments));
  Status block_status = run_options->stream()->BlockHostUntilDone();
  TF_RETURN_IF_ERROR(result.status());
  TF_RETURN_IF_ERROR(block_status);
  return result;
}

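// Profiling state threaded from ExecuteWrapperBeforeExecution to
// ExecuteWrapperAfterExecution across an asynchronous execution.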
struct ExecuteAsyncOnStreamWrapperState {
  ExecutionProfile* profile;
  std::shared_ptr<se::Timer> timer;
  std::shared_ptr<HloExecutionProfile> profile_ptr;
};

static ExecuteAsyncOnStreamWrapperState ExecuteWrapperBeforeExecution(
    const Executable& executable,
    const ServiceExecutableRunOptions* run_options) {
  ExecuteAsyncOnStreamWrapperState state;
  se::Stream* stream = run_options->stream();
  state.profile = run_options->run_options().execution_profile();
  if (state.profile != nullptr) {
    state.timer = std::make_shared<se::Timer>(stream->parent());
    stream->InitTimer(state.timer.get()).ThenStartTimer(state.timer.get());
  }

  VLOG(1) << "enqueueing executable on stream...";
  // If the profiling flag isn't enabled, we pass nullptr as the profile to
  // indicate profiling is not requested.
  state.profile_ptr =
      executable.module_config().debug_options().xla_hlo_profile() &&
              executable.hlo_profiling_enabled()
          ? std::make_shared<HloExecutionProfile>(
                &executable.hlo_profile_printer_data(),
                &executable.hlo_profile_index_map())
          : nullptr;
  return state;
}

Status ExecuteWrapperAfterExecution(
    Executable* executable, const ExecuteAsyncOnStreamWrapperState& state,
    Status return_status, se::Stream* stream) {
  if (!return_status.ok()) {
    if (state.profile != nullptr) {
      // Ensure the ThenStartTimer call has completed before we destroy the
      // timer. We already have a failure status to return, so just log this
      // if it fails.
      Status status = stream->BlockHostUntilDone();
      if (!status.ok()) {
        LOG(ERROR) << "Failed to BlockHostUntilDone: " << status;
      }
    }
    return return_status;
  }

  if (state.profile != nullptr) {
    VLOG(1) << "enqueueing 'stop timer' and profiling callback...";
    stream->ThenStopTimer(state.timer.get());

    // We block instead of using an async callback because reading the timer
    // value may call back into the driver on GPU, which is not allowed.
    TF_RETURN_IF_ERROR(stream->BlockHostUntilDone());

    const int64_t executable_size_in_bytes =
        executable->SizeOfGeneratedCodeInBytes();
    // Merge in run-time profile information from execution_profile.

    // Overall execution time (in nanoseconds) from the executor timer.
    state.profile->set_compute_and_transfer_time_ns(
        state.timer->Nanoseconds());

    // TODO(b/28447609): The value in compute_and_transfer_time_ns is actually
    // the compute time without the transfer time, so this way we get the
    // correct compute time. We should instead have the correct value for
    // compute_and_transfer_time and set compute_time to the compute time.
    if (state.profile->compute_time_ns() == 0) {
      state.profile->set_compute_time_ns(
          state.profile->compute_and_transfer_time_ns());
    }

    if (executable_size_in_bytes != 0) {
      state.profile->set_executable_size_in_bytes(executable_size_in_bytes);
    }
  }

  if (executable->module_config().debug_options().xla_hlo_profile() &&
      state.profile_ptr != nullptr) {
    DumpToFileInDir(executable->module(), /*file_prefix=*/"",
                    /*file_suffix=*/"hlo_execution_profile_data",
                    state.profile_ptr->ToProto().SerializeAsString());
  }

  if (state.profile_ptr != nullptr) {
    const se::DeviceDescription* device_description =
        &stream->parent()->GetDeviceDescription();
    std::shared_ptr<HloExecutionProfile> profile = state.profile_ptr;
    stream->ThenDoHostCallback([profile, device_description]() {
      XLA_LOG_LINES(tensorflow::INFO,
                    profile->ToString(device_description->clock_rate_ghz()));
    });
  }

  return return_status;
}

StatusOr<ScopedShapedBuffer> Executable::ExecuteAsyncOnStreamWrapper(
    const ServiceExecutableRunOptions* run_options,
    absl::Span<const ShapedBuffer* const> arguments) {
  auto state = ExecuteWrapperBeforeExecution(*this, run_options);
  StatusOr<ScopedShapedBuffer> return_value =
      ExecuteAsyncOnStream(run_options, arguments, state.profile_ptr.get());
  TF_RETURN_IF_ERROR(ExecuteWrapperAfterExecution(
      this, state, return_value.status(), run_options->stream()));
  return return_value;
}

StatusOr<ExecutionOutput> Executable::ExecuteAsyncOnStreamWrapper(
    const ServiceExecutableRunOptions* run_options,
    std::vector<ExecutionInput> arguments) {
  auto state = ExecuteWrapperBeforeExecution(*this, run_options);
  StatusOr<ExecutionOutput> return_value = ExecuteAsyncOnStream(
      run_options, std::move(arguments), state.profile_ptr.get());
  TF_RETURN_IF_ERROR(ExecuteWrapperAfterExecution(
      this, state, return_value.status(), run_options->stream()));
  return return_value;
}

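// Default implementation: subclasses that can report the size of their
// generated code override this; -1 means the size is unknown.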
int64_t Executable::SizeOfGeneratedCodeInBytes() const { return -1; }

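// Transfers ownership of all owning argument buffers to `result`, which
// releases them once it is done with them.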
void Executable::MarkToBeReleasedArguments(absl::Span<ExecutionInput> arguments,
                                           ExecutionOutput& result) {
  for (ExecutionInput& argument : arguments) {
    for (auto& index_buffer : *argument.MutableBuffers()) {
      if (std::optional<se::OwningDeviceMemory> maybe_owning_buffer =
              index_buffer.second.Release()) {
        result.AddToBeReleased(std::move(*maybe_owning_buffer));
      }
    }
  }
}

}  // namespace xla