• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef META_DATA_H
17 #define META_DATA_H
18 
#include <atomic>
#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "db_types.h"
#include "ikvdb_sync_interface.h"
#include "query_sync_water_mark_helper.h"
27 
28 namespace DistributedDB {
29 struct MetaDataValue {
30     TimeOffset timeOffset = 0;
31     uint64_t lastUpdateTime = 0;
32     uint64_t localWaterMark = 0;
33     uint64_t peerWaterMark = 0;
34     Timestamp dbCreateTime = 0;
35     uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data.
36 };
37 
38 class Metadata {
39 public:
40     Metadata();
41     virtual ~Metadata();
42 
43     int Initialize(ISyncInterface *storage);
44 
45     int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue);
46 
47     void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue);
48 
49     void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue);
50 
51     int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue);
52 
53     void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue);
54 
55     int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
56 
57     int SaveLocalTimeOffset(TimeOffset timeOffset);
58 
59     TimeOffset GetLocalTimeOffset() const;
60 
61     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash);
62 
63     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName);
64 
65     void SetLastLocalTime(Timestamp lastLocalTime);
66 
67     Timestamp GetLastLocalTime() const;
68 
69     int SetSendQueryWaterMark(const std::string &queryIdentify,
70         const std::string &deviceId, const WaterMark &waterMark);
71 
72     // the querySync's sendWatermark will increase by the device watermark
73     // if the sendWatermark less than device watermark
74     int GetSendQueryWaterMark(const std::string &queryIdentify,
75         const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
76 
77     int SetRecvQueryWaterMark(const std::string &queryIdentify,
78         const std::string &deviceId, const WaterMark &waterMark);
79 
80     // the querySync's recvWatermark will increase by the device watermark
81     // if the watermark less than device watermark
82     int GetRecvQueryWaterMark(const std::string &queryIdentify,
83         const std::string &deviceId, WaterMark &waterMark);
84 
85     virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
86         const Timestamp &timestamp);
87 
88     virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp);
89 
90     int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark);
91 
92     // the deleteSync's sendWatermark will increase by the device watermark
93     // if the sendWatermark less than device watermark
94     int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
95 
96     int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true);
97 
98     // the deleteSync's recvWatermark will increase by the device watermark
99     // if the recvWatermark less than device watermark
100     int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark);
101 
102     void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue);
103 
104     int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
105 
106     int ResetMetaDataAfterRemoveData(const DeviceID &deviceId);
107 
108     void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue);
109 
110     // always get value from db, value updated from storage trigger
111     uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const;
112 
113     void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId);
114 
115     int SaveClientId(const std::string &deviceId, const std::string &clientId);
116 
117     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId);
118 private:
119 
120     int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true);
121 
122     // sync module need hash devices id
123     void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash);
124 
125     static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue);
126 
127     static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue);
128 
129     int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const;
130 
131     int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue);
132 
133     void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value);
134 
135     void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue);
136 
137     int64_t StringToLong(const std::vector<uint8_t> &value) const;
138 
139     int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys);
140 
141     int LoadAllMetadata();
142 
143     uint64_t GetRandTimeOffset() const;
144 
145     void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash);
146 
147     // this function will read data from db by metaData's key
148     // and then serialize it and put to map
149     int LoadDeviceIdDataToMap(const Key &key);
150 
151     // reset the waterMark to zero
152     int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash);
153 
154     // store localTimeOffset in ram; if change, should add a lock first, change here and metadata,
155     // then release lock
156     std::atomic<TimeOffset> localTimeOffset_;
157     std::mutex localTimeOffsetLock_;
158     ISyncInterface *naturalStoragePtr_;
159 
160     // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first,
161     // if save to db fail, it will not be changed in memory.
162     std::map<std::string, MetaDataValue> metadataMap_;
163     mutable std::mutex metadataLock_;
164     std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_;
165 
166     // store localTimeOffset in ram, used to make timestamp increase
167     mutable std::mutex lastLocalTimeLock_;
168     Timestamp lastLocalTime_;
169 
170     QuerySyncWaterMarkHelper querySyncWaterMarkHelper_;
171 
172     // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId)
173     // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX
174     // if query is in set return 0 while not found from db, means already sync before, don't trigger again
175     mutable std::map<DeviceID, std::set<std::string>> queryIdMap_;
176 
177     std::mutex clientIdLock_;
178     std::map<DeviceID, std::string> clientIdCache_;
179 };
180 }  // namespace DistributedDB
181 #endif
182