1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "data_compression.h"
21 #include "db_common.h"
22 #include "db_dfx_adapter.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "platform_specific.h"
25 #include "relational_remote_query_continue_token.h"
26 #include "res_finalizer.h"
27 #include "runtime_context.h"
28
29 namespace DistributedDB {
30 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)31 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
32 {
33 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
34 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
35 std::string(CLOSE_CONN_TASK),
36 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBType::DB_RELATION, properties); }
37 );
38 }
39 }
40
// Guard for member functions that return an int error code: leaves the enclosing
// function with -E_INVALID_DB when storageEngine_ is null.
#define CHECK_STORAGE_ENGINE \
    do { \
        if (storageEngine_ == nullptr) { \
            return -E_INVALID_DB; \
        } \
    } while (false)
46
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48 : storageEngine_(std::move(engine)),
49 isCachedOption_(false)
50 {}
51
~RelationalSyncAbleStorage()52 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
53 {}
54
55 // Get interface type of this relational db.
GetInterfaceType() const56 int RelationalSyncAbleStorage::GetInterfaceType() const
57 {
58 return SYNC_RELATION;
59 }
60
61 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()62 void RelationalSyncAbleStorage::IncRefCount()
63 {
64 LOGD("RelationalSyncAbleStorage ref +1");
65 IncObjRef(this);
66 }
67
68 // Drop the interface ref-count.
DecRefCount()69 void RelationalSyncAbleStorage::DecRefCount()
70 {
71 LOGD("RelationalSyncAbleStorage ref -1");
72 DecObjRef(this);
73 }
74
75 // Get the identifier of this rdb.
GetIdentifier() const76 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
77 {
78 std::string identifier = storageEngine_->GetIdentifier();
79 return std::vector<uint8_t>(identifier.begin(), identifier.end());
80 }
81
GetDualTupleIdentifier() const82 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
83 {
84 std::string identifier = storageEngine_->GetProperties().GetStringProp(
85 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
86 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
87 return identifierVect;
88 }
89
90 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const91 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
92 {
93 int errCode = E_OK;
94 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
95 if (handle == nullptr) {
96 return;
97 }
98 timestamp = 0;
99 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
100 if (errCode != E_OK) {
101 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
102 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
103 }
104 ReleaseHandle(handle);
105 return;
106 }
107
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const108 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
109 {
110 int errCode = E_OK;
111 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
112 if (handle == nullptr) {
113 return errCode;
114 }
115 timestamp = 0;
116 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
117 if (errCode != E_OK) {
118 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
119 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
120 }
121 ReleaseHandle(handle);
122 return errCode;
123 }
124
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const125 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
126 OperatePerm perm) const
127 {
128 if (storageEngine_ == nullptr) {
129 errCode = -E_INVALID_DB;
130 return nullptr;
131 }
132 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
133 storageEngine_->FindExecutor(isWrite, perm, errCode));
134 if (handle == nullptr) {
135 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
136 }
137 return handle;
138 }
139
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const140 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
141 {
142 if (storageEngine_ == nullptr) {
143 return;
144 }
145 StorageExecutor *databaseHandle = handle;
146 storageEngine_->Recycle(databaseHandle);
147 std::function<void()> listener = nullptr;
148 {
149 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
150 listener = heartBeatListener_;
151 }
152 if (listener) {
153 listener();
154 }
155 }
156
157 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const158 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
159 {
160 CHECK_STORAGE_ENGINE;
161 if (key.size() > DBConstant::MAX_KEY_SIZE) {
162 return -E_INVALID_ARGS;
163 }
164
165 int errCode = E_OK;
166 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
167 if (handle == nullptr) {
168 return errCode;
169 }
170 errCode = handle->GetKvData(key, value);
171 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
172 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
173 }
174 ReleaseHandle(handle);
175 return errCode;
176 }
177
178 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)179 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
180 {
181 CHECK_STORAGE_ENGINE;
182 int errCode = E_OK;
183 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
184 if (handle == nullptr) {
185 return errCode;
186 }
187
188 errCode = handle->PutKvData(key, value); // meta doesn't need time.
189 if (errCode != E_OK) {
190 LOGE("Put kv data err:%d", errCode);
191 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
192 }
193 ReleaseHandle(handle);
194 return errCode;
195 }
196
197 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)198 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
199 {
200 for (const auto &key : keys) {
201 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
202 return -E_INVALID_ARGS;
203 }
204 }
205 int errCode = E_OK;
206 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
207 if (handle == nullptr) {
208 return errCode;
209 }
210
211 handle->StartTransaction(TransactType::IMMEDIATE);
212 errCode = handle->DeleteMetaData(keys);
213 if (errCode != E_OK) {
214 handle->Rollback();
215 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
216 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217 } else {
218 handle->Commit();
219 }
220 ReleaseHandle(handle);
221 return errCode;
222 }
223
224 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const225 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
226 {
227 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
228 return -E_INVALID_ARGS;
229 }
230
231 int errCode = E_OK;
232 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
233 if (handle == nullptr) {
234 return errCode;
235 }
236
237 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
238 if (errCode != E_OK) {
239 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
240 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
241 }
242 ReleaseHandle(handle);
243 return errCode;
244 }
245
246 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const247 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
248 {
249 CHECK_STORAGE_ENGINE;
250 int errCode = E_OK;
251 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
252 if (handle == nullptr) {
253 return errCode;
254 }
255
256 errCode = handle->GetAllMetaKeys(keys);
257 if (errCode != E_OK) {
258 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
259 }
260 ReleaseHandle(handle);
261 return errCode;
262 }
263
GetDbProperties() const264 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
265 {
266 return storageEngine_->GetProperties();
267 }
268
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)269 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
270 {
271 int errCode = E_OK;
272 for (auto &item : dataItems) {
273 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
274 if (entry == nullptr) {
275 errCode = -E_OUT_OF_MEMORY;
276 LOGE("GetKvEntries failed, errCode:%d", errCode);
277 SingleVerKvEntry::Release(entries);
278 break;
279 }
280 entry->SetEntryData(std::move(item));
281 entries.push_back(entry);
282 }
283 return errCode;
284 }
285
GetDataItemSerialSize(const DataItem & item,size_t appendLen)286 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
287 {
288 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
289 // the size would not be very large.
290 static const size_t maxOrigDevLength = 40;
291 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
292 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
293 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
294 return dataSize;
295 }
296
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)297 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
298 size_t appendLen)
299 {
300 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
301 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
302 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
303 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
304 }
305 return !reachThreshold;
306 }
307
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)308 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
309 SQLiteSingleVerRelationalContinueToken *&token)
310 {
311 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
312 delete token;
313 token = nullptr;
314 return;
315 }
316
317 if (dataItems.empty()) {
318 errCode = -E_INTERNAL_ERROR;
319 LOGE("Get data unfinished but data items is empty.");
320 delete token;
321 token = nullptr;
322 return;
323 }
324 token->SetNextBeginTime(dataItems.back());
325 }
326
327 /**
328 * Caller must ensure that parameter token is valid.
329 * If error happened, token will be deleted here.
330 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const331 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
332 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
333 {
334 if (storageEngine_ == nullptr) {
335 return -E_INVALID_DB;
336 }
337
338 int errCode = E_OK;
339 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
340 OperatePerm::NORMAL_PERM, errCode));
341 if (handle == nullptr) {
342 goto ERROR;
343 }
344
345 do {
346 errCode = handle->GetSyncDataByQuery(dataItems,
347 Parcel::GetAppendedLen(),
348 dataSizeInfo,
349 std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
350 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
351 storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
352 if (errCode == -E_FINISHED) {
353 token->FinishGetData();
354 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
355 }
356 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
357
358 ERROR:
359 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
360 dataItems.clear();
361 }
362 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
363 ReleaseHandle(handle);
364 return errCode;
365 }
366
367 // use kv struct data to sync
368 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const369 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
370 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
371 std::vector<SingleVerKvEntry *> &entries) const
372 {
373 if (!timeRange.IsValid()) {
374 return -E_INVALID_ARGS;
375 }
376 query.SetSchema(storageEngine_->GetSchema());
377 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
378 if (token == nullptr) {
379 LOGE("[SingleVerNStore] Allocate continue token failed.");
380 return -E_OUT_OF_MEMORY;
381 }
382
383 continueStmtToken = static_cast<ContinueToken>(token);
384 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
385 }
386
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const387 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
388 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
389 {
390 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
391 if (!token->CheckValid()) {
392 return -E_INVALID_ARGS;
393 }
394 RelationalSchemaObject schema = storageEngine_->GetSchema();
395 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
396 std::vector<std::string> fieldNames;
397 for (const auto &fieldInfo : fieldInfos) {
398 fieldNames.push_back(fieldInfo.GetFieldName());
399 }
400 token->SetFieldNames(fieldNames);
401
402 std::vector<DataItem> dataItems;
403 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
404 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
405 continueStmtToken = static_cast<ContinueToken>(token);
406 return errCode;
407 }
408
409 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
410 if (innerCode != E_OK) {
411 errCode = innerCode;
412 delete token;
413 token = nullptr;
414 }
415 continueStmtToken = static_cast<ContinueToken>(token);
416 return errCode;
417 }
418
419 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)420 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
421 {
422 std::vector<DataItem> dataItems;
423 for (const auto &itemEntry : entries) {
424 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
425 if (entry != nullptr) {
426 DataItem item;
427 item.origDev = entry->GetOrigDevice();
428 item.flag = entry->GetFlag();
429 item.timestamp = entry->GetTimestamp();
430 item.writeTimestamp = entry->GetWriteTimestamp();
431 entry->GetKey(item.key);
432 entry->GetValue(item.value);
433 entry->GetHashKey(item.hashKey);
434 dataItems.push_back(item);
435 }
436 }
437 return dataItems;
438 }
439 }
440
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)441 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
442 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
443 {
444 std::vector<DataItem> dataItems = ConvertEntries(entries);
445 return PutSyncData(object, dataItems, deviceName);
446 }
447
448 namespace {
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)449 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
450 {
451 return engine->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
452 DistributedTableMode::SPLIT_BY_DEVICE) == DistributedTableMode::COLLABORATION;
453 }
454 }
455
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)456 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
457 const std::string &deviceName)
458 {
459 int errCode = E_OK;
460 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
461 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
462 if (handle == nullptr) {
463 return errCode;
464 }
465 QueryObject query = object;
466 query.SetSchema(storageEngine_->GetSchema());
467
468 TableInfo table = storageEngine_->GetSchema().GetTable(object.GetTableName());
469 StoreInfo info = {
470 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
471 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
472 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
473 };
474 if (!IsCollaborationMode(storageEngine_)) {
475 // Set table name for SPLIT_BY_DEVICE mode
476 table.SetTableName(DBCommon::GetDistributedTableName(deviceName, object.GetTableName(), info));
477 }
478 DBDfxAdapter::StartTraceSQL();
479 errCode = handle->SaveSyncItems(query, dataItems, deviceName, table);
480 DBDfxAdapter::FinishTraceSQL();
481 if (errCode == E_OK) {
482 // dataItems size > 0 now because already check before
483 // all dataItems will write into db now, so need to observer notify here
484 // if some dataItems will not write into db in the future, observer notify here need change
485 TriggerObserverAction(deviceName);
486 }
487
488 ReleaseHandle(handle);
489 return errCode;
490 }
491
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)492 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
493 const std::string &deviceName)
494 {
495 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
496 LOGW("Device length is invalid for sync put");
497 return -E_INVALID_ARGS;
498 }
499
500 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
501 if (errCode != E_OK) {
502 LOGE("[Relational] PutSyncData errCode:%d", errCode);
503 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
504 }
505 return errCode;
506 }
507
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)508 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
509 {
510 (void) deviceName;
511 (void) isNeedNotify;
512 return -E_NOT_SUPPORT;
513 }
514
GetSchemaInfo() const515 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
516 {
517 return storageEngine_->GetSchema();
518 }
519
GetSecurityOption(SecurityOption & option) const520 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
521 {
522 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
523 if (isCachedOption_) {
524 option = securityOption_;
525 return E_OK;
526 }
527 std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
528 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
529 if (errCode == E_OK) {
530 option = securityOption_;
531 isCachedOption_ = true;
532 }
533 return errCode;
534 }
535
NotifyRemotePushFinished(const std::string & deviceId) const536 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
537 {
538 return;
539 }
540
541 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const542 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
543 {
544 return OS::GetCurrentSysTimeInMicrosecond(outTime);
545 }
546
547 // Get batch meta data associated with the given key.
GetBatchMetaData(const std::vector<Key> & keys,std::vector<Entry> & entries) const548 int RelationalSyncAbleStorage::GetBatchMetaData(const std::vector<Key> &keys, std::vector<Entry> &entries) const
549 {
550 return -E_NOT_SUPPORT;
551 }
552
553 // Put batch meta data as a key-value entry vector
PutBatchMetaData(std::vector<Entry> & entries)554 int RelationalSyncAbleStorage::PutBatchMetaData(std::vector<Entry> &entries)
555 {
556 return -E_NOT_SUPPORT;
557 }
558
GetTablesQuery()559 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
560 {
561 return {};
562 }
563
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)564 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
565 {
566 (void) queryObj;
567 return -E_NOT_SUPPORT;
568 }
569
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)570 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
571 const RelationalSyncStrategy &syncStrategy)
572 {
573 auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
574 DistributedTableMode::SPLIT_BY_DEVICE);
575 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
576 LOGD("No need create device table in COLLABORATION mode.");
577 return E_OK;
578 }
579
580 int errCode = E_OK;
581 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
582 if (handle == nullptr) {
583 return errCode;
584 }
585
586 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
587 if (errCode != E_OK) {
588 LOGE("Start transaction failed:%d", errCode);
589 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
590 ReleaseHandle(handle);
591 return errCode;
592 }
593
594 StoreInfo info = {
595 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
596 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
597 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
598 };
599 for (const auto &[table, strategy] : syncStrategy) {
600 if (!strategy.permitSync) {
601 continue;
602 }
603
604 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
605 if (errCode != E_OK) {
606 LOGE("Create distributed device table failed. %d", errCode);
607 break;
608 }
609 }
610
611 if (errCode == E_OK) {
612 errCode = handle->Commit();
613 } else {
614 (void)handle->Rollback();
615 }
616
617 ReleaseHandle(handle);
618 return errCode;
619 }
620
RegisterSchemaChangedCallback(const std::function<void ()> & callback)621 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
622 {
623 std::lock_guard lock(onSchemaChangedMutex_);
624 onSchemaChanged_ = callback;
625 return E_OK;
626 }
627
NotifySchemaChanged()628 void RelationalSyncAbleStorage::NotifySchemaChanged()
629 {
630 std::lock_guard lock(onSchemaChangedMutex_);
631 if (onSchemaChanged_) {
632 LOGD("Notify relational schema was changed");
633 onSchemaChanged_();
634 }
635 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const636 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
637 {
638 algorithmSet.clear();
639 DataCompression::GetCompressionAlgo(algorithmSet);
640 return E_OK;
641 }
642
RegisterObserverAction(const RelationalObserverAction & action)643 void RelationalSyncAbleStorage::RegisterObserverAction(const RelationalObserverAction &action)
644 {
645 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
646 dataChangeDeviceCallback_ = action;
647 }
648
TriggerObserverAction(const std::string & deviceName)649 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName)
650 {
651 {
652 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
653 if (!dataChangeDeviceCallback_) {
654 return;
655 }
656 }
657 IncObjRef(this);
658 int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask([this, deviceName] {
659 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
660 if (dataChangeDeviceCallback_) {
661 dataChangeDeviceCallback_(deviceName);
662 }
663 DecObjRef(this);
664 });
665 if (taskErrCode != E_OK) {
666 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
667 DecObjRef(this);
668 }
669 }
670
RegisterHeartBeatListener(const std::function<void ()> & listener)671 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
672 {
673 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
674 heartBeatListener_ = listener;
675 }
676
CheckAndInitQueryCondition(QueryObject & query) const677 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
678 {
679 RelationalSchemaObject schema = storageEngine_->GetSchema();
680 TableInfo table = schema.GetTable(query.GetTableName());
681 if (table.GetTableName() != query.GetTableName()) {
682 LOGE("Query table is not a distributed table.");
683 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
684 }
685 query.SetSchema(schema);
686
687 int errCode = E_OK;
688 auto *handle = GetHandle(false, errCode);
689 if (handle == nullptr) {
690 return errCode;
691 }
692
693 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
694 if (errCode != E_OK) {
695 LOGE("Check relational query condition failed. %d", errCode);
696 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
697 }
698
699 ReleaseHandle(handle);
700 return errCode;
701 }
702
CheckCompatible(const std::string & schema,uint8_t type) const703 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
704 {
705 // return true if is relational schema.
706 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
707 }
708
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const709 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
710 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
711 {
712 if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
713 return -E_NOT_SUPPORT;
714 }
715 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
716 LOGE("[ExecuteQuery] invalid args");
717 return -E_INVALID_ARGS;
718 }
719 int errCode = E_OK;
720 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
721 if (handle == nullptr) {
722 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
723 return errCode;
724 }
725 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
726 if (errCode != E_OK) {
727 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
728 }
729 ReleaseHandle(handle);
730 return errCode;
731 }
732
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const733 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
734 RelationalRowDataSet &dataSet, ContinueToken &token) const
735 {
736 dataSet.Clear();
737 if (token == nullptr) {
738 // start query
739 std::vector<std::string> colNames;
740 std::vector<RelationalRowData *> data;
741 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
742
743 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
744 if (errCode != E_OK) {
745 return errCode;
746 }
747
748 // create one token
749 token = static_cast<ContinueToken>(
750 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
751 if (token == nullptr) {
752 LOGE("ExecuteQuery OOM");
753 return -E_OUT_OF_MEMORY;
754 }
755 }
756
757 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
758 if (!remoteToken->CheckValid()) {
759 LOGE("ExecuteQuery invalid token");
760 return -E_INVALID_ARGS;
761 }
762
763 int errCode = remoteToken->GetData(packetSize, dataSet);
764 if (errCode == -E_UNFINISHED) {
765 errCode = E_OK;
766 } else {
767 if (errCode != E_OK) {
768 dataSet.Clear();
769 }
770 delete remoteToken;
771 remoteToken = nullptr;
772 token = nullptr;
773 }
774 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
775 return errCode;
776 }
777
GetRelationalDbProperties() const778 const RelationalDBProperties &RelationalSyncAbleStorage::GetRelationalDbProperties() const
779 {
780 return storageEngine_->GetProperties();
781 }
782
ReleaseRemoteQueryContinueToken(ContinueToken & token) const783 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
784 {
785 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
786 delete remoteToken;
787 remoteToken = nullptr;
788 token = nullptr;
789 }
790 }
791 #endif