• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "virtual_single_ver_sync_db_Interface.h"
17 
18 #include <algorithm>
19 #include <thread>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "intercepted_data_impl.h"
25 #include "log_print.h"
26 #include "platform_specific.h"
27 #include "query_object.h"
28 #include "securec.h"
29 
30 namespace DistributedDB {
31 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)32     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
33     {
34         int errCode = E_OK;
35         for (const auto &item : dataItems) {
36             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
37             if (entry == nullptr) {
38                 LOGE("Create entry failed.");
39                 errCode = -E_OUT_OF_MEMORY;
40                 break;
41             }
42             DataItem storageItem;
43             storageItem.key = item.key;
44             storageItem.value = item.value;
45             storageItem.flag = item.flag;
46             storageItem.timestamp = item.timestamp;
47             storageItem.writeTimestamp = item.writeTimestamp;
48             entry->SetEntryData(std::move(storageItem));
49             entries.push_back(entry);
50         }
51         if (errCode != E_OK) {
52             for (auto &kvEntry : entries) {
53                 delete kvEntry;
54                 kvEntry = nullptr;
55             }
56             entries.clear();
57         }
58         return errCode;
59     }
60 }
61 
VirtualSingleVerSyncDBInterface()62 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
63 {
64     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
65     LOGD("virtual device init db createTime");
66 }
67 
GetInterfaceType() const68 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
69 {
70     return SYNC_SVD;
71 }
72 
IncRefCount()73 void VirtualSingleVerSyncDBInterface::IncRefCount()
74 {
75 }
76 
DecRefCount()77 void VirtualSingleVerSyncDBInterface::DecRefCount()
78 {
79 }
80 
SetIdentifier(std::vector<uint8_t> & identifier)81 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
82 {
83     identifier_ = std::move(identifier);
84 }
85 
GetIdentifier() const86 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
87 {
88     return identifier_;
89 }
90 
GetMetaData(const Key & key,Value & value) const91 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
92 {
93     auto iter = metadata_.find(key);
94     if (iter != metadata_.end()) {
95         value = iter->second;
96         return E_OK;
97     }
98     return -E_NOT_FOUND;
99 }
100 
PutMetaData(const Key & key,const Value & value)101 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value)
102 {
103     if (busy_) {
104         return -E_BUSY;
105     }
106     metadata_[key] = value;
107     return E_OK;
108 }
109 
DeleteMetaData(const std::vector<Key> & keys)110 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
111 {
112     for (const auto &key : keys) {
113         (void)metadata_.erase(key);
114     }
115     return E_OK;
116 }
117 
GetAllMetaKeys(std::vector<Key> & keys) const118 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
119 {
120     for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
121         keys.push_back(iter->first);
122     }
123     LOGD("GetAllMetaKeys size %zu", keys.size());
124     return E_OK;
125 }
126 
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const127 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
128     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
129 {
130     return -E_NOT_SUPPORT;
131 }
132 
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const133 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
134     const DataSizeSpecInfo &dataSizeInfo) const
135 {
136     return -E_NOT_SUPPORT;
137 }
138 
ReleaseContinueToken(ContinueToken & continueStmtToken) const139 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
140 {
141     return;
142 }
143 
GetSchemaInfo() const144 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
145 {
146     return schemaObj_;
147 }
148 
CheckCompatible(const std::string & schema,uint8_t type) const149 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
150 {
151     if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
152         return true;
153     }
154     return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
155 }
156 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)157 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
158 {
159     VirtualDataItem item;
160     item.key = key;
161     item.value = value;
162     item.timestamp = time;
163     item.writeTimestamp = time;
164     item.flag = flag;
165     item.isLocal = true;
166     dbData_.push_back(item);
167     return E_OK;
168 }
169 
GetMaxTimestamp(Timestamp & stamp) const170 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
171 {
172     for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
173         if (stamp < iter->writeTimestamp) {
174             stamp = iter->writeTimestamp;
175         }
176     }
177     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
178 }
179 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)180 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
181 {
182     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
183     deviceData_.erase(deviceName);
184     uint32_t devId = 0;
185     if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
186         devId = deviceMapping_[deviceName];
187     }
188     for (auto &item : dbData_) {
189         if (item.deviceId == devId && devId > 0) {
190             item.flag = VirtualDataItem::DELETE_FLAG;
191         }
192     }
193     LOGD("RemoveDeviceData FINISH");
194     return E_OK;
195 }
196 
GetSyncData(const Key & key,VirtualDataItem & dataItem)197 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
198 {
199     auto iter = std::find_if(dbData_.begin(), dbData_.end(),
200         [key](const VirtualDataItem& item) { return item.key == key; });
201     if (iter != dbData_.end()) {
202         if (iter->flag == VirtualDataItem::DELETE_FLAG) {
203             return -E_NOT_FOUND;
204         }
205         dataItem.key = iter->key;
206         dataItem.value = iter->value;
207         dataItem.timestamp = iter->timestamp;
208         dataItem.writeTimestamp = iter->writeTimestamp;
209         dataItem.flag = iter->flag;
210         dataItem.isLocal = iter->isLocal;
211         return E_OK;
212     }
213     return -E_NOT_FOUND;
214 }
215 
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const216 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
217     std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
218     const DataSizeSpecInfo &dataSizeInfo) const
219 {
220     std::vector<VirtualDataItem> dataItems;
221     int errCode = GetSyncData(begin, end, dataSizeInfo.blockSize, dataItems, continueStmtToken);
222     if (errCode != E_OK) {
223         LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
224         return errCode;
225     }
226     return GetEntriesFromItems(entries, dataItems);
227 }
228 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const229 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
230     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
231 {
232     if (continueStmtToken == nullptr) {
233         return -E_NOT_SUPPORT;
234     }
235     return 0;
236 }
237 
GetSyncData(Timestamp begin,Timestamp end,uint32_t blockSize,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const238 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, uint32_t blockSize,
239     std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
240 {
241     if (getDataDelayTime_ > 0) {
242         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
243     }
244     int errCode = DataControl();
245     if (errCode != E_OK) {
246         return errCode;
247     }
248     for (const auto &data : dbData_) {
249         if (data.isLocal) {
250             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
251                 dataItems.push_back(data);
252             }
253         }
254     }
255     continueStmtToken = nullptr;
256     LOGD("dataItems size %zu", dataItems.size());
257     return E_OK;
258 }
259 
SetSaveDataDelayTime(uint64_t milliDelayTime)260 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
261 {
262     saveDataDelayTime_ = milliDelayTime;
263 }
264 
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const265 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
266     uint32_t blockSize, ContinueToken& continueStmtToken) const
267 {
268     if (continueStmtToken == nullptr) {
269         return -E_NOT_SUPPORT;
270     }
271     return 0;
272 }
273 
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)274 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
275     const std::string &deviceName)
276 {
277     if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
278         availableDeviceId_++;
279         deviceMapping_[deviceName] = availableDeviceId_;
280         LOGD("put deviceName=%s into device map", deviceName.c_str());
281     }
282     for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
283         LOGD("PutSyncData");
284         auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
285             [iter](VirtualDataItem item) { return item.key == iter->key; });
286         if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
287             // if has conflict, compare writeTimestamp
288             LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
289                 iter->writeTimestamp);
290             dbDataIter->key = iter->key;
291             dbDataIter->value = iter->value;
292             dbDataIter->timestamp = iter->timestamp;
293             dbDataIter->writeTimestamp = iter->writeTimestamp;
294             dbDataIter->flag = iter->flag;
295             dbDataIter->isLocal = false;
296             dbDataIter->deviceId = deviceMapping_[deviceName];
297         } else {
298             LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
299             VirtualDataItem dataItem;
300             dataItem.key = iter->key;
301             dataItem.value = iter->value;
302             dataItem.timestamp = iter->timestamp;
303             dataItem.writeTimestamp = iter->writeTimestamp;
304             dataItem.flag = iter->flag;
305             dataItem.isLocal = false;
306             dataItem.deviceId = deviceMapping_[deviceName];
307             dbData_.push_back(dataItem);
308         }
309     }
310     return E_OK;
311 }
312 
SetSchemaInfo(const std::string & schema)313 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
314 {
315     schema_ = schema;
316     SchemaObject emptyObj;
317     schemaObj_ = emptyObj;
318     schemaObj_.ParseFromSchemaString(schema);
319 }
320 
GetDbProperties() const321 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
322 {
323     return properties_;
324 }
325 
GetSecurityOption(SecurityOption & option) const326 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
327 {
328     option = secOption_;
329     return E_OK;
330 }
331 
IsReadable() const332 bool VirtualSingleVerSyncDBInterface::IsReadable() const
333 {
334     return true;
335 }
336 
SetSecurityOption(SecurityOption & option)337 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
338 {
339     secOption_ = option;
340 }
341 
NotifyRemotePushFinished(const std::string & targetId) const342 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
343 {
344 }
345 
GetDatabaseCreateTimestamp(Timestamp & outTime) const346 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
347 {
348     outTime = dbCreateTime_;
349     return E_OK;
350 }
351 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const352 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
353     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
354     std::vector<SingleVerKvEntry *> &entries) const
355 {
356     if (getDataDelayTime_ > 0) {
357         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
358     }
359     int errCode = DataControl();
360     if (errCode != E_OK) {
361         return errCode;
362     }
363     const auto &startKey = query.GetPrefixKey();
364     Key endKey = startKey;
365     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
366 
367     std::vector<VirtualDataItem> dataItems;
368     for (const auto &data : dbData_) {
369         // Only get local data.
370         if (!data.isLocal) {
371             continue;
372         }
373 
374         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
375             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
376                 dataItems.push_back(data);
377             }
378         } else {
379             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
380                 data.key >= startKey && data.key <= endKey) {
381                 dataItems.push_back(data);
382             }
383         }
384     }
385 
386     LOGD("dataItems size %zu", dataItems.size());
387     return GetEntriesFromItems(entries, dataItems);
388 }
389 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const390 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
391 {
392     size_t prefixKeySize = keyPrefix.size();
393     for (auto iter = metadata_.begin(); iter != metadata_.end();) {
394         if (prefixKeySize <= iter->first.size() &&
395             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
396             iter = metadata_.erase(iter);
397         } else {
398             ++iter;
399         }
400     }
401     return E_OK;
402 }
403 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const404 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
405 {
406     return E_OK;
407 }
408 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const409 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
410 {
411     return E_OK;
412 }
413 
PutSyncData(const DataItem & item)414 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
415 {
416     return E_OK;
417 }
418 
CheckAndInitQueryCondition(QueryObject & query) const419 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
420 {
421     return E_OK;
422 }
423 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const424 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
425     const std::string &sourceID, const std::string &targetID) const
426 {
427     return E_OK;
428 }
429 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)430 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
431     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
432 {
433     std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
434     std::vector<VirtualDataItem> dataItems;
435     for (auto kvEntry : entries) {
436         auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
437         VirtualDataItem item;
438         genericKvEntry->GetKey(item.key);
439         genericKvEntry->GetValue(item.value);
440         item.timestamp = genericKvEntry->GetTimestamp();
441         item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
442         item.flag = genericKvEntry->GetFlag();
443         item.isLocal = false;
444         dataItems.push_back(item);
445     }
446     return PutSyncData(dataItems, deviceName);
447 }
448 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)449 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
450     bool needCacheSubscribe)
451 {
452     return E_OK;
453 }
454 
RemoveSubscribe(const std::string & subscribeId)455 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
456 {
457     return E_OK;
458 }
459 
RemoveSubscribe(const std::vector<std::string> & subscribeIds)460 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
461 {
462     return E_OK;
463 }
464 
SetBusy(bool busy)465 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy)
466 {
467     busy_ = busy;
468 }
469 
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)470 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
471 {
472     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
473     deviceData_[deviceName][key] = value;
474 }
475 
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)476 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
477 {
478     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
479     value = deviceData_[deviceName][key];
480 }
481 
SetDbProperties(KvDBProperties & kvDBProperties)482 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
483 {
484     properties_ = kvDBProperties;
485 }
486 
DelayGetSyncData(uint32_t milliDelayTime)487 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
488 {
489     getDataDelayTime_ = milliDelayTime;
490 }
491 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)492 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
493 {
494     countDown_ = whichTime;
495     expectedErrCode_ = errCode;
496     isGetDataControl_ = isGetDataControl;
497 }
498 
DataControl() const499 int VirtualSingleVerSyncDBInterface::DataControl() const
500 {
501     static int getDataTimes = 0;
502     if (countDown_ == -1) { // init -1
503         getDataTimes = 0;
504     }
505     if (isGetDataControl_ && countDown_ > 0) {
506         getDataTimes++;
507     }
508     if (isGetDataControl_ && countDown_ == getDataTimes) {
509         LOGD("virtual device get data failed = %d", expectedErrCode_);
510         getDataTimes = 0;
511         return expectedErrCode_;
512     }
513     return E_OK;
514 }
515 
ResetDataControl()516 void VirtualSingleVerSyncDBInterface::ResetDataControl()
517 {
518     countDown_ = -1;
519     expectedErrCode_ = E_OK;
520 }
521 }  // namespace DistributedDB
522