1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_single_ver_sync_db_Interface.h"
17
18 #include <algorithm>
19 #include <thread>
20
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34 {
35 int errCode = E_OK;
36 for (const auto &item : dataItems) {
37 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38 if (entry == nullptr) {
39 LOGE("Create entry failed.");
40 errCode = -E_OUT_OF_MEMORY;
41 break;
42 }
43 DataItem storageItem;
44 storageItem.key = item.key;
45 storageItem.value = item.value;
46 storageItem.flag = item.flag;
47 storageItem.timestamp = item.timestamp;
48 storageItem.writeTimestamp = item.writeTimestamp;
49 entry->SetEntryData(std::move(storageItem));
50 entries.push_back(entry);
51 }
52 if (errCode != E_OK) {
53 for (auto &kvEntry : entries) {
54 delete kvEntry;
55 kvEntry = nullptr;
56 }
57 entries.clear();
58 }
59 return errCode;
60 }
61 }
62
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66 LOGD("virtual device init db createTime");
67 }
68
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71 return SYNC_SVD;
72 }
73
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84 identifier_ = std::move(identifier);
85 }
86
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89 return identifier_;
90 }
91
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94 if (readBusy_) {
95 return -E_BUSY;
96 }
97 auto iter = metadata_.find(key);
98 if (iter != metadata_.end()) {
99 value = iter->second;
100 return E_OK;
101 }
102 return -E_NOT_FOUND;
103 }
104
GetMetaDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const105 int VirtualSingleVerSyncDBInterface::GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const
106 {
107 if (readBusy_) {
108 return -E_BUSY;
109 }
110 for (const auto &metadata : metadata_) {
111 if (metadata.first.size() < keyPrefix.size()) {
112 continue;
113 }
114 if (std::equal(keyPrefix.begin(), keyPrefix.end(), metadata.first.begin())) {
115 data[metadata.first] = metadata.second;
116 }
117 }
118 return data.empty() ? -E_NOT_FOUND : E_OK;
119 }
120
PutMetaData(const Key & key,const Value & value,bool isInTransaction)121 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
122 {
123 (void)isInTransaction;
124 if (busy_) {
125 return -E_BUSY;
126 }
127 metadata_[key] = value;
128 return E_OK;
129 }
130
DeleteMetaData(const std::vector<Key> & keys)131 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
132 {
133 for (const auto &key : keys) {
134 (void)metadata_.erase(key);
135 }
136 return E_OK;
137 }
138
GetAllMetaKeys(std::vector<Key> & keys) const139 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
140 {
141 for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
142 keys.push_back(iter->first);
143 }
144 LOGD("GetAllMetaKeys size %zu", keys.size());
145 return E_OK;
146 }
147
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const148 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
149 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
150 {
151 return -E_NOT_SUPPORT;
152 }
153
GetUnSyncTotal(Timestamp begin,Timestamp end,uint32_t & total) const154 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(Timestamp begin, Timestamp end, uint32_t &total) const
155 {
156 total = 0;
157 for (const auto &data : dbData_) {
158 if (data.isLocal) {
159 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
160 total++;
161 }
162 }
163 }
164 return E_OK;
165 }
166
GetUnSyncTotal(QueryObject & query,const SyncTimeRange & timeRange,uint32_t & total) const167 int VirtualSingleVerSyncDBInterface::GetUnSyncTotal(QueryObject &query, const SyncTimeRange &timeRange,
168 uint32_t &total) const
169 {
170 if (getDataDelayTime_ > 0) {
171 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
172 }
173 int errCode = DataControl();
174 if (errCode != E_OK) {
175 return errCode;
176 }
177
178 total = 0;
179 const auto &startKey = query.GetPrefixKey();
180 Key endKey = startKey;
181 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
182
183 for (const auto &data : dbData_) {
184 // Only get local data.
185 if (!data.isLocal) {
186 continue;
187 }
188
189 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
190 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
191 total++;
192 }
193 } else {
194 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
195 data.key >= startKey && data.key <= endKey) {
196 total++;
197 }
198 }
199 }
200
201 LOGD("GetUnSyncTotal %u", total);
202 return E_OK;
203 }
204
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const205 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
206 const DataSizeSpecInfo &dataSizeInfo) const
207 {
208 return -E_NOT_SUPPORT;
209 }
210
ReleaseContinueToken(ContinueToken & continueStmtToken) const211 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
212 {
213 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
214 if (token != nullptr) {
215 delete token;
216 continueStmtToken = nullptr;
217 }
218 return;
219 }
220
GetSchemaInfo() const221 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
222 {
223 return schemaObj_;
224 }
225
CheckCompatible(const std::string & schema,uint8_t type) const226 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
227 {
228 if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
229 return true;
230 }
231 return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
232 }
233
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)234 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
235 {
236 VirtualDataItem item;
237 item.key = key;
238 item.value = value;
239 item.timestamp = time;
240 item.writeTimestamp = time;
241 item.flag = static_cast<uint64_t>(flag);
242 item.isLocal = true;
243 dbData_.push_back(item);
244 return E_OK;
245 }
246
GetMaxTimestamp(Timestamp & stamp) const247 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
248 {
249 for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
250 if (stamp < iter->writeTimestamp) {
251 stamp = iter->writeTimestamp;
252 }
253 }
254 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
255 }
256
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)257 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
258 {
259 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
260 deviceData_.erase(deviceName);
261 uint32_t devId = 0;
262 if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
263 devId = deviceMapping_[deviceName];
264 }
265 for (auto &item : dbData_) {
266 if (item.deviceId == devId && devId > 0) {
267 item.flag = VirtualDataItem::DELETE_FLAG;
268 }
269 }
270 LOGD("RemoveDeviceData FINISH");
271 return E_OK;
272 }
273
GetSyncData(const Key & key,VirtualDataItem & dataItem)274 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
275 {
276 auto iter = std::find_if(dbData_.begin(), dbData_.end(),
277 [key](const VirtualDataItem& item) { return item.key == key; });
278 if (iter != dbData_.end()) {
279 if (iter->flag == VirtualDataItem::DELETE_FLAG) {
280 return -E_NOT_FOUND;
281 }
282 dataItem.key = iter->key;
283 dataItem.value = iter->value;
284 dataItem.timestamp = iter->timestamp;
285 dataItem.writeTimestamp = iter->writeTimestamp;
286 dataItem.flag = iter->flag;
287 dataItem.isLocal = iter->isLocal;
288 return E_OK;
289 }
290 return -E_NOT_FOUND;
291 }
292
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const293 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
294 std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
295 const DataSizeSpecInfo &dataSizeInfo) const
296 {
297 std::vector<VirtualDataItem> dataItems;
298 int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
299 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
300 LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
301 return errCode;
302 }
303 int innerCode = GetEntriesFromItems(entries, dataItems);
304 if (innerCode != E_OK) {
305 return innerCode;
306 }
307 return errCode;
308 }
309
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const310 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
311 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
312 {
313 if (continueStmtToken == nullptr) {
314 return -E_INVALID_ARGS;
315 }
316 int errCode = DataControl();
317 if (errCode != E_OK) {
318 return errCode;
319 }
320 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
321 Timestamp currentWaterMark = 0;
322 std::vector<VirtualDataItem> dataItems;
323 bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
324 if (isFinished) {
325 delete token;
326 continueStmtToken = nullptr;
327 } else {
328 currentWaterMark++;
329 token->begin = currentWaterMark;
330 }
331 int innerCode = GetEntriesFromItems(entries, dataItems);
332 if (innerCode != E_OK) {
333 return innerCode;
334 }
335 return isFinished ? E_OK : -E_UNFINISHED;
336 }
337
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const338 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
339 std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
340 {
341 if (getDataDelayTime_ > 0) {
342 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
343 }
344 int errCode = DataControl();
345 if (errCode != E_OK) {
346 return errCode;
347 }
348 Timestamp currentWaterMark = 0;
349 bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
350 if (!isFinished) {
351 VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
352 if (token == nullptr) {
353 LOGD("virtual alloc token failed");
354 dataItems.clear();
355 return -E_OUT_OF_MEMORY;
356 }
357 currentWaterMark++;
358 token->begin = currentWaterMark;
359 token->end = end;
360 continueStmtToken = static_cast<VirtualContinueToken *>(token);
361 }
362 LOGD("dataItems size %zu", dataItems.size());
363 return isFinished ? E_OK : -E_UNFINISHED;
364 }
365
SetSaveDataDelayTime(uint64_t milliDelayTime)366 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
367 {
368 saveDataDelayTime_ = milliDelayTime;
369 }
370
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const371 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
372 uint32_t blockSize, ContinueToken& continueStmtToken) const
373 {
374 if (continueStmtToken == nullptr) {
375 return -E_NOT_SUPPORT;
376 }
377 return 0;
378 }
379
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)380 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
381 const std::string &deviceName)
382 {
383 if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
384 availableDeviceId_++;
385 deviceMapping_[deviceName] = availableDeviceId_;
386 LOGD("put deviceName=%s into device map", deviceName.c_str());
387 }
388 for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
389 LOGD("PutSyncData");
390 auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
391 [iter](VirtualDataItem item) { return item.key == iter->key; });
392 if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
393 // if has conflict, compare writeTimestamp
394 LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
395 iter->writeTimestamp);
396 dbDataIter->key = iter->key;
397 dbDataIter->value = iter->value;
398 dbDataIter->timestamp = iter->timestamp;
399 dbDataIter->writeTimestamp = iter->writeTimestamp;
400 dbDataIter->flag = iter->flag;
401 dbDataIter->isLocal = false;
402 dbDataIter->deviceId = deviceMapping_[deviceName];
403 } else {
404 LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
405 VirtualDataItem dataItem;
406 dataItem.key = iter->key;
407 dataItem.value = iter->value;
408 dataItem.timestamp = iter->timestamp;
409 dataItem.writeTimestamp = iter->writeTimestamp;
410 dataItem.flag = iter->flag;
411 dataItem.isLocal = false;
412 dataItem.deviceId = deviceMapping_[deviceName];
413 dbData_.push_back(dataItem);
414 }
415 }
416 return E_OK;
417 }
418
SetSchemaInfo(const std::string & schema)419 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
420 {
421 schema_ = schema;
422 SchemaObject emptyObj;
423 schemaObj_ = emptyObj;
424 schemaObj_.ParseFromSchemaString(schema);
425 }
426
GetDbProperties() const427 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
428 {
429 return properties_;
430 }
431
GetSecurityOption(SecurityOption & option) const432 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
433 {
434 if (getSecurityOptionCallBack_) {
435 return getSecurityOptionCallBack_(option);
436 }
437 if (secOption_.securityLabel == NOT_SET) {
438 return -E_NOT_SUPPORT;
439 }
440 option = secOption_;
441 return E_OK;
442 }
443
IsReadable() const444 bool VirtualSingleVerSyncDBInterface::IsReadable() const
445 {
446 return true;
447 }
448
SetSecurityOption(SecurityOption & option)449 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
450 {
451 secOption_ = option;
452 }
453
NotifyRemotePushFinished(const std::string & targetId) const454 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
455 {
456 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
457 if (pushNotifier_) {
458 pushNotifier_(targetId);
459 LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
460 }
461 }
462
GetDatabaseCreateTimestamp(Timestamp & outTime) const463 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
464 {
465 outTime = dbCreateTime_;
466 return E_OK;
467 }
468
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const469 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
470 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
471 std::vector<SingleVerKvEntry *> &entries) const
472 {
473 if (getDataDelayTime_ > 0) {
474 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
475 }
476 int errCode = DataControl();
477 if (errCode != E_OK) {
478 return errCode;
479 }
480 const auto &startKey = query.GetPrefixKey();
481 Key endKey = startKey;
482 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
483
484 std::vector<VirtualDataItem> dataItems;
485 for (const auto &data : dbData_) {
486 // Only get local data.
487 if (!data.isLocal) {
488 continue;
489 }
490
491 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
492 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
493 dataItems.push_back(data);
494 }
495 } else {
496 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
497 data.key >= startKey && data.key <= endKey) {
498 dataItems.push_back(data);
499 }
500 }
501 }
502
503 LOGD("dataItems size %zu", dataItems.size());
504 return GetEntriesFromItems(entries, dataItems);
505 }
506
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const507 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
508 {
509 size_t prefixKeySize = keyPrefix.size();
510 for (auto iter = metadata_.begin(); iter != metadata_.end();) {
511 if (prefixKeySize <= iter->first.size() &&
512 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
513 iter = metadata_.erase(iter);
514 } else {
515 ++iter;
516 }
517 }
518 return E_OK;
519 }
520
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const521 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
522 {
523 if (compressSync_) {
524 needCompressOnSync = true;
525 compressionRate = 100; // compress rate 100
526 }
527 return E_OK;
528 }
529
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const530 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
531 {
532 if (compressSync_) {
533 DataCompression::GetCompressionAlgo(algorithmSet);
534 }
535 return E_OK;
536 }
537
PutSyncData(const DataItem & item)538 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
539 {
540 return E_OK;
541 }
542
CheckAndInitQueryCondition(QueryObject & query) const543 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
544 {
545 return E_OK;
546 }
547
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const548 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
549 const std::string &sourceID, const std::string &targetID, bool isPush) const
550 {
551 return E_OK;
552 }
553
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)554 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
555 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
556 {
557 std::function<void()> callback;
558 {
559 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
560 callback = saveDataCallback_;
561 }
562 if (callback) {
563 callback();
564 }
565 std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
566 std::vector<VirtualDataItem> dataItems;
567 for (auto kvEntry : entries) {
568 auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
569 VirtualDataItem item;
570 genericKvEntry->GetKey(item.key);
571 genericKvEntry->GetValue(item.value);
572 item.timestamp = genericKvEntry->GetTimestamp();
573 item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
574 item.flag = genericKvEntry->GetFlag();
575 item.isLocal = false;
576 dataItems.push_back(item);
577 }
578 return PutSyncData(dataItems, deviceName);
579 }
580
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)581 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
582 bool needCacheSubscribe)
583 {
584 return E_OK;
585 }
586
RemoveSubscribe(const std::string & subscribeId)587 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
588 {
589 return E_OK;
590 }
591
RemoveSubscribe(const std::vector<std::string> & subscribeIds)592 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
593 {
594 return E_OK;
595 }
596
SetBusy(bool busy,bool readBusy)597 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
598 {
599 busy_ = busy;
600 readBusy_ = readBusy;
601 }
602
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)603 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
604 {
605 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
606 deviceData_[deviceName][key] = value;
607 }
608
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)609 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
610 {
611 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
612 value = deviceData_[deviceName][key];
613 }
614
SetDbProperties(KvDBProperties & kvDBProperties)615 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
616 {
617 properties_ = kvDBProperties;
618 }
619
DelayGetSyncData(uint32_t milliDelayTime)620 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
621 {
622 getDataDelayTime_ = milliDelayTime;
623 }
624
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)625 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
626 {
627 countDown_ = whichTime;
628 expectedErrCode_ = errCode;
629 isGetDataControl_ = isGetDataControl;
630 }
631
DataControl() const632 int VirtualSingleVerSyncDBInterface::DataControl() const
633 {
634 static int getDataTimes = 0;
635 if (countDown_ == -1) { // init -1
636 getDataTimes = 0;
637 }
638 if (isGetDataControl_ && countDown_ > 0) {
639 getDataTimes++;
640 }
641 if (isGetDataControl_ && countDown_ == getDataTimes) {
642 LOGD("virtual device get data failed = %d", expectedErrCode_);
643 getDataTimes = 0;
644 return expectedErrCode_;
645 }
646 return E_OK;
647 }
648
ResetDataControl()649 void VirtualSingleVerSyncDBInterface::ResetDataControl()
650 {
651 countDown_ = -1;
652 expectedErrCode_ = E_OK;
653 }
654
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const655 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp ¤tWaterMark,
656 const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
657 {
658 bool isFinished = true;
659 for (const auto &data : dbData_) {
660 if (data.isLocal) {
661 if (dataItems.size() >= dataSizeInfo.packetSize) {
662 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
663 isFinished = false;
664 break;
665 }
666 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
667 dataItems.push_back(data);
668 currentWaterMark = data.writeTimestamp;
669 }
670 }
671 }
672 return isFinished;
673 }
674
SetSaveDataCallback(const std::function<void ()> & callback)675 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
676 {
677 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
678 saveDataCallback_ = callback;
679 }
680
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)681 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
682 {
683 getSecurityOptionCallBack_ = callBack;
684 }
685
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)686 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
687 {
688 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
689 pushNotifier_ = pushNotifier;
690 }
691
SetCompressSync(bool compressSync)692 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
693 {
694 compressSync_ = compressSync;
695 }
696 } // namespace DistributedDB
697