1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "query_sync_water_mark_helper.h"
17
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25
26 namespace DistributedDB {
27 namespace {
28 const uint32_t MAX_STORE_ITEMS = 100000;
29 // WaterMark Version
30 constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31 constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35 : storage_(nullptr)
36 {}
37
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40 storage_ = nullptr;
41 deviceIdToHashQuerySyncIdMap_.clear();
42 deviceIdToHashDeleteSyncIdMap_.clear();
43 }
44
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)45 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
46 {
47 if (storage_ == nullptr) {
48 return -E_INVALID_DB;
49 }
50 return storage_->GetMetaData(key, outValue);
51 }
52
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)53 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
54 {
55 if (storage_ == nullptr) {
56 return -E_INVALID_DB;
57 }
58 return storage_->PutMetaData(key, inValue, false);
59 }
60
DeleteMetaDataFromDB(const std::vector<Key> & keys) const61 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
62 {
63 if (storage_ == nullptr) {
64 return -E_INVALID_DB;
65 }
66 return storage_->DeleteMetaData(keys);
67 }
68
Initialize(ISyncInterface * storage)69 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
70 {
71 storage_ = storage;
72 return E_OK;
73 }
74
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)75 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
76 {
77 std::vector<uint8_t> value;
78 int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
79 if (errCode != E_OK) {
80 return errCode;
81 }
82 DeleteWaterMark deleteWaterMark;
83 std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
84 errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
85 if (errCode != E_OK) {
86 return errCode;
87 }
88 return errCode;
89 }
90
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)91 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
92 QueryWaterMark &queryWaterMark)
93 {
94 // second get from db
95 int errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
96 if (errCode == -E_NOT_FOUND) {
97 // third generate one and save to db
98 errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
99 }
100 // something error return
101 if (errCode != E_OK) {
102 LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
103 }
104 return errCode;
105 }
106
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)107 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
108 QueryWaterMark &queryWaterMark)
109 {
110 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
111 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
112 return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
113 }
114
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)115 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
116 const std::string &deviceId, const WaterMark &waterMark)
117 {
118 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
119 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
120 return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
121 }
122
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)123 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
124 const std::string &deviceId, const Timestamp ×tamp)
125 {
126 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
127 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
128 QueryWaterMark queryWaterMark;
129 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
130 if (errCode != E_OK) {
131 return errCode;
132 }
133 queryWaterMark.lastQueryTime = timestamp;
134 return UpdateCacheAndSave(cacheKey, queryWaterMark);
135 }
136
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)137 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
138 const WaterMark &waterMark)
139 {
140 QueryWaterMark queryWaterMark;
141 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
142 if (errCode != E_OK) {
143 return errCode;
144 }
145 queryWaterMark.recvWaterMark = waterMark;
146 return UpdateCacheAndSave(cacheKey, queryWaterMark);
147 }
148
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)149 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
150 const std::string &deviceId, const WaterMark &waterMark)
151 {
152 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
153 QueryWaterMark queryWaterMark;
154 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
155 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
156 if (errCode != E_OK) {
157 return errCode;
158 }
159 queryWaterMark.sendWaterMark = waterMark;
160 return UpdateCacheAndSave(cacheKey, queryWaterMark);
161 }
162
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)163 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
164 QueryWaterMark &queryWaterMark)
165 {
166 // update lastUsedTime
167 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
168 if (errCode != E_OK) {
169 return errCode;
170 }
171 // save db
172 return SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
173 }
174
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)175 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
176 {
177 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
178 if (errCode != E_OK) {
179 return errCode;
180 }
181 queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
182 return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
183 }
184
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)185 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
186 {
187 // serialize value
188 Value dbValue;
189 int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 // serialize key
194 Key dbKey;
195 DBCommon::StringToVector(dbKeyString, dbKey);
196 // save
197 errCode = SetMetadataToDb(dbKey, dbValue);
198 if (errCode != E_OK) {
199 LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
200 }
201 return errCode;
202 }
203
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)204 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
205 {
206 // serialize key
207 Key dbKey;
208 DBCommon::StringToVector(dbKeyString, dbKey);
209 // search in db
210 Value dbValue;
211 int errCode = GetMetadataFromDb(dbKey, dbValue);
212 if (errCode != E_OK) {
213 return errCode;
214 }
215 return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
216 }
217
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)218 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
219 {
220 uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
221 outValue.resize(length);
222 Parcel parcel(outValue.data(), outValue.size());
223 parcel.WriteUInt32(queryWaterMark.version);
224 parcel.EightByteAlign();
225 parcel.WriteUInt64(queryWaterMark.sendWaterMark);
226 parcel.WriteUInt64(queryWaterMark.recvWaterMark);
227 parcel.WriteUInt64(queryWaterMark.lastUsedTime);
228 parcel.WriteString(queryWaterMark.sql);
229 parcel.WriteUInt64(queryWaterMark.lastQueryTime);
230 if (parcel.IsError()) {
231 LOGE("[Meta] Parcel error when serialize queryWaterMark");
232 return -E_PARSE_FAIL;
233 }
234 return E_OK;
235 }
236
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)237 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
238 {
239 Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
240 parcel.ReadUInt32(queryWaterMark.version);
241 parcel.EightByteAlign();
242 parcel.ReadUInt64(queryWaterMark.sendWaterMark);
243 parcel.ReadUInt64(queryWaterMark.recvWaterMark);
244 parcel.ReadUInt64(queryWaterMark.lastUsedTime);
245 parcel.ReadString(queryWaterMark.sql);
246 if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
247 parcel.ReadUInt64(queryWaterMark.lastQueryTime);
248 }
249 if (parcel.IsError()) {
250 LOGE("[Meta] Parcel error when deserialize queryWaterMark");
251 return -E_PARSE_FAIL;
252 }
253 return E_OK;
254 }
255
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)256 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
257 {
258 uint64_t length = Parcel::GetUInt32Len(); // version
259 length = Parcel::GetEightByteAlign(length);
260 length += Parcel::GetUInt64Len(); // sendWaterMark
261 length += Parcel::GetUInt64Len(); // recvWaterMark
262 length += Parcel::GetUInt64Len(); // lastUsedTime
263 length += Parcel::GetStringLen(queryWaterMark.sql);
264 length += Parcel::GetUInt64Len(); // lastQueryTime
265 return length;
266 }
267
// Returns the metadata key for (deviceId, queryId):
// QUERY_SYNC_PREFIX_KEY + TransferHashString(deviceId) + queryId.
// Results are memoized in deviceIdToHashQuerySyncIdMap_ under queryWaterMarkLock_, so callers must
// not already hold that mutex when calling this function.
DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &queryId)
{
    std::lock_guard<std::mutex> guard(queryWaterMarkLock_);
    // operator[] creates the per-device map on first use, as before
    auto &idsOfDevice = deviceIdToHashQuerySyncIdMap_[deviceId];
    auto cached = idsOfDevice.find(queryId);
    if (cached != idsOfDevice.end()) {
        return cached->second;
    }
    // do not modify this
    DeviceID builtId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
    idsOfDevice[queryId] = builtId;
    return builtId;
}
281
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)282 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
283 {
284 std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
285 // lock prevent different thread visit deleteSyncCache_
286 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
287 return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
288 }
289
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)290 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
291 {
292 std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
293 DeleteWaterMark deleteWaterMark;
294 // lock prevent different thread visit deleteSyncCache_
295 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
296 int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
297 if (errCode != E_OK) {
298 return errCode;
299 }
300 deleteWaterMark.sendWaterMark = waterMark;
301 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
302 }
303
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)304 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark,
305 bool isNeedHash)
306 {
307 std::string hashId = GetHashDeleteSyncDeviceId(deviceId, isNeedHash);
308 DeleteWaterMark deleteWaterMark;
309 // lock prevent different thread visit deleteSyncCache_
310 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
311 int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
312 if (errCode != E_OK) {
313 return errCode;
314 }
315 deleteWaterMark.recvWaterMark = waterMark;
316 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
317 }
318
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)319 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
320 const DeleteWaterMark &deleteWaterMark)
321 {
322 // save db
323 return SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
324 }
325
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)326 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
327 DeleteWaterMark &deleteWaterMark)
328 {
329 deleteWaterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
330 int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, deleteWaterMark);
331 if (errCode == -E_NOT_FOUND) {
332 deleteWaterMark.sendWaterMark = 0;
333 deleteWaterMark.recvWaterMark = 0;
334 errCode = E_OK;
335 }
336 if (errCode != E_OK) {
337 LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
338 }
339 return errCode;
340 }
341
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)342 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
343 DeleteWaterMark &deleteWaterMark)
344 {
345 Key dbKey;
346 DBCommon::StringToVector(hashDeviceId, dbKey);
347 // search in db
348 Value dbValue;
349 int errCode = GetMetadataFromDb(dbKey, dbValue);
350 if (errCode != E_OK) {
351 return errCode;
352 }
353 // serialize value
354 return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
355 }
356
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)357 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
358 const DeleteWaterMark &deleteWaterMark)
359 {
360 // serialize value
361 Value dbValue;
362 int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
363 if (errCode != E_OK) {
364 return errCode;
365 }
366 Key dbKey;
367 DBCommon::StringToVector(hashDeviceId, dbKey);
368 // save
369 errCode = SetMetadataToDb(dbKey, dbValue);
370 if (errCode != E_OK) {
371 LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
372 }
373 return errCode;
374 }
375
// Returns the metadata key of the delete-sync watermark for `deviceId`:
// DELETE_SYNC_PREFIX_KEY + TransferHashString(deviceId) when isNeedHash is true,
// DELETE_SYNC_PREFIX_KEY + deviceId otherwise.
// Only the hashed form is memoized in deviceIdToHashDeleteSyncIdMap_. That map is keyed by deviceId
// alone, so storing both forms in it would let a call made with one isNeedHash value return the key
// built for the other.
// Takes deleteSyncLock_ on the hashed path, so callers must not already hold that mutex.
DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, bool isNeedHash)
{
    if (!isNeedHash) {
        // plain concatenation: nothing worth caching, and no shared state is touched
        return DBConstant::DELETE_SYNC_PREFIX_KEY + deviceId;
    }
    std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
    auto cached = deviceIdToHashDeleteSyncIdMap_.find(deviceId);
    if (cached != deviceIdToHashDeleteSyncIdMap_.end()) {
        return cached->second;
    }
    DeviceID hashDeleteSyncId = DBConstant::DELETE_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
    deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
    return hashDeleteSyncId;
}
389
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)390 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
391 std::vector<uint8_t> &outValue)
392 {
393 uint64_t length = CalculateDeleteWaterMarkSize();
394 outValue.resize(length);
395 Parcel parcel(outValue.data(), outValue.size());
396 parcel.WriteUInt32(deleteWaterMark.version);
397 parcel.EightByteAlign();
398 parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
399 parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
400 if (parcel.IsError()) {
401 LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
402 return -E_PARSE_FAIL;
403 }
404 return E_OK;
405 }
406
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)407 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
408 DeleteWaterMark &deleteWaterMark)
409 {
410 Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
411 parcel.ReadUInt32(deleteWaterMark.version);
412 parcel.EightByteAlign();
413 parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
414 parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
415 if (parcel.IsError()) {
416 LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
417 return -E_PARSE_FAIL;
418 }
419 return E_OK;
420 }
421
CalculateDeleteWaterMarkSize()422 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
423 {
424 uint64_t length = Parcel::GetUInt32Len(); // version
425 length = Parcel::GetEightByteAlign(length);
426 length += Parcel::GetUInt64Len(); // sendWaterMark
427 length += Parcel::GetUInt64Len(); // recvWaterMark
428 return length;
429 }
430
GetQuerySyncPrefixKey()431 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
432 {
433 return DBConstant::QUERY_SYNC_PREFIX_KEY;
434 }
435
GetDeleteSyncPrefixKey()436 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
437 {
438 return DBConstant::DELETE_SYNC_PREFIX_KEY;
439 }
440
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)441 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
442 {
443 if (querySyncIds.size() < MAX_STORE_ITEMS) {
444 return E_OK;
445 }
446 std::vector<std::pair<std::string, Timestamp>> allItems;
447 std::map<std::string, std::vector<uint8_t>> idMap;
448 std::vector<std::vector<uint8_t>> waitToRemove;
449 for (const auto &id : querySyncIds) {
450 Value value;
451 int errCode = GetMetadataFromDb(id, value);
452 if (errCode != E_OK) {
453 waitToRemove.push_back(id);
454 continue; // may be this failure cause by wrong data
455 }
456 QueryWaterMark queryWaterMark;
457 std::string queryKey(id.begin(), id.end());
458 errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
459 if (errCode != E_OK) {
460 waitToRemove.push_back(id);
461 continue; // may be this failure cause by wrong data
462 }
463 idMap.insert({queryKey, id});
464 allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
465 }
466 // we only remove broken data below
467 // 1. common data size less then 10w
468 // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
469 // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
470 if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
471 // remove in db
472 return DeleteMetaDataFromDB(waitToRemove);
473 }
474 uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
475 // quick select the k_th least used
476 std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
477 [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
478 return w1.second < w2.second;
479 });
480 for (uint32_t i = 0; i < removeCount; ++i) {
481 waitToRemove.push_back(idMap[allItems[i].first]);
482 }
483 // remove in db
484 return DeleteMetaDataFromDB(waitToRemove);
485 }
486
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)487 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName,
488 bool isNeedHash)
489 {
490 // lock prevent other thread modify queryWaterMark at this moment
491 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
492 std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
493 (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
494 if (!tableName.empty()) {
495 std::string hashTableName = DBCommon::TransferHashString(tableName);
496 std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
497 prefixKeyStr += hexTableName;
498 }
499
500 // remove in db
501 Key prefixKey;
502 DBCommon::StringToVector(prefixKeyStr, prefixKey);
503 int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
504 if (errCode != E_OK) {
505 LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
506 return errCode;
507 }
508 return E_OK;
509 }
510 } // namespace DistributedDB