/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_

#include <string>
#include <memory>
#include <vector>
#include <atomic>
#include <unordered_map>
#include "utils/ms_utils.h"
#include "include/backend/distributed/constants.h"
#if defined(__linux__) && defined(WITH_BACKEND)
#include "include/backend/distributed/cluster/cluster_context.h"
#else
#include "include/backend/distributed/cluster/dummy_cluster_context.h"
#endif
#include "runtime/hardware/device_context_manager.h"
#include "include/backend/visible.h"

#ifndef EXPORT_WRAPPER
#define EXPORT_WRAPPER __attribute__((visibility("default")))
#endif
namespace mindspore {
namespace distributed {
namespace collective {
using DeviceContext = device::DeviceContext;
using DeviceContextKey = device::DeviceContextKey;
using DeviceContextManager = device::DeviceContextManager;
using CollectiveCommunicationLib = device::CollectiveCommunicationLib;
using CommunicationGroupPtr = device::CommunicationGroupPtr;

// The collective communication API.
// MindSpore uses OpenMPI on CPU, NCCL on GPU, HCCL on Ascend, to achieve distributed training.
// Besides, MindSpore also has its own communication library which is implemented on the CPU side.
class BACKEND_EXPORT CollectiveManager {
 public:
  ~CollectiveManager();
  DISABLE_COPY_AND_ASSIGN(CollectiveManager);
  static std::shared_ptr<CollectiveManager> instance();

  // Initialize the collective communication for distributed training. The backend type is read from MindSpore
  // context.
  bool Initialize();

  // Finalize the collective communication.
  bool Finalize();

  // Create communication group.
  bool CreateCommunicationGroup(const std::string &group_name, const std::vector<uint32_t> &group_ranks);

  // Destroy the communication group.
  bool DestroyCommunicationGroup(const std::string &group_name);

  // Get the rank id of this process in the specified group.
  uint32_t GetRankId(const std::string &group_name);

  // Get the size of the specified group.
  uint32_t GetGroupSize(const std::string &group_name);

  uint32_t GetLocalRankId(const std::string &group_name);

  uint32_t GetLocalGroupSize(const std::string &group_name);

  uint32_t GetWorldRankFromGroupRank(const std::string &group_name, uint32_t local_rank);

  uint32_t GetGroupRankFromWorldRank(uint32_t global_rank, const std::string &group_name);

  std::vector<uint32_t> GetGroupRanks(const std::string &group_name);
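  // A minimal usage sketch of the API above (illustrative only: the group name "sub_group", the rank list, and the
  // rank-ordering assumption are hypothetical, and error handling is omitted):
  //
  //   std::shared_ptr<CollectiveManager> manager = CollectiveManager::instance();
  //   if (manager->Initialize()) {
  //     (void)manager->CreateCommunicationGroup("sub_group", {1, 3, 5, 7});
  //     uint32_t group_rank = manager->GetRankId("sub_group");    // Rank of this process inside "sub_group".
  //     uint32_t group_size = manager->GetGroupSize("sub_group");  // Expected to be 4 for this group.
  //     // Assuming group ranks follow the order of the given world ranks, GetGroupRankFromWorldRank(5, "sub_group")
  //     // would return 2 and GetWorldRankFromGroupRank("sub_group", 2) would return 5.
  //     (void)manager->DestroyCommunicationGroup("sub_group");
  //     (void)manager->Finalize();
  //   }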
  // In some cases the global rank id and rank size should be set by the caller, e.g., when using the MindSpore
  // communication framework, they are generated by cluster::ClusterContext.
  void set_global_rank_id(uint32_t global_rank_id);
  void set_global_rank_size(uint32_t global_rank_size);

  uint32_t global_rank_id() const;
  uint32_t local_rank_id() const;

  bool need_init() const { return need_init_.load(); }

  // Set whether collective communication needs to be reinitialized.
  void set_need_reinit(bool need_reinit) { need_reinit_ = need_reinit; }
  // Get whether collective communication needs to be reinitialized.
  bool need_reinit() const { return need_reinit_.load(); }

  // Return whether the collective manager is initialized.
  bool initialized() const { return inited_.load(); }
  std::unordered_map<std::string, std::vector<uint32_t>> get_group_map() { return group_map_; }

  // Initialize and finalize the dummy communication lib.
  bool InitializeDummyCommLib();
  bool FinalizeDummyCommLib();

 private:
  CollectiveManager();

  // Initialize the communication library on the host side.
  bool InitHostCommlib();

  // Initialize the communication library on the device side.
  bool InitDeviceCommLib();

  // Assign the local rank id for this process.
  bool AssignLocalRank();

  // Assign the local rank and size for each group in the current server.
  bool GetLocalGroupRankAndSize(const std::vector<uint32_t> &group_ranks, uint32_t *local_group_rank,
                                uint32_t *local_group_size);

  // Create a communication group in simulation mode.
  bool CreateSimulationGroup(const std::string &group_name, const std::vector<uint32_t> &group_ranks);

  // Get the timeout window for communicator initialization.
  int64_t GetCommunicatorInitTimeout();

  std::atomic_bool inited_;
  std::atomic_bool finalized_;

  // Whether the collective communication library should be initialized. This represents whether this process is
  // launched as part of a distributed job.
  std::atomic_bool need_init_;

  // Whether collective communication needs to be reinitialized. This value should be set to true once a training
  // process is detected to have exited unexpectedly.
  std::atomic_bool need_reinit_;

  // The device contexts on the host and device side. They are used to access the communication library on different
  // devices.
  DeviceContext *host_ctx_;
  DeviceContext *device_ctx_;

  // The host communication library refers to the communication library for CPU, e.g., OpenMPI and the MindSpore
  // communication framework.
  CollectiveCommunicationLib *host_comm_lib_instance_;

  // The device communication library refers to the communication library for NPU or GPU, e.g., NCCL and HCCL.
  // When only the CPU backend is used, the device communication library should not be initialized.
  CollectiveCommunicationLib *device_comm_lib_instance_;

  // Alias of either host_comm_lib_instance_ or device_comm_lib_instance_ to avoid condition branches.
  CollectiveCommunicationLib *comm_lib_instance_;

  // Dummy collective communication for single-device compilation.
  std::shared_ptr<CollectiveCommunicationLib> dummy_comm_lib_instance_;

  // The global rank id of this process. Normally this ranges from 0 to `total process number - 1`.
  uint32_t global_rank_id_;

  // The local rank id of this process within the same node. This is usually used as the device id.
  uint32_t local_rank_id_;

  // The global rank size. Normally this is equal to `total process number`.
  uint32_t global_rank_size_;
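  // Illustrative example of the rank members above (hypothetical 2-node job with 4 processes per node):
  // global_rank_size_ would be 8, global_rank_id_ would range over 0..7, and the process with global_rank_id_ 5,
  // running on the second node, would have local_rank_id_ 1.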
  // Global group ranks.
  std::vector<uint32_t> global_group_ranks_;

  // The global group name on the host side. This is used to create the global group on the host side for the
  // AllGather operation of host names while assigning local ranks.
  std::string host_global_group_name_;

  // This member represents whether the collective communication library is supported on the device side. If not, the
  // device side library will be replaced by the library on the host side.
  bool device_lib_supported_;

  // This member represents whether host collective communication is needed. Currently it only takes effect on Ascend:
  // if it is false, Ascend uses the ranktable file.
  bool need_host_collective_;

  // This member is used to assign the local rank and size for each group.
  std::vector<size_t> all_host_hashs_;
  std::unordered_map<std::string, std::vector<uint32_t>> group_map_;
};

// For the scheduler node, CollectiveManager is not initialized. Return 0 as the rank id.
#define BY_PASS_SCHED_RANK_ID                                                      \
  do {                                                                             \
    if (cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { \
      return static_cast<uint32_t>(0);                                             \
    }                                                                              \
  } while (0)

// For the scheduler node, CollectiveManager is not initialized. Return 1 as the rank size.
#define BY_PASS_SCHED_RANK_SIZE                                                    \
  do {                                                                             \
    if (cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { \
      return static_cast<uint32_t>(1);                                             \
    }                                                                              \
  } while (0)
}  // namespace collective
}  // namespace distributed
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_