1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_dfx_adapter.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "platform_specific.h"
26 #include "relational_remote_query_continue_token.h"
27 #include "relational_sync_data_inserter.h"
28 #include "res_finalizer.h"
29 #include "runtime_context.h"
30
31 namespace DistributedDB {
32 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)33 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
34 {
35 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
36 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
37 std::string(CLOSE_CONN_TASK),
38 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
39 );
40 }
41 }
42
43 #define CHECK_STORAGE_ENGINE do { \
44 if (storageEngine_ == nullptr) { \
45 return -E_INVALID_DB; \
46 } \
47 } while (0)
48
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)49 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
50 : storageEngine_(std::move(engine)),
51 isCachedOption_(false)
52 {}
53
~RelationalSyncAbleStorage()54 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
55 {
56 syncAbleEngine_ = nullptr;
57 }
58
59 // Get interface type of this relational db.
GetInterfaceType() const60 int RelationalSyncAbleStorage::GetInterfaceType() const
61 {
62 return SYNC_RELATION;
63 }
64
65 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()66 void RelationalSyncAbleStorage::IncRefCount()
67 {
68 LOGD("RelationalSyncAbleStorage ref +1");
69 IncObjRef(this);
70 }
71
72 // Drop the interface ref-count.
DecRefCount()73 void RelationalSyncAbleStorage::DecRefCount()
74 {
75 LOGD("RelationalSyncAbleStorage ref -1");
76 DecObjRef(this);
77 }
78
79 // Get the identifier of this rdb.
GetIdentifier() const80 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
81 {
82 std::string identifier = storageEngine_->GetIdentifier();
83 return std::vector<uint8_t>(identifier.begin(), identifier.end());
84 }
85
GetDualTupleIdentifier() const86 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
87 {
88 std::string identifier = storageEngine_->GetProperties().GetStringProp(
89 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
90 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
91 return identifierVect;
92 }
93
94 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const95 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
96 {
97 int errCode = E_OK;
98 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
99 if (handle == nullptr) {
100 return;
101 }
102 timestamp = 0;
103 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
104 if (errCode != E_OK) {
105 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
106 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
107 }
108 ReleaseHandle(handle);
109 return;
110 }
111
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const112 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
113 {
114 int errCode = E_OK;
115 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
116 if (handle == nullptr) {
117 return errCode;
118 }
119 timestamp = 0;
120 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
121 if (errCode != E_OK) {
122 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
123 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
124 }
125 ReleaseHandle(handle);
126 return errCode;
127 }
128
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const129 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
130 OperatePerm perm) const
131 {
132 if (storageEngine_ == nullptr) {
133 errCode = -E_INVALID_DB;
134 return nullptr;
135 }
136 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
137 storageEngine_->FindExecutor(isWrite, perm, errCode));
138 if (handle == nullptr) {
139 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
140 }
141 return handle;
142 }
143
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const144 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
145 int &errCode, OperatePerm perm) const
146 {
147 if (storageEngine_ == nullptr) {
148 errCode = -E_INVALID_DB;
149 return nullptr;
150 }
151 if (transactionHandle_ != nullptr) {
152 return transactionHandle_;
153 }
154 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
155 storageEngine_->FindExecutor(isWrite, perm, errCode));
156 if (errCode != E_OK) {
157 ReleaseHandle(handle);
158 handle = nullptr;
159 }
160 return handle;
161 }
162
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const163 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
164 {
165 if (storageEngine_ == nullptr) {
166 return;
167 }
168 StorageExecutor *databaseHandle = handle;
169 storageEngine_->Recycle(databaseHandle);
170 std::function<void()> listener = nullptr;
171 {
172 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
173 listener = heartBeatListener_;
174 }
175 if (listener) {
176 listener();
177 }
178 }
179
180 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const181 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
182 {
183 CHECK_STORAGE_ENGINE;
184 if (key.size() > DBConstant::MAX_KEY_SIZE) {
185 return -E_INVALID_ARGS;
186 }
187 int errCode = E_OK;
188 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
189 if (handle == nullptr) {
190 return errCode;
191 }
192 errCode = handle->GetKvData(key, value);
193 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
194 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
195 }
196 ReleaseHandle(handle);
197 return errCode;
198 }
199
200 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)201 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
202 {
203 CHECK_STORAGE_ENGINE;
204 int errCode = E_OK;
205 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
206 if (handle == nullptr) {
207 return errCode;
208 }
209
210 errCode = handle->PutKvData(key, value); // meta doesn't need time.
211 if (errCode != E_OK) {
212 LOGE("Put kv data err:%d", errCode);
213 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
214 }
215 ReleaseHandle(handle);
216 return errCode;
217 }
218
219 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)220 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
221 {
222 CHECK_STORAGE_ENGINE;
223 for (const auto &key : keys) {
224 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
225 return -E_INVALID_ARGS;
226 }
227 }
228 int errCode = E_OK;
229 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
230 if (handle == nullptr) {
231 return errCode;
232 }
233
234 handle->StartTransaction(TransactType::IMMEDIATE);
235 errCode = handle->DeleteMetaData(keys);
236 if (errCode != E_OK) {
237 handle->Rollback();
238 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
239 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
240 } else {
241 handle->Commit();
242 }
243 ReleaseHandle(handle);
244 return errCode;
245 }
246
247 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const248 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
249 {
250 CHECK_STORAGE_ENGINE;
251 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
252 return -E_INVALID_ARGS;
253 }
254
255 int errCode = E_OK;
256 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
257 if (handle == nullptr) {
258 return errCode;
259 }
260
261 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
262 if (errCode != E_OK) {
263 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
264 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
265 }
266 ReleaseHandle(handle);
267 return errCode;
268 }
269
270 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const271 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
272 {
273 CHECK_STORAGE_ENGINE;
274 int errCode = E_OK;
275 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
276 if (handle == nullptr) {
277 return errCode;
278 }
279
280 errCode = handle->GetAllMetaKeys(keys);
281 if (errCode != E_OK) {
282 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283 }
284 ReleaseHandle(handle);
285 return errCode;
286 }
287
GetDbProperties() const288 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
289 {
290 return storageEngine_->GetProperties();
291 }
292
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)293 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
294 {
295 int errCode = E_OK;
296 for (auto &item : dataItems) {
297 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
298 if (entry == nullptr) {
299 errCode = -E_OUT_OF_MEMORY;
300 LOGE("GetKvEntries failed, errCode:%d", errCode);
301 SingleVerKvEntry::Release(entries);
302 break;
303 }
304 entry->SetEntryData(std::move(item));
305 entries.push_back(entry);
306 }
307 return errCode;
308 }
309
GetDataItemSerialSize(const DataItem & item,size_t appendLen)310 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
311 {
312 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
313 // the size would not be very large.
314 static const size_t maxOrigDevLength = 40;
315 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
316 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
317 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
318 return dataSize;
319 }
320
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)321 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
322 size_t appendLen)
323 {
324 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
325 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
326 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
327 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
328 }
329 return !reachThreshold;
330 }
331
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)332 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
333 SQLiteSingleVerRelationalContinueToken *&token)
334 {
335 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
336 delete token;
337 token = nullptr;
338 return;
339 }
340
341 if (dataItems.empty()) {
342 errCode = -E_INTERNAL_ERROR;
343 LOGE("Get data unfinished but data items is empty.");
344 delete token;
345 token = nullptr;
346 return;
347 }
348 token->SetNextBeginTime(dataItems.back());
349 token->UpdateNextSyncOffset(dataItems.size());
350 }
351
352 /**
353 * Caller must ensure that parameter token is valid.
354 * If error happened, token will be deleted here.
355 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const356 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
357 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
358 {
359 if (storageEngine_ == nullptr) {
360 return -E_INVALID_DB;
361 }
362
363 int errCode = E_OK;
364 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
365 OperatePerm::NORMAL_PERM, errCode));
366 if (handle == nullptr) {
367 goto ERROR;
368 }
369
370 do {
371 errCode = handle->GetSyncDataByQuery(dataItems,
372 Parcel::GetAppendedLen(),
373 dataSizeInfo,
374 std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
375 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
376 storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
377 if (errCode == -E_FINISHED) {
378 token->FinishGetData();
379 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
380 }
381 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
382
383 ERROR:
384 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
385 dataItems.clear();
386 }
387 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
388 ReleaseHandle(handle);
389 return errCode;
390 }
391
392 // use kv struct data to sync
393 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const394 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
395 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
396 std::vector<SingleVerKvEntry *> &entries) const
397 {
398 if (!timeRange.IsValid()) {
399 return -E_INVALID_ARGS;
400 }
401 query.SetSchema(storageEngine_->GetSchema());
402 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
403 if (token == nullptr) {
404 LOGE("[SingleVerNStore] Allocate continue token failed.");
405 return -E_OUT_OF_MEMORY;
406 }
407
408 continueStmtToken = static_cast<ContinueToken>(token);
409 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
410 }
411
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const412 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
413 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
414 {
415 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
416 if (!token->CheckValid()) {
417 return -E_INVALID_ARGS;
418 }
419 RelationalSchemaObject schema = storageEngine_->GetSchema();
420 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
421 std::vector<std::string> fieldNames;
422 fieldNames.reserve(fieldInfos.size());
423 for (const auto &fieldInfo : fieldInfos) { // order by cid
424 fieldNames.push_back(fieldInfo.GetFieldName());
425 }
426 token->SetFieldNames(fieldNames);
427
428 std::vector<DataItem> dataItems;
429 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
430 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
431 continueStmtToken = static_cast<ContinueToken>(token);
432 return errCode;
433 }
434
435 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
436 if (innerCode != E_OK) {
437 errCode = innerCode;
438 delete token;
439 token = nullptr;
440 }
441 continueStmtToken = static_cast<ContinueToken>(token);
442 return errCode;
443 }
444
445 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)446 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
447 {
448 std::vector<DataItem> dataItems;
449 for (const auto &itemEntry : entries) {
450 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
451 if (entry != nullptr) {
452 DataItem item;
453 item.origDev = entry->GetOrigDevice();
454 item.flag = entry->GetFlag();
455 item.timestamp = entry->GetTimestamp();
456 item.writeTimestamp = entry->GetWriteTimestamp();
457 entry->GetKey(item.key);
458 entry->GetValue(item.value);
459 entry->GetHashKey(item.hashKey);
460 dataItems.push_back(item);
461 }
462 }
463 return dataItems;
464 }
465 }
466
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)467 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
468 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
469 {
470 std::vector<DataItem> dataItems = ConvertEntries(entries);
471 return PutSyncData(object, dataItems, deviceName);
472 }
473
474 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)475 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
476 {
477 return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
478 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
479 }
480
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)481 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
482 {
483 return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
484 }
485 }
486
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)487 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
488 const std::string &deviceName)
489 {
490 int errCode = E_OK;
491 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
492 QueryObject query = object;
493 query.SetSchema(storageEngine_->GetSchema());
494
495 RelationalSchemaObject remoteSchema;
496 errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
497 if (errCode != E_OK) {
498 LOGE("Find remote schema failed. err=%d", errCode);
499 return errCode;
500 }
501
502 StoreInfo info = {
503 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
504 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
505 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
506 };
507 auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
508 remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
509 inserter.SetEntries(dataItems);
510
511 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
512 if (handle == nullptr) {
513 return errCode;
514 }
515
516 DBDfxAdapter::StartTraceSQL();
517
518 errCode = handle->SaveSyncItems(inserter);
519
520 DBDfxAdapter::FinishTraceSQL();
521 if (errCode == E_OK) {
522 // dataItems size > 0 now because already check before
523 // all dataItems will write into db now, so need to observer notify here
524 // if some dataItems will not write into db in the future, observer notify here need change
525 ChangedData data;
526 TriggerObserverAction(deviceName, std::move(data), false);
527 }
528
529 ReleaseHandle(handle);
530 return errCode;
531 }
532
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)533 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
534 const std::string &deviceName)
535 {
536 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
537 LOGW("Device length is invalid for sync put");
538 return -E_INVALID_ARGS;
539 }
540
541 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
542 if (errCode != E_OK) {
543 LOGE("[Relational] PutSyncData errCode:%d", errCode);
544 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
545 }
546 return errCode;
547 }
548
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)549 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
550 {
551 (void) deviceName;
552 (void) isNeedNotify;
553 return -E_NOT_SUPPORT;
554 }
555
GetSchemaInfo() const556 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
557 {
558 return storageEngine_->GetSchema();
559 }
560
GetSecurityOption(SecurityOption & option) const561 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
562 {
563 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
564 if (isCachedOption_) {
565 option = securityOption_;
566 return E_OK;
567 }
568 std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
569 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
570 if (errCode == E_OK) {
571 option = securityOption_;
572 isCachedOption_ = true;
573 }
574 return errCode;
575 }
576
NotifyRemotePushFinished(const std::string & deviceId) const577 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
578 {
579 return;
580 }
581
582 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const583 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
584 {
585 return OS::GetCurrentSysTimeInMicrosecond(outTime);
586 }
587
GetTablesQuery()588 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
589 {
590 auto tableNames = storageEngine_->GetSchema().GetTableNames();
591 std::vector<QuerySyncObject> queries;
592 queries.reserve(tableNames.size());
593 for (const auto &it : tableNames) {
594 queries.emplace_back(Query::Select(it));
595 }
596 return queries;
597 }
598
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)599 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
600 {
601 (void) queryObj;
602 return -E_NOT_SUPPORT;
603 }
604
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const605 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
606 const std::string &targetID) const
607 {
608 return E_OK;
609 }
610
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)611 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
612 const RelationalSyncStrategy &syncStrategy)
613 {
614 auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
615 DistributedTableMode::SPLIT_BY_DEVICE);
616 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
617 LOGD("No need create device table in COLLABORATION mode.");
618 return E_OK;
619 }
620
621 int errCode = E_OK;
622 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
623 if (handle == nullptr) {
624 return errCode;
625 }
626
627 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
628 if (errCode != E_OK) {
629 LOGE("Start transaction failed:%d", errCode);
630 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
631 ReleaseHandle(handle);
632 return errCode;
633 }
634
635 StoreInfo info = {
636 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
637 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
638 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
639 };
640 for (const auto &[table, strategy] : syncStrategy) {
641 if (!strategy.permitSync) {
642 continue;
643 }
644
645 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
646 if (errCode != E_OK) {
647 LOGE("Create distributed device table failed. %d", errCode);
648 break;
649 }
650 }
651
652 if (errCode == E_OK) {
653 errCode = handle->Commit();
654 } else {
655 (void)handle->Rollback();
656 }
657
658 ReleaseHandle(handle);
659 return errCode;
660 }
661
RegisterSchemaChangedCallback(const std::function<void ()> & callback)662 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
663 {
664 std::lock_guard lock(onSchemaChangedMutex_);
665 onSchemaChanged_ = callback;
666 return E_OK;
667 }
668
NotifySchemaChanged()669 void RelationalSyncAbleStorage::NotifySchemaChanged()
670 {
671 std::lock_guard lock(onSchemaChangedMutex_);
672 if (onSchemaChanged_) {
673 LOGD("Notify relational schema was changed");
674 onSchemaChanged_();
675 }
676 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const677 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
678 {
679 algorithmSet.clear();
680 DataCompression::GetCompressionAlgo(algorithmSet);
681 return E_OK;
682 }
683
RegisterObserverAction(uint64_t connectionId,const RelationalObserverAction & action)684 void RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const RelationalObserverAction &action)
685 {
686 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
687 dataChangeCallbackMap_[connectionId] = action;
688 }
689
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)690 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
691 ChangedData &&changedData, bool isChangedData)
692 {
693 IncObjRef(this);
694 int taskErrCode =
695 RuntimeContext::GetInstance()->ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
696 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
697 if (!dataChangeCallbackMap_.empty()) {
698 auto it = dataChangeCallbackMap_.rbegin(); // call the last valid observer
699 if (it->second != nullptr) {
700 it->second(deviceName, std::move(changedData), isChangedData);
701 }
702 }
703 DecObjRef(this);
704 });
705 if (taskErrCode != E_OK) {
706 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
707 DecObjRef(this);
708 }
709 }
710
RegisterHeartBeatListener(const std::function<void ()> & listener)711 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
712 {
713 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
714 heartBeatListener_ = listener;
715 }
716
CheckAndInitQueryCondition(QueryObject & query) const717 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
718 {
719 RelationalSchemaObject schema = storageEngine_->GetSchema();
720 TableInfo table = schema.GetTable(query.GetTableName());
721 if (!table.IsValid()) {
722 LOGE("Query table is not a distributed table.");
723 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
724 }
725 if (table.GetTableSyncType() == CLOUD_COOPERATION) {
726 LOGE("cloud table mode is not support");
727 return -E_NOT_SUPPORT;
728 }
729 query.SetSchema(schema);
730
731 int errCode = E_OK;
732 auto *handle = GetHandle(true, errCode);
733 if (handle == nullptr) {
734 return errCode;
735 }
736
737 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
738 if (errCode != E_OK) {
739 LOGE("Check relational query condition failed. %d", errCode);
740 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
741 }
742
743 ReleaseHandle(handle);
744 return errCode;
745 }
746
CheckCompatible(const std::string & schema,uint8_t type) const747 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
748 {
749 // return true if is relational schema.
750 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
751 }
752
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const753 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
754 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
755 {
756 if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
757 return -E_NOT_SUPPORT;
758 }
759 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
760 LOGE("[ExecuteQuery] invalid args");
761 return -E_INVALID_ARGS;
762 }
763 int errCode = E_OK;
764 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
765 if (handle == nullptr) {
766 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
767 return errCode;
768 }
769 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
770 if (errCode != E_OK) {
771 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
772 }
773 ReleaseHandle(handle);
774 return errCode;
775 }
776
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const777 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
778 RelationalRowDataSet &dataSet, ContinueToken &token) const
779 {
780 dataSet.Clear();
781 if (token == nullptr) {
782 // start query
783 std::vector<std::string> colNames;
784 std::vector<RelationalRowData *> data;
785 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
786
787 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
788 if (errCode != E_OK) {
789 return errCode;
790 }
791
792 // create one token
793 token = static_cast<ContinueToken>(
794 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
795 if (token == nullptr) {
796 LOGE("ExecuteQuery OOM");
797 return -E_OUT_OF_MEMORY;
798 }
799 }
800
801 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
802 if (!remoteToken->CheckValid()) {
803 LOGE("ExecuteQuery invalid token");
804 return -E_INVALID_ARGS;
805 }
806
807 int errCode = remoteToken->GetData(packetSize, dataSet);
808 if (errCode == -E_UNFINISHED) {
809 errCode = E_OK;
810 } else {
811 if (errCode != E_OK) {
812 dataSet.Clear();
813 }
814 delete remoteToken;
815 remoteToken = nullptr;
816 token = nullptr;
817 }
818 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
819 return errCode;
820 }
821
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)822 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
823 uint8_t type)
824 {
825 if (ReadSchemaType(type) != SchemaType::RELATIVE) {
826 return -E_INVALID_ARGS;
827 }
828
829 RelationalSchemaObject schemaObj;
830 int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
831 if (errCode != E_OK) {
832 LOGE("Parse remote schema failed. err=%d", errCode);
833 return errCode;
834 }
835
836 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
837 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
838 Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
839 errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
840 if (errCode != E_OK) {
841 LOGE("Save remote schema failed. err=%d", errCode);
842 return errCode;
843 }
844
845 return remoteDeviceSchema_.Put(deviceId, remoteSchema);
846 }
847
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)848 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
849 {
850 if (schemaObj.IsSchemaValid()) {
851 return -E_INVALID_ARGS;
852 }
853
854 std::string remoteSchema;
855 int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
856 if (errCode == -E_NOT_FOUND) {
857 LOGW("Get remote device schema miss cached.");
858 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
859 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
860 Value remoteSchemaBuff;
861 errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
862 if (errCode != E_OK) {
863 LOGE("Get remote device schema from meta failed. err=%d", errCode);
864 return errCode;
865 }
866 remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
867 errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
868 }
869
870 if (errCode != E_OK) {
871 LOGE("Get remote device schema failed. err=%d", errCode);
872 return errCode;
873 }
874
875 errCode = schemaObj.ParseFromSchemaString(remoteSchema);
876 if (errCode != E_OK) {
877 LOGE("Parse remote schema failed. err=%d", errCode);
878 }
879 return errCode;
880 }
881
ReleaseRemoteQueryContinueToken(ContinueToken & token) const882 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
883 {
884 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
885 delete remoteToken;
886 remoteToken = nullptr;
887 token = nullptr;
888 }
889
StartTransaction(TransactType type)890 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
891 {
892 CHECK_STORAGE_ENGINE;
893 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
894 if (transactionHandle_ != nullptr) {
895 LOGD("Transaction started already.");
896 return -E_TRANSACT_STATE;
897 }
898 int errCode = E_OK;
899 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
900 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
901 if (handle == nullptr) {
902 ReleaseHandle(handle);
903 return errCode;
904 }
905 errCode = handle->StartTransaction(type);
906 if (errCode != E_OK) {
907 ReleaseHandle(handle);
908 return errCode;
909 }
910 transactionHandle_ = handle;
911 return errCode;
912 }
913
Commit()914 int RelationalSyncAbleStorage::Commit()
915 {
916 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
917 if (transactionHandle_ == nullptr) {
918 LOGE("relation database is null or the transaction has not been started");
919 return -E_INVALID_DB;
920 }
921 int errCode = transactionHandle_->Commit();
922 ReleaseHandle(transactionHandle_);
923 transactionHandle_ = nullptr;
924 LOGD("connection commit transaction!");
925 return errCode;
926 }
927
Rollback()928 int RelationalSyncAbleStorage::Rollback()
929 {
930 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
931 if (transactionHandle_ == nullptr) {
932 LOGE("Invalid handle for rollback or the transaction has not been started.");
933 return -E_INVALID_DB;
934 }
935
936 int errCode = transactionHandle_->Rollback();
937 ReleaseHandle(transactionHandle_);
938 transactionHandle_ = nullptr;
939 LOGI("connection rollback transaction!");
940 return errCode;
941 }
942
GetUploadCount(const std::string & tableName,const Timestamp & timestamp,const bool isCloudForcePush,int64_t & count)943 int RelationalSyncAbleStorage::GetUploadCount(const std::string &tableName, const Timestamp ×tamp,
944 const bool isCloudForcePush, int64_t &count)
945 {
946 int errCode = E_OK;
947 auto *handle = GetHandleExpectTransaction(false, errCode);
948 if (handle == nullptr) {
949 return errCode;
950 }
951 errCode = handle->GetUploadCount(tableName, timestamp, isCloudForcePush, count);
952 if (transactionHandle_ == nullptr) {
953 ReleaseHandle(handle);
954 }
955 return errCode;
956 }
957
FillCloudGid(const CloudSyncData & data)958 int RelationalSyncAbleStorage::FillCloudGid(const CloudSyncData &data)
959 {
960 if (storageEngine_ == nullptr) {
961 return -E_INVALID_DB;
962 }
963 int errCode = E_OK;
964 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
965 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
966 if (writeHandle == nullptr) {
967 return errCode;
968 }
969 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
970 if (errCode != E_OK) {
971 ReleaseHandle(writeHandle);
972 return errCode;
973 }
974 errCode = writeHandle->UpdateCloudLogGid(data);
975 if (errCode != E_OK) {
976 writeHandle->Rollback();
977 ReleaseHandle(writeHandle);
978 return errCode;
979 }
980 errCode = writeHandle->Commit();
981 ReleaseHandle(writeHandle);
982 return errCode;
983 }
984
GetCloudData(const TableSchema & tableSchema,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)985 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const Timestamp &beginTime,
986 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
987 {
988 if (transactionHandle_ == nullptr) {
989 LOGE(" the transaction has not been started");
990 return -E_INVALID_DB;
991 }
992 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
993 QueryObject queryObject;
994 queryObject.SetTableName(tableSchema.name);
995 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, queryObject);
996 if (token == nullptr) {
997 LOGE("[SingleVerNStore] Allocate continue token failed.");
998 return -E_OUT_OF_MEMORY;
999 }
1000 token->SetCloudTableSchema(tableSchema);
1001 continueStmtToken = static_cast<ContinueToken>(token);
1002 return GetCloudDataNext(continueStmtToken, cloudDataResult);
1003 }
1004
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1005 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1006 CloudSyncData &cloudDataResult)
1007 {
1008 if (continueStmtToken == nullptr) {
1009 return -E_INVALID_ARGS;
1010 }
1011 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1012 if (!token->CheckValid()) {
1013 return -E_INVALID_ARGS;
1014 }
1015 if (transactionHandle_ == nullptr) {
1016 LOGE("the transaction has not been started, release the token");
1017 ReleaseCloudDataToken(continueStmtToken);
1018 return -E_INVALID_DB;
1019 }
1020 int errCode = transactionHandle_->GetSyncCloudData(cloudDataResult, CloudDbConstant::MAX_UPLOAD_SIZE, *token);
1021 if (errCode != -E_UNFINISHED) {
1022 delete token;
1023 token = nullptr;
1024 }
1025 continueStmtToken = static_cast<ContinueToken>(token);
1026 return errCode;
1027 }
1028
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1029 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1030 {
1031 if (continueStmtToken == nullptr) {
1032 return E_OK;
1033 }
1034 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1035 if (!token->CheckValid()) {
1036 return E_OK;
1037 }
1038 int errCode = token->ReleaseCloudStatement();
1039 delete token;
1040 token = nullptr;
1041 return errCode;
1042 }
1043
ChkSchema(const TableName & tableName)1044 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1045 {
1046 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1047 RelationalSchemaObject localSchema = GetSchemaInfo();
1048 return schemaMgr_.ChkSchema(tableName, localSchema);
1049 }
1050
SetCloudDbSchema(const DataBaseSchema & schema)1051 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1052 {
1053 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1054 schemaMgr_.SetCloudDbSchema(schema);
1055 return E_OK;
1056 }
1057
GetCloudDbSchema(DataBaseSchema & cloudSchema)1058 int RelationalSyncAbleStorage::GetCloudDbSchema(DataBaseSchema &cloudSchema)
1059 {
1060 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1061 cloudSchema = *(schemaMgr_.GetCloudDbSchema());
1062 return E_OK;
1063 }
1064
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1065 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1066 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1067 {
1068 if (transactionHandle_ == nullptr) {
1069 LOGE(" the transaction has not been started");
1070 return -E_INVALID_DB;
1071 }
1072
1073 TableSchema tableSchema;
1074 int errCode = GetCloudTableSchema(tableName, tableSchema);
1075 if (errCode != E_OK) {
1076 LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1077 return errCode;
1078 }
1079 return transactionHandle_->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1080 }
1081
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1082 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1083 {
1084 if (transactionHandle_ == nullptr) {
1085 LOGE(" the transaction has not been started");
1086 return -E_INVALID_DB;
1087 }
1088
1089 TableSchema tableSchema;
1090 int errCode = GetCloudTableSchema(tableName, tableSchema);
1091 if (errCode != E_OK) {
1092 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1093 return errCode;
1094 }
1095 return transactionHandle_->PutCloudSyncData(tableName, tableSchema, downloadData);
1096 }
1097
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1098 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1099 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1100 {
1101 if (transactionHandle_ == nullptr) {
1102 LOGE("the transaction has not been started");
1103 return -E_INVALID_DB;
1104 }
1105
1106 return transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets);
1107 }
1108
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1109 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1110 {
1111 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1112 return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1113 }
1114
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1115 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1116 bool isDownloadSuccess)
1117 {
1118 if (storageEngine_ == nullptr) {
1119 return -E_INVALID_DB;
1120 }
1121 if (transactionHandle_ == nullptr) {
1122 LOGE("the transaction has not been started when fill asset for download.");
1123 return -E_INVALID_DB;
1124 }
1125 TableSchema tableSchema;
1126 int errCode = GetCloudTableSchema(tableName, tableSchema);
1127 if (errCode != E_OK) {
1128 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1129 return errCode;
1130 }
1131 errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1132 if (errCode != E_OK) {
1133 LOGE("fill cloud asset for download failed.%d", errCode);
1134 }
1135 return errCode;
1136 }
1137
SetLogTriggerStatus(bool status)1138 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1139 {
1140 int errCode = E_OK;
1141 auto *handle = GetHandleExpectTransaction(false, errCode);
1142 if (handle == nullptr) {
1143 return errCode;
1144 }
1145 errCode = handle->SetLogTriggerStatus(status);
1146 if (transactionHandle_ == nullptr) {
1147 ReleaseHandle(handle);
1148 }
1149 return errCode;
1150 }
1151
FillCloudGidAndAsset(const OpType opType,const CloudSyncData & data)1152 int RelationalSyncAbleStorage::FillCloudGidAndAsset(const OpType opType, const CloudSyncData &data)
1153 {
1154 CHECK_STORAGE_ENGINE;
1155 if (opType == OpType::UPDATE && data.updData.assets.empty()) {
1156 return E_OK;
1157 }
1158 int errCode = E_OK;
1159 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1160 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1161 if (writeHandle == nullptr) {
1162 return errCode;
1163 }
1164 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1165 if (errCode != E_OK) {
1166 ReleaseHandle(writeHandle);
1167 return errCode;
1168 }
1169 if (opType == OpType::INSERT) {
1170 errCode = writeHandle->UpdateCloudLogGid(data);
1171 if (errCode != E_OK) {
1172 LOGE("Failed to fill cloud log gid, %d.", errCode);
1173 writeHandle->Rollback();
1174 ReleaseHandle(writeHandle);
1175 return errCode;
1176 }
1177 if (!data.insData.assets.empty()) {
1178 errCode = writeHandle->FillCloudAssetForUpload(data.tableName, data.insData);
1179 }
1180 } else {
1181 errCode = writeHandle->FillCloudAssetForUpload(data.tableName, data.updData);
1182 }
1183 if (errCode != E_OK) {
1184 LOGE("Failed to fill cloud asset, %d.", errCode);
1185 writeHandle->Rollback();
1186 ReleaseHandle(writeHandle);
1187 return errCode;
1188 }
1189 errCode = writeHandle->Commit();
1190 ReleaseHandle(writeHandle);
1191 return errCode;
1192 }
1193
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1194 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1195 {
1196 syncAbleEngine_ = syncAbleEngine;
1197 }
1198
GetIdentify() const1199 std::string RelationalSyncAbleStorage::GetIdentify() const
1200 {
1201 if (storageEngine_ == nullptr) {
1202 LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1203 return "";
1204 }
1205 return storageEngine_->GetIdentifier();
1206 }
1207
EraseDataChangeCallback(uint64_t connectionId)1208 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1209 {
1210 std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
1211 auto it = dataChangeCallbackMap_.find(connectionId);
1212 if (it != dataChangeCallbackMap_.end()) {
1213 dataChangeCallbackMap_.erase(it);
1214 }
1215 }
1216
ReleaseContinueToken(ContinueToken & continueStmtToken) const1217 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1218 {
1219 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1220 if (token == nullptr || !(token->CheckValid())) {
1221 LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1222 return;
1223 }
1224 delete token;
1225 continueStmtToken = nullptr;
1226 }
1227 }
1228 #endif