• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "db_common.h"
17 #include "generic_single_ver_kv_entry.h"
18 #include "platform_specific.h"
19 #include "relational_remote_query_continue_token.h"
20 #include "runtime_context.h"
21 #include "virtual_relational_ver_sync_db_interface.h"
22 #include "virtual_single_ver_sync_db_Interface.h"
23 
24 namespace DistributedDB {
25 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<DataItem> & dataItems)26     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<DataItem> &dataItems)
27     {
28         int errCode = E_OK;
29         for (const auto &item : dataItems) {
30             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
31             if (entry == nullptr) {
32                 LOGE("Create entry failed.");
33                 errCode = -E_OUT_OF_MEMORY;
34                 break;
35             }
36             DataItem storageItem;
37             storageItem.key = item.key;
38             storageItem.value = item.value;
39             storageItem.flag = item.flag;
40             storageItem.timestamp = item.timestamp;
41             storageItem.writeTimestamp = item.writeTimestamp;
42             storageItem.hashKey = item.hashKey;
43             entry->SetEntryData(std::move(storageItem));
44             entries.push_back(entry);
45         }
46         if (errCode != E_OK) {
47             LOGD("[GetEntriesFromItems] failed:%d", errCode);
48             for (auto &kvEntry : entries) {
49                 delete kvEntry;
50                 kvEntry = nullptr;
51             }
52             entries.clear();
53         }
54         LOGD("[GetEntriesFromItems] size:%zu", dataItems.size());
55         return errCode;
56     }
57 
GetStr(const std::vector<uint8_t> & vec)58     std::string GetStr(const std::vector<uint8_t> &vec)
59     {
60         std::string str;
61         DBCommon::VectorToString(vec, str);
62         return str;
63     }
64 }
65 
VirtualRelationalVerSyncDBInterface()66 VirtualRelationalVerSyncDBInterface::VirtualRelationalVerSyncDBInterface()
67 {
68     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
69     LOGD("virtual device init db createTime");
70 }
71 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)72 int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &object,
73     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
74 {
75     LOGD("[PutSyncData] size %zu", entries.size());
76     std::vector<DataItem> dataItems;
77     for (auto itemEntry : entries) {
78         auto *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
79         if (entry != nullptr) {
80             DataItem item;
81             item.origDev = entry->GetOrigDevice();
82             item.flag = entry->GetFlag();
83             item.timestamp = entry->GetTimestamp();
84             item.writeTimestamp = entry->GetWriteTimestamp();
85             entry->GetKey(item.key);
86             entry->GetValue(item.value);
87             entry->GetHashKey(item.hashKey);
88             dataItems.push_back(item);
89         }
90     }
91     OptTableDataWithLog optTableDataWithLog;
92     optTableDataWithLog.tableName = object.GetTableName();
93     int errCode = DataTransformer::TransformDataItem(dataItems, localFieldInfo_,
94         localFieldInfo_, optTableDataWithLog);
95     if (errCode != E_OK) {
96         return errCode;
97     }
98     for (const auto &optRowDataWithLog : optTableDataWithLog.dataList) {
99         VirtualRowData virtualRowData;
100         virtualRowData.logInfo = optRowDataWithLog.logInfo;
101         size_t index = 0;
102         for (const auto &optItem : optRowDataWithLog.optionalData) {
103             if (index >= localFieldInfo_.size()) {
104                 break;
105             }
106             DataValue dataValue = std::move(optItem);
107             LOGD("type:%d", static_cast<int>(optItem.GetType()));
108             virtualRowData.objectData.PutDataValue(localFieldInfo_[index].GetFieldName(), dataValue);
109             index++;
110         }
111         syncData_[object.GetTableName()][GetStr(virtualRowData.logInfo.hashKey)] = virtualRowData;
112     }
113     LOGD("tableName %s", optTableDataWithLog.tableName.c_str());
114     return errCode;
115 }
116 
PutLocalData(const std::vector<VirtualRowData> & dataList,const std::string & tableName)117 int VirtualRelationalVerSyncDBInterface::PutLocalData(const std::vector<VirtualRowData> &dataList,
118     const std::string &tableName)
119 {
120     for (const auto &item : dataList) {
121         localData_[tableName][GetStr(item.logInfo.hashKey)] = item;
122     }
123     return E_OK;
124 }
125 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const126 int VirtualRelationalVerSyncDBInterface::GetSyncData(QueryObject &query,
127     const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
128     ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const
129 {
130     if (localData_.find(query.GetTableName()) == localData_.end()) {
131         LOGD("[GetSyncData] No Data Return");
132         return E_OK;
133     }
134     std::vector<DataItem> dataItemList;
135     TableDataWithLog tableDataWithLog = {query.GetTableName(), {}};
136     for (const auto &[hashKey, virtualData] : localData_[query.GetTableName()]) {
137         if (virtualData.logInfo.timestamp < timeRange.beginTime ||
138             virtualData.logInfo.timestamp >= timeRange.endTime) {
139             LOGD("ignore hashkey %s", DBCommon::TransferStringToHex(hashKey).c_str());
140             continue;
141         }
142         RowDataWithLog rowData;
143         for (const auto &field : localFieldInfo_) {
144             DataValue dataValue;
145             (void)virtualData.objectData.GetDataValue(field.GetFieldName(), dataValue);
146             rowData.rowData.push_back(std::move(dataValue));
147         }
148         rowData.logInfo = virtualData.logInfo;
149         tableDataWithLog.dataList.push_back(rowData);
150     }
151 
152     int errCode = DataTransformer::TransformTableData(tableDataWithLog, localFieldInfo_, dataItemList);
153     if (errCode != E_OK) {
154         return errCode;
155     }
156     continueStmtToken = nullptr;
157     return GetEntriesFromItems(entries, dataItemList);
158 }
159 
GetSchemaInfo() const160 RelationalSchemaObject VirtualRelationalVerSyncDBInterface::GetSchemaInfo() const
161 {
162     return schemaObj_;
163 }
164 
165 
SetSchemaInfo(const RelationalSchemaObject & schema)166 void VirtualRelationalVerSyncDBInterface::SetSchemaInfo(const RelationalSchemaObject &schema)
167 {
168     schemaObj_ = schema;
169 }
170 
GetDatabaseCreateTimestamp(Timestamp & outTime) const171 int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
172 {
173     outTime = dbCreateTime_;
174     return E_OK;
175 }
176 
GetBatchMetaData(const std::vector<Key> & keys,std::vector<Entry> & entries) const177 int VirtualRelationalVerSyncDBInterface::GetBatchMetaData(const std::vector<Key> &keys,
178     std::vector<Entry> &entries) const
179 {
180     int errCode = E_OK;
181     for (const auto &key : keys) {
182         Entry entry;
183         entry.key = key;
184         errCode = GetMetaData(key, entry.value);
185         if (errCode != E_OK) {
186             return errCode;
187         }
188         entries.push_back(entry);
189     }
190     return errCode;
191 }
192 
PutBatchMetaData(std::vector<Entry> & entries)193 int VirtualRelationalVerSyncDBInterface::PutBatchMetaData(std::vector<Entry> &entries)
194 {
195     int errCode = E_OK;
196     for (const auto &entry : entries) {
197         errCode = PutMetaData(entry.key, entry.value);
198         if (errCode != E_OK) {
199             return errCode;
200         }
201     }
202     return errCode;
203 }
204 
GetTablesQuery()205 std::vector<QuerySyncObject> VirtualRelationalVerSyncDBInterface::GetTablesQuery()
206 {
207     return {};
208 }
209 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)210 int VirtualRelationalVerSyncDBInterface::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
211 {
212     return E_OK;
213 }
214 
GetInterfaceType() const215 int VirtualRelationalVerSyncDBInterface::GetInterfaceType() const
216 {
217     return SYNC_RELATION;
218 }
219 
IncRefCount()220 void VirtualRelationalVerSyncDBInterface::IncRefCount()
221 {
222 }
223 
DecRefCount()224 void VirtualRelationalVerSyncDBInterface::DecRefCount()
225 {
226 }
227 
GetIdentifier() const228 std::vector<uint8_t> VirtualRelationalVerSyncDBInterface::GetIdentifier() const
229 {
230     return {};
231 }
232 
GetMaxTimestamp(Timestamp & stamp) const233 void VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(Timestamp &stamp) const
234 {
235     for (const auto &item : syncData_) {
236         for (const auto &entry : item.second) {
237             if (stamp < entry.second.logInfo.timestamp) {
238                 stamp = entry.second.logInfo.timestamp;
239             }
240         }
241     }
242     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
243 }
244 
GetMetaData(const Key & key,Value & value) const245 int VirtualRelationalVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
246 {
247     auto iter = metadata_.find(key);
248     if (iter != metadata_.end()) {
249         value = iter->second;
250         return E_OK;
251     }
252     return -E_NOT_FOUND;
253 }
254 
PutMetaData(const Key & key,const Value & value)255 int VirtualRelationalVerSyncDBInterface::PutMetaData(const Key &key, const Value &value)
256 {
257     metadata_[key] = value;
258     return E_OK;
259 }
260 
DeleteMetaData(const std::vector<Key> & keys)261 int VirtualRelationalVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
262 {
263     for (const auto &key : keys) {
264         (void)metadata_.erase(key);
265     }
266     return E_OK;
267 }
268 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const269 int VirtualRelationalVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
270 {
271     size_t prefixKeySize = keyPrefix.size();
272     for (auto iter = metadata_.begin();iter != metadata_.end();) {
273         if (prefixKeySize <= iter->first.size() &&
274             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
275             iter = metadata_.erase(iter);
276         } else {
277             ++iter;
278         }
279     }
280     return E_OK;
281 }
282 
GetAllMetaKeys(std::vector<Key> & keys) const283 int VirtualRelationalVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
284 {
285     for (const auto &iter : metadata_) {
286         keys.push_back(iter.first);
287     }
288     LOGD("GetAllMetaKeys size %zu", keys.size());
289     return E_OK;
290 }
291 
GetDbProperties() const292 const RelationalDBProperties &VirtualRelationalVerSyncDBInterface::GetDbProperties() const
293 {
294     return rdbProperties_;
295 }
296 
SetLocalFieldInfo(const std::vector<FieldInfo> & localFieldInfo)297 void VirtualRelationalVerSyncDBInterface::SetLocalFieldInfo(const std::vector<FieldInfo> &localFieldInfo)
298 {
299     localFieldInfo_.clear();
300     localFieldInfo_ = localFieldInfo;
301 }
302 
GetAllSyncData(const std::string & tableName,std::vector<VirtualRowData> & data)303 int VirtualRelationalVerSyncDBInterface::GetAllSyncData(const std::string &tableName,
304     std::vector<VirtualRowData> &data)
305 {
306     if (syncData_.find(tableName) == syncData_.end()) {
307         return -E_NOT_FOUND;
308     }
309     for (const auto &entry : syncData_[tableName]) {
310         data.push_back(entry.second);
311     }
312     return E_OK;
313 }
314 
GetVirtualSyncData(const std::string & tableName,const std::string & hashKey,VirtualRowData & data)315 int VirtualRelationalVerSyncDBInterface::GetVirtualSyncData(const std::string &tableName,
316     const std::string &hashKey, VirtualRowData &data)
317 {
318     if (syncData_.find(tableName) == syncData_.end()) {
319         return -E_NOT_FOUND;
320     }
321     if (syncData_.find(hashKey) == syncData_.end()) {
322         return -E_NOT_FOUND;
323     }
324     data = syncData_[tableName][hashKey];
325     return E_OK;
326 }
327 
EraseSyncData(const std::string & tableName)328 void VirtualRelationalVerSyncDBInterface::EraseSyncData(const std::string &tableName)
329 {
330     if (syncData_.find(tableName) == syncData_.end()) {
331         return;
332     }
333     syncData_.erase(tableName);
334 }
335 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)336 int VirtualRelationalVerSyncDBInterface::CreateDistributedDeviceTable(const std::string &device,
337     const RelationalSyncStrategy &syncStrategy)
338 {
339     return permitCreateDistributedTable_ ? E_OK : -E_NOT_SUPPORT;
340 }
341 
RegisterSchemaChangedCallback(const std::function<void ()> & onSchemaChanged)342 int VirtualRelationalVerSyncDBInterface::RegisterSchemaChangedCallback(const std::function<void()> &onSchemaChanged)
343 {
344     return E_OK;
345 }
346 
SetTableInfo(const TableInfo & tableInfo)347 void VirtualRelationalVerSyncDBInterface::SetTableInfo(const TableInfo &tableInfo)
348 {
349     schemaObj_.AddRelationalTable(tableInfo);
350 }
351 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const352 int VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
353 {
354     (void)tableName;
355     timestamp = 0;
356     return E_OK;
357 }
358 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & data,ContinueToken & token) const359 int VirtualRelationalVerSyncDBInterface::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
360     RelationalRowDataSet &data, ContinueToken &token) const
361 {
362     return E_OK;
363 }
364 
GetRelationalDbProperties() const365 const RelationalDBProperties &VirtualRelationalVerSyncDBInterface::GetRelationalDbProperties() const
366 {
367     return rdbProperties_;
368 }
369 
SetPermitCreateDistributedTable(bool permitCreateDistributedTable)370 void VirtualRelationalVerSyncDBInterface::SetPermitCreateDistributedTable(bool permitCreateDistributedTable)
371 {
372     permitCreateDistributedTable_ = permitCreateDistributedTable;
373 }
374 
PutDataValue(const std::string & fieldName,const DataValue & value) const375 void ObjectData::PutDataValue(const std::string &fieldName, const DataValue &value) const
376 {
377     fieldData[fieldName] = value;
378 }
379 
GetDataValue(const std::string & fieldName,DataValue & value) const380 int ObjectData::GetDataValue(const std::string &fieldName, DataValue &value) const
381 {
382     if (fieldData.find(fieldName) == fieldData.end()) {
383         return -E_NOT_FOUND;
384     }
385     value = fieldData[fieldName];
386     return E_OK;
387 }
388 
GetSecurityOption(SecurityOption & option) const389 int VirtualRelationalVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
390 {
391     return RuntimeContext::GetInstance()->GetSecurityOption("", option);
392 }
393 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const394 void VirtualRelationalVerSyncDBInterface::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
395 {
396     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
397     delete remoteToken;
398     remoteToken = nullptr;
399     token = nullptr;
400 }
401 }
402 #endif