• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "meta_data.h"
17 
18 #include <openssl/rand.h>
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_errno.h"
22 #include "hash.h"
23 #include "log_print.h"
24 #include "securec.h"
25 #include "sync_types.h"
26 #include "time_helper.h"
27 
28 namespace DistributedDB {
29 namespace {
30     const int STR_TO_LL_BY_DEVALUE = 10;
31     // store local timeoffset;this is a special key;
32     const std::string LOCALTIME_OFFSET_KEY = "localTimeOffset";
33     const std::string DEVICEID_PREFIX_KEY = "deviceId";
34 }
35 
Metadata()36 Metadata::Metadata()
37     : localTimeOffset_(0),
38       naturalStoragePtr_(nullptr),
39       lastLocalTime_(0)
40 {}
41 
~Metadata()42 Metadata::~Metadata()
43 {
44     naturalStoragePtr_ = nullptr;
45     metadataMap_.clear();
46 }
47 
Initialize(ISyncInterface * storage)48 int Metadata::Initialize(ISyncInterface* storage)
49 {
50     naturalStoragePtr_ = storage;
51     std::vector<uint8_t> key;
52     std::vector<uint8_t> timeOffset;
53     DBCommon::StringToVector(LOCALTIME_OFFSET_KEY, key);
54 
55     int errCode = GetMetadataFromDb(key, timeOffset);
56     if (errCode == -E_NOT_FOUND) {
57         int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
58         if (err != E_OK) {
59             LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
60             return err;
61         }
62     } else if (errCode == E_OK) {
63         localTimeOffset_ = StringToLong(timeOffset);
64     } else {
65         LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
66         return errCode;
67     }
68     {
69         std::lock_guard<std::mutex> lockGuard(metadataLock_);
70         metadataMap_.clear();
71     }
72     (void)querySyncWaterMarkHelper_.Initialize(storage);
73     return LoadAllMetadata();
74 }
75 
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)76 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
77 {
78     MetaDataValue metadata;
79     std::lock_guard<std::mutex> lockGuard(metadataLock_);
80     GetMetaDataValue(deviceId, metadata, true);
81     metadata.timeOffset = inValue;
82     metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
83     LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
84     return SaveMetaDataValue(deviceId, metadata);
85 }
86 
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)87 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
88 {
89     MetaDataValue metadata;
90     std::lock_guard<std::mutex> lockGuard(metadataLock_);
91     GetMetaDataValue(deviceId, metadata, true);
92     outValue = metadata.timeOffset;
93 }
94 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)95 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
96 {
97     MetaDataValue metadata;
98     std::lock_guard<std::mutex> lockGuard(metadataLock_);
99     GetMetaDataValue(deviceId, metadata, true);
100     outValue = metadata.localWaterMark;
101 }
102 
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)103 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
104 {
105     MetaDataValue metadata;
106     std::lock_guard<std::mutex> lockGuard(metadataLock_);
107     GetMetaDataValue(deviceId, metadata, true);
108     metadata.localWaterMark = inValue;
109     LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
110     return SaveMetaDataValue(deviceId, metadata);
111 }
112 
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue)113 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue)
114 {
115     MetaDataValue metadata;
116     std::lock_guard<std::mutex> lockGuard(metadataLock_);
117     GetMetaDataValue(deviceId, metadata, true);
118     outValue = metadata.peerWaterMark;
119 }
120 
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)121 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
122 {
123     MetaDataValue metadata;
124     std::lock_guard<std::mutex> lockGuard(metadataLock_);
125     GetMetaDataValue(deviceId, metadata, isNeedHash);
126     metadata.peerWaterMark = inValue;
127     LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
128     return SaveMetaDataValue(deviceId, metadata);
129 }
130 
SaveLocalTimeOffset(TimeOffset timeOffset)131 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
132 {
133     std::string timeOffsetString = std::to_string(timeOffset);
134     std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
135     std::vector<uint8_t> localTimeOffsetValue(
136         LOCALTIME_OFFSET_KEY.begin(), LOCALTIME_OFFSET_KEY.end());
137 
138     std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
139     localTimeOffset_ = timeOffset;
140     LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
141     int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
142     if (errCode != E_OK) {
143         LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
144     }
145     return errCode;
146 }
147 
GetLocalTimeOffset() const148 TimeOffset Metadata::GetLocalTimeOffset() const
149 {
150     TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
151     return localTimeOffset;
152 }
153 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)154 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
155 {
156     return EraseDeviceWaterMark(deviceId, isNeedHash, "");
157 }
158 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)159 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
160 {
161     // try to erase all the waterMark
162     // erase deleteSync recv waterMark
163     WaterMark waterMark = 0;
164     int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark);
165     // erase querySync recv waterMark
166     int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName);
167     // peerWaterMark must be erased at last
168     int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
169     if (errCode != E_OK) {
170         LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
171         return errCode;
172     }
173     if (errCodeQuerySync != E_OK) {
174         LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
175         return errCodeQuerySync;
176     }
177     if (errCodeDeleteSync != E_OK) {
178         LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
179         return errCodeDeleteSync;
180     }
181     return E_OK;
182 }
183 
SetLastLocalTime(Timestamp lastLocalTime)184 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
185 {
186     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
187     if (lastLocalTime > lastLocalTime_) {
188         lastLocalTime_ = lastLocalTime;
189     }
190 }
191 
GetLastLocalTime() const192 Timestamp Metadata::GetLastLocalTime() const
193 {
194     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
195     return lastLocalTime_;
196 }
197 
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue)198 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue)
199 {
200     std::vector<uint8_t> value;
201     int errCode = SerializeMetaData(inValue, value);
202     if (errCode != E_OK) {
203         return errCode;
204     }
205 
206     DeviceID hashDeviceId;
207     GetHashDeviceId(deviceId, hashDeviceId, true);
208     std::vector<uint8_t> key;
209     DBCommon::StringToVector(hashDeviceId, key);
210     errCode = SetMetadataToDb(key, value);
211     if (errCode != E_OK) {
212         LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
213         return errCode;
214     }
215     PutMetadataToMap(hashDeviceId, inValue);
216     return E_OK;
217 }
218 
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)219 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
220 {
221     DeviceID hashDeviceId;
222     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
223     GetMetadataFromMap(hashDeviceId, outValue);
224 }
225 
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)226 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
227 {
228     outValue.resize(sizeof(MetaDataValue));
229     errno_t err = memcpy_s(&outValue[0], outValue.size(), &inValue, sizeof(MetaDataValue));
230     if (err != EOK) {
231         return -E_SECUREC_ERROR;
232     }
233     return E_OK;
234 }
235 
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue) const236 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue) const
237 {
238     if (inValue.empty()) {
239         return -E_INVALID_ARGS;
240     }
241 
242     errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), &inValue[0], inValue.size());
243     if (err != EOK) {
244         return -E_SECUREC_ERROR;
245     }
246     return E_OK;
247 }
248 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const249 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
250 {
251     if (naturalStoragePtr_ == nullptr) {
252         return -E_INVALID_DB;
253     }
254     return naturalStoragePtr_->GetMetaData(key, outValue);
255 }
256 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)257 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
258 {
259     if (naturalStoragePtr_ == nullptr) {
260         return -E_INVALID_DB;
261     }
262     return naturalStoragePtr_->PutMetaData(key, inValue);
263 }
264 
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)265 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
266 {
267     metadataMap_[deviceId] = value;
268 }
269 
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)270 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
271 {
272     outValue = metadataMap_[deviceId];
273 }
274 
StringToLong(const std::vector<uint8_t> & value) const275 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
276 {
277     std::string valueString(value.begin(), value.end());
278     int64_t longData = std::strtoll(valueString.c_str(), nullptr, STR_TO_LL_BY_DEVALUE);
279     LOGD("Metadata::StringToLong longData = %" PRId64, longData);
280     return longData;
281 }
282 
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)283 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
284 {
285     if (naturalStoragePtr_ == nullptr) {
286         return -E_INVALID_DB;
287     }
288     return naturalStoragePtr_->GetAllMetaKeys(keys);
289 }
290 
291 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)292 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
293 {
294     if (inKey.size() < expectPrefix.size()) {
295         return false;
296     }
297     std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
298     if (prefixInKey != expectPrefix) {
299         return false;
300     }
301     return true;
302 }
303 }
304 
LoadAllMetadata()305 int Metadata::LoadAllMetadata()
306 {
307     std::vector<std::vector<uint8_t>> metaDataKeys;
308     int errCode = GetAllMetadataKey(metaDataKeys);
309     if (errCode != E_OK) {
310         LOGE("[Metadata] get all metadata key failed err=%d", errCode);
311         return errCode;
312     }
313 
314     std::vector<std::vector<uint8_t>> querySyncIds;
315     for (const auto &deviceId : metaDataKeys) {
316         if (IsMetaDataKey(deviceId, DEVICEID_PREFIX_KEY)) {
317             errCode = LoadDeviceIdDataToMap(deviceId);
318             if (errCode != E_OK) {
319                 return errCode;
320             }
321         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
322             querySyncIds.push_back(deviceId);
323         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
324             errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
325             if (errCode != E_OK) {
326                 return errCode;
327             }
328         }
329     }
330     return querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
331 }
332 
LoadDeviceIdDataToMap(const Key & key)333 int Metadata::LoadDeviceIdDataToMap(const Key &key)
334 {
335     std::vector<uint8_t> value;
336     int errCode = GetMetadataFromDb(key, value);
337     if (errCode != E_OK) {
338         return errCode;
339     }
340     MetaDataValue metaValue;
341     std::string metaKey(key.begin(), key.end());
342     errCode = DeSerializeMetaData(value, metaValue);
343     if (errCode != E_OK) {
344         return errCode;
345     }
346     std::lock_guard<std::mutex> lockGuard(metadataLock_);
347     PutMetadataToMap(metaKey, metaValue);
348     return errCode;
349 }
350 
GetRandTimeOffset() const351 uint64_t Metadata::GetRandTimeOffset() const
352 {
353     const int randOffsetLength = 2; // 2 byte
354     uint8_t randBytes[randOffsetLength] = { 0 };
355     RAND_bytes(randBytes, randOffsetLength);
356 
357     // use a 16 bit rand data to make a rand timeoffset
358     uint64_t randTimeOffset = (static_cast<uint16_t>(randBytes[1]) << 8) | randBytes[0]; // 16 bit data, 8 is offset
359     randTimeOffset = randTimeOffset * 1000 * 1000 * 10; // second, 1000 is scale
360     LOGD("[Metadata] GetRandTimeOffset %" PRIu64, randTimeOffset);
361     return randTimeOffset;
362 }
363 
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)364 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
365 {
366     if (!isNeedHash) {
367         hashDeviceId = deviceId;
368         return;
369     }
370     if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
371         hashDeviceId = DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
372         deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
373     } else {
374         hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
375     }
376 }
377 
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)378 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
379     const std::string &deviceId, WaterMark &waterMark)
380 {
381     QueryWaterMark queryWaterMark;
382     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
383     if (errCode != E_OK) {
384         return errCode;
385     }
386     WaterMark peerWaterMark;
387     GetPeerWaterMark(deviceId, peerWaterMark);
388     waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
389     return E_OK;
390 }
391 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)392 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
393     const std::string &deviceId, const WaterMark &waterMark)
394 {
395     return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
396 }
397 
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)398 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
399     const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
400 {
401     QueryWaterMark queryWaterMark;
402     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
403     if (errCode != E_OK) {
404         return errCode;
405     }
406     if (isAutoLift) {
407         WaterMark localWaterMark;
408         GetLocalWaterMark(deviceId, localWaterMark);
409         waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
410     } else {
411         waterMark = queryWaterMark.sendWaterMark;
412     }
413     return E_OK;
414 }
415 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)416 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
417     const std::string &deviceId, const WaterMark &waterMark)
418 {
419     return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
420 }
421 
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)422 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp)
423 {
424     QueryWaterMark queryWaterMark;
425     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
426     if (errCode != E_OK) {
427         return errCode;
428     }
429     timestamp = queryWaterMark.lastQueryTime;
430     return E_OK;
431 }
432 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)433 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
434     const Timestamp &timestamp)
435 {
436     return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
437 }
438 
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)439 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
440 {
441     DeleteWaterMark deleteWaterMark;
442     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
443     if (errCode != E_OK) {
444         return errCode;
445     }
446     if (isAutoLift) {
447         WaterMark localWaterMark;
448         GetLocalWaterMark(deviceId, localWaterMark);
449         waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
450     } else {
451         waterMark = deleteWaterMark.sendWaterMark;
452     }
453     return E_OK;
454 }
455 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)456 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
457 {
458     return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
459 }
460 
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)461 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
462 {
463     DeleteWaterMark deleteWaterMark;
464     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
465     if (errCode != E_OK) {
466         return errCode;
467     }
468     WaterMark peerWaterMark;
469     GetPeerWaterMark(deviceId, peerWaterMark);
470     waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
471     return E_OK;
472 }
473 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)474 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
475 {
476     return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark);
477 }
478 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName)479 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName)
480 {
481     return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName);
482 }
483 
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)484 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
485 {
486     MetaDataValue metadata;
487     std::lock_guard<std::mutex> lockGuard(metadataLock_);
488     DeviceID hashDeviceId;
489     GetHashDeviceId(deviceId, hashDeviceId, true);
490     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
491         metadata = metadataMap_[hashDeviceId];
492         outValue = metadata.dbCreateTime;
493         return;
494     }
495     outValue = 0;
496     LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
497 }
498 
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)499 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
500 {
501     MetaDataValue metadata;
502     std::lock_guard<std::mutex> lockGuard(metadataLock_);
503     DeviceID hashDeviceId;
504     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
505     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
506         metadata = metadataMap_[hashDeviceId];
507         if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
508             metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
509             LOGI("Metadata::SetDbCreateTime,set cleardata mark,dev=%s,dbCreateTime=%" PRIu64,
510                 STR_MASK(deviceId), inValue);
511         }
512         if (metadata.dbCreateTime == 0) {
513             LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
514         }
515     }
516     metadata.dbCreateTime = inValue;
517     return SaveMetaDataValue(deviceId, metadata);
518 }
519 
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)520 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
521 {
522     MetaDataValue metadata;
523     std::lock_guard<std::mutex> lockGuard(metadataLock_);
524     DeviceID hashDeviceId;
525     GetHashDeviceId(deviceId, hashDeviceId, true);
526     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
527         metadata = metadataMap_[hashDeviceId];
528         metadata.clearDeviceDataMark = 0;
529         return SaveMetaDataValue(deviceId, metadata);
530     }
531     return -E_NOT_FOUND;
532 }
533 
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)534 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
535 {
536     MetaDataValue metadata;
537     std::lock_guard<std::mutex> lockGuard(metadataLock_);
538     DeviceID hashDeviceId;
539     GetHashDeviceId(deviceId, hashDeviceId, true);
540     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
541         metadata = metadataMap_[hashDeviceId];
542         outValue = metadata.clearDeviceDataMark;
543         return;
544     }
545     outValue = 0;
546 }
547 
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const548 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
549 {
550     std::vector<uint8_t> key;
551     std::vector<uint8_t> value;
552     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
553     DBCommon::StringToVector(hashqueryId, key);
554     int errCode = GetMetadataFromDb(key, value);
555     std::lock_guard<std::mutex> lockGuard(metadataLock_);
556     if (errCode == -E_NOT_FOUND) {
557         auto iter = queryIdMap_.find(deviceId);
558         if (iter != queryIdMap_.end()) {
559             if (iter->second.find(hashqueryId) == iter->second.end()) {
560                 iter->second.insert(hashqueryId);
561                 return INT64_MAX;
562             }
563             return 0;
564         } else {
565             queryIdMap_[deviceId] = { hashqueryId };
566             return INT64_MAX;
567         }
568     }
569     auto iter = queryIdMap_.find(deviceId);
570     // while value is found in db, it can be found in db later when db is not closed
571     // so no need to record the hashqueryId in map
572     if (errCode == E_OK && iter != queryIdMap_.end()) {
573         iter->second.erase(hashqueryId);
574     }
575     return StringToLong(value);
576 }
577 
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)578 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
579 {
580     std::lock_guard<std::mutex> lockGuard(metadataLock_);
581     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
582     auto iter = queryIdMap_.find(deviceId);
583     if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
584         iter->second.erase(hashqueryId);
585     }
586 }
587 }  // namespace DistributedDB