• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "include/backend/distributed/init.h"
18 #include <vector>
19 #include <string>
20 #include <memory>
21 #include "include/backend/distributed/recovery/recovery_context.h"
22 #include "runtime/graph_scheduler/graph_scheduler.h"
23 #include "runtime/graph_scheduler/embedding_cache_scheduler.h"
24 
25 namespace mindspore {
26 namespace distributed {
27 using distributed::recovery::RecoveryContext;
28 
Initialize()29 bool Initialize() {
30   // If this process participates in the cluster building, we need to initialize cluster context.
31   if (common::UseDynamicCluster()) {
32     if (!InitializeCluster()) {
33       MS_LOG(EXCEPTION)
34         << "Failed to initialize distributed job cluster because some processes in the cluster are not successfully "
35            "spawned. You can run command: 'grep -rn -E 'ERROR|CRITICAL' -C 10' in your log directory to filter out "
36            "error info. It may be one of the following reasons:\n1."
37         << kWorkerProcessNotEnoughError << "\n2." << kSchedPortOccupiedError << "\n3."
38         << kSchedWorkerAddrNotConsistentError;
39     }
40   }
41 
42   // Initialize the collective manager regardless of whether the cluster is initialized or not.
43   if (!InitializeCollective()) {
44     MS_LOG(EXCEPTION)
45       << "Failed to initialize collective communication because some processes in the cluster are not successfully "
46          "spawned. You can run command: 'grep -rn -E 'ERROR|CRITICAL' -C 10' in your log directory to filter out error "
47          "info.";
48   }
49 
50   // If this is a scheduler node, it does not need to execute other codes like graph compiling and running. We should
51   // finalize it immediately.
52   if (cluster::ClusterContext::instance()->initialized() &&
53       cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) {
54     MS_LOG(INFO) << "Scheduler starts to wait for cluster to exit.";
55     (void)cluster::ClusterContext::instance()->Finalize(UINT32_MAX);
56     MS_LOG(INFO) << "Scheduler ends waiting for cluster to exit.";
57     exit(0);
58     return true;
59   }
60 
61   MsException::Instance().CheckException();
62   return true;
63 }
64 
Finalize()65 bool Finalize() {
66   if (!FinalizeCollective()) {
67     MS_LOG(ERROR) << "Failed to finalize collective communication.";
68     return false;
69   }
70 
71   if (!FinalizeCluster()) {
72     MS_LOG(ERROR) << "Failed to finalize cluster.";
73     return false;
74   }
75 
76   return true;
77 }
78 
InitializeCluster()79 bool InitializeCluster() {
80   if (!cluster::ClusterContext::instance()->Initialize()) {
81     MS_LOG(ERROR) << "Failed to initialize cluster.";
82     return false;
83   }
84 #if ((defined ENABLE_CPU) && (!defined _WIN32) && !defined(__APPLE__))
85   auto node = cluster::ClusterContext::instance()->node();
86   MS_EXCEPTION_IF_NULL(node);
87 
88   // Set the callback for the cluster node.
89   auto callback = std::make_shared<std::function<void(void)>>([]() {
90     MS_LOG(INFO) << "Callback on exception is called.";
91     if (!collective::CollectiveManager::instance()->Finalize()) {
92       MS_LOG(EXCEPTION) << "Failed to finalize the collective communication lib.";
93     }
94     // Abort graph scheduler to avoid hang in rpc communication.
95     auto &graph_scheduler = runtime::GraphScheduler::GetInstance();
96     if (graph_scheduler.initialized() && graph_scheduler.rpc_node_scheduler() != nullptr) {
97       graph_scheduler.rpc_node_scheduler()->Abort();
98     }
99 
100     MS_LOG(INFO) << "Begin finalize the EmbeddingCacheScheduler.";
101     runtime::EmbeddingCacheScheduler::GetInstance().Finalize(false);
102     MS_LOG(INFO) << "End finalize the EmbeddingCacheScheduler.";
103     // Forcibly Kill this process.
104     (void)kill(getpid(), SIGTERM);
105   });
106   node->set_abnormal_callback(callback);
107 
108   if (cluster::ClusterContext::instance()->initialized() && !collective::CollectiveManager::instance()->initialized()) {
109     // Scheduler don't use collective communication library.
110     const auto &cluster_ctx = cluster::ClusterContext::instance();
111     MS_EXCEPTION_IF_NULL(cluster_ctx);
112     if (cluster_ctx->node_role() != kEnvRoleOfScheduler) {
113       // Global rank id and size should be manually set if cluster is initialized by MindSpore communication framework.
114       collective::CollectiveManager::instance()->set_global_rank_id(node->rank_id());
115       auto global_rank_size = cluster_ctx->node_num(cluster_ctx->node_role());
116       collective::CollectiveManager::instance()->set_global_rank_size(global_rank_size);
117 
118       if (RecoveryContext::GetInstance()->enable_recovery()) {
119         RecoveryContext::GetInstance()->set_global_rank_id(node->rank_id());
120         RecoveryContext::GetInstance()->set_global_rank_size(global_rank_size);
121       }
122     }
123   }
124 #endif
125   return true;
126 }
127 
FinalizeCluster()128 bool FinalizeCluster() { return cluster::ClusterContext::instance()->Finalize(); }
129 
InitializeCollective()130 bool InitializeCollective() {
131   if (collective::CollectiveManager::instance()->initialized()) {
132     return true;
133   }
134   if (cluster::ClusterContext::instance()->initialized() &&
135       cluster::ClusterContext::instance()->node_role() == kEnvRoleOfScheduler) {
136     MS_LOG(INFO) << "Scheduler node does not need to initialize collective communication.";
137     return true;
138   }
139   if (!collective::CollectiveManager::instance()->Initialize()) {
140     return false;
141   }
142 
143   if (RecoveryContext::GetInstance()->enable_recovery()) {
144     RecoveryContext::GetInstance()->ObtainGlobalLatestCkptInfo();
145   }
146   return true;
147 }
148 
FinalizeCollective()149 bool FinalizeCollective() { return collective::CollectiveManager::instance()->Finalize(); }
150 
set_cluster_exit_with_exception()151 void set_cluster_exit_with_exception() { cluster::ClusterContext::instance()->set_cluster_exit_with_exception(); }
152 
cluster_exit_with_exception()153 bool cluster_exit_with_exception() { return cluster::ClusterContext::instance()->cluster_exit_with_exception(); }
154 }  // namespace distributed
155 }  // namespace mindspore
156