1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_single_ver_sync_db_Interface.h"
17
18 #include <algorithm>
19 #include <thread>
20
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34 {
35 int errCode = E_OK;
36 for (const auto &item : dataItems) {
37 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38 if (entry == nullptr) {
39 LOGE("Create entry failed.");
40 errCode = -E_OUT_OF_MEMORY;
41 break;
42 }
43 DataItem storageItem;
44 storageItem.key = item.key;
45 storageItem.value = item.value;
46 storageItem.flag = item.flag;
47 storageItem.timestamp = item.timestamp;
48 storageItem.writeTimestamp = item.writeTimestamp;
49 entry->SetEntryData(std::move(storageItem));
50 entries.push_back(entry);
51 }
52 if (errCode != E_OK) {
53 for (auto &kvEntry : entries) {
54 delete kvEntry;
55 kvEntry = nullptr;
56 }
57 entries.clear();
58 }
59 return errCode;
60 }
61 }
62
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66 LOGD("virtual device init db createTime");
67 }
68
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71 return SYNC_SVD;
72 }
73
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84 identifier_ = std::move(identifier);
85 }
86
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89 return identifier_;
90 }
91
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94 if (readBusy_) {
95 return -E_BUSY;
96 }
97 auto iter = metadata_.find(key);
98 if (iter != metadata_.end()) {
99 value = iter->second;
100 return E_OK;
101 }
102 return -E_NOT_FOUND;
103 }
104
PutMetaData(const Key & key,const Value & value,bool isInTransaction)105 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
106 {
107 (void)isInTransaction;
108 if (busy_) {
109 return -E_BUSY;
110 }
111 metadata_[key] = value;
112 return E_OK;
113 }
114
DeleteMetaData(const std::vector<Key> & keys)115 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
116 {
117 for (const auto &key : keys) {
118 (void)metadata_.erase(key);
119 }
120 return E_OK;
121 }
122
GetAllMetaKeys(std::vector<Key> & keys) const123 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
124 {
125 for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
126 keys.push_back(iter->first);
127 }
128 LOGD("GetAllMetaKeys size %zu", keys.size());
129 return E_OK;
130 }
131
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const132 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
133 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
134 {
135 return -E_NOT_SUPPORT;
136 }
137
GetUnSyncTotal(Timestamp begin,Timestamp end,uint32_t & total) const138 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const
139 {
140 total = 0;
141 for (const auto &data : dbData_) {
142 if (data.isLocal) {
143 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
144 total++;
145 }
146 }
147 }
148 return E_OK;
149 }
150
GetUnSyncTotal(QueryObject & query,const SyncTimeRange & timeRange,uint32_t & total) const151 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange,
152 uint32_t &total) const
153 {
154 if (getDataDelayTime_ > 0) {
155 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
156 }
157 int errCode = DataControl();
158 if (errCode != E_OK) {
159 return errCode;
160 }
161
162 total = 0;
163 const auto &startKey = query.GetPrefixKey();
164 Key endKey = startKey;
165 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
166
167 for (const auto &data : dbData_) {
168 // Only get local data.
169 if (!data.isLocal) {
170 continue;
171 }
172
173 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
174 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
175 total++;
176 }
177 } else {
178 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
179 data.key >= startKey && data.key <= endKey) {
180 total++;
181 }
182 }
183 }
184
185 LOGD("GetUnSyncTotal %u", total);
186 return E_OK;
187 }
188
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const189 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
190 const DataSizeSpecInfo &dataSizeInfo) const
191 {
192 return -E_NOT_SUPPORT;
193 }
194
ReleaseContinueToken(ContinueToken & continueStmtToken) const195 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
196 {
197 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
198 if (token != nullptr) {
199 delete token;
200 continueStmtToken = nullptr;
201 }
202 return;
203 }
204
GetSchemaInfo() const205 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
206 {
207 return schemaObj_;
208 }
209
CheckCompatible(const std::string & schema,uint8_t type) const210 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
211 {
212 if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
213 return true;
214 }
215 return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
216 }
217
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)218 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
219 {
220 VirtualDataItem item;
221 item.key = key;
222 item.value = value;
223 item.timestamp = time;
224 item.writeTimestamp = time;
225 item.flag = static_cast<uint64_t>(flag);
226 item.isLocal = true;
227 dbData_.push_back(item);
228 return E_OK;
229 }
230
GetMaxTimestamp(Timestamp & stamp) const231 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
232 {
233 for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
234 if (stamp < iter->writeTimestamp) {
235 stamp = iter->writeTimestamp;
236 }
237 }
238 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
239 }
240
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)241 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
242 {
243 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
244 deviceData_.erase(deviceName);
245 uint32_t devId = 0;
246 if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
247 devId = deviceMapping_[deviceName];
248 }
249 for (auto &item : dbData_) {
250 if (item.deviceId == devId && devId > 0) {
251 item.flag = VirtualDataItem::DELETE_FLAG;
252 }
253 }
254 LOGD("RemoveDeviceData FINISH");
255 return E_OK;
256 }
257
GetSyncData(const Key & key,VirtualDataItem & dataItem)258 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
259 {
260 auto iter = std::find_if(dbData_.begin(), dbData_.end(),
261 [key](const VirtualDataItem& item) { return item.key == key; });
262 if (iter != dbData_.end()) {
263 if (iter->flag == VirtualDataItem::DELETE_FLAG) {
264 return -E_NOT_FOUND;
265 }
266 dataItem.key = iter->key;
267 dataItem.value = iter->value;
268 dataItem.timestamp = iter->timestamp;
269 dataItem.writeTimestamp = iter->writeTimestamp;
270 dataItem.flag = iter->flag;
271 dataItem.isLocal = iter->isLocal;
272 return E_OK;
273 }
274 return -E_NOT_FOUND;
275 }
276
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const277 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
278 std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
279 const DataSizeSpecInfo &dataSizeInfo) const
280 {
281 std::vector<VirtualDataItem> dataItems;
282 int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
283 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
284 LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
285 return errCode;
286 }
287 int innerCode = GetEntriesFromItems(entries, dataItems);
288 if (innerCode != E_OK) {
289 return innerCode;
290 }
291 return errCode;
292 }
293
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const294 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
295 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
296 {
297 if (continueStmtToken == nullptr) {
298 return -E_INVALID_ARGS;
299 }
300 int errCode = DataControl();
301 if (errCode != E_OK) {
302 return errCode;
303 }
304 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
305 Timestamp currentWaterMark = 0;
306 std::vector<VirtualDataItem> dataItems;
307 bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
308 if (isFinished) {
309 delete token;
310 continueStmtToken = nullptr;
311 } else {
312 currentWaterMark++;
313 token->begin = currentWaterMark;
314 }
315 int innerCode = GetEntriesFromItems(entries, dataItems);
316 if (innerCode != E_OK) {
317 return innerCode;
318 }
319 return isFinished ? E_OK : -E_UNFINISHED;
320 }
321
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const322 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
323 std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
324 {
325 if (getDataDelayTime_ > 0) {
326 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
327 }
328 int errCode = DataControl();
329 if (errCode != E_OK) {
330 return errCode;
331 }
332 Timestamp currentWaterMark = 0;
333 bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
334 if (!isFinished) {
335 VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
336 if (token == nullptr) {
337 LOGD("virtual alloc token failed");
338 dataItems.clear();
339 return -E_OUT_OF_MEMORY;
340 }
341 currentWaterMark++;
342 token->begin = currentWaterMark;
343 token->end = end;
344 continueStmtToken = static_cast<VirtualContinueToken *>(token);
345 }
346 LOGD("dataItems size %zu", dataItems.size());
347 return isFinished ? E_OK : -E_UNFINISHED;
348 }
349
SetSaveDataDelayTime(uint64_t milliDelayTime)350 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
351 {
352 saveDataDelayTime_ = milliDelayTime;
353 }
354
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const355 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
356 uint32_t blockSize, ContinueToken& continueStmtToken) const
357 {
358 if (continueStmtToken == nullptr) {
359 return -E_NOT_SUPPORT;
360 }
361 return 0;
362 }
363
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)364 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
365 const std::string &deviceName)
366 {
367 if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
368 availableDeviceId_++;
369 deviceMapping_[deviceName] = availableDeviceId_;
370 LOGD("put deviceName=%s into device map", deviceName.c_str());
371 }
372 for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
373 LOGD("PutSyncData");
374 auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
375 [iter](VirtualDataItem item) { return item.key == iter->key; });
376 if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
377 // if has conflict, compare writeTimestamp
378 LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
379 iter->writeTimestamp);
380 dbDataIter->key = iter->key;
381 dbDataIter->value = iter->value;
382 dbDataIter->timestamp = iter->timestamp;
383 dbDataIter->writeTimestamp = iter->writeTimestamp;
384 dbDataIter->flag = iter->flag;
385 dbDataIter->isLocal = false;
386 dbDataIter->deviceId = deviceMapping_[deviceName];
387 } else {
388 LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
389 VirtualDataItem dataItem;
390 dataItem.key = iter->key;
391 dataItem.value = iter->value;
392 dataItem.timestamp = iter->timestamp;
393 dataItem.writeTimestamp = iter->writeTimestamp;
394 dataItem.flag = iter->flag;
395 dataItem.isLocal = false;
396 dataItem.deviceId = deviceMapping_[deviceName];
397 dbData_.push_back(dataItem);
398 }
399 }
400 return E_OK;
401 }
402
SetSchemaInfo(const std::string & schema)403 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
404 {
405 schema_ = schema;
406 SchemaObject emptyObj;
407 schemaObj_ = emptyObj;
408 schemaObj_.ParseFromSchemaString(schema);
409 }
410
GetDbProperties() const411 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
412 {
413 return properties_;
414 }
415
GetSecurityOption(SecurityOption & option) const416 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
417 {
418 if (getSecurityOptionCallBack_) {
419 return getSecurityOptionCallBack_(option);
420 }
421 if (secOption_.securityLabel == NOT_SET) {
422 return -E_NOT_SUPPORT;
423 }
424 option = secOption_;
425 return E_OK;
426 }
427
IsReadable() const428 bool VirtualSingleVerSyncDBInterface::IsReadable() const
429 {
430 return true;
431 }
432
SetSecurityOption(SecurityOption & option)433 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
434 {
435 secOption_ = option;
436 }
437
NotifyRemotePushFinished(const std::string & targetId) const438 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
439 {
440 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
441 if (pushNotifier_) {
442 pushNotifier_(targetId);
443 LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
444 }
445 }
446
GetDatabaseCreateTimestamp(Timestamp & outTime) const447 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
448 {
449 outTime = dbCreateTime_;
450 return E_OK;
451 }
452
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const453 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
454 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
455 std::vector<SingleVerKvEntry *> &entries) const
456 {
457 if (getDataDelayTime_ > 0) {
458 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
459 }
460 int errCode = DataControl();
461 if (errCode != E_OK) {
462 return errCode;
463 }
464 const auto &startKey = query.GetPrefixKey();
465 Key endKey = startKey;
466 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
467
468 std::vector<VirtualDataItem> dataItems;
469 for (const auto &data : dbData_) {
470 // Only get local data.
471 if (!data.isLocal) {
472 continue;
473 }
474
475 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
476 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
477 dataItems.push_back(data);
478 }
479 } else {
480 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
481 data.key >= startKey && data.key <= endKey) {
482 dataItems.push_back(data);
483 }
484 }
485 }
486
487 LOGD("dataItems size %zu", dataItems.size());
488 return GetEntriesFromItems(entries, dataItems);
489 }
490
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const491 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
492 {
493 size_t prefixKeySize = keyPrefix.size();
494 for (auto iter = metadata_.begin(); iter != metadata_.end();) {
495 if (prefixKeySize <= iter->first.size() &&
496 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
497 iter = metadata_.erase(iter);
498 } else {
499 ++iter;
500 }
501 }
502 return E_OK;
503 }
504
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const505 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
506 {
507 if (compressSync_) {
508 needCompressOnSync = true;
509 compressionRate = 100; // compress rate 100
510 }
511 return E_OK;
512 }
513
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const514 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
515 {
516 if (compressSync_) {
517 DataCompression::GetCompressionAlgo(algorithmSet);
518 }
519 return E_OK;
520 }
521
PutSyncData(const DataItem & item)522 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
523 {
524 return E_OK;
525 }
526
CheckAndInitQueryCondition(QueryObject & query) const527 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
528 {
529 return E_OK;
530 }
531
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const532 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
533 const std::string &sourceID, const std::string &targetID, bool isPush) const
534 {
535 return E_OK;
536 }
537
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)538 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
539 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
540 {
541 std::function<void()> callback;
542 {
543 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
544 callback = saveDataCallback_;
545 }
546 if (callback) {
547 callback();
548 }
549 std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
550 std::vector<VirtualDataItem> dataItems;
551 for (auto kvEntry : entries) {
552 auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
553 VirtualDataItem item;
554 genericKvEntry->GetKey(item.key);
555 genericKvEntry->GetValue(item.value);
556 item.timestamp = genericKvEntry->GetTimestamp();
557 item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
558 item.flag = genericKvEntry->GetFlag();
559 item.isLocal = false;
560 dataItems.push_back(item);
561 }
562 return PutSyncData(dataItems, deviceName);
563 }
564
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)565 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
566 bool needCacheSubscribe)
567 {
568 return E_OK;
569 }
570
RemoveSubscribe(const std::string & subscribeId)571 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
572 {
573 return E_OK;
574 }
575
RemoveSubscribe(const std::vector<std::string> & subscribeIds)576 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
577 {
578 return E_OK;
579 }
580
SetBusy(bool busy,bool readBusy)581 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
582 {
583 busy_ = busy;
584 readBusy_ = readBusy;
585 }
586
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)587 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
588 {
589 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
590 deviceData_[deviceName][key] = value;
591 }
592
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)593 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
594 {
595 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
596 value = deviceData_[deviceName][key];
597 }
598
SetDbProperties(KvDBProperties & kvDBProperties)599 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
600 {
601 properties_ = kvDBProperties;
602 }
603
DelayGetSyncData(uint32_t milliDelayTime)604 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
605 {
606 getDataDelayTime_ = milliDelayTime;
607 }
608
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)609 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
610 {
611 countDown_ = whichTime;
612 expectedErrCode_ = errCode;
613 isGetDataControl_ = isGetDataControl;
614 }
615
DataControl() const616 int VirtualSingleVerSyncDBInterface::DataControl() const
617 {
618 static int getDataTimes = 0;
619 if (countDown_ == -1) { // init -1
620 getDataTimes = 0;
621 }
622 if (isGetDataControl_ && countDown_ > 0) {
623 getDataTimes++;
624 }
625 if (isGetDataControl_ && countDown_ == getDataTimes) {
626 LOGD("virtual device get data failed = %d", expectedErrCode_);
627 getDataTimes = 0;
628 return expectedErrCode_;
629 }
630 return E_OK;
631 }
632
ResetDataControl()633 void VirtualSingleVerSyncDBInterface::ResetDataControl()
634 {
635 countDown_ = -1;
636 expectedErrCode_ = E_OK;
637 }
638
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const639 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp ¤tWaterMark,
640 const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
641 {
642 bool isFinished = true;
643 for (const auto &data : dbData_) {
644 if (data.isLocal) {
645 if (dataItems.size() >= dataSizeInfo.packetSize) {
646 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
647 isFinished = false;
648 break;
649 }
650 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
651 dataItems.push_back(data);
652 currentWaterMark = data.writeTimestamp;
653 }
654 }
655 }
656 return isFinished;
657 }
658
SetSaveDataCallback(const std::function<void ()> & callback)659 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
660 {
661 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
662 saveDataCallback_ = callback;
663 }
664
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)665 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
666 {
667 getSecurityOptionCallBack_ = callBack;
668 }
669
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)670 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
671 {
672 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
673 pushNotifier_ = pushNotifier;
674 }
675
SetCompressSync(bool compressSync)676 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
677 {
678 compressSync_ = compressSync;
679 }
680 } // namespace DistributedDB
681