1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef META_DATA_H 17 #define META_DATA_H 18 19 #include <atomic> 20 #include <map> 21 #include <mutex> 22 #include <vector> 23 24 #include "db_types.h" 25 #include "ikvdb_sync_interface.h" 26 #include "query_sync_water_mark_helper.h" 27 28 namespace DistributedDB { 29 struct MetaDataValue { 30 TimeOffset timeOffset = 0; 31 uint64_t lastUpdateTime = 0; 32 uint64_t localWaterMark = 0; 33 uint64_t peerWaterMark = 0; 34 Timestamp dbCreateTime = 0; 35 uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data. 36 }; 37 38 class Metadata { 39 public: 40 Metadata(); 41 virtual ~Metadata(); 42 43 int Initialize(ISyncInterface *storage); 44 45 int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue); 46 47 void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue); 48 49 void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue); 50 51 int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue); 52 53 void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue); 54 55 int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 56 57 int SaveLocalTimeOffset(TimeOffset timeOffset); 58 59 TimeOffset GetLocalTimeOffset() const; 60 61 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash); 62 63 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName); 64 65 void SetLastLocalTime(Timestamp lastLocalTime); 66 67 Timestamp GetLastLocalTime() const; 68 69 int SetSendQueryWaterMark(const std::string &queryIdentify, 70 const std::string &deviceId, const WaterMark &waterMark); 71 72 // the querySync's sendWatermark will increase by the device watermark 73 // if the sendWatermark less than device watermark 74 int GetSendQueryWaterMark(const std::string &queryIdentify, 75 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 76 77 int SetRecvQueryWaterMark(const std::string &queryIdentify, 78 const std::string &deviceId, const WaterMark &waterMark); 79 80 // the querySync's recvWatermark will increase by the device watermark 81 // if the watermark less than device watermark 82 int GetRecvQueryWaterMark(const std::string &queryIdentify, 83 const std::string &deviceId, WaterMark &waterMark); 84 85 virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, 86 const Timestamp ×tamp); 87 88 virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp); 89 90 int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark); 91 92 // the deleteSync's sendWatermark will increase by the device watermark 93 // if the sendWatermark less than device watermark 94 int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 95 96 int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true); 97 98 // the deleteSync's recvWatermark will increase by the device watermark 99 // if the recvWatermark less than device watermark 100 int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark); 101 102 void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue); 103 104 int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 105 106 int ResetMetaDataAfterRemoveData(const DeviceID &deviceId); 107 108 void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue); 109 110 // always get value from db, value updated from storage trigger 111 uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const; 112 113 void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId); 114 115 int SaveClientId(const std::string &deviceId, const std::string &clientId); 116 117 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId); 118 private: 119 120 int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true); 121 122 // sync module need hash devices id 123 void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash); 124 125 static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue); 126 127 static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue); 128 129 int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const; 130 131 int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue); 132 133 void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value); 134 135 void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue); 136 137 int64_t StringToLong(const std::vector<uint8_t> &value) const; 138 139 int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys); 140 141 int LoadAllMetadata(); 142 143 uint64_t GetRandTimeOffset() const; 144 145 void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash); 146 147 // this function will read data from db by metaData's key 148 // and then serialize it and put to map 149 int LoadDeviceIdDataToMap(const Key &key); 150 151 // reset the waterMark to zero 152 int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash); 153 154 // store localTimeOffset in ram; if change, should add a lock first, change here and metadata, 155 // then release lock 156 std::atomic<TimeOffset> localTimeOffset_; 157 std::mutex localTimeOffsetLock_; 158 ISyncInterface *naturalStoragePtr_; 159 160 // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first, 161 // if save to db fail, it will not be changed in memory. 162 std::map<std::string, MetaDataValue> metadataMap_; 163 mutable std::mutex metadataLock_; 164 std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_; 165 166 // store localTimeOffset in ram, used to make timestamp increase 167 mutable std::mutex lastLocalTimeLock_; 168 Timestamp lastLocalTime_; 169 170 QuerySyncWaterMarkHelper querySyncWaterMarkHelper_; 171 172 // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId) 173 // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX 174 // if query is in set return 0 while not found from db, means already sync before, don't trigger again 175 mutable std::map<DeviceID, std::set<std::string>> queryIdMap_; 176 177 std::mutex clientIdLock_; 178 std::map<DeviceID, std::string> clientIdCache_; 179 }; 180 } // namespace DistributedDB 181 #endif 182