1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41 std::string(CLOSE_CONN_TASK),
42 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43 );
44 }
45 }
46
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48 : storageEngine_(std::move(engine)),
49 reusedHandle_(nullptr),
50 isCachedOption_(false)
51 {}
52
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55 syncAbleEngine_ = nullptr;
56 }
57
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61 return SYNC_RELATION;
62 }
63
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67 LOGD("RelationalSyncAbleStorage ref +1");
68 IncObjRef(this);
69 }
70
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74 LOGD("RelationalSyncAbleStorage ref -1");
75 DecObjRef(this);
76 }
77
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81 std::string identifier = storageEngine_->GetIdentifier();
82 return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87 std::string identifier = storageEngine_->GetProperties().GetStringProp(
88 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90 return identifierVect;
91 }
92
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
95 {
96 int errCode = E_OK;
97 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98 if (handle == nullptr) {
99 return;
100 }
101 timestamp = 0;
102 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103 if (errCode != E_OK) {
104 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
106 }
107 ReleaseHandle(handle);
108 return;
109 }
110
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const111 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
112 {
113 int errCode = E_OK;
114 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
115 if (handle == nullptr) {
116 return errCode;
117 }
118 timestamp = 0;
119 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
120 if (errCode != E_OK) {
121 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
122 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
123 }
124 ReleaseHandle(handle);
125 return errCode;
126 }
127
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const128 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
129 OperatePerm perm) const
130 {
131 if (storageEngine_ == nullptr) {
132 errCode = -E_INVALID_DB;
133 return nullptr;
134 }
135 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
136 storageEngine_->FindExecutor(isWrite, perm, errCode));
137 if (handle == nullptr) {
138 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
139 }
140 return handle;
141 }
142
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const143 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
144 int &errCode, OperatePerm perm) const
145 {
146 if (storageEngine_ == nullptr) {
147 errCode = -E_INVALID_DB;
148 return nullptr;
149 }
150 if (transactionHandle_ != nullptr) {
151 return transactionHandle_;
152 }
153 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
154 storageEngine_->FindExecutor(isWrite, perm, errCode));
155 if (errCode != E_OK) {
156 ReleaseHandle(handle);
157 handle = nullptr;
158 }
159 return handle;
160 }
161
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const162 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
163 {
164 if (storageEngine_ == nullptr) {
165 return;
166 }
167 StorageExecutor *databaseHandle = handle;
168 storageEngine_->Recycle(databaseHandle);
169 std::function<void()> listener = nullptr;
170 {
171 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
172 listener = heartBeatListener_;
173 }
174 if (listener) {
175 listener();
176 }
177 }
178
179 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const180 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
181 {
182 if (storageEngine_ == nullptr) {
183 return -E_INVALID_DB;
184 }
185 if (key.size() > DBConstant::MAX_KEY_SIZE) {
186 return -E_INVALID_ARGS;
187 }
188 int errCode = E_OK;
189 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
190 if (handle == nullptr) {
191 return errCode;
192 }
193 errCode = handle->GetKvData(key, value);
194 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
195 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
196 }
197 ReleaseHandle(handle);
198 return errCode;
199 }
200
201 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)202 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
203 {
204 if (storageEngine_ == nullptr) {
205 return -E_INVALID_DB;
206 }
207 int errCode = E_OK;
208 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
209 if (handle == nullptr) {
210 return errCode;
211 }
212
213 errCode = handle->PutKvData(key, value); // meta doesn't need time.
214 if (errCode != E_OK) {
215 LOGE("Put kv data err:%d", errCode);
216 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217 }
218 ReleaseHandle(handle);
219 return errCode;
220 }
221
PutMetaData(const Key & key,const Value & value,bool isInTransaction)222 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
223 {
224 if (storageEngine_ == nullptr) {
225 return -E_INVALID_DB;
226 }
227 int errCode = E_OK;
228 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
229 std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
230
231 // try to recycle using the handle
232 if (isInTransaction) {
233 handLock.lock();
234 if (reusedHandle_ != nullptr) {
235 handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
236 } else {
237 isInTransaction = false;
238 handLock.unlock();
239 }
240 }
241
242 if (handle == nullptr) {
243 handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
244 if (handle == nullptr) {
245 return errCode;
246 }
247 }
248
249 errCode = handle->PutKvData(key, value);
250 if (errCode != E_OK) {
251 LOGE("Put kv data err:%d", errCode);
252 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
253 }
254 if (!isInTransaction) {
255 ReleaseHandle(handle);
256 }
257 return errCode;
258 }
259
260 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)261 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
262 {
263 if (storageEngine_ == nullptr) {
264 return -E_INVALID_DB;
265 }
266 for (const auto &key : keys) {
267 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
268 return -E_INVALID_ARGS;
269 }
270 }
271 int errCode = E_OK;
272 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
273 if (handle == nullptr) {
274 return errCode;
275 }
276
277 handle->StartTransaction(TransactType::IMMEDIATE);
278 errCode = handle->DeleteMetaData(keys);
279 if (errCode != E_OK) {
280 handle->Rollback();
281 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
282 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283 } else {
284 handle->Commit();
285 }
286 ReleaseHandle(handle);
287 return errCode;
288 }
289
290 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const291 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
292 {
293 if (storageEngine_ == nullptr) {
294 return -E_INVALID_DB;
295 }
296 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
297 return -E_INVALID_ARGS;
298 }
299
300 int errCode = E_OK;
301 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
302 if (handle == nullptr) {
303 return errCode;
304 }
305
306 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
307 if (errCode != E_OK) {
308 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
309 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
310 }
311 ReleaseHandle(handle);
312 return errCode;
313 }
314
315 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const316 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
317 {
318 if (storageEngine_ == nullptr) {
319 return -E_INVALID_DB;
320 }
321 int errCode = E_OK;
322 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
323 if (handle == nullptr) {
324 return errCode;
325 }
326
327 errCode = handle->GetAllMetaKeys(keys);
328 if (errCode != E_OK) {
329 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
330 }
331 ReleaseHandle(handle);
332 return errCode;
333 }
334
GetDbProperties() const335 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
336 {
337 return storageEngine_->GetProperties();
338 }
339
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)340 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
341 {
342 int errCode = E_OK;
343 for (auto &item : dataItems) {
344 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
345 if (entry == nullptr) {
346 errCode = -E_OUT_OF_MEMORY;
347 LOGE("GetKvEntries failed, errCode:%d", errCode);
348 SingleVerKvEntry::Release(entries);
349 break;
350 }
351 entry->SetEntryData(std::move(item));
352 entries.push_back(entry);
353 }
354 return errCode;
355 }
356
GetDataItemSerialSize(const DataItem & item,size_t appendLen)357 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
358 {
359 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
360 // the size would not be very large.
361 static const size_t maxOrigDevLength = 40;
362 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
363 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
364 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
365 return dataSize;
366 }
367
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)368 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
369 size_t appendLen)
370 {
371 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
372 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
373 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
374 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
375 }
376 return !reachThreshold;
377 }
378
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)379 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
380 SQLiteSingleVerRelationalContinueToken *&token)
381 {
382 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
383 delete token;
384 token = nullptr;
385 return;
386 }
387
388 if (dataItems.empty()) {
389 errCode = -E_INTERNAL_ERROR;
390 LOGE("Get data unfinished but data items is empty.");
391 delete token;
392 token = nullptr;
393 return;
394 }
395 token->SetNextBeginTime(dataItems.back());
396 token->UpdateNextSyncOffset(dataItems.size());
397 }
398
399 /**
400 * Caller must ensure that parameter token is valid.
401 * If error happened, token will be deleted here.
402 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const403 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
404 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
405 {
406 if (storageEngine_ == nullptr) {
407 return -E_INVALID_DB;
408 }
409
410 int errCode = E_OK;
411 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
412 OperatePerm::NORMAL_PERM, errCode));
413 if (handle == nullptr) {
414 goto ERROR;
415 }
416
417 do {
418 errCode = handle->GetSyncDataByQuery(dataItems,
419 Parcel::GetAppendedLen(),
420 dataSizeInfo,
421 [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
422 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
423 }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
424 if (errCode == -E_FINISHED) {
425 token->FinishGetData();
426 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
427 }
428 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
429
430 ERROR:
431 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
432 dataItems.clear();
433 }
434 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
435 ReleaseHandle(handle);
436 return errCode;
437 }
438
439 // use kv struct data to sync
440 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const441 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
442 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
443 std::vector<SingleVerKvEntry *> &entries) const
444 {
445 if (!timeRange.IsValid()) {
446 return -E_INVALID_ARGS;
447 }
448 query.SetSchema(storageEngine_->GetSchema());
449 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
450 if (token == nullptr) {
451 LOGE("[SingleVerNStore] Allocate continue token failed.");
452 return -E_OUT_OF_MEMORY;
453 }
454
455 continueStmtToken = static_cast<ContinueToken>(token);
456 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
457 }
458
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const459 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
460 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
461 {
462 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
463 if (!token->CheckValid()) {
464 return -E_INVALID_ARGS;
465 }
466 RelationalSchemaObject schema = storageEngine_->GetSchema();
467 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
468 std::vector<std::string> fieldNames;
469 fieldNames.reserve(fieldInfos.size());
470 for (const auto &fieldInfo : fieldInfos) { // order by cid
471 fieldNames.push_back(fieldInfo.GetFieldName());
472 }
473 token->SetFieldNames(fieldNames);
474
475 std::vector<DataItem> dataItems;
476 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
477 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
478 continueStmtToken = static_cast<ContinueToken>(token);
479 return errCode;
480 }
481
482 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
483 if (innerCode != E_OK) {
484 errCode = innerCode;
485 delete token;
486 token = nullptr;
487 }
488 continueStmtToken = static_cast<ContinueToken>(token);
489 return errCode;
490 }
491
492 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)493 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
494 {
495 std::vector<DataItem> dataItems;
496 for (const auto &itemEntry : entries) {
497 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
498 if (entry != nullptr) {
499 DataItem item;
500 item.origDev = entry->GetOrigDevice();
501 item.flag = entry->GetFlag();
502 item.timestamp = entry->GetTimestamp();
503 item.writeTimestamp = entry->GetWriteTimestamp();
504 entry->GetKey(item.key);
505 entry->GetValue(item.value);
506 entry->GetHashKey(item.hashKey);
507 dataItems.push_back(item);
508 }
509 }
510 return dataItems;
511 }
512 }
513
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)514 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
515 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
516 {
517 std::vector<DataItem> dataItems = ConvertEntries(entries);
518 return PutSyncData(object, dataItems, deviceName);
519 }
520
521 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)522 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
523 {
524 return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
525 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
526 }
527
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)528 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
529 {
530 return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
531 }
532 }
533
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)534 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
535 const std::string &deviceName)
536 {
537 int errCode = E_OK;
538 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
539 QueryObject query = object;
540 query.SetSchema(storageEngine_->GetSchema());
541
542 RelationalSchemaObject remoteSchema;
543 errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
544 if (errCode != E_OK) {
545 LOGE("Find remote schema failed. err=%d", errCode);
546 return errCode;
547 }
548
549 StoreInfo info = GetStoreInfo();
550 auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
551 remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
552 inserter.SetEntries(dataItems);
553
554 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
555 if (handle == nullptr) {
556 return errCode;
557 }
558
559 DBDfxAdapter::StartTracing();
560
561 errCode = handle->SaveSyncItems(inserter);
562
563 DBDfxAdapter::FinishTracing();
564 if (errCode == E_OK) {
565 // dataItems size > 0 now because already check before
566 // all dataItems will write into db now, so need to observer notify here
567 // if some dataItems will not write into db in the future, observer notify here need change
568 ChangedData data;
569 TriggerObserverAction(deviceName, std::move(data), false);
570 }
571
572 ReleaseHandle(handle);
573 return errCode;
574 }
575
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)576 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
577 const std::string &deviceName)
578 {
579 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
580 LOGW("Device length is invalid for sync put");
581 return -E_INVALID_ARGS;
582 }
583
584 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
585 if (errCode != E_OK) {
586 LOGE("[Relational] PutSyncData errCode:%d", errCode);
587 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
588 }
589 return errCode;
590 }
591
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)592 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
593 {
594 (void) deviceName;
595 (void) isNeedNotify;
596 return -E_NOT_SUPPORT;
597 }
598
GetSchemaInfo() const599 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
600 {
601 return storageEngine_->GetSchema();
602 }
603
GetSecurityOption(SecurityOption & option) const604 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
605 {
606 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
607 if (isCachedOption_) {
608 option = securityOption_;
609 return E_OK;
610 }
611 std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
612 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
613 if (errCode == E_OK) {
614 option = securityOption_;
615 isCachedOption_ = true;
616 }
617 return errCode;
618 }
619
NotifyRemotePushFinished(const std::string & deviceId) const620 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
621 {
622 return;
623 }
624
625 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const626 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
627 {
628 return OS::GetCurrentSysTimeInMicrosecond(outTime);
629 }
630
GetTablesQuery()631 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
632 {
633 auto tableNames = storageEngine_->GetSchema().GetTableNames();
634 std::vector<QuerySyncObject> queries;
635 queries.reserve(tableNames.size());
636 for (const auto &it : tableNames) {
637 queries.emplace_back(Query::Select(it));
638 }
639 return queries;
640 }
641
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)642 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
643 {
644 (void) queryObj;
645 return -E_NOT_SUPPORT;
646 }
647
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const648 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
649 const std::string &targetID, bool isPush) const
650 {
651 return E_OK;
652 }
653
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)654 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
655 const RelationalSyncStrategy &syncStrategy)
656 {
657 auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
658 DistributedTableMode::SPLIT_BY_DEVICE);
659 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
660 LOGD("No need create device table in COLLABORATION mode.");
661 return E_OK;
662 }
663
664 int errCode = E_OK;
665 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
666 if (handle == nullptr) {
667 return errCode;
668 }
669
670 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
671 if (errCode != E_OK) {
672 LOGE("Start transaction failed:%d", errCode);
673 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
674 ReleaseHandle(handle);
675 return errCode;
676 }
677
678 StoreInfo info = GetStoreInfo();
679 for (const auto &[table, strategy] : syncStrategy) {
680 if (!strategy.permitSync) {
681 continue;
682 }
683
684 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
685 if (errCode != E_OK) {
686 LOGE("Create distributed device table failed. %d", errCode);
687 break;
688 }
689 }
690
691 if (errCode == E_OK) {
692 errCode = handle->Commit();
693 } else {
694 (void)handle->Rollback();
695 }
696
697 ReleaseHandle(handle);
698 return errCode;
699 }
700
RegisterSchemaChangedCallback(const std::function<void ()> & callback)701 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
702 {
703 std::lock_guard lock(onSchemaChangedMutex_);
704 onSchemaChanged_ = callback;
705 return E_OK;
706 }
707
NotifySchemaChanged()708 void RelationalSyncAbleStorage::NotifySchemaChanged()
709 {
710 std::lock_guard lock(onSchemaChangedMutex_);
711 if (onSchemaChanged_) {
712 LOGD("Notify relational schema was changed");
713 onSchemaChanged_();
714 }
715 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const716 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
717 {
718 algorithmSet.clear();
719 DataCompression::GetCompressionAlgo(algorithmSet);
720 return E_OK;
721 }
722
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)723 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
724 const RelationalObserverAction &action)
725 {
726 int errCode = E_OK;
727 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, action, &errCode] () mutable {
728 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
729 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
730 auto it = dataChangeCallbackMap_.find(connectionId);
731 if (it != dataChangeCallbackMap_.end()) {
732 if (it->second.find(observer) != it->second.end()) {
733 LOGE("obsever already registered");
734 errCode = -E_ALREADY_SET;
735 return;
736 }
737 if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
738 LOGE("The number of relational observers has been over limit");
739 errCode = -E_MAX_LIMITS;
740 return;
741 }
742 it->second[observer] = action;
743 } else {
744 dataChangeCallbackMap_[connectionId][observer] = action;
745 }
746 LOGI("register relational observer ok");
747 }, nullptr, &dataChangeCallbackMap_);
748 ADAPTER_WAIT(handle);
749 return errCode;
750 }
751
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)752 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
753 {
754 if (observer == nullptr) {
755 EraseDataChangeCallback(connectionId);
756 return E_OK;
757 }
758 int errCode = -E_NOT_FOUND;
759 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, &errCode] () mutable {
760 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
761 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
762 auto it = dataChangeCallbackMap_.find(connectionId);
763 if (it == dataChangeCallbackMap_.end()) {
764 return;
765 }
766 auto action = it->second.find(observer);
767 if (action == it->second.end()) {
768 return;
769 }
770 it->second.erase(action);
771 LOGI("unregister relational observer.");
772 if (it->second.empty()) {
773 dataChangeCallbackMap_.erase(it);
774 LOGI("observer for this delegate is zero now");
775 }
776 errCode = E_OK;
777 }, nullptr, &dataChangeCallbackMap_);
778 ADAPTER_WAIT(handle);
779 return errCode;
780 }
781
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,int & observerCnt)782 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
783 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
784 const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt)
785 {
786 for (auto &action : item.second) {
787 if (action.second == nullptr) {
788 continue;
789 }
790 observerCnt++;
791 ChangedData observerChangeData = changedData;
792 if (action.first != nullptr) {
793 FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
794 }
795 action.second(deviceName, std::move(observerChangeData), isChangedData);
796 }
797 }
798
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)799 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
800 ChangedData &&changedData, bool isChangedData)
801 {
802 IncObjRef(this);
803 int taskErrCode =
804 ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
805 LOGD("begin to trigger relational observer.");
806 int observerCnt = 0;
807 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
808 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
809 for (const auto &item : dataChangeCallbackMap_) {
810 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, observerCnt);
811 }
812 LOGD("relational observer size = %d", observerCnt);
813 DecObjRef(this);
814 }, &dataChangeCallbackMap_);
815 if (taskErrCode != E_OK) {
816 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
817 DecObjRef(this);
818 }
819 }
820
RegisterHeartBeatListener(const std::function<void ()> & listener)821 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
822 {
823 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
824 heartBeatListener_ = listener;
825 }
826
CheckAndInitQueryCondition(QueryObject & query) const827 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
828 {
829 RelationalSchemaObject schema = storageEngine_->GetSchema();
830 TableInfo table = schema.GetTable(query.GetTableName());
831 if (!table.IsValid()) {
832 LOGE("Query table is not a distributed table.");
833 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
834 }
835 if (table.GetTableSyncType() == CLOUD_COOPERATION) {
836 LOGE("cloud table mode is not support");
837 return -E_NOT_SUPPORT;
838 }
839 query.SetSchema(schema);
840
841 int errCode = E_OK;
842 auto *handle = GetHandle(true, errCode);
843 if (handle == nullptr) {
844 return errCode;
845 }
846
847 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
848 if (errCode != E_OK) {
849 LOGE("Check relational query condition failed. %d", errCode);
850 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
851 }
852
853 ReleaseHandle(handle);
854 return errCode;
855 }
856
CheckCompatible(const std::string & schema,uint8_t type) const857 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
858 {
859 // return true if is relational schema.
860 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
861 }
862
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const863 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
864 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
865 {
866 if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
867 return -E_NOT_SUPPORT;
868 }
869 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
870 LOGE("[ExecuteQuery] invalid args");
871 return -E_INVALID_ARGS;
872 }
873 int errCode = E_OK;
874 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
875 if (handle == nullptr) {
876 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
877 return errCode;
878 }
879 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
880 if (errCode != E_OK) {
881 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
882 }
883 ReleaseHandle(handle);
884 return errCode;
885 }
886
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const887 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
888 RelationalRowDataSet &dataSet, ContinueToken &token) const
889 {
890 dataSet.Clear();
891 if (token == nullptr) {
892 // start query
893 std::vector<std::string> colNames;
894 std::vector<RelationalRowData *> data;
895 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
896
897 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
898 if (errCode != E_OK) {
899 return errCode;
900 }
901
902 // create one token
903 token = static_cast<ContinueToken>(
904 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
905 if (token == nullptr) {
906 LOGE("ExecuteQuery OOM");
907 return -E_OUT_OF_MEMORY;
908 }
909 }
910
911 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
912 if (!remoteToken->CheckValid()) {
913 LOGE("ExecuteQuery invalid token");
914 return -E_INVALID_ARGS;
915 }
916
917 int errCode = remoteToken->GetData(packetSize, dataSet);
918 if (errCode == -E_UNFINISHED) {
919 errCode = E_OK;
920 } else {
921 if (errCode != E_OK) {
922 dataSet.Clear();
923 }
924 delete remoteToken;
925 remoteToken = nullptr;
926 token = nullptr;
927 }
928 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
929 return errCode;
930 }
931
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)932 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
933 uint8_t type)
934 {
935 if (ReadSchemaType(type) != SchemaType::RELATIVE) {
936 return -E_INVALID_ARGS;
937 }
938
939 RelationalSchemaObject schemaObj;
940 int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
941 if (errCode != E_OK) {
942 LOGE("Parse remote schema failed. err=%d", errCode);
943 return errCode;
944 }
945
946 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
947 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
948 Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
949 errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
950 if (errCode != E_OK) {
951 LOGE("Save remote schema failed. err=%d", errCode);
952 return errCode;
953 }
954
955 return remoteDeviceSchema_.Put(deviceId, remoteSchema);
956 }
957
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)958 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
959 {
960 if (schemaObj.IsSchemaValid()) {
961 return -E_INVALID_ARGS;
962 }
963
964 std::string remoteSchema;
965 int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
966 if (errCode == -E_NOT_FOUND) {
967 LOGW("Get remote device schema miss cached.");
968 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
969 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
970 Value remoteSchemaBuff;
971 errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
972 if (errCode != E_OK) {
973 LOGE("Get remote device schema from meta failed. err=%d", errCode);
974 return errCode;
975 }
976 remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
977 errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
978 }
979
980 if (errCode != E_OK) {
981 LOGE("Get remote device schema failed. err=%d", errCode);
982 return errCode;
983 }
984
985 errCode = schemaObj.ParseFromSchemaString(remoteSchema);
986 if (errCode != E_OK) {
987 LOGE("Parse remote schema failed. err=%d", errCode);
988 }
989 return errCode;
990 }
991
SetReusedHandle(StorageExecutor * handle)992 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
993 {
994 std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
995 reusedHandle_ = handle;
996 }
997
ReleaseRemoteQueryContinueToken(ContinueToken & token) const998 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
999 {
1000 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1001 delete remoteToken;
1002 remoteToken = nullptr;
1003 token = nullptr;
1004 }
1005
GetStoreInfo() const1006 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1007 {
1008 StoreInfo info = {
1009 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
1010 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1011 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1012 };
1013 return info;
1014 }
1015
StartTransaction(TransactType type)1016 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1017 {
1018 if (storageEngine_ == nullptr) {
1019 return -E_INVALID_DB;
1020 }
1021 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1022 if (transactionHandle_ != nullptr) {
1023 LOGD("Transaction started already.");
1024 return -E_TRANSACT_STATE;
1025 }
1026 int errCode = E_OK;
1027 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1028 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1029 if (handle == nullptr) {
1030 ReleaseHandle(handle);
1031 return errCode;
1032 }
1033 errCode = handle->StartTransaction(type);
1034 if (errCode != E_OK) {
1035 ReleaseHandle(handle);
1036 return errCode;
1037 }
1038 transactionHandle_ = handle;
1039 return errCode;
1040 }
1041
Commit()1042 int RelationalSyncAbleStorage::Commit()
1043 {
1044 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1045 if (transactionHandle_ == nullptr) {
1046 LOGE("relation database is null or the transaction has not been started");
1047 return -E_INVALID_DB;
1048 }
1049 int errCode = transactionHandle_->Commit();
1050 ReleaseHandle(transactionHandle_);
1051 transactionHandle_ = nullptr;
1052 LOGD("connection commit transaction!");
1053 return errCode;
1054 }
1055
Rollback()1056 int RelationalSyncAbleStorage::Rollback()
1057 {
1058 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1059 if (transactionHandle_ == nullptr) {
1060 LOGE("Invalid handle for rollback or the transaction has not been started.");
1061 return -E_INVALID_DB;
1062 }
1063
1064 int errCode = transactionHandle_->Rollback();
1065 ReleaseHandle(transactionHandle_);
1066 transactionHandle_ = nullptr;
1067 LOGI("connection rollback transaction!");
1068 return errCode;
1069 }
1070
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1071 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1072 const std::vector<Timestamp> ×tampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1073 {
1074 int errCode = E_OK;
1075 auto *handle = GetHandleExpectTransaction(false, errCode);
1076 if (handle == nullptr) {
1077 return errCode;
1078 }
1079 QuerySyncObject queryObj = query;
1080 queryObj.SetSchema(GetSchemaInfo());
1081 errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1082 if (transactionHandle_ == nullptr) {
1083 ReleaseHandle(handle);
1084 }
1085 return errCode;
1086 }
1087
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1088 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp,
1089 bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1090 {
1091 int errCode = E_OK;
1092 auto *handle = GetHandleExpectTransaction(false, errCode);
1093 if (handle == nullptr) {
1094 return errCode;
1095 }
1096 QuerySyncObject queryObj = query;
1097 queryObj.SetSchema(GetSchemaInfo());
1098 errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1099 if (transactionHandle_ == nullptr) {
1100 ReleaseHandle(handle);
1101 }
1102 return errCode;
1103 }
1104
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1105 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1106 const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1107 {
1108 if (transactionHandle_ == nullptr) {
1109 LOGE("the transaction has not been started");
1110 return -E_INVALID_DB;
1111 }
1112 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1113 QuerySyncObject query = querySyncObject;
1114 query.SetSchema(GetSchemaInfo());
1115 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1116 if (token == nullptr) {
1117 LOGE("[SingleVerNStore] Allocate continue token failed.");
1118 return -E_OUT_OF_MEMORY;
1119 }
1120 token->SetCloudTableSchema(tableSchema);
1121 continueStmtToken = static_cast<ContinueToken>(token);
1122 return GetCloudDataNext(continueStmtToken, cloudDataResult);
1123 }
1124
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1125 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1126 CloudSyncData &cloudDataResult)
1127 {
1128 if (continueStmtToken == nullptr) {
1129 return -E_INVALID_ARGS;
1130 }
1131 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1132 if (!token->CheckValid()) {
1133 return -E_INVALID_ARGS;
1134 }
1135 if (transactionHandle_ == nullptr) {
1136 LOGE("the transaction has not been started, release the token");
1137 ReleaseCloudDataToken(continueStmtToken);
1138 return -E_INVALID_DB;
1139 }
1140 cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1141 auto config = GetCloudSyncConfig();
1142 transactionHandle_->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1143 int errCode = transactionHandle_->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1144 LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1145 cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1146 cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1147 if (errCode != -E_UNFINISHED) {
1148 delete token;
1149 token = nullptr;
1150 }
1151 continueStmtToken = static_cast<ContinueToken>(token);
1152 if (errCode != E_OK && errCode != -E_UNFINISHED) {
1153 return errCode;
1154 }
1155 int fillRefGidCode = FillReferenceData(cloudDataResult);
1156 return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1157 }
1158
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1159 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1160 bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1161 {
1162 int errCode = E_OK;
1163 auto *handle = GetHandle(false, errCode);
1164 if (handle == nullptr) {
1165 return errCode;
1166 }
1167 Timestamp beginTime = 0u;
1168 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1169 QuerySyncObject query = querySyncObject;
1170 query.SetSchema(GetSchemaInfo());
1171 handle->SetTableSchema(tableSchema);
1172 errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1173 ReleaseHandle(handle);
1174 if (errCode != E_OK) {
1175 LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1176 }
1177 return errCode;
1178 }
1179
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1180 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1181 {
1182 if (continueStmtToken == nullptr) {
1183 return E_OK;
1184 }
1185 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1186 if (!token->CheckValid()) {
1187 return E_OK;
1188 }
1189 int errCode = token->ReleaseCloudStatement();
1190 delete token;
1191 token = nullptr;
1192 return errCode;
1193 }
1194
GetSchemaFromDB(RelationalSchemaObject & schema)1195 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1196 {
1197 Key schemaKey;
1198 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1199 Value schemaVal;
1200 int errCode = GetMetaData(schemaKey, schemaVal);
1201 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1202 LOGE("Get relational schema from DB failed. %d", errCode);
1203 return errCode;
1204 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1205 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1206 return -E_NOT_FOUND;
1207 }
1208 std::string schemaStr;
1209 DBCommon::VectorToString(schemaVal, schemaStr);
1210 errCode = schema.ParseFromSchemaString(schemaStr);
1211 if (errCode != E_OK) {
1212 LOGE("Parse schema string from DB failed.");
1213 return errCode;
1214 }
1215 storageEngine_->SetSchema(schema);
1216 return errCode;
1217 }
1218
ChkSchema(const TableName & tableName)1219 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1220 {
1221 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1222 RelationalSchemaObject localSchema = GetSchemaInfo();
1223 int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1224 if (errCode == -E_SCHEMA_MISMATCH) {
1225 LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1226 RelationalSchemaObject newSchema;
1227 errCode = GetSchemaFromDB(newSchema);
1228 if (errCode != E_OK) {
1229 LOGE("Get schema from db when check schema. err: %d", errCode);
1230 return -E_SCHEMA_MISMATCH;
1231 }
1232 errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1233 }
1234 return errCode;
1235 }
1236
SetCloudDbSchema(const DataBaseSchema & schema)1237 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1238 {
1239 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1240 RelationalSchemaObject localSchema = GetSchemaInfo();
1241 schemaMgr_.SetCloudDbSchema(schema, localSchema);
1242 return E_OK;
1243 }
1244
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1245 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1246 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1247 {
1248 if (transactionHandle_ == nullptr) {
1249 LOGE(" the transaction has not been started");
1250 return -E_INVALID_DB;
1251 }
1252
1253 return GetInfoByPrimaryKeyOrGidInner(transactionHandle_, tableName, vBucket, dataInfoWithLog, assetInfo);
1254 }
1255
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1256 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1257 const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1258 {
1259 if (handle == nullptr) {
1260 return -E_INVALID_DB;
1261 }
1262 TableSchema tableSchema;
1263 int errCode = GetCloudTableSchema(tableName, tableSchema);
1264 if (errCode != E_OK) {
1265 LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1266 return errCode;
1267 }
1268 RelationalSchemaObject localSchema = GetSchemaInfo();
1269 handle->SetLocalSchema(localSchema);
1270 return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1271 }
1272
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1273 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1274 {
1275 if (transactionHandle_ == nullptr) {
1276 LOGE(" the transaction has not been started");
1277 return -E_INVALID_DB;
1278 }
1279 return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1280 }
1281
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1282 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1283 const std::string &tableName, DownloadData &downloadData)
1284 {
1285 TableSchema tableSchema;
1286 int errCode = GetCloudTableSchema(tableName, tableSchema);
1287 if (errCode != E_OK) {
1288 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1289 return errCode;
1290 }
1291 RelationalSchemaObject localSchema = GetSchemaInfo();
1292 handle->SetLocalSchema(localSchema);
1293 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1294 handle->SetLogicDelete(IsCurrentLogicDelete());
1295 errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1296 handle->SetLogicDelete(false);
1297 return errCode;
1298 }
1299
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1300 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1301 {
1302 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1303 cloudSchema = schemaMgr_.GetCloudDbSchema();
1304 return E_OK;
1305 }
1306
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1307 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1308 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1309 {
1310 if (transactionHandle_ == nullptr) {
1311 LOGE("the transaction has not been started");
1312 return -E_INVALID_DB;
1313 }
1314 transactionHandle_->SetLogicDelete(logicDelete_);
1315 std::vector<std::string> notifyTableList;
1316 int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1317 if (!notifyTableList.empty()) {
1318 for (auto notifyTableName : notifyTableList) {
1319 ChangedData changedData;
1320 changedData.type = ChangedDataType::DATA;
1321 changedData.tableName = notifyTableName;
1322 std::vector<DistributedDB::Type> dataVec;
1323 DistributedDB::Type type;
1324 if (mode == FLAG_ONLY) {
1325 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1326 } else {
1327 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1328 }
1329 dataVec.push_back(type);
1330 changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1331 TriggerObserverAction("CLOUD", std::move(changedData), true);
1332 }
1333 }
1334 transactionHandle_->SetLogicDelete(false);
1335 return errCode;
1336 }
1337
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1338 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1339 {
1340 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1341 return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1342 }
1343
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1344 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1345 bool isDownloadSuccess)
1346 {
1347 if (storageEngine_ == nullptr) {
1348 return -E_INVALID_DB;
1349 }
1350 if (transactionHandle_ == nullptr) {
1351 LOGE("the transaction has not been started when fill asset for download.");
1352 return -E_INVALID_DB;
1353 }
1354 TableSchema tableSchema;
1355 int errCode = GetCloudTableSchema(tableName, tableSchema);
1356 if (errCode != E_OK) {
1357 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1358 return errCode;
1359 }
1360 errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1361 if (errCode != E_OK) {
1362 LOGE("fill cloud asset for download failed.%d", errCode);
1363 }
1364 return errCode;
1365 }
1366
SetLogTriggerStatus(bool status)1367 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1368 {
1369 int errCode = E_OK;
1370 auto *handle = GetHandleExpectTransaction(false, errCode);
1371 if (handle == nullptr) {
1372 return errCode;
1373 }
1374 errCode = handle->SetLogTriggerStatus(status);
1375 if (transactionHandle_ == nullptr) {
1376 ReleaseHandle(handle);
1377 }
1378 return errCode;
1379 }
1380
SetCursorIncFlag(bool flag)1381 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1382 {
1383 int errCode = E_OK;
1384 auto *handle = GetHandleExpectTransaction(false, errCode);
1385 if (handle == nullptr) {
1386 return errCode;
1387 }
1388 errCode = handle->SetCursorIncFlag(flag);
1389 if (transactionHandle_ == nullptr) {
1390 ReleaseHandle(handle);
1391 }
1392 return errCode;
1393 }
1394
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1395 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1396 bool ignoreEmptyGid)
1397 {
1398 if (storageEngine_ == nullptr) {
1399 return -E_INVALID_DB;
1400 }
1401 int errCode = E_OK;
1402 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1403 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1404 if (writeHandle == nullptr) {
1405 return errCode;
1406 }
1407 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1408 if (errCode != E_OK) {
1409 ReleaseHandle(writeHandle);
1410 return errCode;
1411 }
1412 errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1413 if (errCode != E_OK) {
1414 LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1415 writeHandle->Rollback();
1416 ReleaseHandle(writeHandle);
1417 return errCode;
1418 }
1419 errCode = writeHandle->Commit();
1420 ReleaseHandle(writeHandle);
1421 return errCode;
1422 }
1423
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1424 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1425 {
1426 syncAbleEngine_ = syncAbleEngine;
1427 }
1428
GetIdentify() const1429 std::string RelationalSyncAbleStorage::GetIdentify() const
1430 {
1431 if (storageEngine_ == nullptr) {
1432 LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1433 return "";
1434 }
1435 return storageEngine_->GetIdentifier();
1436 }
1437
EraseDataChangeCallback(uint64_t connectionId)1438 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1439 {
1440 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1441 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1442 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1443 auto it = dataChangeCallbackMap_.find(connectionId);
1444 if (it != dataChangeCallbackMap_.end()) {
1445 dataChangeCallbackMap_.erase(it);
1446 LOGI("erase all observer for this delegate.");
1447 }
1448 }, nullptr, &dataChangeCallbackMap_);
1449 ADAPTER_WAIT(handle);
1450 }
1451
ReleaseContinueToken(ContinueToken & continueStmtToken) const1452 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1453 {
1454 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1455 if (token == nullptr || !(token->CheckValid())) {
1456 LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1457 return;
1458 }
1459 delete token;
1460 continueStmtToken = nullptr;
1461 }
1462
CheckQueryValid(const QuerySyncObject & query)1463 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1464 {
1465 int errCode = E_OK;
1466 auto *handle = GetHandle(false, errCode);
1467 if (handle == nullptr) {
1468 return errCode;
1469 }
1470 errCode = handle->CheckQueryObjectLegal(query);
1471 if (errCode != E_OK) {
1472 ReleaseHandle(handle);
1473 return errCode;
1474 }
1475 QuerySyncObject queryObj = query;
1476 queryObj.SetSchema(GetSchemaInfo());
1477 int64_t count = 0;
1478 errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1479 ReleaseHandle(handle);
1480 if (errCode != E_OK) {
1481 LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1482 return -E_INVALID_ARGS;
1483 }
1484 return errCode;
1485 }
1486
CreateTempSyncTrigger(const std::string & tableName)1487 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1488 {
1489 int errCode = E_OK;
1490 auto *handle = GetHandle(true, errCode);
1491 if (handle == nullptr) {
1492 return errCode;
1493 }
1494 errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1495 ReleaseHandle(handle);
1496 if (errCode != E_OK) {
1497 LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1498 }
1499 return errCode;
1500 }
1501
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1502 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1503 ChangeProperties &changeProperties)
1504 {
1505 int errCode = E_OK;
1506 auto *handle = GetHandle(false, errCode);
1507 if (handle == nullptr) {
1508 return errCode;
1509 }
1510 errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1511 ReleaseHandle(handle);
1512 if (errCode != E_OK) {
1513 LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1514 }
1515 return errCode;
1516 }
1517
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1518 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1519 {
1520 if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1521 changedData.field = {};
1522 for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1523 changedData.primaryData[i].clear();
1524 }
1525 }
1526 if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1527 changedData.properties = {};
1528 }
1529 }
1530
ClearAllTempSyncTrigger()1531 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1532 {
1533 int errCode = E_OK;
1534 auto *handle = GetHandle(true, errCode);
1535 if (handle == nullptr) {
1536 return errCode;
1537 }
1538 errCode = handle->ClearAllTempSyncTrigger();
1539 ReleaseHandle(handle);
1540 if (errCode != E_OK) {
1541 LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1542 }
1543 return errCode;
1544 }
1545
FillReferenceData(CloudSyncData & syncData)1546 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1547 {
1548 std::map<int64_t, Entries> referenceGid;
1549 int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1550 if (errCode != E_OK) {
1551 LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1552 return errCode;
1553 }
1554 errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1555 if (errCode != E_OK) {
1556 return errCode;
1557 }
1558 referenceGid.clear();
1559 errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1560 if (errCode != E_OK) {
1561 LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1562 return errCode;
1563 }
1564 return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1565 }
1566
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1567 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1568 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1569 {
1570 if (referenceGid.empty()) {
1571 return E_OK;
1572 }
1573 int ignoredCount = 0;
1574 for (size_t index = 0u; index < rowid.size(); index++) {
1575 if (index >= extend.size()) {
1576 LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1577 return -E_UNEXPECTED_DATA;
1578 }
1579 int64_t rowId = rowid[index];
1580 if (referenceGid.find(rowId) == referenceGid.end()) {
1581 // current data miss match reference data, we ignored it
1582 ignoredCount++;
1583 continue;
1584 }
1585 extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1586 }
1587 if (ignoredCount != 0) {
1588 LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1589 }
1590 return E_OK;
1591 }
1592
IsSharedTable(const std::string & tableName)1593 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1594 {
1595 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1596 return schemaMgr_.IsSharedTable(tableName);
1597 }
1598
GetSharedTableOriginNames()1599 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1600 {
1601 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1602 return schemaMgr_.GetSharedTableOriginNames();
1603 }
1604
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1605 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1606 std::map<int64_t, Entries> &referenceGid)
1607 {
1608 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1609 int errCode = GetTableReference(tableName, tableReference);
1610 if (errCode != E_OK) {
1611 return errCode;
1612 }
1613 if (tableReference.empty()) {
1614 LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1615 return E_OK;
1616 }
1617 auto *handle = GetHandle(false, errCode);
1618 if (handle == nullptr) {
1619 return errCode;
1620 }
1621 errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1622 ReleaseHandle(handle);
1623 return errCode;
1624 }
1625
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1626 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1627 std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1628 {
1629 if (storageEngine_ == nullptr) {
1630 LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1631 return -E_INVALID_DB;
1632 }
1633 RelationalSchemaObject schema = storageEngine_->GetSchema();
1634 auto referenceProperty = schema.GetReferenceProperty();
1635 if (referenceProperty.empty()) {
1636 return E_OK;
1637 }
1638 auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1639 if (errCode != E_OK) {
1640 return errCode;
1641 }
1642 for (const auto &property : referenceProperty) {
1643 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1644 if (!IsSharedTable(tableName)) {
1645 reference[property.targetTableName].push_back(property);
1646 continue;
1647 }
1648 TableReferenceProperty tableReference;
1649 tableReference.sourceTableName = tableName;
1650 tableReference.columns = property.columns;
1651 tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1652 auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1653 if (ret != E_OK) {
1654 return ret;
1655 }
1656 tableReference.targetTableName = sharedTargetTable;
1657 reference[tableReference.targetTableName].push_back(tableReference);
1658 }
1659 }
1660 return E_OK;
1661 }
1662
GetSourceTableName(const std::string & tableName)1663 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1664 {
1665 std::pair<std::string, int> res = { "", E_OK };
1666 std::shared_ptr<DataBaseSchema> cloudSchema;
1667 (void) GetCloudDbSchema(cloudSchema);
1668 if (cloudSchema == nullptr) {
1669 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1670 return { "", -E_INTERNAL_ERROR };
1671 }
1672 for (const auto &table : cloudSchema->tables) {
1673 if (CloudStorageUtils::IsSharedTable(table)) {
1674 continue;
1675 }
1676 if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1677 DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1678 res.first = table.name;
1679 break;
1680 }
1681 }
1682 if (res.first.empty()) {
1683 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1684 res.second = -E_SCHEMA_MISMATCH;
1685 }
1686 return res;
1687 }
1688
GetSharedTargetTableName(const std::string & tableName)1689 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1690 {
1691 std::pair<std::string, int> res = { "", E_OK };
1692 std::shared_ptr<DataBaseSchema> cloudSchema;
1693 (void) GetCloudDbSchema(cloudSchema);
1694 if (cloudSchema == nullptr) {
1695 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1696 return { "", -E_INTERNAL_ERROR };
1697 }
1698 for (const auto &table : cloudSchema->tables) {
1699 if (CloudStorageUtils::IsSharedTable(table)) {
1700 continue;
1701 }
1702 if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1703 res.first = table.sharedTableName;
1704 break;
1705 }
1706 }
1707 if (res.first.empty()) {
1708 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1709 res.second = -E_SCHEMA_MISMATCH;
1710 }
1711 return res;
1712 }
1713
SetLogicDelete(bool logicDelete)1714 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1715 {
1716 logicDelete_ = logicDelete;
1717 LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1718 }
1719
SetCloudTaskConfig(const CloudTaskConfig & config)1720 void RelationalSyncAbleStorage::SetCloudTaskConfig(const CloudTaskConfig &config)
1721 {
1722 allowLogicDelete_ = config.allowLogicDelete;
1723 LOGD("[RelationalSyncAbleStorage] allow logic delete %d", static_cast<int>(config.allowLogicDelete));
1724 }
1725
IsCurrentLogicDelete() const1726 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1727 {
1728 return allowLogicDelete_ && logicDelete_;
1729 }
1730
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1731 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1732 const std::string &gid, const Bytes &hashKey, VBucket &assets)
1733 {
1734 if (gid.empty() && hashKey.empty()) {
1735 LOGE("both gid and hashKey are empty.");
1736 return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1737 }
1738 if (transactionHandle_ == nullptr) {
1739 LOGE("the transaction has not been started");
1740 return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1741 }
1742 auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1743 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1744 LOGE("get assets by gid or hashKey failed. %d", errCode);
1745 }
1746 return { errCode, status };
1747 }
1748
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1749 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1750 {
1751 int errCode = E_OK;
1752 auto *wHandle = GetHandle(true, errCode);
1753 if (wHandle == nullptr) {
1754 return errCode;
1755 }
1756 wHandle->SetIAssetLoader(loader);
1757 ReleaseHandle(wHandle);
1758 return errCode;
1759 }
1760
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1761 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1762 const std::vector<VBucket> &records)
1763 {
1764 int errCode = E_OK;
1765 auto *handle = GetHandle(true, errCode);
1766 if (errCode != E_OK) {
1767 return errCode;
1768 }
1769 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1770 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1771 errCode = UpsertDataInner(handle, tableName, records);
1772 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1773 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1774 ReleaseHandle(handle);
1775 return errCode;
1776 }
1777
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1778 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1779 const std::string &tableName, const std::vector<VBucket> &records)
1780 {
1781 int errCode = E_OK;
1782 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1783 if (errCode != E_OK) {
1784 LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1785 return errCode;
1786 }
1787 errCode = CreateTempSyncTriggerInner(handle, tableName);
1788 if (errCode == E_OK) {
1789 errCode = UpsertDataInTransaction(handle, tableName, records);
1790 (void) handle->ClearAllTempSyncTrigger();
1791 }
1792 if (errCode == E_OK) {
1793 errCode = handle->Commit();
1794 if (errCode != E_OK) {
1795 LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1796 }
1797 } else {
1798 int ret = handle->Rollback();
1799 if (ret != E_OK) {
1800 LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1801 }
1802 }
1803 return errCode;
1804 }
1805
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1806 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1807 const std::string &tableName, const std::vector<VBucket> &records)
1808 {
1809 TableSchema tableSchema;
1810 int errCode = GetCloudTableSchema(tableName, tableSchema);
1811 if (errCode != E_OK) {
1812 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1813 return errCode;
1814 }
1815 TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1816 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1817 std::set<std::vector<uint8_t>> primaryKeys;
1818 DownloadData downloadData;
1819 for (const auto &record : records) {
1820 DataInfoWithLog dataInfoWithLog;
1821 VBucket assetInfo;
1822 auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1823 tableSchema, localTable, pkMap, false);
1824 if (errorCode != E_OK) {
1825 return errorCode;
1826 }
1827 errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1828 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1829 return errCode;
1830 }
1831 VBucket recordCopy = record;
1832 if ((errCode == -E_NOT_FOUND ||
1833 (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1834 primaryKeys.find(hashValue) == primaryKeys.end()) {
1835 downloadData.opType.push_back(OpType::INSERT);
1836 auto currentTime = TimeHelper::GetSysCurrentTime();
1837 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1838 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1839 primaryKeys.insert(hashValue);
1840 } else {
1841 downloadData.opType.push_back(OpType::UPDATE);
1842 recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1843 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1844 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1845 recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1846 recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1847 }
1848 downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1849 downloadData.data.push_back(std::move(recordCopy));
1850 }
1851 return PutCloudSyncDataInner(handle, tableName, downloadData);
1852 }
1853
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1854 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1855 const LogInfo &logInfo)
1856 {
1857 if (transactionHandle_ == nullptr) {
1858 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1859 return -E_INVALID_DB;
1860 }
1861 std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(tableName, recordConflict, logInfo);
1862 return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1863 }
1864
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1865 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1866 OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1867 {
1868 TableSchema tableSchema;
1869 int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1870 if (errCode != E_OK) {
1871 LOGE("get table schema failed when fill log and asset. %d", errCode);
1872 return errCode;
1873 }
1874 errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1875 if (errCode != E_OK) {
1876 return errCode;
1877 }
1878 if (opType == OpType::INSERT) {
1879 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.insData, CloudWaterType::INSERT);
1880 } else if (opType == OpType::UPDATE) {
1881 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.updData, CloudWaterType::UPDATE);
1882 } else if (opType == OpType::DELETE) {
1883 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.delData, CloudWaterType::DELETE);
1884 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1885 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.lockData, CloudWaterType::BUTT, true);
1886 }
1887 return errCode;
1888 }
1889
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const CloudSyncBatch & updateData,const CloudWaterType & type,bool isLock)1890 int RelationalSyncAbleStorage::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1891 const std::string &tableName, const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock)
1892 {
1893 if (updateData.timestamp.size() != updateData.extend.size()) {
1894 LOGE("the num of extend:%zu and timestamp:%zu is not equal.",
1895 updateData.extend.size(), updateData.timestamp.size());
1896 return -E_INVALID_ARGS;
1897 }
1898 for (size_t i = 0; i < updateData.extend.size(); ++i) {
1899 const auto &record = updateData.extend[i];
1900 if (DBCommon::IsRecordError(record) || DBCommon::IsRecordAssetsMissing(record) ||
1901 DBCommon::IsRecordVersionConflict(record) || isLock) {
1902 if (DBCommon::IsRecordAssetsMissing(record)) {
1903 LOGI("[RDBStorage][UpdateRecordFlagAfterUpload] Record assets missing, skip update.");
1904 }
1905 int errCode = handle->UpdateRecordStatus(tableName, CloudDbConstant::TO_LOCAL_CHANGE,
1906 updateData.hashKey[i]);
1907 if (errCode != E_OK) {
1908 LOGE("[RDBStorage] Update record status failed in index %zu", i);
1909 return errCode;
1910 }
1911 continue;
1912 }
1913 const auto &rowId = updateData.rowid[i];
1914 std::string cloudGid;
1915 (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, record, cloudGid);
1916 LogInfo logInfo;
1917 logInfo.cloudGid = cloudGid;
1918 logInfo.timestamp = updateData.timestamp[i];
1919 logInfo.dataKey = rowId;
1920 logInfo.hashKey = updateData.hashKey[i];
1921 std::string sql = CloudStorageUtils::GetUpdateRecordFlagSqlUpload(tableName, DBCommon::IsRecordIgnored(record),
1922 logInfo, record, type);
1923 int errCode = handle->UpdateRecordFlag(tableName, sql, logInfo);
1924 if (errCode != E_OK) {
1925 LOGE("[RDBStorage] Update record flag failed in index %zu", i);
1926 return errCode;
1927 }
1928 handle->MarkFlagAsUploadFinished(tableName, updateData.hashKey[i], updateData.timestamp[i]);
1929 uploadRecorder_.RecordUploadRecord(tableName, logInfo.hashKey, type, updateData.timestamp[i]);
1930 }
1931 return E_OK;
1932 }
1933
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)1934 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1935 std::vector<std::string> &users)
1936 {
1937 std::vector<TableSchema> tables;
1938 int errCode = GetCloudTableWithoutShared(tables);
1939 if (errCode != E_OK) {
1940 return errCode;
1941 }
1942 if (tables.empty()) {
1943 LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1944 return E_OK;
1945 }
1946 auto *handle = GetHandle(true, errCode);
1947 if (errCode != E_OK) {
1948 return errCode;
1949 }
1950 errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery);
1951 ReleaseHandle(handle);
1952 return errCode;
1953 }
1954
ClearUnLockingNoNeedCompensated()1955 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1956 {
1957 std::vector<TableSchema> tables;
1958 int errCode = GetCloudTableWithoutShared(tables);
1959 if (errCode != E_OK) {
1960 return errCode;
1961 }
1962 if (tables.empty()) {
1963 LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1964 return E_OK;
1965 }
1966 auto *handle = GetHandle(true, errCode);
1967 if (errCode != E_OK) {
1968 return errCode;
1969 }
1970 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1971 if (errCode != E_OK) {
1972 ReleaseHandle(handle);
1973 return errCode;
1974 }
1975 for (const auto &table : tables) {
1976 errCode = handle->ClearUnLockingStatus(table.name);
1977 if (errCode != E_OK) {
1978 LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1979 }
1980 }
1981 errCode = handle->Commit();
1982 if (errCode != E_OK) {
1983 LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1984 }
1985 ReleaseHandle(handle);
1986 return errCode;
1987 }
1988
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1989 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1990 {
1991 const auto tableInfos = GetSchemaInfo().GetTables();
1992 for (const auto &[tableName, info] : tableInfos) {
1993 if (info.GetSharedTableMark()) {
1994 continue;
1995 }
1996 TableSchema schema;
1997 int errCode = GetCloudTableSchema(tableName, schema);
1998 if (errCode == -E_NOT_FOUND) {
1999 continue;
2000 }
2001 if (errCode != E_OK) {
2002 LOGW("[RDBStorage] Get cloud table failed %d", errCode);
2003 return errCode;
2004 }
2005 tables.push_back(schema);
2006 }
2007 return E_OK;
2008 }
2009
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery)2010 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2011 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery)
2012 {
2013 int errCode = E_OK;
2014 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2015 if (errCode != E_OK) {
2016 return errCode;
2017 }
2018 for (const auto &table : tables) {
2019 if (!CheckTableSupportCompensatedSync(table)) {
2020 continue;
2021 }
2022
2023 std::vector<VBucket> syncDataPk;
2024 errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk);
2025 if (errCode != E_OK) {
2026 LOGW("[RDBStorageEngine] Get wait compensated sync date failed, continue! errCode=%d", errCode);
2027 errCode = E_OK;
2028 continue;
2029 }
2030 if (syncDataPk.empty()) {
2031 // no data need to compensated sync
2032 continue;
2033 }
2034 QuerySyncObject syncObject;
2035 errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncObject);
2036 if (errCode != E_OK) {
2037 LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2038 errCode = E_OK;
2039 continue;
2040 }
2041 syncQuery.push_back(syncObject);
2042 }
2043 if (errCode == E_OK) {
2044 errCode = handle->Commit();
2045 if (errCode != E_OK) {
2046 LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2047 }
2048 } else {
2049 int ret = handle->Rollback();
2050 if (ret != E_OK) {
2051 LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2052 }
2053 }
2054 return errCode;
2055 }
2056
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2057 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2058 const std::string &tableName, bool flag)
2059 {
2060 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2061 if (trackerTable.IsEmpty()) {
2062 trackerTable.SetTableName(tableName);
2063 }
2064 return handle->CreateTempSyncTrigger(trackerTable, flag);
2065 }
2066
CheckTableSupportCompensatedSync(const TableSchema & table)2067 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2068 {
2069 auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2070 return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2071 field.type == TYPE_INDEX<Bytes>);
2072 });
2073 if (it != table.fields.end()) {
2074 LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2075 return false;
2076 }
2077 // check whether reference exist
2078 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2079 int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2080 if (errCode != E_OK) {
2081 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2082 return false;
2083 }
2084 if (!tableReference.empty()) {
2085 LOGI("[RDBStorageEngine] current table exist reference property");
2086 return false;
2087 }
2088 return true;
2089 }
2090
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2091 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2092 const std::set<std::string> &gidFilters)
2093 {
2094 if (transactionHandle_ == nullptr) {
2095 LOGE("the transaction has not been started");
2096 return -E_INVALID_DB;
2097 }
2098 int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2099 if (errCode != E_OK) {
2100 LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2101 }
2102 return errCode;
2103 }
2104
GetCloudSyncConfig() const2105 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2106 {
2107 std::lock_guard<std::mutex> autoLock(configMutex_);
2108 return cloudSyncConfig_;
2109 }
2110
SetCloudSyncConfig(const CloudSyncConfig & config)2111 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2112 {
2113 std::lock_guard<std::mutex> autoLock(configMutex_);
2114 cloudSyncConfig_ = config;
2115 }
2116
IsTableExistReference(const std::string & table)2117 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2118 {
2119 // check whether reference exist
2120 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2121 int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2122 if (errCode != E_OK) {
2123 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2124 return false;
2125 }
2126 return !tableReference.empty();
2127 }
2128
IsTableExistReferenceOrReferenceBy(const std::string & table)2129 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2130 {
2131 // check whether reference or reference by exist
2132 if (storageEngine_ == nullptr) {
2133 LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2134 return false;
2135 }
2136 RelationalSchemaObject schema = storageEngine_->GetSchema();
2137 auto referenceProperty = schema.GetReferenceProperty();
2138 if (referenceProperty.empty()) {
2139 return false;
2140 }
2141 auto [sourceTableName, errCode] = GetSourceTableName(table);
2142 if (errCode != E_OK) {
2143 return false;
2144 }
2145 for (const auto &property : referenceProperty) {
2146 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2147 DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2148 return true;
2149 }
2150 }
2151 return false;
2152 }
2153
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)2154 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
2155 Timestamp localMark)
2156 {
2157 uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
2158 }
2159 }
2160 #endif