/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_

#include <memory>
#include <string>

#include "tensorflow/core/common_runtime/buf_rendezvous.h"
#include "tensorflow/core/framework/collective.h"
#include "tensorflow/core/framework/device_attributes.pb.h"

namespace tensorflow {
class CollectiveImplementation;
class DeviceMgr;
class Device;

// Helper interface that aliases regular subfields of a Tensor as separate
// Tensors for in-place update.
class CollectiveAdapter {
 public:
  virtual ~CollectiveAdapter() {}

  // Move the backing tensor to 'output' with its original storage and
  // shape. After this call this CollectiveAdapter object should be
  // deleted immediately without calling any of its other methods.
  virtual void ConsumeFinalValue(Tensor* output) = 0;

  // Const access to the entire intermediate value, for debugging.
  virtual const Tensor& Value() const = 0;

  // Returns a tensor for chunk i which aliases the backing buffer.
  virtual Tensor ChunkAlias(int i) = 0;

  // Returns a tensor allocated on the same device but with its own
  // separate backing buffer. Will have the same type and size as
  // chunk i.
  virtual Tensor TempChunk(int i) const = 0;

  // Bytes in chunk i.
  virtual int64 ChunkBytes(int i) const = 0;

  // Generate a CPU RAM scalar tensor of the same DataType as the
  // backing tensor with the given integer value.
  virtual Tensor Scalar(int v) const = 0;

  // Generate a scalar tensor of the same DataType and on the same device
  // as the backing tensor.
  virtual Tensor Scalar(Allocator* a) const = 0;

  // Debugging string describing the buffer location.
  virtual string TBounds(const Tensor& t) const = 0;

  virtual string DebugString() const = 0;

  // Computes the number of elements per alias chunk tensor.
  //
  // A CHECK in tensor.cc expects that the memory buffer backing a
  // Tensor will be aligned according to EIGEN_MAX_ALIGN_BYTES. To
  // ensure that all chunk-aliasing Tensors maintain this alignment we
  // need to pick a chunk size that preserves it. Note that in extreme
  // cases (impractical, but possible with very small tensors) one or
  // more tail chunks can end up empty.
  static int64 AlignedChunkElts(int64 elt_bytes, int64 total_elts,
                                int64 num_chunks);
};
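
// A rough worked example of the chunking arithmetic performed by
// CollectiveAdapter::AlignedChunkElts() above. This is only an illustrative
// sketch; it assumes EIGEN_MAX_ALIGN_BYTES == 64 and that the per-chunk
// element count is rounded up to the next alignment multiple (see the
// definition in the corresponding .cc file for the authoritative behavior).
//
//   AlignedChunkElts(/*elt_bytes=*/4, /*total_elts=*/100, /*num_chunks=*/8)
//
// Each chunk boundary must land on a 64-byte boundary, i.e. a multiple of
// 16 float elements. ceil(100 / 8) = 13 elements per chunk, rounded up to
// 16. Chunks 0-5 then alias 16 elements each (96 in total), chunk 6 aliases
// the remaining 4 elements, and chunk 7 is empty; this is the "tail chunks
// can end up empty" case noted above.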

// Create a CollectiveAdapter wrapping 'output', specialized to its
// data-type and shape. If align_chunks == true then the chunk size may
// be larger than output->NumElements() / num_chunks and one or more
// of the suffix chunks may be empty. Chunks will be arranged to start
// and end on alignment boundaries. If align_chunks == false then
// output->NumElements() % num_chunks must be 0 and all chunks will
// have exactly the same size, ignoring alignment issues.
CollectiveAdapter* MakeCollectiveAdapter(Tensor* output, int num_chunks,
                                         Allocator* allocator,
                                         bool align_chunks = true);
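
// A minimal usage sketch of the adapter API declared above. Illustrative
// only: `output`, `num_chunks` and `allocator` are hypothetical locals, and
// the data-exchange step is elided.
//
//   std::unique_ptr<CollectiveAdapter> ca(
//       MakeCollectiveAdapter(output, num_chunks, allocator));
//   for (int i = 0; i < num_chunks; ++i) {
//     Tensor chunk = ca->ChunkAlias(i);  // aliases a slice of the backing buffer
//     // ... exchange or reduce `chunk` in place ...
//   }
//   ca->ConsumeFinalValue(output);  // moves the result back into `output`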

// Default implementation of CollectiveExecutor. Delegates the actual
// work of moving data to a class specialized for the operation type,
// arguments and device+interconnect topology.
class BaseCollectiveExecutor : public CollectiveExecutor {
 public:
  BaseCollectiveExecutor(CollectiveExecutorMgrInterface* cem,
                         PerStepCollectiveRemoteAccess* remote_access,
                         int64 step_id, const DeviceMgr* dev_mgr,
                         const string* gpu_ring_order)
      : CollectiveExecutor(cem),
        step_id_(step_id),
        dev_mgr_(dev_mgr),
        remote_access_(remote_access),
        gpu_ring_order_(gpu_ring_order) {}

  ~BaseCollectiveExecutor() override;

  void StartAbort(const Status& s) override;

  void ExecuteAsync(OpKernelContext* ctx, const CollectiveParams& col_params,
                    const string& exec_key, StatusCallback done) override;

  void CompleteParamsAsync(const string& device, CollectiveParams* cp,
                           CancellationManager* cancel_mgr,
                           StatusCallback done) override;

  PerStepCollectiveRemoteAccess* remote_access() override {
    return remote_access_.get();
  }

  void RecvFromPeer(const string& peer_device, const string& peer_task,
                    bool peer_is_local, const string& key, Device* to_device,
                    DeviceContext* to_device_ctx,
                    const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor,
                    const DeviceLocality& client_locality, int stream_index,
                    const StatusCallback& done) override {
    remote_access_->RecvFromPeer(
        peer_device, peer_task, peer_is_local, key, to_device, to_device_ctx,
        to_alloc_attr, to_tensor, client_locality, stream_index, done);
  }

  void PostToPeer(const string& peer_device, const string& peer_task,
                  const string& key, Device* from_device,
                  DeviceContext* from_device_ctx,
                  const AllocatorAttributes& from_alloc_attr,
                  const Tensor* from_tensor,
                  const DeviceLocality& client_locality,
                  const StatusCallback& done) override {
    remote_access_->PostToPeer(peer_device, peer_task, key, from_device,
                               from_device_ctx, from_alloc_attr, from_tensor,
                               client_locality, done);
  }

  // If we need to enforce an ordering on any portion of the collective
  // implementation, and the ordering is encoded via an attribute on the
  // collective op, this function will block until all dependencies for this
  // collective have completed.
  void WaitForDependencies(const CollectiveParams& col_params) override;

  // Record that this collective has completed the portion of the
  // implementation that needs to be ordered with respect to other collectives,
  // to unblock any of its dependent ops.
  void Launched(const CollectiveParams& col_params) override;

 protected:
  const int64 step_id_;
  const DeviceMgr* dev_mgr_;  // Not owned.
  std::unique_ptr<PerStepCollectiveRemoteAccess> remote_access_;
  const string* gpu_ring_order_;  // Not owned.
  mutex launch_mu_;
  condition_variable launch_cv_;
  // Collective instance key -> number of local devices for which NCCL ops
  // have been launched.
  std::unordered_map<int32, int32> launched_ GUARDED_BY(launch_mu_);

 private:
  Status CreateCollective(const CollectiveParams& col_params,
                          CollectiveImplementationInterface** col_impl);
  // Check if all ops on which this collective depends have launched.
  bool CheckDependencies(const CollectiveParams& col_params)
      EXCLUSIVE_LOCKS_REQUIRED(launch_mu_);
};

}  // namespace tensorflow
#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_BASE_COLLECTIVE_EXECUTOR_H_