1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "meta_data.h"
17
18 #include <openssl/rand.h>
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_errno.h"
22 #include "hash.h"
23 #include "log_print.h"
24 #include "securec.h"
25 #include "sync_types.h"
26 #include "time_helper.h"
27
28 namespace DistributedDB {
namespace {
// Radix handed to std::strtoll when a stored decimal string is parsed back into an integer.
const int STR_TO_LL_BY_DEVALUE = 10;
// Special metadata key under which the local time offset is persisted.
const std::string LOCALTIME_OFFSET_KEY = "localTimeOffset";
// Prefix shared by every per-device metadata key.
const std::string DEVICEID_PREFIX_KEY = "deviceId";
// Prefix of the keys that map a client id to a hashed device id.
// The pointer itself is const so the prefix can never be re-pointed at run time.
const char * const CLIENT_ID_PREFIX_KEY = "clientId";
}
36
Metadata()37 Metadata::Metadata()
38 : localTimeOffset_(0),
39 naturalStoragePtr_(nullptr),
40 lastLocalTime_(0)
41 {}
42
~Metadata()43 Metadata::~Metadata()
44 {
45 naturalStoragePtr_ = nullptr;
46 metadataMap_.clear();
47 }
48
Initialize(ISyncInterface * storage)49 int Metadata::Initialize(ISyncInterface* storage)
50 {
51 naturalStoragePtr_ = storage;
52 std::vector<uint8_t> key;
53 std::vector<uint8_t> timeOffset;
54 DBCommon::StringToVector(LOCALTIME_OFFSET_KEY, key);
55
56 int errCode = GetMetadataFromDb(key, timeOffset);
57 if (errCode == -E_NOT_FOUND) {
58 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
59 if (err != E_OK) {
60 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
61 return err;
62 }
63 } else if (errCode == E_OK) {
64 localTimeOffset_ = StringToLong(timeOffset);
65 } else {
66 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
67 return errCode;
68 }
69 {
70 std::lock_guard<std::mutex> lockGuard(metadataLock_);
71 metadataMap_.clear();
72 }
73 (void)querySyncWaterMarkHelper_.Initialize(storage);
74 return LoadAllMetadata();
75 }
76
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)77 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
78 {
79 MetaDataValue metadata;
80 std::lock_guard<std::mutex> lockGuard(metadataLock_);
81 GetMetaDataValue(deviceId, metadata, true);
82 metadata.timeOffset = inValue;
83 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
84 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
85 return SaveMetaDataValue(deviceId, metadata);
86 }
87
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)88 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
89 {
90 MetaDataValue metadata;
91 std::lock_guard<std::mutex> lockGuard(metadataLock_);
92 GetMetaDataValue(deviceId, metadata, true);
93 outValue = metadata.timeOffset;
94 }
95
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)96 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
97 {
98 MetaDataValue metadata;
99 std::lock_guard<std::mutex> lockGuard(metadataLock_);
100 GetMetaDataValue(deviceId, metadata, true);
101 outValue = metadata.localWaterMark;
102 }
103
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)104 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
105 {
106 MetaDataValue metadata;
107 std::lock_guard<std::mutex> lockGuard(metadataLock_);
108 GetMetaDataValue(deviceId, metadata, true);
109 metadata.localWaterMark = inValue;
110 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
111 return SaveMetaDataValue(deviceId, metadata);
112 }
113
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue)114 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue)
115 {
116 MetaDataValue metadata;
117 std::lock_guard<std::mutex> lockGuard(metadataLock_);
118 GetMetaDataValue(deviceId, metadata, true);
119 outValue = metadata.peerWaterMark;
120 }
121
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)122 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
123 {
124 MetaDataValue metadata;
125 std::lock_guard<std::mutex> lockGuard(metadataLock_);
126 GetMetaDataValue(deviceId, metadata, isNeedHash);
127 metadata.peerWaterMark = inValue;
128 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
129 return SaveMetaDataValue(deviceId, metadata, isNeedHash);
130 }
131
SaveLocalTimeOffset(TimeOffset timeOffset)132 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
133 {
134 std::string timeOffsetString = std::to_string(timeOffset);
135 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
136 std::vector<uint8_t> localTimeOffsetValue(
137 LOCALTIME_OFFSET_KEY.begin(), LOCALTIME_OFFSET_KEY.end());
138
139 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
140 localTimeOffset_ = timeOffset;
141 LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
142 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
143 if (errCode != E_OK) {
144 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
145 }
146 return errCode;
147 }
148
GetLocalTimeOffset() const149 TimeOffset Metadata::GetLocalTimeOffset() const
150 {
151 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
152 return localTimeOffset;
153 }
154
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)155 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
156 {
157 return EraseDeviceWaterMark(deviceId, isNeedHash, "");
158 }
159
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)160 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
161 {
162 // try to erase all the waterMark
163 // erase deleteSync recv waterMark
164 WaterMark waterMark = 0;
165 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
166 // erase querySync recv waterMark
167 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
168 // peerWaterMark must be erased at last
169 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
170 if (errCode != E_OK) {
171 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
172 return errCode;
173 }
174 if (errCodeQuerySync != E_OK) {
175 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
176 return errCodeQuerySync;
177 }
178 if (errCodeDeleteSync != E_OK) {
179 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
180 return errCodeDeleteSync;
181 }
182 return E_OK;
183 }
184
SetLastLocalTime(Timestamp lastLocalTime)185 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
186 {
187 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
188 if (lastLocalTime > lastLocalTime_) {
189 lastLocalTime_ = lastLocalTime;
190 }
191 }
192
GetLastLocalTime() const193 Timestamp Metadata::GetLastLocalTime() const
194 {
195 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
196 return lastLocalTime_;
197 }
198
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)199 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
200 {
201 std::vector<uint8_t> value;
202 int errCode = SerializeMetaData(inValue, value);
203 if (errCode != E_OK) {
204 return errCode;
205 }
206
207 DeviceID hashDeviceId;
208 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
209 std::vector<uint8_t> key;
210 DBCommon::StringToVector(hashDeviceId, key);
211 errCode = SetMetadataToDb(key, value);
212 if (errCode != E_OK) {
213 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
214 return errCode;
215 }
216 PutMetadataToMap(hashDeviceId, inValue);
217 return E_OK;
218 }
219
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)220 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
221 {
222 DeviceID hashDeviceId;
223 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
224 GetMetadataFromMap(hashDeviceId, outValue);
225 }
226
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)227 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
228 {
229 outValue.resize(sizeof(MetaDataValue));
230 errno_t err = memcpy_s(&outValue[0], outValue.size(), &inValue, sizeof(MetaDataValue));
231 if (err != EOK) {
232 return -E_SECUREC_ERROR;
233 }
234 return E_OK;
235 }
236
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)237 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
238 {
239 if (inValue.empty()) {
240 return -E_INVALID_ARGS;
241 }
242
243 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), &inValue[0], inValue.size());
244 if (err != EOK) {
245 return -E_SECUREC_ERROR;
246 }
247 return E_OK;
248 }
249
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const250 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
251 {
252 if (naturalStoragePtr_ == nullptr) {
253 return -E_INVALID_DB;
254 }
255 return naturalStoragePtr_->GetMetaData(key, outValue);
256 }
257
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)258 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
259 {
260 if (naturalStoragePtr_ == nullptr) {
261 return -E_INVALID_DB;
262 }
263 return naturalStoragePtr_->PutMetaData(key, inValue);
264 }
265
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)266 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
267 {
268 metadataMap_[deviceId] = value;
269 }
270
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)271 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
272 {
273 outValue = metadataMap_[deviceId];
274 }
275
StringToLong(const std::vector<uint8_t> & value) const276 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
277 {
278 std::string valueString(value.begin(), value.end());
279 int64_t longData = std::strtoll(valueString.c_str(), nullptr, STR_TO_LL_BY_DEVALUE);
280 LOGD("Metadata::StringToLong longData = %" PRId64, longData);
281 return longData;
282 }
283
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)284 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
285 {
286 if (naturalStoragePtr_ == nullptr) {
287 return -E_INVALID_DB;
288 }
289 return naturalStoragePtr_->GetAllMetaKeys(keys);
290 }
291
292 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)293 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
294 {
295 if (inKey.size() < expectPrefix.size()) {
296 return false;
297 }
298 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
299 if (prefixInKey != expectPrefix) {
300 return false;
301 }
302 return true;
303 }
304 }
305
LoadAllMetadata()306 int Metadata::LoadAllMetadata()
307 {
308 std::vector<std::vector<uint8_t>> metaDataKeys;
309 int errCode = GetAllMetadataKey(metaDataKeys);
310 if (errCode != E_OK) {
311 LOGE("[Metadata] get all metadata key failed err=%d", errCode);
312 return errCode;
313 }
314
315 std::vector<std::vector<uint8_t>> querySyncIds;
316 for (const auto &deviceId : metaDataKeys) {
317 if (IsMetaDataKey(deviceId, DEVICEID_PREFIX_KEY)) {
318 errCode = LoadDeviceIdDataToMap(deviceId);
319 if (errCode != E_OK) {
320 return errCode;
321 }
322 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
323 querySyncIds.push_back(deviceId);
324 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
325 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
326 if (errCode != E_OK) {
327 return errCode;
328 }
329 }
330 }
331 return querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
332 }
333
LoadDeviceIdDataToMap(const Key & key)334 int Metadata::LoadDeviceIdDataToMap(const Key &key)
335 {
336 std::vector<uint8_t> value;
337 int errCode = GetMetadataFromDb(key, value);
338 if (errCode != E_OK) {
339 return errCode;
340 }
341 MetaDataValue metaValue;
342 std::string metaKey(key.begin(), key.end());
343 errCode = DeSerializeMetaData(value, metaValue);
344 if (errCode != E_OK) {
345 return errCode;
346 }
347 std::lock_guard<std::mutex> lockGuard(metadataLock_);
348 PutMetadataToMap(metaKey, metaValue);
349 return errCode;
350 }
351
GetRandTimeOffset() const352 uint64_t Metadata::GetRandTimeOffset() const
353 {
354 const int randOffsetLength = 2; // 2 byte
355 uint8_t randBytes[randOffsetLength] = { 0 };
356 RAND_bytes(randBytes, randOffsetLength);
357
358 // use a 16 bit rand data to make a rand timeoffset
359 uint64_t randTimeOffset = (static_cast<uint16_t>(randBytes[1]) << 8) | randBytes[0]; // 16 bit data, 8 is offset
360 randTimeOffset = randTimeOffset * 1000 * 1000 * 10; // second, 1000 is scale
361 LOGD("[Metadata] GetRandTimeOffset %" PRIu64, randTimeOffset);
362 return randTimeOffset;
363 }
364
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)365 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
366 {
367 if (!isNeedHash) {
368 hashDeviceId = DEVICEID_PREFIX_KEY + deviceId;
369 return;
370 }
371 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
372 hashDeviceId = DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
373 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
374 } else {
375 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
376 }
377 }
378
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)379 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
380 const std::string &deviceId, WaterMark &waterMark)
381 {
382 QueryWaterMark queryWaterMark;
383 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
384 if (errCode != E_OK) {
385 return errCode;
386 }
387 WaterMark peerWaterMark;
388 GetPeerWaterMark(deviceId, peerWaterMark);
389 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
390 return E_OK;
391 }
392
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)393 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
394 const std::string &deviceId, const WaterMark &waterMark)
395 {
396 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
397 }
398
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)399 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
400 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
401 {
402 QueryWaterMark queryWaterMark;
403 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
404 if (errCode != E_OK) {
405 return errCode;
406 }
407 if (isAutoLift) {
408 WaterMark localWaterMark;
409 GetLocalWaterMark(deviceId, localWaterMark);
410 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
411 } else {
412 waterMark = queryWaterMark.sendWaterMark;
413 }
414 return E_OK;
415 }
416
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)417 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
418 const std::string &deviceId, const WaterMark &waterMark)
419 {
420 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
421 }
422
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)423 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp)
424 {
425 QueryWaterMark queryWaterMark;
426 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
427 if (errCode != E_OK) {
428 return errCode;
429 }
430 timestamp = queryWaterMark.lastQueryTime;
431 return E_OK;
432 }
433
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)434 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
435 const Timestamp ×tamp)
436 {
437 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
438 }
439
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)440 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
441 {
442 DeleteWaterMark deleteWaterMark;
443 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
444 if (errCode != E_OK) {
445 return errCode;
446 }
447 if (isAutoLift) {
448 WaterMark localWaterMark;
449 GetLocalWaterMark(deviceId, localWaterMark);
450 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
451 } else {
452 waterMark = deleteWaterMark.sendWaterMark;
453 }
454 return E_OK;
455 }
456
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)457 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
458 {
459 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
460 }
461
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)462 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
463 {
464 DeleteWaterMark deleteWaterMark;
465 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
466 if (errCode != E_OK) {
467 return errCode;
468 }
469 WaterMark peerWaterMark;
470 GetPeerWaterMark(deviceId, peerWaterMark);
471 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
472 return E_OK;
473 }
474
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)475 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
476 {
477 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
478 }
479
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)480 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
481 {
482 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
483 }
484
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)485 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
486 {
487 MetaDataValue metadata;
488 std::lock_guard<std::mutex> lockGuard(metadataLock_);
489 DeviceID hashDeviceId;
490 GetHashDeviceId(deviceId, hashDeviceId, true);
491 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
492 metadata = metadataMap_[hashDeviceId];
493 outValue = metadata.dbCreateTime;
494 return;
495 }
496 outValue = 0;
497 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
498 }
499
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)500 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
501 {
502 MetaDataValue metadata;
503 std::lock_guard<std::mutex> lockGuard(metadataLock_);
504 DeviceID hashDeviceId;
505 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
506 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
507 metadata = metadataMap_[hashDeviceId];
508 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
509 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
510 LOGI("Metadata::SetDbCreateTime,set cleardata mark,dev=%s,dbCreateTime=%" PRIu64,
511 STR_MASK(deviceId), inValue);
512 }
513 if (metadata.dbCreateTime == 0) {
514 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
515 }
516 }
517 metadata.dbCreateTime = inValue;
518 return SaveMetaDataValue(deviceId, metadata);
519 }
520
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)521 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
522 {
523 MetaDataValue metadata;
524 std::lock_guard<std::mutex> lockGuard(metadataLock_);
525 DeviceID hashDeviceId;
526 GetHashDeviceId(deviceId, hashDeviceId, true);
527 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
528 metadata = metadataMap_[hashDeviceId];
529 metadata.clearDeviceDataMark = 0;
530 return SaveMetaDataValue(deviceId, metadata);
531 }
532 return -E_NOT_FOUND;
533 }
534
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)535 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
536 {
537 MetaDataValue metadata;
538 std::lock_guard<std::mutex> lockGuard(metadataLock_);
539 DeviceID hashDeviceId;
540 GetHashDeviceId(deviceId, hashDeviceId, true);
541 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
542 metadata = metadataMap_[hashDeviceId];
543 outValue = metadata.clearDeviceDataMark;
544 return;
545 }
546 outValue = 0;
547 }
548
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const549 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
550 {
551 std::vector<uint8_t> key;
552 std::vector<uint8_t> value;
553 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
554 DBCommon::StringToVector(hashqueryId, key);
555 int errCode = GetMetadataFromDb(key, value);
556 std::lock_guard<std::mutex> lockGuard(metadataLock_);
557 if (errCode == -E_NOT_FOUND) {
558 auto iter = queryIdMap_.find(deviceId);
559 if (iter != queryIdMap_.end()) {
560 if (iter->second.find(hashqueryId) == iter->second.end()) {
561 iter->second.insert(hashqueryId);
562 return INT64_MAX;
563 }
564 return 0;
565 } else {
566 queryIdMap_[deviceId] = { hashqueryId };
567 return INT64_MAX;
568 }
569 }
570 auto iter = queryIdMap_.find(deviceId);
571 // while value is found in db, it can be found in db later when db is not closed
572 // so no need to record the hashqueryId in map
573 if (errCode == E_OK && iter != queryIdMap_.end()) {
574 iter->second.erase(hashqueryId);
575 }
576 return StringToLong(value);
577 }
578
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)579 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
580 {
581 std::lock_guard<std::mutex> lockGuard(metadataLock_);
582 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
583 auto iter = queryIdMap_.find(deviceId);
584 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
585 iter->second.erase(hashqueryId);
586 }
587 }
588
589
SaveClientId(const std::string & deviceId,const std::string & clientId)590 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
591 {
592 {
593 // already save in cache
594 std::lock_guard<std::mutex> autoLock(clientIdLock_);
595 if (clientIdCache_[deviceId] == clientId) {
596 return E_OK;
597 }
598 }
599 std::string keyStr;
600 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
601 std::string valueStr = DBCommon::TransferHashString(deviceId);
602 Key key;
603 DBCommon::StringToVector(keyStr, key);
604 Value value;
605 DBCommon::StringToVector(valueStr, value);
606 int errCode = SetMetadataToDb(key, value);
607 if (errCode != E_OK) {
608 return errCode;
609 }
610 std::lock_guard<std::mutex> autoLock(clientIdLock_);
611 clientIdCache_[deviceId] = clientId;
612 return E_OK;
613 }
614
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)615 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
616 {
617 // don't use cache here avoid invalid cache
618 std::string keyStr;
619 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
620 Key key;
621 DBCommon::StringToVector(keyStr, key);
622 Value value;
623 int errCode = GetMetadataFromDb(key, value);
624 if (errCode == -E_NOT_FOUND) {
625 LOGD("[Metadata] not found clientId");
626 return -E_NOT_SUPPORT;
627 }
628 if (errCode != E_OK) {
629 LOGE("[Metadata] reload clientId failed %d", errCode);
630 return errCode;
631 }
632 DBCommon::VectorToString(value, hashDevId);
633 return E_OK;
634 }
635 } // namespace DistributedDB