1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41 std::string(CLOSE_CONN_TASK),
42 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43 );
44 }
45 }
46
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48 : storageEngine_(std::move(engine)),
49 reusedHandle_(nullptr),
50 isCachedOption_(false)
51 {}
52
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55 syncAbleEngine_ = nullptr;
56 }
57
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61 return SYNC_RELATION;
62 }
63
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67 LOGD("RelationalSyncAbleStorage ref +1");
68 IncObjRef(this);
69 }
70
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74 LOGD("RelationalSyncAbleStorage ref -1");
75 DecObjRef(this);
76 }
77
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81 std::string identifier = storageEngine_->GetIdentifier();
82 return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87 std::string identifier = storageEngine_->GetProperties().GetStringProp(
88 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90 return identifierVect;
91 }
92
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
95 {
96 int errCode = E_OK;
97 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98 if (handle == nullptr) {
99 return;
100 }
101 timestamp = 0;
102 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103 if (errCode != E_OK) {
104 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
106 }
107 ReleaseHandle(handle);
108 return;
109 }
110
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const111 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
112 {
113 int errCode = E_OK;
114 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
115 if (handle == nullptr) {
116 return errCode;
117 }
118 timestamp = 0;
119 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
120 if (errCode != E_OK) {
121 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
122 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
123 }
124 ReleaseHandle(handle);
125 return errCode;
126 }
127
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const128 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
129 OperatePerm perm) const
130 {
131 if (storageEngine_ == nullptr) {
132 errCode = -E_INVALID_DB;
133 return nullptr;
134 }
135 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
136 storageEngine_->FindExecutor(isWrite, perm, errCode));
137 if (handle == nullptr) {
138 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
139 }
140 return handle;
141 }
142
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const143 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
144 int &errCode, OperatePerm perm) const
145 {
146 if (storageEngine_ == nullptr) {
147 errCode = -E_INVALID_DB;
148 return nullptr;
149 }
150 if (transactionHandle_ != nullptr) {
151 return transactionHandle_;
152 }
153 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
154 storageEngine_->FindExecutor(isWrite, perm, errCode));
155 if (errCode != E_OK) {
156 ReleaseHandle(handle);
157 handle = nullptr;
158 }
159 return handle;
160 }
161
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const162 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
163 {
164 if (storageEngine_ == nullptr) {
165 return;
166 }
167 StorageExecutor *databaseHandle = handle;
168 storageEngine_->Recycle(databaseHandle);
169 std::function<void()> listener = nullptr;
170 {
171 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
172 listener = heartBeatListener_;
173 }
174 if (listener) {
175 listener();
176 }
177 }
178
179 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const180 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
181 {
182 if (storageEngine_ == nullptr) {
183 return -E_INVALID_DB;
184 }
185 if (key.size() > DBConstant::MAX_KEY_SIZE) {
186 return -E_INVALID_ARGS;
187 }
188 int errCode = E_OK;
189 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
190 if (handle == nullptr) {
191 return errCode;
192 }
193 errCode = handle->GetKvData(key, value);
194 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
195 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
196 }
197 ReleaseHandle(handle);
198 return errCode;
199 }
200
201 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)202 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
203 {
204 if (storageEngine_ == nullptr) {
205 return -E_INVALID_DB;
206 }
207 int errCode = E_OK;
208 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
209 if (handle == nullptr) {
210 return errCode;
211 }
212
213 errCode = handle->PutKvData(key, value); // meta doesn't need time.
214 if (errCode != E_OK) {
215 LOGE("Put kv data err:%d", errCode);
216 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217 }
218 ReleaseHandle(handle);
219 return errCode;
220 }
221
PutMetaData(const Key & key,const Value & value,bool isInTransaction)222 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
223 {
224 if (storageEngine_ == nullptr) {
225 return -E_INVALID_DB;
226 }
227 int errCode = E_OK;
228 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
229 std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
230
231 // try to recycle using the handle
232 if (isInTransaction) {
233 handLock.lock();
234 if (reusedHandle_ != nullptr) {
235 handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
236 } else {
237 isInTransaction = false;
238 handLock.unlock();
239 }
240 }
241
242 if (handle == nullptr) {
243 handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
244 if (handle == nullptr) {
245 return errCode;
246 }
247 }
248
249 errCode = handle->PutKvData(key, value);
250 if (errCode != E_OK) {
251 LOGE("Put kv data err:%d", errCode);
252 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
253 }
254 if (!isInTransaction) {
255 ReleaseHandle(handle);
256 }
257 return errCode;
258 }
259
260 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)261 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
262 {
263 if (storageEngine_ == nullptr) {
264 return -E_INVALID_DB;
265 }
266 for (const auto &key : keys) {
267 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
268 return -E_INVALID_ARGS;
269 }
270 }
271 int errCode = E_OK;
272 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
273 if (handle == nullptr) {
274 return errCode;
275 }
276
277 handle->StartTransaction(TransactType::IMMEDIATE);
278 errCode = handle->DeleteMetaData(keys);
279 if (errCode != E_OK) {
280 handle->Rollback();
281 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
282 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283 } else {
284 handle->Commit();
285 }
286 ReleaseHandle(handle);
287 return errCode;
288 }
289
290 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const291 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
292 {
293 if (storageEngine_ == nullptr) {
294 return -E_INVALID_DB;
295 }
296 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
297 return -E_INVALID_ARGS;
298 }
299
300 int errCode = E_OK;
301 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
302 if (handle == nullptr) {
303 return errCode;
304 }
305
306 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
307 if (errCode != E_OK) {
308 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
309 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
310 }
311 ReleaseHandle(handle);
312 return errCode;
313 }
314
315 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const316 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
317 {
318 if (storageEngine_ == nullptr) {
319 return -E_INVALID_DB;
320 }
321 int errCode = E_OK;
322 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
323 if (handle == nullptr) {
324 return errCode;
325 }
326
327 errCode = handle->GetAllMetaKeys(keys);
328 if (errCode != E_OK) {
329 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
330 }
331 ReleaseHandle(handle);
332 return errCode;
333 }
334
GetDbProperties() const335 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
336 {
337 return storageEngine_->GetProperties();
338 }
339
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)340 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
341 {
342 int errCode = E_OK;
343 for (auto &item : dataItems) {
344 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
345 if (entry == nullptr) {
346 errCode = -E_OUT_OF_MEMORY;
347 LOGE("GetKvEntries failed, errCode:%d", errCode);
348 SingleVerKvEntry::Release(entries);
349 break;
350 }
351 entry->SetEntryData(std::move(item));
352 entries.push_back(entry);
353 }
354 return errCode;
355 }
356
GetDataItemSerialSize(const DataItem & item,size_t appendLen)357 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
358 {
359 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
360 // the size would not be very large.
361 static const size_t maxOrigDevLength = 40;
362 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
363 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
364 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
365 return dataSize;
366 }
367
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)368 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
369 size_t appendLen)
370 {
371 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
372 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
373 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
374 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
375 }
376 return !reachThreshold;
377 }
378
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)379 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
380 SQLiteSingleVerRelationalContinueToken *&token)
381 {
382 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
383 delete token;
384 token = nullptr;
385 return;
386 }
387
388 if (dataItems.empty()) {
389 errCode = -E_INTERNAL_ERROR;
390 LOGE("Get data unfinished but data items is empty.");
391 delete token;
392 token = nullptr;
393 return;
394 }
395 token->SetNextBeginTime(dataItems.back());
396 token->UpdateNextSyncOffset(dataItems.size());
397 }
398
399 /**
400 * Caller must ensure that parameter token is valid.
401 * If error happened, token will be deleted here.
402 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo,RelationalSchemaObject && filterSchema) const403 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
404 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo,
405 RelationalSchemaObject &&filterSchema) const
406 {
407 if (storageEngine_ == nullptr) {
408 return -E_INVALID_DB;
409 }
410
411 int errCode = E_OK;
412 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
413 OperatePerm::NORMAL_PERM, errCode));
414 if (handle == nullptr) {
415 goto ERROR;
416 }
417 handle->SetLocalSchema(filterSchema);
418 do {
419 errCode = handle->GetSyncDataByQuery(dataItems,
420 Parcel::GetAppendedLen(),
421 dataSizeInfo,
422 [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
423 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
424 }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
425 if (errCode == -E_FINISHED) {
426 token->FinishGetData();
427 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
428 }
429 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
430
431 ERROR:
432 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
433 dataItems.clear();
434 }
435 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
436 ReleaseHandle(handle);
437 return errCode;
438 }
439
440 // use kv struct data to sync
441 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const442 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
443 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
444 std::vector<SingleVerKvEntry *> &entries) const
445 {
446 if (!timeRange.IsValid()) {
447 return -E_INVALID_ARGS;
448 }
449 query.SetSchema(storageEngine_->GetSchema());
450 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
451 if (token == nullptr) {
452 LOGE("[SingleVerNStore] Allocate continue token failed.");
453 return -E_OUT_OF_MEMORY;
454 }
455
456 continueStmtToken = static_cast<ContinueToken>(token);
457 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
458 }
459
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const460 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
461 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
462 {
463 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
464 if (token == nullptr) {
465 LOGE("[SingleVerNStore] Allocate continue stmt token failed.");
466 return -E_OUT_OF_MEMORY;
467 }
468 if (!token->CheckValid()) {
469 return -E_INVALID_ARGS;
470 }
471 RelationalSchemaObject schema = storageEngine_->GetSchema();
472 RelationalSchemaObject filterSchema;
473 if (token->IsUseLocalSchema()) {
474 filterSchema = schema;
475 } else {
476 int errCode = GetRemoteDeviceSchema(token->GetRemoteDev(), filterSchema);
477 if (errCode != E_OK) {
478 return errCode;
479 }
480 }
481 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
482 std::vector<std::string> fieldNames;
483 fieldNames.reserve(fieldInfos.size());
484 for (const auto &fieldInfo : fieldInfos) { // order by cid
485 fieldNames.push_back(fieldInfo.GetFieldName());
486 }
487 token->SetFieldNames(fieldNames);
488
489 std::vector<DataItem> dataItems;
490 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo, std::move(filterSchema));
491 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
492 continueStmtToken = static_cast<ContinueToken>(token);
493 return errCode;
494 }
495
496 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
497 if (innerCode != E_OK) {
498 errCode = innerCode;
499 delete token;
500 token = nullptr;
501 }
502 continueStmtToken = static_cast<ContinueToken>(token);
503 return errCode;
504 }
505
506 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)507 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
508 {
509 std::vector<DataItem> dataItems;
510 for (const auto &itemEntry : entries) {
511 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
512 if (entry != nullptr) {
513 DataItem item;
514 item.origDev = entry->GetOrigDevice();
515 item.flag = entry->GetFlag();
516 item.timestamp = entry->GetTimestamp();
517 item.writeTimestamp = entry->GetWriteTimestamp();
518 entry->GetKey(item.key);
519 entry->GetValue(item.value);
520 entry->GetHashKey(item.hashKey);
521 dataItems.push_back(item);
522 }
523 }
524 return dataItems;
525 }
526 }
527
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)528 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
529 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
530 {
531 std::vector<DataItem> dataItems = ConvertEntries(entries);
532 return PutSyncData(object, dataItems, deviceName);
533 }
534
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)535 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
536 const std::string &deviceName)
537 {
538 int errCode = E_OK;
539 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
540 QueryObject query = object;
541 auto localSchema = storageEngine_->GetSchema();
542 query.SetSchema(localSchema);
543
544 RelationalSchemaObject filterSchema;
545 errCode = GetRemoteDeviceSchema(deviceName, filterSchema);
546 if (errCode != E_OK) {
547 LOGE("Find remote schema failed. err=%d", errCode);
548 return errCode;
549 }
550 if (query.IsUseLocalSchema()) {
551 // remote send always with its table col sort
552 filterSchema.SetDistributedSchema(localSchema.GetDistributedSchema());
553 }
554
555 StoreInfo info = GetStoreInfo();
556 auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
557 filterSchema.GetSyncFieldInfo(query.GetTableName()), info);
558 inserter.SetEntries(dataItems);
559
560 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
561 if (handle == nullptr) {
562 return errCode;
563 }
564
565 // To prevent certain abnormal scenarios from deleting the table,
566 // check if the table exists before each synchronization.
567 // If the table does not exist, create it.
568 // Because it is a fallback scenario, if the table creation fails, no failure will be returned
569 if (localSchema.GetTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
570 errCode = handle->CreateDistributedDeviceTable(deviceName,
571 storageEngine_->GetSchema().GetTable(query.GetTableName()), info);
572 if (errCode != E_OK) {
573 LOGW("[RelationalSyncAbleStorage::SaveSyncDataItems] Create distributed device table fail %d", errCode);
574 }
575 }
576 DBDfxAdapter::StartTracing();
577
578 handle->SetTableMode(localSchema.GetTableMode());
579 errCode = handle->SaveSyncItems(inserter);
580 ChangedData data = inserter.GetChangedData();
581 data.properties.isP2pSyncDataChange = !dataItems.empty();
582 bool emptyChangedData = data.field.empty() && data.primaryData[OP_INSERT].empty() &&
583 data.primaryData[OP_UPDATE].empty() && data.primaryData[OP_DELETE].empty();
584
585 DBDfxAdapter::FinishTracing();
586 if (errCode == E_OK && !emptyChangedData) {
587 // dataItems size > 0 now because already check before
588 // all dataItems will write into db now, so need to observer notify here
589 // if some dataItems will not write into db in the future, observer notify here need change
590 data.tableName = query.GetTableName();
591 // SPLIT_BY_DEVICE trigger observer with device, userId, appId and storeId, so trigger with isChangeData false
592 // COLLABORATION trigger observer with changeData, so trigger with isChangeData true
593 TriggerObserverAction(deviceName, std::move(data),
594 GetDbProperties().GetDistributedTableMode() == DistributedTableMode::COLLABORATION, Origin::ORIGIN_REMOTE);
595 }
596
597 ReleaseHandle(handle);
598 return errCode;
599 }
600
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)601 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
602 const std::string &deviceName)
603 {
604 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
605 LOGW("Device length is invalid for sync put");
606 return -E_INVALID_ARGS;
607 }
608
609 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
610 if (errCode != E_OK) {
611 LOGE("[Relational] PutSyncData errCode:%d", errCode);
612 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
613 }
614 return errCode;
615 }
616
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)617 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
618 {
619 (void) deviceName;
620 (void) isNeedNotify;
621 return -E_NOT_SUPPORT;
622 }
623
GetSchemaInfo() const624 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
625 {
626 return storageEngine_->GetSchema();
627 }
628
GetSecurityOption(SecurityOption & option) const629 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
630 {
631 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
632 if (isCachedOption_) {
633 option = securityOption_;
634 return E_OK;
635 }
636 std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
637 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
638 if (errCode == E_OK) {
639 option = securityOption_;
640 isCachedOption_ = true;
641 }
642 return errCode;
643 }
644
NotifyRemotePushFinished(const std::string & deviceId) const645 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
646 {
647 return;
648 }
649
650 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const651 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
652 {
653 return OS::GetCurrentSysTimeInMicrosecond(outTime);
654 }
655
GetTablesQuery()656 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
657 {
658 auto tableNames = storageEngine_->GetSchema().GetTableNames();
659 std::vector<QuerySyncObject> queries;
660 queries.reserve(tableNames.size());
661 for (const auto &it : tableNames) {
662 queries.emplace_back(Query::Select(it));
663 }
664 return queries;
665 }
666
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)667 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
668 {
669 (void) queryObj;
670 return -E_NOT_SUPPORT;
671 }
672
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const673 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
674 const std::string &targetID, bool isPush) const
675 {
676 return E_OK;
677 }
678
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)679 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
680 const RelationalSyncStrategy &syncStrategy)
681 {
682 auto mode = storageEngine_->GetProperties().GetDistributedTableMode();
683 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
684 LOGD("No need create device table in COLLABORATION mode.");
685 return E_OK;
686 }
687
688 int errCode = E_OK;
689 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
690 if (handle == nullptr) {
691 return errCode;
692 }
693
694 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
695 if (errCode != E_OK) {
696 LOGE("Start transaction failed:%d", errCode);
697 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
698 ReleaseHandle(handle);
699 return errCode;
700 }
701
702 StoreInfo info = GetStoreInfo();
703 for (const auto &[table, strategy] : syncStrategy) {
704 if (!strategy.permitSync) {
705 continue;
706 }
707
708 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
709 if (errCode != E_OK) {
710 LOGE("Create distributed device table failed. %d", errCode);
711 break;
712 }
713 }
714
715 if (errCode == E_OK) {
716 errCode = handle->Commit();
717 } else {
718 (void)handle->Rollback();
719 }
720
721 ReleaseHandle(handle);
722 return errCode;
723 }
724
RegisterSchemaChangedCallback(const std::function<void ()> & callback)725 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
726 {
727 std::lock_guard lock(onSchemaChangedMutex_);
728 onSchemaChanged_ = callback;
729 return E_OK;
730 }
731
NotifySchemaChanged()732 void RelationalSyncAbleStorage::NotifySchemaChanged()
733 {
734 std::lock_guard lock(onSchemaChangedMutex_);
735 if (onSchemaChanged_) {
736 LOGD("Notify relational schema was changed");
737 onSchemaChanged_();
738 }
739 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const740 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
741 {
742 algorithmSet.clear();
743 DataCompression::GetCompressionAlgo(algorithmSet);
744 return E_OK;
745 }
746
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)747 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
748 const RelationalObserverAction &action)
749 {
750 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
751 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
752 auto it = dataChangeCallbackMap_.find(connectionId);
753 if (it != dataChangeCallbackMap_.end()) {
754 if (it->second.find(observer) != it->second.end()) {
755 LOGE("obsever already registered");
756 return -E_ALREADY_SET;
757 }
758 if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
759 LOGE("The number of relational observers has been over limit");
760 return -E_MAX_LIMITS;
761 }
762 it->second[observer] = action;
763 } else {
764 dataChangeCallbackMap_[connectionId][observer] = action;
765 }
766 LOGI("register relational observer ok");
767 return E_OK;
768 }
769
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)770 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
771 {
772 if (observer == nullptr) {
773 EraseDataChangeCallback(connectionId);
774 return E_OK;
775 }
776 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
777 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
778 auto it = dataChangeCallbackMap_.find(connectionId);
779 if (it != dataChangeCallbackMap_.end()) {
780 auto action = it->second.find(observer);
781 if (action != it->second.end()) {
782 it->second.erase(action);
783 LOGI("unregister relational observer.");
784 if (it->second.empty()) {
785 dataChangeCallbackMap_.erase(it);
786 LOGI("observer for this delegate is zero now");
787 }
788 return E_OK;
789 }
790 }
791 return -E_NOT_FOUND;
792 }
793
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,Origin origin)794 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
795 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
796 const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin)
797 {
798 for (auto &action : item.second) {
799 if (action.second == nullptr) {
800 continue;
801 }
802 ChangedData observerChangeData = changedData;
803 if (action.first != nullptr) {
804 FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
805 }
806 action.second(deviceName, std::move(observerChangeData), isChangedData, origin);
807 }
808 }
809
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)810 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
811 ChangedData &&changedData, bool isChangedData)
812 {
813 TriggerObserverAction(deviceName, std::move(changedData), isChangedData, Origin::ORIGIN_CLOUD);
814 }
815
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData,Origin origin)816 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
817 bool isChangedData, Origin origin)
818 {
819 IncObjRef(this);
820 int taskErrCode =
821 ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData, origin] () mutable {
822 LOGD("begin to trigger relational observer.");
823 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
824 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
825 for (const auto &item : dataChangeCallbackMap_) {
826 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, origin);
827 }
828 DecObjRef(this);
829 }, &dataChangeCallbackMap_);
830 if (taskErrCode != E_OK) {
831 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
832 DecObjRef(this);
833 }
834 }
835
RegisterHeartBeatListener(const std::function<void ()> & listener)836 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
837 {
838 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
839 heartBeatListener_ = listener;
840 }
841
CheckAndInitQueryCondition(QueryObject & query) const842 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
843 {
844 RelationalSchemaObject schema = storageEngine_->GetSchema();
845 TableInfo table = schema.GetTable(query.GetTableName());
846 if (!table.IsValid()) {
847 LOGE("Query table is not a distributed table.");
848 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
849 }
850 if (table.GetTableSyncType() == CLOUD_COOPERATION) {
851 LOGE("cloud table mode is not support");
852 return -E_NOT_SUPPORT;
853 }
854 query.SetSchema(schema);
855
856 int errCode = E_OK;
857 auto *handle = GetHandle(true, errCode);
858 if (handle == nullptr) {
859 return errCode;
860 }
861
862 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
863 if (errCode != E_OK) {
864 LOGE("Check relational query condition failed. %d", errCode);
865 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
866 }
867
868 ReleaseHandle(handle);
869 return errCode;
870 }
871
CheckCompatible(const std::string & schema,uint8_t type) const872 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
873 {
874 // return true if is relational schema.
875 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
876 }
877
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const878 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
879 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
880 {
881 if (!storageEngine_->GetSchema().IsSchemaValid()) {
882 return -E_NOT_SUPPORT;
883 }
884 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
885 LOGE("[ExecuteQuery] invalid args");
886 return -E_INVALID_ARGS;
887 }
888 int errCode = E_OK;
889 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
890 if (handle == nullptr) {
891 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
892 return errCode;
893 }
894 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
895 if (errCode != E_OK) {
896 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
897 }
898 ReleaseHandle(handle);
899 return errCode;
900 }
901
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const902 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
903 RelationalRowDataSet &dataSet, ContinueToken &token) const
904 {
905 dataSet.Clear();
906 if (token == nullptr) {
907 // start query
908 std::vector<std::string> colNames;
909 std::vector<RelationalRowData *> data;
910 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
911
912 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
913 if (errCode != E_OK) {
914 return errCode;
915 }
916
917 // create one token
918 token = static_cast<ContinueToken>(
919 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
920 if (token == nullptr) {
921 LOGE("ExecuteQuery OOM");
922 return -E_OUT_OF_MEMORY;
923 }
924 }
925
926 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
927 if (!remoteToken->CheckValid()) {
928 LOGE("ExecuteQuery invalid token");
929 return -E_INVALID_ARGS;
930 }
931
932 int errCode = remoteToken->GetData(packetSize, dataSet);
933 if (errCode == -E_UNFINISHED) {
934 errCode = E_OK;
935 } else {
936 if (errCode != E_OK) {
937 dataSet.Clear();
938 }
939 delete remoteToken;
940 remoteToken = nullptr;
941 token = nullptr;
942 }
943 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
944 return errCode;
945 }
946
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)947 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
948 uint8_t type)
949 {
950 if (ReadSchemaType(type) != SchemaType::RELATIVE) {
951 return -E_INVALID_ARGS;
952 }
953
954 RelationalSchemaObject schemaObj;
955 int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
956 if (errCode != E_OK) {
957 LOGE("Parse remote schema failed. err=%d", errCode);
958 return errCode;
959 }
960
961 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
962 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
963 Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
964 errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
965 if (errCode != E_OK) {
966 LOGE("Save remote schema failed. err=%d", errCode);
967 return errCode;
968 }
969
970 return remoteDeviceSchema_.Put(deviceId, remoteSchema);
971 }
972
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const973 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId,
974 RelationalSchemaObject &schemaObj) const
975 {
976 if (schemaObj.IsSchemaValid()) {
977 LOGE("schema is already valid");
978 return -E_INVALID_ARGS;
979 }
980
981 std::string remoteSchema;
982 int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
983 if (errCode == -E_NOT_FOUND) {
984 LOGW("Get remote device schema miss cached.");
985 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
986 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
987 Value remoteSchemaBuff;
988 errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
989 if (errCode != E_OK) {
990 LOGE("Get remote device schema from meta failed. err=%d", errCode);
991 return errCode;
992 }
993 remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
994 errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
995 }
996
997 if (errCode != E_OK) {
998 LOGE("Get remote device schema failed. err=%d", errCode);
999 return errCode;
1000 }
1001
1002 errCode = schemaObj.ParseFromSchemaString(remoteSchema);
1003 if (errCode != E_OK) {
1004 LOGE("Parse remote schema failed. err=%d", errCode);
1005 }
1006 return errCode;
1007 }
1008
SetReusedHandle(StorageExecutor * handle)1009 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
1010 {
1011 std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
1012 reusedHandle_ = handle;
1013 }
1014
ReleaseRemoteQueryContinueToken(ContinueToken & token) const1015 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
1016 {
1017 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1018 delete remoteToken;
1019 remoteToken = nullptr;
1020 token = nullptr;
1021 }
1022
GetStoreInfo() const1023 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1024 {
1025 StoreInfo info = {
1026 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
1027 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1028 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1029 };
1030 return info;
1031 }
1032
StartTransaction(TransactType type)1033 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1034 {
1035 if (storageEngine_ == nullptr) {
1036 return -E_INVALID_DB;
1037 }
1038 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1039 if (transactionHandle_ != nullptr) {
1040 LOGD("Transaction started already.");
1041 return -E_TRANSACT_STATE;
1042 }
1043 int errCode = E_OK;
1044 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1045 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1046 if (handle == nullptr) {
1047 ReleaseHandle(handle);
1048 return errCode;
1049 }
1050 errCode = handle->StartTransaction(type);
1051 if (errCode != E_OK) {
1052 ReleaseHandle(handle);
1053 return errCode;
1054 }
1055 transactionHandle_ = handle;
1056 return errCode;
1057 }
1058
Commit()1059 int RelationalSyncAbleStorage::Commit()
1060 {
1061 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1062 if (transactionHandle_ == nullptr) {
1063 LOGE("relation database is null or the transaction has not been started");
1064 return -E_INVALID_DB;
1065 }
1066 int errCode = transactionHandle_->Commit();
1067 ReleaseHandle(transactionHandle_);
1068 transactionHandle_ = nullptr;
1069 LOGD("connection commit transaction!");
1070 return errCode;
1071 }
1072
Rollback()1073 int RelationalSyncAbleStorage::Rollback()
1074 {
1075 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1076 if (transactionHandle_ == nullptr) {
1077 LOGE("Invalid handle for rollback or the transaction has not been started.");
1078 return -E_INVALID_DB;
1079 }
1080
1081 int errCode = transactionHandle_->Rollback();
1082 ReleaseHandle(transactionHandle_);
1083 transactionHandle_ = nullptr;
1084 LOGI("connection rollback transaction!");
1085 return errCode;
1086 }
1087
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1088 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1089 const std::vector<Timestamp> ×tampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1090 {
1091 int errCode = E_OK;
1092 auto *handle = GetHandleExpectTransaction(false, errCode);
1093 if (handle == nullptr) {
1094 return errCode;
1095 }
1096 QuerySyncObject queryObj = query;
1097 queryObj.SetSchema(GetSchemaInfo());
1098 errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1099 if (transactionHandle_ == nullptr) {
1100 ReleaseHandle(handle);
1101 }
1102 return errCode;
1103 }
1104
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1105 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp,
1106 bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1107 {
1108 int errCode = E_OK;
1109 auto *handle = GetHandleExpectTransaction(false, errCode);
1110 if (handle == nullptr) {
1111 return errCode;
1112 }
1113 QuerySyncObject queryObj = query;
1114 queryObj.SetSchema(GetSchemaInfo());
1115 errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1116 if (transactionHandle_ == nullptr) {
1117 ReleaseHandle(handle);
1118 }
1119 return errCode;
1120 }
1121
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1122 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1123 const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1124 {
1125 if (transactionHandle_ == nullptr) {
1126 LOGE("the transaction has not been started");
1127 return -E_INVALID_DB;
1128 }
1129 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1130 QuerySyncObject query = querySyncObject;
1131 query.SetSchema(GetSchemaInfo());
1132 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1133 if (token == nullptr) {
1134 LOGE("[SingleVerNStore] Allocate continue token failed.");
1135 return -E_OUT_OF_MEMORY;
1136 }
1137 token->SetCloudTableSchema(tableSchema);
1138 continueStmtToken = static_cast<ContinueToken>(token);
1139 return GetCloudDataNext(continueStmtToken, cloudDataResult);
1140 }
1141
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1142 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1143 CloudSyncData &cloudDataResult)
1144 {
1145 if (continueStmtToken == nullptr) {
1146 return -E_INVALID_ARGS;
1147 }
1148 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1149 if (!token->CheckValid()) {
1150 return -E_INVALID_ARGS;
1151 }
1152 if (transactionHandle_ == nullptr) {
1153 LOGE("the transaction has not been started, release the token");
1154 ReleaseCloudDataToken(continueStmtToken);
1155 return -E_INVALID_DB;
1156 }
1157 cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1158 auto config = GetCloudSyncConfig();
1159 transactionHandle_->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1160 int errCode = transactionHandle_->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1161 LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1162 cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1163 cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1164 if (errCode != -E_UNFINISHED) {
1165 delete token;
1166 token = nullptr;
1167 }
1168 continueStmtToken = static_cast<ContinueToken>(token);
1169 if (errCode != E_OK && errCode != -E_UNFINISHED) {
1170 return errCode;
1171 }
1172 int fillRefGidCode = FillReferenceData(cloudDataResult);
1173 return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1174 }
1175
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1176 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1177 bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1178 {
1179 int errCode = E_OK;
1180 auto *handle = GetHandle(false, errCode);
1181 if (handle == nullptr) {
1182 return errCode;
1183 }
1184 Timestamp beginTime = 0u;
1185 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1186 QuerySyncObject query = querySyncObject;
1187 query.SetSchema(GetSchemaInfo());
1188 handle->SetTableSchema(tableSchema);
1189 errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1190 ReleaseHandle(handle);
1191 if (errCode != E_OK) {
1192 LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1193 }
1194 return errCode;
1195 }
1196
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1197 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1198 {
1199 if (continueStmtToken == nullptr) {
1200 return E_OK;
1201 }
1202 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1203 if (!token->CheckValid()) {
1204 return E_OK;
1205 }
1206 int errCode = token->ReleaseCloudStatement();
1207 delete token;
1208 token = nullptr;
1209 return errCode;
1210 }
1211
GetSchemaFromDB(RelationalSchemaObject & schema)1212 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1213 {
1214 Key schemaKey;
1215 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1216 Value schemaVal;
1217 int errCode = GetMetaData(schemaKey, schemaVal);
1218 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1219 LOGE("Get relational schema from DB failed. %d", errCode);
1220 return errCode;
1221 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1222 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1223 return -E_NOT_FOUND;
1224 }
1225 std::string schemaStr;
1226 DBCommon::VectorToString(schemaVal, schemaStr);
1227 errCode = schema.ParseFromSchemaString(schemaStr);
1228 if (errCode != E_OK) {
1229 LOGE("Parse schema string from DB failed.");
1230 return errCode;
1231 }
1232 storageEngine_->SetSchema(schema);
1233 return errCode;
1234 }
1235
ChkSchema(const TableName & tableName)1236 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1237 {
1238 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1239 RelationalSchemaObject localSchema = GetSchemaInfo();
1240 int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1241 if (errCode == -E_SCHEMA_MISMATCH) {
1242 LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1243 RelationalSchemaObject newSchema;
1244 errCode = GetSchemaFromDB(newSchema);
1245 if (errCode != E_OK) {
1246 LOGE("Get schema from db when check schema. err: %d", errCode);
1247 return -E_SCHEMA_MISMATCH;
1248 }
1249 errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1250 }
1251 return errCode;
1252 }
1253
SetCloudDbSchema(const DataBaseSchema & schema)1254 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1255 {
1256 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1257 RelationalSchemaObject localSchema = GetSchemaInfo();
1258 schemaMgr_.SetCloudDbSchema(schema, localSchema);
1259 return E_OK;
1260 }
1261
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1262 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1263 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1264 {
1265 return GetInfoByPrimaryKeyOrGid(tableName, vBucket, true, dataInfoWithLog, assetInfo);
1266 }
1267
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1268 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1269 const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1270 {
1271 if (handle == nullptr) {
1272 return -E_INVALID_DB;
1273 }
1274 TableSchema tableSchema;
1275 int errCode = GetCloudTableSchema(tableName, tableSchema);
1276 if (errCode != E_OK) {
1277 LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1278 return errCode;
1279 }
1280 RelationalSchemaObject localSchema = GetSchemaInfo();
1281 handle->SetLocalSchema(localSchema);
1282 return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1283 }
1284
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1285 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1286 {
1287 if (transactionHandle_ == nullptr) {
1288 LOGE(" the transaction has not been started");
1289 return -E_INVALID_DB;
1290 }
1291 return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1292 }
1293
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1294 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1295 const std::string &tableName, DownloadData &downloadData)
1296 {
1297 TableSchema tableSchema;
1298 int errCode = GetCloudTableSchema(tableName, tableSchema);
1299 if (errCode != E_OK) {
1300 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1301 return errCode;
1302 }
1303 RelationalSchemaObject localSchema = GetSchemaInfo();
1304 handle->SetLocalSchema(localSchema);
1305 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1306 handle->SetLogicDelete(IsCurrentLogicDelete());
1307 errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1308 handle->SetLogicDelete(false);
1309 return errCode;
1310 }
1311
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1312 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1313 {
1314 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1315 cloudSchema = schemaMgr_.GetCloudDbSchema();
1316 return E_OK;
1317 }
1318
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1319 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1320 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1321 {
1322 if (transactionHandle_ == nullptr) {
1323 LOGE("the transaction has not been started");
1324 return -E_INVALID_DB;
1325 }
1326 transactionHandle_->SetLogicDelete(logicDelete_);
1327 std::vector<std::string> notifyTableList;
1328 int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1329 if (!notifyTableList.empty()) {
1330 for (auto notifyTableName : notifyTableList) {
1331 ChangedData changedData;
1332 changedData.type = ChangedDataType::DATA;
1333 changedData.tableName = notifyTableName;
1334 std::vector<DistributedDB::Type> dataVec;
1335 DistributedDB::Type type;
1336 if (mode == FLAG_ONLY) {
1337 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1338 } else {
1339 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1340 }
1341 dataVec.push_back(type);
1342 changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1343 TriggerObserverAction("CLOUD", std::move(changedData), true);
1344 }
1345 }
1346 transactionHandle_->SetLogicDelete(false);
1347 return errCode;
1348 }
1349
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1350 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1351 {
1352 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1353 return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1354 }
1355
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1356 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1357 bool isDownloadSuccess)
1358 {
1359 if (storageEngine_ == nullptr) {
1360 return -E_INVALID_DB;
1361 }
1362 if (transactionHandle_ == nullptr) {
1363 LOGE("the transaction has not been started when fill asset for download.");
1364 return -E_INVALID_DB;
1365 }
1366 TableSchema tableSchema;
1367 int errCode = GetCloudTableSchema(tableName, tableSchema);
1368 if (errCode != E_OK) {
1369 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1370 return errCode;
1371 }
1372 uint64_t currCursor = DBConstant::INVALID_CURSOR;
1373 errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess, currCursor);
1374 if (errCode != E_OK) {
1375 LOGE("fill cloud asset for download failed.%d", errCode);
1376 }
1377 return errCode;
1378 }
1379
SetLogTriggerStatus(bool status)1380 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1381 {
1382 int errCode = E_OK;
1383 auto *handle = GetHandleExpectTransaction(false, errCode);
1384 if (handle == nullptr) {
1385 return errCode;
1386 }
1387 errCode = handle->SetLogTriggerStatus(status);
1388 if (transactionHandle_ == nullptr) {
1389 ReleaseHandle(handle);
1390 }
1391 return errCode;
1392 }
1393
SetCursorIncFlag(bool flag)1394 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1395 {
1396 int errCode = E_OK;
1397 auto *handle = GetHandleExpectTransaction(false, errCode);
1398 if (handle == nullptr) {
1399 return errCode;
1400 }
1401 errCode = handle->SetCursorIncFlag(flag);
1402 if (transactionHandle_ == nullptr) {
1403 ReleaseHandle(handle);
1404 }
1405 return errCode;
1406 }
1407
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1408 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1409 bool ignoreEmptyGid)
1410 {
1411 if (storageEngine_ == nullptr) {
1412 return -E_INVALID_DB;
1413 }
1414 int errCode = E_OK;
1415 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1416 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1417 if (writeHandle == nullptr) {
1418 return errCode;
1419 }
1420 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1421 if (errCode != E_OK) {
1422 ReleaseHandle(writeHandle);
1423 return errCode;
1424 }
1425 errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1426 if (errCode != E_OK) {
1427 LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1428 writeHandle->Rollback();
1429 ReleaseHandle(writeHandle);
1430 return errCode;
1431 }
1432 errCode = writeHandle->Commit();
1433 ReleaseHandle(writeHandle);
1434 return errCode;
1435 }
1436
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1437 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1438 {
1439 syncAbleEngine_ = syncAbleEngine;
1440 }
1441
GetIdentify() const1442 std::string RelationalSyncAbleStorage::GetIdentify() const
1443 {
1444 if (storageEngine_ == nullptr) {
1445 LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1446 return "";
1447 }
1448 return storageEngine_->GetIdentifier();
1449 }
1450
EraseDataChangeCallback(uint64_t connectionId)1451 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1452 {
1453 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1454 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1455 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1456 auto it = dataChangeCallbackMap_.find(connectionId);
1457 if (it != dataChangeCallbackMap_.end()) {
1458 dataChangeCallbackMap_.erase(it);
1459 LOGI("erase all observer for this delegate.");
1460 }
1461 }, nullptr, &dataChangeCallbackMap_);
1462 ADAPTER_WAIT(handle);
1463 }
1464
ReleaseContinueToken(ContinueToken & continueStmtToken) const1465 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1466 {
1467 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1468 if (token == nullptr || !(token->CheckValid())) {
1469 LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1470 return;
1471 }
1472 delete token;
1473 continueStmtToken = nullptr;
1474 }
1475
CheckQueryValid(const QuerySyncObject & query)1476 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1477 {
1478 int errCode = E_OK;
1479 auto *handle = GetHandle(false, errCode);
1480 if (handle == nullptr) {
1481 return errCode;
1482 }
1483 errCode = handle->CheckQueryObjectLegal(query);
1484 if (errCode != E_OK) {
1485 ReleaseHandle(handle);
1486 return errCode;
1487 }
1488 QuerySyncObject queryObj = query;
1489 queryObj.SetSchema(GetSchemaInfo());
1490 int64_t count = 0;
1491 errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1492 ReleaseHandle(handle);
1493 if (errCode != E_OK) {
1494 LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1495 return -E_INVALID_ARGS;
1496 }
1497 return errCode;
1498 }
1499
CreateTempSyncTrigger(const std::string & tableName)1500 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1501 {
1502 int errCode = E_OK;
1503 auto *handle = GetHandle(true, errCode);
1504 if (handle == nullptr) {
1505 return errCode;
1506 }
1507 errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1508 ReleaseHandle(handle);
1509 if (errCode != E_OK) {
1510 LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1511 }
1512 return errCode;
1513 }
1514
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1515 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1516 ChangeProperties &changeProperties)
1517 {
1518 int errCode = E_OK;
1519 auto *handle = GetHandle(false, errCode);
1520 if (handle == nullptr) {
1521 return errCode;
1522 }
1523 errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1524 ReleaseHandle(handle);
1525 if (errCode != E_OK) {
1526 LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1527 }
1528 return errCode;
1529 }
1530
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1531 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1532 {
1533 if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1534 changedData.field = {};
1535 for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1536 changedData.primaryData[i].clear();
1537 }
1538 }
1539 if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1540 changedData.properties = {};
1541 }
1542 }
1543
ClearAllTempSyncTrigger()1544 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1545 {
1546 int errCode = E_OK;
1547 auto *handle = GetHandle(true, errCode);
1548 if (handle == nullptr) {
1549 return errCode;
1550 }
1551 errCode = handle->ClearAllTempSyncTrigger();
1552 ReleaseHandle(handle);
1553 if (errCode != E_OK) {
1554 LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1555 }
1556 return errCode;
1557 }
1558
FillReferenceData(CloudSyncData & syncData)1559 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1560 {
1561 std::map<int64_t, Entries> referenceGid;
1562 int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1563 if (errCode != E_OK) {
1564 LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1565 return errCode;
1566 }
1567 errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1568 if (errCode != E_OK) {
1569 return errCode;
1570 }
1571 referenceGid.clear();
1572 errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1573 if (errCode != E_OK) {
1574 LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1575 return errCode;
1576 }
1577 return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1578 }
1579
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1580 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1581 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1582 {
1583 if (referenceGid.empty()) {
1584 return E_OK;
1585 }
1586 int ignoredCount = 0;
1587 for (size_t index = 0u; index < rowid.size(); index++) {
1588 if (index >= extend.size()) {
1589 LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1590 return -E_UNEXPECTED_DATA;
1591 }
1592 int64_t rowId = rowid[index];
1593 if (referenceGid.find(rowId) == referenceGid.end()) {
1594 // current data miss match reference data, we ignored it
1595 ignoredCount++;
1596 continue;
1597 }
1598 extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1599 }
1600 if (ignoredCount != 0) {
1601 LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1602 }
1603 return E_OK;
1604 }
1605
IsSharedTable(const std::string & tableName)1606 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1607 {
1608 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1609 return schemaMgr_.IsSharedTable(tableName);
1610 }
1611
GetSharedTableOriginNames()1612 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1613 {
1614 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1615 return schemaMgr_.GetSharedTableOriginNames();
1616 }
1617
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1618 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1619 std::map<int64_t, Entries> &referenceGid)
1620 {
1621 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1622 int errCode = GetTableReference(tableName, tableReference);
1623 if (errCode != E_OK) {
1624 return errCode;
1625 }
1626 if (tableReference.empty()) {
1627 LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1628 return E_OK;
1629 }
1630 auto *handle = GetHandle(false, errCode);
1631 if (handle == nullptr) {
1632 return errCode;
1633 }
1634 errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1635 ReleaseHandle(handle);
1636 return errCode;
1637 }
1638
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1639 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1640 std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1641 {
1642 if (storageEngine_ == nullptr) {
1643 LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1644 return -E_INVALID_DB;
1645 }
1646 RelationalSchemaObject schema = storageEngine_->GetSchema();
1647 auto referenceProperty = schema.GetReferenceProperty();
1648 if (referenceProperty.empty()) {
1649 return E_OK;
1650 }
1651 auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1652 if (errCode != E_OK) {
1653 return errCode;
1654 }
1655 for (const auto &property : referenceProperty) {
1656 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1657 if (!IsSharedTable(tableName)) {
1658 reference[property.targetTableName].push_back(property);
1659 continue;
1660 }
1661 TableReferenceProperty tableReference;
1662 tableReference.sourceTableName = tableName;
1663 tableReference.columns = property.columns;
1664 tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1665 auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1666 if (ret != E_OK) {
1667 return ret;
1668 }
1669 tableReference.targetTableName = sharedTargetTable;
1670 reference[tableReference.targetTableName].push_back(tableReference);
1671 }
1672 }
1673 return E_OK;
1674 }
1675
GetSourceTableName(const std::string & tableName)1676 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1677 {
1678 std::pair<std::string, int> res = { "", E_OK };
1679 std::shared_ptr<DataBaseSchema> cloudSchema;
1680 (void) GetCloudDbSchema(cloudSchema);
1681 if (cloudSchema == nullptr) {
1682 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1683 return { "", -E_INTERNAL_ERROR };
1684 }
1685 for (const auto &table : cloudSchema->tables) {
1686 if (CloudStorageUtils::IsSharedTable(table)) {
1687 continue;
1688 }
1689 if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1690 DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1691 res.first = table.name;
1692 break;
1693 }
1694 }
1695 if (res.first.empty()) {
1696 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1697 res.second = -E_SCHEMA_MISMATCH;
1698 }
1699 return res;
1700 }
1701
GetSharedTargetTableName(const std::string & tableName)1702 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1703 {
1704 std::pair<std::string, int> res = { "", E_OK };
1705 std::shared_ptr<DataBaseSchema> cloudSchema;
1706 (void) GetCloudDbSchema(cloudSchema);
1707 if (cloudSchema == nullptr) {
1708 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1709 return { "", -E_INTERNAL_ERROR };
1710 }
1711 for (const auto &table : cloudSchema->tables) {
1712 if (CloudStorageUtils::IsSharedTable(table)) {
1713 continue;
1714 }
1715 if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1716 res.first = table.sharedTableName;
1717 break;
1718 }
1719 }
1720 if (res.first.empty()) {
1721 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1722 res.second = -E_SCHEMA_MISMATCH;
1723 }
1724 return res;
1725 }
1726
SetLogicDelete(bool logicDelete)1727 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1728 {
1729 logicDelete_ = logicDelete;
1730 LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1731 }
1732
IsCurrentLogicDelete() const1733 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1734 {
1735 return logicDelete_;
1736 }
1737
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1738 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1739 const std::string &gid, const Bytes &hashKey, VBucket &assets)
1740 {
1741 if (gid.empty() && hashKey.empty()) {
1742 LOGE("both gid and hashKey are empty.");
1743 return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1744 }
1745 if (transactionHandle_ == nullptr) {
1746 LOGE("the transaction has not been started");
1747 return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1748 }
1749 auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1750 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1751 LOGE("get assets by gid or hashKey failed. %d", errCode);
1752 }
1753 return { errCode, status };
1754 }
1755
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1756 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1757 {
1758 int errCode = E_OK;
1759 auto *wHandle = GetHandle(true, errCode);
1760 if (wHandle == nullptr) {
1761 return errCode;
1762 }
1763 wHandle->SetIAssetLoader(loader);
1764 ReleaseHandle(wHandle);
1765 return errCode;
1766 }
1767
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1768 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1769 const std::vector<VBucket> &records)
1770 {
1771 int errCode = E_OK;
1772 auto *handle = GetHandle(true, errCode);
1773 if (handle == nullptr || errCode != E_OK) {
1774 return errCode;
1775 }
1776 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1777 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1778 errCode = UpsertDataInner(handle, tableName, records);
1779 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1780 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1781 ReleaseHandle(handle);
1782 return errCode;
1783 }
1784
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1785 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1786 const std::string &tableName, const std::vector<VBucket> &records)
1787 {
1788 int errCode = E_OK;
1789 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1790 if (errCode != E_OK) {
1791 LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1792 return errCode;
1793 }
1794 errCode = CreateTempSyncTriggerInner(handle, tableName);
1795 if (errCode == E_OK) {
1796 errCode = UpsertDataInTransaction(handle, tableName, records);
1797 (void) handle->ClearAllTempSyncTrigger();
1798 }
1799 if (errCode == E_OK) {
1800 errCode = handle->Commit();
1801 if (errCode != E_OK) {
1802 LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1803 }
1804 } else {
1805 int ret = handle->Rollback();
1806 if (ret != E_OK) {
1807 LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1808 }
1809 }
1810 return errCode;
1811 }
1812
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1813 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1814 const std::string &tableName, const std::vector<VBucket> &records)
1815 {
1816 TableSchema tableSchema;
1817 int errCode = GetCloudTableSchema(tableName, tableSchema);
1818 if (errCode != E_OK) {
1819 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1820 return errCode;
1821 }
1822 TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1823 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1824 std::set<std::vector<uint8_t>> primaryKeys;
1825 DownloadData downloadData;
1826 for (const auto &record : records) {
1827 DataInfoWithLog dataInfoWithLog;
1828 VBucket assetInfo;
1829 auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1830 tableSchema, localTable, pkMap, false);
1831 if (errorCode != E_OK) {
1832 return errorCode;
1833 }
1834 errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1835 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1836 return errCode;
1837 }
1838 VBucket recordCopy = record;
1839 if ((errCode == -E_NOT_FOUND ||
1840 (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1841 primaryKeys.find(hashValue) == primaryKeys.end()) {
1842 downloadData.opType.push_back(OpType::INSERT);
1843 auto currentTime = TimeHelper::GetSysCurrentTime();
1844 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1845 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1846 primaryKeys.insert(hashValue);
1847 } else {
1848 downloadData.opType.push_back(OpType::UPDATE);
1849 recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1850 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1851 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1852 recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1853 recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1854 }
1855 downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1856 downloadData.data.push_back(std::move(recordCopy));
1857 }
1858 return PutCloudSyncDataInner(handle, tableName, downloadData);
1859 }
1860
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1861 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1862 const LogInfo &logInfo)
1863 {
1864 if (transactionHandle_ == nullptr) {
1865 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1866 return -E_INVALID_DB;
1867 }
1868 TableSchema tableSchema;
1869 GetCloudTableSchema(tableName, tableSchema);
1870 std::vector<VBucket> assets;
1871 int errCode = transactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
1872 if (errCode != E_OK) {
1873 LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d",
1874 DBCommon::StringMiddleMasking(logInfo.cloudGid).c_str(), errCode);
1875 return errCode;
1876 }
1877 bool isInconsistency = !assets.empty();
1878 UpdateRecordFlagStruct updateRecordFlag = {
1879 .tableName = tableName,
1880 .isRecordConflict = recordConflict,
1881 .isInconsistency = isInconsistency
1882 };
1883 std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
1884 return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1885 }
1886
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1887 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1888 OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1889 {
1890 TableSchema tableSchema;
1891 int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1892 if (errCode != E_OK) {
1893 LOGE("get table schema failed when fill log and asset. %d", errCode);
1894 return errCode;
1895 }
1896 errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1897 if (errCode != E_OK) {
1898 return errCode;
1899 }
1900 if (opType == OpType::INSERT) {
1901 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1902 handle, {data.tableName, CloudWaterType::INSERT, tableSchema}, data.insData, uploadRecorder_);
1903 } else if (opType == OpType::UPDATE) {
1904 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1905 handle, {data.tableName, CloudWaterType::UPDATE, tableSchema}, data.updData, uploadRecorder_);
1906 } else if (opType == OpType::DELETE) {
1907 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1908 handle, {data.tableName, CloudWaterType::DELETE, tableSchema}, data.delData, uploadRecorder_);
1909 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1910 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1911 handle, {data.tableName, CloudWaterType::BUTT, tableSchema}, data.lockData, uploadRecorder_, true);
1912 }
1913 return errCode;
1914 }
1915
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users,bool isQueryDownloadRecords)1916 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1917 std::vector<std::string> &users, bool isQueryDownloadRecords)
1918 {
1919 std::vector<TableSchema> tables;
1920 int errCode = GetCloudTableWithoutShared(tables);
1921 if (errCode != E_OK) {
1922 return errCode;
1923 }
1924 if (tables.empty()) {
1925 LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1926 return E_OK;
1927 }
1928 auto *handle = GetHandle(true, errCode);
1929 if (handle == nullptr || errCode != E_OK) {
1930 return errCode;
1931 }
1932 errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery, isQueryDownloadRecords);
1933 ReleaseHandle(handle);
1934 return errCode;
1935 }
1936
ClearUnLockingNoNeedCompensated()1937 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1938 {
1939 std::vector<TableSchema> tables;
1940 int errCode = GetCloudTableWithoutShared(tables);
1941 if (errCode != E_OK) {
1942 return errCode;
1943 }
1944 if (tables.empty()) {
1945 LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1946 return E_OK;
1947 }
1948 auto *handle = GetHandle(true, errCode);
1949 if (handle == nullptr || errCode != E_OK) {
1950 return errCode;
1951 }
1952 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1953 if (errCode != E_OK) {
1954 ReleaseHandle(handle);
1955 return errCode;
1956 }
1957 for (const auto &table : tables) {
1958 errCode = handle->ClearUnLockingStatus(table.name);
1959 if (errCode != E_OK) {
1960 LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1961 }
1962 }
1963 errCode = handle->Commit();
1964 if (errCode != E_OK) {
1965 LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1966 }
1967 ReleaseHandle(handle);
1968 return errCode;
1969 }
1970
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1971 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1972 {
1973 const auto tableInfos = GetSchemaInfo().GetTables();
1974 for (const auto &[tableName, info] : tableInfos) {
1975 if (info.GetSharedTableMark()) {
1976 continue;
1977 }
1978 TableSchema schema;
1979 int errCode = GetCloudTableSchema(tableName, schema);
1980 if (errCode == -E_NOT_FOUND) {
1981 continue;
1982 }
1983 if (errCode != E_OK) {
1984 LOGW("[RDBStorage] Get cloud table failed %d", errCode);
1985 return errCode;
1986 }
1987 tables.push_back(schema);
1988 }
1989 return E_OK;
1990 }
1991
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery,bool isQueryDownloadRecords)1992 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1993 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords)
1994 {
1995 int errCode = E_OK;
1996 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1997 if (errCode != E_OK) {
1998 return errCode;
1999 }
2000 for (const auto &table : tables) {
2001 if (!CheckTableSupportCompensatedSync(table)) {
2002 continue;
2003 }
2004
2005 std::vector<VBucket> syncDataPk;
2006 errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk, isQueryDownloadRecords);
2007 if (errCode != E_OK) {
2008 LOGW("[RDBStorageEngine] Get wait compensated sync data failed, continue! errCode=%d", errCode);
2009 errCode = E_OK;
2010 continue;
2011 }
2012 if (syncDataPk.empty()) {
2013 // no data need to compensated sync
2014 continue;
2015 }
2016 errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncQuery);
2017 if (errCode != E_OK) {
2018 LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2019 errCode = E_OK;
2020 continue;
2021 }
2022 }
2023 if (errCode == E_OK) {
2024 errCode = handle->Commit();
2025 if (errCode != E_OK) {
2026 LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2027 }
2028 } else {
2029 int ret = handle->Rollback();
2030 if (ret != E_OK) {
2031 LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2032 }
2033 }
2034 return errCode;
2035 }
2036
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2037 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2038 const std::string &tableName, bool flag)
2039 {
2040 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2041 if (trackerTable.IsEmpty()) {
2042 trackerTable.SetTableName(tableName);
2043 }
2044 return handle->CreateTempSyncTrigger(trackerTable, flag);
2045 }
2046
CheckTableSupportCompensatedSync(const TableSchema & table)2047 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2048 {
2049 auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2050 return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2051 field.type == TYPE_INDEX<Bytes>);
2052 });
2053 if (it != table.fields.end()) {
2054 LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2055 return false;
2056 }
2057 // check whether reference exist
2058 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2059 int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2060 if (errCode != E_OK) {
2061 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2062 return false;
2063 }
2064 if (!tableReference.empty()) {
2065 LOGI("[RDBStorageEngine] current table exist reference property");
2066 return false;
2067 }
2068 return true;
2069 }
2070
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2071 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2072 const std::set<std::string> &gidFilters)
2073 {
2074 if (transactionHandle_ == nullptr) {
2075 LOGE("the transaction has not been started");
2076 return -E_INVALID_DB;
2077 }
2078 int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2079 if (errCode != E_OK) {
2080 LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2081 }
2082 return errCode;
2083 }
2084
GetCloudSyncConfig() const2085 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2086 {
2087 std::lock_guard<std::mutex> autoLock(configMutex_);
2088 return cloudSyncConfig_;
2089 }
2090
SetCloudSyncConfig(const CloudSyncConfig & config)2091 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2092 {
2093 std::lock_guard<std::mutex> autoLock(configMutex_);
2094 cloudSyncConfig_ = config;
2095 }
2096
IsTableExistReference(const std::string & table)2097 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2098 {
2099 // check whether reference exist
2100 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2101 int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2102 if (errCode != E_OK) {
2103 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2104 return false;
2105 }
2106 return !tableReference.empty();
2107 }
2108
IsTableExistReferenceOrReferenceBy(const std::string & table)2109 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2110 {
2111 // check whether reference or reference by exist
2112 if (storageEngine_ == nullptr) {
2113 LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2114 return false;
2115 }
2116 RelationalSchemaObject schema = storageEngine_->GetSchema();
2117 auto referenceProperty = schema.GetReferenceProperty();
2118 if (referenceProperty.empty()) {
2119 return false;
2120 }
2121 auto [sourceTableName, errCode] = GetSourceTableName(table);
2122 if (errCode != E_OK) {
2123 return false;
2124 }
2125 for (const auto &property : referenceProperty) {
2126 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2127 DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2128 return true;
2129 }
2130 }
2131 return false;
2132 }
2133
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)2134 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
2135 Timestamp localMark)
2136 {
2137 uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
2138 }
2139
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)2140 int RelationalSyncAbleStorage::ReviseLocalModTime(const std::string &tableName,
2141 const std::vector<ReviseModTimeInfo> &revisedData)
2142 {
2143 if (storageEngine_ == nullptr) {
2144 LOGE("[ReviseLocalModTime] Storage is null");
2145 return -E_INVALID_DB;
2146 }
2147 int errCode = E_OK;
2148 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
2149 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
2150 if (writeHandle == nullptr) {
2151 LOGE("[ReviseLocalModTime] Get write handle fail: %d", errCode);
2152 return errCode;
2153 }
2154 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
2155 if (errCode != E_OK) {
2156 LOGE("[ReviseLocalModTime] Start Transaction fail: %d", errCode);
2157 ReleaseHandle(writeHandle);
2158 return errCode;
2159 }
2160 errCode = writeHandle->ReviseLocalModTime(tableName, revisedData);
2161 if (errCode != E_OK) {
2162 LOGE("[ReviseLocalModTime] Revise local modify time fail: %d", errCode);
2163 writeHandle->Rollback();
2164 ReleaseHandle(writeHandle);
2165 return errCode;
2166 }
2167 errCode = writeHandle->Commit();
2168 ReleaseHandle(writeHandle);
2169 return errCode;
2170 }
2171
GetCursor(const std::string & tableName,uint64_t & cursor)2172 int RelationalSyncAbleStorage::GetCursor(const std::string &tableName, uint64_t &cursor)
2173 {
2174 if (transactionHandle_ == nullptr) {
2175 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
2176 return -E_INVALID_DB;
2177 }
2178 return transactionHandle_->GetCursor(tableName, cursor);
2179 }
2180
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)2181 int RelationalSyncAbleStorage::GetLocalDataCount(const std::string &tableName, int &dataCount,
2182 int &logicDeleteDataCount)
2183 {
2184 int errCode = E_OK;
2185 auto *handle = GetHandle(false, errCode);
2186 if (handle == nullptr || errCode != E_OK) {
2187 LOGE("[RelationalSyncAbleStorage] Get handle failed when get local data count: %d", errCode);
2188 return errCode;
2189 }
2190 errCode = handle->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
2191 ReleaseHandle(handle);
2192 return errCode;
2193 }
2194 }
2195 #endif
2196