• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/compiler/xla/service/executable.h"
17 
18 #include <memory>
19 
20 #include "absl/strings/str_format.h"
21 #include "tensorflow/compiler/xla/debug_options_flags.h"
22 #include "tensorflow/compiler/xla/service/dump.h"
23 #include "tensorflow/compiler/xla/service/hlo_graph_dumper.h"
24 #include "tensorflow/compiler/xla/service/maybe_owning_device_memory.h"
25 #include "tensorflow/compiler/xla/status.h"
26 #include "tensorflow/compiler/xla/status_macros.h"
27 #include "tensorflow/core/lib/core/status.h"
28 #include "tensorflow/core/lib/strings/proto_serialization.h"
29 #include "tensorflow/core/platform/env.h"
30 #include "tensorflow/core/platform/errors.h"
31 #include "tensorflow/stream_executor/device_description.h"
32 
33 namespace xla {
34 
// Releases, without deallocating, every buffer this ExecutionInput does not
// own. Indices are recorded in unowned_indices_ by SetUnownedBuffer().
ExecutionInput::~ExecutionInput() {
  for (auto& index : unowned_indices_) {
    // Detach the buffer from the shape tree element.
    auto buffer = buffers_.mutable_element(index)->Release();
    if (buffer) {
      // Release the owning wrapper as well so its destructor does not free
      // memory that is owned by the caller — TODO confirm against
      // MaybeOwningDeviceMemory/OwningDeviceMemory semantics.
      buffer->Release();
    }
  }
}
43 
SetDynamicShape(Shape dynamic_shape)44 Status ExecutionInput::SetDynamicShape(Shape dynamic_shape) {
45   const Shape& input_shape = shape();
46   if (!ShapeUtil::DynamicShapeIsCompatible(input_shape, dynamic_shape)) {
47     return tensorflow::errors::InvalidArgument(
48         "Cannot set dynamic shape: ", input_shape.DebugString(), " vs. ",
49         dynamic_shape.DebugString());
50   }
51   dynamic_shape_ = std::make_unique<Shape>(std::move(dynamic_shape));
52   return OkStatus();
53 }
54 
SetUnownedBuffer(const ShapeIndex & index,MaybeOwningDeviceMemory buffer)55 void ExecutionInput::SetUnownedBuffer(const ShapeIndex& index,
56                                       MaybeOwningDeviceMemory buffer) {
57   *buffers_.mutable_element(index) = std::move(buffer);
58   unowned_indices_.insert(index);
59 }
60 
ToShapedBuffer(se::DeviceMemoryAllocator * allocator,int device_ordinal) const61 StatusOr<ShapedBuffer> ExecutionInput::ToShapedBuffer(
62     se::DeviceMemoryAllocator* allocator, int device_ordinal) const {
63   const Shape& input_shape = shape();
64   ShapedBuffer shaped_buffer(input_shape, device_ordinal);
65   for (const auto& index_buffer : Buffers()) {
66     const tensorflow::se::OwningDeviceMemory* mem =
67         index_buffer.second.AsOwningDeviceMemory();
68     if (mem != nullptr && (mem->allocator() != allocator ||
69                            mem->device_ordinal() != device_ordinal)) {
70       return tensorflow::errors::InvalidArgument(
71           "Device buffer at index ", index_buffer.first.ToString(),
72           " has mismatching allocator/device");
73     }
74     shaped_buffer.set_buffer(index_buffer.second.AsDeviceMemoryBase(),
75                              index_buffer.first);
76   }
77   return std::move(shaped_buffer);
78 }
79 
ExecuteOnStream(const ServiceExecutableRunOptions * run_options,absl::Span<const ShapedBuffer * const> arguments,HloExecutionProfile * hlo_execution_profile)80 StatusOr<ScopedShapedBuffer> Executable::ExecuteOnStream(
81     const ServiceExecutableRunOptions* run_options,
82     absl::Span<const ShapedBuffer* const> arguments,
83     HloExecutionProfile* hlo_execution_profile) {
84   StatusOr<ScopedShapedBuffer> result =
85       ExecuteAsyncOnStream(run_options, arguments, hlo_execution_profile);
86   Status blocking_status = run_options->stream()->BlockHostUntilDone();
87   TF_RETURN_IF_ERROR(result.status());
88   TF_RETURN_IF_ERROR(blocking_status);
89   return result;
90 }
91 
MakeMaybeOwningDeviceMemoryTree(const ShapedBuffer & shaped_buffer)92 static ExecutionInput MakeMaybeOwningDeviceMemoryTree(
93     const ShapedBuffer& shaped_buffer) {
94   ExecutionInput result(shaped_buffer.on_device_shape());
95   shaped_buffer.buffers().ForEachElement(
96       [&](const ShapeIndex& index, const se::DeviceMemoryBase& mem) {
97         result.SetBuffer(index, MaybeOwningDeviceMemory(mem));
98       });
99   return result;
100 }
101 
ExecuteAsyncOnStream(const ServiceExecutableRunOptions * run_options,absl::Span<const ShapedBuffer * const> arguments,HloExecutionProfile * hlo_execution_profile)102 StatusOr<ScopedShapedBuffer> Executable::ExecuteAsyncOnStream(
103     const ServiceExecutableRunOptions* run_options,
104     absl::Span<const ShapedBuffer* const> arguments,
105     HloExecutionProfile* hlo_execution_profile) {
106   std::vector<ExecutionInput> args;
107   args.reserve(arguments.size());
108   for (const ShapedBuffer* arg : arguments) {
109     args.emplace_back(MakeMaybeOwningDeviceMemoryTree(*arg));
110   }
111   TF_ASSIGN_OR_RETURN(ExecutionOutput out,
112                       ExecuteAsyncOnStream(run_options, std::move(args),
113                                            hlo_execution_profile));
114   return out.ConsumeResult();
115 }
116 
ExecuteOnStream(const ServiceExecutableRunOptions * run_options,std::vector<ExecutionInput> arguments,HloExecutionProfile * hlo_execution_profile)117 StatusOr<ExecutionOutput> Executable::ExecuteOnStream(
118     const ServiceExecutableRunOptions* run_options,
119     std::vector<ExecutionInput> arguments,
120     HloExecutionProfile* hlo_execution_profile) {
121   StatusOr<ExecutionOutput> result = ExecuteAsyncOnStream(
122       run_options, std::move(arguments), hlo_execution_profile);
123   Status blocking_status = run_options->stream()->BlockHostUntilDone();
124   TF_RETURN_IF_ERROR(result.status());
125   TF_RETURN_IF_ERROR(blocking_status);
126   return result;
127 }
128 
// Runs one execution per (run_options, arguments) pair and waits for all of
// them. With multiple pairs, all executions are launched asynchronously
// before any blocking, so executions that communicate can make progress.
StatusOr<std::vector<ScopedShapedBuffer>> Executable::ExecuteOnStreams(
    absl::Span<const ServiceExecutableRunOptions> run_options,
    absl::Span<const absl::Span<const ShapedBuffer* const>> arguments) {
  TF_RET_CHECK(run_options.size() == arguments.size());

  std::vector<ScopedShapedBuffer> return_values;
  return_values.reserve(run_options.size());

  // Fast path for a single execution: ExecuteOnStream blocks until the
  // stream is done, so no extra synchronization is needed.
  if (run_options.size() == 1) {
    TF_ASSIGN_OR_RETURN(auto rv,
                        ExecuteOnStream(&run_options[0], arguments[0],
                                        /*hlo_execution_profile=*/nullptr));
    return_values.push_back(std::move(rv));
    return std::move(return_values);
  }

  // Launch every execution asynchronously before blocking on any of them.
  for (size_t i = 0; i < run_options.size(); ++i) {
    // We cannot BlockHostUntilDone() on the already-launched executions in case
    // of error, since if the executions communicate, the initially launched
    // executions may never complete if not all executions are running.
    TF_ASSIGN_OR_RETURN(
        auto rv, ExecuteAsyncOnStream(&run_options[i], arguments[i],
                                      /*hlo_execution_profile=*/nullptr));
    return_values.push_back(std::move(rv));
  }
  // All executions are in flight; now wait for each stream to drain.
  for (const auto& options : run_options) {
    TF_RET_CHECK(options.stream() != nullptr);
    TF_RETURN_IF_ERROR(options.stream()->BlockHostUntilDone());
  }
  return std::move(return_values);
}
160 
ExecuteOnStreamWrapper(const ServiceExecutableRunOptions * run_options,absl::Span<const ShapedBuffer * const> arguments)161 StatusOr<ScopedShapedBuffer> Executable::ExecuteOnStreamWrapper(
162     const ServiceExecutableRunOptions* run_options,
163     absl::Span<const ShapedBuffer* const> arguments) {
164   StatusOr<ScopedShapedBuffer> result =
165       ExecuteAsyncOnStreamWrapper(run_options, arguments);
166   Status block_status = run_options->stream()->BlockHostUntilDone();
167   TF_RETURN_IF_ERROR(result.status());
168   TF_RETURN_IF_ERROR(block_status);
169   return result;
170 }
171 
ExecuteOnStreamWrapper(const ServiceExecutableRunOptions * run_options,std::vector<ExecutionInput> arguments)172 StatusOr<ExecutionOutput> Executable::ExecuteOnStreamWrapper(
173     const ServiceExecutableRunOptions* run_options,
174     std::vector<ExecutionInput> arguments) {
175   StatusOr<ExecutionOutput> result =
176       ExecuteAsyncOnStreamWrapper(run_options, std::move(arguments));
177   Status block_status = run_options->stream()->BlockHostUntilDone();
178   TF_RETURN_IF_ERROR(result.status());
179   TF_RETURN_IF_ERROR(block_status);
180   return result;
181 }
182 
// Profiling state threaded from ExecuteWrapperBeforeExecution to
// ExecuteWrapperAfterExecution.
struct ExecuteAsyncOnStreamWrapperState {
  // Caller-provided execution profile from the run options; may be null.
  ExecutionProfile* profile;
  // Stream timer started before execution and read afterwards; only set
  // when `profile` is non-null.
  std::shared_ptr<se::Timer> timer;
  // Per-HLO profile; only set when HLO profiling is enabled.
  std::shared_ptr<HloExecutionProfile> profile_ptr;
};
188 
// Sets up pre-execution profiling state shared by both
// ExecuteAsyncOnStreamWrapper overloads: starts a stream timer when the
// caller supplied an ExecutionProfile, and allocates an HloExecutionProfile
// when HLO profiling is enabled for this executable.
static ExecuteAsyncOnStreamWrapperState ExecuteWrapperBeforeExecution(
    const Executable& executable,
    const ServiceExecutableRunOptions* run_options) {
  ExecuteAsyncOnStreamWrapperState state;
  se::Stream* stream = run_options->stream();
  state.profile = run_options->run_options().execution_profile();
  if (state.profile != nullptr) {
    // Enqueue timer start on the stream; ExecuteWrapperAfterExecution stops
    // the timer and reads the elapsed time.
    state.timer = std::make_shared<se::Timer>(stream->parent());
    stream->InitTimer(state.timer.get()).ThenStartTimer(state.timer.get());
  }

  VLOG(1) << "enqueueing executable on stream...";
  // If the profiling flag isn't enabled, we pass nullptr as the profile to
  // indicate profiling is not requested.
  state.profile_ptr =
      executable.module_config().debug_options().xla_hlo_profile() &&
              executable.hlo_profiling_enabled()
          ? std::make_shared<HloExecutionProfile>(
                &executable.hlo_profile_printer_data(),
                &executable.hlo_profile_index_map())
          : nullptr;
  return state;
}
212 
// Finalizes profiling after an execution prepared by
// ExecuteWrapperBeforeExecution: stops and reads the timer, merges timing
// into the caller's ExecutionProfile, dumps/logs the HloExecutionProfile,
// and returns the execution's own status (`return_status`).
Status ExecuteWrapperAfterExecution(
    Executable* executable, const ExecuteAsyncOnStreamWrapperState& state,
    Status return_status, se::Stream* stream) {
  if (!return_status.ok()) {
    if (state.profile != nullptr) {
      // Ensure the ThenStartTimer call has completed before we destroy timer.
      // We already have a failure status to return, so just log this if it
      // fails.
      Status status = stream->BlockHostUntilDone();
      if (!status.ok()) {
        LOG(ERROR) << "Failed to BlockHostUntilDone: " << status;
      }
    }
    return return_status;
  }

  if (state.profile != nullptr) {
    VLOG(1) << "enqueueing 'stop timer' and profiling callback...";
    stream->ThenStopTimer(state.timer.get());

    // We block instead of using an async callback because reading the timer
    // value may call back into the driver on GPU, which is not allowed.
    TF_RETURN_IF_ERROR(stream->BlockHostUntilDone());

    const int64_t executable_size_in_bytes =
        executable->SizeOfGeneratedCodeInBytes();
    // Merge in run-time profile information from execution_profile.

    // Overall execution time (in nanoseconds) from the executor timer.
    state.profile->set_compute_and_transfer_time_ns(state.timer->Nanoseconds());

    // TODO(b/28447609): The value in compute_and_transfer_time_ns is actually
    // the compute time without the transfer time, so this way we get the
    // correct compute time. We should instead have the correct value for
    // compute_and_transfer_time and set compute_time to the compute time.
    if (state.profile->compute_time_ns() == 0) {
      state.profile->set_compute_time_ns(
          state.profile->compute_and_transfer_time_ns());
    }

    // NOTE(review): the base SizeOfGeneratedCodeInBytes() returns -1, which
    // also passes this != 0 check and would be recorded — confirm intended.
    if (executable_size_in_bytes != 0) {
      state.profile->set_executable_size_in_bytes(executable_size_in_bytes);
    }
  }

  if (executable->module_config().debug_options().xla_hlo_profile() &&
      state.profile_ptr != nullptr) {
    // Persist the serialized per-HLO profile alongside other dump artifacts.
    DumpToFileInDir(executable->module(), /*file_prefix=*/"",
                    /*file_suffix=*/"hlo_execution_profile_data",
                    state.profile_ptr->ToProto().SerializeAsString());
  }

  if (state.profile_ptr != nullptr) {
    const se::DeviceDescription* device_description =
        &stream->parent()->GetDeviceDescription();
    std::shared_ptr<HloExecutionProfile> profile = state.profile_ptr;
    // Log the human-readable profile once the stream reaches this point; the
    // shared_ptr copy captured by the callback keeps the profile alive.
    stream->ThenDoHostCallback([profile, device_description]() {
      XLA_LOG_LINES(tensorflow::INFO,
                    profile->ToString(device_description->clock_rate_ghz()));
    });
  }

  return return_status;
}
277 
ExecuteAsyncOnStreamWrapper(const ServiceExecutableRunOptions * run_options,absl::Span<const ShapedBuffer * const> arguments)278 StatusOr<ScopedShapedBuffer> Executable::ExecuteAsyncOnStreamWrapper(
279     const ServiceExecutableRunOptions* run_options,
280     absl::Span<const ShapedBuffer* const> arguments) {
281   auto state = ExecuteWrapperBeforeExecution(*this, run_options);
282   StatusOr<ScopedShapedBuffer> return_value =
283       ExecuteAsyncOnStream(run_options, arguments, state.profile_ptr.get());
284   TF_RETURN_IF_ERROR(ExecuteWrapperAfterExecution(
285       this, state, return_value.status(), run_options->stream()));
286   return return_value;
287 }
288 
ExecuteAsyncOnStreamWrapper(const ServiceExecutableRunOptions * run_options,std::vector<ExecutionInput> arguments)289 StatusOr<ExecutionOutput> Executable::ExecuteAsyncOnStreamWrapper(
290     const ServiceExecutableRunOptions* run_options,
291     std::vector<ExecutionInput> arguments) {
292   auto state = ExecuteWrapperBeforeExecution(*this, run_options);
293   StatusOr<ExecutionOutput> return_value = ExecuteAsyncOnStream(
294       run_options, std::move(arguments), state.profile_ptr.get());
295   TF_RETURN_IF_ERROR(ExecuteWrapperAfterExecution(
296       this, state, return_value.status(), run_options->stream()));
297   return return_value;
298 }
299 
SizeOfGeneratedCodeInBytes() const300 int64_t Executable::SizeOfGeneratedCodeInBytes() const { return -1; }
301 
MarkToBeReleasedArguments(absl::Span<ExecutionInput> arguments,ExecutionOutput & result)302 void Executable::MarkToBeReleasedArguments(absl::Span<ExecutionInput> arguments,
303                                            ExecutionOutput& result) {
304   for (ExecutionInput& argument : arguments) {
305     for (auto& index_buffer : *argument.MutableBuffers()) {
306       if (std::optional<se::OwningDeviceMemory> maybe_owning_buffer =
307               index_buffer.second.Release()) {
308         result.AddToBeReleased(std::move(*maybe_owning_buffer));
309       }
310     }
311   }
312 }
313 
314 }  // namespace xla
315