1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "meta_data.h"
17
18 #include <openssl/rand.h>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "platform_specific.h"
26 #include "securec.h"
27 #include "sync_types.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
namespace {
// Metadata key strings private to this translation unit.
// CLIENT_ID_PREFIX_KEY is prepended to a client id to build the key of a client-id record.
// LOCAL_META_DATA_KEY is the key string used when saving the local metadata record.
constexpr auto CLIENT_ID_PREFIX_KEY = "clientId";
constexpr auto LOCAL_META_DATA_KEY = "localMetaData";
}
37
Metadata()38 Metadata::Metadata()
39 : localTimeOffset_(0),
40 naturalStoragePtr_(nullptr),
41 lastLocalTime_(0)
42 {}
43
~Metadata()44 Metadata::~Metadata()
45 {
46 naturalStoragePtr_ = nullptr;
47 }
48
Initialize(ISyncInterface * storage)49 int Metadata::Initialize(ISyncInterface* storage)
50 {
51 naturalStoragePtr_ = storage;
52 std::vector<uint8_t> key;
53 std::vector<uint8_t> timeOffset;
54 DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key);
55
56 int errCode = GetMetadataFromDb(key, timeOffset);
57 if (errCode == -E_NOT_FOUND) {
58 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
59 if (err != E_OK) {
60 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
61 return err;
62 }
63 } else if (errCode == E_OK) {
64 localTimeOffset_ = StringToLong(timeOffset);
65 } else {
66 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
67 return errCode;
68 }
69 (void)querySyncWaterMarkHelper_.Initialize(storage);
70 return LoadAllMetadata();
71 }
72
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)73 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
74 {
75 MetaDataValue metadata;
76 std::lock_guard<std::mutex> lockGuard(metadataLock_);
77 GetMetaDataValue(deviceId, metadata, true);
78 metadata.timeOffset = inValue;
79 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
80 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
81 return SaveMetaDataValue(deviceId, metadata);
82 }
83
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)84 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
85 {
86 MetaDataValue metadata;
87 std::lock_guard<std::mutex> lockGuard(metadataLock_);
88 GetMetaDataValue(deviceId, metadata, true);
89 outValue = metadata.timeOffset;
90 }
91
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)92 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
93 {
94 MetaDataValue metadata;
95 std::lock_guard<std::mutex> lockGuard(metadataLock_);
96 GetMetaDataValue(deviceId, metadata, true);
97 outValue = metadata.localWaterMark;
98 }
99
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)100 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
101 {
102 MetaDataValue metadata;
103 std::lock_guard<std::mutex> lockGuard(metadataLock_);
104 GetMetaDataValue(deviceId, metadata, true);
105 metadata.localWaterMark = inValue;
106 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
107 return SaveMetaDataValue(deviceId, metadata);
108 }
109
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)110 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash)
111 {
112 MetaDataValue metadata;
113 std::lock_guard<std::mutex> lockGuard(metadataLock_);
114 GetMetaDataValue(deviceId, metadata, isNeedHash);
115 outValue = metadata.peerWaterMark;
116 }
117
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)118 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
119 {
120 MetaDataValue metadata;
121 std::lock_guard<std::mutex> lockGuard(metadataLock_);
122 GetMetaDataValue(deviceId, metadata, isNeedHash);
123 metadata.peerWaterMark = inValue;
124 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
125 return SaveMetaDataValue(deviceId, metadata, isNeedHash);
126 }
127
SaveLocalTimeOffset(TimeOffset timeOffset,bool saveIntoDb)128 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset, bool saveIntoDb)
129 {
130 std::string timeOffsetString = std::to_string(timeOffset);
131 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
132 std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY);
133 std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end());
134
135 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
136 localTimeOffset_ = timeOffset;
137 LOGI("Metadata::SaveLocalTimeOffset offset = %" PRId64 " save db %d", timeOffset, static_cast<int>(saveIntoDb));
138 if (!saveIntoDb) {
139 return E_OK;
140 }
141 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
142 if (errCode != E_OK) {
143 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
144 }
145 return errCode;
146 }
147
GetLocalTimeOffset() const148 TimeOffset Metadata::GetLocalTimeOffset() const
149 {
150 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
151 return localTimeOffset;
152 }
153
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)154 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
155 {
156 return EraseDeviceWaterMark(deviceId, isNeedHash, "");
157 }
158
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)159 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
160 {
161 // reload meta data again
162 (void)LoadAllMetadata();
163 std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
164 // try to erase all the waterMark
165 // erase deleteSync recv waterMark
166 WaterMark waterMark = 0;
167 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
168 if (errCodeDeleteSync != E_OK) {
169 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
170 return errCodeDeleteSync;
171 }
172 // erase querySync recv waterMark
173 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
174 if (errCodeQuerySync != E_OK) {
175 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
176 return errCodeQuerySync;
177 }
178 // peerWaterMark must be erased at last
179 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
180 if (errCode != E_OK) {
181 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
182 return errCode;
183 }
184 return E_OK;
185 }
186
SetLastLocalTime(Timestamp lastLocalTime)187 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
188 {
189 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
190 if (lastLocalTime > lastLocalTime_) {
191 lastLocalTime_ = lastLocalTime;
192 }
193 }
194
GetLastLocalTime() const195 Timestamp Metadata::GetLastLocalTime() const
196 {
197 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
198 return lastLocalTime_;
199 }
200
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)201 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
202 {
203 std::vector<uint8_t> value;
204 int errCode = SerializeMetaData(inValue, value);
205 if (errCode != E_OK) {
206 return errCode;
207 }
208
209 DeviceID hashDeviceId;
210 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
211 std::vector<uint8_t> key;
212 DBCommon::StringToVector(hashDeviceId, key);
213 errCode = SetMetadataToDb(key, value);
214 if (errCode != E_OK) {
215 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
216 }
217 return errCode;
218 }
219
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)220 int Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
221 {
222 int errCode = GetMetaDataValueFromDB(deviceId, isNeedHash, outValue);
223 if (errCode == -E_NOT_FOUND && RuntimeContext::GetInstance()->IsTimeChanged()) {
224 outValue = {};
225 outValue.syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
226 }
227 return errCode;
228 }
229
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)230 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
231 {
232 outValue.resize(sizeof(MetaDataValue));
233 errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
234 if (err != EOK) {
235 return -E_SECUREC_ERROR;
236 }
237 return E_OK;
238 }
239
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)240 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
241 {
242 if (inValue.empty()) {
243 return -E_INVALID_ARGS;
244 }
245 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
246 if (err != EOK) {
247 return -E_SECUREC_ERROR;
248 }
249 return E_OK;
250 }
251
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const252 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
253 {
254 if (naturalStoragePtr_ == nullptr) {
255 return -E_INVALID_DB;
256 }
257 return naturalStoragePtr_->GetMetaData(key, outValue);
258 }
259
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)260 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
261 {
262 if (naturalStoragePtr_ == nullptr) {
263 return -E_INVALID_DB;
264 }
265 return naturalStoragePtr_->PutMetaData(key, inValue, false);
266 }
267
StringToLong(const std::vector<uint8_t> & value) const268 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
269 {
270 std::string valueString(value.begin(), value.end());
271 int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE);
272 if (errno == ERANGE && (longData == LLONG_MAX || longData == LLONG_MIN)) {
273 LOGW("[Metadata][StringToLong] convert string '%s' to number failed, longData = %" PRId64,
274 valueString.c_str(), longData);
275 }
276 LOGD("Metadata::StringToLong longData = %" PRId64, longData);
277 return longData;
278 }
279
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)280 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
281 {
282 if (naturalStoragePtr_ == nullptr) {
283 return -E_INVALID_DB;
284 }
285 return naturalStoragePtr_->GetAllMetaKeys(keys);
286 }
287
288 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)289 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
290 {
291 if (inKey.size() < expectPrefix.size()) {
292 return false;
293 }
294 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
295 if (prefixInKey != expectPrefix) {
296 return false;
297 }
298 return true;
299 }
300 }
301
LoadAllMetadata()302 int Metadata::LoadAllMetadata()
303 {
304 std::vector<std::vector<uint8_t>> metaDataKeys;
305 int errCode = GetAllMetadataKey(metaDataKeys);
306 if (errCode != E_OK) {
307 LOGE("[Metadata] get all metadata key failed err=%d", errCode);
308 return errCode;
309 }
310
311 std::vector<std::vector<uint8_t>> querySyncIds;
312 for (const auto &deviceId : metaDataKeys) {
313 if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
314 querySyncIds.push_back(deviceId);
315 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
316 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
317 if (errCode != E_OK) {
318 return errCode;
319 }
320 }
321 }
322 errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
323 if (errCode != E_OK) {
324 return errCode;
325 }
326 return InitLocalMetaData();
327 }
328
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)329 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
330 {
331 if (!isNeedHash) {
332 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
333 return;
334 }
335 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
336 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
337 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
338 } else {
339 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
340 }
341 }
342
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)343 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
344 const std::string &deviceId, WaterMark &waterMark)
345 {
346 QueryWaterMark queryWaterMark;
347 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
348 if (errCode != E_OK) {
349 return errCode;
350 }
351 WaterMark peerWaterMark;
352 GetPeerWaterMark(deviceId, peerWaterMark);
353 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
354 return E_OK;
355 }
356
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)357 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
358 const std::string &deviceId, const WaterMark &waterMark)
359 {
360 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
361 }
362
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)363 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
364 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
365 {
366 QueryWaterMark queryWaterMark;
367 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
368 if (errCode != E_OK) {
369 return errCode;
370 }
371 if (isAutoLift) {
372 WaterMark localWaterMark;
373 GetLocalWaterMark(deviceId, localWaterMark);
374 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
375 } else {
376 waterMark = queryWaterMark.sendWaterMark;
377 }
378 return E_OK;
379 }
380
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)381 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
382 const std::string &deviceId, const WaterMark &waterMark)
383 {
384 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
385 }
386
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)387 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp)
388 {
389 QueryWaterMark queryWaterMark;
390 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
391 if (errCode != E_OK) {
392 return errCode;
393 }
394 timestamp = queryWaterMark.lastQueryTime;
395 return E_OK;
396 }
397
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)398 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
399 const Timestamp ×tamp)
400 {
401 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
402 }
403
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)404 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
405 {
406 DeleteWaterMark deleteWaterMark;
407 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
408 if (errCode != E_OK) {
409 return errCode;
410 }
411 if (isAutoLift) {
412 WaterMark localWaterMark;
413 GetLocalWaterMark(deviceId, localWaterMark);
414 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
415 } else {
416 waterMark = deleteWaterMark.sendWaterMark;
417 }
418 return E_OK;
419 }
420
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)421 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
422 {
423 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
424 }
425
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)426 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
427 {
428 DeleteWaterMark deleteWaterMark;
429 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
430 if (errCode != E_OK) {
431 return errCode;
432 }
433 WaterMark peerWaterMark;
434 GetPeerWaterMark(deviceId, peerWaterMark);
435 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
436 return E_OK;
437 }
438
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)439 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
440 {
441 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
442 }
443
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)444 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
445 {
446 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
447 }
448
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)449 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
450 {
451 std::lock_guard<std::mutex> lockGuard(metadataLock_);
452 MetaDataValue metadata;
453 int errCode = GetMetaDataValue(deviceId, metadata, true);
454 if (errCode == E_OK) {
455 outValue = metadata.dbCreateTime;
456 return;
457 }
458 outValue = 0;
459 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
460 }
461
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)462 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
463 {
464 std::lock_guard<std::mutex> lockGuard(metadataLock_);
465 MetaDataValue metadata;
466 int errCode = GetMetaDataValue(deviceId, metadata, isNeedHash);
467 if (errCode == E_OK) {
468 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
469 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
470 LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
471 STR_MASK(deviceId), inValue);
472 }
473 if (metadata.dbCreateTime == 0) {
474 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
475 }
476 } else if (errCode != -E_NOT_FOUND) {
477 return errCode;
478 }
479
480 metadata.dbCreateTime = inValue;
481 return SaveMetaDataValue(deviceId, metadata, isNeedHash);
482 }
483
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)484 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
485 {
486 std::lock_guard<std::mutex> lockGuard(metadataLock_);
487 MetaDataValue metadata;
488 int errCode = GetMetaDataValue(deviceId, metadata, true);
489 if (errCode == E_OK) {
490 metadata.clearDeviceDataMark = 0;
491 return SaveMetaDataValue(deviceId, metadata, true);
492 }
493 return errCode;
494 }
495
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)496 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
497 {
498 std::lock_guard<std::mutex> lockGuard(metadataLock_);
499 MetaDataValue metadata;
500 int errCode = GetMetaDataValue(deviceId, metadata, true);
501 if (errCode == E_OK) {
502 outValue = metadata.clearDeviceDataMark;
503 return;
504 }
505 outValue = 0;
506 }
507
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const508 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
509 {
510 std::vector<uint8_t> key;
511 std::vector<uint8_t> value;
512 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
513 DBCommon::StringToVector(hashqueryId, key);
514 int errCode = GetMetadataFromDb(key, value);
515 std::lock_guard<std::mutex> lockGuard(metadataLock_);
516 if (errCode == -E_NOT_FOUND) {
517 auto iter = queryIdMap_.find(deviceId);
518 if (iter != queryIdMap_.end()) {
519 if (iter->second.find(hashqueryId) == iter->second.end()) {
520 iter->second.insert(hashqueryId);
521 return INT64_MAX;
522 }
523 return 0;
524 } else {
525 queryIdMap_[deviceId] = { hashqueryId };
526 return INT64_MAX;
527 }
528 }
529 auto iter = queryIdMap_.find(deviceId);
530 // while value is found in db, it can be found in db later when db is not closed
531 // so no need to record the hashqueryId in map
532 if (errCode == E_OK && iter != queryIdMap_.end()) {
533 iter->second.erase(hashqueryId);
534 }
535 return static_cast<uint64_t>(StringToLong(value));
536 }
537
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)538 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
539 {
540 std::lock_guard<std::mutex> lockGuard(metadataLock_);
541 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
542 auto iter = queryIdMap_.find(deviceId);
543 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
544 iter->second.erase(hashqueryId);
545 }
546 }
547
SaveClientId(const std::string & deviceId,const std::string & clientId)548 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
549 {
550 {
551 // already save in cache
552 std::lock_guard<std::mutex> autoLock(clientIdLock_);
553 if (clientIdCache_[deviceId] == clientId) {
554 return E_OK;
555 }
556 }
557 std::string keyStr;
558 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
559 std::string valueStr = DBCommon::TransferHashString(deviceId);
560 Key key;
561 DBCommon::StringToVector(keyStr, key);
562 Value value;
563 DBCommon::StringToVector(valueStr, value);
564 int errCode = SetMetadataToDb(key, value);
565 if (errCode != E_OK) {
566 return errCode;
567 }
568 std::lock_guard<std::mutex> autoLock(clientIdLock_);
569 clientIdCache_[deviceId] = clientId;
570 return E_OK;
571 }
572
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const573 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
574 {
575 // don't use cache here avoid invalid cache
576 std::string keyStr;
577 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
578 Key key;
579 DBCommon::StringToVector(keyStr, key);
580 Value value;
581 int errCode = GetMetadataFromDb(key, value);
582 if (errCode == -E_NOT_FOUND) {
583 LOGD("[Metadata] not found clientId by %.3s", clientId.c_str());
584 return -E_NOT_SUPPORT;
585 }
586 if (errCode != E_OK) {
587 LOGE("[Metadata] reload clientId failed %d", errCode);
588 return errCode;
589 }
590 DBCommon::VectorToString(value, hashDevId);
591 return E_OK;
592 }
593
LockWaterMark() const594 void Metadata::LockWaterMark() const
595 {
596 waterMarkMutex_.lock();
597 }
598
UnlockWaterMark() const599 void Metadata::UnlockWaterMark() const
600 {
601 waterMarkMutex_.unlock();
602 }
603
GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)604 int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info)
605 {
606 // read from db avoid watermark update in diff process
607 std::lock_guard<std::mutex> lockGuard(metadataLock_);
608 MetaDataValue metadata;
609 int errCode = GetMetaDataValue(dev, metadata, isNeedHash);
610 if (errCode == -E_NOT_FOUND) {
611 LOGD("[Metadata] not found meta value");
612 return E_OK;
613 }
614 if (errCode != E_OK) {
615 LOGE("[Metadata] reload meta value failed %d", errCode);
616 return errCode;
617 }
618
619 info.sendMark = metadata.localWaterMark;
620 info.receiveMark = metadata.peerWaterMark;
621 return E_OK;
622 }
623
ClearAllAbilitySyncFinishMark()624 int Metadata::ClearAllAbilitySyncFinishMark()
625 {
626 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) |
627 static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION));
628 }
629
ClearAllTimeSyncFinishMark()630 int Metadata::ClearAllTimeSyncFinishMark()
631 {
632 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) |
633 static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) |
634 static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK));
635 }
636
ClearAllMetaDataValue(uint32_t innerClearAction)637 int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction)
638 {
639 std::vector<std::vector<uint8_t>> metaDataKeys;
640 int errCode = GetAllMetadataKey(metaDataKeys);
641 if (errCode != E_OK) {
642 LOGE("[Metadata][ClearAllMetaDataValue] get all metadata key failed err=%d", errCode);
643 return errCode;
644 }
645
646 std::lock_guard<std::mutex> lockGuard(metadataLock_);
647 for (const auto &deviceId : metaDataKeys) {
648 if (!IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
649 continue;
650 }
651 MetaDataValue metaData;
652 int innerErrCode = GetMetaDataValueFromDB(deviceId, metaData);
653 if (innerErrCode != E_OK) {
654 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " get meta data failed %d", innerClearAction,
655 innerErrCode);
656 errCode = errCode == E_OK ? innerErrCode : errCode;
657 continue;
658 }
659 ClearMetaDataValue(innerClearAction, metaData);
660 std::vector<uint8_t> value;
661 innerErrCode = SerializeMetaData(metaData, value);
662 // clear sync mark without transaction here
663 // just ignore error metadata value
664 if (innerErrCode != E_OK) {
665 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction,
666 innerErrCode);
667 errCode = errCode == E_OK ? innerErrCode : errCode;
668 continue;
669 }
670
671 innerErrCode = SetMetadataToDb(deviceId, value);
672 if (innerErrCode != E_OK) {
673 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction,
674 innerErrCode);
675 errCode = errCode == E_OK ? innerErrCode : errCode;
676 }
677 }
678 if (errCode == E_OK) {
679 LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction);
680 }
681 return errCode;
682 }
683
ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)684 void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue)
685 {
686 auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK);
687 if ((innerClearAction & mark) == mark) {
688 metaDataValue.syncMark =
689 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC));
690 }
691 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK);
692 if ((innerClearAction & mark) == mark) {
693 metaDataValue.syncMark =
694 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC));
695 }
696 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION);
697 if ((innerClearAction & mark) == mark) {
698 metaDataValue.remoteSchemaVersion = 0;
699 }
700 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET);
701 if ((innerClearAction & mark) == mark) {
702 metaDataValue.systemTimeOffset = 0;
703 }
704 mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK);
705 if ((innerClearAction & mark) == mark) {
706 metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
707 }
708 }
709
SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)710 int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish)
711 {
712 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish);
713 }
714
IsAbilitySyncFinish(const std::string & deviceId)715 bool Metadata::IsAbilitySyncFinish(const std::string &deviceId)
716 {
717 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC);
718 }
719
SetTimeSyncFinishMark(const std::string & deviceId,bool finish)720 int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish)
721 {
722 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish);
723 }
724
SetTimeChangeMark(const std::string & deviceId,bool change)725 int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change)
726 {
727 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change);
728 }
729
IsTimeSyncFinish(const std::string & deviceId)730 bool Metadata::IsTimeSyncFinish(const std::string &deviceId)
731 {
732 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC);
733 }
734
IsTimeChange(const std::string & deviceId)735 bool Metadata::IsTimeChange(const std::string &deviceId)
736 {
737 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE);
738 }
739
SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)740 int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish)
741 {
742 MetaDataValue metadata;
743 std::lock_guard<std::mutex> lockGuard(metadataLock_);
744 GetMetaDataValue(deviceId, metadata, true);
745 auto mark = static_cast<uint64_t>(syncMark);
746 if (finish) {
747 metadata.syncMark |= mark;
748 } else {
749 metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark);
750 }
751 LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish),
752 metadata.syncMark);
753 return SaveMetaDataValue(deviceId, metadata);
754 }
755
IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)756 bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark)
757 {
758 MetaDataValue metadata;
759 std::lock_guard<std::mutex> lockGuard(metadataLock_);
760 GetMetaDataValue(deviceId, metadata, true);
761 auto mark = static_cast<uint64_t>(syncMark);
762 return (metadata.syncMark & mark) == mark;
763 }
764
SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)765 int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion)
766 {
767 MetaDataValue metadata;
768 std::lock_guard<std::mutex> lockGuard(metadataLock_);
769 GetMetaDataValue(deviceId, metadata, true);
770 metadata.remoteSchemaVersion = schemaVersion;
771 LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion);
772 int errCode = SaveMetaDataValue(deviceId, metadata);
773 if (errCode != E_OK) {
774 LOGW("[Metadata] Set remote schema version failed");
775 }
776 return errCode;
777 }
778
GetRemoteSchemaVersion(const std::string & deviceId)779 uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId)
780 {
781 MetaDataValue metadata;
782 std::lock_guard<std::mutex> lockGuard(metadataLock_);
783 GetMetaDataValue(deviceId, metadata, true);
784 LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion);
785 return metadata.remoteSchemaVersion;
786 }
787
SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)788 int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset)
789 {
790 MetaDataValue metadata;
791 std::lock_guard<std::mutex> lockGuard(metadataLock_);
792 GetMetaDataValue(deviceId, metadata, true);
793 metadata.systemTimeOffset = systemTimeOffset;
794 LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset);
795 return SaveMetaDataValue(deviceId, metadata);
796 }
797
GetSystemTimeOffset(const std::string & deviceId)798 int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId)
799 {
800 MetaDataValue metadata;
801 std::lock_guard<std::mutex> lockGuard(metadataLock_);
802 GetMetaDataValue(deviceId, metadata, true);
803 LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset);
804 return metadata.systemTimeOffset;
805 }
806
GetLocalSchemaVersion()807 std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion()
808 {
809 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
810 auto [errCode, localMetaData] = GetLocalMetaData();
811 if (errCode != E_OK) {
812 return {errCode, 0};
813 }
814 LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion);
815 return {errCode, localMetaData.localSchemaVersion};
816 }
817
SetLocalSchemaVersion(uint64_t schemaVersion)818 int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion)
819 {
820 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
821 auto [errCode, localMetaData] = GetLocalMetaData();
822 if (errCode != E_OK) {
823 return errCode;
824 }
825 localMetaData.localSchemaVersion = schemaVersion;
826 LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion);
827 return SaveLocalMetaData(localMetaData);
828 }
829
SaveLocalMetaData(const LocalMetaData & localMetaData)830 int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData)
831 {
832 auto [errCode, value] = SerializeLocalMetaData(localMetaData);
833 if (errCode != E_OK) {
834 LOGE("[Metadata] Serialize local meta data failed %d", errCode);
835 return errCode;
836 }
837 std::string k(LOCAL_META_DATA_KEY);
838 Key key(k.begin(), k.end());
839 return SetMetadataToDb(key, value);
840 }
841
GetLocalMetaData()842 std::pair<int, LocalMetaData> Metadata::GetLocalMetaData()
843 {
844 std::string k(LOCAL_META_DATA_KEY);
845 Key key(k.begin(), k.end());
846 Value value;
847 int errCode = GetMetadataFromDb(key, value);
848 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
849 return {errCode, {}};
850 }
851 return DeSerializeLocalMetaData(value);
852 }
853
SerializeLocalMetaData(const LocalMetaData & localMetaData)854 std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData)
855 {
856 std::pair<int, Value> res = {E_OK, {}};
857 auto &[errCode, value] = res;
858 value.resize(CalculateLocalMetaDataLength());
859 Parcel parcel(value.data(), value.size());
860 (void)parcel.WriteUInt32(localMetaData.version);
861 parcel.EightByteAlign();
862 (void)parcel.WriteUInt64(localMetaData.localSchemaVersion);
863 if (parcel.IsError()) {
864 LOGE("[Metadata] Serialize localMetaData failed");
865 errCode = -E_SERIALIZE_ERROR;
866 value.clear();
867 return res;
868 }
869 return res;
870 }
871
DeSerializeLocalMetaData(const Value & value)872 std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value)
873 {
874 std::pair<int, LocalMetaData> res;
875 auto &[errCode, meta] = res;
876 if (value.empty()) {
877 errCode = E_OK;
878 return res;
879 }
880 Parcel parcel(const_cast<uint8_t *>(value.data()), value.size());
881 parcel.ReadUInt32(meta.version);
882 if (meta.version >= LOCAL_META_DATA_VERSION_V2) {
883 parcel.EightByteAlign();
884 }
885 parcel.ReadUInt64(meta.localSchemaVersion);
886 if (parcel.IsError()) {
887 LOGE("[Metadata] DeSerialize localMetaData failed");
888 errCode = -E_SERIALIZE_ERROR;
889 return res;
890 }
891 return res;
892 }
893
CalculateLocalMetaDataLength()894 uint64_t Metadata::CalculateLocalMetaDataLength()
895 {
896 uint64_t length = Parcel::GetUInt32Len(); // version
897 length = Parcel::GetEightByteAlign(length);
898 length += Parcel::GetUInt64Len(); // local schema version
899 return length;
900 }
901
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)902 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
903 : metadataPtr_(std::move(metadata))
904 {
905 if (metadataPtr_ != nullptr) {
906 metadataPtr_->LockWaterMark();
907 }
908 }
909
~MetaWaterMarkAutoLock()910 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
911 {
912 if (metadataPtr_ != nullptr) {
913 metadataPtr_->UnlockWaterMark();
914 }
915 }
916
InitLocalMetaData()917 int Metadata::InitLocalMetaData()
918 {
919 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
920 auto [errCode, localMetaData] = GetLocalMetaData();
921 if (errCode != E_OK) {
922 return errCode;
923 }
924 if (localMetaData.localSchemaVersion != 0) {
925 return E_OK;
926 }
927 uint64_t curTime = 0u;
928 errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
929 if (errCode != E_OK) {
930 LOGW("[Metadata] get system time failed when init schema version!");
931 localMetaData.localSchemaVersion += 1;
932 } else {
933 localMetaData.localSchemaVersion = curTime;
934 }
935 errCode = SaveLocalMetaData(localMetaData);
936 if (errCode != E_OK) {
937 LOGE("[Metadata] init local schema version failed:%d", errCode);
938 }
939 return errCode;
940 }
941
GetMetaDataValueFromDB(const std::string & deviceId,bool isNeedHash,MetaDataValue & metaDataValue)942 int Metadata::GetMetaDataValueFromDB(const std::string &deviceId, bool isNeedHash, MetaDataValue &metaDataValue)
943 {
944 DeviceID hashDeviceId;
945 Key devKey;
946 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
947 DBCommon::StringToVector(hashDeviceId, devKey);
948
949 return GetMetaDataValueFromDB(devKey, metaDataValue);
950 }
951
GetMetaDataValueFromDB(const Key & key,MetaDataValue & metaDataValue)952 int Metadata::GetMetaDataValueFromDB(const Key &key, MetaDataValue &metaDataValue)
953 {
954 std::vector<uint8_t> value;
955 int errCode = GetMetadataFromDb(key, value);
956 if (errCode != E_OK) {
957 LOGE("[Metadata] Get metadata from db failed %d", errCode);
958 return errCode;
959 }
960 return DeSerializeMetaData(value, metaDataValue);
961 }
962
GetRemoteSoftwareVersion(const std::string & deviceId)963 uint64_t Metadata::GetRemoteSoftwareVersion(const std::string &deviceId)
964 {
965 MetaDataValue metadata;
966 std::lock_guard<std::mutex> lockGuard(metadataLock_);
967 GetMetaDataValue(deviceId, metadata, true);
968 return metadata.remoteSoftwareVersion;
969 }
970
SetRemoteSoftwareVersion(const std::string & deviceId,uint64_t version)971 int Metadata::SetRemoteSoftwareVersion(const std::string &deviceId, uint64_t version)
972 {
973 MetaDataValue metadata;
974 std::lock_guard<std::mutex> lockGuard(metadataLock_);
975 GetMetaDataValue(deviceId, metadata, true);
976 metadata.remoteSoftwareVersion = version;
977 LOGI("[Metadata] Set %.3s version %" PRId64, deviceId.c_str(), version);
978 return SaveMetaDataValue(deviceId, metadata);
979 }
980 } // namespace DistributedDB