1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "query_sync_water_mark_helper.h"
17
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25
26 namespace DistributedDB {
27 namespace {
28 const uint32_t MAX_STORE_ITEMS = 100000;
29 // WaterMark Version
30 constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31 constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 // Prefix Key in db
33 const std::string QUERY_SYNC_PREFIX_KEY = "querySync";
34 const std::string DELETE_SYNC_PREFIX_KEY = "deleteSync";
35 }
36
QuerySyncWaterMarkHelper()37 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
38 : storage_(nullptr)
39 {}
40
~QuerySyncWaterMarkHelper()41 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
42 {
43 storage_ = nullptr;
44 deviceIdToHashQuerySyncIdMap_.clear();
45 deleteSyncCache_.clear();
46 deviceIdToHashDeleteSyncIdMap_.clear();
47 }
48
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)49 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
50 {
51 if (storage_ == nullptr) {
52 return -E_INVALID_DB;
53 }
54 return storage_->GetMetaData(key, outValue);
55 }
56
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)57 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
58 {
59 if (storage_ == nullptr) {
60 return -E_INVALID_DB;
61 }
62 return storage_->PutMetaData(key, inValue);
63 }
64
DeleteMetaDataFromDB(const std::vector<Key> & keys) const65 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
66 {
67 if (storage_ == nullptr) {
68 return -E_INVALID_DB;
69 }
70 return storage_->DeleteMetaData(keys);
71 }
72
Initialize(ISyncInterface * storage)73 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
74 {
75 storage_ = storage;
76 return E_OK;
77 }
78
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)79 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
80 {
81 std::vector<uint8_t> value;
82 int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
83 if (errCode != E_OK) {
84 return errCode;
85 }
86 DeleteWaterMark deleteWaterMark;
87 std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
88 errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
89 if (errCode != E_OK) {
90 return errCode;
91 }
92 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
93 deleteSyncCache_[dbKey] = deleteWaterMark;
94 return errCode;
95 }
96
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)97 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
98 QueryWaterMark &queryWaterMark)
99 {
100 // first get from cache_
101 int errCode = querySyncCache_.Get(cacheKey, queryWaterMark);
102 bool addToCache = false;
103 if (errCode == -E_NOT_FOUND) {
104 // second get from db
105 errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
106 addToCache = true;
107 }
108 if (errCode == -E_NOT_FOUND) {
109 // third generate one and save to db
110 errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
111 }
112 // something error return
113 if (errCode != E_OK) {
114 LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
115 return errCode;
116 }
117 // remember add to cache_
118 if (addToCache) {
119 querySyncCache_.Put(cacheKey, queryWaterMark);
120 }
121 return errCode;
122 }
123
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)124 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
125 QueryWaterMark &queryWaterMark)
126 {
127 std::string cacheKey;
128 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
129 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
130 return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
131 }
132
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)133 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
134 const std::string &deviceId, const WaterMark &waterMark)
135 {
136 std::string cacheKey;
137 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
138 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
139 return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
140 }
141
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)142 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
143 const std::string &deviceId, const Timestamp ×tamp)
144 {
145 std::string cacheKey;
146 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
147 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
148 QueryWaterMark queryWaterMark;
149 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
150 if (errCode != E_OK) {
151 return errCode;
152 }
153 queryWaterMark.lastQueryTime = timestamp;
154 return UpdateCacheAndSave(cacheKey, queryWaterMark);
155 }
156
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)157 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
158 const WaterMark &waterMark)
159 {
160 QueryWaterMark queryWaterMark;
161 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
162 if (errCode != E_OK) {
163 return errCode;
164 }
165 queryWaterMark.recvWaterMark = waterMark;
166 return UpdateCacheAndSave(cacheKey, queryWaterMark);
167 }
168
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)169 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
170 const std::string &deviceId, const WaterMark &waterMark)
171 {
172 std::string cacheKey;
173 GetHashQuerySyncDeviceId(deviceId, queryIdentify, cacheKey);
174 QueryWaterMark queryWaterMark;
175 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
176 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
177 if (errCode != E_OK) {
178 return errCode;
179 }
180 queryWaterMark.sendWaterMark = waterMark;
181 return UpdateCacheAndSave(cacheKey, queryWaterMark);
182 }
183
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)184 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
185 QueryWaterMark &queryWaterMark)
186 {
187 // update lastUsedTime
188 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
189 if (errCode != E_OK) {
190 return errCode;
191 }
192 // save db first
193 errCode = SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
194 if (errCode != E_OK) {
195 return errCode;
196 }
197 querySyncCache_.Put(cacheKey, queryWaterMark);
198 return errCode;
199 }
200
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)201 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
202 {
203 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
204 if (errCode != E_OK) {
205 return errCode;
206 }
207 queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
208 return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
209 }
210
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)211 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
212 {
213 // serialize value
214 Value dbValue;
215 int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
216 if (errCode != E_OK) {
217 return errCode;
218 }
219 // serialize key
220 Key dbKey;
221 DBCommon::StringToVector(dbKeyString, dbKey);
222 // save
223 errCode = SetMetadataToDb(dbKey, dbValue);
224 if (errCode != E_OK) {
225 LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
226 }
227 return errCode;
228 }
229
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)230 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
231 {
232 // serialize key
233 Key dbKey;
234 DBCommon::StringToVector(dbKeyString, dbKey);
235 // search in db
236 Value dbValue;
237 int errCode = GetMetadataFromDb(dbKey, dbValue);
238 if (errCode != E_OK) {
239 return errCode;
240 }
241 return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
242 }
243
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)244 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
245 {
246 uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
247 outValue.resize(length);
248 Parcel parcel(outValue.data(), outValue.size());
249 parcel.WriteUInt32(queryWaterMark.version);
250 parcel.EightByteAlign();
251 parcel.WriteUInt64(queryWaterMark.sendWaterMark);
252 parcel.WriteUInt64(queryWaterMark.recvWaterMark);
253 parcel.WriteUInt64(queryWaterMark.lastUsedTime);
254 parcel.WriteString(queryWaterMark.sql);
255 parcel.WriteUInt64(queryWaterMark.lastQueryTime);
256 if (parcel.IsError()) {
257 LOGE("[Meta] Parcel error when serialize queryWaterMark");
258 return -E_PARSE_FAIL;
259 }
260 return E_OK;
261 }
262
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)263 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
264 {
265 Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
266 parcel.ReadUInt32(queryWaterMark.version);
267 parcel.EightByteAlign();
268 parcel.ReadUInt64(queryWaterMark.sendWaterMark);
269 parcel.ReadUInt64(queryWaterMark.recvWaterMark);
270 parcel.ReadUInt64(queryWaterMark.lastUsedTime);
271 parcel.ReadString(queryWaterMark.sql);
272 if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
273 parcel.ReadUInt64(queryWaterMark.lastQueryTime);
274 }
275 if (parcel.IsError()) {
276 LOGE("[Meta] Parcel error when deserialize queryWaterMark");
277 return -E_PARSE_FAIL;
278 }
279 return E_OK;
280 }
281
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)282 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
283 {
284 uint64_t length = Parcel::GetUInt32Len(); // version
285 length = Parcel::GetEightByteAlign(length);
286 length += Parcel::GetUInt64Len(); // sendWaterMark
287 length += Parcel::GetUInt64Len(); // recvWaterMark
288 length += Parcel::GetUInt64Len(); // lastUsedTime
289 length += Parcel::GetStringLen(queryWaterMark.sql);
290 length += Parcel::GetUInt64Len(); // lastQueryTime
291 return length;
292 }
293
GetHashQuerySyncDeviceId(const DeviceID & deviceId,const DeviceID & queryId,DeviceID & hashQuerySyncId)294 void QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId,
295 const DeviceID &queryId, DeviceID &hashQuerySyncId)
296 {
297 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
298 if (deviceIdToHashQuerySyncIdMap_[deviceId].count(queryId) == 0) {
299 // do not modify this
300 hashQuerySyncId = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
301 deviceIdToHashQuerySyncIdMap_[deviceId][queryId] = hashQuerySyncId;
302 } else {
303 hashQuerySyncId = deviceIdToHashQuerySyncIdMap_[deviceId][queryId];
304 }
305 }
306
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)307 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
308 {
309 std::string hashId;
310 GetHashDeleteSyncDeviceId(deviceId, hashId);
311 return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
312 }
313
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)314 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
315 {
316 std::string hashId;
317 GetHashDeleteSyncDeviceId(deviceId, hashId);
318 DeleteWaterMark deleteWaterMark;
319 GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
320 deleteWaterMark.sendWaterMark = waterMark;
321 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
322 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
323 }
324
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)325 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark,
326 bool isNeedHash)
327 {
328 std::string hashId;
329 GetHashDeleteSyncDeviceId(deviceId, hashId, isNeedHash);
330 DeleteWaterMark deleteWaterMark;
331 GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
332 deleteWaterMark.recvWaterMark = waterMark;
333 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
334 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
335 }
336
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)337 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
338 const DeleteWaterMark &deleteWaterMark)
339 {
340 // save db first
341 int errCode = SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
342 if (errCode != E_OK) {
343 return errCode;
344 }
345 // modify cache
346 deleteSyncCache_[dbKey] = deleteWaterMark;
347 return errCode;
348 }
349
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)350 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
351 DeleteWaterMark &deleteWaterMark)
352 {
353 // lock prevent different thread visit deleteSyncCache_
354 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
355 // if not found
356 if (deleteSyncCache_.find(hashDeviceId) == deleteSyncCache_.end()) {
357 DeleteWaterMark waterMark;
358 waterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
359 int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, waterMark);
360 if (errCode == -E_NOT_FOUND) {
361 deleteWaterMark.sendWaterMark = 0;
362 deleteWaterMark.recvWaterMark = 0;
363 errCode = E_OK;
364 }
365 if (errCode != E_OK) {
366 LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
367 return errCode;
368 }
369 deleteSyncCache_.insert(std::pair<DeviceID, DeleteWaterMark>(hashDeviceId, waterMark));
370 }
371 deleteWaterMark = deleteSyncCache_[hashDeviceId];
372 return E_OK;
373 }
374
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)375 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
376 DeleteWaterMark &deleteWaterMark)
377 {
378 Key dbKey;
379 DBCommon::StringToVector(hashDeviceId, dbKey);
380 // search in db
381 Value dbValue;
382 int errCode = GetMetadataFromDb(dbKey, dbValue);
383 if (errCode != E_OK) {
384 return errCode;
385 }
386 // serialize value
387 return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
388 }
389
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)390 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
391 const DeleteWaterMark &deleteWaterMark)
392 {
393 // serialize value
394 Value dbValue;
395 int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
396 if (errCode != E_OK) {
397 return errCode;
398 }
399 Key dbKey;
400 DBCommon::StringToVector(hashDeviceId, dbKey);
401 // save
402 errCode = SetMetadataToDb(dbKey, dbValue);
403 if (errCode != E_OK) {
404 LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
405 }
406 return errCode;
407 }
408
GetHashDeleteSyncDeviceId(const DeviceID & deviceId,DeviceID & hashDeleteSyncId,bool isNeedHash)409 void QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, DeviceID &hashDeleteSyncId,
410 bool isNeedHash)
411 {
412 if (!isNeedHash) {
413 hashDeleteSyncId = DELETE_SYNC_PREFIX_KEY + deviceId;
414 }
415 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
416 if (deviceIdToHashDeleteSyncIdMap_.count(deviceId) == 0) {
417 hashDeleteSyncId = DELETE_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
418 deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
419 } else {
420 hashDeleteSyncId = deviceIdToHashDeleteSyncIdMap_[deviceId];
421 }
422 }
423
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)424 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
425 std::vector<uint8_t> &outValue)
426 {
427 uint64_t length = CalculateDeleteWaterMarkSize();
428 outValue.resize(length);
429 Parcel parcel(outValue.data(), outValue.size());
430 parcel.WriteUInt32(deleteWaterMark.version);
431 parcel.EightByteAlign();
432 parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
433 parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
434 if (parcel.IsError()) {
435 LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
436 return -E_PARSE_FAIL;
437 }
438 return E_OK;
439 }
440
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)441 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
442 DeleteWaterMark &deleteWaterMark)
443 {
444 Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
445 parcel.ReadUInt32(deleteWaterMark.version);
446 parcel.EightByteAlign();
447 parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
448 parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
449 if (parcel.IsError()) {
450 LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
451 return -E_PARSE_FAIL;
452 }
453 return E_OK;
454 }
455
CalculateDeleteWaterMarkSize()456 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
457 {
458 uint64_t length = Parcel::GetUInt32Len(); // version
459 length = Parcel::GetEightByteAlign(length);
460 length += Parcel::GetUInt64Len(); // sendWaterMark
461 length += Parcel::GetUInt64Len(); // recvWaterMark
462 return length;
463 }
464
GetQuerySyncPrefixKey()465 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
466 {
467 return QUERY_SYNC_PREFIX_KEY;
468 }
469
GetDeleteSyncPrefixKey()470 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
471 {
472 return DELETE_SYNC_PREFIX_KEY;
473 }
474
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)475 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
476 {
477 if (querySyncIds.size() < MAX_STORE_ITEMS) {
478 return E_OK;
479 }
480 std::vector<std::pair<std::string, Timestamp>> allItems;
481 std::map<std::string, std::vector<uint8_t>> idMap;
482 std::vector<std::vector<uint8_t>> waitToRemove;
483 for (const auto &id : querySyncIds) {
484 Value value;
485 int errCode = GetMetadataFromDb(id, value);
486 if (errCode != E_OK) {
487 waitToRemove.push_back(id);
488 continue; // may be this failure cause by wrong data
489 }
490 QueryWaterMark queryWaterMark;
491 std::string queryKey(id.begin(), id.end());
492 errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
493 if (errCode != E_OK) {
494 waitToRemove.push_back(id);
495 continue; // may be this failure cause by wrong data
496 }
497 idMap.insert({queryKey, id});
498 allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
499 }
500 // we only remove broken data below
501 // 1. common data size less then 10w
502 // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
503 // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
504 if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
505 // remove in db
506 return DeleteMetaDataFromDB(waitToRemove);
507 }
508 uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
509 // quick select the k_th least used
510 std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
511 [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
512 return w1.second < w2.second;
513 });
514 for (uint32_t i = 0; i < removeCount; ++i) {
515 waitToRemove.push_back(idMap[allItems[i].first]);
516 }
517 // remove in db
518 return DeleteMetaDataFromDB(waitToRemove);
519 }
520
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)521 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName,
522 bool isNeedHash)
523 {
524 // lock prevent other thread modify queryWaterMark at this moment
525 {
526 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
527 std::string prefixKeyStr;
528 if (isNeedHash) {
529 prefixKeyStr = QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
530 } else {
531 prefixKeyStr = QUERY_SYNC_PREFIX_KEY + deviceId;
532 }
533 if (!tableName.empty()) {
534 std::string hashTableName = DBCommon::TransferHashString(tableName);
535 std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
536 prefixKeyStr += hexTableName;
537 }
538
539 // remove in db
540 Key prefixKey;
541 DBCommon::StringToVector(prefixKeyStr, prefixKey);
542 int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
543 if (errCode != E_OK) {
544 LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
545 return errCode;
546 }
547 // clean cache
548 querySyncCache_.RemoveWithPrefixKey(prefixKeyStr);
549 }
550 return E_OK;
551 }
552 } // namespace DistributedDB