1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_single_ver_sync_db_Interface.h"
17
18 #include <algorithm>
19 #include <thread>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "intercepted_data_impl.h"
25 #include "log_print.h"
26 #include "platform_specific.h"
27 #include "query_object.h"
28 #include "securec.h"
29
30 namespace DistributedDB {
31 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)32 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
33 {
34 int errCode = E_OK;
35 for (const auto &item : dataItems) {
36 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
37 if (entry == nullptr) {
38 LOGE("Create entry failed.");
39 errCode = -E_OUT_OF_MEMORY;
40 break;
41 }
42 DataItem storageItem;
43 storageItem.key = item.key;
44 storageItem.value = item.value;
45 storageItem.flag = item.flag;
46 storageItem.timestamp = item.timestamp;
47 storageItem.writeTimestamp = item.writeTimestamp;
48 entry->SetEntryData(std::move(storageItem));
49 entries.push_back(entry);
50 }
51 if (errCode != E_OK) {
52 for (auto &kvEntry : entries) {
53 delete kvEntry;
54 kvEntry = nullptr;
55 }
56 entries.clear();
57 }
58 return errCode;
59 }
60 }
61
VirtualSingleVerSyncDBInterface()62 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
63 {
64 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
65 LOGD("virtual device init db createTime");
66 }
67
GetInterfaceType() const68 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
69 {
70 return SYNC_SVD;
71 }
72
IncRefCount()73 void VirtualSingleVerSyncDBInterface::IncRefCount()
74 {
75 }
76
DecRefCount()77 void VirtualSingleVerSyncDBInterface::DecRefCount()
78 {
79 }
80
SetIdentifier(std::vector<uint8_t> & identifier)81 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
82 {
83 identifier_ = std::move(identifier);
84 }
85
GetIdentifier() const86 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
87 {
88 return identifier_;
89 }
90
GetMetaData(const Key & key,Value & value) const91 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
92 {
93 if (readBusy_) {
94 return -E_BUSY;
95 }
96 auto iter = metadata_.find(key);
97 if (iter != metadata_.end()) {
98 value = iter->second;
99 return E_OK;
100 }
101 return -E_NOT_FOUND;
102 }
103
PutMetaData(const Key & key,const Value & value)104 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value)
105 {
106 if (busy_) {
107 return -E_BUSY;
108 }
109 metadata_[key] = value;
110 return E_OK;
111 }
112
DeleteMetaData(const std::vector<Key> & keys)113 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
114 {
115 for (const auto &key : keys) {
116 (void)metadata_.erase(key);
117 }
118 return E_OK;
119 }
120
GetAllMetaKeys(std::vector<Key> & keys) const121 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
122 {
123 for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
124 keys.push_back(iter->first);
125 }
126 LOGD("GetAllMetaKeys size %zu", keys.size());
127 return E_OK;
128 }
129
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const130 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
131 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
132 {
133 return -E_NOT_SUPPORT;
134 }
135
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const136 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
137 const DataSizeSpecInfo &dataSizeInfo) const
138 {
139 return -E_NOT_SUPPORT;
140 }
141
ReleaseContinueToken(ContinueToken & continueStmtToken) const142 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
143 {
144 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
145 if (token != nullptr) {
146 delete token;
147 continueStmtToken = nullptr;
148 }
149 return;
150 }
151
GetSchemaInfo() const152 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
153 {
154 return schemaObj_;
155 }
156
CheckCompatible(const std::string & schema,uint8_t type) const157 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
158 {
159 if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
160 return true;
161 }
162 return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
163 }
164
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)165 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
166 {
167 VirtualDataItem item;
168 item.key = key;
169 item.value = value;
170 item.timestamp = time;
171 item.writeTimestamp = time;
172 item.flag = flag;
173 item.isLocal = true;
174 dbData_.push_back(item);
175 return E_OK;
176 }
177
GetMaxTimestamp(Timestamp & stamp) const178 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
179 {
180 for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
181 if (stamp < iter->writeTimestamp) {
182 stamp = iter->writeTimestamp;
183 }
184 }
185 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
186 }
187
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)188 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
189 {
190 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
191 deviceData_.erase(deviceName);
192 uint32_t devId = 0;
193 if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
194 devId = deviceMapping_[deviceName];
195 }
196 for (auto &item : dbData_) {
197 if (item.deviceId == devId && devId > 0) {
198 item.flag = VirtualDataItem::DELETE_FLAG;
199 }
200 }
201 LOGD("RemoveDeviceData FINISH");
202 return E_OK;
203 }
204
GetSyncData(const Key & key,VirtualDataItem & dataItem)205 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
206 {
207 auto iter = std::find_if(dbData_.begin(), dbData_.end(),
208 [key](const VirtualDataItem& item) { return item.key == key; });
209 if (iter != dbData_.end()) {
210 if (iter->flag == VirtualDataItem::DELETE_FLAG) {
211 return -E_NOT_FOUND;
212 }
213 dataItem.key = iter->key;
214 dataItem.value = iter->value;
215 dataItem.timestamp = iter->timestamp;
216 dataItem.writeTimestamp = iter->writeTimestamp;
217 dataItem.flag = iter->flag;
218 dataItem.isLocal = iter->isLocal;
219 return E_OK;
220 }
221 return -E_NOT_FOUND;
222 }
223
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const224 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
225 std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
226 const DataSizeSpecInfo &dataSizeInfo) const
227 {
228 std::vector<VirtualDataItem> dataItems;
229 int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
230 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
231 LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
232 return errCode;
233 }
234 int innerCode = GetEntriesFromItems(entries, dataItems);
235 if (innerCode != E_OK) {
236 return innerCode;
237 }
238 return errCode;
239 }
240
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const241 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
242 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
243 {
244 if (continueStmtToken == nullptr) {
245 return -E_INVALID_ARGS;
246 }
247 int errCode = DataControl();
248 if (errCode != E_OK) {
249 return errCode;
250 }
251 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
252 Timestamp currentWaterMark = 0;
253 std::vector<VirtualDataItem> dataItems;
254 bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
255 if (isFinished) {
256 delete token;
257 continueStmtToken = nullptr;
258 } else {
259 currentWaterMark++;
260 token->begin = currentWaterMark;
261 }
262 int innerCode = GetEntriesFromItems(entries, dataItems);
263 if (innerCode != E_OK) {
264 return innerCode;
265 }
266 return isFinished ? E_OK : -E_UNFINISHED;
267 }
268
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const269 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
270 std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
271 {
272 if (getDataDelayTime_ > 0) {
273 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
274 }
275 int errCode = DataControl();
276 if (errCode != E_OK) {
277 return errCode;
278 }
279 Timestamp currentWaterMark = 0;
280 bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
281 if (!isFinished) {
282 VirtualContinueToken *token = new VirtualContinueToken();
283 if (token == nullptr) {
284 LOGD("virtual alloc token failed");
285 dataItems.clear();
286 return -E_OUT_OF_MEMORY;
287 }
288 currentWaterMark++;
289 token->begin = currentWaterMark;
290 token->end = end;
291 continueStmtToken = static_cast<VirtualContinueToken *>(token);
292 }
293 LOGD("dataItems size %zu", dataItems.size());
294 return isFinished ? E_OK : -E_UNFINISHED;
295 }
296
SetSaveDataDelayTime(uint64_t milliDelayTime)297 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
298 {
299 saveDataDelayTime_ = milliDelayTime;
300 }
301
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const302 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
303 uint32_t blockSize, ContinueToken& continueStmtToken) const
304 {
305 if (continueStmtToken == nullptr) {
306 return -E_NOT_SUPPORT;
307 }
308 return 0;
309 }
310
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)311 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
312 const std::string &deviceName)
313 {
314 if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
315 availableDeviceId_++;
316 deviceMapping_[deviceName] = availableDeviceId_;
317 LOGD("put deviceName=%s into device map", deviceName.c_str());
318 }
319 for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
320 LOGD("PutSyncData");
321 auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
322 [iter](VirtualDataItem item) { return item.key == iter->key; });
323 if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
324 // if has conflict, compare writeTimestamp
325 LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
326 iter->writeTimestamp);
327 dbDataIter->key = iter->key;
328 dbDataIter->value = iter->value;
329 dbDataIter->timestamp = iter->timestamp;
330 dbDataIter->writeTimestamp = iter->writeTimestamp;
331 dbDataIter->flag = iter->flag;
332 dbDataIter->isLocal = false;
333 dbDataIter->deviceId = deviceMapping_[deviceName];
334 } else {
335 LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
336 VirtualDataItem dataItem;
337 dataItem.key = iter->key;
338 dataItem.value = iter->value;
339 dataItem.timestamp = iter->timestamp;
340 dataItem.writeTimestamp = iter->writeTimestamp;
341 dataItem.flag = iter->flag;
342 dataItem.isLocal = false;
343 dataItem.deviceId = deviceMapping_[deviceName];
344 dbData_.push_back(dataItem);
345 }
346 }
347 return E_OK;
348 }
349
SetSchemaInfo(const std::string & schema)350 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
351 {
352 schema_ = schema;
353 SchemaObject emptyObj;
354 schemaObj_ = emptyObj;
355 schemaObj_.ParseFromSchemaString(schema);
356 }
357
GetDbProperties() const358 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
359 {
360 return properties_;
361 }
362
GetSecurityOption(SecurityOption & option) const363 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
364 {
365 if (getSecurityOptionCallBack_) {
366 return getSecurityOptionCallBack_(option);
367 }
368 option = secOption_;
369 return E_OK;
370 }
371
IsReadable() const372 bool VirtualSingleVerSyncDBInterface::IsReadable() const
373 {
374 return true;
375 }
376
SetSecurityOption(SecurityOption & option)377 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
378 {
379 secOption_ = option;
380 }
381
NotifyRemotePushFinished(const std::string & targetId) const382 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
383 {
384 }
385
GetDatabaseCreateTimestamp(Timestamp & outTime) const386 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
387 {
388 outTime = dbCreateTime_;
389 return E_OK;
390 }
391
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const392 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
393 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
394 std::vector<SingleVerKvEntry *> &entries) const
395 {
396 if (getDataDelayTime_ > 0) {
397 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
398 }
399 int errCode = DataControl();
400 if (errCode != E_OK) {
401 return errCode;
402 }
403 const auto &startKey = query.GetPrefixKey();
404 Key endKey = startKey;
405 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
406
407 std::vector<VirtualDataItem> dataItems;
408 for (const auto &data : dbData_) {
409 // Only get local data.
410 if (!data.isLocal) {
411 continue;
412 }
413
414 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
415 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
416 dataItems.push_back(data);
417 }
418 } else {
419 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
420 data.key >= startKey && data.key <= endKey) {
421 dataItems.push_back(data);
422 }
423 }
424 }
425
426 LOGD("dataItems size %zu", dataItems.size());
427 return GetEntriesFromItems(entries, dataItems);
428 }
429
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const430 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
431 {
432 size_t prefixKeySize = keyPrefix.size();
433 for (auto iter = metadata_.begin(); iter != metadata_.end();) {
434 if (prefixKeySize <= iter->first.size() &&
435 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
436 iter = metadata_.erase(iter);
437 } else {
438 ++iter;
439 }
440 }
441 return E_OK;
442 }
443
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const444 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
445 {
446 return E_OK;
447 }
448
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const449 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
450 {
451 return E_OK;
452 }
453
PutSyncData(const DataItem & item)454 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
455 {
456 return E_OK;
457 }
458
CheckAndInitQueryCondition(QueryObject & query) const459 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
460 {
461 return E_OK;
462 }
463
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const464 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
465 const std::string &sourceID, const std::string &targetID) const
466 {
467 return E_OK;
468 }
469
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)470 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
471 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
472 {
473 std::function<void()> callback;
474 {
475 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
476 callback = saveDataCallback_;
477 }
478 if (callback) {
479 callback();
480 }
481 std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
482 std::vector<VirtualDataItem> dataItems;
483 for (auto kvEntry : entries) {
484 auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
485 VirtualDataItem item;
486 genericKvEntry->GetKey(item.key);
487 genericKvEntry->GetValue(item.value);
488 item.timestamp = genericKvEntry->GetTimestamp();
489 item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
490 item.flag = genericKvEntry->GetFlag();
491 item.isLocal = false;
492 dataItems.push_back(item);
493 }
494 return PutSyncData(dataItems, deviceName);
495 }
496
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)497 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
498 bool needCacheSubscribe)
499 {
500 return E_OK;
501 }
502
RemoveSubscribe(const std::string & subscribeId)503 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
504 {
505 return E_OK;
506 }
507
RemoveSubscribe(const std::vector<std::string> & subscribeIds)508 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
509 {
510 return E_OK;
511 }
512
SetBusy(bool busy,bool readBusy)513 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
514 {
515 busy_ = busy;
516 readBusy_ = readBusy;
517 }
518
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)519 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
520 {
521 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
522 deviceData_[deviceName][key] = value;
523 }
524
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)525 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
526 {
527 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
528 value = deviceData_[deviceName][key];
529 }
530
SetDbProperties(KvDBProperties & kvDBProperties)531 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
532 {
533 properties_ = kvDBProperties;
534 }
535
DelayGetSyncData(uint32_t milliDelayTime)536 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
537 {
538 getDataDelayTime_ = milliDelayTime;
539 }
540
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)541 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
542 {
543 countDown_ = whichTime;
544 expectedErrCode_ = errCode;
545 isGetDataControl_ = isGetDataControl;
546 }
547
DataControl() const548 int VirtualSingleVerSyncDBInterface::DataControl() const
549 {
550 static int getDataTimes = 0;
551 if (countDown_ == -1) { // init -1
552 getDataTimes = 0;
553 }
554 if (isGetDataControl_ && countDown_ > 0) {
555 getDataTimes++;
556 }
557 if (isGetDataControl_ && countDown_ == getDataTimes) {
558 LOGD("virtual device get data failed = %d", expectedErrCode_);
559 getDataTimes = 0;
560 return expectedErrCode_;
561 }
562 return E_OK;
563 }
564
ResetDataControl()565 void VirtualSingleVerSyncDBInterface::ResetDataControl()
566 {
567 countDown_ = -1;
568 expectedErrCode_ = E_OK;
569 }
570
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const571 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp ¤tWaterMark,
572 const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
573 {
574 bool isFinished = true;
575 for (const auto &data : dbData_) {
576 if (data.isLocal) {
577 if (dataItems.size() >= dataSizeInfo.packetSize) {
578 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
579 isFinished = false;
580 break;
581 }
582 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
583 dataItems.push_back(data);
584 currentWaterMark = data.writeTimestamp;
585 }
586 }
587 }
588 return isFinished;
589 }
590
SetSaveDataCallback(const std::function<void ()> & callback)591 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
592 {
593 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
594 saveDataCallback_ = callback;
595 }
596
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)597 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
598 {
599 getSecurityOptionCallBack_ = callBack;
600 }
601 } // namespace DistributedDB
602