• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "db_common.h"
17 #include "generic_single_ver_kv_entry.h"
18 #include "platform_specific.h"
19 #include "relational_remote_query_continue_token.h"
20 #include "runtime_context.h"
21 #include "virtual_relational_ver_sync_db_interface.h"
22 #include "virtual_single_ver_sync_db_Interface.h"
23 
24 namespace DistributedDB {
25 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<DataItem> & dataItems)26     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<DataItem> &dataItems)
27     {
28         int errCode = E_OK;
29         for (const auto &item : dataItems) {
30             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
31             if (entry == nullptr) {
32                 LOGE("Create entry failed.");
33                 errCode = -E_OUT_OF_MEMORY;
34                 break;
35             }
36             DataItem storageItem;
37             storageItem.key = item.key;
38             storageItem.value = item.value;
39             storageItem.flag = item.flag;
40             storageItem.timestamp = item.timestamp;
41             storageItem.writeTimestamp = item.writeTimestamp;
42             storageItem.hashKey = item.hashKey;
43             entry->SetEntryData(std::move(storageItem));
44             entries.push_back(entry);
45         }
46         if (errCode != E_OK) {
47             LOGD("[GetEntriesFromItems] failed:%d", errCode);
48             for (auto &kvEntry : entries) {
49                 delete kvEntry;
50                 kvEntry = nullptr;
51             }
52             entries.clear();
53         }
54         LOGD("[GetEntriesFromItems] size:%zu", dataItems.size());
55         return errCode;
56     }
57 
GetStr(const std::vector<uint8_t> & vec)58     std::string GetStr(const std::vector<uint8_t> &vec)
59     {
60         std::string str;
61         DBCommon::VectorToString(vec, str);
62         return str;
63     }
64 }
65 
VirtualRelationalVerSyncDBInterface()66 VirtualRelationalVerSyncDBInterface::VirtualRelationalVerSyncDBInterface()
67 {
68     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
69     LOGD("virtual device init db createTime");
70 }
71 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)72 int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
73     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
74 {
75     LOGD("[PutSyncData] size %zu", entries.size());
76     std::vector<DataItem> dataItems;
77     for (auto itemEntry : entries) {
78         auto *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
79         if (entry != nullptr) {
80             DataItem item;
81             item.origDev = entry->GetOrigDevice();
82             item.flag = entry->GetFlag();
83             item.timestamp = entry->GetTimestamp();
84             item.writeTimestamp = entry->GetWriteTimestamp();
85             entry->GetKey(item.key);
86             entry->GetValue(item.value);
87             entry->GetHashKey(item.hashKey);
88             dataItems.push_back(item);
89         }
90     }
91     OptTableDataWithLog optTableDataWithLog;
92     optTableDataWithLog.tableName = query.GetTableName();
93     int errCode = DataTransformer::TransformDataItem(dataItems, localFieldInfo_, optTableDataWithLog);
94     if (errCode != E_OK) {
95         return errCode;
96     }
97     for (const auto &optRowDataWithLog : optTableDataWithLog.dataList) {
98         VirtualRowData virtualRowData;
99         virtualRowData.logInfo = optRowDataWithLog.logInfo;
100         size_t index = 0;
101         for (const auto &optItem : optRowDataWithLog.optionalData) {
102             if (index >= localFieldInfo_.size()) {
103                 break;
104             }
105             DataValue dataValue = std::move(optItem);
106             virtualRowData.objectData.PutDataValue(localFieldInfo_[index].GetFieldName(), dataValue);
107             index++;
108         }
109         syncData_[query.GetTableName()][GetStr(virtualRowData.logInfo.hashKey)] = virtualRowData;
110     }
111     LOGD("tableName %s", optTableDataWithLog.tableName.c_str());
112     return errCode;
113 }
114 
PutLocalData(const std::vector<VirtualRowData> & dataList,const std::string & tableName)115 int VirtualRelationalVerSyncDBInterface::PutLocalData(const std::vector<VirtualRowData> &dataList,
116     const std::string &tableName)
117 {
118     for (const auto &item : dataList) {
119         localData_[tableName][GetStr(item.logInfo.hashKey)] = item;
120     }
121     return E_OK;
122 }
123 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const124 int VirtualRelationalVerSyncDBInterface::GetSyncData(QueryObject &query,
125     const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
126     ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const
127 {
128     if (getSyncDataResult_ != E_OK) {
129         return getSyncDataResult_;
130     }
131     if (localData_.find(query.GetTableName()) == localData_.end()) {
132         LOGD("[GetSyncData] No Data Return");
133         return E_OK;
134     }
135     std::vector<DataItem> dataItemList;
136     TableDataWithLog tableDataWithLog = {query.GetTableName(), {}};
137     for (const auto &[hashKey, virtualData] : localData_[query.GetTableName()]) {
138         if (virtualData.logInfo.timestamp < timeRange.beginTime ||
139             virtualData.logInfo.timestamp >= timeRange.endTime) {
140             LOGD("ignore hashkey %s", DBCommon::TransferStringToHex(hashKey).c_str());
141             continue;
142         }
143         RowDataWithLog rowData;
144         for (const auto &field : localFieldInfo_) {
145             DataValue dataValue;
146             (void)virtualData.objectData.GetDataValue(field.GetFieldName(), dataValue);
147             rowData.rowData.push_back(std::move(dataValue));
148         }
149         rowData.logInfo = virtualData.logInfo;
150         tableDataWithLog.dataList.push_back(rowData);
151     }
152 
153     int errCode = DataTransformer::TransformTableData(tableDataWithLog, localFieldInfo_, dataItemList);
154     if (errCode != E_OK) {
155         return errCode;
156     }
157     continueStmtToken = nullptr;
158     return GetEntriesFromItems(entries, dataItemList);
159 }
160 
GetSchemaInfo() const161 RelationalSchemaObject VirtualRelationalVerSyncDBInterface::GetSchemaInfo() const
162 {
163     return schemaObj_;
164 }
165 
166 
SetSchemaInfo(const RelationalSchemaObject & schema)167 void VirtualRelationalVerSyncDBInterface::SetSchemaInfo(const RelationalSchemaObject &schema)
168 {
169     schemaObj_ = schema;
170 }
171 
GetDatabaseCreateTimestamp(Timestamp & outTime) const172 int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
173 {
174     outTime = dbCreateTime_;
175     return E_OK;
176 }
177 
GetTablesQuery()178 std::vector<QuerySyncObject> VirtualRelationalVerSyncDBInterface::GetTablesQuery()
179 {
180     return {};
181 }
182 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)183 int VirtualRelationalVerSyncDBInterface::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
184 {
185     return E_OK;
186 }
187 
GetInterfaceType() const188 int VirtualRelationalVerSyncDBInterface::GetInterfaceType() const
189 {
190     return SYNC_RELATION;
191 }
192 
IncRefCount()193 void VirtualRelationalVerSyncDBInterface::IncRefCount()
194 {
195 }
196 
DecRefCount()197 void VirtualRelationalVerSyncDBInterface::DecRefCount()
198 {
199 }
200 
GetIdentifier() const201 std::vector<uint8_t> VirtualRelationalVerSyncDBInterface::GetIdentifier() const
202 {
203     return {};
204 }
205 
GetMaxTimestamp(Timestamp & stamp) const206 void VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(Timestamp &stamp) const
207 {
208     for (const auto &item : syncData_) {
209         for (const auto &entry : item.second) {
210             if (stamp < entry.second.logInfo.timestamp) {
211                 stamp = entry.second.logInfo.timestamp;
212             }
213         }
214     }
215     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
216 }
217 
GetMetaData(const Key & key,Value & value) const218 int VirtualRelationalVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
219 {
220     auto iter = metadata_.find(key);
221     if (iter != metadata_.end()) {
222         value = iter->second;
223         return E_OK;
224     }
225     return -E_NOT_FOUND;
226 }
227 
GetMetaDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const228 int VirtualRelationalVerSyncDBInterface::GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const
229 {
230     for (const auto &metadata : metadata_) {
231         if (metadata.first.size() < keyPrefix.size()) {
232             continue;
233         }
234         if (std::equal(keyPrefix.begin(), keyPrefix.end(), metadata.first.begin())) {
235             data[metadata.first] = metadata.second;
236         }
237     }
238     return data.empty() ? -E_NOT_FOUND : E_OK;
239 }
240 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)241 int VirtualRelationalVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
242 {
243     (void)isInTransaction;
244     metadata_[key] = value;
245     return E_OK;
246 }
247 
DeleteMetaData(const std::vector<Key> & keys)248 int VirtualRelationalVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
249 {
250     for (const auto &key : keys) {
251         (void)metadata_.erase(key);
252     }
253     return E_OK;
254 }
255 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const256 int VirtualRelationalVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
257 {
258     size_t prefixKeySize = keyPrefix.size();
259     for (auto iter = metadata_.begin();iter != metadata_.end();) {
260         if (prefixKeySize <= iter->first.size() &&
261             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
262             iter = metadata_.erase(iter);
263         } else {
264             ++iter;
265         }
266     }
267     return E_OK;
268 }
269 
GetAllMetaKeys(std::vector<Key> & keys) const270 int VirtualRelationalVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
271 {
272     for (const auto &iter : metadata_) {
273         keys.push_back(iter.first);
274     }
275     LOGD("GetAllMetaKeys size %zu", keys.size());
276     return E_OK;
277 }
278 
GetDbProperties() const279 const RelationalDBProperties &VirtualRelationalVerSyncDBInterface::GetDbProperties() const
280 {
281     return rdbProperties_;
282 }
283 
SetLocalFieldInfo(const std::vector<FieldInfo> & localFieldInfo)284 void VirtualRelationalVerSyncDBInterface::SetLocalFieldInfo(const std::vector<FieldInfo> &localFieldInfo)
285 {
286     localFieldInfo_.clear();
287     localFieldInfo_ = localFieldInfo;
288 }
289 
GetAllSyncData(const std::string & tableName,std::vector<VirtualRowData> & data)290 int VirtualRelationalVerSyncDBInterface::GetAllSyncData(const std::string &tableName,
291     std::vector<VirtualRowData> &data)
292 {
293     if (syncData_.find(tableName) == syncData_.end()) {
294         return -E_NOT_FOUND;
295     }
296     for (const auto &entry : syncData_[tableName]) {
297         if (entry.second.logInfo.flag != DataItem::DELETE_FLAG) {
298             data.push_back(entry.second);
299         }
300     }
301     return E_OK;
302 }
303 
GetVirtualSyncData(const std::string & tableName,const std::string & hashKey,VirtualRowData & data)304 int VirtualRelationalVerSyncDBInterface::GetVirtualSyncData(const std::string &tableName,
305     const std::string &hashKey, VirtualRowData &data)
306 {
307     if (syncData_.find(tableName) == syncData_.end()) {
308         return -E_NOT_FOUND;
309     }
310     if (syncData_.find(hashKey) == syncData_.end()) {
311         return -E_NOT_FOUND;
312     }
313     data = syncData_[tableName][hashKey];
314     return E_OK;
315 }
316 
EraseSyncData(const std::string & tableName)317 void VirtualRelationalVerSyncDBInterface::EraseSyncData(const std::string &tableName)
318 {
319     if (syncData_.find(tableName) == syncData_.end()) {
320         return;
321     }
322     syncData_.erase(tableName);
323 }
324 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)325 int VirtualRelationalVerSyncDBInterface::CreateDistributedDeviceTable(const std::string &device,
326     const RelationalSyncStrategy &syncStrategy)
327 {
328     return permitCreateDistributedTable_ ? E_OK : -E_NOT_SUPPORT;
329 }
330 
RegisterSchemaChangedCallback(const std::function<void ()> & onSchemaChanged)331 int VirtualRelationalVerSyncDBInterface::RegisterSchemaChangedCallback(const std::function<void()> &onSchemaChanged)
332 {
333     return E_OK;
334 }
335 
SetTableInfo(const TableInfo & tableInfo)336 void VirtualRelationalVerSyncDBInterface::SetTableInfo(const TableInfo &tableInfo)
337 {
338     schemaObj_.AddRelationalTable(tableInfo);
339 }
340 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const341 int VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
342 {
343     (void)tableName;
344     timestamp = 0;
345     return E_OK;
346 }
347 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & data,ContinueToken & token) const348 int VirtualRelationalVerSyncDBInterface::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
349     RelationalRowDataSet &data, ContinueToken &token) const
350 {
351     return E_OK;
352 }
353 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)354 int VirtualRelationalVerSyncDBInterface::SaveRemoteDeviceSchema(const std::string &deviceId,
355     const std::string &remoteSchema, [[gnu::unused]] uint8_t type)
356 {
357     std::lock_guard<std::mutex> autoLock(remoteSchemaMutex_);
358     remoteSchema_[deviceId] = remoteSchema;
359     return E_OK;
360 }
361 
GetSchemaFromDB(RelationalSchemaObject & schema)362 int VirtualRelationalVerSyncDBInterface::GetSchemaFromDB(RelationalSchemaObject &schema)
363 {
364     return E_OK;
365 }
366 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const367 int VirtualRelationalVerSyncDBInterface::GetRemoteDeviceSchema(const std::string &deviceId,
368     RelationalSchemaObject &schemaObj) const
369 {
370     if (schemaObj.IsSchemaValid()) {
371         LOGE("schema is already valid");
372         return -E_INVALID_ARGS;
373     }
374     std::lock_guard<std::mutex> autoLock(remoteSchemaMutex_);
375     auto schema = remoteSchema_.find(deviceId);
376     if (schema == remoteSchema_.end()) {
377         return -E_NOT_FOUND;
378     }
379     return schemaObj.ParseFromSchemaString(schema->second);
380 }
381 
SetPermitCreateDistributedTable(bool permitCreateDistributedTable)382 void VirtualRelationalVerSyncDBInterface::SetPermitCreateDistributedTable(bool permitCreateDistributedTable)
383 {
384     permitCreateDistributedTable_ = permitCreateDistributedTable;
385 }
386 
PutDataValue(const std::string & fieldName,const DataValue & value) const387 void ObjectData::PutDataValue(const std::string &fieldName, const DataValue &value) const
388 {
389     fieldData[fieldName] = value;
390 }
391 
GetDataValue(const std::string & fieldName,DataValue & value) const392 int ObjectData::GetDataValue(const std::string &fieldName, DataValue &value) const
393 {
394     if (fieldData.find(fieldName) == fieldData.end()) {
395         return -E_NOT_FOUND;
396     }
397     value = fieldData[fieldName];
398     return E_OK;
399 }
400 
GetSecurityOption(SecurityOption & option) const401 int VirtualRelationalVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
402 {
403     int errCode = RuntimeContext::GetInstance()->GetSecurityOption("", option);
404     LOGW("virtual get option errCode is %d", errCode);
405     return errCode;
406 }
407 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const408 void VirtualRelationalVerSyncDBInterface::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
409 {
410     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
411     delete remoteToken;
412     remoteToken = nullptr;
413     token = nullptr;
414 }
415 
SetDistributedSchema(const DistributedDB::DistributedSchema & schema)416 void VirtualRelationalVerSyncDBInterface::SetDistributedSchema(const DistributedDB::DistributedSchema &schema)
417 {
418     schemaObj_.SetTableMode(DistributedTableMode::COLLABORATION);
419     schemaObj_.SetDistributedSchema(schema);
420 }
421 
SetGetSyncDataResult(int errCode)422 void VirtualRelationalVerSyncDBInterface::SetGetSyncDataResult(int errCode)
423 {
424     getSyncDataResult_ = errCode;
425 }
426 }
427 #endif
428