• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "meta_data.h"
17 
18 #include <openssl/rand.h>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "securec.h"
26 #include "sync_types.h"
27 #include "time_helper.h"
28 
29 namespace DistributedDB {
30 namespace {
31     const int STR_TO_LL_BY_DEVALUE = 10;
32     // store local timeoffset;this is a special key;
33     const std::string LOCALTIME_OFFSET_KEY = "localTimeOffset";
34     const char *CLIENT_ID_PREFIX_KEY = "clientId";
35 }
36 
Metadata()37 Metadata::Metadata()
38     : localTimeOffset_(0),
39       naturalStoragePtr_(nullptr),
40       lastLocalTime_(0)
41 {}
42 
~Metadata()43 Metadata::~Metadata()
44 {
45     naturalStoragePtr_ = nullptr;
46     metadataMap_.clear();
47 }
48 
Initialize(ISyncInterface * storage)49 int Metadata::Initialize(ISyncInterface* storage)
50 {
51     naturalStoragePtr_ = storage;
52     std::vector<uint8_t> key;
53     std::vector<uint8_t> timeOffset;
54     DBCommon::StringToVector(LOCALTIME_OFFSET_KEY, key);
55 
56     int errCode = GetMetadataFromDb(key, timeOffset);
57     if (errCode == -E_NOT_FOUND) {
58         int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
59         if (err != E_OK) {
60             LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
61             return err;
62         }
63     } else if (errCode == E_OK) {
64         localTimeOffset_ = StringToLong(timeOffset);
65     } else {
66         LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
67         return errCode;
68     }
69     {
70         std::lock_guard<std::mutex> lockGuard(metadataLock_);
71         metadataMap_.clear();
72     }
73     (void)querySyncWaterMarkHelper_.Initialize(storage);
74     return LoadAllMetadata();
75 }
76 
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)77 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
78 {
79     MetaDataValue metadata;
80     std::lock_guard<std::mutex> lockGuard(metadataLock_);
81     GetMetaDataValue(deviceId, metadata, true);
82     metadata.timeOffset = inValue;
83     metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
84     LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
85     return SaveMetaDataValue(deviceId, metadata);
86 }
87 
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)88 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
89 {
90     MetaDataValue metadata;
91     std::lock_guard<std::mutex> lockGuard(metadataLock_);
92     GetMetaDataValue(deviceId, metadata, true);
93     outValue = metadata.timeOffset;
94 }
95 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)96 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
97 {
98     MetaDataValue metadata;
99     std::lock_guard<std::mutex> lockGuard(metadataLock_);
100     GetMetaDataValue(deviceId, metadata, true);
101     outValue = metadata.localWaterMark;
102 }
103 
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)104 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
105 {
106     MetaDataValue metadata;
107     std::lock_guard<std::mutex> lockGuard(metadataLock_);
108     GetMetaDataValue(deviceId, metadata, true);
109     metadata.localWaterMark = inValue;
110     LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
111     return SaveMetaDataValue(deviceId, metadata);
112 }
113 
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue)114 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue)
115 {
116     MetaDataValue metadata;
117     std::lock_guard<std::mutex> lockGuard(metadataLock_);
118     GetMetaDataValue(deviceId, metadata, true);
119     outValue = metadata.peerWaterMark;
120 }
121 
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)122 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
123 {
124     MetaDataValue metadata;
125     std::lock_guard<std::mutex> lockGuard(metadataLock_);
126     GetMetaDataValue(deviceId, metadata, isNeedHash);
127     metadata.peerWaterMark = inValue;
128     LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
129     return SaveMetaDataValue(deviceId, metadata, isNeedHash);
130 }
131 
SaveLocalTimeOffset(TimeOffset timeOffset)132 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
133 {
134     std::string timeOffsetString = std::to_string(timeOffset);
135     std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
136     std::vector<uint8_t> localTimeOffsetValue(
137         LOCALTIME_OFFSET_KEY.begin(), LOCALTIME_OFFSET_KEY.end());
138 
139     std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
140     localTimeOffset_ = timeOffset;
141     LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
142     int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
143     if (errCode != E_OK) {
144         LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
145     }
146     return errCode;
147 }
148 
GetLocalTimeOffset() const149 TimeOffset Metadata::GetLocalTimeOffset() const
150 {
151     TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
152     return localTimeOffset;
153 }
154 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)155 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
156 {
157     return EraseDeviceWaterMark(deviceId, isNeedHash, "");
158 }
159 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)160 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
161 {
162     std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
163     // try to erase all the waterMark
164     // erase deleteSync recv waterMark
165     WaterMark waterMark = 0;
166     int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
167     if (errCodeDeleteSync != E_OK) {
168         LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
169         return errCodeDeleteSync;
170     }
171     // erase querySync recv waterMark
172     int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
173     if (errCodeQuerySync != E_OK) {
174         LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
175         return errCodeQuerySync;
176     }
177     // peerWaterMark must be erased at last
178     int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
179     if (errCode != E_OK) {
180         LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
181         return errCode;
182     }
183     return E_OK;
184 }
185 
SetLastLocalTime(Timestamp lastLocalTime)186 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
187 {
188     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
189     if (lastLocalTime > lastLocalTime_) {
190         lastLocalTime_ = lastLocalTime;
191     }
192 }
193 
GetLastLocalTime() const194 Timestamp Metadata::GetLastLocalTime() const
195 {
196     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
197     return lastLocalTime_;
198 }
199 
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)200 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
201 {
202     std::vector<uint8_t> value;
203     int errCode = SerializeMetaData(inValue, value);
204     if (errCode != E_OK) {
205         return errCode;
206     }
207 
208     DeviceID hashDeviceId;
209     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
210     std::vector<uint8_t> key;
211     DBCommon::StringToVector(hashDeviceId, key);
212     errCode = SetMetadataToDb(key, value);
213     if (errCode != E_OK) {
214         LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
215         return errCode;
216     }
217     PutMetadataToMap(hashDeviceId, inValue);
218     return E_OK;
219 }
220 
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)221 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
222 {
223     DeviceID hashDeviceId;
224     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
225     GetMetadataFromMap(hashDeviceId, outValue);
226 }
227 
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)228 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
229 {
230     outValue.resize(sizeof(MetaDataValue));
231     errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
232     if (err != EOK) {
233         return -E_SECUREC_ERROR;
234     }
235     return E_OK;
236 }
237 
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)238 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
239 {
240     if (inValue.empty()) {
241         return -E_INVALID_ARGS;
242     }
243     errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
244     if (err != EOK) {
245         return -E_SECUREC_ERROR;
246     }
247     return E_OK;
248 }
249 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const250 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
251 {
252     if (naturalStoragePtr_ == nullptr) {
253         return -E_INVALID_DB;
254     }
255     return naturalStoragePtr_->GetMetaData(key, outValue);
256 }
257 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)258 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
259 {
260     if (naturalStoragePtr_ == nullptr) {
261         return -E_INVALID_DB;
262     }
263     return naturalStoragePtr_->PutMetaData(key, inValue);
264 }
265 
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)266 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
267 {
268     metadataMap_[deviceId] = value;
269 }
270 
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)271 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
272 {
273     outValue = metadataMap_[deviceId];
274 }
275 
StringToLong(const std::vector<uint8_t> & value) const276 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
277 {
278     std::string valueString(value.begin(), value.end());
279     int64_t longData = std::strtoll(valueString.c_str(), nullptr, STR_TO_LL_BY_DEVALUE);
280     LOGD("Metadata::StringToLong longData = %" PRId64, longData);
281     return longData;
282 }
283 
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)284 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
285 {
286     if (naturalStoragePtr_ == nullptr) {
287         return -E_INVALID_DB;
288     }
289     return naturalStoragePtr_->GetAllMetaKeys(keys);
290 }
291 
292 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)293 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
294 {
295     if (inKey.size() < expectPrefix.size()) {
296         return false;
297     }
298     std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
299     if (prefixInKey != expectPrefix) {
300         return false;
301     }
302     return true;
303 }
304 }
305 
LoadAllMetadata()306 int Metadata::LoadAllMetadata()
307 {
308     std::vector<std::vector<uint8_t>> metaDataKeys;
309     int errCode = GetAllMetadataKey(metaDataKeys);
310     if (errCode != E_OK) {
311         LOGE("[Metadata] get all metadata key failed err=%d", errCode);
312         return errCode;
313     }
314 
315     std::vector<std::vector<uint8_t>> querySyncIds;
316     for (const auto &deviceId : metaDataKeys) {
317         if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
318             errCode = LoadDeviceIdDataToMap(deviceId);
319             if (errCode != E_OK) {
320                 return errCode;
321             }
322         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
323             querySyncIds.push_back(deviceId);
324         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
325             errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
326             if (errCode != E_OK) {
327                 return errCode;
328             }
329         }
330     }
331     return querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
332 }
333 
LoadDeviceIdDataToMap(const Key & key)334 int Metadata::LoadDeviceIdDataToMap(const Key &key)
335 {
336     std::vector<uint8_t> value;
337     int errCode = GetMetadataFromDb(key, value);
338     if (errCode != E_OK) {
339         return errCode;
340     }
341     MetaDataValue metaValue;
342     std::string metaKey(key.begin(), key.end());
343     errCode = DeSerializeMetaData(value, metaValue);
344     if (errCode != E_OK) {
345         return errCode;
346     }
347     std::lock_guard<std::mutex> lockGuard(metadataLock_);
348     PutMetadataToMap(metaKey, metaValue);
349     return errCode;
350 }
351 
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)352 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
353 {
354     if (!isNeedHash) {
355         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
356         return;
357     }
358     if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
359         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
360         deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
361     } else {
362         hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
363     }
364 }
365 
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)366 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
367     const std::string &deviceId, WaterMark &waterMark)
368 {
369     QueryWaterMark queryWaterMark;
370     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
371     if (errCode != E_OK) {
372         return errCode;
373     }
374     WaterMark peerWaterMark;
375     GetPeerWaterMark(deviceId, peerWaterMark);
376     waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
377     return E_OK;
378 }
379 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)380 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
381     const std::string &deviceId, const WaterMark &waterMark)
382 {
383     return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
384 }
385 
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)386 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
387     const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
388 {
389     QueryWaterMark queryWaterMark;
390     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
391     if (errCode != E_OK) {
392         return errCode;
393     }
394     if (isAutoLift) {
395         WaterMark localWaterMark;
396         GetLocalWaterMark(deviceId, localWaterMark);
397         waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
398     } else {
399         waterMark = queryWaterMark.sendWaterMark;
400     }
401     return E_OK;
402 }
403 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)404 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
405     const std::string &deviceId, const WaterMark &waterMark)
406 {
407     return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
408 }
409 
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)410 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp)
411 {
412     QueryWaterMark queryWaterMark;
413     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
414     if (errCode != E_OK) {
415         return errCode;
416     }
417     timestamp = queryWaterMark.lastQueryTime;
418     return E_OK;
419 }
420 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)421 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
422     const Timestamp &timestamp)
423 {
424     return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
425 }
426 
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)427 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
428 {
429     DeleteWaterMark deleteWaterMark;
430     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
431     if (errCode != E_OK) {
432         return errCode;
433     }
434     if (isAutoLift) {
435         WaterMark localWaterMark;
436         GetLocalWaterMark(deviceId, localWaterMark);
437         waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
438     } else {
439         waterMark = deleteWaterMark.sendWaterMark;
440     }
441     return E_OK;
442 }
443 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)444 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
445 {
446     return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
447 }
448 
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)449 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
450 {
451     DeleteWaterMark deleteWaterMark;
452     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
453     if (errCode != E_OK) {
454         return errCode;
455     }
456     WaterMark peerWaterMark;
457     GetPeerWaterMark(deviceId, peerWaterMark);
458     waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
459     return E_OK;
460 }
461 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)462 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
463 {
464     return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
465 }
466 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)467 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
468 {
469     return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
470 }
471 
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)472 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
473 {
474     MetaDataValue metadata;
475     std::lock_guard<std::mutex> lockGuard(metadataLock_);
476     DeviceID hashDeviceId;
477     GetHashDeviceId(deviceId, hashDeviceId, true);
478     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
479         metadata = metadataMap_[hashDeviceId];
480         outValue = metadata.dbCreateTime;
481         return;
482     }
483     outValue = 0;
484     LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
485 }
486 
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)487 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
488 {
489     MetaDataValue metadata;
490     std::lock_guard<std::mutex> lockGuard(metadataLock_);
491     DeviceID hashDeviceId;
492     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
493     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
494         metadata = metadataMap_[hashDeviceId];
495         if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
496             metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
497             LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
498                 STR_MASK(deviceId), inValue);
499         }
500         if (metadata.dbCreateTime == 0) {
501             LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
502         }
503     }
504     metadata.dbCreateTime = inValue;
505     return SaveMetaDataValue(deviceId, metadata);
506 }
507 
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)508 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
509 {
510     MetaDataValue metadata;
511     std::lock_guard<std::mutex> lockGuard(metadataLock_);
512     DeviceID hashDeviceId;
513     GetHashDeviceId(deviceId, hashDeviceId, true);
514     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
515         metadata = metadataMap_[hashDeviceId];
516         metadata.clearDeviceDataMark = 0; // clear mark
517         return SaveMetaDataValue(deviceId, metadata);
518     }
519     return -E_NOT_FOUND;
520 }
521 
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)522 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
523 {
524     MetaDataValue metadata;
525     std::lock_guard<std::mutex> lockGuard(metadataLock_);
526     DeviceID hashDeviceId;
527     GetHashDeviceId(deviceId, hashDeviceId, true);
528     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
529         metadata = metadataMap_[hashDeviceId];
530         outValue = metadata.clearDeviceDataMark;
531         return;
532     }
533     outValue = 0;
534 }
535 
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const536 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
537 {
538     std::vector<uint8_t> key;
539     std::vector<uint8_t> value;
540     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
541     DBCommon::StringToVector(hashqueryId, key);
542     int errCode = GetMetadataFromDb(key, value);
543     std::lock_guard<std::mutex> lockGuard(metadataLock_);
544     if (errCode == -E_NOT_FOUND) {
545         auto iter = queryIdMap_.find(deviceId);
546         if (iter != queryIdMap_.end()) {
547             if (iter->second.find(hashqueryId) == iter->second.end()) {
548                 iter->second.insert(hashqueryId);
549                 return INT64_MAX;
550             }
551             return 0;
552         } else {
553             queryIdMap_[deviceId] = { hashqueryId };
554             return INT64_MAX;
555         }
556     }
557     auto iter = queryIdMap_.find(deviceId);
558     // while value is found in db, it can be found in db later when db is not closed
559     // so no need to record the hashqueryId in map
560     if (errCode == E_OK && iter != queryIdMap_.end()) {
561         iter->second.erase(hashqueryId);
562     }
563     return StringToLong(value);
564 }
565 
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)566 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
567 {
568     std::lock_guard<std::mutex> lockGuard(metadataLock_);
569     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
570     auto iter = queryIdMap_.find(deviceId);
571     if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
572         iter->second.erase(hashqueryId);
573     }
574 }
575 
SaveClientId(const std::string & deviceId,const std::string & clientId)576 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
577 {
578     {
579         // already save in cache
580         std::lock_guard<std::mutex> autoLock(clientIdLock_);
581         if (clientIdCache_[deviceId] == clientId) {
582             return E_OK;
583         }
584     }
585     std::string keyStr;
586     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
587     std::string valueStr = DBCommon::TransferHashString(deviceId);
588     Key key;
589     DBCommon::StringToVector(keyStr, key);
590     Value value;
591     DBCommon::StringToVector(valueStr, value);
592     int errCode = SetMetadataToDb(key, value);
593     if (errCode != E_OK) {
594         return errCode;
595     }
596     std::lock_guard<std::mutex> autoLock(clientIdLock_);
597     clientIdCache_[deviceId] = clientId;
598     return E_OK;
599 }
600 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const601 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
602 {
603     // don't use cache here avoid invalid cache
604     std::string keyStr;
605     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
606     Key key;
607     DBCommon::StringToVector(keyStr, key);
608     Value value;
609     int errCode = GetMetadataFromDb(key, value);
610     if (errCode == -E_NOT_FOUND) {
611         LOGD("[Metadata] not found clientId");
612         return -E_NOT_SUPPORT;
613     }
614     if (errCode != E_OK) {
615         LOGE("[Metadata] reload clientId failed %d", errCode);
616         return errCode;
617     }
618     DBCommon::VectorToString(value, hashDevId);
619     return E_OK;
620 }
621 
LockWaterMark() const622 void Metadata::LockWaterMark() const
623 {
624     waterMarkMutex_.lock();
625 }
626 
UnlockWaterMark() const627 void Metadata::UnlockWaterMark() const
628 {
629     waterMarkMutex_.unlock();
630 }
631 
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)632 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
633     : metadataPtr_(std::move(metadata))
634 {
635     if (metadataPtr_ != nullptr) {
636         metadataPtr_->LockWaterMark();
637     }
638 }
639 
~MetaWaterMarkAutoLock()640 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
641 {
642     if (metadataPtr_ != nullptr) {
643         metadataPtr_->UnlockWaterMark();
644     }
645 }
646 }  // namespace DistributedDB