• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ps/core/recovery_base.h"
18 
19 namespace mindspore {
20 namespace ps {
21 namespace core {
Initialize(const std::string & config_json)22 bool RecoveryBase::Initialize(const std::string &config_json) {
23   std::unique_lock<std::mutex> lock(recovery_mtx_);
24   nlohmann::json recovery_config;
25   try {
26     recovery_config = nlohmann::json::parse(config_json);
27   } catch (nlohmann::json::exception &e) {
28     MS_LOG(ERROR) << "Parse the json:" << config_json;
29     return false;
30   }
31 
32   MS_LOG(INFO) << "The node is support recovery.";
33   if (!recovery_config.contains(kStoreType)) {
34     MS_LOG(WARNING) << "The " << kStoreType << " is not existed.";
35     return false;
36   }
37   std::string storage_file_path = "";
38   std::string type = recovery_config.at(kStoreType).dump();
39   if (type == kFileStorage) {
40     storage_type_ = StorageType::kFileStorage;
41 
42     if (!recovery_config.contains(kStoreFilePath)) {
43       MS_LOG(WARNING) << "The " << kStoreFilePath << " is not existed.";
44       return false;
45     }
46     storage_file_path = recovery_config.at(kStoreFilePath);
47     if (storage_file_path == "") {
48       MS_LOG(EXCEPTION) << "If the scheduler support recovery, and if the persistent storage is a file, the path of "
49                            "the file must be configured";
50     }
51     recovery_storage_ = std::make_unique<FileConfiguration>(storage_file_path);
52     MS_EXCEPTION_IF_NULL(recovery_storage_);
53     if (recovery_storage_->Initialize()) {
54       MS_LOG(INFO) << "The storage file path " << storage_file_path << " initialize success.";
55     } else {
56       return false;
57     }
58   }
59 
60   MS_LOG(INFO) << "The storage type is:" << storage_type_ << ", the storage file path is:" << storage_file_path;
61   return true;
62 }
63 
InitializeNodes(const std::string & config_json)64 bool RecoveryBase::InitializeNodes(const std::string &config_json) {
65   nlohmann::json recovery_config;
66   try {
67     recovery_config = nlohmann::json::parse(config_json);
68   } catch (nlohmann::json::exception &e) {
69     MS_LOG(ERROR) << "Parse the json:" << config_json;
70     return false;
71   }
72 
73   if (!recovery_config.contains(kSchedulerStoreFilePath)) {
74     MS_LOG(WARNING) << "The " << kStoreFilePath << " is not existed.";
75     return false;
76   }
77 
78   // this is only for scheduler
79   std::string scheduler_storage_file_path = recovery_config.at(kSchedulerStoreFilePath);
80   if (scheduler_storage_file_path == "") {
81     MS_LOG(WARNING) << "scheduler storage file path is not exist!";
82   }
83   scheduler_recovery_storage_ = std::make_unique<FileConfiguration>(scheduler_storage_file_path);
84   MS_EXCEPTION_IF_NULL(scheduler_recovery_storage_);
85   if (scheduler_recovery_storage_->Initialize()) {
86     MS_LOG(INFO) << "The scheduler storage file path " << scheduler_storage_file_path << " initialize success.";
87   } else {
88     return false;
89   }
90 
91   MS_LOG(INFO) << "the scheduler storage file path is:" << scheduler_storage_file_path;
92   return true;
93 }
94 
Persist(const core::ClusterConfig & clusterConfig)95 void RecoveryBase::Persist(const core::ClusterConfig &clusterConfig) {
96   std::unique_lock<std::mutex> lock(recovery_mtx_);
97   if (recovery_storage_ == nullptr) {
98     MS_LOG(WARNING) << "recovery storage is null, so don't persist meta data";
99     return;
100   }
101   recovery_storage_->PersistFile(clusterConfig);
102 }
103 
PersistNodesInfo(const core::ClusterConfig & clusterConfig)104 void RecoveryBase::PersistNodesInfo(const core::ClusterConfig &clusterConfig) {
105   std::unique_lock<std::mutex> lock(recovery_mtx_);
106   if (scheduler_recovery_storage_ == nullptr) {
107     MS_LOG(WARNING) << "scheduler recovery  storage is null, so don't persist nodes meta data";
108     return;
109   }
110   scheduler_recovery_storage_->PersistNodes(clusterConfig);
111 }
112 }  // namespace core
113 }  // namespace ps
114 }  // namespace mindspore
115