1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "meta_data.h"
17
18 #include <openssl/rand.h>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "securec.h"
26 #include "sync_types.h"
27 #include "time_helper.h"
28
29 namespace DistributedDB {
namespace {
// Radix passed to std::strtoll when a persisted decimal string is parsed.
constexpr int STR_TO_LL_BY_DEVALUE = 10;
// Special metadata key under which the local time offset is stored.
const std::string LOCALTIME_OFFSET_KEY = "localTimeOffset";
// Prefix of the metadata keys built from a client id (see SaveClientId).
// The pointer itself is const so the constant cannot be re-seated.
const char * const CLIENT_ID_PREFIX_KEY = "clientId";
}
36
Metadata()37 Metadata::Metadata()
38 : localTimeOffset_(0),
39 naturalStoragePtr_(nullptr),
40 lastLocalTime_(0)
41 {}
42
~Metadata()43 Metadata::~Metadata()
44 {
45 naturalStoragePtr_ = nullptr;
46 metadataMap_.clear();
47 }
48
Initialize(ISyncInterface * storage)49 int Metadata::Initialize(ISyncInterface* storage)
50 {
51 naturalStoragePtr_ = storage;
52 std::vector<uint8_t> key;
53 std::vector<uint8_t> timeOffset;
54 DBCommon::StringToVector(LOCALTIME_OFFSET_KEY, key);
55
56 int errCode = GetMetadataFromDb(key, timeOffset);
57 if (errCode == -E_NOT_FOUND) {
58 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
59 if (err != E_OK) {
60 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
61 return err;
62 }
63 } else if (errCode == E_OK) {
64 localTimeOffset_ = StringToLong(timeOffset);
65 } else {
66 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
67 return errCode;
68 }
69 {
70 std::lock_guard<std::mutex> lockGuard(metadataLock_);
71 metadataMap_.clear();
72 }
73 (void)querySyncWaterMarkHelper_.Initialize(storage);
74 return LoadAllMetadata();
75 }
76
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)77 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
78 {
79 MetaDataValue metadata;
80 std::lock_guard<std::mutex> lockGuard(metadataLock_);
81 GetMetaDataValue(deviceId, metadata, true);
82 metadata.timeOffset = inValue;
83 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
84 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
85 return SaveMetaDataValue(deviceId, metadata);
86 }
87
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)88 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
89 {
90 MetaDataValue metadata;
91 std::lock_guard<std::mutex> lockGuard(metadataLock_);
92 GetMetaDataValue(deviceId, metadata, true);
93 outValue = metadata.timeOffset;
94 }
95
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)96 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
97 {
98 MetaDataValue metadata;
99 std::lock_guard<std::mutex> lockGuard(metadataLock_);
100 GetMetaDataValue(deviceId, metadata, true);
101 outValue = metadata.localWaterMark;
102 }
103
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)104 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
105 {
106 MetaDataValue metadata;
107 std::lock_guard<std::mutex> lockGuard(metadataLock_);
108 GetMetaDataValue(deviceId, metadata, true);
109 metadata.localWaterMark = inValue;
110 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
111 return SaveMetaDataValue(deviceId, metadata);
112 }
113
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue)114 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue)
115 {
116 MetaDataValue metadata;
117 std::lock_guard<std::mutex> lockGuard(metadataLock_);
118 GetMetaDataValue(deviceId, metadata, true);
119 outValue = metadata.peerWaterMark;
120 }
121
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)122 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
123 {
124 MetaDataValue metadata;
125 std::lock_guard<std::mutex> lockGuard(metadataLock_);
126 GetMetaDataValue(deviceId, metadata, isNeedHash);
127 metadata.peerWaterMark = inValue;
128 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
129 return SaveMetaDataValue(deviceId, metadata, isNeedHash);
130 }
131
SaveLocalTimeOffset(TimeOffset timeOffset)132 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
133 {
134 std::string timeOffsetString = std::to_string(timeOffset);
135 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
136 std::vector<uint8_t> localTimeOffsetValue(
137 LOCALTIME_OFFSET_KEY.begin(), LOCALTIME_OFFSET_KEY.end());
138
139 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
140 localTimeOffset_ = timeOffset;
141 LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
142 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
143 if (errCode != E_OK) {
144 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
145 }
146 return errCode;
147 }
148
GetLocalTimeOffset() const149 TimeOffset Metadata::GetLocalTimeOffset() const
150 {
151 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
152 return localTimeOffset;
153 }
154
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)155 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
156 {
157 return EraseDeviceWaterMark(deviceId, isNeedHash, "");
158 }
159
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)160 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
161 {
162 std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
163 // try to erase all the waterMark
164 // erase deleteSync recv waterMark
165 WaterMark waterMark = 0;
166 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
167 if (errCodeDeleteSync != E_OK) {
168 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
169 return errCodeDeleteSync;
170 }
171 // erase querySync recv waterMark
172 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
173 if (errCodeQuerySync != E_OK) {
174 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
175 return errCodeQuerySync;
176 }
177 // peerWaterMark must be erased at last
178 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
179 if (errCode != E_OK) {
180 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
181 return errCode;
182 }
183 return E_OK;
184 }
185
SetLastLocalTime(Timestamp lastLocalTime)186 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
187 {
188 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
189 if (lastLocalTime > lastLocalTime_) {
190 lastLocalTime_ = lastLocalTime;
191 }
192 }
193
GetLastLocalTime() const194 Timestamp Metadata::GetLastLocalTime() const
195 {
196 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
197 return lastLocalTime_;
198 }
199
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)200 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
201 {
202 std::vector<uint8_t> value;
203 int errCode = SerializeMetaData(inValue, value);
204 if (errCode != E_OK) {
205 return errCode;
206 }
207
208 DeviceID hashDeviceId;
209 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
210 std::vector<uint8_t> key;
211 DBCommon::StringToVector(hashDeviceId, key);
212 errCode = SetMetadataToDb(key, value);
213 if (errCode != E_OK) {
214 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
215 return errCode;
216 }
217 PutMetadataToMap(hashDeviceId, inValue);
218 return E_OK;
219 }
220
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)221 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
222 {
223 DeviceID hashDeviceId;
224 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
225 GetMetadataFromMap(hashDeviceId, outValue);
226 }
227
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)228 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
229 {
230 outValue.resize(sizeof(MetaDataValue));
231 errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
232 if (err != EOK) {
233 return -E_SECUREC_ERROR;
234 }
235 return E_OK;
236 }
237
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)238 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
239 {
240 if (inValue.empty()) {
241 return -E_INVALID_ARGS;
242 }
243 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
244 if (err != EOK) {
245 return -E_SECUREC_ERROR;
246 }
247 return E_OK;
248 }
249
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const250 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
251 {
252 if (naturalStoragePtr_ == nullptr) {
253 return -E_INVALID_DB;
254 }
255 return naturalStoragePtr_->GetMetaData(key, outValue);
256 }
257
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)258 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
259 {
260 if (naturalStoragePtr_ == nullptr) {
261 return -E_INVALID_DB;
262 }
263 return naturalStoragePtr_->PutMetaData(key, inValue);
264 }
265
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)266 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
267 {
268 metadataMap_[deviceId] = value;
269 }
270
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)271 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
272 {
273 outValue = metadataMap_[deviceId];
274 }
275
StringToLong(const std::vector<uint8_t> & value) const276 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
277 {
278 std::string valueString(value.begin(), value.end());
279 int64_t longData = std::strtoll(valueString.c_str(), nullptr, STR_TO_LL_BY_DEVALUE);
280 LOGD("Metadata::StringToLong longData = %" PRId64, longData);
281 return longData;
282 }
283
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)284 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
285 {
286 if (naturalStoragePtr_ == nullptr) {
287 return -E_INVALID_DB;
288 }
289 return naturalStoragePtr_->GetAllMetaKeys(keys);
290 }
291
292 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)293 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
294 {
295 if (inKey.size() < expectPrefix.size()) {
296 return false;
297 }
298 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
299 if (prefixInKey != expectPrefix) {
300 return false;
301 }
302 return true;
303 }
304 }
305
LoadAllMetadata()306 int Metadata::LoadAllMetadata()
307 {
308 std::vector<std::vector<uint8_t>> metaDataKeys;
309 int errCode = GetAllMetadataKey(metaDataKeys);
310 if (errCode != E_OK) {
311 LOGE("[Metadata] get all metadata key failed err=%d", errCode);
312 return errCode;
313 }
314
315 std::vector<std::vector<uint8_t>> querySyncIds;
316 for (const auto &deviceId : metaDataKeys) {
317 if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
318 errCode = LoadDeviceIdDataToMap(deviceId);
319 if (errCode != E_OK) {
320 return errCode;
321 }
322 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
323 querySyncIds.push_back(deviceId);
324 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
325 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
326 if (errCode != E_OK) {
327 return errCode;
328 }
329 }
330 }
331 return querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
332 }
333
LoadDeviceIdDataToMap(const Key & key)334 int Metadata::LoadDeviceIdDataToMap(const Key &key)
335 {
336 std::vector<uint8_t> value;
337 int errCode = GetMetadataFromDb(key, value);
338 if (errCode != E_OK) {
339 return errCode;
340 }
341 MetaDataValue metaValue;
342 std::string metaKey(key.begin(), key.end());
343 errCode = DeSerializeMetaData(value, metaValue);
344 if (errCode != E_OK) {
345 return errCode;
346 }
347 std::lock_guard<std::mutex> lockGuard(metadataLock_);
348 PutMetadataToMap(metaKey, metaValue);
349 return errCode;
350 }
351
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)352 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
353 {
354 if (!isNeedHash) {
355 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
356 return;
357 }
358 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
359 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
360 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
361 } else {
362 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
363 }
364 }
365
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)366 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
367 const std::string &deviceId, WaterMark &waterMark)
368 {
369 QueryWaterMark queryWaterMark;
370 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
371 if (errCode != E_OK) {
372 return errCode;
373 }
374 WaterMark peerWaterMark;
375 GetPeerWaterMark(deviceId, peerWaterMark);
376 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
377 return E_OK;
378 }
379
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)380 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
381 const std::string &deviceId, const WaterMark &waterMark)
382 {
383 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
384 }
385
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)386 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
387 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
388 {
389 QueryWaterMark queryWaterMark;
390 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
391 if (errCode != E_OK) {
392 return errCode;
393 }
394 if (isAutoLift) {
395 WaterMark localWaterMark;
396 GetLocalWaterMark(deviceId, localWaterMark);
397 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
398 } else {
399 waterMark = queryWaterMark.sendWaterMark;
400 }
401 return E_OK;
402 }
403
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)404 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
405 const std::string &deviceId, const WaterMark &waterMark)
406 {
407 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
408 }
409
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)410 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp)
411 {
412 QueryWaterMark queryWaterMark;
413 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
414 if (errCode != E_OK) {
415 return errCode;
416 }
417 timestamp = queryWaterMark.lastQueryTime;
418 return E_OK;
419 }
420
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)421 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
422 const Timestamp ×tamp)
423 {
424 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
425 }
426
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)427 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
428 {
429 DeleteWaterMark deleteWaterMark;
430 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
431 if (errCode != E_OK) {
432 return errCode;
433 }
434 if (isAutoLift) {
435 WaterMark localWaterMark;
436 GetLocalWaterMark(deviceId, localWaterMark);
437 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
438 } else {
439 waterMark = deleteWaterMark.sendWaterMark;
440 }
441 return E_OK;
442 }
443
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)444 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
445 {
446 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
447 }
448
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)449 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
450 {
451 DeleteWaterMark deleteWaterMark;
452 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
453 if (errCode != E_OK) {
454 return errCode;
455 }
456 WaterMark peerWaterMark;
457 GetPeerWaterMark(deviceId, peerWaterMark);
458 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
459 return E_OK;
460 }
461
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)462 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
463 {
464 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
465 }
466
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)467 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
468 {
469 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
470 }
471
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)472 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
473 {
474 MetaDataValue metadata;
475 std::lock_guard<std::mutex> lockGuard(metadataLock_);
476 DeviceID hashDeviceId;
477 GetHashDeviceId(deviceId, hashDeviceId, true);
478 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
479 metadata = metadataMap_[hashDeviceId];
480 outValue = metadata.dbCreateTime;
481 return;
482 }
483 outValue = 0;
484 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
485 }
486
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)487 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
488 {
489 MetaDataValue metadata;
490 std::lock_guard<std::mutex> lockGuard(metadataLock_);
491 DeviceID hashDeviceId;
492 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
493 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
494 metadata = metadataMap_[hashDeviceId];
495 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
496 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
497 LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
498 STR_MASK(deviceId), inValue);
499 }
500 if (metadata.dbCreateTime == 0) {
501 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
502 }
503 }
504 metadata.dbCreateTime = inValue;
505 return SaveMetaDataValue(deviceId, metadata);
506 }
507
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)508 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
509 {
510 MetaDataValue metadata;
511 std::lock_guard<std::mutex> lockGuard(metadataLock_);
512 DeviceID hashDeviceId;
513 GetHashDeviceId(deviceId, hashDeviceId, true);
514 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
515 metadata = metadataMap_[hashDeviceId];
516 metadata.clearDeviceDataMark = 0; // clear mark
517 return SaveMetaDataValue(deviceId, metadata);
518 }
519 return -E_NOT_FOUND;
520 }
521
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)522 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
523 {
524 MetaDataValue metadata;
525 std::lock_guard<std::mutex> lockGuard(metadataLock_);
526 DeviceID hashDeviceId;
527 GetHashDeviceId(deviceId, hashDeviceId, true);
528 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
529 metadata = metadataMap_[hashDeviceId];
530 outValue = metadata.clearDeviceDataMark;
531 return;
532 }
533 outValue = 0;
534 }
535
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const536 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
537 {
538 std::vector<uint8_t> key;
539 std::vector<uint8_t> value;
540 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
541 DBCommon::StringToVector(hashqueryId, key);
542 int errCode = GetMetadataFromDb(key, value);
543 std::lock_guard<std::mutex> lockGuard(metadataLock_);
544 if (errCode == -E_NOT_FOUND) {
545 auto iter = queryIdMap_.find(deviceId);
546 if (iter != queryIdMap_.end()) {
547 if (iter->second.find(hashqueryId) == iter->second.end()) {
548 iter->second.insert(hashqueryId);
549 return INT64_MAX;
550 }
551 return 0;
552 } else {
553 queryIdMap_[deviceId] = { hashqueryId };
554 return INT64_MAX;
555 }
556 }
557 auto iter = queryIdMap_.find(deviceId);
558 // while value is found in db, it can be found in db later when db is not closed
559 // so no need to record the hashqueryId in map
560 if (errCode == E_OK && iter != queryIdMap_.end()) {
561 iter->second.erase(hashqueryId);
562 }
563 return StringToLong(value);
564 }
565
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)566 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
567 {
568 std::lock_guard<std::mutex> lockGuard(metadataLock_);
569 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
570 auto iter = queryIdMap_.find(deviceId);
571 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
572 iter->second.erase(hashqueryId);
573 }
574 }
575
SaveClientId(const std::string & deviceId,const std::string & clientId)576 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
577 {
578 {
579 // already save in cache
580 std::lock_guard<std::mutex> autoLock(clientIdLock_);
581 if (clientIdCache_[deviceId] == clientId) {
582 return E_OK;
583 }
584 }
585 std::string keyStr;
586 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
587 std::string valueStr = DBCommon::TransferHashString(deviceId);
588 Key key;
589 DBCommon::StringToVector(keyStr, key);
590 Value value;
591 DBCommon::StringToVector(valueStr, value);
592 int errCode = SetMetadataToDb(key, value);
593 if (errCode != E_OK) {
594 return errCode;
595 }
596 std::lock_guard<std::mutex> autoLock(clientIdLock_);
597 clientIdCache_[deviceId] = clientId;
598 return E_OK;
599 }
600
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const601 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
602 {
603 // don't use cache here avoid invalid cache
604 std::string keyStr;
605 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
606 Key key;
607 DBCommon::StringToVector(keyStr, key);
608 Value value;
609 int errCode = GetMetadataFromDb(key, value);
610 if (errCode == -E_NOT_FOUND) {
611 LOGD("[Metadata] not found clientId");
612 return -E_NOT_SUPPORT;
613 }
614 if (errCode != E_OK) {
615 LOGE("[Metadata] reload clientId failed %d", errCode);
616 return errCode;
617 }
618 DBCommon::VectorToString(value, hashDevId);
619 return E_OK;
620 }
621
LockWaterMark() const622 void Metadata::LockWaterMark() const
623 {
624 waterMarkMutex_.lock();
625 }
626
UnlockWaterMark() const627 void Metadata::UnlockWaterMark() const
628 {
629 waterMarkMutex_.unlock();
630 }
631
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)632 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
633 : metadataPtr_(std::move(metadata))
634 {
635 if (metadataPtr_ != nullptr) {
636 metadataPtr_->LockWaterMark();
637 }
638 }
639
~MetaWaterMarkAutoLock()640 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
641 {
642 if (metadataPtr_ != nullptr) {
643 metadataPtr_->UnlockWaterMark();
644 }
645 }
646 } // namespace DistributedDB