/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/backend.h"

#include <algorithm>
#include <string>
#include <utility>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/service/compiler.h"
#include "tensorflow/compiler/xla/service/platform_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/statusor.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/stream_executor_no_cuda.h"

namespace xla {

BackendOptions& BackendOptions::set_platform(se::Platform* platform) {
  platform_ = platform;
  return *this;
}

se::Platform* BackendOptions::platform() const { return platform_; }

BackendOptions& BackendOptions::set_intra_op_parallelism_threads(
    int num_threads) {
  intra_op_parallelism_threads_ = num_threads;
  return *this;
}

int BackendOptions::intra_op_parallelism_threads() const {
  return intra_op_parallelism_threads_;
}

BackendOptions& BackendOptions::set_allowed_devices(
    const absl::optional<std::set<int>>& allowed_devices) {
  allowed_devices_ = allowed_devices;
  return *this;
}

const absl::optional<std::set<int>>& BackendOptions::allowed_devices() const {
  return allowed_devices_;
}
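
// The setters above return *this, so options can be chained. A minimal
// sketch (the platform pointer and thread count are illustrative):
//
//   BackendOptions options;
//   options.set_platform(platform).set_intra_op_parallelism_threads(4);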

// Define this in the .cc file to avoid having to include Eigen or forward
// declare these types in the header.
struct Backend::IntraOpThreadPool {
  explicit IntraOpThreadPool(const int num_threads)
      : pool(new tensorflow::thread::ThreadPool(tensorflow::Env::Default(),
                                                "XLAEigen", num_threads)),
        device(new Eigen::ThreadPoolDevice(pool->AsEigenThreadPool(),
                                           pool->NumThreads())) {}

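  // Declaration order matters here: `device` borrows `pool`'s Eigen thread
  // pool, and members are destroyed in reverse declaration order, so `pool`
  // outlives `device`.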
  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  std::unique_ptr<Eigen::ThreadPoolDevice> device;
};
81 
CreateBackend(const BackendOptions & options)82 /* static */ StatusOr<std::unique_ptr<Backend>> Backend::CreateBackend(
83     const BackendOptions& options) {
84   se::Platform* platform = options.platform();
85   TF_ASSIGN_OR_RETURN(auto compiler, Compiler::GetForPlatform(platform));
86   TF_ASSIGN_OR_RETURN(
87       auto stream_executors,
88       PlatformUtil::GetStreamExecutors(platform, options.allowed_devices()));
89   TF_ASSIGN_OR_RETURN(auto transfer_manager,
90                       TransferManager::GetForPlatform(platform));
91   TF_ASSIGN_OR_RETURN(auto computation_placer,
92                       ComputationPlacer::GetForPlatform(platform));
93   std::unique_ptr<Backend> backend(
94       new Backend(platform, compiler, stream_executors, transfer_manager,
95                   computation_placer, options.intra_op_parallelism_threads()));
96   return std::move(backend);
97 }
98 
99 /* static */ StatusOr<std::unique_ptr<Backend>>
CreateDefaultBackend()100 Backend::CreateDefaultBackend() {
101   TF_ASSIGN_OR_RETURN(se::Platform * platform,
102                       PlatformUtil::GetDefaultPlatform());
103   BackendOptions backend_options;
104   backend_options.set_platform(platform);
105   return CreateBackend(backend_options);
106 }
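
// A minimal usage sketch (assumes a registered default platform and a caller
// that can propagate Status, as TF_ASSIGN_OR_RETURN requires):
//
//   TF_ASSIGN_OR_RETURN(std::unique_ptr<Backend> backend,
//                       Backend::CreateDefaultBackend());
//   TF_ASSIGN_OR_RETURN(StreamPool::Ptr stream,
//                       backend->BorrowStream(
//                           backend->default_device_ordinal()));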
107 
BorrowStream(int device_ordinal)108 StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
109   TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
110   return BorrowStream(executor);
111 }
112 
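// Stream pools are created lazily, one per StreamExecutor; the returned
// StreamPool::Ptr hands its stream back to the pool when it is destroyed
// (see StreamPool).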
StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
  tensorflow::mutex_lock l(mu_);
  if (!stream_pools_.contains(executor)) {
    stream_pools_.emplace(executor, absl::make_unique<StreamPool>());
  }
  return stream_pools_.at(executor)->BorrowStream(executor);
}

Backend::Backend(se::Platform* platform, Compiler* compiler,
                 absl::Span<se::StreamExecutor* const> stream_executors,
                 TransferManager* transfer_manager,
                 ComputationPlacer* computation_placer,
                 int intra_op_parallelism_threads)
    : platform_(platform),
      compiler_(compiler),
      transfer_manager_(transfer_manager),
      computation_placer_(computation_placer),
      stream_executors_(stream_executors.begin(), stream_executors.end()) {
  // Create a memory allocator for the valid stream executors.
  memory_allocator_ = absl::make_unique<se::StreamExecutorMemoryAllocator>(
      platform, stream_executors_);
  CHECK(!stream_executors_.empty())
      << "Service found no devices for backend " << platform_->Name() << '.';

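  // An Eigen intra-op thread pool is only needed for the CPU (host) backend;
  // CPU kernels run on it via eigen_intra_op_thread_pool_device(), while
  // other platforms do their work on device streams.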
  if (platform->id() == se::host::kHostPlatformId) {
    const int num_threads = intra_op_parallelism_threads > 0
                                ? intra_op_parallelism_threads
                                : tensorflow::port::MaxParallelism();
    intra_op_thread_pool_.reset(new IntraOpThreadPool(num_threads));
  }
}

Backend::~Backend() {}

int Backend::default_device_ordinal() const {
  return default_stream_executor()->device_ordinal();
}

const Eigen::ThreadPoolDevice* Backend::eigen_intra_op_thread_pool_device()
    const {
  if (intra_op_thread_pool_ == nullptr) {
    return nullptr;
  }
  return intra_op_thread_pool_->device.get();
}

tensorflow::thread::ThreadPool* Backend::eigen_intra_op_thread_pool() const {
  if (intra_op_thread_pool_ == nullptr) {
    return nullptr;
  }
  return intra_op_thread_pool_->pool.get();
}

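// Device ordinals may be sparse (e.g. when BackendOptions::set_allowed_devices
// restricts the visible devices), so after the range check the executors are
// scanned for an exact ordinal match rather than indexed directly.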
StatusOr<se::StreamExecutor*> Backend::stream_executor(
    int device_ordinal) const {
  if (device_ordinal < 0 ||
      device_ordinal > stream_executors_.back()->device_ordinal()) {
    return InvalidArgument(
        "Invalid device ordinal value (%d). Valid range is [0, %d].",
        device_ordinal, stream_executors_.back()->device_ordinal());
  }
  for (auto* executor : stream_executors_) {
    if (executor->device_ordinal() == device_ordinal) {
      return executor;
    }
  }
  return InvalidArgument("device %s not supported by XLA service",
                         device_name(device_ordinal));
}

StatusOr<bool> Backend::devices_equivalent(int device_ordinal_a,
                                           int device_ordinal_b) {
  // Use the name from the device description to determine equivalence. This
  // is a bit crude, but it works for GPUs, which is the important case: we
  // compile an executable for one GPU and want to know whether it will run
  // (well) on another.
  TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_a,
                      stream_executor(device_ordinal_a));
  TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_b,
                      stream_executor(device_ordinal_b));
  return (executor_a->GetDeviceDescription().name() ==
          executor_b->GetDeviceDescription().name());
}

Status Backend::ResetDevices() {
  return transfer_manager_->ResetDevices(stream_executors_);
}

}  // namespace xla