/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/stream_executor/tpu/tpu_executable_interface.h"

#include <utility>

#include "absl/algorithm/container.h"
#include "tensorflow/compiler/xla/service/maybe_owning_device_memory.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/util.h"

namespace xla {

namespace {

// Write the tuple index buffers (arrays of pointers).
static Status PopulateResultTupleBuffers(const ShapedBuffer& result,
                                         se::Stream* stream,
                                         se::Stream* transfer_stream) {
  TF_ASSIGN_OR_RETURN(auto transfer_manager, TransferManager::GetForPlatform(
                                                 stream->parent()->platform()));
  if (transfer_manager->CanShapedBufferBeAccessedNow(stream->parent(),
                                                     result)) {
    TF_RETURN_IF_ERROR(transfer_manager->WriteTupleIndexTablesAsync(
        transfer_stream ? transfer_stream : stream, result));
    if (transfer_stream && transfer_stream != stream) {
      stream->ThenWaitFor(transfer_stream);
    }
    return Status::OK();
  } else {
    return transfer_manager->WriteTupleIndexTablesAsync(stream, result);
  }
}

}  // namespace

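// Allocates output buffers for `shape` using `allocator`, reusing donated
// input buffers from `arguments` where `alias_config` permits, and writes the
// result's tuple index tables on `stream` (or `transfer_stream`, if given).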
StatusOr<ExecutionOutput>
TpuExecutableInterface::AllocateOutputMemoryWithInputReuse(
    const Shape& shape, const HloInputOutputAliasConfig& alias_config,
    se::DeviceMemoryAllocator* allocator,
    std::vector<ExecutionInput>* arguments, se::Stream* stream,
    se::Stream* transfer_stream) {
  auto stream_exec = stream->parent();
  auto device_ordinal = stream_exec->device_ordinal();
  VLOG(3) << "AllocateOutputMemoryWithInputReuse, device = " << device_ordinal
          << " shape = " << ShapeUtil::HumanStringWithLayout(shape);
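  // Rewrite array subshapes that still carry host layouts into device shapes;
  // subshapes whose layout already has tiles are left as-is.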
  auto update_layout = [this](xla::Shape* subshape,
                              const xla::ShapeIndex& index) {
    if (subshape->IsArray()) {
      CHECK(subshape->has_layout());
      if (!subshape->layout().tiles().empty()) {
        // Already in device shape.
        return;
      }
      *subshape = HostShapeToDeviceShape(*subshape);
    }
  };
  Shape device_shape = shape;
  xla::ShapeUtil::ForEachMutableSubshape(&device_shape, update_layout);

  TF_RETURN_IF_ERROR(alias_config.ForEachAliasWithStatus(
      [&](const ShapeIndex& output_index,
          absl::optional<HloInputOutputAliasConfig::Alias> alias) {
        if (alias && alias->must_alias()) {
          VLOG(1) << alias->ToString();
          const MaybeOwningDeviceMemory& original_input =
              (*arguments)[alias->parameter_number].Buffers().element(
                  alias->parameter_index);
          if (!original_input.HasOwnership()) {
            return InvalidArgument(
                "An input was configured to be must-alias at "
                "compile time but not donated at runtime: %s",
                alias->ToString());
          }
        }
        return Status::OK();
      }));

  if (VLOG_IS_ON(3)) {
    VLOG(3) << "AllocateOutputMemoryWithInputReuse, device = " << device_ordinal
            << " shape = " << ShapeUtil::HumanStringWithLayout(shape);
    if (!Shape::Equal().MinorToMajorOnlyInLayout()(shape, device_shape)) {
      VLOG(3) << "Rewrote shape to device_shape: "
              << ShapeUtil::HumanStringWithLayout(shape) << " -> "
              << ShapeUtil::HumanStringWithLayout(device_shape);
    }
  }

  ExecutionOutput result(std::move(device_shape), allocator, device_ordinal);
  // Iterate through and allocate a buffer for each shape index, checking for
  // possible input buffer reuse.
  int64_t reused_buffer_bytes = 0;
  int64_t total_result_buffer_bytes = 0;
  for (auto& pair : result.MutableResult()->buffers()) {
    const ShapeIndex& result_index = pair.first;
    se::DeviceMemoryBase& result_buffer = pair.second;
    int64_t allocation_bytes = ShapeSize(ShapeUtil::GetSubshape(
        result.Result().on_device_shape(), result_index));
    total_result_buffer_bytes += allocation_bytes;

    // Return an InternalError if result_index is invalid. This avoids failing
    // the CHECK when calling GetAliasedParameter.
    if (!ShapeUtil::IndexIsValid(alias_config.shape(), result_index)) {
      return InternalError("result_index is invalid: %s",
                           result_index.ToString());
    }

    absl::optional<HloInputOutputAliasConfig::Alias> alias =
        alias_config.GetAliasedParameter(result_index);
    if (alias) {
      TF_RET_CHECK(alias->parameter_number < arguments->size());
      ExecutionInput& input = (*arguments)[alias->parameter_number];
      MaybeOwningDeviceMemory* device_memory =
          input.MutableBuffer(alias->parameter_index);
      if (auto owning = device_memory->Release()) {
        // If the caller passes ownership of the device memory, reuse it as
        // the output buffer. It is up to the caller whether or not to donate
        // a buffer; the aliasing information describes which buffers may
        // alias, not which buffers must alias.
        se::DeviceMemoryBase device_memory_base = owning->Release();
        *device_memory = device_memory_base;
        result_buffer = device_memory_base;
        reused_buffer_bytes += allocation_bytes;
        // The caller is giving us the input buffer, but if the execute call
        // fails we should not release it, since it still contains valid data
        // (for example, it is a parameter which the user wants us to alias in
        // a gradient-update computation). So we store the index into the
        // result in the aliased vector, which will be fed to the
        // ExecutionOutput; the ExecutionOutput uses those indices to drop the
        // addresses from its own ScopedShapedBuffer result if it is not
        // committed.
        result.AddAliasedIndex(result_index);
      } else {
        VLOG(2) << "An input was not reused since it is not donated "
                << alias->ToString();
      }
    }

    // We need to allocate a new output buffer in two cases:
    // - There is no alias between this output and any input.
    // - There is an alias, but XLA doesn't own the input memory, so it cannot
    //   donate the buffer to the computation.
    if (result_buffer.is_null()) {
      const Shape& on_device_shape = result.Result().on_device_shape();
      const Shape& on_device_subshape =
          ShapeUtil::GetSubshape(on_device_shape, result_index);
      TF_ASSIGN_OR_RETURN(
          auto allocated_buffer,
          allocator->Allocate(device_ordinal, allocation_bytes,
                              /*retry_on_failure=*/true,
                              on_device_subshape.layout().memory_space()));
      // Store the allocated buffer in our ScopedShapedBuffer, which takes
      // ownership.
      result_buffer = allocated_buffer.Release();
    }
    TF_RET_CHECK(allocation_bytes == 0 || result_buffer != nullptr);
  }

  VLOG(1) << "Reused " << reused_buffer_bytes
          << " parameter buffers (total result buffer size: "
          << total_result_buffer_bytes << ")";

  TF_RETURN_IF_ERROR(
      PopulateResultTupleBuffers(result.Result(), stream, transfer_stream));
  return std::move(result);
}

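// Asynchronously executes the compiled program on the stream given in
// `run_options`: allocates the output (reusing donated inputs where they are
// aliased), determines the optional cross-program prefetch address, and
// enqueues the program on the stream.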
StatusOr<ExecutionOutput> TpuExecutableInterface::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    std::vector<ExecutionInput> arguments,
    HloExecutionProfile* /*hlo_execution_profile*/) {
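  // Capture the top-level device address of each argument; these addresses
  // are passed to LoadProgramAndEnqueueToStream below.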
  std::vector<se::DeviceMemoryBase> memory_bases;
  memory_bases.reserve(arguments.size());
  for (auto& argument : arguments) {
    memory_bases.push_back(argument.Buffer({}).AsDeviceMemoryBase());
  }
  se::Stream* stream = run_options->stream();

  CHECK_NE(run_options->allocator(), nullptr);
  const Shape& shape =
      hlo_module_ == nullptr ? ShapeUtil::MakeNil() : result_shape();
  const HloInputOutputAliasConfig& alias_config =
      hlo_module_ == nullptr ? HloInputOutputAliasConfig()
                             : hlo_module_->input_output_alias_config();
  TF_ASSIGN_OR_RETURN(
      ExecutionOutput result,
      AllocateOutputMemoryWithInputReuse(
          shape, alias_config, run_options->allocator(), &arguments, stream,
          run_options->run_options().host_to_device_stream()));

  // Address of the buffer in TPU memory that is being speculatively
  // cross-program prefetched.
  absl::optional<se::DeviceMemoryBase> cross_program_prefetch_addr;
  if (hlo_module_) {
    for (const auto& prefetch : hlo_module_->CrossProgramPrefetches()) {
      const auto& parameter = prefetch.first;
      const auto& index = prefetch.second;
      CHECK_LT(parameter, arguments.size());
      // Ensure the cross-program prefetched buffer doesn't alias with any
      // program outputs. If the input and output were aliased, the buffer
      // could be invalidated during program execution, and the program could
      // read stale data from fast memory instead of fresh data in large
      // memory.
      auto it = arguments[parameter].MutableBuffers()->find({index});
      CHECK(it != arguments[parameter].MutableBuffers()->end());
      CHECK(!it->second.AsDeviceMemoryBase().is_null());
      if (absl::c_none_of(result.Result().buffers(), [&](auto index_addr_pair) {
            return index_addr_pair.second.IsSameAs(
                it->second.AsDeviceMemoryBase());
          })) {
        // Only one cross-program prefetch address is supported.
        cross_program_prefetch_addr = it->second.AsDeviceMemoryBase();
      }
    }
  }

  // MarkToBeReleasedArguments may std::move some elements of arguments, so it
  // must run after the cross-program prefetch address is calculated from the
  // arguments.
  MarkToBeReleasedArguments(absl::MakeSpan(arguments), result);

  TF_RETURN_IF_ERROR(LoadProgramAndEnqueueToStream(
      *run_options, memory_bases, result.Result().root_buffer(),
      cross_program_prefetch_addr));
  return std::move(result);
}

}  // namespace xla