/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_

#include <string>
#include <memory>
#include <vector>
#include <atomic>
#include <unordered_map>
#include "utils/ms_utils.h"
#include "include/backend/distributed/constants.h"
#if defined(__linux__) && defined(WITH_BACKEND)
#include "include/backend/distributed/cluster/cluster_context.h"
#else
#include "include/backend/distributed/cluster/dummy_cluster_context.h"
#endif
#include "runtime/hardware/device_context_manager.h"
#include "include/backend/visible.h"

#ifndef EXPORT_WRAPPER
#define EXPORT_WRAPPER __attribute__((visibility("default")))
#endif
namespace mindspore {
namespace distributed {
namespace collective {
using DeviceContext = device::DeviceContext;
using DeviceContextKey = device::DeviceContextKey;
using DeviceContextManager = device::DeviceContextManager;
using CollectiveCommunicationLib = device::CollectiveCommunicationLib;
using CommunicationGroupPtr = device::CommunicationGroupPtr;

// The collective communication API.
// MindSpore uses OpenMPI on CPU, NCCL on GPU and HCCL on Ascend to achieve distributed training.
// Besides, MindSpore also has its own communication library, which is implemented on the CPU side.
class BACKEND_EXPORT CollectiveManager {
 public:
  ~CollectiveManager();
  DISABLE_COPY_AND_ASSIGN(CollectiveManager);
  static std::shared_ptr<CollectiveManager> instance();

  // Initialize the collective communication for distributed training. The backend type is read from MindSpore context.
  bool Initialize();

  // Finalize the collective communication.
  bool Finalize();

  // Create communication group.
  bool CreateCommunicationGroup(const std::string &group_name, const std::vector<uint32_t> &group_ranks);

  // Destroy the communication group.
  bool DestroyCommunicationGroup(const std::string &group_name);

  // Get the rank id of this process in the specified group.
  uint32_t GetRankId(const std::string &group_name);

  // Get the size of the specified group.
  uint32_t GetGroupSize(const std::string &group_name);

  uint32_t GetLocalRankId(const std::string &group_name);

  uint32_t GetLocalGroupSize(const std::string &group_name);

  uint32_t GetWorldRankFromGroupRank(const std::string &group_name, uint32_t local_rank);

  uint32_t GetGroupRankFromWorldRank(uint32_t global_rank, const std::string &group_name);

  std::vector<uint32_t> GetGroupRanks(const std::string &group_name);

  // In some cases the global rank id and rank size should be set by the caller, e.g., when using the MindSpore
  // communication framework, where they are generated by cluster::ClusterContext.
  void set_global_rank_id(uint32_t global_rank_id);
  void set_global_rank_size(uint32_t global_rank_size);

  uint32_t global_rank_id() const;
  uint32_t local_rank_id() const;

  bool need_init() const { return need_init_.load(); }

  // Set whether collective communication needs to be reinitialized.
  void set_need_reinit(bool need_reinit) { need_reinit_ = need_reinit; }
  // Get whether collective communication needs to be reinitialized.
  bool need_reinit() const { return need_reinit_.load(); }

  // Return whether the collective manager is initialized.
  bool initialized() const { return inited_.load(); }
  std::unordered_map<std::string, std::vector<uint32_t>> get_group_map() { return group_map_; }

  // Initialize and finalize the dummy communication lib.
  bool InitializeDummyCommLib();
  bool FinalizeDummyCommLib();

 private:
  CollectiveManager();

  // Initialize the communication library on the host side.
  bool InitHostCommlib();

  // Initialize the communication library on the device side.
  bool InitDeviceCommLib();

  // Assign the local rank id for this process.
  bool AssignLocalRank();

  // Assign the local rank and size for each group in the current server.
  bool GetLocalGroupRankAndSize(const std::vector<uint32_t> &group_ranks, uint32_t *local_group_rank,
                                uint32_t *local_group_size);

  // Create communication group in simulation mode.
  bool CreateSimulationGroup(const std::string &group_name, const std::vector<uint32_t> &group_ranks);

  // Get the timeout window for communicator initialization.
  int64_t GetCommunicatorInitTimeout();

  std::atomic_bool inited_;
  std::atomic_bool finalized_;

  // Whether the collective communication library should be initialized. This represents whether this process is
  // launched as a distributed job.
  std::atomic_bool need_init_;

  // Whether collective communication needs to be reinitialized. This value should be set to true once a training
  // process is detected to have exited unexpectedly.
  std::atomic_bool need_reinit_;

  // The device contexts on the host and device sides. They are used to access the communication library on different
  // devices.
  DeviceContext *host_ctx_;
  DeviceContext *device_ctx_;

  // Host communication library refers to the communication library for CPU, e.g., OpenMPI and the MindSpore
  // communication framework.
  CollectiveCommunicationLib *host_comm_lib_instance_;

  // Device communication library refers to the communication library for NPU or GPU, e.g., NCCL and HCCL.
  // When only the CPU backend is used, the device communication library should not be initialized.
  CollectiveCommunicationLib *device_comm_lib_instance_;

  // Alias of host_comm_lib_instance_ or device_comm_lib_instance_ to avoid conditional branches.
  CollectiveCommunicationLib *comm_lib_instance_;

  // Dummy collective communication for single-device compilation.
  std::shared_ptr<CollectiveCommunicationLib> dummy_comm_lib_instance_;

  // The global rank id of this process. Normally it ranges from 0 to `total process number - 1`.
  uint32_t global_rank_id_;

  // The local rank id of this process within the same node. This is usually used as the device id.
  uint32_t local_rank_id_;

  // The global rank size. Normally this is equal to `total process number`.
  uint32_t global_rank_size_;

  // Global group ranks.
  std::vector<uint32_t> global_group_ranks_;

  // The global group name on the host side. This is used to create the global group on the host side for the
  // AllGather operation of host names while assigning local ranks.
  std::string host_global_group_name_;

  // This member represents whether the collective communication library is supported on the device side. If not, the
  // device side library will be replaced by the library on the host side.
  bool device_lib_supported_;

  // This member represents whether host collective communication is needed. Currently it only takes effect on Ascend:
  // if it is false, Ascend uses the ranktable file.
  bool need_host_collective_;

  // This member is used to assign the local rank and size for each group.
  std::vector<size_t> all_host_hashs_;
  std::unordered_map<std::string, std::vector<uint32_t>> group_map_;
};

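// A minimal usage sketch (illustrative only, not part of this header): how a caller might drive this class to
// initialize collective communication, create a sub-group and query rank information. The group name
// "sub_group_0_3", the rank list {0, 1, 2, 3} and the *_from_cluster variables are made-up placeholders.
//
//   std::shared_ptr<CollectiveManager> collective = CollectiveManager::instance();
//   collective->set_global_rank_id(rank_id_from_cluster);      // Normally generated by cluster::ClusterContext.
//   collective->set_global_rank_size(rank_size_from_cluster);
//   if (collective->Initialize()) {
//     // Group the first four global ranks into one communication group.
//     (void)collective->CreateCommunicationGroup("sub_group_0_3", {0, 1, 2, 3});
//     uint32_t rank_in_group = collective->GetRankId("sub_group_0_3");
//     uint32_t group_size = collective->GetGroupSize("sub_group_0_3");
//   }
//   (void)collective->Finalize();
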
// For the scheduler node, CollectiveManager is not initialized. Return 0 as the rank id.
#define BY_PASS_SCHED_RANK_ID                                                      \
  do {                                                                             \
    if (cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { \
      return static_cast<uint32_t>(0);                                             \
    }                                                                              \
  } while (0)

// For the scheduler node, CollectiveManager is not initialized. Return 1 as the rank size.
#define BY_PASS_SCHED_RANK_SIZE                                                    \
  do {                                                                             \
    if (cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { \
      return static_cast<uint32_t>(1);                                             \
    }                                                                              \
  } while (0)
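
// Illustrative only: the macros above are meant to sit at the top of rank-query helpers so that a scheduler-role
// process returns rank id 0 / rank size 1 instead of touching the uninitialized CollectiveManager. A hypothetical
// wrapper (not defined in this header) could look like:
//
//   uint32_t GetRankIdOrBypassScheduler(const std::string &group_name) {
//     BY_PASS_SCHED_RANK_ID;
//     return CollectiveManager::instance()->GetRankId(group_name);
//   }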
}  // namespace collective
}  // namespace distributed
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_DISTRIBUTED_COLLECTIVE_COLLECTIVE_MANAGER_H_