1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_ 18 #define MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_ 19 20 #include <string> 21 #include <memory> 22 #include <unordered_map> 23 #include "proto/ps.pb.h" 24 #include "fl/server/common.h" 25 #include "ps/core/server_node.h" 26 #include "ps/core/communicator/tcp_communicator.h" 27 #include "fl/server/consistent_hash_ring.h" 28 29 namespace mindspore { 30 namespace fl { 31 namespace server { 32 constexpr auto kModuleDistributedMetadataStore = "DistributedMetadataStore"; 33 // This class is used for distributed metadata storage using consistent hash. All metadata is distributedly 34 // stored in all servers. Caller doesn't need to know which server stores the metadata. It only needs to know what kind 35 // of operations should be done to the metadata. 36 37 // The metadata stored in the server is in protobuffer format because it's easy for serializing and communicating. The 38 // type of the protobuffer struct is decided by the caller using protobuffer's API. 39 class DistributedMetadataStore { 40 public: GetInstance()41 static DistributedMetadataStore &GetInstance() { 42 static DistributedMetadataStore instance; 43 return instance; 44 } 45 46 // Initialize metadata storage with the server node because communication is needed. 47 void Initialize(const std::shared_ptr<ps::core::ServerNode> &server_node); 48 49 // Register callbacks for the server to handle update/get metadata messages from other servers. 50 void RegisterMessageCallback(const std::shared_ptr<ps::core::TcpCommunicator> &communicator); 51 52 // Register metadata for the name with the initial value. This method should be only called once for each name. 53 void RegisterMetadata(const std::string &name, const PBMetadata &meta); 54 55 // Reset the metadata value for the name. 56 void ResetMetadata(const std::string &name); 57 58 // Update the metadata for the name. Parameter 'reason' is the reason why updating meta data failed. 59 bool UpdateMetadata(const std::string &name, const PBMetadata &meta, std::string *reason = nullptr); 60 61 // Get the metadata for the name. 62 PBMetadata GetMetadata(const std::string &name); 63 64 // Reinitialize the consistency hash ring and clear metadata after scaling operations are done. 65 bool ReInitForScaling(); 66 67 private: DistributedMetadataStore()68 DistributedMetadataStore() 69 : server_node_(nullptr), 70 communicator_(nullptr), 71 local_rank_(0), 72 server_num_(0), 73 router_(nullptr), 74 metadata_({}) {} 75 ~DistributedMetadataStore() = default; 76 DistributedMetadataStore(const DistributedMetadataStore &) = delete; 77 DistributedMetadataStore &operator=(const DistributedMetadataStore &) = delete; 78 79 // Initialize the consistent hash ring for distributed storage. 80 void InitHashRing(); 81 82 // Callback for updating metadata request sent to the server. 83 void HandleUpdateMetadataRequest(const std::shared_ptr<ps::core::MessageHandler> &message); 84 85 // Callback for getting metadata request sent to the server. 86 void HandleGetMetadataRequest(const std::shared_ptr<ps::core::MessageHandler> &message); 87 88 // Do updating metadata in the server where the metadata for the name is stored. 89 bool DoUpdateMetadata(const std::string &name, const PBMetadata &meta); 90 91 // Members for the communication between servers. 92 std::shared_ptr<ps::core::ServerNode> server_node_; 93 std::shared_ptr<ps::core::TcpCommunicator> communicator_; 94 uint32_t local_rank_; 95 uint32_t server_num_; 96 97 // Consistent hash ring. This is used for DistributedMetadataStore to find which server node the meta data is stored. 98 std::shared_ptr<ConsistentHashRing> router_; 99 100 // We store metadata which is serialized by ProtoBuffer so that data storage and data transmission API is easy to use. 101 // Key: data name. 102 // Value: ProtoBuffer Struct. 103 std::unordered_map<std::string, PBMetadata> metadata_; 104 105 // Because the metadata is read/written conccurently, we must ensure the operations are threadsafe. 106 std::unordered_map<std::string, std::mutex> mutex_; 107 }; 108 } // namespace server 109 } // namespace fl 110 } // namespace mindspore 111 #endif // MINDSPORE_CCSRC_FL_SERVER_DISTRIBUTED_META_STORE_H_ 112