1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "include/backend/distributed/init.h" 18 #include <vector> 19 #include <string> 20 #include <memory> 21 #include "include/backend/distributed/recovery/recovery_context.h" 22 #include "runtime/graph_scheduler/graph_scheduler.h" 23 #include "runtime/graph_scheduler/embedding_cache_scheduler.h" 24 25 namespace mindspore { 26 namespace distributed { 27 using distributed::recovery::RecoveryContext; 28 Initialize()29bool Initialize() { 30 // If this process participates in the cluster building, we need to initialize cluster context. 31 if (common::UseDynamicCluster()) { 32 if (!InitializeCluster()) { 33 MS_LOG(EXCEPTION) 34 << "Failed to initialize distributed job cluster because some processes in the cluster are not successfully " 35 "spawned. You can run command: 'grep -rn -E 'ERROR|CRITICAL' -C 10' in your log directory to filter out " 36 "error info. It may be one of the following reasons:\n1." 37 << kWorkerProcessNotEnoughError << "\n2." << kSchedPortOccupiedError << "\n3." 38 << kSchedWorkerAddrNotConsistentError; 39 } 40 } 41 42 // Initialize the collective manager regardless of whether the cluster is initialized or not. 43 if (!InitializeCollective()) { 44 MS_LOG(EXCEPTION) 45 << "Failed to initialize collective communication because some processes in the cluster are not successfully " 46 "spawned. You can run command: 'grep -rn -E 'ERROR|CRITICAL' -C 10' in your log directory to filter out error " 47 "info."; 48 } 49 50 // If this is a scheduler node, it does not need to execute other codes like graph compiling and running. We should 51 // finalize it immediately. 52 if (cluster::ClusterContext::instance()->initialized() && 53 cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { 54 MS_LOG(INFO) << "Scheduler starts to wait for cluster to exit."; 55 (void)cluster::ClusterContext::instance()->Finalize(UINT32_MAX); 56 MS_LOG(INFO) << "Scheduler ends waiting for cluster to exit."; 57 exit(0); 58 return true; 59 } 60 61 MsException::Instance().CheckException(); 62 return true; 63 } 64 Finalize()65bool Finalize() { 66 if (!FinalizeCollective()) { 67 MS_LOG(ERROR) << "Failed to finalize collective communication."; 68 return false; 69 } 70 71 if (!FinalizeCluster()) { 72 MS_LOG(ERROR) << "Failed to finalize cluster."; 73 return false; 74 } 75 76 return true; 77 } 78 InitializeCluster()79bool InitializeCluster() { 80 if (!cluster::ClusterContext::instance()->Initialize()) { 81 MS_LOG(ERROR) << "Failed to initialize cluster."; 82 return false; 83 } 84 #if ((defined ENABLE_CPU) && (!defined _WIN32) && !defined(__APPLE__)) 85 auto node = cluster::ClusterContext::instance()->node(); 86 MS_EXCEPTION_IF_NULL(node); 87 88 // Set the callback for the cluster node. 89 auto callback = std::make_shared<std::function<void(void)>>([]() { 90 MS_LOG(INFO) << "Callback on exception is called."; 91 if (!collective::CollectiveManager::instance()->Finalize()) { 92 MS_LOG(EXCEPTION) << "Failed to finalize the collective communication lib."; 93 } 94 // Abort graph scheduler to avoid hang in rpc communication. 95 auto &graph_scheduler = runtime::GraphScheduler::GetInstance(); 96 if (graph_scheduler.initialized() && graph_scheduler.rpc_node_scheduler() != nullptr) { 97 graph_scheduler.rpc_node_scheduler()->Abort(); 98 } 99 100 MS_LOG(INFO) << "Begin finalize the EmbeddingCacheScheduler."; 101 runtime::EmbeddingCacheScheduler::GetInstance().Finalize(false); 102 MS_LOG(INFO) << "End finalize the EmbeddingCacheScheduler."; 103 // Forcibly Kill this process. 104 (void)kill(getpid(), SIGTERM); 105 }); 106 node->set_abnormal_callback(callback); 107 108 if (cluster::ClusterContext::instance()->initialized() && !collective::CollectiveManager::instance()->initialized()) { 109 // Scheduler don't use collective communication library. 110 const auto &cluster_ctx = cluster::ClusterContext::instance(); 111 MS_EXCEPTION_IF_NULL(cluster_ctx); 112 if (cluster_ctx->node_role() != kEnvRoleOfScheduler) { 113 // Global rank id and size should be manually set if cluster is initialized by MindSpore communication framework. 114 collective::CollectiveManager::instance()->set_global_rank_id(node->rank_id()); 115 auto global_rank_size = cluster_ctx->node_num(cluster_ctx->node_role()); 116 collective::CollectiveManager::instance()->set_global_rank_size(global_rank_size); 117 118 if (RecoveryContext::GetInstance()->enable_recovery()) { 119 RecoveryContext::GetInstance()->set_global_rank_id(node->rank_id()); 120 RecoveryContext::GetInstance()->set_global_rank_size(global_rank_size); 121 } 122 } 123 } 124 #endif 125 return true; 126 } 127 FinalizeCluster()128bool FinalizeCluster() { return cluster::ClusterContext::instance()->Finalize(); } 129 InitializeCollective()130bool InitializeCollective() { 131 if (collective::CollectiveManager::instance()->initialized()) { 132 return true; 133 } 134 if (cluster::ClusterContext::instance()->initialized() && 135 cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) { 136 MS_LOG(INFO) << "Scheduler node does not need to initialize collective communication."; 137 return true; 138 } 139 if (!collective::CollectiveManager::instance()->Initialize()) { 140 return false; 141 } 142 143 if (RecoveryContext::GetInstance()->enable_recovery()) { 144 RecoveryContext::GetInstance()->ObtainGlobalLatestCkptInfo(); 145 } 146 return true; 147 } 148 FinalizeCollective()149bool FinalizeCollective() { return collective::CollectiveManager::instance()->Finalize(); } 150 set_cluster_exit_with_exception()151void set_cluster_exit_with_exception() { cluster::ClusterContext::instance()->set_cluster_exit_with_exception(); } 152 cluster_exit_with_exception()153bool cluster_exit_with_exception() { return cluster::ClusterContext::instance()->cluster_exit_with_exception(); } 154 } // namespace distributed 155 } // namespace mindspore 156