1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef META_DATA_H 17 #define META_DATA_H 18 19 #include <atomic> 20 #include <map> 21 #include <mutex> 22 #include <vector> 23 24 #include "db_types.h" 25 #include "ikvdb_sync_interface.h" 26 #include "query_sync_water_mark_helper.h" 27 28 namespace DistributedDB { 29 struct MetaDataValue { 30 TimeOffset timeOffset = 0; 31 uint64_t lastUpdateTime = 0; 32 uint64_t localWaterMark = 0; 33 uint64_t peerWaterMark = 0; 34 Timestamp dbCreateTime = 0; 35 uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data. 36 }; 37 38 class Metadata { 39 public: 40 class MetaWaterMarkAutoLock final { 41 public: 42 explicit MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata); 43 ~MetaWaterMarkAutoLock(); 44 private: 45 DISABLE_COPY_ASSIGN_MOVE(MetaWaterMarkAutoLock); 46 const std::shared_ptr<Metadata> metadataPtr_; 47 }; 48 49 Metadata(); 50 virtual ~Metadata(); 51 52 int Initialize(ISyncInterface *storage); 53 54 int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue); 55 56 void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue); 57 58 virtual void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue); 59 60 int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue); 61 62 void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue); 63 64 int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 65 66 int SaveLocalTimeOffset(TimeOffset timeOffset); 67 68 TimeOffset GetLocalTimeOffset() const; 69 70 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash); 71 72 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName); 73 74 void SetLastLocalTime(Timestamp lastLocalTime); 75 76 Timestamp GetLastLocalTime() const; 77 78 int SetSendQueryWaterMark(const std::string &queryIdentify, 79 const std::string &deviceId, const WaterMark &waterMark); 80 81 // the querySync's sendWatermark will increase by the device watermark 82 // if the sendWatermark less than device watermark 83 int GetSendQueryWaterMark(const std::string &queryIdentify, 84 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 85 86 int SetRecvQueryWaterMark(const std::string &queryIdentify, 87 const std::string &deviceId, const WaterMark &waterMark); 88 89 // the querySync's recvWatermark will increase by the device watermark 90 // if the watermark less than device watermark 91 int GetRecvQueryWaterMark(const std::string &queryIdentify, 92 const std::string &deviceId, WaterMark &waterMark); 93 94 virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, 95 const Timestamp ×tamp); 96 97 virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp); 98 99 int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark); 100 101 // the deleteSync's sendWatermark will increase by the device watermark 102 // if the sendWatermark less than device watermark 103 int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 104 105 int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true); 106 107 // the deleteSync's recvWatermark will increase by the device watermark 108 // if the recvWatermark less than device watermark 109 int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark); 110 111 void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue); 112 113 int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 114 115 int ResetMetaDataAfterRemoveData(const DeviceID &deviceId); 116 117 void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue); 118 119 // always get value from db, value updated from storage trigger 120 uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const; 121 122 void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId); 123 124 int SaveClientId(const std::string &deviceId, const std::string &clientId); 125 126 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const; 127 128 void LockWaterMark() const; 129 130 void UnlockWaterMark() const; 131 private: 132 133 int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true); 134 135 // sync module need hash devices id 136 void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash); 137 138 static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue); 139 140 static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue); 141 142 int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const; 143 144 int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue); 145 146 void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value); 147 148 void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue); 149 150 int64_t StringToLong(const std::vector<uint8_t> &value) const; 151 152 int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys); 153 154 int LoadAllMetadata(); 155 156 void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash); 157 158 // this function will read data from db by metaData's key 159 // and then serialize it and put to map 160 int LoadDeviceIdDataToMap(const Key &key); 161 162 // reset the waterMark to zero 163 int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash); 164 165 // store localTimeOffset in ram; if change, should add a lock first, change here and metadata, 166 // then release lock 167 std::atomic<TimeOffset> localTimeOffset_; 168 std::mutex localTimeOffsetLock_; 169 ISyncInterface *naturalStoragePtr_; 170 171 // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first, 172 // if save to db fail, it will not be changed in memory. 173 std::map<std::string, MetaDataValue> metadataMap_; 174 mutable std::mutex metadataLock_; 175 std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_; 176 177 // store localTimeOffset in ram, used to make timestamp increase 178 mutable std::mutex lastLocalTimeLock_; 179 Timestamp lastLocalTime_; 180 181 QuerySyncWaterMarkHelper querySyncWaterMarkHelper_; 182 183 // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId) 184 // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX 185 // if query is in set return 0 while not found from db, means already sync before, don't trigger again 186 mutable std::map<DeviceID, std::set<std::string>> queryIdMap_; 187 188 std::mutex clientIdLock_; 189 std::map<DeviceID, std::string> clientIdCache_; 190 191 mutable std::recursive_mutex waterMarkMutex_; 192 }; 193 } // namespace DistributedDB 194 #endif 195