• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "meta_data.h"
17 
18 #include <openssl/rand.h>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "platform_specific.h"
26 #include "securec.h"
27 #include "sync_types.h"
28 #include "time_helper.h"
29 #include "version.h"
30 
31 namespace DistributedDB {
32 namespace {
33     // store local timeoffset;this is a special key;
34     constexpr const char *CLIENT_ID_PREFIX_KEY = "clientId";
35     constexpr const char *LOCAL_META_DATA_KEY = "localMetaData";
36 }
37 
Metadata()38 Metadata::Metadata()
39     : localTimeOffset_(0),
40       naturalStoragePtr_(nullptr),
41       lastLocalTime_(0)
42 {}
43 
~Metadata()44 Metadata::~Metadata()
45 {
46     naturalStoragePtr_ = nullptr;
47 }
48 
Initialize(ISyncInterface * storage)49 int Metadata::Initialize(ISyncInterface* storage)
50 {
51     naturalStoragePtr_ = storage;
52     std::vector<uint8_t> key;
53     std::vector<uint8_t> timeOffset;
54     DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key);
55 
56     int errCode = GetMetadataFromDb(key, timeOffset);
57     if (errCode == -E_NOT_FOUND) {
58         int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
59         if (err != E_OK) {
60             LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
61             return err;
62         }
63     } else if (errCode == E_OK) {
64         localTimeOffset_ = StringToLong(timeOffset);
65     } else {
66         LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
67         return errCode;
68     }
69     (void)querySyncWaterMarkHelper_.Initialize(storage);
70     return LoadAllMetadata();
71 }
72 
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)73 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
74 {
75     MetaDataValue metadata;
76     std::lock_guard<std::mutex> lockGuard(metadataLock_);
77     GetMetaDataValue(deviceId, metadata, true);
78     metadata.timeOffset = inValue;
79     metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
80     LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
81     return SaveMetaDataValue(deviceId, metadata);
82 }
83 
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)84 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
85 {
86     MetaDataValue metadata;
87     std::lock_guard<std::mutex> lockGuard(metadataLock_);
88     GetMetaDataValue(deviceId, metadata, true);
89     outValue = metadata.timeOffset;
90 }
91 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)92 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
93 {
94     MetaDataValue metadata;
95     std::lock_guard<std::mutex> lockGuard(metadataLock_);
96     GetMetaDataValue(deviceId, metadata, true);
97     outValue = metadata.localWaterMark;
98 }
99 
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)100 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
101 {
102     MetaDataValue metadata;
103     std::lock_guard<std::mutex> lockGuard(metadataLock_);
104     GetMetaDataValue(deviceId, metadata, true);
105     metadata.localWaterMark = inValue;
106     LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
107     return SaveMetaDataValue(deviceId, metadata);
108 }
109 
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)110 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash)
111 {
112     MetaDataValue metadata;
113     std::lock_guard<std::mutex> lockGuard(metadataLock_);
114     GetMetaDataValue(deviceId, metadata, isNeedHash);
115     outValue = metadata.peerWaterMark;
116 }
117 
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)118 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
119 {
120     MetaDataValue metadata;
121     std::lock_guard<std::mutex> lockGuard(metadataLock_);
122     GetMetaDataValue(deviceId, metadata, isNeedHash);
123     metadata.peerWaterMark = inValue;
124     LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
125     return SaveMetaDataValue(deviceId, metadata, isNeedHash);
126 }
127 
SaveLocalTimeOffset(TimeOffset timeOffset,bool saveIntoDb)128 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset, bool saveIntoDb)
129 {
130     std::string timeOffsetString = std::to_string(timeOffset);
131     std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
132     std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY);
133     std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end());
134 
135     std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
136     localTimeOffset_ = timeOffset;
137     LOGI("Metadata::SaveLocalTimeOffset offset = %" PRId64 " save db %d", timeOffset, static_cast<int>(saveIntoDb));
138     if (!saveIntoDb) {
139         return E_OK;
140     }
141     int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
142     if (errCode != E_OK) {
143         LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
144     }
145     return errCode;
146 }
147 
GetLocalTimeOffset() const148 TimeOffset Metadata::GetLocalTimeOffset() const
149 {
150     TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
151     return localTimeOffset;
152 }
153 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)154 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
155 {
156     return EraseDeviceWaterMark(deviceId, isNeedHash, "");
157 }
158 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)159 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
160 {
161     // reload meta data again
162     (void)LoadAllMetadata();
163     std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
164     // try to erase all the waterMark
165     // erase deleteSync recv waterMark
166     WaterMark waterMark = 0;
167     int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
168     if (errCodeDeleteSync != E_OK) {
169         LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
170         return errCodeDeleteSync;
171     }
172     // erase querySync recv waterMark
173     int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
174     if (errCodeQuerySync != E_OK) {
175         LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
176         return errCodeQuerySync;
177     }
178     // peerWaterMark must be erased at last
179     int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
180     if (errCode != E_OK) {
181         LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
182         return errCode;
183     }
184     return E_OK;
185 }
186 
SetLastLocalTime(Timestamp lastLocalTime)187 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
188 {
189     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
190     if (lastLocalTime > lastLocalTime_) {
191         lastLocalTime_ = lastLocalTime;
192     }
193 }
194 
GetLastLocalTime() const195 Timestamp Metadata::GetLastLocalTime() const
196 {
197     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
198     return lastLocalTime_;
199 }
200 
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)201 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
202 {
203     std::vector<uint8_t> value;
204     int errCode = SerializeMetaData(inValue, value);
205     if (errCode != E_OK) {
206         return errCode;
207     }
208 
209     DeviceID hashDeviceId;
210     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
211     std::vector<uint8_t> key;
212     DBCommon::StringToVector(hashDeviceId, key);
213     errCode = SetMetadataToDb(key, value);
214     if (errCode != E_OK) {
215         LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
216     }
217     return errCode;
218 }
219 
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)220 int Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
221 {
222     int errCode = GetMetaDataValueFromDB(deviceId, isNeedHash, outValue);
223     if (errCode == -E_NOT_FOUND && RuntimeContext::GetInstance()->IsTimeChanged()) {
224         outValue = {};
225         outValue.syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
226     }
227     return errCode;
228 }
229 
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)230 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
231 {
232     outValue.resize(sizeof(MetaDataValue));
233     errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
234     if (err != EOK) {
235         return -E_SECUREC_ERROR;
236     }
237     return E_OK;
238 }
239 
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)240 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
241 {
242     if (inValue.empty()) {
243         return -E_INVALID_ARGS;
244     }
245     errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
246     if (err != EOK) {
247         return -E_SECUREC_ERROR;
248     }
249     return E_OK;
250 }
251 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const252 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
253 {
254     if (naturalStoragePtr_ == nullptr) {
255         return -E_INVALID_DB;
256     }
257     return naturalStoragePtr_->GetMetaData(key, outValue);
258 }
259 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)260 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
261 {
262     if (naturalStoragePtr_ == nullptr) {
263         return -E_INVALID_DB;
264     }
265     return naturalStoragePtr_->PutMetaData(key, inValue, false);
266 }
267 
StringToLong(const std::vector<uint8_t> & value) const268 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
269 {
270     std::string valueString(value.begin(), value.end());
271     int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE);
272     if (errno == ERANGE && (longData == LLONG_MAX || longData == LLONG_MIN)) {
273         LOGW("[Metadata][StringToLong] convert string '%s' to number failed, longData = %" PRId64,
274             valueString.c_str(), longData);
275     }
276     LOGD("Metadata::StringToLong longData = %" PRId64, longData);
277     return longData;
278 }
279 
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)280 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
281 {
282     if (naturalStoragePtr_ == nullptr) {
283         return -E_INVALID_DB;
284     }
285     return naturalStoragePtr_->GetAllMetaKeys(keys);
286 }
287 
288 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)289 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
290 {
291     if (inKey.size() < expectPrefix.size()) {
292         return false;
293     }
294     std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
295     if (prefixInKey != expectPrefix) {
296         return false;
297     }
298     return true;
299 }
300 }
301 
LoadAllMetadata()302 int Metadata::LoadAllMetadata()
303 {
304     std::vector<std::vector<uint8_t>> metaDataKeys;
305     int errCode = GetAllMetadataKey(metaDataKeys);
306     if (errCode != E_OK) {
307         LOGE("[Metadata] get all metadata key failed err=%d", errCode);
308         return errCode;
309     }
310 
311     std::vector<std::vector<uint8_t>> querySyncIds;
312     for (const auto &deviceId : metaDataKeys) {
313         if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
314             querySyncIds.push_back(deviceId);
315         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
316             errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
317             if (errCode != E_OK) {
318                 return errCode;
319             }
320         }
321     }
322     errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
323     if (errCode != E_OK) {
324         return errCode;
325     }
326     return InitLocalMetaData();
327 }
328 
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)329 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
330 {
331     if (!isNeedHash) {
332         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
333         return;
334     }
335     if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
336         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
337         deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
338     } else {
339         hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
340     }
341 }
342 
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)343 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
344     const std::string &deviceId, WaterMark &waterMark)
345 {
346     QueryWaterMark queryWaterMark;
347     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
348     if (errCode != E_OK) {
349         return errCode;
350     }
351     WaterMark peerWaterMark;
352     GetPeerWaterMark(deviceId, peerWaterMark);
353     waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
354     return E_OK;
355 }
356 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)357 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
358     const std::string &deviceId, const WaterMark &waterMark)
359 {
360     return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
361 }
362 
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)363 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
364     const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
365 {
366     QueryWaterMark queryWaterMark;
367     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
368     if (errCode != E_OK) {
369         return errCode;
370     }
371     if (isAutoLift) {
372         WaterMark localWaterMark;
373         GetLocalWaterMark(deviceId, localWaterMark);
374         waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
375     } else {
376         waterMark = queryWaterMark.sendWaterMark;
377     }
378     return E_OK;
379 }
380 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)381 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
382     const std::string &deviceId, const WaterMark &waterMark)
383 {
384     return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
385 }
386 
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)387 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp)
388 {
389     QueryWaterMark queryWaterMark;
390     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
391     if (errCode != E_OK) {
392         return errCode;
393     }
394     timestamp = queryWaterMark.lastQueryTime;
395     return E_OK;
396 }
397 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)398 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
399     const Timestamp &timestamp)
400 {
401     return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
402 }
403 
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)404 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
405 {
406     DeleteWaterMark deleteWaterMark;
407     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
408     if (errCode != E_OK) {
409         return errCode;
410     }
411     if (isAutoLift) {
412         WaterMark localWaterMark;
413         GetLocalWaterMark(deviceId, localWaterMark);
414         waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
415     } else {
416         waterMark = deleteWaterMark.sendWaterMark;
417     }
418     return E_OK;
419 }
420 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)421 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
422 {
423     return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
424 }
425 
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)426 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
427 {
428     DeleteWaterMark deleteWaterMark;
429     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
430     if (errCode != E_OK) {
431         return errCode;
432     }
433     WaterMark peerWaterMark;
434     GetPeerWaterMark(deviceId, peerWaterMark);
435     waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
436     return E_OK;
437 }
438 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)439 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
440 {
441     return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
442 }
443 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)444 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
445 {
446     return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
447 }
448 
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)449 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
450 {
451     std::lock_guard<std::mutex> lockGuard(metadataLock_);
452     MetaDataValue metadata;
453     int errCode = GetMetaDataValue(deviceId, metadata, true);
454     if (errCode == E_OK) {
455         outValue = metadata.dbCreateTime;
456         return;
457     }
458     outValue = 0;
459     LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
460 }
461 
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)462 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
463 {
464     std::lock_guard<std::mutex> lockGuard(metadataLock_);
465     MetaDataValue metadata;
466     int errCode = GetMetaDataValue(deviceId, metadata, isNeedHash);
467     if (errCode == E_OK) {
468         if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
469             metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
470             LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
471                 STR_MASK(deviceId), inValue);
472         }
473         if (metadata.dbCreateTime == 0) {
474             LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
475         }
476     } else if (errCode != -E_NOT_FOUND) {
477         return errCode;
478     }
479 
480     metadata.dbCreateTime = inValue;
481     return SaveMetaDataValue(deviceId, metadata, isNeedHash);
482 }
483 
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)484 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
485 {
486     std::lock_guard<std::mutex> lockGuard(metadataLock_);
487     MetaDataValue metadata;
488     int errCode = GetMetaDataValue(deviceId, metadata, true);
489     if (errCode == E_OK) {
490         metadata.clearDeviceDataMark = 0;
491         return SaveMetaDataValue(deviceId, metadata, true);
492     }
493     return errCode;
494 }
495 
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)496 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
497 {
498     std::lock_guard<std::mutex> lockGuard(metadataLock_);
499     MetaDataValue metadata;
500     int errCode = GetMetaDataValue(deviceId, metadata, true);
501     if (errCode == E_OK) {
502         outValue = metadata.clearDeviceDataMark;
503         return;
504     }
505     outValue = 0;
506 }
507 
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const508 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
509 {
510     std::vector<uint8_t> key;
511     std::vector<uint8_t> value;
512     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
513     DBCommon::StringToVector(hashqueryId, key);
514     int errCode = GetMetadataFromDb(key, value);
515     std::lock_guard<std::mutex> lockGuard(metadataLock_);
516     if (errCode == -E_NOT_FOUND) {
517         auto iter = queryIdMap_.find(deviceId);
518         if (iter != queryIdMap_.end()) {
519             if (iter->second.find(hashqueryId) == iter->second.end()) {
520                 iter->second.insert(hashqueryId);
521                 return INT64_MAX;
522             }
523             return 0;
524         } else {
525             queryIdMap_[deviceId] = { hashqueryId };
526             return INT64_MAX;
527         }
528     }
529     auto iter = queryIdMap_.find(deviceId);
530     // while value is found in db, it can be found in db later when db is not closed
531     // so no need to record the hashqueryId in map
532     if (errCode == E_OK && iter != queryIdMap_.end()) {
533         iter->second.erase(hashqueryId);
534     }
535     return static_cast<uint64_t>(StringToLong(value));
536 }
537 
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)538 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
539 {
540     std::lock_guard<std::mutex> lockGuard(metadataLock_);
541     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
542     auto iter = queryIdMap_.find(deviceId);
543     if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
544         iter->second.erase(hashqueryId);
545     }
546 }
547 
SaveClientId(const std::string & deviceId,const std::string & clientId)548 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
549 {
550     {
551         // already save in cache
552         std::lock_guard<std::mutex> autoLock(clientIdLock_);
553         if (clientIdCache_[deviceId] == clientId) {
554             return E_OK;
555         }
556     }
557     std::string keyStr;
558     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
559     std::string valueStr = DBCommon::TransferHashString(deviceId);
560     Key key;
561     DBCommon::StringToVector(keyStr, key);
562     Value value;
563     DBCommon::StringToVector(valueStr, value);
564     int errCode = SetMetadataToDb(key, value);
565     if (errCode != E_OK) {
566         return errCode;
567     }
568     std::lock_guard<std::mutex> autoLock(clientIdLock_);
569     clientIdCache_[deviceId] = clientId;
570     return E_OK;
571 }
572 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const573 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
574 {
575     // don't use cache here avoid invalid cache
576     std::string keyStr;
577     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
578     Key key;
579     DBCommon::StringToVector(keyStr, key);
580     Value value;
581     int errCode = GetMetadataFromDb(key, value);
582     if (errCode == -E_NOT_FOUND) {
583         LOGD("[Metadata] not found clientId by %.3s", clientId.c_str());
584         return -E_NOT_SUPPORT;
585     }
586     if (errCode != E_OK) {
587         LOGE("[Metadata] reload clientId failed %d", errCode);
588         return errCode;
589     }
590     DBCommon::VectorToString(value, hashDevId);
591     return E_OK;
592 }
593 
LockWaterMark() const594 void Metadata::LockWaterMark() const
595 {
596     waterMarkMutex_.lock();
597 }
598 
UnlockWaterMark() const599 void Metadata::UnlockWaterMark() const
600 {
601     waterMarkMutex_.unlock();
602 }
603 
GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)604 int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info)
605 {
606     // read from db avoid watermark update in diff process
607     std::lock_guard<std::mutex> lockGuard(metadataLock_);
608     MetaDataValue metadata;
609     int errCode = GetMetaDataValue(dev, metadata, isNeedHash);
610     if (errCode == -E_NOT_FOUND) {
611         LOGD("[Metadata] not found meta value");
612         return E_OK;
613     }
614     if (errCode != E_OK) {
615         LOGE("[Metadata] reload meta value failed %d", errCode);
616         return errCode;
617     }
618 
619     info.sendMark = metadata.localWaterMark;
620     info.receiveMark = metadata.peerWaterMark;
621     return E_OK;
622 }
623 
ClearAllAbilitySyncFinishMark()624 int Metadata::ClearAllAbilitySyncFinishMark()
625 {
626     return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) |
627         static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION));
628 }
629 
ClearAllTimeSyncFinishMark()630 int Metadata::ClearAllTimeSyncFinishMark()
631 {
632     return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) |
633         static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) |
634         static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK));
635 }
636 
ClearAllMetaDataValue(uint32_t innerClearAction)637 int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction)
638 {
639     std::vector<std::vector<uint8_t>> metaDataKeys;
640     int errCode = GetAllMetadataKey(metaDataKeys);
641     if (errCode != E_OK) {
642         LOGE("[Metadata][ClearAllMetaDataValue] get all metadata key failed err=%d", errCode);
643         return errCode;
644     }
645 
646     std::lock_guard<std::mutex> lockGuard(metadataLock_);
647     for (const auto &deviceId : metaDataKeys) {
648         if (!IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
649             continue;
650         }
651         MetaDataValue metaData;
652         int innerErrCode = GetMetaDataValueFromDB(deviceId, metaData);
653         if (innerErrCode != E_OK) {
654             LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " get meta data failed %d", innerClearAction,
655                 innerErrCode);
656             errCode = errCode == E_OK ? innerErrCode : errCode;
657             continue;
658         }
659         ClearMetaDataValue(innerClearAction, metaData);
660         std::vector<uint8_t> value;
661         innerErrCode = SerializeMetaData(metaData, value);
662         // clear sync mark without transaction here
663         // just ignore error metadata value
664         if (innerErrCode != E_OK) {
665             LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction,
666                 innerErrCode);
667             errCode = errCode == E_OK ? innerErrCode : errCode;
668             continue;
669         }
670 
671         innerErrCode = SetMetadataToDb(deviceId, value);
672         if (innerErrCode != E_OK) {
673             LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction,
674                 innerErrCode);
675             errCode = errCode == E_OK ? innerErrCode : errCode;
676         }
677     }
678     if (errCode == E_OK) {
679         LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction);
680     }
681     return errCode;
682 }
683 
ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)684 void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue)
685 {
686     auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK);
687     if ((innerClearAction & mark) == mark) {
688         metaDataValue.syncMark =
689             DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC));
690     }
691     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK);
692     if ((innerClearAction & mark) == mark) {
693         metaDataValue.syncMark =
694             DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC));
695     }
696     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION);
697     if ((innerClearAction & mark) == mark) {
698         metaDataValue.remoteSchemaVersion = 0;
699     }
700     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET);
701     if ((innerClearAction & mark) == mark) {
702         metaDataValue.systemTimeOffset = 0;
703     }
704     mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK);
705     if ((innerClearAction & mark) == mark) {
706         metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
707     }
708 }
709 
SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)710 int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish)
711 {
712     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish);
713 }
714 
IsAbilitySyncFinish(const std::string & deviceId)715 bool Metadata::IsAbilitySyncFinish(const std::string &deviceId)
716 {
717     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC);
718 }
719 
SetTimeSyncFinishMark(const std::string & deviceId,bool finish)720 int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish)
721 {
722     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish);
723 }
724 
SetTimeChangeMark(const std::string & deviceId,bool change)725 int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change)
726 {
727     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change);
728 }
729 
IsTimeSyncFinish(const std::string & deviceId)730 bool Metadata::IsTimeSyncFinish(const std::string &deviceId)
731 {
732     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC);
733 }
734 
IsTimeChange(const std::string & deviceId)735 bool Metadata::IsTimeChange(const std::string &deviceId)
736 {
737     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE);
738 }
739 
SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)740 int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish)
741 {
742     MetaDataValue metadata;
743     std::lock_guard<std::mutex> lockGuard(metadataLock_);
744     GetMetaDataValue(deviceId, metadata, true);
745     auto mark = static_cast<uint64_t>(syncMark);
746     if (finish) {
747         metadata.syncMark |= mark;
748     } else {
749         metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark);
750     }
751     LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish),
752         metadata.syncMark);
753     return SaveMetaDataValue(deviceId, metadata);
754 }
755 
IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)756 bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark)
757 {
758     MetaDataValue metadata;
759     std::lock_guard<std::mutex> lockGuard(metadataLock_);
760     GetMetaDataValue(deviceId, metadata, true);
761     auto mark = static_cast<uint64_t>(syncMark);
762     return (metadata.syncMark & mark) == mark;
763 }
764 
SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)765 int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion)
766 {
767     MetaDataValue metadata;
768     std::lock_guard<std::mutex> lockGuard(metadataLock_);
769     GetMetaDataValue(deviceId, metadata, true);
770     metadata.remoteSchemaVersion = schemaVersion;
771     LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion);
772     int errCode = SaveMetaDataValue(deviceId, metadata);
773     if (errCode != E_OK) {
774         LOGW("[Metadata] Set remote schema version failed");
775     }
776     return errCode;
777 }
778 
GetRemoteSchemaVersion(const std::string & deviceId)779 uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId)
780 {
781     MetaDataValue metadata;
782     std::lock_guard<std::mutex> lockGuard(metadataLock_);
783     GetMetaDataValue(deviceId, metadata, true);
784     LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion);
785     return metadata.remoteSchemaVersion;
786 }
787 
SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)788 int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset)
789 {
790     MetaDataValue metadata;
791     std::lock_guard<std::mutex> lockGuard(metadataLock_);
792     GetMetaDataValue(deviceId, metadata, true);
793     metadata.systemTimeOffset = systemTimeOffset;
794     LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset);
795     return SaveMetaDataValue(deviceId, metadata);
796 }
797 
GetSystemTimeOffset(const std::string & deviceId)798 int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId)
799 {
800     MetaDataValue metadata;
801     std::lock_guard<std::mutex> lockGuard(metadataLock_);
802     GetMetaDataValue(deviceId, metadata, true);
803     LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset);
804     return metadata.systemTimeOffset;
805 }
806 
GetLocalSchemaVersion()807 std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion()
808 {
809     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
810     auto [errCode, localMetaData] = GetLocalMetaData();
811     if (errCode != E_OK) {
812         return {errCode, 0};
813     }
814     LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion);
815     return {errCode, localMetaData.localSchemaVersion};
816 }
817 
SetLocalSchemaVersion(uint64_t schemaVersion)818 int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion)
819 {
820     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
821     auto [errCode, localMetaData] = GetLocalMetaData();
822     if (errCode != E_OK) {
823         return errCode;
824     }
825     localMetaData.localSchemaVersion = schemaVersion;
826     LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion);
827     return SaveLocalMetaData(localMetaData);
828 }
829 
SaveLocalMetaData(const LocalMetaData & localMetaData)830 int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData)
831 {
832     auto [errCode, value] = SerializeLocalMetaData(localMetaData);
833     if (errCode != E_OK) {
834         LOGE("[Metadata] Serialize local meta data failed %d", errCode);
835         return errCode;
836     }
837     std::string k(LOCAL_META_DATA_KEY);
838     Key key(k.begin(), k.end());
839     return SetMetadataToDb(key, value);
840 }
841 
GetLocalMetaData()842 std::pair<int, LocalMetaData> Metadata::GetLocalMetaData()
843 {
844     std::string k(LOCAL_META_DATA_KEY);
845     Key key(k.begin(), k.end());
846     Value value;
847     int errCode = GetMetadataFromDb(key, value);
848     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
849         return {errCode, {}};
850     }
851     return DeSerializeLocalMetaData(value);
852 }
853 
SerializeLocalMetaData(const LocalMetaData & localMetaData)854 std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData)
855 {
856     std::pair<int, Value> res = {E_OK, {}};
857     auto &[errCode, value] = res;
858     value.resize(CalculateLocalMetaDataLength());
859     Parcel parcel(value.data(), value.size());
860     (void)parcel.WriteUInt32(localMetaData.version);
861     parcel.EightByteAlign();
862     (void)parcel.WriteUInt64(localMetaData.localSchemaVersion);
863     if (parcel.IsError()) {
864         LOGE("[Metadata] Serialize localMetaData failed");
865         errCode = -E_SERIALIZE_ERROR;
866         value.clear();
867         return res;
868     }
869     return res;
870 }
871 
DeSerializeLocalMetaData(const Value & value)872 std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value)
873 {
874     std::pair<int, LocalMetaData> res;
875     auto &[errCode, meta] = res;
876     if (value.empty()) {
877         errCode = E_OK;
878         return res;
879     }
880     Parcel parcel(const_cast<uint8_t *>(value.data()), value.size());
881     parcel.ReadUInt32(meta.version);
882     if (meta.version >= LOCAL_META_DATA_VERSION_V2) {
883         parcel.EightByteAlign();
884     }
885     parcel.ReadUInt64(meta.localSchemaVersion);
886     if (parcel.IsError()) {
887         LOGE("[Metadata] DeSerialize localMetaData failed");
888         errCode = -E_SERIALIZE_ERROR;
889         return res;
890     }
891     return res;
892 }
893 
CalculateLocalMetaDataLength()894 uint64_t Metadata::CalculateLocalMetaDataLength()
895 {
896     uint64_t length = Parcel::GetUInt32Len(); // version
897     length = Parcel::GetEightByteAlign(length);
898     length += Parcel::GetUInt64Len(); // local schema version
899     return length;
900 }
901 
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)902 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
903     : metadataPtr_(std::move(metadata))
904 {
905     if (metadataPtr_ != nullptr) {
906         metadataPtr_->LockWaterMark();
907     }
908 }
909 
~MetaWaterMarkAutoLock()910 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
911 {
912     if (metadataPtr_ != nullptr) {
913         metadataPtr_->UnlockWaterMark();
914     }
915 }
916 
InitLocalMetaData()917 int Metadata::InitLocalMetaData()
918 {
919     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
920     auto [errCode, localMetaData] = GetLocalMetaData();
921     if (errCode != E_OK) {
922         return errCode;
923     }
924     if (localMetaData.localSchemaVersion != 0) {
925         return E_OK;
926     }
927     uint64_t curTime = 0u;
928     errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
929     if (errCode != E_OK) {
930         LOGW("[Metadata] get system time failed when init schema version!");
931         localMetaData.localSchemaVersion += 1;
932     } else {
933         localMetaData.localSchemaVersion = curTime;
934     }
935     errCode = SaveLocalMetaData(localMetaData);
936     if (errCode != E_OK) {
937         LOGE("[Metadata] init local schema version failed:%d", errCode);
938     }
939     return errCode;
940 }
941 
GetMetaDataValueFromDB(const std::string & deviceId,bool isNeedHash,MetaDataValue & metaDataValue)942 int Metadata::GetMetaDataValueFromDB(const std::string &deviceId, bool isNeedHash, MetaDataValue &metaDataValue)
943 {
944     DeviceID hashDeviceId;
945     Key devKey;
946     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
947     DBCommon::StringToVector(hashDeviceId, devKey);
948 
949     return GetMetaDataValueFromDB(devKey, metaDataValue);
950 }
951 
GetMetaDataValueFromDB(const Key & key,MetaDataValue & metaDataValue)952 int Metadata::GetMetaDataValueFromDB(const Key &key, MetaDataValue &metaDataValue)
953 {
954     std::vector<uint8_t> value;
955     int errCode = GetMetadataFromDb(key, value);
956     if (errCode != E_OK) {
957         LOGE("[Metadata] Get metadata from db failed %d", errCode);
958         return errCode;
959     }
960     return DeSerializeMetaData(value, metaDataValue);
961 }
962 
GetRemoteSoftwareVersion(const std::string & deviceId)963 uint64_t Metadata::GetRemoteSoftwareVersion(const std::string &deviceId)
964 {
965     MetaDataValue metadata;
966     std::lock_guard<std::mutex> lockGuard(metadataLock_);
967     GetMetaDataValue(deviceId, metadata, true);
968     return metadata.remoteSoftwareVersion;
969 }
970 
SetRemoteSoftwareVersion(const std::string & deviceId,uint64_t version)971 int Metadata::SetRemoteSoftwareVersion(const std::string &deviceId, uint64_t version)
972 {
973     MetaDataValue metadata;
974     std::lock_guard<std::mutex> lockGuard(metadataLock_);
975     GetMetaDataValue(deviceId, metadata, true);
976     metadata.remoteSoftwareVersion = version;
977     LOGI("[Metadata] Set %.3s version %" PRId64, deviceId.c_str(), version);
978     return SaveMetaDataValue(deviceId, metadata);
979 }
980 }  // namespace DistributedDB