• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "query_sync_water_mark_helper.h"
17 
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25 
26 namespace DistributedDB {
27 namespace {
28     const uint32_t MAX_STORE_ITEMS = 100000;
29     // WaterMark Version
30     constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31     constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32     // Prefix Key in db
33     const std::string QUERY_SYNC_PREFIX_KEY = "querySync";
34     const std::string DELETE_SYNC_PREFIX_KEY = "deleteSync";
35 }
36 
QuerySyncWaterMarkHelper()37 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
38     : storage_(nullptr)
39 {}
40 
~QuerySyncWaterMarkHelper()41 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
42 {
43     storage_ = nullptr;
44     deviceIdToHashQuerySyncIdMap_.clear();
45     deleteSyncCache_.clear();
46     deviceIdToHashDeleteSyncIdMap_.clear();
47 }
48 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)49 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
50 {
51     if (storage_ == nullptr) {
52         return -E_INVALID_DB;
53     }
54     return storage_->GetMetaData(key, outValue);
55 }
56 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)57 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
58 {
59     if (storage_ == nullptr) {
60         return -E_INVALID_DB;
61     }
62     return storage_->PutMetaData(key, inValue);
63 }
64 
DeleteMetaDataFromDB(const std::vector<Key> & keys) const65 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
66 {
67     if (storage_ == nullptr) {
68         return -E_INVALID_DB;
69     }
70     return storage_->DeleteMetaData(keys);
71 }
72 
Initialize(ISyncInterface * storage)73 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
74 {
75     storage_ = storage;
76     return E_OK;
77 }
78 
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)79 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
80 {
81     std::vector<uint8_t> value;
82     int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
83     if (errCode != E_OK) {
84         return errCode;
85     }
86     DeleteWaterMark deleteWaterMark;
87     std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
88     errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
89     if (errCode != E_OK) {
90         return errCode;
91     }
92     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
93     deleteSyncCache_[dbKey] = deleteWaterMark;
94     return errCode;
95 }
96 
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)97 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
98     QueryWaterMark &queryWaterMark)
99 {
100     // first get from cache_
101     int errCode = querySyncCache_.Get(cacheKey, queryWaterMark);
102     bool addToCache = false;
103     if (errCode == -E_NOT_FOUND) {
104         // second get from db
105         errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
106         addToCache = true;
107     }
108     if (errCode == -E_NOT_FOUND) {
109         // third generate one and save to db
110         errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
111     }
112     // something error return
113     if (errCode != E_OK) {
114         LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
115         return errCode;
116     }
117     // remember add to cache_
118     if (addToCache) {
119         querySyncCache_.Put(cacheKey, queryWaterMark);
120     }
121     return errCode;
122 }
123 
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)124 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
125     QueryWaterMark &queryWaterMark)
126 {
127     std::string cacheKey;
128     GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
129     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
130     return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
131 }
132 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)133 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
134     const std::string &deviceId, const WaterMark &waterMark)
135 {
136     std::string cacheKey;
137     GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
138     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
139     return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
140 }
141 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)142 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
143     const std::string &deviceId, const Timestamp &timestamp)
144 {
145     std::string cacheKey;
146     GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
147     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
148     QueryWaterMark queryWaterMark;
149     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
150     if (errCode != E_OK) {
151         return errCode;
152     }
153     queryWaterMark.lastQueryTime = timestamp;
154     return UpdateCacheAndSave(cacheKey, queryWaterMark);
155 }
156 
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)157 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
158     const WaterMark &waterMark)
159 {
160     QueryWaterMark queryWaterMark;
161     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
162     if (errCode != E_OK) {
163         return errCode;
164     }
165     queryWaterMark.recvWaterMark = waterMark;
166     return UpdateCacheAndSave(cacheKey, queryWaterMark);
167 }
168 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)169 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
170     const std::string &deviceId, const WaterMark &waterMark)
171 {
172     std::string cacheKey;
173     GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
174     QueryWaterMark queryWaterMark;
175     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
176     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
177     if (errCode != E_OK) {
178         return errCode;
179     }
180     queryWaterMark.sendWaterMark = waterMark;
181     return UpdateCacheAndSave(cacheKey, queryWaterMark);
182 }
183 
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)184 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
185     QueryWaterMark &queryWaterMark)
186 {
187     // update lastUsedTime
188     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
189     if (errCode != E_OK) {
190         return errCode;
191     }
192     // save db first
193     errCode = SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
194     if (errCode != E_OK) {
195         return errCode;
196     }
197     querySyncCache_.Put(cacheKey, queryWaterMark);
198     return errCode;
199 }
200 
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)201 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
202 {
203     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
204     if (errCode != E_OK) {
205         return errCode;
206     }
207     queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
208     return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
209 }
210 
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)211 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
212 {
213     // serialize value
214     Value dbValue;
215     int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
216     if (errCode != E_OK) {
217         return errCode;
218     }
219     // serialize key
220     Key dbKey;
221     DBCommon::StringToVector(dbKeyString, dbKey);
222     // save
223     errCode = SetMetadataToDb(dbKey, dbValue);
224     if (errCode != E_OK) {
225         LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
226     }
227     return errCode;
228 }
229 
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)230 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
231 {
232     // serialize key
233     Key dbKey;
234     DBCommon::StringToVector(dbKeyString, dbKey);
235     // search in db
236     Value dbValue;
237     int errCode = GetMetadataFromDb(dbKey, dbValue);
238     if (errCode != E_OK) {
239         return errCode;
240     }
241     return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
242 }
243 
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)244 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
245 {
246     uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
247     outValue.resize(length);
248     Parcel parcel(outValue.data(), outValue.size());
249     parcel.WriteUInt32(queryWaterMark.version);
250     parcel.EightByteAlign();
251     parcel.WriteUInt64(queryWaterMark.sendWaterMark);
252     parcel.WriteUInt64(queryWaterMark.recvWaterMark);
253     parcel.WriteUInt64(queryWaterMark.lastUsedTime);
254     parcel.WriteString(queryWaterMark.sql);
255     parcel.WriteUInt64(queryWaterMark.lastQueryTime);
256     if (parcel.IsError()) {
257         LOGE("[Meta] Parcel error when serialize queryWaterMark");
258         return -E_PARSE_FAIL;
259     }
260     return E_OK;
261 }
262 
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)263 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
264 {
265     Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
266     parcel.ReadUInt32(queryWaterMark.version);
267     parcel.EightByteAlign();
268     parcel.ReadUInt64(queryWaterMark.sendWaterMark);
269     parcel.ReadUInt64(queryWaterMark.recvWaterMark);
270     parcel.ReadUInt64(queryWaterMark.lastUsedTime);
271     parcel.ReadString(queryWaterMark.sql);
272     if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
273         parcel.ReadUInt64(queryWaterMark.lastQueryTime);
274     }
275     if (parcel.IsError()) {
276         LOGE("[Meta] Parcel error when deserialize queryWaterMark");
277         return -E_PARSE_FAIL;
278     }
279     return E_OK;
280 }
281 
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)282 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
283 {
284     uint64_t length = Parcel::GetUInt32Len(); // version
285     length = Parcel::GetEightByteAlign(length);
286     length += Parcel::GetUInt64Len(); // sendWaterMark
287     length += Parcel::GetUInt64Len(); // recvWaterMark
288     length += Parcel::GetUInt64Len(); // lastUsedTime
289     length += Parcel::GetStringLen(queryWaterMark.sql);
290     length += Parcel::GetUInt64Len(); // lastQueryTime
291     return length;
292 }
293 
GetHashQuerySyncDeviceId(const DeviceID & deviceId,const DeviceID & queryId,DeviceID & hashQuerySyncId)294 void QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId,
295     const DeviceID &queryId, DeviceID &hashQuerySyncId)
296 {
297     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
298     if (deviceIdToHashQuerySyncIdMap_[deviceId].count(queryId) == 0) {
299         // do not modify this
300         hashQuerySyncId = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
301         deviceIdToHashQuerySyncIdMap_[deviceId][queryId] = hashQuerySyncId;
302     } else {
303         hashQuerySyncId = deviceIdToHashQuerySyncIdMap_[deviceId][queryId];
304     }
305 }
306 
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)307 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
308 {
309     std::string hashId;
310     GetHashDeleteSyncDeviceId(deviceId, hashId);
311     return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
312 }
313 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)314 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
315 {
316     std::string hashId;
317     GetHashDeleteSyncDeviceId(deviceId, hashId);
318     DeleteWaterMark deleteWaterMark;
319     GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
320     deleteWaterMark.sendWaterMark = waterMark;
321     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
322     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
323 }
324 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)325 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
326 {
327     std::string hashId;
328     GetHashDeleteSyncDeviceId(deviceId, hashId);
329     DeleteWaterMark deleteWaterMark;
330     GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
331     deleteWaterMark.recvWaterMark = waterMark;
332     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
333     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
334 }
335 
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)336 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
337     const DeleteWaterMark &deleteWaterMark)
338 {
339     // save db first
340     int errCode = SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
341     if (errCode != E_OK) {
342         return errCode;
343     }
344     // modify cache
345     deleteSyncCache_[dbKey] = deleteWaterMark;
346     return errCode;
347 }
348 
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)349 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
350     DeleteWaterMark &deleteWaterMark)
351 {
352     // lock prevent different thread visit deleteSyncCache_
353     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
354     // if not found
355     if (deleteSyncCache_.find(hashDeviceId) == deleteSyncCache_.end()) {
356         DeleteWaterMark waterMark;
357         waterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
358         int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, waterMark);
359         if (errCode == -E_NOT_FOUND) {
360             deleteWaterMark.sendWaterMark = 0;
361             deleteWaterMark.recvWaterMark = 0;
362             errCode = E_OK;
363         }
364         if (errCode != E_OK) {
365             LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
366             return errCode;
367         }
368         deleteSyncCache_.insert(std::pair<DeviceID, DeleteWaterMark>(hashDeviceId, waterMark));
369     }
370     deleteWaterMark = deleteSyncCache_[hashDeviceId];
371     return E_OK;
372 }
373 
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)374 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
375     DeleteWaterMark &deleteWaterMark)
376 {
377     Key dbKey;
378     DBCommon::StringToVector(hashDeviceId, dbKey);
379     // search in db
380     Value dbValue;
381     int errCode = GetMetadataFromDb(dbKey, dbValue);
382     if (errCode != E_OK) {
383         return errCode;
384     }
385     // serialize value
386     return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
387 }
388 
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)389 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
390     const DeleteWaterMark &deleteWaterMark)
391 {
392     // serialize value
393     Value dbValue;
394     int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
395     if (errCode != E_OK) {
396         return errCode;
397     }
398     Key dbKey;
399     DBCommon::StringToVector(hashDeviceId, dbKey);
400     // save
401     errCode = SetMetadataToDb(dbKey, dbValue);
402     if (errCode != E_OK) {
403         LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
404     }
405     return errCode;
406 }
407 
GetHashDeleteSyncDeviceId(const DeviceID & deviceId,DeviceID & hashDeleteSyncId)408 void QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, DeviceID &hashDeleteSyncId)
409 {
410     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
411     if (deviceIdToHashDeleteSyncIdMap_.count(deviceId) == 0) {
412         hashDeleteSyncId = DELETE_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
413         deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
414     } else {
415         hashDeleteSyncId = deviceIdToHashDeleteSyncIdMap_[deviceId];
416     }
417 }
418 
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)419 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
420     std::vector<uint8_t> &outValue)
421 {
422     uint64_t length = CalculateDeleteWaterMarkSize();
423     outValue.resize(length);
424     Parcel parcel(outValue.data(), outValue.size());
425     parcel.WriteUInt32(deleteWaterMark.version);
426     parcel.EightByteAlign();
427     parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
428     parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
429     if (parcel.IsError()) {
430         LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
431         return -E_PARSE_FAIL;
432     }
433     return E_OK;
434 }
435 
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)436 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
437     DeleteWaterMark &deleteWaterMark)
438 {
439     Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
440     parcel.ReadUInt32(deleteWaterMark.version);
441     parcel.EightByteAlign();
442     parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
443     parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
444     if (parcel.IsError()) {
445         LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
446         return -E_PARSE_FAIL;
447     }
448     return E_OK;
449 }
450 
CalculateDeleteWaterMarkSize()451 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
452 {
453     uint64_t length = Parcel::GetUInt32Len(); // version
454     length = Parcel::GetEightByteAlign(length);
455     length += Parcel::GetUInt64Len(); // sendWaterMark
456     length += Parcel::GetUInt64Len(); // recvWaterMark
457     return length;
458 }
459 
GetQuerySyncPrefixKey()460 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
461 {
462     return QUERY_SYNC_PREFIX_KEY;
463 }
464 
GetDeleteSyncPrefixKey()465 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
466 {
467     return DELETE_SYNC_PREFIX_KEY;
468 }
469 
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)470 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
471 {
472     if (querySyncIds.size() < MAX_STORE_ITEMS) {
473         return E_OK;
474     }
475     std::vector<std::pair<std::string, Timestamp>> allItems;
476     std::map<std::string, std::vector<uint8_t>> idMap;
477     std::vector<std::vector<uint8_t>> waitToRemove;
478     for (const auto &id : querySyncIds) {
479         Value value;
480         int errCode = GetMetadataFromDb(id, value);
481         if (errCode != E_OK) {
482             waitToRemove.push_back(id);
483             continue; // may be this failure cause by wrong data
484         }
485         QueryWaterMark queryWaterMark;
486         std::string queryKey(id.begin(), id.end());
487         errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
488         if (errCode != E_OK) {
489             waitToRemove.push_back(id);
490             continue; // may be this failure cause by wrong data
491         }
492         idMap.insert({queryKey, id});
493         allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
494     }
495     // we only remove broken data below
496     // 1. common data size less then 10w
497     // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
498     // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
499     if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
500         // remove in db
501         return DeleteMetaDataFromDB(waitToRemove);
502     }
503     uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
504     // quick select the k_th least used
505     std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
506         [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
507             return w1.second < w2.second;
508         });
509     for (uint32_t i = 0; i < removeCount; ++i) {
510         waitToRemove.push_back(idMap[allItems[i].first]);
511     }
512     // remove in db
513     return DeleteMetaDataFromDB(waitToRemove);
514 }
515 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName)516 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName)
517 {
518     // lock prevent other thread modify queryWaterMark at this moment
519     {
520         std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
521         std::string prefixKeyStr = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
522         if (!tableName.empty()) {
523             std::string hashTableName = DBCommon::TransferHashString(tableName);
524             std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
525             prefixKeyStr += hexTableName;
526         }
527 
528         // remove in db
529         Key prefixKey;
530         DBCommon::StringToVector(prefixKeyStr, prefixKey);
531         int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
532         if (errCode != E_OK) {
533             LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
534             return errCode;
535         }
536         // clean cache
537         querySyncCache_.RemoveWithPrefixKey(prefixKeyStr);
538     }
539     return E_OK;
540 }
541 }  // namespace DistributedDB