• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_single_ver_sync_db_Interface.h"
17 
18 #include <algorithm>
19 #include <thread>
20 
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30 
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34     {
35         int errCode = E_OK;
36         for (const auto &item : dataItems) {
37             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38             if (entry == nullptr) {
39                 LOGE("Create entry failed.");
40                 errCode = -E_OUT_OF_MEMORY;
41                 break;
42             }
43             DataItem storageItem;
44             storageItem.key = item.key;
45             storageItem.value = item.value;
46             storageItem.flag = item.flag;
47             storageItem.timestamp = item.timestamp;
48             storageItem.writeTimestamp = item.writeTimestamp;
49             entry->SetEntryData(std::move(storageItem));
50             entries.push_back(entry);
51         }
52         if (errCode != E_OK) {
53             for (auto &kvEntry : entries) {
54                 delete kvEntry;
55                 kvEntry = nullptr;
56             }
57             entries.clear();
58         }
59         return errCode;
60     }
61 }
62 
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66     LOGD("virtual device init db createTime");
67 }
68 
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71     return SYNC_SVD;
72 }
73 
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77 
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81 
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84     identifier_ = std::move(identifier);
85 }
86 
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89     return identifier_;
90 }
91 
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94     if (readBusy_) {
95         return -E_BUSY;
96     }
97     auto iter = metadata_.find(key);
98     if (iter != metadata_.end()) {
99         value = iter->second;
100         return E_OK;
101     }
102     return -E_NOT_FOUND;
103 }
104 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)105 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
106 {
107     (void)isInTransaction;
108     if (busy_) {
109         return -E_BUSY;
110     }
111     metadata_[key] = value;
112     return E_OK;
113 }
114 
DeleteMetaData(const std::vector<Key> & keys)115 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
116 {
117     for (const auto &key : keys) {
118         (void)metadata_.erase(key);
119     }
120     return E_OK;
121 }
122 
GetAllMetaKeys(std::vector<Key> & keys) const123 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
124 {
125     for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
126         keys.push_back(iter->first);
127     }
128     LOGD("GetAllMetaKeys size %zu", keys.size());
129     return E_OK;
130 }
131 
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const132 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
133     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
134 {
135     return -E_NOT_SUPPORT;
136 }
137 
GetUnSyncTotal(Timestamp begin,Timestamp end,uint32_t & total) const138 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const
139 {
140     total = 0;
141     for (const auto &data : dbData_) {
142         if (data.isLocal) {
143             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
144                 total++;
145             }
146         }
147     }
148     return E_OK;
149 }
150 
GetUnSyncTotal(QueryObject & query,const SyncTimeRange & timeRange,uint32_t & total) const151 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange,
152     uint32_t &total) const
153 {
154     if (getDataDelayTime_ > 0) {
155         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
156     }
157     int errCode = DataControl();
158     if (errCode != E_OK) {
159         return errCode;
160     }
161 
162     total = 0;
163     const auto &startKey = query.GetPrefixKey();
164     Key endKey = startKey;
165     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
166 
167     for (const auto &data : dbData_) {
168         // Only get local data.
169         if (!data.isLocal) {
170             continue;
171         }
172 
173         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
174             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
175                 total++;
176             }
177         } else {
178             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
179                 data.key >= startKey && data.key <= endKey) {
180                 total++;
181             }
182         }
183     }
184 
185     LOGD("GetUnSyncTotal %u", total);
186     return E_OK;
187 }
188 
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const189 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
190     const DataSizeSpecInfo &dataSizeInfo) const
191 {
192     return -E_NOT_SUPPORT;
193 }
194 
ReleaseContinueToken(ContinueToken & continueStmtToken) const195 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
196 {
197     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
198     if (token != nullptr) {
199         delete token;
200         continueStmtToken = nullptr;
201     }
202     return;
203 }
204 
GetSchemaInfo() const205 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
206 {
207     return schemaObj_;
208 }
209 
CheckCompatible(const std::string & schema,uint8_t type) const210 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
211 {
212     if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
213         return true;
214     }
215     return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
216 }
217 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)218 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
219 {
220     VirtualDataItem item;
221     item.key = key;
222     item.value = value;
223     item.timestamp = time;
224     item.writeTimestamp = time;
225     item.flag = static_cast<uint64_t>(flag);
226     item.isLocal = true;
227     dbData_.push_back(item);
228     return E_OK;
229 }
230 
GetMaxTimestamp(Timestamp & stamp) const231 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
232 {
233     for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
234         if (stamp < iter->writeTimestamp) {
235             stamp = iter->writeTimestamp;
236         }
237     }
238     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
239 }
240 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)241 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
242 {
243     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
244     deviceData_.erase(deviceName);
245     uint32_t devId = 0;
246     if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
247         devId = deviceMapping_[deviceName];
248     }
249     for (auto &item : dbData_) {
250         if (item.deviceId == devId && devId > 0) {
251             item.flag = VirtualDataItem::DELETE_FLAG;
252         }
253     }
254     LOGD("RemoveDeviceData FINISH");
255     return E_OK;
256 }
257 
GetSyncData(const Key & key,VirtualDataItem & dataItem)258 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
259 {
260     auto iter = std::find_if(dbData_.begin(), dbData_.end(),
261         [key](const VirtualDataItem& item) { return item.key == key; });
262     if (iter != dbData_.end()) {
263         if (iter->flag == VirtualDataItem::DELETE_FLAG) {
264             return -E_NOT_FOUND;
265         }
266         dataItem.key = iter->key;
267         dataItem.value = iter->value;
268         dataItem.timestamp = iter->timestamp;
269         dataItem.writeTimestamp = iter->writeTimestamp;
270         dataItem.flag = iter->flag;
271         dataItem.isLocal = iter->isLocal;
272         return E_OK;
273     }
274     return -E_NOT_FOUND;
275 }
276 
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const277 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
278     std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
279     const DataSizeSpecInfo &dataSizeInfo) const
280 {
281     std::vector<VirtualDataItem> dataItems;
282     int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
283     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
284         LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
285         return errCode;
286     }
287     int innerCode = GetEntriesFromItems(entries, dataItems);
288     if (innerCode != E_OK) {
289         return innerCode;
290     }
291     return errCode;
292 }
293 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const294 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
295     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
296 {
297     if (continueStmtToken == nullptr) {
298         return -E_INVALID_ARGS;
299     }
300     int errCode = DataControl();
301     if (errCode != E_OK) {
302         return errCode;
303     }
304     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
305     Timestamp currentWaterMark = 0;
306     std::vector<VirtualDataItem> dataItems;
307     bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
308     if (isFinished) {
309         delete token;
310         continueStmtToken = nullptr;
311     } else {
312         currentWaterMark++;
313         token->begin = currentWaterMark;
314     }
315     int innerCode = GetEntriesFromItems(entries, dataItems);
316     if (innerCode != E_OK) {
317         return innerCode;
318     }
319     return isFinished ? E_OK : -E_UNFINISHED;
320 }
321 
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const322 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
323     std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
324 {
325     if (getDataDelayTime_ > 0) {
326         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
327     }
328     int errCode = DataControl();
329     if (errCode != E_OK) {
330         return errCode;
331     }
332     Timestamp currentWaterMark = 0;
333     bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
334     if (!isFinished) {
335         VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
336         if (token == nullptr) {
337             LOGD("virtual alloc token failed");
338             dataItems.clear();
339             return -E_OUT_OF_MEMORY;
340         }
341         currentWaterMark++;
342         token->begin = currentWaterMark;
343         token->end = end;
344         continueStmtToken = static_cast<VirtualContinueToken *>(token);
345     }
346     LOGD("dataItems size %zu", dataItems.size());
347     return isFinished ? E_OK : -E_UNFINISHED;
348 }
349 
SetSaveDataDelayTime(uint64_t milliDelayTime)350 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
351 {
352     saveDataDelayTime_ = milliDelayTime;
353 }
354 
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const355 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
356     uint32_t blockSize, ContinueToken& continueStmtToken) const
357 {
358     if (continueStmtToken == nullptr) {
359         return -E_NOT_SUPPORT;
360     }
361     return 0;
362 }
363 
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)364 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
365     const std::string &deviceName)
366 {
367     if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
368         availableDeviceId_++;
369         deviceMapping_[deviceName] = availableDeviceId_;
370         LOGD("put deviceName=%s into device map", deviceName.c_str());
371     }
372     for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
373         LOGD("PutSyncData");
374         auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
375             [iter](VirtualDataItem item) { return item.key == iter->key; });
376         if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
377             // if has conflict, compare writeTimestamp
378             LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
379                 iter->writeTimestamp);
380             dbDataIter->key = iter->key;
381             dbDataIter->value = iter->value;
382             dbDataIter->timestamp = iter->timestamp;
383             dbDataIter->writeTimestamp = iter->writeTimestamp;
384             dbDataIter->flag = iter->flag;
385             dbDataIter->isLocal = false;
386             dbDataIter->deviceId = deviceMapping_[deviceName];
387         } else {
388             LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
389             VirtualDataItem dataItem;
390             dataItem.key = iter->key;
391             dataItem.value = iter->value;
392             dataItem.timestamp = iter->timestamp;
393             dataItem.writeTimestamp = iter->writeTimestamp;
394             dataItem.flag = iter->flag;
395             dataItem.isLocal = false;
396             dataItem.deviceId = deviceMapping_[deviceName];
397             dbData_.push_back(dataItem);
398         }
399     }
400     return E_OK;
401 }
402 
SetSchemaInfo(const std::string & schema)403 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
404 {
405     schema_ = schema;
406     SchemaObject emptyObj;
407     schemaObj_ = emptyObj;
408     schemaObj_.ParseFromSchemaString(schema);
409 }
410 
GetDbProperties() const411 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
412 {
413     return properties_;
414 }
415 
GetSecurityOption(SecurityOption & option) const416 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
417 {
418     if (getSecurityOptionCallBack_) {
419         return getSecurityOptionCallBack_(option);
420     }
421     if (secOption_.securityLabel == NOT_SET) {
422         return -E_NOT_SUPPORT;
423     }
424     option = secOption_;
425     return E_OK;
426 }
427 
IsReadable() const428 bool VirtualSingleVerSyncDBInterface::IsReadable() const
429 {
430     return true;
431 }
432 
SetSecurityOption(SecurityOption & option)433 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
434 {
435     secOption_ = option;
436 }
437 
NotifyRemotePushFinished(const std::string & targetId) const438 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
439 {
440     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
441     if (pushNotifier_) {
442         pushNotifier_(targetId);
443         LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
444     }
445 }
446 
GetDatabaseCreateTimestamp(Timestamp & outTime) const447 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
448 {
449     outTime = dbCreateTime_;
450     return E_OK;
451 }
452 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const453 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
454     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
455     std::vector<SingleVerKvEntry *> &entries) const
456 {
457     if (getDataDelayTime_ > 0) {
458         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
459     }
460     int errCode = DataControl();
461     if (errCode != E_OK) {
462         return errCode;
463     }
464     const auto &startKey = query.GetPrefixKey();
465     Key endKey = startKey;
466     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
467 
468     std::vector<VirtualDataItem> dataItems;
469     for (const auto &data : dbData_) {
470         // Only get local data.
471         if (!data.isLocal) {
472             continue;
473         }
474 
475         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
476             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
477                 dataItems.push_back(data);
478             }
479         } else {
480             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
481                 data.key >= startKey && data.key <= endKey) {
482                 dataItems.push_back(data);
483             }
484         }
485     }
486 
487     LOGD("dataItems size %zu", dataItems.size());
488     return GetEntriesFromItems(entries, dataItems);
489 }
490 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const491 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
492 {
493     size_t prefixKeySize = keyPrefix.size();
494     for (auto iter = metadata_.begin(); iter != metadata_.end();) {
495         if (prefixKeySize <= iter->first.size() &&
496             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
497             iter = metadata_.erase(iter);
498         } else {
499             ++iter;
500         }
501     }
502     return E_OK;
503 }
504 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const505 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
506 {
507     if (compressSync_) {
508         needCompressOnSync = true;
509         compressionRate = 100; // compress rate 100
510     }
511     return E_OK;
512 }
513 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const514 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
515 {
516     if (compressSync_) {
517         DataCompression::GetCompressionAlgo(algorithmSet);
518     }
519     return E_OK;
520 }
521 
PutSyncData(const DataItem & item)522 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
523 {
524     return E_OK;
525 }
526 
CheckAndInitQueryCondition(QueryObject & query) const527 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
528 {
529     return E_OK;
530 }
531 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const532 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
533     const std::string &sourceID, const std::string &targetID, bool isPush) const
534 {
535     return E_OK;
536 }
537 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)538 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
539     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
540 {
541     std::function<void()> callback;
542     {
543         std::lock_guard<std::mutex> autoLock(saveDataMutex_);
544         callback = saveDataCallback_;
545     }
546     if (callback) {
547         callback();
548     }
549     std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
550     std::vector<VirtualDataItem> dataItems;
551     for (auto kvEntry : entries) {
552         auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
553         VirtualDataItem item;
554         genericKvEntry->GetKey(item.key);
555         genericKvEntry->GetValue(item.value);
556         item.timestamp = genericKvEntry->GetTimestamp();
557         item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
558         item.flag = genericKvEntry->GetFlag();
559         item.isLocal = false;
560         dataItems.push_back(item);
561     }
562     return PutSyncData(dataItems, deviceName);
563 }
564 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)565 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
566     bool needCacheSubscribe)
567 {
568     return E_OK;
569 }
570 
RemoveSubscribe(const std::string & subscribeId)571 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
572 {
573     return E_OK;
574 }
575 
RemoveSubscribe(const std::vector<std::string> & subscribeIds)576 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
577 {
578     return E_OK;
579 }
580 
SetBusy(bool busy,bool readBusy)581 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
582 {
583     busy_ = busy;
584     readBusy_ = readBusy;
585 }
586 
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)587 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
588 {
589     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
590     deviceData_[deviceName][key] = value;
591 }
592 
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)593 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
594 {
595     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
596     value = deviceData_[deviceName][key];
597 }
598 
SetDbProperties(KvDBProperties & kvDBProperties)599 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
600 {
601     properties_ = kvDBProperties;
602 }
603 
DelayGetSyncData(uint32_t milliDelayTime)604 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
605 {
606     getDataDelayTime_ = milliDelayTime;
607 }
608 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)609 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
610 {
611     countDown_ = whichTime;
612     expectedErrCode_ = errCode;
613     isGetDataControl_ = isGetDataControl;
614 }
615 
DataControl() const616 int VirtualSingleVerSyncDBInterface::DataControl() const
617 {
618     static int getDataTimes = 0;
619     if (countDown_ == -1) { // init -1
620         getDataTimes = 0;
621     }
622     if (isGetDataControl_ && countDown_ > 0) {
623         getDataTimes++;
624     }
625     if (isGetDataControl_ && countDown_ == getDataTimes) {
626         LOGD("virtual device get data failed = %d", expectedErrCode_);
627         getDataTimes = 0;
628         return expectedErrCode_;
629     }
630     return E_OK;
631 }
632 
ResetDataControl()633 void VirtualSingleVerSyncDBInterface::ResetDataControl()
634 {
635     countDown_ = -1;
636     expectedErrCode_ = E_OK;
637 }
638 
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const639 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp &currentWaterMark,
640     const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
641 {
642     bool isFinished = true;
643     for (const auto &data : dbData_) {
644         if (data.isLocal) {
645             if (dataItems.size() >= dataSizeInfo.packetSize) {
646                 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
647                 isFinished = false;
648                 break;
649             }
650             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
651                 dataItems.push_back(data);
652                 currentWaterMark = data.writeTimestamp;
653             }
654         }
655     }
656     return isFinished;
657 }
658 
SetSaveDataCallback(const std::function<void ()> & callback)659 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
660 {
661     std::lock_guard<std::mutex> autoLock(saveDataMutex_);
662     saveDataCallback_ = callback;
663 }
664 
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)665 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
666 {
667     getSecurityOptionCallBack_ = callBack;
668 }
669 
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)670 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
671 {
672     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
673     pushNotifier_ = pushNotifier;
674 }
675 
SetCompressSync(bool compressSync)676 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
677 {
678     compressSync_ = compressSync;
679 }
680 }  // namespace DistributedDB
681