• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_single_ver_sync_db_Interface.h"
17 
18 #include <algorithm>
19 #include <thread>
20 
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30 
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34     {
35         int errCode = E_OK;
36         for (const auto &item : dataItems) {
37             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38             if (entry == nullptr) {
39                 LOGE("Create entry failed.");
40                 errCode = -E_OUT_OF_MEMORY;
41                 break;
42             }
43             DataItem storageItem;
44             storageItem.key = item.key;
45             storageItem.value = item.value;
46             storageItem.flag = item.flag;
47             storageItem.timestamp = item.timestamp;
48             storageItem.writeTimestamp = item.writeTimestamp;
49             entry->SetEntryData(std::move(storageItem));
50             entries.push_back(entry);
51         }
52         if (errCode != E_OK) {
53             for (auto &kvEntry : entries) {
54                 delete kvEntry;
55                 kvEntry = nullptr;
56             }
57             entries.clear();
58         }
59         return errCode;
60     }
61 }
62 
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66     LOGD("virtual device init db createTime");
67 }
68 
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71     return SYNC_SVD;
72 }
73 
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77 
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81 
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84     identifier_ = std::move(identifier);
85 }
86 
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89     return identifier_;
90 }
91 
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94     if (readBusy_) {
95         return -E_BUSY;
96     }
97     auto iter = metadata_.find(key);
98     if (iter != metadata_.end()) {
99         value = iter->second;
100         return E_OK;
101     }
102     return -E_NOT_FOUND;
103 }
104 
GetMetaDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const105 int VirtualSingleVerSyncDBInterface::GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const
106 {
107     if (readBusy_) {
108         return -E_BUSY;
109     }
110     for (const auto &metadata : metadata_) {
111         if (metadata.first.size() < keyPrefix.size()) {
112             continue;
113         }
114         if (std::equal(keyPrefix.begin(), keyPrefix.end(), metadata.first.begin())) {
115             data[metadata.first] = metadata.second;
116         }
117     }
118     return data.empty() ? -E_NOT_FOUND : E_OK;
119 }
120 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)121 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
122 {
123     (void)isInTransaction;
124     if (busy_) {
125         return -E_BUSY;
126     }
127     metadata_[key] = value;
128     return E_OK;
129 }
130 
DeleteMetaData(const std::vector<Key> & keys)131 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
132 {
133     for (const auto &key : keys) {
134         (void)metadata_.erase(key);
135     }
136     return E_OK;
137 }
138 
GetAllMetaKeys(std::vector<Key> & keys) const139 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
140 {
141     for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
142         keys.push_back(iter->first);
143     }
144     LOGD("GetAllMetaKeys size %zu", keys.size());
145     return E_OK;
146 }
147 
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const148 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
149     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
150 {
151     return -E_NOT_SUPPORT;
152 }
153 
GetUnSyncTotal(Timestamp begin,Timestamp end,uint32_t & total) const154 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const
155 {
156     total = 0;
157     for (const auto &data : dbData_) {
158         if (data.isLocal) {
159             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
160                 total++;
161             }
162         }
163     }
164     return E_OK;
165 }
166 
GetUnSyncTotal(QueryObject & query,const SyncTimeRange & timeRange,uint32_t & total) const167 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange,
168     uint32_t &total) const
169 {
170     if (getDataDelayTime_ > 0) {
171         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
172     }
173     int errCode = DataControl();
174     if (errCode != E_OK) {
175         return errCode;
176     }
177 
178     total = 0;
179     const auto &startKey = query.GetPrefixKey();
180     Key endKey = startKey;
181     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
182 
183     for (const auto &data : dbData_) {
184         // Only get local data.
185         if (!data.isLocal) {
186             continue;
187         }
188 
189         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
190             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
191                 total++;
192             }
193         } else {
194             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
195                 data.key >= startKey && data.key <= endKey) {
196                 total++;
197             }
198         }
199     }
200 
201     LOGD("GetUnSyncTotal %u", total);
202     return E_OK;
203 }
204 
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const205 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
206     const DataSizeSpecInfo &dataSizeInfo) const
207 {
208     return -E_NOT_SUPPORT;
209 }
210 
ReleaseContinueToken(ContinueToken & continueStmtToken) const211 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
212 {
213     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
214     if (token != nullptr) {
215         delete token;
216         continueStmtToken = nullptr;
217     }
218     return;
219 }
220 
GetSchemaInfo() const221 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
222 {
223     return schemaObj_;
224 }
225 
CheckCompatible(const std::string & schema,uint8_t type) const226 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
227 {
228     if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
229         return true;
230     }
231     return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
232 }
233 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)234 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
235 {
236     VirtualDataItem item;
237     item.key = key;
238     item.value = value;
239     item.timestamp = time;
240     item.writeTimestamp = time;
241     item.flag = static_cast<uint64_t>(flag);
242     item.isLocal = true;
243     dbData_.push_back(item);
244     return E_OK;
245 }
246 
GetMaxTimestamp(Timestamp & stamp) const247 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
248 {
249     for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
250         if (stamp < iter->writeTimestamp) {
251             stamp = iter->writeTimestamp;
252         }
253     }
254     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
255 }
256 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)257 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
258 {
259     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
260     deviceData_.erase(deviceName);
261     uint32_t devId = 0;
262     if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
263         devId = deviceMapping_[deviceName];
264     }
265     for (auto &item : dbData_) {
266         if (item.deviceId == devId && devId > 0) {
267             item.flag = VirtualDataItem::DELETE_FLAG;
268         }
269     }
270     LOGD("RemoveDeviceData FINISH");
271     return E_OK;
272 }
273 
GetSyncData(const Key & key,VirtualDataItem & dataItem)274 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
275 {
276     auto iter = std::find_if(dbData_.begin(), dbData_.end(),
277         [key](const VirtualDataItem& item) { return item.key == key; });
278     if (iter != dbData_.end()) {
279         if (iter->flag == VirtualDataItem::DELETE_FLAG) {
280             return -E_NOT_FOUND;
281         }
282         dataItem.key = iter->key;
283         dataItem.value = iter->value;
284         dataItem.timestamp = iter->timestamp;
285         dataItem.writeTimestamp = iter->writeTimestamp;
286         dataItem.flag = iter->flag;
287         dataItem.isLocal = iter->isLocal;
288         return E_OK;
289     }
290     return -E_NOT_FOUND;
291 }
292 
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const293 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
294     std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
295     const DataSizeSpecInfo &dataSizeInfo) const
296 {
297     std::vector<VirtualDataItem> dataItems;
298     int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
299     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
300         LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
301         return errCode;
302     }
303     int innerCode = GetEntriesFromItems(entries, dataItems);
304     if (innerCode != E_OK) {
305         return innerCode;
306     }
307     return errCode;
308 }
309 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const310 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
311     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
312 {
313     if (continueStmtToken == nullptr) {
314         return -E_INVALID_ARGS;
315     }
316     int errCode = DataControl();
317     if (errCode != E_OK) {
318         return errCode;
319     }
320     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
321     Timestamp currentWaterMark = 0;
322     std::vector<VirtualDataItem> dataItems;
323     bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
324     if (isFinished) {
325         delete token;
326         continueStmtToken = nullptr;
327     } else {
328         currentWaterMark++;
329         token->begin = currentWaterMark;
330     }
331     int innerCode = GetEntriesFromItems(entries, dataItems);
332     if (innerCode != E_OK) {
333         return innerCode;
334     }
335     return isFinished ? E_OK : -E_UNFINISHED;
336 }
337 
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const338 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
339     std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
340 {
341     if (getDataDelayTime_ > 0) {
342         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
343     }
344     int errCode = DataControl();
345     if (errCode != E_OK) {
346         return errCode;
347     }
348     Timestamp currentWaterMark = 0;
349     bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
350     if (!isFinished) {
351         VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
352         if (token == nullptr) {
353             LOGD("virtual alloc token failed");
354             dataItems.clear();
355             return -E_OUT_OF_MEMORY;
356         }
357         currentWaterMark++;
358         token->begin = currentWaterMark;
359         token->end = end;
360         continueStmtToken = static_cast<VirtualContinueToken *>(token);
361     }
362     LOGD("dataItems size %zu", dataItems.size());
363     return isFinished ? E_OK : -E_UNFINISHED;
364 }
365 
SetSaveDataDelayTime(uint64_t milliDelayTime)366 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
367 {
368     saveDataDelayTime_ = milliDelayTime;
369 }
370 
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const371 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
372     uint32_t blockSize, ContinueToken& continueStmtToken) const
373 {
374     if (continueStmtToken == nullptr) {
375         return -E_NOT_SUPPORT;
376     }
377     return 0;
378 }
379 
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)380 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
381     const std::string &deviceName)
382 {
383     if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
384         availableDeviceId_++;
385         deviceMapping_[deviceName] = availableDeviceId_;
386         LOGD("put deviceName=%s into device map", deviceName.c_str());
387     }
388     for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
389         LOGD("PutSyncData");
390         auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
391             [iter](VirtualDataItem item) { return item.key == iter->key; });
392         if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
393             // if has conflict, compare writeTimestamp
394             LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
395                 iter->writeTimestamp);
396             dbDataIter->key = iter->key;
397             dbDataIter->value = iter->value;
398             dbDataIter->timestamp = iter->timestamp;
399             dbDataIter->writeTimestamp = iter->writeTimestamp;
400             dbDataIter->flag = iter->flag;
401             dbDataIter->isLocal = false;
402             dbDataIter->deviceId = deviceMapping_[deviceName];
403         } else {
404             LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
405             VirtualDataItem dataItem;
406             dataItem.key = iter->key;
407             dataItem.value = iter->value;
408             dataItem.timestamp = iter->timestamp;
409             dataItem.writeTimestamp = iter->writeTimestamp;
410             dataItem.flag = iter->flag;
411             dataItem.isLocal = false;
412             dataItem.deviceId = deviceMapping_[deviceName];
413             dbData_.push_back(dataItem);
414         }
415     }
416     return E_OK;
417 }
418 
SetSchemaInfo(const std::string & schema)419 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
420 {
421     schema_ = schema;
422     SchemaObject emptyObj;
423     schemaObj_ = emptyObj;
424     schemaObj_.ParseFromSchemaString(schema);
425 }
426 
GetDbProperties() const427 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
428 {
429     return properties_;
430 }
431 
GetSecurityOption(SecurityOption & option) const432 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
433 {
434     if (getSecurityOptionCallBack_) {
435         return getSecurityOptionCallBack_(option);
436     }
437     if (secOption_.securityLabel == NOT_SET) {
438         return -E_NOT_SUPPORT;
439     }
440     option = secOption_;
441     return E_OK;
442 }
443 
IsReadable() const444 bool VirtualSingleVerSyncDBInterface::IsReadable() const
445 {
446     return true;
447 }
448 
SetSecurityOption(SecurityOption & option)449 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
450 {
451     secOption_ = option;
452 }
453 
NotifyRemotePushFinished(const std::string & targetId) const454 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
455 {
456     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
457     if (pushNotifier_) {
458         pushNotifier_(targetId);
459         LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
460     }
461 }
462 
GetDatabaseCreateTimestamp(Timestamp & outTime) const463 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
464 {
465     outTime = dbCreateTime_;
466     return E_OK;
467 }
468 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const469 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
470     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
471     std::vector<SingleVerKvEntry *> &entries) const
472 {
473     if (getDataDelayTime_ > 0) {
474         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
475     }
476     int errCode = DataControl();
477     if (errCode != E_OK) {
478         return errCode;
479     }
480     const auto &startKey = query.GetPrefixKey();
481     Key endKey = startKey;
482     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
483 
484     std::vector<VirtualDataItem> dataItems;
485     for (const auto &data : dbData_) {
486         // Only get local data.
487         if (!data.isLocal) {
488             continue;
489         }
490 
491         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
492             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
493                 dataItems.push_back(data);
494             }
495         } else {
496             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
497                 data.key >= startKey && data.key <= endKey) {
498                 dataItems.push_back(data);
499             }
500         }
501     }
502 
503     LOGD("dataItems size %zu", dataItems.size());
504     return GetEntriesFromItems(entries, dataItems);
505 }
506 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const507 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
508 {
509     size_t prefixKeySize = keyPrefix.size();
510     for (auto iter = metadata_.begin(); iter != metadata_.end();) {
511         if (prefixKeySize <= iter->first.size() &&
512             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
513             iter = metadata_.erase(iter);
514         } else {
515             ++iter;
516         }
517     }
518     return E_OK;
519 }
520 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const521 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
522 {
523     if (compressSync_) {
524         needCompressOnSync = true;
525         compressionRate = 100; // compress rate 100
526     }
527     return E_OK;
528 }
529 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const530 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
531 {
532     if (compressSync_) {
533         DataCompression::GetCompressionAlgo(algorithmSet);
534     }
535     return E_OK;
536 }
537 
PutSyncData(const DataItem & item)538 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
539 {
540     return E_OK;
541 }
542 
CheckAndInitQueryCondition(QueryObject & query) const543 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
544 {
545     return E_OK;
546 }
547 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const548 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
549     const std::string &sourceID, const std::string &targetID, bool isPush) const
550 {
551     return E_OK;
552 }
553 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)554 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
555     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
556 {
557     std::function<void()> callback;
558     {
559         std::lock_guard<std::mutex> autoLock(saveDataMutex_);
560         callback = saveDataCallback_;
561     }
562     if (callback) {
563         callback();
564     }
565     std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
566     std::vector<VirtualDataItem> dataItems;
567     for (auto kvEntry : entries) {
568         auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
569         VirtualDataItem item;
570         genericKvEntry->GetKey(item.key);
571         genericKvEntry->GetValue(item.value);
572         item.timestamp = genericKvEntry->GetTimestamp();
573         item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
574         item.flag = genericKvEntry->GetFlag();
575         item.isLocal = false;
576         dataItems.push_back(item);
577     }
578     return PutSyncData(dataItems, deviceName);
579 }
580 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)581 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
582     bool needCacheSubscribe)
583 {
584     return E_OK;
585 }
586 
RemoveSubscribe(const std::string & subscribeId)587 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
588 {
589     return E_OK;
590 }
591 
RemoveSubscribe(const std::vector<std::string> & subscribeIds)592 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
593 {
594     return E_OK;
595 }
596 
SetBusy(bool busy,bool readBusy)597 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
598 {
599     busy_ = busy;
600     readBusy_ = readBusy;
601 }
602 
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)603 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
604 {
605     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
606     deviceData_[deviceName][key] = value;
607 }
608 
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)609 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
610 {
611     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
612     value = deviceData_[deviceName][key];
613 }
614 
SetDbProperties(KvDBProperties & kvDBProperties)615 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
616 {
617     properties_ = kvDBProperties;
618 }
619 
DelayGetSyncData(uint32_t milliDelayTime)620 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
621 {
622     getDataDelayTime_ = milliDelayTime;
623 }
624 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)625 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
626 {
627     countDown_ = whichTime;
628     expectedErrCode_ = errCode;
629     isGetDataControl_ = isGetDataControl;
630 }
631 
DataControl() const632 int VirtualSingleVerSyncDBInterface::DataControl() const
633 {
634     static int getDataTimes = 0;
635     if (countDown_ == -1) { // init -1
636         getDataTimes = 0;
637     }
638     if (isGetDataControl_ && countDown_ > 0) {
639         getDataTimes++;
640     }
641     if (isGetDataControl_ && countDown_ == getDataTimes) {
642         LOGD("virtual device get data failed = %d", expectedErrCode_);
643         getDataTimes = 0;
644         return expectedErrCode_;
645     }
646     return E_OK;
647 }
648 
ResetDataControl()649 void VirtualSingleVerSyncDBInterface::ResetDataControl()
650 {
651     countDown_ = -1;
652     expectedErrCode_ = E_OK;
653 }
654 
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const655 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp &currentWaterMark,
656     const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
657 {
658     bool isFinished = true;
659     for (const auto &data : dbData_) {
660         if (data.isLocal) {
661             if (dataItems.size() >= dataSizeInfo.packetSize) {
662                 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
663                 isFinished = false;
664                 break;
665             }
666             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
667                 dataItems.push_back(data);
668                 currentWaterMark = data.writeTimestamp;
669             }
670         }
671     }
672     return isFinished;
673 }
674 
SetSaveDataCallback(const std::function<void ()> & callback)675 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
676 {
677     std::lock_guard<std::mutex> autoLock(saveDataMutex_);
678     saveDataCallback_ = callback;
679 }
680 
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)681 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
682 {
683     getSecurityOptionCallBack_ = callBack;
684 }
685 
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)686 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
687 {
688     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
689     pushNotifier_ = pushNotifier;
690 }
691 
SetCompressSync(bool compressSync)692 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
693 {
694     compressSync_ = compressSync;
695 }
696 }  // namespace DistributedDB
697