1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #define EIGEN_USE_THREADS
17
18 #include "tensorflow/compiler/xla/service/backend.h"
19
20 #include <algorithm>
21 #include <string>
22 #include <utility>
23
24 #include "absl/memory/memory.h"
25 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
26 #include "tensorflow/compiler/xla/service/compiler.h"
27 #include "tensorflow/compiler/xla/service/platform_util.h"
28 #include "tensorflow/compiler/xla/status_macros.h"
29 #include "tensorflow/compiler/xla/statusor.h"
30 #include "tensorflow/compiler/xla/types.h"
31 #include "tensorflow/compiler/xla/util.h"
32 #include "tensorflow/core/lib/core/errors.h"
33 #include "tensorflow/core/lib/core/threadpool.h"
34 #include "tensorflow/core/platform/byte_order.h"
35 #include "tensorflow/core/platform/cpu_info.h"
36 #include "tensorflow/core/platform/env.h"
37 #include "tensorflow/core/platform/logging.h"
38 #include "tensorflow/core/platform/stream_executor_no_cuda.h"
39
40 namespace xla {
41
set_platform(se::Platform * platform)42 BackendOptions& BackendOptions::set_platform(se::Platform* platform) {
43 platform_ = platform;
44 return *this;
45 }
46
platform() const47 se::Platform* BackendOptions::platform() const { return platform_; }
48
set_intra_op_parallelism_threads(int num_threads)49 BackendOptions& BackendOptions::set_intra_op_parallelism_threads(
50 int num_threads) {
51 intra_op_parallelism_threads_ = num_threads;
52 return *this;
53 }
54
intra_op_parallelism_threads() const55 int BackendOptions::intra_op_parallelism_threads() const {
56 return intra_op_parallelism_threads_;
57 }
58
set_allowed_devices(const absl::optional<std::set<int>> & allowed_devices)59 BackendOptions& BackendOptions::set_allowed_devices(
60 const absl::optional<std::set<int>>& allowed_devices) {
61 allowed_devices_ = allowed_devices;
62 return *this;
63 }
64
allowed_devices() const65 const absl::optional<std::set<int>>& BackendOptions::allowed_devices() const {
66 return allowed_devices_;
67 }
68
69 // Define this in .cc file to avoid having to include eigen or forward declare
70 // these types in the header.
// Define this in .cc file to avoid having to include eigen or forward declare
// these types in the header.
//
// Bundles a TensorFlow thread pool with the Eigen device that wraps it;
// `device` borrows `pool`'s underlying Eigen pool, so member order matters:
// `pool` must be declared (and thus constructed) before `device`.
struct Backend::IntraOpThreadPool {
  explicit IntraOpThreadPool(const int num_threads)
      : pool(new tensorflow::thread::ThreadPool(tensorflow::Env::Default(),
                                                "XLAEigen", num_threads)),
        device(new Eigen::ThreadPoolDevice(pool->AsEigenThreadPool(),
                                           pool->NumThreads())) {}

  // Owning thread pool; must outlive `device`, which holds a raw pointer
  // into it.
  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  std::unique_ptr<Eigen::ThreadPoolDevice> device;
};
81
// Creates a Backend for the platform in `options`, wiring together the
// per-platform singletons (compiler, transfer manager, computation placer)
// and the stream executors for the allowed devices. Returns an error status
// if any of the platform lookups fail.
/* static */ StatusOr<std::unique_ptr<Backend>> Backend::CreateBackend(
    const BackendOptions& options) {
  se::Platform* platform = options.platform();
  TF_ASSIGN_OR_RETURN(auto compiler, Compiler::GetForPlatform(platform));
  // Executors are filtered by options.allowed_devices() (nullopt = all).
  TF_ASSIGN_OR_RETURN(
      auto stream_executors,
      PlatformUtil::GetStreamExecutors(platform, options.allowed_devices()));
  TF_ASSIGN_OR_RETURN(auto transfer_manager,
                      TransferManager::GetForPlatform(platform));
  TF_ASSIGN_OR_RETURN(auto computation_placer,
                      ComputationPlacer::GetForPlatform(platform));
  // Backend's constructor is not public, so use `new` directly here rather
  // than make_unique.
  std::unique_ptr<Backend> backend(
      new Backend(platform, compiler, stream_executors, transfer_manager,
                  computation_placer, options.intra_op_parallelism_threads()));
  // std::move enables the implicit conversion unique_ptr -> StatusOr.
  return std::move(backend);
}
98
99 /* static */ StatusOr<std::unique_ptr<Backend>>
CreateDefaultBackend()100 Backend::CreateDefaultBackend() {
101 TF_ASSIGN_OR_RETURN(se::Platform * platform,
102 PlatformUtil::GetDefaultPlatform());
103 BackendOptions backend_options;
104 backend_options.set_platform(platform);
105 return CreateBackend(backend_options);
106 }
107
BorrowStream(int device_ordinal)108 StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
109 TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
110 return BorrowStream(executor);
111 }
112
BorrowStream(se::StreamExecutor * executor)113 StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
114 tensorflow::mutex_lock l(mu_);
115 if (!stream_pools_.contains(executor)) {
116 stream_pools_.emplace(executor, absl::make_unique<StreamPool>());
117 }
118 return stream_pools_.at(executor)->BorrowStream(executor);
119 }
120
Backend(se::Platform * platform,Compiler * compiler,absl::Span<se::StreamExecutor * const> stream_executors,TransferManager * transfer_manager,ComputationPlacer * computation_placer,int intra_op_parallelism_threads)121 Backend::Backend(se::Platform* platform, Compiler* compiler,
122 absl::Span<se::StreamExecutor* const> stream_executors,
123 TransferManager* transfer_manager,
124 ComputationPlacer* computation_placer,
125 int intra_op_parallelism_threads)
126 : platform_(platform),
127 compiler_(compiler),
128 transfer_manager_(transfer_manager),
129 computation_placer_(computation_placer),
130 stream_executors_(stream_executors.begin(), stream_executors.end()) {
131 // Create a memory allocator for the valid stream executors.
132 memory_allocator_ = absl::make_unique<se::StreamExecutorMemoryAllocator>(
133 platform, stream_executors_);
134 CHECK(!stream_executors_.empty())
135 << "Service found no devices for backend " << platform_->Name() << '.';
136
137 if (platform->id() == se::host::kHostPlatformId) {
138 const int num_threads = intra_op_parallelism_threads > 0
139 ? intra_op_parallelism_threads
140 : tensorflow::port::MaxParallelism();
141 intra_op_thread_pool_.reset(new IntraOpThreadPool(num_threads));
142 }
143 }
144
~Backend()145 Backend::~Backend() {}
146
default_device_ordinal() const147 int Backend::default_device_ordinal() const {
148 return default_stream_executor()->device_ordinal();
149 }
150
eigen_intra_op_thread_pool_device() const151 const Eigen::ThreadPoolDevice* Backend::eigen_intra_op_thread_pool_device()
152 const {
153 if (intra_op_thread_pool_ == nullptr) {
154 return nullptr;
155 }
156 return intra_op_thread_pool_->device.get();
157 }
158
eigen_intra_op_thread_pool() const159 tensorflow::thread::ThreadPool* Backend::eigen_intra_op_thread_pool() const {
160 if (intra_op_thread_pool_ == nullptr) {
161 return nullptr;
162 }
163 return intra_op_thread_pool_->pool.get();
164 }
165
// Returns the stream executor whose device ordinal is `device_ordinal`, or
// InvalidArgument if the ordinal is out of range or not among this backend's
// devices (ordinals may be sparse when allowed_devices filtered the set).
StatusOr<se::StreamExecutor*> Backend::stream_executor(
    int device_ordinal) const {
  // NOTE(review): the upper-bound check assumes stream_executors_ is ordered
  // so that back() has the largest ordinal — TODO confirm against
  // PlatformUtil::GetStreamExecutors.
  if (device_ordinal < 0 ||
      device_ordinal > stream_executors_.back()->device_ordinal()) {
    return InvalidArgument(
        "Invalid device ordinal value (%d). Valid range is [0, %d].",
        device_ordinal, stream_executors_.back()->device_ordinal());
  }
  // Linear scan because ordinals need not be contiguous indices into the
  // vector.
  for (auto* executor : stream_executors_) {
    if (executor->device_ordinal() == device_ordinal) {
      return executor;
    }
  }
  // In range but filtered out (e.g. not in allowed_devices).
  return InvalidArgument("device %s not supported by XLA service",
                         device_name(device_ordinal));
}
182
devices_equivalent(int device_ordinal_a,int device_ordinal_b)183 StatusOr<bool> Backend::devices_equivalent(int device_ordinal_a,
184 int device_ordinal_b) {
185 // Use the name from device description to determine equivalence. This is a
186 // bit crude but works for GPUs which is the important case where we compile
187 // an executable for one GPU and want to know if it will run (well) on
188 // another.
189 TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_a,
190 stream_executor(device_ordinal_a));
191 TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_b,
192 stream_executor(device_ordinal_b));
193 return (executor_a->GetDeviceDescription().name() ==
194 executor_b->GetDeviceDescription().name());
195 }
196
ResetDevices()197 Status Backend::ResetDevices() {
198 return transfer_manager_->ResetDevices(stream_executors_);
199 }
200
201 } // namespace xla
202