• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_single_ver_sync_db_Interface.h"
17 
18 #include <algorithm>
19 #include <thread>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "intercepted_data_impl.h"
25 #include "log_print.h"
26 #include "platform_specific.h"
27 #include "query_object.h"
28 #include "securec.h"
29 
30 namespace DistributedDB {
31 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)32     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
33     {
34         int errCode = E_OK;
35         for (const auto &item : dataItems) {
36             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
37             if (entry == nullptr) {
38                 LOGE("Create entry failed.");
39                 errCode = -E_OUT_OF_MEMORY;
40                 break;
41             }
42             DataItem storageItem;
43             storageItem.key = item.key;
44             storageItem.value = item.value;
45             storageItem.flag = item.flag;
46             storageItem.timestamp = item.timestamp;
47             storageItem.writeTimestamp = item.writeTimestamp;
48             entry->SetEntryData(std::move(storageItem));
49             entries.push_back(entry);
50         }
51         if (errCode != E_OK) {
52             for (auto &kvEntry : entries) {
53                 delete kvEntry;
54                 kvEntry = nullptr;
55             }
56             entries.clear();
57         }
58         return errCode;
59     }
60 }
61 
VirtualSingleVerSyncDBInterface()62 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
63 {
64     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
65     LOGD("virtual device init db createTime");
66 }
67 
GetInterfaceType() const68 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
69 {
70     return SYNC_SVD;
71 }
72 
IncRefCount()73 void VirtualSingleVerSyncDBInterface::IncRefCount()
74 {
75 }
76 
DecRefCount()77 void VirtualSingleVerSyncDBInterface::DecRefCount()
78 {
79 }
80 
SetIdentifier(std::vector<uint8_t> & identifier)81 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
82 {
83     identifier_ = std::move(identifier);
84 }
85 
GetIdentifier() const86 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
87 {
88     return identifier_;
89 }
90 
GetMetaData(const Key & key,Value & value) const91 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
92 {
93     if (readBusy_) {
94         return -E_BUSY;
95     }
96     auto iter = metadata_.find(key);
97     if (iter != metadata_.end()) {
98         value = iter->second;
99         return E_OK;
100     }
101     return -E_NOT_FOUND;
102 }
103 
PutMetaData(const Key & key,const Value & value)104 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value)
105 {
106     if (busy_) {
107         return -E_BUSY;
108     }
109     metadata_[key] = value;
110     return E_OK;
111 }
112 
DeleteMetaData(const std::vector<Key> & keys)113 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
114 {
115     for (const auto &key : keys) {
116         (void)metadata_.erase(key);
117     }
118     return E_OK;
119 }
120 
GetAllMetaKeys(std::vector<Key> & keys) const121 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
122 {
123     for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
124         keys.push_back(iter->first);
125     }
126     LOGD("GetAllMetaKeys size %zu", keys.size());
127     return E_OK;
128 }
129 
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const130 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
131     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
132 {
133     return -E_NOT_SUPPORT;
134 }
135 
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const136 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
137     const DataSizeSpecInfo &dataSizeInfo) const
138 {
139     return -E_NOT_SUPPORT;
140 }
141 
ReleaseContinueToken(ContinueToken & continueStmtToken) const142 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
143 {
144     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
145     if (token != nullptr) {
146         delete token;
147         continueStmtToken = nullptr;
148     }
149     return;
150 }
151 
GetSchemaInfo() const152 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
153 {
154     return schemaObj_;
155 }
156 
CheckCompatible(const std::string & schema,uint8_t type) const157 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
158 {
159     if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
160         return true;
161     }
162     return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
163 }
164 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)165 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
166 {
167     VirtualDataItem item;
168     item.key = key;
169     item.value = value;
170     item.timestamp = time;
171     item.writeTimestamp = time;
172     item.flag = flag;
173     item.isLocal = true;
174     dbData_.push_back(item);
175     return E_OK;
176 }
177 
GetMaxTimestamp(Timestamp & stamp) const178 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
179 {
180     for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
181         if (stamp < iter->writeTimestamp) {
182             stamp = iter->writeTimestamp;
183         }
184     }
185     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
186 }
187 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)188 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
189 {
190     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
191     deviceData_.erase(deviceName);
192     uint32_t devId = 0;
193     if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
194         devId = deviceMapping_[deviceName];
195     }
196     for (auto &item : dbData_) {
197         if (item.deviceId == devId && devId > 0) {
198             item.flag = VirtualDataItem::DELETE_FLAG;
199         }
200     }
201     LOGD("RemoveDeviceData FINISH");
202     return E_OK;
203 }
204 
GetSyncData(const Key & key,VirtualDataItem & dataItem)205 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
206 {
207     auto iter = std::find_if(dbData_.begin(), dbData_.end(),
208         [key](const VirtualDataItem& item) { return item.key == key; });
209     if (iter != dbData_.end()) {
210         if (iter->flag == VirtualDataItem::DELETE_FLAG) {
211             return -E_NOT_FOUND;
212         }
213         dataItem.key = iter->key;
214         dataItem.value = iter->value;
215         dataItem.timestamp = iter->timestamp;
216         dataItem.writeTimestamp = iter->writeTimestamp;
217         dataItem.flag = iter->flag;
218         dataItem.isLocal = iter->isLocal;
219         return E_OK;
220     }
221     return -E_NOT_FOUND;
222 }
223 
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const224 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
225     std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
226     const DataSizeSpecInfo &dataSizeInfo) const
227 {
228     std::vector<VirtualDataItem> dataItems;
229     int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
230     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
231         LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
232         return errCode;
233     }
234     int innerCode = GetEntriesFromItems(entries, dataItems);
235     if (innerCode != E_OK) {
236         return innerCode;
237     }
238     return errCode;
239 }
240 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const241 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
242     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
243 {
244     if (continueStmtToken == nullptr) {
245         return -E_INVALID_ARGS;
246     }
247     int errCode = DataControl();
248     if (errCode != E_OK) {
249         return errCode;
250     }
251     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
252     Timestamp currentWaterMark = 0;
253     std::vector<VirtualDataItem> dataItems;
254     bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
255     if (isFinished) {
256         delete token;
257         continueStmtToken = nullptr;
258     } else {
259         currentWaterMark++;
260         token->begin = currentWaterMark;
261     }
262     int innerCode = GetEntriesFromItems(entries, dataItems);
263     if (innerCode != E_OK) {
264         return innerCode;
265     }
266     return isFinished ? E_OK : -E_UNFINISHED;
267 }
268 
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const269 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
270     std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
271 {
272     if (getDataDelayTime_ > 0) {
273         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
274     }
275     int errCode = DataControl();
276     if (errCode != E_OK) {
277         return errCode;
278     }
279     Timestamp currentWaterMark = 0;
280     bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
281     if (!isFinished) {
282         VirtualContinueToken *token = new VirtualContinueToken();
283         if (token == nullptr) {
284             LOGD("virtual alloc token failed");
285             dataItems.clear();
286             return -E_OUT_OF_MEMORY;
287         }
288         currentWaterMark++;
289         token->begin = currentWaterMark;
290         token->end = end;
291         continueStmtToken = static_cast<VirtualContinueToken *>(token);
292     }
293     LOGD("dataItems size %zu", dataItems.size());
294     return isFinished ? E_OK : -E_UNFINISHED;
295 }
296 
SetSaveDataDelayTime(uint64_t milliDelayTime)297 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
298 {
299     saveDataDelayTime_ = milliDelayTime;
300 }
301 
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const302 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
303     uint32_t blockSize, ContinueToken& continueStmtToken) const
304 {
305     if (continueStmtToken == nullptr) {
306         return -E_NOT_SUPPORT;
307     }
308     return 0;
309 }
310 
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)311 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
312     const std::string &deviceName)
313 {
314     if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
315         availableDeviceId_++;
316         deviceMapping_[deviceName] = availableDeviceId_;
317         LOGD("put deviceName=%s into device map", deviceName.c_str());
318     }
319     for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
320         LOGD("PutSyncData");
321         auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
322             [iter](VirtualDataItem item) { return item.key == iter->key; });
323         if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
324             // if has conflict, compare writeTimestamp
325             LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
326                 iter->writeTimestamp);
327             dbDataIter->key = iter->key;
328             dbDataIter->value = iter->value;
329             dbDataIter->timestamp = iter->timestamp;
330             dbDataIter->writeTimestamp = iter->writeTimestamp;
331             dbDataIter->flag = iter->flag;
332             dbDataIter->isLocal = false;
333             dbDataIter->deviceId = deviceMapping_[deviceName];
334         } else {
335             LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
336             VirtualDataItem dataItem;
337             dataItem.key = iter->key;
338             dataItem.value = iter->value;
339             dataItem.timestamp = iter->timestamp;
340             dataItem.writeTimestamp = iter->writeTimestamp;
341             dataItem.flag = iter->flag;
342             dataItem.isLocal = false;
343             dataItem.deviceId = deviceMapping_[deviceName];
344             dbData_.push_back(dataItem);
345         }
346     }
347     return E_OK;
348 }
349 
SetSchemaInfo(const std::string & schema)350 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
351 {
352     schema_ = schema;
353     SchemaObject emptyObj;
354     schemaObj_ = emptyObj;
355     schemaObj_.ParseFromSchemaString(schema);
356 }
357 
GetDbProperties() const358 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
359 {
360     return properties_;
361 }
362 
GetSecurityOption(SecurityOption & option) const363 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
364 {
365     if (getSecurityOptionCallBack_) {
366         return getSecurityOptionCallBack_(option);
367     }
368     option = secOption_;
369     return E_OK;
370 }
371 
IsReadable() const372 bool VirtualSingleVerSyncDBInterface::IsReadable() const
373 {
374     return true;
375 }
376 
SetSecurityOption(SecurityOption & option)377 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
378 {
379     secOption_ = option;
380 }
381 
NotifyRemotePushFinished(const std::string & targetId) const382 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
383 {
384 }
385 
GetDatabaseCreateTimestamp(Timestamp & outTime) const386 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
387 {
388     outTime = dbCreateTime_;
389     return E_OK;
390 }
391 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const392 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
393     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
394     std::vector<SingleVerKvEntry *> &entries) const
395 {
396     if (getDataDelayTime_ > 0) {
397         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
398     }
399     int errCode = DataControl();
400     if (errCode != E_OK) {
401         return errCode;
402     }
403     const auto &startKey = query.GetPrefixKey();
404     Key endKey = startKey;
405     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
406 
407     std::vector<VirtualDataItem> dataItems;
408     for (const auto &data : dbData_) {
409         // Only get local data.
410         if (!data.isLocal) {
411             continue;
412         }
413 
414         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
415             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
416                 dataItems.push_back(data);
417             }
418         } else {
419             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
420                 data.key >= startKey && data.key <= endKey) {
421                 dataItems.push_back(data);
422             }
423         }
424     }
425 
426     LOGD("dataItems size %zu", dataItems.size());
427     return GetEntriesFromItems(entries, dataItems);
428 }
429 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const430 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
431 {
432     size_t prefixKeySize = keyPrefix.size();
433     for (auto iter = metadata_.begin(); iter != metadata_.end();) {
434         if (prefixKeySize <= iter->first.size() &&
435             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
436             iter = metadata_.erase(iter);
437         } else {
438             ++iter;
439         }
440     }
441     return E_OK;
442 }
443 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const444 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
445 {
446     return E_OK;
447 }
448 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const449 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
450 {
451     return E_OK;
452 }
453 
PutSyncData(const DataItem & item)454 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
455 {
456     return E_OK;
457 }
458 
CheckAndInitQueryCondition(QueryObject & query) const459 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
460 {
461     return E_OK;
462 }
463 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const464 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
465     const std::string &sourceID, const std::string &targetID) const
466 {
467     return E_OK;
468 }
469 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)470 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
471     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
472 {
473     std::function<void()> callback;
474     {
475         std::lock_guard<std::mutex> autoLock(saveDataMutex_);
476         callback = saveDataCallback_;
477     }
478     if (callback) {
479         callback();
480     }
481     std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
482     std::vector<VirtualDataItem> dataItems;
483     for (auto kvEntry : entries) {
484         auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
485         VirtualDataItem item;
486         genericKvEntry->GetKey(item.key);
487         genericKvEntry->GetValue(item.value);
488         item.timestamp = genericKvEntry->GetTimestamp();
489         item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
490         item.flag = genericKvEntry->GetFlag();
491         item.isLocal = false;
492         dataItems.push_back(item);
493     }
494     return PutSyncData(dataItems, deviceName);
495 }
496 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)497 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
498     bool needCacheSubscribe)
499 {
500     return E_OK;
501 }
502 
RemoveSubscribe(const std::string & subscribeId)503 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
504 {
505     return E_OK;
506 }
507 
RemoveSubscribe(const std::vector<std::string> & subscribeIds)508 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
509 {
510     return E_OK;
511 }
512 
SetBusy(bool busy,bool readBusy)513 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
514 {
515     busy_ = busy;
516     readBusy_ = readBusy;
517 }
518 
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)519 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
520 {
521     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
522     deviceData_[deviceName][key] = value;
523 }
524 
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)525 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
526 {
527     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
528     value = deviceData_[deviceName][key];
529 }
530 
SetDbProperties(KvDBProperties & kvDBProperties)531 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
532 {
533     properties_ = kvDBProperties;
534 }
535 
DelayGetSyncData(uint32_t milliDelayTime)536 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
537 {
538     getDataDelayTime_ = milliDelayTime;
539 }
540 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)541 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
542 {
543     countDown_ = whichTime;
544     expectedErrCode_ = errCode;
545     isGetDataControl_ = isGetDataControl;
546 }
547 
DataControl() const548 int VirtualSingleVerSyncDBInterface::DataControl() const
549 {
550     static int getDataTimes = 0;
551     if (countDown_ == -1) { // init -1
552         getDataTimes = 0;
553     }
554     if (isGetDataControl_ && countDown_ > 0) {
555         getDataTimes++;
556     }
557     if (isGetDataControl_ && countDown_ == getDataTimes) {
558         LOGD("virtual device get data failed = %d", expectedErrCode_);
559         getDataTimes = 0;
560         return expectedErrCode_;
561     }
562     return E_OK;
563 }
564 
ResetDataControl()565 void VirtualSingleVerSyncDBInterface::ResetDataControl()
566 {
567     countDown_ = -1;
568     expectedErrCode_ = E_OK;
569 }
570 
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const571 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp &currentWaterMark,
572     const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
573 {
574     bool isFinished = true;
575     for (const auto &data : dbData_) {
576         if (data.isLocal) {
577             if (dataItems.size() >= dataSizeInfo.packetSize) {
578                 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
579                 isFinished = false;
580                 break;
581             }
582             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
583                 dataItems.push_back(data);
584                 currentWaterMark = data.writeTimestamp;
585             }
586         }
587     }
588     return isFinished;
589 }
590 
SetSaveDataCallback(const std::function<void ()> & callback)591 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
592 {
593     std::lock_guard<std::mutex> autoLock(saveDataMutex_);
594     saveDataCallback_ = callback;
595 }
596 
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)597 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
598 {
599     getSecurityOptionCallBack_ = callBack;
600 }
601 }  // namespace DistributedDB
602