• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "query_sync_water_mark_helper.h"
17 
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25 
26 namespace DistributedDB {
27 namespace {
28     const uint32_t MAX_STORE_ITEMS = 100000;
29     // WaterMark Version
30     constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31     constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33 
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35     : storage_(nullptr)
36 {}
37 
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40     storage_ = nullptr;
41     deviceIdToHashQuerySyncIdMap_.clear();
42     deviceIdToHashDeleteSyncIdMap_.clear();
43 }
44 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)45 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
46 {
47     if (storage_ == nullptr) {
48         return -E_INVALID_DB;
49     }
50     return storage_->GetMetaData(key, outValue);
51 }
52 
GetMetadataByPrefixKeyFromDb(const Key & prefixKey,std::map<Key,Value> & data)53 int QuerySyncWaterMarkHelper::GetMetadataByPrefixKeyFromDb(const Key &prefixKey, std::map<Key, Value> &data)
54 {
55     if (storage_ == nullptr) {
56         return -E_INVALID_DB;
57     }
58     return storage_->GetMetaDataByPrefixKey(prefixKey, data);
59 }
60 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)61 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
62 {
63     if (storage_ == nullptr) {
64         return -E_INVALID_DB;
65     }
66     return storage_->PutMetaData(key, inValue, false);
67 }
68 
DeleteMetaDataFromDB(const std::vector<Key> & keys) const69 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
70 {
71     if (storage_ == nullptr) {
72         return -E_INVALID_DB;
73     }
74     return storage_->DeleteMetaData(keys);
75 }
76 
Initialize(ISyncInterface * storage)77 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
78 {
79     storage_ = storage;
80     return E_OK;
81 }
82 
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)83 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
84 {
85     std::vector<uint8_t> value;
86     int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
87     if (errCode != E_OK) {
88         return errCode;
89     }
90     DeleteWaterMark deleteWaterMark;
91     std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
92     errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
93     if (errCode != E_OK) {
94         return errCode;
95     }
96     return errCode;
97 }
98 
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)99 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
100     QueryWaterMark &queryWaterMark)
101 {
102     // second get from db
103     int errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
104     if (errCode == -E_NOT_FOUND) {
105         // third generate one and save to db
106         errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
107     }
108     // something error return
109     if (errCode != E_OK) {
110         LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
111     }
112     return errCode;
113 }
114 
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,QueryWaterMark & queryWaterMark)115 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
116     const std::string &userId, QueryWaterMark &queryWaterMark)
117 {
118     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
119     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
120     return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
121 }
122 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const WaterMark & waterMark)123 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
124     const std::string &deviceId, const std::string &userId, const WaterMark &waterMark)
125 {
126     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
127     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
128     return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
129 }
130 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const Timestamp & timestamp)131 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
132     const std::string &deviceId, const std::string &userId, const Timestamp &timestamp)
133 {
134     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
135     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
136     QueryWaterMark queryWaterMark;
137     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
138     if (errCode != E_OK) {
139         return errCode;
140     }
141     queryWaterMark.lastQueryTime = timestamp;
142     return UpdateCacheAndSave(cacheKey, queryWaterMark);
143 }
144 
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)145 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
146     const WaterMark &waterMark)
147 {
148     QueryWaterMark queryWaterMark;
149     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
150     if (errCode != E_OK) {
151         return errCode;
152     }
153     queryWaterMark.recvWaterMark = waterMark;
154     return UpdateCacheAndSave(cacheKey, queryWaterMark);
155 }
156 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const WaterMark & waterMark)157 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
158     const std::string &deviceId, const std::string &userId, const WaterMark &waterMark)
159 {
160     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
161     QueryWaterMark queryWaterMark;
162     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
163     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
164     if (errCode != E_OK) {
165         return errCode;
166     }
167     queryWaterMark.sendWaterMark = waterMark;
168     return UpdateCacheAndSave(cacheKey, queryWaterMark);
169 }
170 
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)171 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
172     QueryWaterMark &queryWaterMark)
173 {
174     // update lastUsedTime
175     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
176     if (errCode != E_OK) {
177         return errCode;
178     }
179     // save db
180     return SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
181 }
182 
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)183 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
184 {
185     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
186     if (errCode != E_OK) {
187         return errCode;
188     }
189     queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
190     return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
191 }
192 
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)193 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
194 {
195     // serialize value
196     Value dbValue;
197     int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
198     if (errCode != E_OK) {
199         return errCode;
200     }
201     // serialize key
202     Key dbKey;
203     DBCommon::StringToVector(dbKeyString, dbKey);
204     // save
205     errCode = SetMetadataToDb(dbKey, dbValue);
206     if (errCode != E_OK) {
207         LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
208     }
209     return errCode;
210 }
211 
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)212 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
213 {
214     // serialize key
215     Key dbKey;
216     DBCommon::StringToVector(dbKeyString, dbKey);
217     // search in db
218     Value dbValue;
219     int errCode = GetMetadataFromDb(dbKey, dbValue);
220     if (errCode != E_OK) {
221         return errCode;
222     }
223     return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
224 }
225 
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)226 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
227 {
228     uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
229     outValue.resize(length);
230     Parcel parcel(outValue.data(), outValue.size());
231     parcel.WriteUInt32(queryWaterMark.version);
232     parcel.EightByteAlign();
233     parcel.WriteUInt64(queryWaterMark.sendWaterMark);
234     parcel.WriteUInt64(queryWaterMark.recvWaterMark);
235     parcel.WriteUInt64(queryWaterMark.lastUsedTime);
236     parcel.WriteString(queryWaterMark.sql);
237     parcel.WriteUInt64(queryWaterMark.lastQueryTime);
238     if (parcel.IsError()) {
239         LOGE("[Meta] Parcel error when serialize queryWaterMark");
240         return -E_PARSE_FAIL;
241     }
242     return E_OK;
243 }
244 
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)245 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
246 {
247     Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
248     parcel.ReadUInt32(queryWaterMark.version);
249     parcel.EightByteAlign();
250     parcel.ReadUInt64(queryWaterMark.sendWaterMark);
251     parcel.ReadUInt64(queryWaterMark.recvWaterMark);
252     parcel.ReadUInt64(queryWaterMark.lastUsedTime);
253     parcel.ReadString(queryWaterMark.sql);
254     if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
255         parcel.ReadUInt64(queryWaterMark.lastQueryTime);
256     }
257     if (parcel.IsError()) {
258         LOGE("[Meta] Parcel error when deserialize queryWaterMark");
259         return -E_PARSE_FAIL;
260     }
261     return E_OK;
262 }
263 
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)264 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
265 {
266     uint64_t length = Parcel::GetUInt32Len(); // version
267     length = Parcel::GetEightByteAlign(length);
268     length += Parcel::GetUInt64Len(); // sendWaterMark
269     length += Parcel::GetUInt64Len(); // recvWaterMark
270     length += Parcel::GetUInt64Len(); // lastUsedTime
271     length += Parcel::GetStringLen(queryWaterMark.sql);
272     length += Parcel::GetUInt64Len(); // lastQueryTime
273     return length;
274 }
275 
// Returns the metadata key string for a (deviceId, userId, queryId) triple, building and memoizing it in
// deviceIdToHashQuerySyncIdMap_ on first use.
// Takes queryWaterMarkLock_ (a std::mutex), so it must not be called while that lock is already held.
DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &userId,
    const DeviceID &queryId)
{
    std::lock_guard<std::mutex> guard(queryWaterMarkLock_);
    DeviceSyncTarget target(deviceId, userId);
    // operator[] creates the per-target map when it is missing; it is needed on the miss path anyway
    auto &idsForTarget = deviceIdToHashQuerySyncIdMap_[target];
    auto cached = idsForTarget.find(queryId);
    if (cached != idsForTarget.end()) {
        return cached->second;
    }
    // do not modify this: callers in this file use the string as the key of the stored record
    DeviceID builtId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId +
        DBConstant::USERID_PREFIX_KEY + userId;
    idsForTarget[queryId] = builtId;
    return builtId;
}
292 
GetDeleteSyncWaterMark(const std::string & deviceId,const std::string & userId,DeleteWaterMark & deleteWaterMark)293 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, const std::string &userId,
294     DeleteWaterMark &deleteWaterMark)
295 {
296     std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId);
297     // lock prevent different thread visit deleteSyncCache_
298     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
299     return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
300 }
301 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const std::string & userId,const WaterMark & waterMark)302 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const std::string &userId,
303     const WaterMark &waterMark)
304 {
305     std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId);
306     DeleteWaterMark deleteWaterMark;
307     // lock prevent different thread visit deleteSyncCache_
308     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
309     int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
310     if (errCode != E_OK) {
311         return errCode;
312     }
313     deleteWaterMark.sendWaterMark = waterMark;
314     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
315 }
316 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const std::string & userId,const WaterMark & waterMark,bool isNeedHash)317 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const std::string &userId,
318     const WaterMark &waterMark, bool isNeedHash)
319 {
320     std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId, isNeedHash);
321     // lock prevent different thread visit deleteSyncCache_
322     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
323     int errCode = E_OK;
324     std::map<Key, DeleteWaterMark> deleteWaterMarks;
325     if (!userId.empty()) {
326         DeleteWaterMark deleteWaterMark;
327         Key dbKey;
328         DBCommon::StringToVector(hashId, dbKey);
329         errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
330         deleteWaterMark.recvWaterMark = waterMark;
331         deleteWaterMarks[dbKey] = deleteWaterMark;
332     } else {
333         errCode = GetDeleteWatersMarkFromDB(hashId, deleteWaterMarks);
334     }
335     if (errCode != E_OK) {
336         return errCode;
337     }
338     for (auto &deleteWaterMark : deleteWaterMarks) {
339         deleteWaterMark.second.recvWaterMark = waterMark;
340         errCode = UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark.second);
341         if (errCode != E_OK) {
342             return errCode;
343         }
344     }
345     return E_OK;
346 }
347 
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)348 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
349     const DeleteWaterMark &deleteWaterMark)
350 {
351     // save db
352     return SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
353 }
354 
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)355 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
356     DeleteWaterMark &deleteWaterMark)
357 {
358     deleteWaterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
359     int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, deleteWaterMark);
360     if (errCode == -E_NOT_FOUND) {
361         deleteWaterMark.sendWaterMark = 0;
362         deleteWaterMark.recvWaterMark = 0;
363         errCode = E_OK;
364     }
365     if (errCode != E_OK) {
366         LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
367     }
368     return errCode;
369 }
370 
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)371 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
372     DeleteWaterMark &deleteWaterMark)
373 {
374     Key dbKey;
375     DBCommon::StringToVector(hashDeviceId, dbKey);
376     // search in db
377     Value dbValue;
378     int errCode = GetMetadataFromDb(dbKey, dbValue);
379     if (errCode != E_OK) {
380         return errCode;
381     }
382     // serialize value
383     return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
384 }
385 
GetDeleteWatersMarkFromDB(const DeviceID & hashId,std::map<Key,DeleteWaterMark> & deleteWaterMarks)386 int QuerySyncWaterMarkHelper::GetDeleteWatersMarkFromDB(const DeviceID &hashId,
387     std::map<Key, DeleteWaterMark> &deleteWaterMarks)
388 {
389     Key dbKeyPrefix;
390     DBCommon::StringToVector(hashId, dbKeyPrefix);
391     // search in db
392     std::map<Key, Value> dbData;
393     int errCode = GetMetadataByPrefixKeyFromDb(dbKeyPrefix, dbData);
394     if (errCode == -E_NOT_FOUND) {
395         DeleteWaterMark deleteWaterMark;
396         deleteWaterMarks[dbKeyPrefix] = deleteWaterMark;
397         return E_OK;
398     }
399     if (errCode != E_OK) {
400         return errCode;
401     }
402     // serialize value
403     for (const auto &oneDbData : dbData) {
404         DeleteWaterMark deleteWaterMark;
405         errCode = DeSerializeDeleteWaterMark(oneDbData.second, deleteWaterMark);
406         if (errCode != E_OK) {
407             return errCode;
408         }
409         deleteWaterMarks[oneDbData.first] = deleteWaterMark;
410     }
411     return E_OK;
412 }
413 
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)414 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
415     const DeleteWaterMark &deleteWaterMark)
416 {
417     // serialize value
418     Value dbValue;
419     int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
420     if (errCode != E_OK) {
421         return errCode;
422     }
423     Key dbKey;
424     DBCommon::StringToVector(hashDeviceId, dbKey);
425     // save
426     errCode = SetMetadataToDb(dbKey, dbValue);
427     if (errCode != E_OK) {
428         LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
429     }
430     return errCode;
431 }
432 
// Returns the metadata key string of the delete watermark for (deviceId, userId), building and memoizing
// it in deviceIdToHashDeleteSyncIdMap_ on first use. With isNeedHash the device id is passed through
// DBCommon::TransferHashString, otherwise it is embedded verbatim.
// NOTE(review): the memo is keyed by (deviceId, userId) only, so isNeedHash affects just the first call
// for a given pair — confirm callers never mix both flags for the same deviceId string.
// Takes deleteSyncLock_ (a std::mutex), so it must not be called while that lock is already held.
DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, const DeviceID &userId,
    bool isNeedHash)
{
    DeviceSyncTarget target(deviceId, userId);
    std::lock_guard<std::mutex> guard(deleteSyncLock_);
    auto cached = deviceIdToHashDeleteSyncIdMap_.find(target);
    if (cached != deviceIdToHashDeleteSyncIdMap_.end()) {
        return cached->second;
    }
    const std::string devicePart = isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId;
    DeviceID builtId = DBConstant::DELETE_SYNC_PREFIX_KEY + devicePart + DBConstant::USERID_PREFIX_KEY + userId;
    deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceSyncTarget, DeviceID>(target, builtId));
    return builtId;
}
448 
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)449 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
450     std::vector<uint8_t> &outValue)
451 {
452     uint64_t length = CalculateDeleteWaterMarkSize();
453     outValue.resize(length);
454     Parcel parcel(outValue.data(), outValue.size());
455     parcel.WriteUInt32(deleteWaterMark.version);
456     parcel.EightByteAlign();
457     parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
458     parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
459     if (parcel.IsError()) {
460         LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
461         return -E_PARSE_FAIL;
462     }
463     return E_OK;
464 }
465 
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)466 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
467     DeleteWaterMark &deleteWaterMark)
468 {
469     Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
470     parcel.ReadUInt32(deleteWaterMark.version);
471     parcel.EightByteAlign();
472     parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
473     parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
474     if (parcel.IsError()) {
475         LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
476         return -E_PARSE_FAIL;
477     }
478     return E_OK;
479 }
480 
CalculateDeleteWaterMarkSize()481 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
482 {
483     uint64_t length = Parcel::GetUInt32Len(); // version
484     length = Parcel::GetEightByteAlign(length);
485     length += Parcel::GetUInt64Len(); // sendWaterMark
486     length += Parcel::GetUInt64Len(); // recvWaterMark
487     return length;
488 }
489 
GetQuerySyncPrefixKey()490 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
491 {
492     return DBConstant::QUERY_SYNC_PREFIX_KEY;
493 }
494 
GetDeleteSyncPrefixKey()495 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
496 {
497     return DBConstant::DELETE_SYNC_PREFIX_KEY;
498 }
499 
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)500 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
501 {
502     if (querySyncIds.size() < MAX_STORE_ITEMS) {
503         return E_OK;
504     }
505     std::vector<std::pair<std::string, Timestamp>> allItems;
506     std::map<std::string, std::vector<uint8_t>> idMap;
507     std::vector<std::vector<uint8_t>> waitToRemove;
508     for (const auto &id : querySyncIds) {
509         Value value;
510         int errCode = GetMetadataFromDb(id, value);
511         if (errCode != E_OK) {
512             waitToRemove.push_back(id);
513             continue; // may be this failure cause by wrong data
514         }
515         QueryWaterMark queryWaterMark;
516         std::string queryKey(id.begin(), id.end());
517         errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
518         if (errCode != E_OK) {
519             waitToRemove.push_back(id);
520             continue; // may be this failure cause by wrong data
521         }
522         idMap.insert({queryKey, id});
523         allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
524     }
525     // we only remove broken data below
526     // 1. common data size less then 10w
527     // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
528     // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
529     if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
530         // remove in db
531         return DeleteMetaDataFromDB(waitToRemove);
532     }
533     uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
534     // quick select the k_th least used
535     std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
536         [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
537             return w1.second < w2.second;
538         });
539     for (uint32_t i = 0; i < removeCount; ++i) {
540         waitToRemove.push_back(idMap[allItems[i].first]);
541     }
542     // remove in db
543     return DeleteMetaDataFromDB(waitToRemove);
544 }
545 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const DeviceID & userId,const std::string & tableName,bool isNeedHash)546 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const DeviceID &userId,
547     const std::string &tableName, bool isNeedHash)
548 {
549     // lock prevent other thread modify queryWaterMark at this moment
550     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
551     std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
552         (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
553     if (!tableName.empty()) {
554         std::string hashTableName = DBCommon::TransferHashString(tableName);
555         std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
556         prefixKeyStr += hexTableName;
557     }
558     if (!userId.empty()) {
559         prefixKeyStr += DBConstant::USERID_PREFIX_KEY + userId;
560     }
561 
562     // remove in db
563     Key prefixKey;
564     DBCommon::StringToVector(prefixKeyStr, prefixKey);
565     int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
566     if (errCode != E_OK) {
567         LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
568         return errCode;
569     }
570     return E_OK;
571 }
572 }  // namespace DistributedDB