/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #ifndef MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_
18 #define MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_
19 
20 #include <map>
21 #include <mutex>
22 #include <atomic>
23 #include <memory>
24 #include <string>
25 #include <thread>
26 #include <functional>
27 #include <condition_variable>
28 #include "utils/log_adapter.h"
29 #include "ps/core/abstract_node.h"
30 
31 namespace mindspore {
32 namespace ps {
33 namespace core {
34 class AbstractNode;
// Scaling state machine: kNormal -> kPreparing -> kWaiting -> kScaling -> kNormal
enum class NodeScaleState {
  // The server/worker node is not involved with any scaling operation.
  kNormal,
  // The server/worker node is preparing for scaling. The barriers are called while the node is in this state.
  kPreparing,
  // Entered after the barriers complete: the node is ready for scaling. While in this state the server/worker node
  // is in safemode.
  kWaiting,
  // Entered after the scheduler's scaling out/in operation is done. While in this state the server/worker node
  // can't send/receive messages.
  kScaling
};
49 
50 // The class helps worker/server node to elastic scale while running a training job. In this class, the scaling events
51 // are triggered by scheduler and caught by worker/server.
52 
53 // Modules which are involved with elastic scaling should register handlers to this class. After scheduler receives
54 // elastic scaling messages from user or cluster manager, it triggers events and the handlers will be called so that
55 // every module's consistency is guaranteed.
56 class FollowerScaler {
57  public:
58   explicit FollowerScaler(AbstractNode *node);
59   ~FollowerScaler();
60 
61   // The methods called after the events READY_FOR_SCALE_OUT/READY_FOR_SCALE_IN are triggered.
62   void ProcessBeforeScaleOut();
63   void ProcessBeforeScaleIn();
64 
65   // The methods called after the events CLUSTER_SCALE_OUT_DONE/CLUSTER_SCALE_IN_DONE are triggered.
66   void ProcessAfterScaleOut();
67   void ProcessAfterScaleIn();
68 
69   void RegisterBarrierBeforeScaleOut(const std::string &module, const BarrierBeforeScaleOut &barrier);
70   void RegisterBarrierBeforeScaleIn(const std::string &module, const BarrierBeforeScaleIn &barrier);
71   void RegisterHandlerAfterScaleOut(const std::string &module, const HandlerAfterScaleOut &handler);
72   void RegisterHandlerAfterScaleIn(const std::string &module, const HandlerAfterScaleIn &handler);
73 
74   // Register the scaling event callbacks to the node.
75   void RegisterScaleEventCallbacks();
76 
77  private:
78   AbstractNode *node_;
79 
80   std::atomic<NodeScaleState> scaling_state_;
81 
82   // Callbacks for scaling events should not be blocked so we notify a thread to call
83   // barriers(barriers_before_scale_out_/barriers_before_scale_in_) or
84   // handlers(handlers_after_scale_out_/handlers_after_scale_in_).
85   std::atomic_bool running_;
86   std::thread process_before_scale_out_thread_;
87   std::thread process_before_scale_in_thread_;
88   std::thread process_after_scale_out_thread_;
89   std::thread process_after_scale_in_thread_;
90 
91   // Variables for signals of scaling out/in operations.
92   std::mutex scale_out_mtx_;
93   std::mutex scale_in_mtx_;
94   std::condition_variable scale_out_cv_;
95   std::condition_variable scale_in_cv_;
96 
97   // Barriers and handlers for scale out/in events.
98   std::map<std::string, BarrierBeforeScaleOut> barriers_before_scale_out_;
99   std::map<std::string, BarrierBeforeScaleIn> barriers_before_scale_in_;
100   std::map<std::string, HandlerAfterScaleOut> handlers_after_scale_out_;
101   std::map<std::string, HandlerAfterScaleIn> handlers_after_scale_in_;
102 
103   std::function<void(void)> ready_for_scale_out_event_callback_;
104   std::function<void(void)> ready_for_scale_in_event_callback_;
105   std::function<void(void)> scale_out_done_event_callback_;
106   std::function<void(void)> scale_in_done_event_callback_;
107 };
108 }  // namespace core
109 }  // namespace ps
110 }  // namespace mindspore
111 #endif  // MINDSPORE_CCSRC_PS_CORE_FOLLOWER_SCALER_H_
112