/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15
16 #include "tensorflow/stream_executor/tpu/tpu_executable_interface.h"
17
18 #include <utility>
19
20 #include "absl/algorithm/container.h"
21 #include "tensorflow/compiler/xla/service/maybe_owning_device_memory.h"
22 #include "tensorflow/compiler/xla/service/shaped_buffer.h"
23 #include "tensorflow/compiler/xla/service/transfer_manager.h"
24 #include "tensorflow/compiler/xla/shape_util.h"
25 #include "tensorflow/compiler/xla/status_macros.h"
26 #include "tensorflow/compiler/xla/util.h"
27
28 namespace xla {
29
30 namespace {
31
32 // Write the tuple index buffers (arrays of pointers).
PopulateResultTupleBuffers(const ShapedBuffer & result,se::Stream * stream,se::Stream * transfer_stream)33 static Status PopulateResultTupleBuffers(const ShapedBuffer& result,
34 se::Stream* stream,
35 se::Stream* transfer_stream) {
36 TF_ASSIGN_OR_RETURN(auto transfer_manager, TransferManager::GetForPlatform(
37 stream->parent()->platform()));
38 if (transfer_manager->CanShapedBufferBeAccessedNow(stream->parent(),
39 result)) {
40 TF_RETURN_IF_ERROR(transfer_manager->WriteTupleIndexTablesAsync(
41 transfer_stream ? transfer_stream : stream, result));
42 if (transfer_stream && transfer_stream != stream) {
43 stream->ThenWaitFor(transfer_stream);
44 }
45 return Status::OK();
46 } else {
47 return transfer_manager->WriteTupleIndexTablesAsync(stream, result);
48 }
49 }
50
51 } // namespace
52
// Allocates the output buffers for a computation whose result has shape
// `shape`, reusing donated input buffers from `arguments` wherever
// `alias_config` permits. Buffers that cannot be reused are freshly
// allocated from `allocator`, and the result's tuple index tables are
// written via PopulateResultTupleBuffers. Returns an ExecutionOutput that
// owns every newly allocated buffer; reused (aliased) indices are recorded
// so they are not freed if the output is never committed.
StatusOr<ExecutionOutput>
TpuExecutableInterface::AllocateOutputMemoryWithInputReuse(
    const Shape& shape, const HloInputOutputAliasConfig& alias_config,
    se::DeviceMemoryAllocator* allocator,
    std::vector<ExecutionInput>* arguments, se::Stream* stream,
    se::Stream* transfer_stream) {
  auto stream_exec = stream->parent();
  auto device_ordinal = stream_exec->device_ordinal();
  VLOG(3) << "AllocateOutputMemoryWithInputReuse, device = " << device_ordinal
          << " shape = " << ShapeUtil::HumanStringWithLayout(shape);
  // Rewrites array subshapes still in host layout (no tiling information)
  // into the device layout; subshapes that already carry tiles are left
  // untouched.
  auto update_layout = [this](xla::Shape* subshape,
                              const xla::ShapeIndex& index) {
    if (subshape->IsArray()) {
      CHECK(subshape->has_layout());
      if (!subshape->layout().tiles().empty()) {
        // Already in device shape.
        return;
      }
      *subshape = HostShapeToDeviceShape(*subshape);
    }
  };
  Shape device_shape = shape;
  xla::ShapeUtil::ForEachMutableSubshape(&device_shape, update_layout);

  // Fail fast if any input marked must-alias at compile time was not
  // actually donated by the caller; such an output cannot be satisfied by a
  // fresh allocation.
  TF_RETURN_IF_ERROR(alias_config.ForEachAliasWithStatus(
      [&](const ShapeIndex& output_index,
          absl::optional<HloInputOutputAliasConfig::Alias> alias) {
        if (alias && alias->must_alias()) {
          VLOG(1) << alias->ToString();
          const MaybeOwningDeviceMemory& original_input =
              (*arguments)[alias->parameter_number].Buffers().element(
                  alias->parameter_index);
          if (!original_input.HasOwnership()) {
            return InvalidArgument(
                "An input was configured to be must-alias at "
                "compile time but not donated at runtime: %s",
                alias->ToString());
          }
        }
        return Status::OK();
      }));

  if (VLOG_IS_ON(3)) {
    VLOG(3) << "AllocateOutputMemoryWithInputReuse, device = " << device_ordinal
            << " shape = " << ShapeUtil::HumanStringWithLayout(shape);
    if (!Shape::Equal().MinorToMajorOnlyInLayout()(shape, device_shape)) {
      VLOG(3) << "Rewrote shape to device_shape: "
              << ShapeUtil::HumanStringWithLayout(shape) << " -> "
              << ShapeUtil::HumanStringWithLayout(device_shape);
    }
  }

  ExecutionOutput result(std::move(device_shape), allocator, device_ordinal);
  // Iterate through and allocate a buffer for each shape index, checking for
  // possible input buffer reuse.
  int64_t reused_buffer_bytes = 0;
  int64_t total_result_buffer_bytes = 0;
  for (auto& pair : result.MutableResult()->buffers()) {
    const ShapeIndex& result_index = pair.first;
    se::DeviceMemoryBase& result_buffer = pair.second;
    int64_t allocation_bytes = ShapeSize(ShapeUtil::GetSubshape(
        result.Result().on_device_shape(), result_index));
    total_result_buffer_bytes += allocation_bytes;

    // Return an InternalError if result_index is invalid. This avoids failing
    // the CHECK when calling GetAliasedParameter
    if (!ShapeUtil::IndexIsValid(alias_config.shape(), result_index)) {
      return InternalError("result_index is invalid: %s",
                           result_index.ToString());
    }

    absl::optional<HloInputOutputAliasConfig::Alias> alias =
        alias_config.GetAliasedParameter(result_index);
    if (alias) {
      TF_RET_CHECK(alias->parameter_number < arguments->size());
      ExecutionInput& input = (*arguments)[alias->parameter_number];
      MaybeOwningDeviceMemory* device_memory =
          input.MutableBuffer(alias->parameter_index);
      if (auto owning = device_memory->Release()) {
        // If the caller passes the ownership of the device memory, reuse it
        // as the output buffer. It is up to the caller whether or not to
        // donate a buffer; the aliasing information describes which buffers
        // may alias, not buffers that must alias.
        se::DeviceMemoryBase device_memory_base = owning->Release();
        *device_memory = device_memory_base;
        result_buffer = device_memory_base;
        reused_buffer_bytes += allocation_bytes;
        // The caller is giving us the input buffer, but in case of error of the
        // execute call, we should not be releasing it as it contains valid data
        // (for example, it is a parameter which the user wants us to alias, in
        // a gradient update computation). So we store the index into the result
        // in the aliased vector, which will be fed to the ExecutionOutput,
        // which will be using the indices to drop the addresses from its own
        // ScopedShapedBuffer result, if the ExecutionOutput is not committed.
        result.AddAliasedIndex(result_index);
      } else {
        VLOG(2) << "An input was not reused since it is not donated "
                << alias->ToString();
      }
    }

    // We need to allocate a new output buffer for two cases:
    // - There is no alias between this output and any input.
    // - There is an alias, but the xla doesn't own the input memory so it can't
    //   donate buffer to the computation.
    if (result_buffer.is_null()) {
      const Shape& on_device_shape = result.Result().on_device_shape();
      const Shape& on_device_subshape =
          ShapeUtil::GetSubshape(on_device_shape, result_index);
      TF_ASSIGN_OR_RETURN(
          auto allocated_buffer,
          allocator->Allocate(device_ordinal, allocation_bytes,
                              /*retry_on_failure=*/true,
                              on_device_subshape.layout().memory_space()));
      // Store the allocated buffer in our ScopedShapedBuffer, which takes
      // ownership.
      result_buffer = allocated_buffer.Release();
    }
    TF_RET_CHECK(allocation_bytes == 0 || result_buffer != nullptr);
  }

  VLOG(1) << "Reused " << reused_buffer_bytes
          << " parameter buffers (total result buffer size: "
          << total_result_buffer_bytes << ")";

  TF_RETURN_IF_ERROR(
      PopulateResultTupleBuffers(result.Result(), stream, transfer_stream));
  return std::move(result);
}
182
// Asynchronously enqueues this executable on `run_options->stream()`:
// snapshots the argument root-buffer addresses, allocates/reuses output
// memory, determines the (single) cross-program-prefetch buffer if any, and
// hands everything to LoadProgramAndEnqueueToStream. The HloExecutionProfile
// parameter is accepted for interface compatibility but ignored.
StatusOr<ExecutionOutput> TpuExecutableInterface::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    std::vector<ExecutionInput> arguments,
    HloExecutionProfile* /*hlo_execution_profile*/) {
  // Snapshot the root buffer address of each argument before any donation /
  // release happens below.
  std::vector<se::DeviceMemoryBase> memory_bases;
  memory_bases.reserve(arguments.size());
  for (auto& argument : arguments) {
    memory_bases.push_back(argument.Buffer({}).AsDeviceMemoryBase());
  }
  se::Stream* stream = run_options->stream();

  CHECK_NE(run_options->allocator(), nullptr);
  // Without an HloModule there is no result shape or aliasing info; fall back
  // to a nil shape and an empty alias config.
  const Shape& shape =
      hlo_module_ == nullptr ? ShapeUtil::MakeNil() : result_shape();
  const HloInputOutputAliasConfig& alias_config =
      hlo_module_ == nullptr ? HloInputOutputAliasConfig()
                             : hlo_module_->input_output_alias_config();
  TF_ASSIGN_OR_RETURN(
      ExecutionOutput result,
      AllocateOutputMemoryWithInputReuse(
          shape, alias_config, run_options->allocator(), &arguments, stream,
          run_options->run_options().host_to_device_stream()));

  // Address of the buffer in TPU memory that is being speculated.
  absl::optional<se::DeviceMemoryBase> cross_program_prefetch_addr;
  if (hlo_module_) {
    for (const auto& prefetch : hlo_module_->CrossProgramPrefetches()) {
      const auto& parameter = prefetch.first;
      const auto& index = prefetch.second;
      CHECK_LT(parameter, arguments.size());
      // Ensure the cross program prefetched buffer doesn't alias with any
      // program outputs. If the input and output aliased, the buffer could be
      // invalidated during program execution and the program could read stale
      // data from fast memory instead of fresh data in large memory.
      auto it = arguments[parameter].MutableBuffers()->find({index});
      CHECK(it != arguments[parameter].MutableBuffers()->end());
      CHECK(!it->second.AsDeviceMemoryBase().is_null());
      if (absl::c_none_of(result.Result().buffers(), [&](auto index_addr_pair) {
            return index_addr_pair.second.IsSameAs(
                it->second.AsDeviceMemoryBase());
          })) {
        // Supports only one cross-program prefetch address.
        cross_program_prefetch_addr = it->second.AsDeviceMemoryBase();
      }
    }
  }

  // MarkToBeReleasedArguments may std::move some elements of arguments, so it
  // must run after the cross program prefetch address is calculated from the
  // arguments.
  MarkToBeReleasedArguments(absl::MakeSpan(arguments), result);

  TF_RETURN_IF_ERROR(LoadProgramAndEnqueueToStream(
      *run_options, memory_bases, result.Result().root_buffer(),
      cross_program_prefetch_addr));
  return std::move(result);
}
240
241 } // namespace xla
242