1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_single_ver_sync_db_Interface.h"
17
18 #include <algorithm>
19 #include <thread>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "intercepted_data_impl.h"
25 #include "log_print.h"
26 #include "platform_specific.h"
27 #include "query_object.h"
28 #include "securec.h"
29
30 namespace DistributedDB {
31 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)32 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
33 {
34 int errCode = E_OK;
35 for (const auto &item : dataItems) {
36 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
37 if (entry == nullptr) {
38 LOGE("Create entry failed.");
39 errCode = -E_OUT_OF_MEMORY;
40 break;
41 }
42 DataItem storageItem;
43 storageItem.key = item.key;
44 storageItem.value = item.value;
45 storageItem.flag = item.flag;
46 storageItem.timestamp = item.timestamp;
47 storageItem.writeTimestamp = item.writeTimestamp;
48 entry->SetEntryData(std::move(storageItem));
49 entries.push_back(entry);
50 }
51 if (errCode != E_OK) {
52 for (auto &kvEntry : entries) {
53 delete kvEntry;
54 kvEntry = nullptr;
55 }
56 entries.clear();
57 }
58 return errCode;
59 }
60 }
61
VirtualSingleVerSyncDBInterface()62 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
63 {
64 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
65 LOGD("virtual device init db createTime");
66 }
67
GetInterfaceType() const68 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
69 {
70 return SYNC_SVD;
71 }
72
IncRefCount()73 void VirtualSingleVerSyncDBInterface::IncRefCount()
74 {
75 }
76
DecRefCount()77 void VirtualSingleVerSyncDBInterface::DecRefCount()
78 {
79 }
80
SetIdentifier(std::vector<uint8_t> & identifier)81 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
82 {
83 identifier_ = std::move(identifier);
84 }
85
GetIdentifier() const86 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
87 {
88 return identifier_;
89 }
90
GetMetaData(const Key & key,Value & value) const91 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
92 {
93 auto iter = metadata_.find(key);
94 if (iter != metadata_.end()) {
95 value = iter->second;
96 return E_OK;
97 }
98 return -E_NOT_FOUND;
99 }
100
PutMetaData(const Key & key,const Value & value)101 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value)
102 {
103 if (busy_) {
104 return -E_BUSY;
105 }
106 metadata_[key] = value;
107 return E_OK;
108 }
109
DeleteMetaData(const std::vector<Key> & keys)110 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
111 {
112 for (const auto &key : keys) {
113 (void)metadata_.erase(key);
114 }
115 return E_OK;
116 }
117
GetAllMetaKeys(std::vector<Key> & keys) const118 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
119 {
120 for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
121 keys.push_back(iter->first);
122 }
123 LOGD("GetAllMetaKeys size %zu", keys.size());
124 return E_OK;
125 }
126
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const127 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
128 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
129 {
130 return -E_NOT_SUPPORT;
131 }
132
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const133 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
134 const DataSizeSpecInfo &dataSizeInfo) const
135 {
136 return -E_NOT_SUPPORT;
137 }
138
ReleaseContinueToken(ContinueToken & continueStmtToken) const139 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
140 {
141 return;
142 }
143
GetSchemaInfo() const144 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
145 {
146 return schemaObj_;
147 }
148
CheckCompatible(const std::string & schema,uint8_t type) const149 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
150 {
151 if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
152 return true;
153 }
154 return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
155 }
156
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)157 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
158 {
159 VirtualDataItem item;
160 item.key = key;
161 item.value = value;
162 item.timestamp = time;
163 item.writeTimestamp = time;
164 item.flag = flag;
165 item.isLocal = true;
166 dbData_.push_back(item);
167 return E_OK;
168 }
169
GetMaxTimestamp(Timestamp & stamp) const170 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
171 {
172 for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
173 if (stamp < iter->writeTimestamp) {
174 stamp = iter->writeTimestamp;
175 }
176 }
177 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
178 }
179
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)180 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
181 {
182 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
183 deviceData_.erase(deviceName);
184 uint32_t devId = 0;
185 if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
186 devId = deviceMapping_[deviceName];
187 }
188 for (auto &item : dbData_) {
189 if (item.deviceId == devId && devId > 0) {
190 item.flag = VirtualDataItem::DELETE_FLAG;
191 }
192 }
193 LOGD("RemoveDeviceData FINISH");
194 return E_OK;
195 }
196
GetSyncData(const Key & key,VirtualDataItem & dataItem)197 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
198 {
199 auto iter = std::find_if(dbData_.begin(), dbData_.end(),
200 [key](const VirtualDataItem& item) { return item.key == key; });
201 if (iter != dbData_.end()) {
202 if (iter->flag == VirtualDataItem::DELETE_FLAG) {
203 return -E_NOT_FOUND;
204 }
205 dataItem.key = iter->key;
206 dataItem.value = iter->value;
207 dataItem.timestamp = iter->timestamp;
208 dataItem.writeTimestamp = iter->writeTimestamp;
209 dataItem.flag = iter->flag;
210 dataItem.isLocal = iter->isLocal;
211 return E_OK;
212 }
213 return -E_NOT_FOUND;
214 }
215
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const216 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
217 std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
218 const DataSizeSpecInfo &dataSizeInfo) const
219 {
220 std::vector<VirtualDataItem> dataItems;
221 int errCode = GetSyncData(begin, end, dataSizeInfo.blockSize, dataItems, continueStmtToken);
222 if (errCode != E_OK) {
223 LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
224 return errCode;
225 }
226 return GetEntriesFromItems(entries, dataItems);
227 }
228
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const229 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
230 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
231 {
232 if (continueStmtToken == nullptr) {
233 return -E_NOT_SUPPORT;
234 }
235 return 0;
236 }
237
GetSyncData(Timestamp begin,Timestamp end,uint32_t blockSize,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const238 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, uint32_t blockSize,
239 std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
240 {
241 if (getDataDelayTime_ > 0) {
242 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
243 }
244 int errCode = DataControl();
245 if (errCode != E_OK) {
246 return errCode;
247 }
248 for (const auto &data : dbData_) {
249 if (data.isLocal) {
250 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
251 dataItems.push_back(data);
252 }
253 }
254 }
255 continueStmtToken = nullptr;
256 LOGD("dataItems size %zu", dataItems.size());
257 return E_OK;
258 }
259
SetSaveDataDelayTime(uint64_t milliDelayTime)260 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
261 {
262 saveDataDelayTime_ = milliDelayTime;
263 }
264
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const265 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
266 uint32_t blockSize, ContinueToken& continueStmtToken) const
267 {
268 if (continueStmtToken == nullptr) {
269 return -E_NOT_SUPPORT;
270 }
271 return 0;
272 }
273
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)274 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
275 const std::string &deviceName)
276 {
277 if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
278 availableDeviceId_++;
279 deviceMapping_[deviceName] = availableDeviceId_;
280 LOGD("put deviceName=%s into device map", deviceName.c_str());
281 }
282 for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
283 LOGD("PutSyncData");
284 auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
285 [iter](VirtualDataItem item) { return item.key == iter->key; });
286 if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
287 // if has conflict, compare writeTimestamp
288 LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
289 iter->writeTimestamp);
290 dbDataIter->key = iter->key;
291 dbDataIter->value = iter->value;
292 dbDataIter->timestamp = iter->timestamp;
293 dbDataIter->writeTimestamp = iter->writeTimestamp;
294 dbDataIter->flag = iter->flag;
295 dbDataIter->isLocal = false;
296 dbDataIter->deviceId = deviceMapping_[deviceName];
297 } else {
298 LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
299 VirtualDataItem dataItem;
300 dataItem.key = iter->key;
301 dataItem.value = iter->value;
302 dataItem.timestamp = iter->timestamp;
303 dataItem.writeTimestamp = iter->writeTimestamp;
304 dataItem.flag = iter->flag;
305 dataItem.isLocal = false;
306 dataItem.deviceId = deviceMapping_[deviceName];
307 dbData_.push_back(dataItem);
308 }
309 }
310 return E_OK;
311 }
312
SetSchemaInfo(const std::string & schema)313 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
314 {
315 schema_ = schema;
316 SchemaObject emptyObj;
317 schemaObj_ = emptyObj;
318 schemaObj_.ParseFromSchemaString(schema);
319 }
320
GetDbProperties() const321 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
322 {
323 return properties_;
324 }
325
GetSecurityOption(SecurityOption & option) const326 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
327 {
328 option = secOption_;
329 return E_OK;
330 }
331
IsReadable() const332 bool VirtualSingleVerSyncDBInterface::IsReadable() const
333 {
334 return true;
335 }
336
SetSecurityOption(SecurityOption & option)337 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
338 {
339 secOption_ = option;
340 }
341
NotifyRemotePushFinished(const std::string & targetId) const342 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
343 {
344 }
345
GetDatabaseCreateTimestamp(Timestamp & outTime) const346 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
347 {
348 outTime = dbCreateTime_;
349 return E_OK;
350 }
351
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const352 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
353 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
354 std::vector<SingleVerKvEntry *> &entries) const
355 {
356 if (getDataDelayTime_ > 0) {
357 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
358 }
359 int errCode = DataControl();
360 if (errCode != E_OK) {
361 return errCode;
362 }
363 const auto &startKey = query.GetPrefixKey();
364 Key endKey = startKey;
365 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
366
367 std::vector<VirtualDataItem> dataItems;
368 for (const auto &data : dbData_) {
369 // Only get local data.
370 if (!data.isLocal) {
371 continue;
372 }
373
374 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
375 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
376 dataItems.push_back(data);
377 }
378 } else {
379 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
380 data.key >= startKey && data.key <= endKey) {
381 dataItems.push_back(data);
382 }
383 }
384 }
385
386 LOGD("dataItems size %zu", dataItems.size());
387 return GetEntriesFromItems(entries, dataItems);
388 }
389
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const390 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
391 {
392 size_t prefixKeySize = keyPrefix.size();
393 for (auto iter = metadata_.begin(); iter != metadata_.end();) {
394 if (prefixKeySize <= iter->first.size() &&
395 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
396 iter = metadata_.erase(iter);
397 } else {
398 ++iter;
399 }
400 }
401 return E_OK;
402 }
403
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const404 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
405 {
406 return E_OK;
407 }
408
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const409 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
410 {
411 return E_OK;
412 }
413
PutSyncData(const DataItem & item)414 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
415 {
416 return E_OK;
417 }
418
CheckAndInitQueryCondition(QueryObject & query) const419 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
420 {
421 return E_OK;
422 }
423
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const424 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
425 const std::string &sourceID, const std::string &targetID) const
426 {
427 return E_OK;
428 }
429
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)430 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
431 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
432 {
433 std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
434 std::vector<VirtualDataItem> dataItems;
435 for (auto kvEntry : entries) {
436 auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
437 VirtualDataItem item;
438 genericKvEntry->GetKey(item.key);
439 genericKvEntry->GetValue(item.value);
440 item.timestamp = genericKvEntry->GetTimestamp();
441 item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
442 item.flag = genericKvEntry->GetFlag();
443 item.isLocal = false;
444 dataItems.push_back(item);
445 }
446 return PutSyncData(dataItems, deviceName);
447 }
448
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)449 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
450 bool needCacheSubscribe)
451 {
452 return E_OK;
453 }
454
RemoveSubscribe(const std::string & subscribeId)455 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
456 {
457 return E_OK;
458 }
459
RemoveSubscribe(const std::vector<std::string> & subscribeIds)460 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
461 {
462 return E_OK;
463 }
464
SetBusy(bool busy)465 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy)
466 {
467 busy_ = busy;
468 }
469
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)470 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
471 {
472 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
473 deviceData_[deviceName][key] = value;
474 }
475
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)476 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
477 {
478 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
479 value = deviceData_[deviceName][key];
480 }
481
SetDbProperties(KvDBProperties & kvDBProperties)482 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
483 {
484 properties_ = kvDBProperties;
485 }
486
DelayGetSyncData(uint32_t milliDelayTime)487 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
488 {
489 getDataDelayTime_ = milliDelayTime;
490 }
491
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)492 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
493 {
494 countDown_ = whichTime;
495 expectedErrCode_ = errCode;
496 isGetDataControl_ = isGetDataControl;
497 }
498
DataControl() const499 int VirtualSingleVerSyncDBInterface::DataControl() const
500 {
501 static int getDataTimes = 0;
502 if (countDown_ == -1) { // init -1
503 getDataTimes = 0;
504 }
505 if (isGetDataControl_ && countDown_ > 0) {
506 getDataTimes++;
507 }
508 if (isGetDataControl_ && countDown_ == getDataTimes) {
509 LOGD("virtual device get data failed = %d", expectedErrCode_);
510 getDataTimes = 0;
511 return expectedErrCode_;
512 }
513 return E_OK;
514 }
515
ResetDataControl()516 void VirtualSingleVerSyncDBInterface::ResetDataControl()
517 {
518 countDown_ = -1;
519 expectedErrCode_ = E_OK;
520 }
521 } // namespace DistributedDB
522