• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef META_DATA_H
17 #define META_DATA_H
18 
19 #include <atomic>
20 #include <map>
21 #include <mutex>
22 #include <vector>
23 
24 #include "db_types.h"
25 #include "ikvdb_sync_interface.h"
26 #include "query_sync_water_mark_helper.h"
27 
28 namespace DistributedDB {
29 struct MetaDataValue {
30     TimeOffset timeOffset = 0;
31     uint64_t lastUpdateTime = 0;
32     uint64_t localWaterMark = 0;
33     uint64_t peerWaterMark = 0;
34     Timestamp dbCreateTime = 0;
35     uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data.
36 };
37 
38 class Metadata {
39 public:
40     class MetaWaterMarkAutoLock final {
41     public:
42         explicit MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata);
43         ~MetaWaterMarkAutoLock();
44     private:
45         DISABLE_COPY_ASSIGN_MOVE(MetaWaterMarkAutoLock);
46         const std::shared_ptr<Metadata> metadataPtr_;
47     };
48 
49     Metadata();
50     virtual ~Metadata();
51 
52     int Initialize(ISyncInterface *storage);
53 
54     int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue);
55 
56     void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue);
57 
58     virtual void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue);
59 
60     int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue);
61 
62     void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue);
63 
64     int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
65 
66     int SaveLocalTimeOffset(TimeOffset timeOffset);
67 
68     TimeOffset GetLocalTimeOffset() const;
69 
70     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash);
71 
72     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName);
73 
74     void SetLastLocalTime(Timestamp lastLocalTime);
75 
76     Timestamp GetLastLocalTime() const;
77 
78     int SetSendQueryWaterMark(const std::string &queryIdentify,
79         const std::string &deviceId, const WaterMark &waterMark);
80 
81     // the querySync's sendWatermark will increase by the device watermark
82     // if the sendWatermark less than device watermark
83     int GetSendQueryWaterMark(const std::string &queryIdentify,
84         const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
85 
86     int SetRecvQueryWaterMark(const std::string &queryIdentify,
87         const std::string &deviceId, const WaterMark &waterMark);
88 
89     // the querySync's recvWatermark will increase by the device watermark
90     // if the watermark less than device watermark
91     int GetRecvQueryWaterMark(const std::string &queryIdentify,
92         const std::string &deviceId, WaterMark &waterMark);
93 
94     virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
95         const Timestamp &timestamp);
96 
97     virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp);
98 
99     int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark);
100 
101     // the deleteSync's sendWatermark will increase by the device watermark
102     // if the sendWatermark less than device watermark
103     int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
104 
105     int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true);
106 
107     // the deleteSync's recvWatermark will increase by the device watermark
108     // if the recvWatermark less than device watermark
109     int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark);
110 
111     void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue);
112 
113     int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
114 
115     int ResetMetaDataAfterRemoveData(const DeviceID &deviceId);
116 
117     void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue);
118 
119     // always get value from db, value updated from storage trigger
120     uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const;
121 
122     void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId);
123 
124     int SaveClientId(const std::string &deviceId, const std::string &clientId);
125 
126     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const;
127 
128     void LockWaterMark() const;
129 
130     void UnlockWaterMark() const;
131 private:
132 
133     int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true);
134 
135     // sync module need hash devices id
136     void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash);
137 
138     static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue);
139 
140     static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue);
141 
142     int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const;
143 
144     int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue);
145 
146     void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value);
147 
148     void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue);
149 
150     int64_t StringToLong(const std::vector<uint8_t> &value) const;
151 
152     int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys);
153 
154     int LoadAllMetadata();
155 
156     void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash);
157 
158     // this function will read data from db by metaData's key
159     // and then serialize it and put to map
160     int LoadDeviceIdDataToMap(const Key &key);
161 
162     // reset the waterMark to zero
163     int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash);
164 
165     // store localTimeOffset in ram; if change, should add a lock first, change here and metadata,
166     // then release lock
167     std::atomic<TimeOffset> localTimeOffset_;
168     std::mutex localTimeOffsetLock_;
169     ISyncInterface *naturalStoragePtr_;
170 
171     // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first,
172     // if save to db fail, it will not be changed in memory.
173     std::map<std::string, MetaDataValue> metadataMap_;
174     mutable std::mutex metadataLock_;
175     std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_;
176 
177     // store localTimeOffset in ram, used to make timestamp increase
178     mutable std::mutex lastLocalTimeLock_;
179     Timestamp lastLocalTime_;
180 
181     QuerySyncWaterMarkHelper querySyncWaterMarkHelper_;
182 
183     // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId)
184     // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX
185     // if query is in set return 0 while not found from db, means already sync before, don't trigger again
186     mutable std::map<DeviceID, std::set<std::string>> queryIdMap_;
187 
188     std::mutex clientIdLock_;
189     std::map<DeviceID, std::string> clientIdCache_;
190 
191     mutable std::recursive_mutex waterMarkMutex_;
192 };
193 }  // namespace DistributedDB
194 #endif
195