/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/backend.h"

#include <algorithm>
#include <string>
#include <utility>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/service/compiler.h"
#include "tensorflow/compiler/xla/service/platform_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/statusor.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/stream_executor_no_cuda.h"

namespace xla {

BackendOptions& BackendOptions::set_platform(se::Platform* platform) {
  platform_ = platform;
  return *this;
}

se::Platform* BackendOptions::platform() const { return platform_; }

BackendOptions& BackendOptions::set_intra_op_parallelism_threads(
    int num_threads) {
  intra_op_parallelism_threads_ = num_threads;
  return *this;
}

int BackendOptions::intra_op_parallelism_threads() const {
  return intra_op_parallelism_threads_;
}

BackendOptions& BackendOptions::set_allowed_devices(
    const absl::optional<std::set<int>>& allowed_devices) {
  allowed_devices_ = allowed_devices;
  return *this;
}

const absl::optional<std::set<int>>& BackendOptions::allowed_devices() const {
  return allowed_devices_;
}

namespace {

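// Adapts a tensorflow::thread::ThreadPool to Eigen's ThreadPoolInterface so it
// can back the Eigen::ThreadPoolDevice created in IntraOpThreadPool below.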
class EigenThreadPoolWrapper : public Eigen::ThreadPoolInterface {
 public:
  explicit EigenThreadPoolWrapper(tensorflow::thread::ThreadPool* pool)
      : pool_(pool) {}
  ~EigenThreadPoolWrapper() override {}

  void Schedule(std::function<void()> fn) override {
    pool_->Schedule(std::move(fn));
  }
  int NumThreads() const override { return pool_->NumThreads(); }
  int CurrentThreadId() const override { return pool_->CurrentThreadId(); }

 private:
  tensorflow::thread::ThreadPool* pool_ = nullptr;
};

}  // namespace

// Define this in the .cc file to avoid having to include Eigen or
// forward-declare these types in the header.
struct Backend::IntraOpThreadPool {
  explicit IntraOpThreadPool(const int num_threads)
      : pool(new tensorflow::thread::ThreadPool(tensorflow::Env::Default(),
                                                "XLAEigen", num_threads)),
        wrapper(new EigenThreadPoolWrapper(pool.get())),
        device(new Eigen::ThreadPoolDevice(wrapper.get(),
                                           wrapper->NumThreads())) {}

  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  std::unique_ptr<EigenThreadPoolWrapper> wrapper;
  std::unique_ptr<Eigen::ThreadPoolDevice> device;
};

/* static */ StatusOr<std::unique_ptr<Backend>> Backend::CreateBackend(
    const BackendOptions& options) {
  se::Platform* platform = options.platform();
  TF_ASSIGN_OR_RETURN(auto compiler, Compiler::GetForPlatform(platform));
  TF_ASSIGN_OR_RETURN(
      auto stream_executors,
      PlatformUtil::GetStreamExecutors(platform, options.allowed_devices()));
  TF_ASSIGN_OR_RETURN(auto transfer_manager,
                      TransferManager::GetForPlatform(platform));
  TF_ASSIGN_OR_RETURN(auto computation_placer,
                      ComputationPlacer::GetForPlatform(platform));
  std::unique_ptr<Backend> backend(
      new Backend(platform, compiler, stream_executors, transfer_manager,
                  computation_placer, options.intra_op_parallelism_threads()));
  return std::move(backend);
}

/* static */ StatusOr<std::unique_ptr<Backend>>
Backend::CreateDefaultBackend() {
  TF_ASSIGN_OR_RETURN(se::Platform * platform,
                      PlatformUtil::GetDefaultPlatform());
  BackendOptions backend_options;
  backend_options.set_platform(platform);
  return CreateBackend(backend_options);
}
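
// Illustrative usage sketch (not part of this file): a caller creates a
// backend for the default platform and borrows a stream for its default
// device. The enclosing function is assumed to return a Status or StatusOr so
// that TF_ASSIGN_OR_RETURN can propagate errors.
//
//   TF_ASSIGN_OR_RETURN(std::unique_ptr<Backend> backend,
//                       Backend::CreateDefaultBackend());
//   TF_ASSIGN_OR_RETURN(
//       StreamPool::Ptr stream,
//       backend->BorrowStream(backend->default_device_ordinal()));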

StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
  TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
  return BorrowStream(executor);
}

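// A StreamPool is created lazily for the executor the first time a stream is
// borrowed for it; subsequent borrows reuse the same pool.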
StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
  tensorflow::mutex_lock l(mu_);
  if (!stream_pools_.contains(executor)) {
    stream_pools_.emplace(executor, absl::make_unique<StreamPool>());
  }
  return stream_pools_.at(executor)->BorrowStream(executor);
}

Backend::Backend(se::Platform* platform, Compiler* compiler,
                 absl::Span<se::StreamExecutor* const> stream_executors,
                 TransferManager* transfer_manager,
                 ComputationPlacer* computation_placer,
                 int intra_op_parallelism_threads)
    : platform_(platform),
      compiler_(compiler),
      transfer_manager_(transfer_manager),
      computation_placer_(computation_placer) {
  // The given list of stream executors may include invalid executors.
  for (se::StreamExecutor* exec : stream_executors) {
    if (exec != nullptr) {
      stream_executors_.push_back(exec);
    }
  }
  // Create a memory allocator for the valid stream executors.
  memory_allocator_ = absl::make_unique<StreamExecutorMemoryAllocator>(
      platform, stream_executors);
  CHECK(!stream_executors_.empty())
      << "Service found no devices for backend " << platform_->Name() << '.';
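
  // The host (CPU) platform gets an Eigen intra-op thread pool, sized by the
  // intra_op_parallelism_threads option when positive and by the number of
  // schedulable CPUs otherwise.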
  if (platform->id() == se::host::kHostPlatformId) {
    const int num_threads = intra_op_parallelism_threads > 0
                                ? intra_op_parallelism_threads
                                : tensorflow::port::NumSchedulableCPUs();
    intra_op_thread_pool_.reset(new IntraOpThreadPool(num_threads));
  }
}

Backend::~Backend() {}

int Backend::default_device_ordinal() const {
  return default_stream_executor()->device_ordinal();
}

const Eigen::ThreadPoolDevice* Backend::eigen_intra_op_thread_pool_device()
    const {
  if (intra_op_thread_pool_ == nullptr) {
    return nullptr;
  }
  return intra_op_thread_pool_->device.get();
}

tensorflow::thread::ThreadPool* Backend::eigen_intra_op_thread_pool() const {
  if (intra_op_thread_pool_ == nullptr) {
    return nullptr;
  }
  return intra_op_thread_pool_->pool.get();
}

StatusOr<se::StreamExecutor*> Backend::stream_executor(
    int device_ordinal) const {
  if (device_ordinal < 0 ||
      device_ordinal > stream_executors_.back()->device_ordinal()) {
    return InvalidArgument(
        "Invalid device ordinal value (%d). Valid range is [0, %d].",
        device_ordinal, stream_executors_.back()->device_ordinal());
  }
  for (auto* executor : stream_executors_) {
    if (executor->device_ordinal() == device_ordinal) {
      return executor;
    }
  }
  return InvalidArgument("device %s not supported by XLA service",
                         device_name(device_ordinal));
}

StatusOr<bool> Backend::devices_equivalent(int device_ordinal_a,
                                           int device_ordinal_b) {
  // Use the name from the device description to determine equivalence. This
  // is a bit crude, but it works for GPUs, which is the important case: we
  // compile an executable for one GPU and want to know whether it will run
  // (well) on another.
  TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_a,
                      stream_executor(device_ordinal_a));
  TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_b,
                      stream_executor(device_ordinal_b));
  return (executor_a->GetDeviceDescription().name() ==
          executor_b->GetDeviceDescription().name());
}

Status Backend::ResetDevices() {
  return transfer_manager_->ResetDevices(stream_executors_);
}

}  // namespace xla