1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "meta_data.h"
17
18 #include <openssl/rand.h>
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_errno.h"
22 #include "hash.h"
23 #include "log_print.h"
24 #include "securec.h"
25 #include "sync_types.h"
26 #include "time_helper.h"
27
28 namespace DistributedDB {
29 namespace {
30 const int STR_TO_LL_BY_DEVALUE = 10;
31 // store local timeoffset;this is a special key;
32 const std::string LOCALTIME_OFFSET_KEY = "localTimeOffset";
33 const std::string DEVICEID_PREFIX_KEY = "deviceId";
34 }
35
Metadata()36 Metadata::Metadata()
37 : localTimeOffset_(0),
38 naturalStoragePtr_(nullptr),
39 lastLocalTime_(0)
40 {}
41
~Metadata()42 Metadata::~Metadata()
43 {
44 naturalStoragePtr_ = nullptr;
45 metadataMap_.clear();
46 }
47
Initialize(ISyncInterface * storage)48 int Metadata::Initialize(ISyncInterface* storage)
49 {
50 naturalStoragePtr_ = storage;
51 std::vector<uint8_t> key;
52 std::vector<uint8_t> timeOffset;
53 DBCommon::StringToVector(LOCALTIME_OFFSET_KEY, key);
54
55 int errCode = GetMetadataFromDb(key, timeOffset);
56 if (errCode == -E_NOT_FOUND) {
57 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
58 if (err != E_OK) {
59 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
60 return err;
61 }
62 } else if (errCode == E_OK) {
63 localTimeOffset_ = StringToLong(timeOffset);
64 } else {
65 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
66 return errCode;
67 }
68 {
69 std::lock_guard<std::mutex> lockGuard(metadataLock_);
70 metadataMap_.clear();
71 }
72 (void)querySyncWaterMarkHelper_.Initialize(storage);
73 return LoadAllMetadata();
74 }
75
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)76 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
77 {
78 MetaDataValue metadata;
79 std::lock_guard<std::mutex> lockGuard(metadataLock_);
80 GetMetaDataValue(deviceId, metadata, true);
81 metadata.timeOffset = inValue;
82 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
83 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
84 return SaveMetaDataValue(deviceId, metadata);
85 }
86
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)87 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
88 {
89 MetaDataValue metadata;
90 std::lock_guard<std::mutex> lockGuard(metadataLock_);
91 GetMetaDataValue(deviceId, metadata, true);
92 outValue = metadata.timeOffset;
93 }
94
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)95 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
96 {
97 MetaDataValue metadata;
98 std::lock_guard<std::mutex> lockGuard(metadataLock_);
99 GetMetaDataValue(deviceId, metadata, true);
100 outValue = metadata.localWaterMark;
101 }
102
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)103 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
104 {
105 MetaDataValue metadata;
106 std::lock_guard<std::mutex> lockGuard(metadataLock_);
107 GetMetaDataValue(deviceId, metadata, true);
108 metadata.localWaterMark = inValue;
109 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
110 return SaveMetaDataValue(deviceId, metadata);
111 }
112
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue)113 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue)
114 {
115 MetaDataValue metadata;
116 std::lock_guard<std::mutex> lockGuard(metadataLock_);
117 GetMetaDataValue(deviceId, metadata, true);
118 outValue = metadata.peerWaterMark;
119 }
120
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)121 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
122 {
123 MetaDataValue metadata;
124 std::lock_guard<std::mutex> lockGuard(metadataLock_);
125 GetMetaDataValue(deviceId, metadata, isNeedHash);
126 metadata.peerWaterMark = inValue;
127 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
128 return SaveMetaDataValue(deviceId, metadata);
129 }
130
SaveLocalTimeOffset(TimeOffset timeOffset)131 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
132 {
133 std::string timeOffsetString = std::to_string(timeOffset);
134 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
135 std::vector<uint8_t> localTimeOffsetValue(
136 LOCALTIME_OFFSET_KEY.begin(), LOCALTIME_OFFSET_KEY.end());
137
138 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
139 localTimeOffset_ = timeOffset;
140 LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
141 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
142 if (errCode != E_OK) {
143 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
144 }
145 return errCode;
146 }
147
GetLocalTimeOffset() const148 TimeOffset Metadata::GetLocalTimeOffset() const
149 {
150 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
151 return localTimeOffset;
152 }
153
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)154 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
155 {
156 return EraseDeviceWaterMark(deviceId, isNeedHash, "");
157 }
158
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)159 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
160 {
161 // try to erase all the waterMark
162 // erase deleteSync recv waterMark
163 WaterMark waterMark = 0;
164 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark);
165 // erase querySync recv waterMark
166 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName);
167 // peerWaterMark must be erased at last
168 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
169 if (errCode != E_OK) {
170 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
171 return errCode;
172 }
173 if (errCodeQuerySync != E_OK) {
174 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
175 return errCodeQuerySync;
176 }
177 if (errCodeDeleteSync != E_OK) {
178 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
179 return errCodeDeleteSync;
180 }
181 return E_OK;
182 }
183
SetLastLocalTime(Timestamp lastLocalTime)184 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
185 {
186 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
187 if (lastLocalTime > lastLocalTime_) {
188 lastLocalTime_ = lastLocalTime;
189 }
190 }
191
GetLastLocalTime() const192 Timestamp Metadata::GetLastLocalTime() const
193 {
194 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
195 return lastLocalTime_;
196 }
197
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue)198 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue)
199 {
200 std::vector<uint8_t> value;
201 int errCode = SerializeMetaData(inValue, value);
202 if (errCode != E_OK) {
203 return errCode;
204 }
205
206 DeviceID hashDeviceId;
207 GetHashDeviceId(deviceId, hashDeviceId, true);
208 std::vector<uint8_t> key;
209 DBCommon::StringToVector(hashDeviceId, key);
210 errCode = SetMetadataToDb(key, value);
211 if (errCode != E_OK) {
212 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
213 return errCode;
214 }
215 PutMetadataToMap(hashDeviceId, inValue);
216 return E_OK;
217 }
218
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)219 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
220 {
221 DeviceID hashDeviceId;
222 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
223 GetMetadataFromMap(hashDeviceId, outValue);
224 }
225
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)226 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
227 {
228 outValue.resize(sizeof(MetaDataValue));
229 errno_t err = memcpy_s(&outValue[0], outValue.size(), &inValue, sizeof(MetaDataValue));
230 if (err != EOK) {
231 return -E_SECUREC_ERROR;
232 }
233 return E_OK;
234 }
235
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue) const236 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue) const
237 {
238 if (inValue.empty()) {
239 return -E_INVALID_ARGS;
240 }
241
242 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), &inValue[0], inValue.size());
243 if (err != EOK) {
244 return -E_SECUREC_ERROR;
245 }
246 return E_OK;
247 }
248
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const249 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
250 {
251 if (naturalStoragePtr_ == nullptr) {
252 return -E_INVALID_DB;
253 }
254 return naturalStoragePtr_->GetMetaData(key, outValue);
255 }
256
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)257 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
258 {
259 if (naturalStoragePtr_ == nullptr) {
260 return -E_INVALID_DB;
261 }
262 return naturalStoragePtr_->PutMetaData(key, inValue);
263 }
264
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)265 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
266 {
267 metadataMap_[deviceId] = value;
268 }
269
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)270 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
271 {
272 outValue = metadataMap_[deviceId];
273 }
274
StringToLong(const std::vector<uint8_t> & value) const275 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
276 {
277 std::string valueString(value.begin(), value.end());
278 int64_t longData = std::strtoll(valueString.c_str(), nullptr, STR_TO_LL_BY_DEVALUE);
279 LOGD("Metadata::StringToLong longData = %" PRId64, longData);
280 return longData;
281 }
282
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)283 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
284 {
285 if (naturalStoragePtr_ == nullptr) {
286 return -E_INVALID_DB;
287 }
288 return naturalStoragePtr_->GetAllMetaKeys(keys);
289 }
290
291 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)292 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
293 {
294 if (inKey.size() < expectPrefix.size()) {
295 return false;
296 }
297 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
298 if (prefixInKey != expectPrefix) {
299 return false;
300 }
301 return true;
302 }
303 }
304
LoadAllMetadata()305 int Metadata::LoadAllMetadata()
306 {
307 std::vector<std::vector<uint8_t>> metaDataKeys;
308 int errCode = GetAllMetadataKey(metaDataKeys);
309 if (errCode != E_OK) {
310 LOGE("[Metadata] get all metadata key failed err=%d", errCode);
311 return errCode;
312 }
313
314 std::vector<std::vector<uint8_t>> querySyncIds;
315 for (const auto &deviceId : metaDataKeys) {
316 if (IsMetaDataKey(deviceId, DEVICEID_PREFIX_KEY)) {
317 errCode = LoadDeviceIdDataToMap(deviceId);
318 if (errCode != E_OK) {
319 return errCode;
320 }
321 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
322 querySyncIds.push_back(deviceId);
323 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
324 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
325 if (errCode != E_OK) {
326 return errCode;
327 }
328 }
329 }
330 return querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
331 }
332
LoadDeviceIdDataToMap(const Key & key)333 int Metadata::LoadDeviceIdDataToMap(const Key &key)
334 {
335 std::vector<uint8_t> value;
336 int errCode = GetMetadataFromDb(key, value);
337 if (errCode != E_OK) {
338 return errCode;
339 }
340 MetaDataValue metaValue;
341 std::string metaKey(key.begin(), key.end());
342 errCode = DeSerializeMetaData(value, metaValue);
343 if (errCode != E_OK) {
344 return errCode;
345 }
346 std::lock_guard<std::mutex> lockGuard(metadataLock_);
347 PutMetadataToMap(metaKey, metaValue);
348 return errCode;
349 }
350
GetRandTimeOffset() const351 uint64_t Metadata::GetRandTimeOffset() const
352 {
353 const int randOffsetLength = 2; // 2 byte
354 uint8_t randBytes[randOffsetLength] = { 0 };
355 RAND_bytes(randBytes, randOffsetLength);
356
357 // use a 16 bit rand data to make a rand timeoffset
358 uint64_t randTimeOffset = (static_cast<uint16_t>(randBytes[1]) << 8) | randBytes[0]; // 16 bit data, 8 is offset
359 randTimeOffset = randTimeOffset * 1000 * 1000 * 10; // second, 1000 is scale
360 LOGD("[Metadata] GetRandTimeOffset %" PRIu64, randTimeOffset);
361 return randTimeOffset;
362 }
363
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)364 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
365 {
366 if (!isNeedHash) {
367 hashDeviceId = deviceId;
368 return;
369 }
370 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
371 hashDeviceId = DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
372 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
373 } else {
374 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
375 }
376 }
377
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)378 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
379 const std::string &deviceId, WaterMark &waterMark)
380 {
381 QueryWaterMark queryWaterMark;
382 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
383 if (errCode != E_OK) {
384 return errCode;
385 }
386 WaterMark peerWaterMark;
387 GetPeerWaterMark(deviceId, peerWaterMark);
388 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
389 return E_OK;
390 }
391
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)392 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
393 const std::string &deviceId, const WaterMark &waterMark)
394 {
395 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
396 }
397
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)398 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
399 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
400 {
401 QueryWaterMark queryWaterMark;
402 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
403 if (errCode != E_OK) {
404 return errCode;
405 }
406 if (isAutoLift) {
407 WaterMark localWaterMark;
408 GetLocalWaterMark(deviceId, localWaterMark);
409 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
410 } else {
411 waterMark = queryWaterMark.sendWaterMark;
412 }
413 return E_OK;
414 }
415
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)416 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
417 const std::string &deviceId, const WaterMark &waterMark)
418 {
419 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
420 }
421
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)422 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp)
423 {
424 QueryWaterMark queryWaterMark;
425 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
426 if (errCode != E_OK) {
427 return errCode;
428 }
429 timestamp = queryWaterMark.lastQueryTime;
430 return E_OK;
431 }
432
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)433 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
434 const Timestamp ×tamp)
435 {
436 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
437 }
438
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)439 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
440 {
441 DeleteWaterMark deleteWaterMark;
442 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
443 if (errCode != E_OK) {
444 return errCode;
445 }
446 if (isAutoLift) {
447 WaterMark localWaterMark;
448 GetLocalWaterMark(deviceId, localWaterMark);
449 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
450 } else {
451 waterMark = deleteWaterMark.sendWaterMark;
452 }
453 return E_OK;
454 }
455
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)456 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
457 {
458 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
459 }
460
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)461 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
462 {
463 DeleteWaterMark deleteWaterMark;
464 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
465 if (errCode != E_OK) {
466 return errCode;
467 }
468 WaterMark peerWaterMark;
469 GetPeerWaterMark(deviceId, peerWaterMark);
470 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
471 return E_OK;
472 }
473
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)474 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
475 {
476 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark);
477 }
478
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName)479 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName)
480 {
481 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName);
482 }
483
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)484 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
485 {
486 MetaDataValue metadata;
487 std::lock_guard<std::mutex> lockGuard(metadataLock_);
488 DeviceID hashDeviceId;
489 GetHashDeviceId(deviceId, hashDeviceId, true);
490 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
491 metadata = metadataMap_[hashDeviceId];
492 outValue = metadata.dbCreateTime;
493 return;
494 }
495 outValue = 0;
496 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
497 }
498
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)499 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
500 {
501 MetaDataValue metadata;
502 std::lock_guard<std::mutex> lockGuard(metadataLock_);
503 DeviceID hashDeviceId;
504 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
505 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
506 metadata = metadataMap_[hashDeviceId];
507 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
508 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
509 LOGI("Metadata::SetDbCreateTime,set cleardata mark,dev=%s,dbCreateTime=%" PRIu64,
510 STR_MASK(deviceId), inValue);
511 }
512 if (metadata.dbCreateTime == 0) {
513 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
514 }
515 }
516 metadata.dbCreateTime = inValue;
517 return SaveMetaDataValue(deviceId, metadata);
518 }
519
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)520 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
521 {
522 MetaDataValue metadata;
523 std::lock_guard<std::mutex> lockGuard(metadataLock_);
524 DeviceID hashDeviceId;
525 GetHashDeviceId(deviceId, hashDeviceId, true);
526 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
527 metadata = metadataMap_[hashDeviceId];
528 metadata.clearDeviceDataMark = 0;
529 return SaveMetaDataValue(deviceId, metadata);
530 }
531 return -E_NOT_FOUND;
532 }
533
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)534 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
535 {
536 MetaDataValue metadata;
537 std::lock_guard<std::mutex> lockGuard(metadataLock_);
538 DeviceID hashDeviceId;
539 GetHashDeviceId(deviceId, hashDeviceId, true);
540 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
541 metadata = metadataMap_[hashDeviceId];
542 outValue = metadata.clearDeviceDataMark;
543 return;
544 }
545 outValue = 0;
546 }
547
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const548 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
549 {
550 std::vector<uint8_t> key;
551 std::vector<uint8_t> value;
552 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
553 DBCommon::StringToVector(hashqueryId, key);
554 int errCode = GetMetadataFromDb(key, value);
555 std::lock_guard<std::mutex> lockGuard(metadataLock_);
556 if (errCode == -E_NOT_FOUND) {
557 auto iter = queryIdMap_.find(deviceId);
558 if (iter != queryIdMap_.end()) {
559 if (iter->second.find(hashqueryId) == iter->second.end()) {
560 iter->second.insert(hashqueryId);
561 return INT64_MAX;
562 }
563 return 0;
564 } else {
565 queryIdMap_[deviceId] = { hashqueryId };
566 return INT64_MAX;
567 }
568 }
569 auto iter = queryIdMap_.find(deviceId);
570 // while value is found in db, it can be found in db later when db is not closed
571 // so no need to record the hashqueryId in map
572 if (errCode == E_OK && iter != queryIdMap_.end()) {
573 iter->second.erase(hashqueryId);
574 }
575 return StringToLong(value);
576 }
577
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)578 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
579 {
580 std::lock_guard<std::mutex> lockGuard(metadataLock_);
581 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
582 auto iter = queryIdMap_.find(deviceId);
583 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
584 iter->second.erase(hashqueryId);
585 }
586 }
587 } // namespace DistributedDB