• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "query_sync_water_mark_helper.h"
17 
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25 
26 namespace DistributedDB {
27 namespace {
28     const uint32_t MAX_STORE_ITEMS = 100000;
29     // WaterMark Version
30     constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31     constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33 
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35     : storage_(nullptr)
36 {}
37 
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40     storage_ = nullptr;
41     deviceIdToHashQuerySyncIdMap_.clear();
42     deviceIdToHashDeleteSyncIdMap_.clear();
43 }
44 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)45 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
46 {
47     if (storage_ == nullptr) {
48         return -E_INVALID_DB;
49     }
50     return storage_->GetMetaData(key, outValue);
51 }
52 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)53 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
54 {
55     if (storage_ == nullptr) {
56         return -E_INVALID_DB;
57     }
58     return storage_->PutMetaData(key, inValue, false);
59 }
60 
DeleteMetaDataFromDB(const std::vector<Key> & keys) const61 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
62 {
63     if (storage_ == nullptr) {
64         return -E_INVALID_DB;
65     }
66     return storage_->DeleteMetaData(keys);
67 }
68 
Initialize(ISyncInterface * storage)69 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
70 {
71     storage_ = storage;
72     return E_OK;
73 }
74 
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)75 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
76 {
77     std::vector<uint8_t> value;
78     int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
79     if (errCode != E_OK) {
80         return errCode;
81     }
82     DeleteWaterMark deleteWaterMark;
83     std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
84     errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
85     if (errCode != E_OK) {
86         return errCode;
87     }
88     return errCode;
89 }
90 
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)91 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
92     QueryWaterMark &queryWaterMark)
93 {
94     // second get from db
95     int errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
96     if (errCode == -E_NOT_FOUND) {
97         // third generate one and save to db
98         errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
99     }
100     // something error return
101     if (errCode != E_OK) {
102         LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
103     }
104     return errCode;
105 }
106 
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)107 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
108     QueryWaterMark &queryWaterMark)
109 {
110     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
111     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
112     return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
113 }
114 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)115 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
116     const std::string &deviceId, const WaterMark &waterMark)
117 {
118     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
119     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
120     return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
121 }
122 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)123 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
124     const std::string &deviceId, const Timestamp &timestamp)
125 {
126     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
127     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
128     QueryWaterMark queryWaterMark;
129     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
130     if (errCode != E_OK) {
131         return errCode;
132     }
133     queryWaterMark.lastQueryTime = timestamp;
134     return UpdateCacheAndSave(cacheKey, queryWaterMark);
135 }
136 
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)137 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
138     const WaterMark &waterMark)
139 {
140     QueryWaterMark queryWaterMark;
141     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
142     if (errCode != E_OK) {
143         return errCode;
144     }
145     queryWaterMark.recvWaterMark = waterMark;
146     return UpdateCacheAndSave(cacheKey, queryWaterMark);
147 }
148 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)149 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
150     const std::string &deviceId, const WaterMark &waterMark)
151 {
152     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
153     QueryWaterMark queryWaterMark;
154     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
155     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
156     if (errCode != E_OK) {
157         return errCode;
158     }
159     queryWaterMark.sendWaterMark = waterMark;
160     return UpdateCacheAndSave(cacheKey, queryWaterMark);
161 }
162 
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)163 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
164     QueryWaterMark &queryWaterMark)
165 {
166     // update lastUsedTime
167     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
168     if (errCode != E_OK) {
169         return errCode;
170     }
171     // save db
172     return SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
173 }
174 
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)175 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
176 {
177     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
178     if (errCode != E_OK) {
179         return errCode;
180     }
181     queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
182     return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
183 }
184 
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)185 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
186 {
187     // serialize value
188     Value dbValue;
189     int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
190     if (errCode != E_OK) {
191         return errCode;
192     }
193     // serialize key
194     Key dbKey;
195     DBCommon::StringToVector(dbKeyString, dbKey);
196     // save
197     errCode = SetMetadataToDb(dbKey, dbValue);
198     if (errCode != E_OK) {
199         LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
200     }
201     return errCode;
202 }
203 
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)204 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
205 {
206     // serialize key
207     Key dbKey;
208     DBCommon::StringToVector(dbKeyString, dbKey);
209     // search in db
210     Value dbValue;
211     int errCode = GetMetadataFromDb(dbKey, dbValue);
212     if (errCode != E_OK) {
213         return errCode;
214     }
215     return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
216 }
217 
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)218 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
219 {
220     uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
221     outValue.resize(length);
222     Parcel parcel(outValue.data(), outValue.size());
223     parcel.WriteUInt32(queryWaterMark.version);
224     parcel.EightByteAlign();
225     parcel.WriteUInt64(queryWaterMark.sendWaterMark);
226     parcel.WriteUInt64(queryWaterMark.recvWaterMark);
227     parcel.WriteUInt64(queryWaterMark.lastUsedTime);
228     parcel.WriteString(queryWaterMark.sql);
229     parcel.WriteUInt64(queryWaterMark.lastQueryTime);
230     if (parcel.IsError()) {
231         LOGE("[Meta] Parcel error when serialize queryWaterMark");
232         return -E_PARSE_FAIL;
233     }
234     return E_OK;
235 }
236 
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)237 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
238 {
239     Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
240     parcel.ReadUInt32(queryWaterMark.version);
241     parcel.EightByteAlign();
242     parcel.ReadUInt64(queryWaterMark.sendWaterMark);
243     parcel.ReadUInt64(queryWaterMark.recvWaterMark);
244     parcel.ReadUInt64(queryWaterMark.lastUsedTime);
245     parcel.ReadString(queryWaterMark.sql);
246     if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
247         parcel.ReadUInt64(queryWaterMark.lastQueryTime);
248     }
249     if (parcel.IsError()) {
250         LOGE("[Meta] Parcel error when deserialize queryWaterMark");
251         return -E_PARSE_FAIL;
252     }
253     return E_OK;
254 }
255 
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)256 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
257 {
258     uint64_t length = Parcel::GetUInt32Len(); // version
259     length = Parcel::GetEightByteAlign(length);
260     length += Parcel::GetUInt64Len(); // sendWaterMark
261     length += Parcel::GetUInt64Len(); // recvWaterMark
262     length += Parcel::GetUInt64Len(); // lastUsedTime
263     length += Parcel::GetStringLen(queryWaterMark.sql);
264     length += Parcel::GetUInt64Len(); // lastQueryTime
265     return length;
266 }
267 
// Returns the metadata key string for the (deviceId, queryId) pair, building and remembering it
// on first use. Takes queryWaterMarkLock_ itself, so callers must not already hold that lock.
DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &queryId)
{
    std::lock_guard<std::mutex> guard(queryWaterMarkLock_);
    // operator[] creates the per-device entry when it is missing
    auto &idsOfDevice = deviceIdToHashQuerySyncIdMap_[deviceId];
    const auto cached = idsOfDevice.find(queryId);
    if (cached != idsOfDevice.end()) {
        return cached->second;
    }
    // do not modify this
    DeviceID hashQuerySyncId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
    idsOfDevice[queryId] = hashQuerySyncId;
    return hashQuerySyncId;
}
281 
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)282 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
283 {
284     std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
285     // lock prevent different thread visit deleteSyncCache_
286     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
287     return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
288 }
289 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)290 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
291 {
292     std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
293     DeleteWaterMark deleteWaterMark;
294     // lock prevent different thread visit deleteSyncCache_
295     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
296     int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
297     if (errCode != E_OK) {
298         return errCode;
299     }
300     deleteWaterMark.sendWaterMark = waterMark;
301     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
302 }
303 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)304 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark,
305     bool isNeedHash)
306 {
307     std::string hashId = GetHashDeleteSyncDeviceId(deviceId, isNeedHash);
308     DeleteWaterMark deleteWaterMark;
309     // lock prevent different thread visit deleteSyncCache_
310     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
311     int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
312     if (errCode != E_OK) {
313         return errCode;
314     }
315     deleteWaterMark.recvWaterMark = waterMark;
316     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
317 }
318 
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)319 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
320     const DeleteWaterMark &deleteWaterMark)
321 {
322     // save db
323     return SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
324 }
325 
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)326 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
327     DeleteWaterMark &deleteWaterMark)
328 {
329     deleteWaterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
330     int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, deleteWaterMark);
331     if (errCode == -E_NOT_FOUND) {
332         deleteWaterMark.sendWaterMark = 0;
333         deleteWaterMark.recvWaterMark = 0;
334         errCode = E_OK;
335     }
336     if (errCode != E_OK) {
337         LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
338     }
339     return errCode;
340 }
341 
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)342 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
343     DeleteWaterMark &deleteWaterMark)
344 {
345     Key dbKey;
346     DBCommon::StringToVector(hashDeviceId, dbKey);
347     // search in db
348     Value dbValue;
349     int errCode = GetMetadataFromDb(dbKey, dbValue);
350     if (errCode != E_OK) {
351         return errCode;
352     }
353     // serialize value
354     return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
355 }
356 
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)357 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
358     const DeleteWaterMark &deleteWaterMark)
359 {
360     // serialize value
361     Value dbValue;
362     int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
363     if (errCode != E_OK) {
364         return errCode;
365     }
366     Key dbKey;
367     DBCommon::StringToVector(hashDeviceId, dbKey);
368     // save
369     errCode = SetMetadataToDb(dbKey, dbValue);
370     if (errCode != E_OK) {
371         LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
372     }
373     return errCode;
374 }
375 
// Returns the delete-sync metadata key string for `deviceId`, building and remembering it on
// first use: the delete-sync prefix followed by the hashed device id (isNeedHash == true) or
// the device id as given. Takes deleteSyncLock_ itself, so callers must not already hold it.
// The lookup map is keyed by `deviceId` alone, so `isNeedHash` only takes effect the first
// time a given device id is seen.
DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, bool isNeedHash)
{
    std::lock_guard<std::mutex> guard(deleteSyncLock_);
    const auto cached = deviceIdToHashDeleteSyncIdMap_.find(deviceId);
    if (cached != deviceIdToHashDeleteSyncIdMap_.end()) {
        return cached->second;
    }
    DeviceID hashDeleteSyncId = DBConstant::DELETE_SYNC_PREFIX_KEY +
        (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
    deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
    return hashDeleteSyncId;
}
389 
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)390 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
391     std::vector<uint8_t> &outValue)
392 {
393     uint64_t length = CalculateDeleteWaterMarkSize();
394     outValue.resize(length);
395     Parcel parcel(outValue.data(), outValue.size());
396     parcel.WriteUInt32(deleteWaterMark.version);
397     parcel.EightByteAlign();
398     parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
399     parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
400     if (parcel.IsError()) {
401         LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
402         return -E_PARSE_FAIL;
403     }
404     return E_OK;
405 }
406 
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)407 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
408     DeleteWaterMark &deleteWaterMark)
409 {
410     Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
411     parcel.ReadUInt32(deleteWaterMark.version);
412     parcel.EightByteAlign();
413     parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
414     parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
415     if (parcel.IsError()) {
416         LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
417         return -E_PARSE_FAIL;
418     }
419     return E_OK;
420 }
421 
CalculateDeleteWaterMarkSize()422 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
423 {
424     uint64_t length = Parcel::GetUInt32Len(); // version
425     length = Parcel::GetEightByteAlign(length);
426     length += Parcel::GetUInt64Len(); // sendWaterMark
427     length += Parcel::GetUInt64Len(); // recvWaterMark
428     return length;
429 }
430 
GetQuerySyncPrefixKey()431 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
432 {
433     return DBConstant::QUERY_SYNC_PREFIX_KEY;
434 }
435 
GetDeleteSyncPrefixKey()436 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
437 {
438     return DBConstant::DELETE_SYNC_PREFIX_KEY;
439 }
440 
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)441 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
442 {
443     if (querySyncIds.size() < MAX_STORE_ITEMS) {
444         return E_OK;
445     }
446     std::vector<std::pair<std::string, Timestamp>> allItems;
447     std::map<std::string, std::vector<uint8_t>> idMap;
448     std::vector<std::vector<uint8_t>> waitToRemove;
449     for (const auto &id : querySyncIds) {
450         Value value;
451         int errCode = GetMetadataFromDb(id, value);
452         if (errCode != E_OK) {
453             waitToRemove.push_back(id);
454             continue; // may be this failure cause by wrong data
455         }
456         QueryWaterMark queryWaterMark;
457         std::string queryKey(id.begin(), id.end());
458         errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
459         if (errCode != E_OK) {
460             waitToRemove.push_back(id);
461             continue; // may be this failure cause by wrong data
462         }
463         idMap.insert({queryKey, id});
464         allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
465     }
466     // we only remove broken data below
467     // 1. common data size less then 10w
468     // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
469     // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
470     if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
471         // remove in db
472         return DeleteMetaDataFromDB(waitToRemove);
473     }
474     uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
475     // quick select the k_th least used
476     std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
477         [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
478             return w1.second < w2.second;
479         });
480     for (uint32_t i = 0; i < removeCount; ++i) {
481         waitToRemove.push_back(idMap[allItems[i].first]);
482     }
483     // remove in db
484     return DeleteMetaDataFromDB(waitToRemove);
485 }
486 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)487 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName,
488     bool isNeedHash)
489 {
490     // lock prevent other thread modify queryWaterMark at this moment
491     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
492     std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
493         (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
494     if (!tableName.empty()) {
495         std::string hashTableName = DBCommon::TransferHashString(tableName);
496         std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
497         prefixKeyStr += hexTableName;
498     }
499 
500     // remove in db
501     Key prefixKey;
502     DBCommon::StringToVector(prefixKeyStr, prefixKey);
503     int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
504     if (errCode != E_OK) {
505         LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
506         return errCode;
507     }
508     return E_OK;
509 }
510 }  // namespace DistributedDB