1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_ 19 20 #include <map> 21 #include <mutex> 22 #include <atomic> 23 #include <memory> 24 #include <string> 25 #include <thread> 26 #include <functional> 27 #include <condition_variable> 28 #include "utils/log_adapter.h" 29 #include "ps/core/abstract_node.h" 30 31 namespace mindspore { 32 namespace ps { 33 namespace core { 34 class AbstractNode; 35 // Scaling state machine: kNormal->kPreparing->kWaiting->kScaling->kNormal 36 enum class NodeScaleState { 37 // This state means the server/worker node is not involved with scaling operations. 38 kNormal, 39 // This state means the server/worker node is preparing for scaling. The barriers will be called when 40 // server/worker node is in this state. 41 kPreparing, 42 // After barriers complete, the server/worker node switches into this state. This means this node is ready for 43 // scaling. When in this state, server/worker node is in safemode. 44 kWaiting, 45 // Server/worker node will switch to this state after scheduler's scaling out/in operation is done. 46 // When in this state, server/worker node can't send/receive messages. 47 kScaling 48 }; 49 50 // The class helps worker/server node to elastic scale while running a training job. In this class, the scaling events 51 // are triggered by scheduler and caught by worker/server. 52 53 // Modules which are involved with elastic scaling should register handlers to this class. After scheduler receives 54 // elastic scaling messages from user or cluster manager, it triggers events and the handlers will be called so that 55 // every module's consistency is guaranteed. 56 class FollowerScaler { 57 public: 58 explicit FollowerScaler(AbstractNode *node); 59 ~FollowerScaler(); 60 61 // The methods called after the events READY_FOR_SCALE_OUT/READY_FOR_SCALE_IN are triggered. 62 void ProcessBeforeScaleOut(); 63 void ProcessBeforeScaleIn(); 64 65 // The methods called after the events CLUSTER_SCALE_OUT_DONE/CLUSTER_SCALE_IN_DONE are triggered. 66 void ProcessAfterScaleOut(); 67 void ProcessAfterScaleIn(); 68 69 void RegisterBarrierBeforeScaleOut(const std::string &module, const BarrierBeforeScaleOut &barrier); 70 void RegisterBarrierBeforeScaleIn(const std::string &module, const BarrierBeforeScaleIn &barrier); 71 void RegisterHandlerAfterScaleOut(const std::string &module, const HandlerAfterScaleOut &handler); 72 void RegisterHandlerAfterScaleIn(const std::string &module, const HandlerAfterScaleIn &handler); 73 74 // Register the scaling event callbacks to the node. 75 void RegisterScaleEventCallbacks(); 76 77 private: 78 AbstractNode *node_; 79 80 std::atomic<NodeScaleState> scaling_state_; 81 82 // Callbacks for scaling events should not be blocked so we notify a thread to call 83 // barriers(barriers_before_scale_out_/barriers_before_scale_in_) or 84 // handlers(handlers_after_scale_out_/handlers_after_scale_in_). 85 std::atomic_bool running_; 86 std::thread process_before_scale_out_thread_; 87 std::thread process_before_scale_in_thread_; 88 std::thread process_after_scale_out_thread_; 89 std::thread process_after_scale_in_thread_; 90 91 // Variables for signals of scaling out/in operations. 92 std::mutex scale_out_mtx_; 93 std::mutex scale_in_mtx_; 94 std::condition_variable scale_out_cv_; 95 std::condition_variable scale_in_cv_; 96 97 // Barriers and handlers for scale out/in events. 98 std::map<std::string, BarrierBeforeScaleOut> barriers_before_scale_out_; 99 std::map<std::string, BarrierBeforeScaleIn> barriers_before_scale_in_; 100 std::map<std::string, HandlerAfterScaleOut> handlers_after_scale_out_; 101 std::map<std::string, HandlerAfterScaleIn> handlers_after_scale_in_; 102 103 std::function<void(void)> ready_for_scale_out_event_callback_; 104 std::function<void(void)> ready_for_scale_in_event_callback_; 105 std::function<void(void)> scale_out_done_event_callback_; 106 std::function<void(void)> scale_in_done_event_callback_; 107 }; 108 } // namespace core 109 } // namespace ps 110 } // namespace mindspore 111 #endif // MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_ 112