1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "query_sync_water_mark_helper.h"
17
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25
26 namespace DistributedDB {
27 namespace {
28 const uint32_t MAX_STORE_ITEMS = 100000;
29 // WaterMark Version
30 constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31 constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 // Prefix Key in db
33 const std::string QUERY_SYNC_PREFIX_KEY = "querySync";
34 const std::string DELETE_SYNC_PREFIX_KEY = "deleteSync";
35 }
36
QuerySyncWaterMarkHelper()37 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
38 : storage_(nullptr)
39 {}
40
~QuerySyncWaterMarkHelper()41 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
42 {
43 storage_ = nullptr;
44 deviceIdToHashQuerySyncIdMap_.clear();
45 deleteSyncCache_.clear();
46 deviceIdToHashDeleteSyncIdMap_.clear();
47 }
48
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)49 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
50 {
51 if (storage_ == nullptr) {
52 return -E_INVALID_DB;
53 }
54 return storage_->GetMetaData(key, outValue);
55 }
56
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)57 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
58 {
59 if (storage_ == nullptr) {
60 return -E_INVALID_DB;
61 }
62 return storage_->PutMetaData(key, inValue);
63 }
64
DeleteMetaDataFromDB(const std::vector<Key> & keys) const65 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
66 {
67 if (storage_ == nullptr) {
68 return -E_INVALID_DB;
69 }
70 return storage_->DeleteMetaData(keys);
71 }
72
Initialize(ISyncInterface * storage)73 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
74 {
75 storage_ = storage;
76 return E_OK;
77 }
78
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)79 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
80 {
81 std::vector<uint8_t> value;
82 int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
83 if (errCode != E_OK) {
84 return errCode;
85 }
86 DeleteWaterMark deleteWaterMark;
87 std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
88 errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
89 if (errCode != E_OK) {
90 return errCode;
91 }
92 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
93 deleteSyncCache_[dbKey] = deleteWaterMark;
94 return errCode;
95 }
96
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)97 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
98 QueryWaterMark &queryWaterMark)
99 {
100 // first get from cache_
101 int errCode = querySyncCache_.Get(cacheKey, queryWaterMark);
102 bool addToCache = false;
103 if (errCode == -E_NOT_FOUND) {
104 // second get from db
105 errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
106 addToCache = true;
107 }
108 if (errCode == -E_NOT_FOUND) {
109 // third generate one and save to db
110 errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
111 }
112 // something error return
113 if (errCode != E_OK) {
114 LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
115 return errCode;
116 }
117 // remember add to cache_
118 if (addToCache) {
119 querySyncCache_.Put(cacheKey, queryWaterMark);
120 }
121 return errCode;
122 }
123
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)124 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
125 QueryWaterMark &queryWaterMark)
126 {
127 std::string cacheKey;
128 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
129 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
130 return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
131 }
132
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)133 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
134 const std::string &deviceId, const WaterMark &waterMark)
135 {
136 std::string cacheKey;
137 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
138 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
139 return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
140 }
141
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)142 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
143 const std::string &deviceId, const Timestamp ×tamp)
144 {
145 std::string cacheKey;
146 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
147 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
148 QueryWaterMark queryWaterMark;
149 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
150 if (errCode != E_OK) {
151 return errCode;
152 }
153 queryWaterMark.lastQueryTime = timestamp;
154 return UpdateCacheAndSave(cacheKey, queryWaterMark);
155 }
156
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)157 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
158 const WaterMark &waterMark)
159 {
160 QueryWaterMark queryWaterMark;
161 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
162 if (errCode != E_OK) {
163 return errCode;
164 }
165 queryWaterMark.recvWaterMark = waterMark;
166 return UpdateCacheAndSave(cacheKey, queryWaterMark);
167 }
168
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)169 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
170 const std::string &deviceId, const WaterMark &waterMark)
171 {
172 std::string cacheKey;
173 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
174 QueryWaterMark queryWaterMark;
175 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
176 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
177 if (errCode != E_OK) {
178 return errCode;
179 }
180 queryWaterMark.sendWaterMark = waterMark;
181 return UpdateCacheAndSave(cacheKey, queryWaterMark);
182 }
183
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)184 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
185 QueryWaterMark &queryWaterMark)
186 {
187 // update lastUsedTime
188 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
189 if (errCode != E_OK) {
190 return errCode;
191 }
192 // save db first
193 errCode = SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
194 if (errCode != E_OK) {
195 return errCode;
196 }
197 querySyncCache_.Put(cacheKey, queryWaterMark);
198 return errCode;
199 }
200
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)201 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
202 {
203 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
204 if (errCode != E_OK) {
205 return errCode;
206 }
207 queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
208 return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
209 }
210
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)211 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
212 {
213 // serialize value
214 Value dbValue;
215 int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
216 if (errCode != E_OK) {
217 return errCode;
218 }
219 // serialize key
220 Key dbKey;
221 DBCommon::StringToVector(dbKeyString, dbKey);
222 // save
223 errCode = SetMetadataToDb(dbKey, dbValue);
224 if (errCode != E_OK) {
225 LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
226 }
227 return errCode;
228 }
229
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)230 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
231 {
232 // serialize key
233 Key dbKey;
234 DBCommon::StringToVector(dbKeyString, dbKey);
235 // search in db
236 Value dbValue;
237 int errCode = GetMetadataFromDb(dbKey, dbValue);
238 if (errCode != E_OK) {
239 return errCode;
240 }
241 return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
242 }
243
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)244 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
245 {
246 uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
247 outValue.resize(length);
248 Parcel parcel(outValue.data(), outValue.size());
249 parcel.WriteUInt32(queryWaterMark.version);
250 parcel.EightByteAlign();
251 parcel.WriteUInt64(queryWaterMark.sendWaterMark);
252 parcel.WriteUInt64(queryWaterMark.recvWaterMark);
253 parcel.WriteUInt64(queryWaterMark.lastUsedTime);
254 parcel.WriteString(queryWaterMark.sql);
255 parcel.WriteUInt64(queryWaterMark.lastQueryTime);
256 if (parcel.IsError()) {
257 LOGE("[Meta] Parcel error when serialize queryWaterMark");
258 return -E_PARSE_FAIL;
259 }
260 return E_OK;
261 }
262
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)263 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
264 {
265 Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
266 parcel.ReadUInt32(queryWaterMark.version);
267 parcel.EightByteAlign();
268 parcel.ReadUInt64(queryWaterMark.sendWaterMark);
269 parcel.ReadUInt64(queryWaterMark.recvWaterMark);
270 parcel.ReadUInt64(queryWaterMark.lastUsedTime);
271 parcel.ReadString(queryWaterMark.sql);
272 if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
273 parcel.ReadUInt64(queryWaterMark.lastQueryTime);
274 }
275 if (parcel.IsError()) {
276 LOGE("[Meta] Parcel error when deserialize queryWaterMark");
277 return -E_PARSE_FAIL;
278 }
279 return E_OK;
280 }
281
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)282 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
283 {
284 uint64_t length = Parcel::GetUInt32Len(); // version
285 length = Parcel::GetEightByteAlign(length);
286 length += Parcel::GetUInt64Len(); // sendWaterMark
287 length += Parcel::GetUInt64Len(); // recvWaterMark
288 length += Parcel::GetUInt64Len(); // lastUsedTime
289 length += Parcel::GetStringLen(queryWaterMark.sql);
290 length += Parcel::GetUInt64Len(); // lastQueryTime
291 return length;
292 }
293
GetHashQuerySyncDeviceId(const DeviceID & deviceId,const DeviceID & queryId,DeviceID & hashQuerySyncId)294 void QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId,
295 const DeviceID &queryId, DeviceID &hashQuerySyncId)
296 {
297 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
298 if (deviceIdToHashQuerySyncIdMap_[deviceId].count(queryId) == 0) {
299 // do not modify this
300 hashQuerySyncId = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
301 deviceIdToHashQuerySyncIdMap_[deviceId][queryId] = hashQuerySyncId;
302 } else {
303 hashQuerySyncId = deviceIdToHashQuerySyncIdMap_[deviceId][queryId];
304 }
305 }
306
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)307 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
308 {
309 std::string hashId;
310 GetHashDeleteSyncDeviceId(deviceId, hashId);
311 return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
312 }
313
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)314 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
315 {
316 std::string hashId;
317 GetHashDeleteSyncDeviceId(deviceId, hashId);
318 DeleteWaterMark deleteWaterMark;
319 GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
320 deleteWaterMark.sendWaterMark = waterMark;
321 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
322 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
323 }
324
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)325 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
326 {
327 std::string hashId;
328 GetHashDeleteSyncDeviceId(deviceId, hashId);
329 DeleteWaterMark deleteWaterMark;
330 GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
331 deleteWaterMark.recvWaterMark = waterMark;
332 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
333 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
334 }
335
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)336 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
337 const DeleteWaterMark &deleteWaterMark)
338 {
339 // save db first
340 int errCode = SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
341 if (errCode != E_OK) {
342 return errCode;
343 }
344 // modify cache
345 deleteSyncCache_[dbKey] = deleteWaterMark;
346 return errCode;
347 }
348
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)349 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
350 DeleteWaterMark &deleteWaterMark)
351 {
352 // lock prevent different thread visit deleteSyncCache_
353 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
354 // if not found
355 if (deleteSyncCache_.find(hashDeviceId) == deleteSyncCache_.end()) {
356 DeleteWaterMark waterMark;
357 waterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
358 int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, waterMark);
359 if (errCode == -E_NOT_FOUND) {
360 deleteWaterMark.sendWaterMark = 0;
361 deleteWaterMark.recvWaterMark = 0;
362 errCode = E_OK;
363 }
364 if (errCode != E_OK) {
365 LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
366 return errCode;
367 }
368 deleteSyncCache_.insert(std::pair<DeviceID, DeleteWaterMark>(hashDeviceId, waterMark));
369 }
370 deleteWaterMark = deleteSyncCache_[hashDeviceId];
371 return E_OK;
372 }
373
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)374 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
375 DeleteWaterMark &deleteWaterMark)
376 {
377 Key dbKey;
378 DBCommon::StringToVector(hashDeviceId, dbKey);
379 // search in db
380 Value dbValue;
381 int errCode = GetMetadataFromDb(dbKey, dbValue);
382 if (errCode != E_OK) {
383 return errCode;
384 }
385 // serialize value
386 return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
387 }
388
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)389 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
390 const DeleteWaterMark &deleteWaterMark)
391 {
392 // serialize value
393 Value dbValue;
394 int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
395 if (errCode != E_OK) {
396 return errCode;
397 }
398 Key dbKey;
399 DBCommon::StringToVector(hashDeviceId, dbKey);
400 // save
401 errCode = SetMetadataToDb(dbKey, dbValue);
402 if (errCode != E_OK) {
403 LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
404 }
405 return errCode;
406 }
407
GetHashDeleteSyncDeviceId(const DeviceID & deviceId,DeviceID & hashDeleteSyncId)408 void QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, DeviceID &hashDeleteSyncId)
409 {
410 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
411 if (deviceIdToHashDeleteSyncIdMap_.count(deviceId) == 0) {
412 hashDeleteSyncId = DELETE_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
413 deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
414 } else {
415 hashDeleteSyncId = deviceIdToHashDeleteSyncIdMap_[deviceId];
416 }
417 }
418
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)419 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
420 std::vector<uint8_t> &outValue)
421 {
422 uint64_t length = CalculateDeleteWaterMarkSize();
423 outValue.resize(length);
424 Parcel parcel(outValue.data(), outValue.size());
425 parcel.WriteUInt32(deleteWaterMark.version);
426 parcel.EightByteAlign();
427 parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
428 parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
429 if (parcel.IsError()) {
430 LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
431 return -E_PARSE_FAIL;
432 }
433 return E_OK;
434 }
435
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)436 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
437 DeleteWaterMark &deleteWaterMark)
438 {
439 Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
440 parcel.ReadUInt32(deleteWaterMark.version);
441 parcel.EightByteAlign();
442 parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
443 parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
444 if (parcel.IsError()) {
445 LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
446 return -E_PARSE_FAIL;
447 }
448 return E_OK;
449 }
450
CalculateDeleteWaterMarkSize()451 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
452 {
453 uint64_t length = Parcel::GetUInt32Len(); // version
454 length = Parcel::GetEightByteAlign(length);
455 length += Parcel::GetUInt64Len(); // sendWaterMark
456 length += Parcel::GetUInt64Len(); // recvWaterMark
457 return length;
458 }
459
GetQuerySyncPrefixKey()460 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
461 {
462 return QUERY_SYNC_PREFIX_KEY;
463 }
464
GetDeleteSyncPrefixKey()465 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
466 {
467 return DELETE_SYNC_PREFIX_KEY;
468 }
469
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)470 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
471 {
472 if (querySyncIds.size() < MAX_STORE_ITEMS) {
473 return E_OK;
474 }
475 std::vector<std::pair<std::string, Timestamp>> allItems;
476 std::map<std::string, std::vector<uint8_t>> idMap;
477 std::vector<std::vector<uint8_t>> waitToRemove;
478 for (const auto &id : querySyncIds) {
479 Value value;
480 int errCode = GetMetadataFromDb(id, value);
481 if (errCode != E_OK) {
482 waitToRemove.push_back(id);
483 continue; // may be this failure cause by wrong data
484 }
485 QueryWaterMark queryWaterMark;
486 std::string queryKey(id.begin(), id.end());
487 errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
488 if (errCode != E_OK) {
489 waitToRemove.push_back(id);
490 continue; // may be this failure cause by wrong data
491 }
492 idMap.insert({queryKey, id});
493 allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
494 }
495 // we only remove broken data below
496 // 1. common data size less then 10w
497 // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
498 // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
499 if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
500 // remove in db
501 return DeleteMetaDataFromDB(waitToRemove);
502 }
503 uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
504 // quick select the k_th least used
505 std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
506 [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
507 return w1.second < w2.second;
508 });
509 for (uint32_t i = 0; i < removeCount; ++i) {
510 waitToRemove.push_back(idMap[allItems[i].first]);
511 }
512 // remove in db
513 return DeleteMetaDataFromDB(waitToRemove);
514 }
515
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName)516 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName)
517 {
518 // lock prevent other thread modify queryWaterMark at this moment
519 {
520 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
521 std::string prefixKeyStr = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
522 if (!tableName.empty()) {
523 std::string hashTableName = DBCommon::TransferHashString(tableName);
524 std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
525 prefixKeyStr += hexTableName;
526 }
527
528 // remove in db
529 Key prefixKey;
530 DBCommon::StringToVector(prefixKeyStr, prefixKey);
531 int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
532 if (errCode != E_OK) {
533 LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
534 return errCode;
535 }
536 // clean cache
537 querySyncCache_.RemoveWithPrefixKey(prefixKeyStr);
538 }
539 return E_OK;
540 }
541 } // namespace DistributedDB