# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Communication management API"""
import os

from mindspore import context
from mindspore.parallel._ps_context import _is_role_pserver, _is_role_sched
from ._comm_helper import Backend, _get_rank_helper, _get_size_helper, \
    _get_world_rank_from_group_rank_helper, _get_group_rank_from_world_rank_helper, \
    _create_group_helper, _destroy_group_helper, HCCL_WORLD_COMM_GROUP, NCCL_WORLD_COMM_GROUP, \
    _get_local_rank_helper, _get_local_size_helper, GlobalComm
from .._c_expression import init_hccl, finalize_hccl, init_gpu_collective


__all__ = ["init", "release", "get_rank", "get_local_rank", "get_group_size",
           "get_local_rank_size", "get_world_rank_from_group_rank",
           "get_group_rank_from_world_rank", "create_group", "destroy_group",
           "HCCL_WORLD_COMM_GROUP", "NCCL_WORLD_COMM_GROUP"]

DEFAULT_WORLD_COMM_GROUP = HCCL_WORLD_COMM_GROUP


def _get_group(group):
    """Return the world communication group if `group` is `DEFAULT_WORLD_COMM_GROUP`, otherwise return `group`."""
    if group == DEFAULT_WORLD_COMM_GROUP:
        return GlobalComm.WORLD_COMM_GROUP
    return group


def _check_task_sink_envs():
    """
    Check whether the task-sink environment variable has been exported.

    Return False if GRAPH_OP_RUN is exported as 1 (task sink disabled), True otherwise.
    """
    task_sink = os.getenv("GRAPH_OP_RUN")
    if task_sink:
        try:
            if int(task_sink) == 1:
                return False
        except ValueError:
            return True
    return True


def _check_parallel_envs():
    """
    Check whether the parallel environment variables have been exported.

    Raises:
        RuntimeError: If the parallel environment variables have not been exported or have invalid values.
    """
    if not GlobalComm.CHECK_ENVS:
        return
    rank_id_str = os.getenv("RANK_ID")
    if not rank_id_str:
        raise RuntimeError("Environment variable RANK_ID has not been exported.")
    try:
        int(rank_id_str)
    except ValueError:
        raise ValueError("RANK_ID should be a number, but got {}".format(rank_id_str))
    rank_table_file_str = os.getenv("MINDSPORE_HCCL_CONFIG_PATH")
    rank_table_file_str_old = os.getenv("RANK_TABLE_FILE")
    if not rank_table_file_str and not rank_table_file_str_old:
        raise RuntimeError("Failed to get the hccl rank_table_file, "
                           "please export MINDSPORE_HCCL_CONFIG_PATH or RANK_TABLE_FILE.")
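

# A minimal sketch of the launch environment that _check_parallel_envs() above expects
# when HCCL is initialized without MPI; the path below is illustrative only:
#
#     export RANK_ID=0
#     export RANK_TABLE_FILE=/path/to/rank_table.json    # or MINDSPORE_HCCL_CONFIG_PATH
#
# If GRAPH_OP_RUN=1 is exported, _check_task_sink_envs() returns False and, in graph
# mode, init() below takes the "hccl_mpi" initialization path instead.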


def init(backend_name=None):
    """
    Initialize the distributed backend, e.g. HCCL/NCCL. It is required before using any communication service.

    Note:
        The full name of HCCL is Huawei Collective Communication Library.
        The full name of NCCL is NVIDIA Collective Communication Library.
        This method should be used after set_context.

    Args:
        backend_name (str): Backend, using HCCL/NCCL. If `backend_name` is None, the system will infer
            the backend from `device_target`. Default: None.

    Raises:
        TypeError: If `backend_name` is not a string.
        RuntimeError: If the device target is invalid, the backend is invalid, distributed initialization fails,
            or the environment variables RANK_ID/MINDSPORE_HCCL_CONFIG_PATH
            have not been exported when the backend is HCCL.

    Examples:
        >>> from mindspore.communication import init
        >>> init()
    """
    if _is_role_pserver() or _is_role_sched():
        return
    task_sink = _check_task_sink_envs()
    device_target = context.get_context("device_target")
    mode = context.get_context("mode")
    mpi_init = False
    if not task_sink and mode == context.GRAPH_MODE:
        mpi_init = True

    if backend_name is None:
        if device_target == "Ascend":
            backend_name = "hccl"
        elif device_target == "GPU":
            backend_name = "nccl"
        else:
            raise RuntimeError("Device target {} is not supported in parallel initialization, "
                               "please use Ascend or GPU.".format(device_target))
    if not isinstance(backend_name, str):
        raise TypeError("Backend name must be a string, but got {}".format(type(backend_name)))

    if backend_name == "hccl":
        if device_target != "Ascend":
            raise RuntimeError("Device target should be 'Ascend' to init hccl, but got {}".format(device_target))
        if not mpi_init:
            _check_parallel_envs()
            GlobalComm.BACKEND = Backend("hccl")
        else:
            GlobalComm.BACKEND = Backend("hccl_mpi")
        init_hccl()
        GlobalComm.WORLD_COMM_GROUP = HCCL_WORLD_COMM_GROUP
        GlobalComm.INITED = True
    elif backend_name == "nccl":
        init_gpu_collective()
        GlobalComm.BACKEND = Backend("nccl")
        GlobalComm.WORLD_COMM_GROUP = NCCL_WORLD_COMM_GROUP
        GlobalComm.INITED = True
    else:
        raise RuntimeError("Backend name {} is not supported.".format(backend_name))


def release():
    """
    Release the distributed resources, e.g. HCCL/NCCL.

    Note:
        This method should be used after init().

    Raises:
        RuntimeError: If failed to release the distributed resources.

    Examples:
        >>> from mindspore.communication import init, release
        >>> init()
        >>> release()
    """
    finalize_hccl()


def get_rank(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the rank ID of the current device in the specified collective communication group.

    Note:
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, the default group is used. Default: WORLD_COMM_GROUP.

    Returns:
        int, the rank ID of the calling process within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL/NCCL is not available.

    Examples:
        >>> from mindspore.communication import init, get_rank
        >>> init()
        >>> rank_id = get_rank()
        >>> print(rank_id)
        >>> # the result is the rank_id in the world group
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_rank_helper(group=_get_group(group), backend=GlobalComm.BACKEND)
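

# A minimal end-to-end sketch of the query APIs in this module, assuming a distributed
# job where init() succeeds on every device (see the per-function docstrings for the
# backend-specific requirements):
#
#     >>> from mindspore.communication.management import init, get_rank, get_group_size, release
#     >>> init()
#     >>> rank_id = get_rank()           # rank of this device in the world group
#     >>> group_size = get_group_size()  # total number of devices in the world group
#     >>> release()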


def get_local_rank(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the local rank ID of the current device in the specified collective communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, the default group is used. Default: WORLD_COMM_GROUP.

    Returns:
        int, the local rank ID of the calling process within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.communication.management import init, get_rank, get_local_rank
        >>> set_context(device_target="Ascend", device_num=16)  # 2 servers, each with 8 NPUs.
        >>> init()
        >>> world_rank = get_rank()  # rank_id is 9.
        >>> local_rank = get_local_rank()
        >>> print("local_rank is: {}, world_rank is {}".format(local_rank, world_rank))
        local_rank is: 1, world_rank is 9
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_local_rank_helper(group=_get_group(group), backend=GlobalComm.BACKEND)


def get_group_size(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the rank size of the specified collective communication group.

    Note:
        This method should be used after init().

    Args:
        group (str): The communication group to work on. Normally, the group should be created by create_group,
            otherwise, the default group is used. Default: WORLD_COMM_GROUP.

    Returns:
        int, the rank size of the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL/NCCL is not available.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.communication.management import init, get_group_size
        >>> set_context(device_target="Ascend", device_num=8)
        >>> init()
        >>> group_size = get_group_size()
        >>> print("group_size is: ", group_size)
        group_size is: 8
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_size_helper(group=_get_group(group), backend=GlobalComm.BACKEND)
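

# Illustrative sketch only: on a 2-server * 8-NPU Ascend job (the setup used in the
# docstring examples), the global and local sizes relate as follows; the server count
# is plain arithmetic, not an API of this module:
#
#     >>> get_group_size()                           # 16
#     >>> get_local_rank_size()                      # 8
#     >>> get_group_size() // get_local_rank_size()  # 2 servers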


def get_local_rank_size(group=GlobalComm.WORLD_COMM_GROUP):
    """
    Get the local rank size of the specified collective communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        This method should be used after init().

    Args:
        group (str): The communication group to work on. The group is created by create_group
            or the default world communication group. Default: WORLD_COMM_GROUP.

    Returns:
        int, the local rank size on the server where the calling process resides, within the group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.communication.management import init, get_local_rank_size
        >>> set_context(device_target="Ascend", device_num=16)  # 2 servers, each with 8 NPUs.
        >>> init()
        >>> local_rank_size = get_local_rank_size()
        >>> print("local_rank_size is: ", local_rank_size)
        local_rank_size is: 8
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_local_size_helper(group=_get_group(group), backend=GlobalComm.BACKEND)


def get_world_rank_from_group_rank(group, group_rank_id):
    """
    Get the rank ID in the world communication group that corresponds to
    the given rank ID in the specified user communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        group (str): The communication group to work on. The group is created by create_group.
        group_rank_id (int): A rank ID in the communication group.

    Returns:
        int, the rank ID in the world communication group.

    Raises:
        TypeError: If `group_rank_id` is not an integer or the group is not a string.
        ValueError: If group is 'hccl_world_group' or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.communication.management import init, create_group, get_world_rank_from_group_rank
        >>> set_context(device_target="Ascend")
        >>> init()
        >>> group = "0-4"
        >>> rank_ids = [0, 4]
        >>> create_group(group, rank_ids)
        >>> world_rank_id = get_world_rank_from_group_rank(group, 1)
        >>> print("world_rank_id is: ", world_rank_id)
        world_rank_id is: 4
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_world_rank_from_group_rank_helper(group=group, group_rank_id=group_rank_id, backend=GlobalComm.BACKEND)
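

# get_world_rank_from_group_rank() above and get_group_rank_from_world_rank() below are
# inverse mappings of each other. Using the setup from the docstring examples (user
# group "0-4" built from world ranks [0, 4]):
#
#     >>> get_world_rank_from_group_rank("0-4", 1)  # -> world rank 4
#     >>> get_group_rank_from_world_rank(4, "0-4")  # -> group rank 1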


def get_group_rank_from_world_rank(world_rank_id, group):
    """
    Get the rank ID in the specified user communication group that corresponds to
    the given rank ID in the world communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        world_rank_id (int): A rank ID in the world communication group.
        group (str): The communication group to work on. The group is created by create_group.

    Returns:
        int, the rank ID in the user communication group.

    Raises:
        TypeError: If `world_rank_id` is not an integer or the group is not a string.
        ValueError: If group is 'hccl_world_group' or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.communication.management import init, create_group, get_group_rank_from_world_rank
        >>> set_context(device_target="Ascend")
        >>> init()
        >>> group = "0-4"
        >>> rank_ids = [0, 4]
        >>> create_group(group, rank_ids)
        >>> group_rank_id = get_group_rank_from_world_rank(4, group)
        >>> print("group_rank_id is: ", group_rank_id)
        group_rank_id is: 1
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    return _get_group_rank_from_world_rank_helper(world_rank_id=world_rank_id, group=group, backend=GlobalComm.BACKEND)


def create_group(group, rank_ids):
    """
    Create a user collective communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        The size of rank_ids should be larger than 1, and rank_ids should not contain duplicate values.
        This method should be used after init().
        Only one global communication group is supported in PyNative mode.

    Args:
        group (str): The name of the communication group to be created.
        rank_ids (list): A list of device IDs.

    Raises:
        TypeError: If group is not a string or `rank_ids` is not a list.
        ValueError: If `rank_ids` size is not larger than 1, or `rank_ids` has duplicate values, or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.

    Examples:
        >>> from mindspore.context import set_context
        >>> from mindspore.ops import operations as ops
        >>> from mindspore.communication.management import init, create_group
        >>> set_context(device_target="Ascend")
        >>> init()
        >>> group = "0-8"
        >>> rank_ids = [0, 8]
        >>> create_group(group, rank_ids)
        >>> allreduce = ops.AllReduce(group)
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    _create_group_helper(group, rank_ids, backend=GlobalComm.BACKEND)


def destroy_group(group):
    """
    Destroy a user collective communication group.

    Note:
        The GPU version of MindSpore doesn't support this method.
        The parameter group should not be "hccl_world_group".
        This method should be used after init().

    Args:
        group (str): The communication group to destroy; the group should be created by create_group.

    Raises:
        TypeError: If group is not a string.
        ValueError: If group is "hccl_world_group" or backend is invalid.
        RuntimeError: If HCCL is not available or MindSpore is the GPU version.
    """
    if not isinstance(group, str):
        raise TypeError("Group name must be a string, but got {}".format(type(group)))
    _destroy_group_helper(group, backend=GlobalComm.BACKEND)
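

# A minimal lifecycle sketch for user-defined groups, pairing create_group() with
# destroy_group(); the group name and rank IDs follow the create_group docstring
# example and assume an Ascend/HCCL job where world ranks 0 and 8 exist:
#
#     >>> from mindspore.communication.management import init, create_group, destroy_group
#     >>> init()
#     >>> create_group("0-8", [0, 8])
#     >>> destroy_group("0-8")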