• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_
18 #define MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_
19 
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "proto/ps.pb.h"
#include "fl/server/common.h"
#include "ps/core/server_node.h"
#include "ps/core/communicator/tcp_communicator.h"
#include "fl/server/consistent_hash_ring.h"
28 
29 namespace mindspore {
30 namespace fl {
31 namespace server {
// Name string for the DistributedMetadataStore module.
// NOTE(review): where this tag is consumed is not visible in this header -- confirm against its users.
constexpr const char *kModuleDistributedMetadataStore = "DistributedMetadataStore";
// This class implements distributed metadata storage based on consistent hashing: the metadata is spread across all
// servers. Callers don't need to know which server stores a given piece of metadata; they only need to know which
// operation should be applied to it.

// The metadata stored in the server is kept in protobuf format because that makes it easy to serialize and transmit.
// The concrete type of the protobuf struct is decided by the caller using protobuf's API.
39 class DistributedMetadataStore {
40  public:
GetInstance()41   static DistributedMetadataStore &GetInstance() {
42     static DistributedMetadataStore instance;
43     return instance;
44   }
45 
46   // Initialize metadata storage with the server node because communication is needed.
47   void Initialize(const std::shared_ptr<ps::core::ServerNode> &server_node);
48 
49   // Register callbacks for the server to handle update/get metadata messages from other servers.
50   void RegisterMessageCallback(const std::shared_ptr<ps::core::TcpCommunicator> &communicator);
51 
52   // Register metadata for the name with the initial value. This method should be only called once for each name.
53   void RegisterMetadata(const std::string &name, const PBMetadata &meta);
54 
55   // Reset the metadata value for the name.
56   void ResetMetadata(const std::string &name);
57 
58   // Update the metadata for the name. Parameter 'reason' is the reason why updating meta data failed.
59   bool UpdateMetadata(const std::string &name, const PBMetadata &meta, std::string *reason = nullptr);
60 
61   // Get the metadata for the name.
62   PBMetadata GetMetadata(const std::string &name);
63 
64   // Reinitialize the consistency hash ring and clear metadata after scaling operations are done.
65   bool ReInitForScaling();
66 
67  private:
DistributedMetadataStore()68   DistributedMetadataStore()
69       : server_node_(nullptr),
70         communicator_(nullptr),
71         local_rank_(0),
72         server_num_(0),
73         router_(nullptr),
74         metadata_({}) {}
75   ~DistributedMetadataStore() = default;
76   DistributedMetadataStore(const DistributedMetadataStore &) = delete;
77   DistributedMetadataStore &operator=(const DistributedMetadataStore &) = delete;
78 
79   // Initialize the consistent hash ring for distributed storage.
80   void InitHashRing();
81 
82   // Callback for updating metadata request sent to the server.
83   void HandleUpdateMetadataRequest(const std::shared_ptr<ps::core::MessageHandler> &message);
84 
85   // Callback for getting metadata request sent to the server.
86   void HandleGetMetadataRequest(const std::shared_ptr<ps::core::MessageHandler> &message);
87 
88   // Do updating metadata in the server where the metadata for the name is stored.
89   bool DoUpdateMetadata(const std::string &name, const PBMetadata &meta);
90 
91   // Members for the communication between servers.
92   std::shared_ptr<ps::core::ServerNode> server_node_;
93   std::shared_ptr<ps::core::TcpCommunicator> communicator_;
94   uint32_t local_rank_;
95   uint32_t server_num_;
96 
97   // Consistent hash ring. This is used for DistributedMetadataStore to find which server node the meta data is stored.
98   std::shared_ptr<ConsistentHashRing> router_;
99 
100   // We store metadata which is serialized by ProtoBuffer so that data storage and data transmission API is easy to use.
101   // Key: data name.
102   // Value: ProtoBuffer Struct.
103   std::unordered_map<std::string, PBMetadata> metadata_;
104 
105   // Because the metadata is read/written conccurently, we must ensure the operations are threadsafe.
106   std::unordered_map<std::string, std::mutex> mutex_;
107 };
108 }  // namespace server
109 }  // namespace fl
110 }  // namespace mindspore
111 #endif  // MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_
112