1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "query_sync_water_mark_helper.h"
17
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25
26 namespace DistributedDB {
27 namespace {
28 const uint32_t MAX_STORE_ITEMS = 100000;
29 // WaterMark Version
30 constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31 constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35 : storage_(nullptr)
36 {}
37
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40 storage_ = nullptr;
41 deviceIdToHashQuerySyncIdMap_.clear();
42 deviceIdToHashDeleteSyncIdMap_.clear();
43 }
44
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)45 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
46 {
47 if (storage_ == nullptr) {
48 return -E_INVALID_DB;
49 }
50 return storage_->GetMetaData(key, outValue);
51 }
52
GetMetadataByPrefixKeyFromDb(const Key & prefixKey,std::map<Key,Value> & data)53 int QuerySyncWaterMarkHelper::GetMetadataByPrefixKeyFromDb(const Key &prefixKey, std::map<Key, Value> &data)
54 {
55 if (storage_ == nullptr) {
56 return -E_INVALID_DB;
57 }
58 return storage_->GetMetaDataByPrefixKey(prefixKey, data);
59 }
60
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)61 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
62 {
63 if (storage_ == nullptr) {
64 return -E_INVALID_DB;
65 }
66 return storage_->PutMetaData(key, inValue, false);
67 }
68
DeleteMetaDataFromDB(const std::vector<Key> & keys) const69 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
70 {
71 if (storage_ == nullptr) {
72 return -E_INVALID_DB;
73 }
74 return storage_->DeleteMetaData(keys);
75 }
76
Initialize(ISyncInterface * storage)77 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
78 {
79 storage_ = storage;
80 return E_OK;
81 }
82
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)83 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
84 {
85 std::vector<uint8_t> value;
86 int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
87 if (errCode != E_OK) {
88 return errCode;
89 }
90 DeleteWaterMark deleteWaterMark;
91 std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
92 errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
93 if (errCode != E_OK) {
94 return errCode;
95 }
96 return errCode;
97 }
98
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)99 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
100 QueryWaterMark &queryWaterMark)
101 {
102 // second get from db
103 int errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
104 if (errCode == -E_NOT_FOUND) {
105 // third generate one and save to db
106 errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
107 }
108 // something error return
109 if (errCode != E_OK) {
110 LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
111 }
112 return errCode;
113 }
114
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,QueryWaterMark & queryWaterMark)115 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
116 const std::string &userId, QueryWaterMark &queryWaterMark)
117 {
118 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
119 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
120 return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
121 }
122
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const WaterMark & waterMark)123 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
124 const std::string &deviceId, const std::string &userId, const WaterMark &waterMark)
125 {
126 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
127 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
128 return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
129 }
130
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const Timestamp & timestamp)131 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
132 const std::string &deviceId, const std::string &userId, const Timestamp ×tamp)
133 {
134 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
135 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
136 QueryWaterMark queryWaterMark;
137 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
138 if (errCode != E_OK) {
139 return errCode;
140 }
141 queryWaterMark.lastQueryTime = timestamp;
142 return UpdateCacheAndSave(cacheKey, queryWaterMark);
143 }
144
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)145 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
146 const WaterMark &waterMark)
147 {
148 QueryWaterMark queryWaterMark;
149 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
150 if (errCode != E_OK) {
151 return errCode;
152 }
153 queryWaterMark.recvWaterMark = waterMark;
154 return UpdateCacheAndSave(cacheKey, queryWaterMark);
155 }
156
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const std::string & userId,const WaterMark & waterMark)157 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
158 const std::string &deviceId, const std::string &userId, const WaterMark &waterMark)
159 {
160 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, userId, queryIdentify);
161 QueryWaterMark queryWaterMark;
162 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
163 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
164 if (errCode != E_OK) {
165 return errCode;
166 }
167 queryWaterMark.sendWaterMark = waterMark;
168 return UpdateCacheAndSave(cacheKey, queryWaterMark);
169 }
170
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)171 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
172 QueryWaterMark &queryWaterMark)
173 {
174 // update lastUsedTime
175 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179 // save db
180 return SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
181 }
182
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)183 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
184 {
185 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
186 if (errCode != E_OK) {
187 return errCode;
188 }
189 queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
190 return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
191 }
192
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)193 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
194 {
195 // serialize value
196 Value dbValue;
197 int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
198 if (errCode != E_OK) {
199 return errCode;
200 }
201 // serialize key
202 Key dbKey;
203 DBCommon::StringToVector(dbKeyString, dbKey);
204 // save
205 errCode = SetMetadataToDb(dbKey, dbValue);
206 if (errCode != E_OK) {
207 LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
208 }
209 return errCode;
210 }
211
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)212 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
213 {
214 // serialize key
215 Key dbKey;
216 DBCommon::StringToVector(dbKeyString, dbKey);
217 // search in db
218 Value dbValue;
219 int errCode = GetMetadataFromDb(dbKey, dbValue);
220 if (errCode != E_OK) {
221 return errCode;
222 }
223 return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
224 }
225
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)226 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
227 {
228 uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
229 outValue.resize(length);
230 Parcel parcel(outValue.data(), outValue.size());
231 parcel.WriteUInt32(queryWaterMark.version);
232 parcel.EightByteAlign();
233 parcel.WriteUInt64(queryWaterMark.sendWaterMark);
234 parcel.WriteUInt64(queryWaterMark.recvWaterMark);
235 parcel.WriteUInt64(queryWaterMark.lastUsedTime);
236 parcel.WriteString(queryWaterMark.sql);
237 parcel.WriteUInt64(queryWaterMark.lastQueryTime);
238 if (parcel.IsError()) {
239 LOGE("[Meta] Parcel error when serialize queryWaterMark");
240 return -E_PARSE_FAIL;
241 }
242 return E_OK;
243 }
244
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)245 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
246 {
247 Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
248 parcel.ReadUInt32(queryWaterMark.version);
249 parcel.EightByteAlign();
250 parcel.ReadUInt64(queryWaterMark.sendWaterMark);
251 parcel.ReadUInt64(queryWaterMark.recvWaterMark);
252 parcel.ReadUInt64(queryWaterMark.lastUsedTime);
253 parcel.ReadString(queryWaterMark.sql);
254 if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
255 parcel.ReadUInt64(queryWaterMark.lastQueryTime);
256 }
257 if (parcel.IsError()) {
258 LOGE("[Meta] Parcel error when deserialize queryWaterMark");
259 return -E_PARSE_FAIL;
260 }
261 return E_OK;
262 }
263
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)264 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
265 {
266 uint64_t length = Parcel::GetUInt32Len(); // version
267 length = Parcel::GetEightByteAlign(length);
268 length += Parcel::GetUInt64Len(); // sendWaterMark
269 length += Parcel::GetUInt64Len(); // recvWaterMark
270 length += Parcel::GetUInt64Len(); // lastUsedTime
271 length += Parcel::GetStringLen(queryWaterMark.sql);
272 length += Parcel::GetUInt64Len(); // lastQueryTime
273 return length;
274 }
275
GetHashQuerySyncDeviceId(const DeviceID & deviceId,const DeviceID & userId,const DeviceID & queryId)276 DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &userId,
277 const DeviceID &queryId)
278 {
279 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
280 DeviceID hashQuerySyncId;
281 DeviceSyncTarget deviceInfo(deviceId, userId);
282 if (deviceIdToHashQuerySyncIdMap_[deviceInfo].count(queryId) == 0) {
283 // do not modify this
284 hashQuerySyncId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId +
285 DBConstant::USERID_PREFIX_KEY + userId;
286 deviceIdToHashQuerySyncIdMap_[deviceInfo][queryId] = hashQuerySyncId;
287 } else {
288 hashQuerySyncId = deviceIdToHashQuerySyncIdMap_[deviceInfo][queryId];
289 }
290 return hashQuerySyncId;
291 }
292
GetDeleteSyncWaterMark(const std::string & deviceId,const std::string & userId,DeleteWaterMark & deleteWaterMark)293 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, const std::string &userId,
294 DeleteWaterMark &deleteWaterMark)
295 {
296 std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId);
297 // lock prevent different thread visit deleteSyncCache_
298 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
299 return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
300 }
301
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const std::string & userId,const WaterMark & waterMark)302 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const std::string &userId,
303 const WaterMark &waterMark)
304 {
305 std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId);
306 DeleteWaterMark deleteWaterMark;
307 // lock prevent different thread visit deleteSyncCache_
308 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
309 int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
310 if (errCode != E_OK) {
311 return errCode;
312 }
313 deleteWaterMark.sendWaterMark = waterMark;
314 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
315 }
316
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const std::string & userId,const WaterMark & waterMark,bool isNeedHash)317 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const std::string &userId,
318 const WaterMark &waterMark, bool isNeedHash)
319 {
320 std::string hashId = GetHashDeleteSyncDeviceId(deviceId, userId, isNeedHash);
321 // lock prevent different thread visit deleteSyncCache_
322 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
323 int errCode = E_OK;
324 std::map<Key, DeleteWaterMark> deleteWaterMarks;
325 if (!userId.empty()) {
326 DeleteWaterMark deleteWaterMark;
327 Key dbKey;
328 DBCommon::StringToVector(hashId, dbKey);
329 errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
330 deleteWaterMark.recvWaterMark = waterMark;
331 deleteWaterMarks[dbKey] = deleteWaterMark;
332 } else {
333 errCode = GetDeleteWatersMarkFromDB(hashId, deleteWaterMarks);
334 }
335 if (errCode != E_OK) {
336 return errCode;
337 }
338 for (auto &deleteWaterMark : deleteWaterMarks) {
339 deleteWaterMark.second.recvWaterMark = waterMark;
340 errCode = UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark.second);
341 if (errCode != E_OK) {
342 return errCode;
343 }
344 }
345 return E_OK;
346 }
347
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)348 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
349 const DeleteWaterMark &deleteWaterMark)
350 {
351 // save db
352 return SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
353 }
354
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)355 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
356 DeleteWaterMark &deleteWaterMark)
357 {
358 deleteWaterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
359 int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, deleteWaterMark);
360 if (errCode == -E_NOT_FOUND) {
361 deleteWaterMark.sendWaterMark = 0;
362 deleteWaterMark.recvWaterMark = 0;
363 errCode = E_OK;
364 }
365 if (errCode != E_OK) {
366 LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
367 }
368 return errCode;
369 }
370
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)371 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
372 DeleteWaterMark &deleteWaterMark)
373 {
374 Key dbKey;
375 DBCommon::StringToVector(hashDeviceId, dbKey);
376 // search in db
377 Value dbValue;
378 int errCode = GetMetadataFromDb(dbKey, dbValue);
379 if (errCode != E_OK) {
380 return errCode;
381 }
382 // serialize value
383 return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
384 }
385
GetDeleteWatersMarkFromDB(const DeviceID & hashId,std::map<Key,DeleteWaterMark> & deleteWaterMarks)386 int QuerySyncWaterMarkHelper::GetDeleteWatersMarkFromDB(const DeviceID &hashId,
387 std::map<Key, DeleteWaterMark> &deleteWaterMarks)
388 {
389 Key dbKeyPrefix;
390 DBCommon::StringToVector(hashId, dbKeyPrefix);
391 // search in db
392 std::map<Key, Value> dbData;
393 int errCode = GetMetadataByPrefixKeyFromDb(dbKeyPrefix, dbData);
394 if (errCode == -E_NOT_FOUND) {
395 DeleteWaterMark deleteWaterMark;
396 deleteWaterMarks[dbKeyPrefix] = deleteWaterMark;
397 return E_OK;
398 }
399 if (errCode != E_OK) {
400 return errCode;
401 }
402 // serialize value
403 for (const auto &oneDbData : dbData) {
404 DeleteWaterMark deleteWaterMark;
405 errCode = DeSerializeDeleteWaterMark(oneDbData.second, deleteWaterMark);
406 if (errCode != E_OK) {
407 return errCode;
408 }
409 deleteWaterMarks[oneDbData.first] = deleteWaterMark;
410 }
411 return E_OK;
412 }
413
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)414 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
415 const DeleteWaterMark &deleteWaterMark)
416 {
417 // serialize value
418 Value dbValue;
419 int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
420 if (errCode != E_OK) {
421 return errCode;
422 }
423 Key dbKey;
424 DBCommon::StringToVector(hashDeviceId, dbKey);
425 // save
426 errCode = SetMetadataToDb(dbKey, dbValue);
427 if (errCode != E_OK) {
428 LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
429 }
430 return errCode;
431 }
432
GetHashDeleteSyncDeviceId(const DeviceID & deviceId,const DeviceID & userId,bool isNeedHash)433 DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, const DeviceID &userId,
434 bool isNeedHash)
435 {
436 DeviceID hashDeleteSyncId;
437 DeviceSyncTarget deviceInfo(deviceId, userId);
438 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
439 if (deviceIdToHashDeleteSyncIdMap_.count(deviceInfo) == 0) {
440 hashDeleteSyncId = DBConstant::DELETE_SYNC_PREFIX_KEY +
441 (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId) + DBConstant::USERID_PREFIX_KEY + userId;
442 deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceSyncTarget, DeviceID>(deviceInfo, hashDeleteSyncId));
443 } else {
444 hashDeleteSyncId = deviceIdToHashDeleteSyncIdMap_[deviceInfo];
445 }
446 return hashDeleteSyncId;
447 }
448
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)449 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
450 std::vector<uint8_t> &outValue)
451 {
452 uint64_t length = CalculateDeleteWaterMarkSize();
453 outValue.resize(length);
454 Parcel parcel(outValue.data(), outValue.size());
455 parcel.WriteUInt32(deleteWaterMark.version);
456 parcel.EightByteAlign();
457 parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
458 parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
459 if (parcel.IsError()) {
460 LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
461 return -E_PARSE_FAIL;
462 }
463 return E_OK;
464 }
465
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)466 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
467 DeleteWaterMark &deleteWaterMark)
468 {
469 Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
470 parcel.ReadUInt32(deleteWaterMark.version);
471 parcel.EightByteAlign();
472 parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
473 parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
474 if (parcel.IsError()) {
475 LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
476 return -E_PARSE_FAIL;
477 }
478 return E_OK;
479 }
480
CalculateDeleteWaterMarkSize()481 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
482 {
483 uint64_t length = Parcel::GetUInt32Len(); // version
484 length = Parcel::GetEightByteAlign(length);
485 length += Parcel::GetUInt64Len(); // sendWaterMark
486 length += Parcel::GetUInt64Len(); // recvWaterMark
487 return length;
488 }
489
GetQuerySyncPrefixKey()490 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
491 {
492 return DBConstant::QUERY_SYNC_PREFIX_KEY;
493 }
494
GetDeleteSyncPrefixKey()495 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
496 {
497 return DBConstant::DELETE_SYNC_PREFIX_KEY;
498 }
499
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)500 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
501 {
502 if (querySyncIds.size() < MAX_STORE_ITEMS) {
503 return E_OK;
504 }
505 std::vector<std::pair<std::string, Timestamp>> allItems;
506 std::map<std::string, std::vector<uint8_t>> idMap;
507 std::vector<std::vector<uint8_t>> waitToRemove;
508 for (const auto &id : querySyncIds) {
509 Value value;
510 int errCode = GetMetadataFromDb(id, value);
511 if (errCode != E_OK) {
512 waitToRemove.push_back(id);
513 continue; // may be this failure cause by wrong data
514 }
515 QueryWaterMark queryWaterMark;
516 std::string queryKey(id.begin(), id.end());
517 errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
518 if (errCode != E_OK) {
519 waitToRemove.push_back(id);
520 continue; // may be this failure cause by wrong data
521 }
522 idMap.insert({queryKey, id});
523 allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
524 }
525 // we only remove broken data below
526 // 1. common data size less then 10w
527 // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
528 // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
529 if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
530 // remove in db
531 return DeleteMetaDataFromDB(waitToRemove);
532 }
533 uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
534 // quick select the k_th least used
535 std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
536 [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
537 return w1.second < w2.second;
538 });
539 for (uint32_t i = 0; i < removeCount; ++i) {
540 waitToRemove.push_back(idMap[allItems[i].first]);
541 }
542 // remove in db
543 return DeleteMetaDataFromDB(waitToRemove);
544 }
545
ResetRecvQueryWaterMark(const DeviceID & deviceId,const DeviceID & userId,const std::string & tableName,bool isNeedHash)546 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const DeviceID &userId,
547 const std::string &tableName, bool isNeedHash)
548 {
549 // lock prevent other thread modify queryWaterMark at this moment
550 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
551 std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
552 (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
553 if (!tableName.empty()) {
554 std::string hashTableName = DBCommon::TransferHashString(tableName);
555 std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
556 prefixKeyStr += hexTableName;
557 }
558 if (!userId.empty()) {
559 prefixKeyStr += DBConstant::USERID_PREFIX_KEY + userId;
560 }
561
562 // remove in db
563 Key prefixKey;
564 DBCommon::StringToVector(prefixKeyStr, prefixKey);
565 int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
566 if (errCode != E_OK) {
567 LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
568 return errCode;
569 }
570 return E_OK;
571 }
572 } // namespace DistributedDB