1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "db_common.h"
17 #include "generic_single_ver_kv_entry.h"
18 #include "platform_specific.h"
19 #include "relational_remote_query_continue_token.h"
20 #include "runtime_context.h"
21 #include "virtual_relational_ver_sync_db_interface.h"
22 #include "virtual_single_ver_sync_db_Interface.h"
23
24 namespace DistributedDB {
25 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<DataItem> & dataItems)26 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<DataItem> &dataItems)
27 {
28 int errCode = E_OK;
29 for (const auto &item : dataItems) {
30 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
31 if (entry == nullptr) {
32 LOGE("Create entry failed.");
33 errCode = -E_OUT_OF_MEMORY;
34 break;
35 }
36 DataItem storageItem;
37 storageItem.key = item.key;
38 storageItem.value = item.value;
39 storageItem.flag = item.flag;
40 storageItem.timestamp = item.timestamp;
41 storageItem.writeTimestamp = item.writeTimestamp;
42 storageItem.hashKey = item.hashKey;
43 entry->SetEntryData(std::move(storageItem));
44 entries.push_back(entry);
45 }
46 if (errCode != E_OK) {
47 LOGD("[GetEntriesFromItems] failed:%d", errCode);
48 for (auto &kvEntry : entries) {
49 delete kvEntry;
50 kvEntry = nullptr;
51 }
52 entries.clear();
53 }
54 LOGD("[GetEntriesFromItems] size:%zu", dataItems.size());
55 return errCode;
56 }
57
GetStr(const std::vector<uint8_t> & vec)58 std::string GetStr(const std::vector<uint8_t> &vec)
59 {
60 std::string str;
61 DBCommon::VectorToString(vec, str);
62 return str;
63 }
64 }
65
VirtualRelationalVerSyncDBInterface()66 VirtualRelationalVerSyncDBInterface::VirtualRelationalVerSyncDBInterface()
67 {
68 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
69 LOGD("virtual device init db createTime");
70 }
71
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)72 int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
73 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
74 {
75 LOGD("[PutSyncData] size %zu", entries.size());
76 std::vector<DataItem> dataItems;
77 for (auto itemEntry : entries) {
78 auto *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
79 if (entry != nullptr) {
80 DataItem item;
81 item.origDev = entry->GetOrigDevice();
82 item.flag = entry->GetFlag();
83 item.timestamp = entry->GetTimestamp();
84 item.writeTimestamp = entry->GetWriteTimestamp();
85 entry->GetKey(item.key);
86 entry->GetValue(item.value);
87 entry->GetHashKey(item.hashKey);
88 dataItems.push_back(item);
89 }
90 }
91 OptTableDataWithLog optTableDataWithLog;
92 optTableDataWithLog.tableName = query.GetTableName();
93 int errCode = DataTransformer::TransformDataItem(dataItems, localFieldInfo_, optTableDataWithLog);
94 if (errCode != E_OK) {
95 return errCode;
96 }
97 for (const auto &optRowDataWithLog : optTableDataWithLog.dataList) {
98 VirtualRowData virtualRowData;
99 virtualRowData.logInfo = optRowDataWithLog.logInfo;
100 size_t index = 0;
101 for (const auto &optItem : optRowDataWithLog.optionalData) {
102 if (index >= localFieldInfo_.size()) {
103 break;
104 }
105 DataValue dataValue = std::move(optItem);
106 virtualRowData.objectData.PutDataValue(localFieldInfo_[index].GetFieldName(), dataValue);
107 index++;
108 }
109 syncData_[query.GetTableName()][GetStr(virtualRowData.logInfo.hashKey)] = virtualRowData;
110 }
111 LOGD("tableName %s", optTableDataWithLog.tableName.c_str());
112 return errCode;
113 }
114
PutLocalData(const std::vector<VirtualRowData> & dataList,const std::string & tableName)115 int VirtualRelationalVerSyncDBInterface::PutLocalData(const std::vector<VirtualRowData> &dataList,
116 const std::string &tableName)
117 {
118 for (const auto &item : dataList) {
119 localData_[tableName][GetStr(item.logInfo.hashKey)] = item;
120 }
121 return E_OK;
122 }
123
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const124 int VirtualRelationalVerSyncDBInterface::GetSyncData(QueryObject &query,
125 const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
126 ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const
127 {
128 if (getSyncDataResult_ != E_OK) {
129 return getSyncDataResult_;
130 }
131 if (localData_.find(query.GetTableName()) == localData_.end()) {
132 LOGD("[GetSyncData] No Data Return");
133 return E_OK;
134 }
135 std::vector<DataItem> dataItemList;
136 TableDataWithLog tableDataWithLog = {query.GetTableName(), {}};
137 for (const auto &[hashKey, virtualData] : localData_[query.GetTableName()]) {
138 if (virtualData.logInfo.timestamp < timeRange.beginTime ||
139 virtualData.logInfo.timestamp >= timeRange.endTime) {
140 LOGD("ignore hashkey %s", DBCommon::TransferStringToHex(hashKey).c_str());
141 continue;
142 }
143 RowDataWithLog rowData;
144 for (const auto &field : localFieldInfo_) {
145 DataValue dataValue;
146 (void)virtualData.objectData.GetDataValue(field.GetFieldName(), dataValue);
147 rowData.rowData.push_back(std::move(dataValue));
148 }
149 rowData.logInfo = virtualData.logInfo;
150 tableDataWithLog.dataList.push_back(rowData);
151 }
152
153 int errCode = DataTransformer::TransformTableData(tableDataWithLog, localFieldInfo_, dataItemList);
154 if (errCode != E_OK) {
155 return errCode;
156 }
157 continueStmtToken = nullptr;
158 return GetEntriesFromItems(entries, dataItemList);
159 }
160
GetSchemaInfo() const161 RelationalSchemaObject VirtualRelationalVerSyncDBInterface::GetSchemaInfo() const
162 {
163 return schemaObj_;
164 }
165
166
SetSchemaInfo(const RelationalSchemaObject & schema)167 void VirtualRelationalVerSyncDBInterface::SetSchemaInfo(const RelationalSchemaObject &schema)
168 {
169 schemaObj_ = schema;
170 }
171
GetDatabaseCreateTimestamp(Timestamp & outTime) const172 int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
173 {
174 outTime = dbCreateTime_;
175 return E_OK;
176 }
177
GetTablesQuery()178 std::vector<QuerySyncObject> VirtualRelationalVerSyncDBInterface::GetTablesQuery()
179 {
180 return {};
181 }
182
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)183 int VirtualRelationalVerSyncDBInterface::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
184 {
185 return E_OK;
186 }
187
GetInterfaceType() const188 int VirtualRelationalVerSyncDBInterface::GetInterfaceType() const
189 {
190 return SYNC_RELATION;
191 }
192
IncRefCount()193 void VirtualRelationalVerSyncDBInterface::IncRefCount()
194 {
195 }
196
DecRefCount()197 void VirtualRelationalVerSyncDBInterface::DecRefCount()
198 {
199 }
200
GetIdentifier() const201 std::vector<uint8_t> VirtualRelationalVerSyncDBInterface::GetIdentifier() const
202 {
203 return {};
204 }
205
GetMaxTimestamp(Timestamp & stamp) const206 void VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(Timestamp &stamp) const
207 {
208 for (const auto &item : syncData_) {
209 for (const auto &entry : item.second) {
210 if (stamp < entry.second.logInfo.timestamp) {
211 stamp = entry.second.logInfo.timestamp;
212 }
213 }
214 }
215 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
216 }
217
GetMetaData(const Key & key,Value & value) const218 int VirtualRelationalVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
219 {
220 auto iter = metadata_.find(key);
221 if (iter != metadata_.end()) {
222 value = iter->second;
223 return E_OK;
224 }
225 return -E_NOT_FOUND;
226 }
227
PutMetaData(const Key & key,const Value & value,bool isInTransaction)228 int VirtualRelationalVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
229 {
230 (void)isInTransaction;
231 metadata_[key] = value;
232 return E_OK;
233 }
234
DeleteMetaData(const std::vector<Key> & keys)235 int VirtualRelationalVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
236 {
237 for (const auto &key : keys) {
238 (void)metadata_.erase(key);
239 }
240 return E_OK;
241 }
242
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const243 int VirtualRelationalVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
244 {
245 size_t prefixKeySize = keyPrefix.size();
246 for (auto iter = metadata_.begin();iter != metadata_.end();) {
247 if (prefixKeySize <= iter->first.size() &&
248 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
249 iter = metadata_.erase(iter);
250 } else {
251 ++iter;
252 }
253 }
254 return E_OK;
255 }
256
GetAllMetaKeys(std::vector<Key> & keys) const257 int VirtualRelationalVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
258 {
259 for (const auto &iter : metadata_) {
260 keys.push_back(iter.first);
261 }
262 LOGD("GetAllMetaKeys size %zu", keys.size());
263 return E_OK;
264 }
265
GetDbProperties() const266 const RelationalDBProperties &VirtualRelationalVerSyncDBInterface::GetDbProperties() const
267 {
268 return rdbProperties_;
269 }
270
SetLocalFieldInfo(const std::vector<FieldInfo> & localFieldInfo)271 void VirtualRelationalVerSyncDBInterface::SetLocalFieldInfo(const std::vector<FieldInfo> &localFieldInfo)
272 {
273 localFieldInfo_.clear();
274 localFieldInfo_ = localFieldInfo;
275 }
276
GetAllSyncData(const std::string & tableName,std::vector<VirtualRowData> & data)277 int VirtualRelationalVerSyncDBInterface::GetAllSyncData(const std::string &tableName,
278 std::vector<VirtualRowData> &data)
279 {
280 if (syncData_.find(tableName) == syncData_.end()) {
281 return -E_NOT_FOUND;
282 }
283 for (const auto &entry : syncData_[tableName]) {
284 if (entry.second.logInfo.flag != DataItem::DELETE_FLAG) {
285 data.push_back(entry.second);
286 }
287 }
288 return E_OK;
289 }
290
GetVirtualSyncData(const std::string & tableName,const std::string & hashKey,VirtualRowData & data)291 int VirtualRelationalVerSyncDBInterface::GetVirtualSyncData(const std::string &tableName,
292 const std::string &hashKey, VirtualRowData &data)
293 {
294 if (syncData_.find(tableName) == syncData_.end()) {
295 return -E_NOT_FOUND;
296 }
297 if (syncData_.find(hashKey) == syncData_.end()) {
298 return -E_NOT_FOUND;
299 }
300 data = syncData_[tableName][hashKey];
301 return E_OK;
302 }
303
EraseSyncData(const std::string & tableName)304 void VirtualRelationalVerSyncDBInterface::EraseSyncData(const std::string &tableName)
305 {
306 if (syncData_.find(tableName) == syncData_.end()) {
307 return;
308 }
309 syncData_.erase(tableName);
310 }
311
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)312 int VirtualRelationalVerSyncDBInterface::CreateDistributedDeviceTable(const std::string &device,
313 const RelationalSyncStrategy &syncStrategy)
314 {
315 return permitCreateDistributedTable_ ? E_OK : -E_NOT_SUPPORT;
316 }
317
RegisterSchemaChangedCallback(const std::function<void ()> & onSchemaChanged)318 int VirtualRelationalVerSyncDBInterface::RegisterSchemaChangedCallback(const std::function<void()> &onSchemaChanged)
319 {
320 return E_OK;
321 }
322
SetTableInfo(const TableInfo & tableInfo)323 void VirtualRelationalVerSyncDBInterface::SetTableInfo(const TableInfo &tableInfo)
324 {
325 schemaObj_.AddRelationalTable(tableInfo);
326 }
327
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const328 int VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
329 {
330 (void)tableName;
331 timestamp = 0;
332 return E_OK;
333 }
334
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & data,ContinueToken & token) const335 int VirtualRelationalVerSyncDBInterface::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
336 RelationalRowDataSet &data, ContinueToken &token) const
337 {
338 return E_OK;
339 }
340
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)341 int VirtualRelationalVerSyncDBInterface::SaveRemoteDeviceSchema(const std::string &deviceId,
342 const std::string &remoteSchema, [[gnu::unused]] uint8_t type)
343 {
344 std::lock_guard<std::mutex> autoLock(remoteSchemaMutex_);
345 remoteSchema_[deviceId] = remoteSchema;
346 return E_OK;
347 }
348
GetSchemaFromDB(RelationalSchemaObject & schema)349 int VirtualRelationalVerSyncDBInterface::GetSchemaFromDB(RelationalSchemaObject &schema)
350 {
351 return E_OK;
352 }
353
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const354 int VirtualRelationalVerSyncDBInterface::GetRemoteDeviceSchema(const std::string &deviceId,
355 RelationalSchemaObject &schemaObj) const
356 {
357 if (schemaObj.IsSchemaValid()) {
358 LOGE("schema is already valid");
359 return -E_INVALID_ARGS;
360 }
361 std::lock_guard<std::mutex> autoLock(remoteSchemaMutex_);
362 auto schema = remoteSchema_.find(deviceId);
363 if (schema == remoteSchema_.end()) {
364 return -E_NOT_FOUND;
365 }
366 return schemaObj.ParseFromSchemaString(schema->second);
367 }
368
SetPermitCreateDistributedTable(bool permitCreateDistributedTable)369 void VirtualRelationalVerSyncDBInterface::SetPermitCreateDistributedTable(bool permitCreateDistributedTable)
370 {
371 permitCreateDistributedTable_ = permitCreateDistributedTable;
372 }
373
PutDataValue(const std::string & fieldName,const DataValue & value) const374 void ObjectData::PutDataValue(const std::string &fieldName, const DataValue &value) const
375 {
376 fieldData[fieldName] = value;
377 }
378
GetDataValue(const std::string & fieldName,DataValue & value) const379 int ObjectData::GetDataValue(const std::string &fieldName, DataValue &value) const
380 {
381 if (fieldData.find(fieldName) == fieldData.end()) {
382 return -E_NOT_FOUND;
383 }
384 value = fieldData[fieldName];
385 return E_OK;
386 }
387
GetSecurityOption(SecurityOption & option) const388 int VirtualRelationalVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
389 {
390 int errCode = RuntimeContext::GetInstance()->GetSecurityOption("", option);
391 LOGW("virtual get option errCode is %d", errCode);
392 return errCode;
393 }
394
ReleaseRemoteQueryContinueToken(ContinueToken & token) const395 void VirtualRelationalVerSyncDBInterface::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
396 {
397 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
398 delete remoteToken;
399 remoteToken = nullptr;
400 token = nullptr;
401 }
402
SetDistributedSchema(const DistributedDB::DistributedSchema & schema)403 void VirtualRelationalVerSyncDBInterface::SetDistributedSchema(const DistributedDB::DistributedSchema &schema)
404 {
405 schemaObj_.SetTableMode(DistributedTableMode::COLLABORATION);
406 schemaObj_.SetDistributedSchema(schema);
407 }
408
SetGetSyncDataResult(int errCode)409 void VirtualRelationalVerSyncDBInterface::SetGetSyncDataResult(int errCode)
410 {
411 getSyncDataResult_ = errCode;
412 }
413 }
414 #endif
415