1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "relational_remote_query_continue_token.h"
29 #include "relational_sync_data_inserter.h"
30 #include "res_finalizer.h"
31 #include "runtime_context.h"
32 #include "time_helper.h"
33
34 namespace DistributedDB {
35 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)36 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
37 {
38 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
39 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
40 std::string(CLOSE_CONN_TASK),
41 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
42 );
43 }
44 }
45
// Guard used at the top of member functions: makes the enclosing function
// return -E_INVALID_DB when no storage engine is attached.
#define CHECK_STORAGE_ENGINE do { \
    if (!storageEngine_) { \
        return -E_INVALID_DB; \
    } \
} while (0)
51
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)52 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
53 : storageEngine_(std::move(engine)),
54 reusedHandle_(nullptr),
55 isCachedOption_(false)
56 {}
57
~RelationalSyncAbleStorage()58 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
59 {
60 syncAbleEngine_ = nullptr;
61 }
62
63 // Get interface type of this relational db.
GetInterfaceType() const64 int RelationalSyncAbleStorage::GetInterfaceType() const
65 {
66 return SYNC_RELATION;
67 }
68
69 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()70 void RelationalSyncAbleStorage::IncRefCount()
71 {
72 LOGD("RelationalSyncAbleStorage ref +1");
73 IncObjRef(this);
74 }
75
76 // Drop the interface ref-count.
DecRefCount()77 void RelationalSyncAbleStorage::DecRefCount()
78 {
79 LOGD("RelationalSyncAbleStorage ref -1");
80 DecObjRef(this);
81 }
82
83 // Get the identifier of this rdb.
GetIdentifier() const84 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
85 {
86 std::string identifier = storageEngine_->GetIdentifier();
87 return std::vector<uint8_t>(identifier.begin(), identifier.end());
88 }
89
GetDualTupleIdentifier() const90 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
91 {
92 std::string identifier = storageEngine_->GetProperties().GetStringProp(
93 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
94 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
95 return identifierVect;
96 }
97
98 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const99 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
100 {
101 int errCode = E_OK;
102 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
103 if (handle == nullptr) {
104 return;
105 }
106 timestamp = 0;
107 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
108 if (errCode != E_OK) {
109 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
110 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
111 }
112 ReleaseHandle(handle);
113 return;
114 }
115
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const116 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
117 {
118 int errCode = E_OK;
119 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
120 if (handle == nullptr) {
121 return errCode;
122 }
123 timestamp = 0;
124 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
125 if (errCode != E_OK) {
126 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
127 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
128 }
129 ReleaseHandle(handle);
130 return errCode;
131 }
132
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const133 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
134 OperatePerm perm) const
135 {
136 if (storageEngine_ == nullptr) {
137 errCode = -E_INVALID_DB;
138 return nullptr;
139 }
140 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
141 storageEngine_->FindExecutor(isWrite, perm, errCode));
142 if (handle == nullptr) {
143 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
144 }
145 return handle;
146 }
147
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const148 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
149 int &errCode, OperatePerm perm) const
150 {
151 if (storageEngine_ == nullptr) {
152 errCode = -E_INVALID_DB;
153 return nullptr;
154 }
155 if (transactionHandle_ != nullptr) {
156 return transactionHandle_;
157 }
158 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
159 storageEngine_->FindExecutor(isWrite, perm, errCode));
160 if (errCode != E_OK) {
161 ReleaseHandle(handle);
162 handle = nullptr;
163 }
164 return handle;
165 }
166
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const167 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
168 {
169 if (storageEngine_ == nullptr) {
170 return;
171 }
172 StorageExecutor *databaseHandle = handle;
173 storageEngine_->Recycle(databaseHandle);
174 std::function<void()> listener = nullptr;
175 {
176 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
177 listener = heartBeatListener_;
178 }
179 if (listener) {
180 listener();
181 }
182 }
183
184 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const185 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
186 {
187 CHECK_STORAGE_ENGINE;
188 if (key.size() > DBConstant::MAX_KEY_SIZE) {
189 return -E_INVALID_ARGS;
190 }
191 int errCode = E_OK;
192 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
193 if (handle == nullptr) {
194 return errCode;
195 }
196 errCode = handle->GetKvData(key, value);
197 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
198 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
199 }
200 ReleaseHandle(handle);
201 return errCode;
202 }
203
204 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)205 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
206 {
207 CHECK_STORAGE_ENGINE;
208 int errCode = E_OK;
209 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
210 if (handle == nullptr) {
211 return errCode;
212 }
213
214 errCode = handle->PutKvData(key, value); // meta doesn't need time.
215 if (errCode != E_OK) {
216 LOGE("Put kv data err:%d", errCode);
217 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
218 }
219 ReleaseHandle(handle);
220 return errCode;
221 }
222
PutMetaData(const Key & key,const Value & value,bool isInTransaction)223 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
224 {
225 CHECK_STORAGE_ENGINE;
226 int errCode = E_OK;
227 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
228 std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
229
230 // try to recycle using the handle
231 if (isInTransaction) {
232 handLock.lock();
233 if (reusedHandle_ != nullptr) {
234 handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
235 } else {
236 isInTransaction = false;
237 handLock.unlock();
238 }
239 }
240
241 if (handle == nullptr) {
242 handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
243 if (handle == nullptr) {
244 return errCode;
245 }
246 }
247
248 errCode = handle->PutKvData(key, value);
249 if (errCode != E_OK) {
250 LOGE("Put kv data err:%d", errCode);
251 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
252 }
253 if (!isInTransaction) {
254 ReleaseHandle(handle);
255 }
256 return errCode;
257 }
258
259 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)260 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
261 {
262 CHECK_STORAGE_ENGINE;
263 for (const auto &key : keys) {
264 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
265 return -E_INVALID_ARGS;
266 }
267 }
268 int errCode = E_OK;
269 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
270 if (handle == nullptr) {
271 return errCode;
272 }
273
274 handle->StartTransaction(TransactType::IMMEDIATE);
275 errCode = handle->DeleteMetaData(keys);
276 if (errCode != E_OK) {
277 handle->Rollback();
278 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
279 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
280 } else {
281 handle->Commit();
282 }
283 ReleaseHandle(handle);
284 return errCode;
285 }
286
287 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const288 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
289 {
290 CHECK_STORAGE_ENGINE;
291 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
292 return -E_INVALID_ARGS;
293 }
294
295 int errCode = E_OK;
296 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
297 if (handle == nullptr) {
298 return errCode;
299 }
300
301 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
302 if (errCode != E_OK) {
303 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
304 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
305 }
306 ReleaseHandle(handle);
307 return errCode;
308 }
309
310 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const311 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
312 {
313 CHECK_STORAGE_ENGINE;
314 int errCode = E_OK;
315 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
316 if (handle == nullptr) {
317 return errCode;
318 }
319
320 errCode = handle->GetAllMetaKeys(keys);
321 if (errCode != E_OK) {
322 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
323 }
324 ReleaseHandle(handle);
325 return errCode;
326 }
327
GetDbProperties() const328 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
329 {
330 return storageEngine_->GetProperties();
331 }
332
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)333 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
334 {
335 int errCode = E_OK;
336 for (auto &item : dataItems) {
337 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
338 if (entry == nullptr) {
339 errCode = -E_OUT_OF_MEMORY;
340 LOGE("GetKvEntries failed, errCode:%d", errCode);
341 SingleVerKvEntry::Release(entries);
342 break;
343 }
344 entry->SetEntryData(std::move(item));
345 entries.push_back(entry);
346 }
347 return errCode;
348 }
349
GetDataItemSerialSize(const DataItem & item,size_t appendLen)350 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
351 {
352 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
353 // the size would not be very large.
354 static const size_t maxOrigDevLength = 40;
355 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
356 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
357 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
358 return dataSize;
359 }
360
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)361 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
362 size_t appendLen)
363 {
364 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
365 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
366 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
367 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
368 }
369 return !reachThreshold;
370 }
371
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)372 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
373 SQLiteSingleVerRelationalContinueToken *&token)
374 {
375 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
376 delete token;
377 token = nullptr;
378 return;
379 }
380
381 if (dataItems.empty()) {
382 errCode = -E_INTERNAL_ERROR;
383 LOGE("Get data unfinished but data items is empty.");
384 delete token;
385 token = nullptr;
386 return;
387 }
388 token->SetNextBeginTime(dataItems.back());
389 token->UpdateNextSyncOffset(dataItems.size());
390 }
391
392 /**
393 * Caller must ensure that parameter token is valid.
394 * If error happened, token will be deleted here.
395 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const396 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
397 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
398 {
399 if (storageEngine_ == nullptr) {
400 return -E_INVALID_DB;
401 }
402
403 int errCode = E_OK;
404 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
405 OperatePerm::NORMAL_PERM, errCode));
406 if (handle == nullptr) {
407 goto ERROR;
408 }
409
410 do {
411 errCode = handle->GetSyncDataByQuery(dataItems,
412 Parcel::GetAppendedLen(),
413 dataSizeInfo,
414 std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
415 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
416 storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
417 if (errCode == -E_FINISHED) {
418 token->FinishGetData();
419 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
420 }
421 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
422
423 ERROR:
424 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
425 dataItems.clear();
426 }
427 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
428 ReleaseHandle(handle);
429 return errCode;
430 }
431
432 // use kv struct data to sync
433 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const434 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
435 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
436 std::vector<SingleVerKvEntry *> &entries) const
437 {
438 if (!timeRange.IsValid()) {
439 return -E_INVALID_ARGS;
440 }
441 query.SetSchema(storageEngine_->GetSchema());
442 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
443 if (token == nullptr) {
444 LOGE("[SingleVerNStore] Allocate continue token failed.");
445 return -E_OUT_OF_MEMORY;
446 }
447
448 continueStmtToken = static_cast<ContinueToken>(token);
449 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
450 }
451
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const452 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
453 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
454 {
455 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
456 if (!token->CheckValid()) {
457 return -E_INVALID_ARGS;
458 }
459 RelationalSchemaObject schema = storageEngine_->GetSchema();
460 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
461 std::vector<std::string> fieldNames;
462 fieldNames.reserve(fieldInfos.size());
463 for (const auto &fieldInfo : fieldInfos) { // order by cid
464 fieldNames.push_back(fieldInfo.GetFieldName());
465 }
466 token->SetFieldNames(fieldNames);
467
468 std::vector<DataItem> dataItems;
469 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
470 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
471 continueStmtToken = static_cast<ContinueToken>(token);
472 return errCode;
473 }
474
475 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
476 if (innerCode != E_OK) {
477 errCode = innerCode;
478 delete token;
479 token = nullptr;
480 }
481 continueStmtToken = static_cast<ContinueToken>(token);
482 return errCode;
483 }
484
485 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)486 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
487 {
488 std::vector<DataItem> dataItems;
489 for (const auto &itemEntry : entries) {
490 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
491 if (entry != nullptr) {
492 DataItem item;
493 item.origDev = entry->GetOrigDevice();
494 item.flag = entry->GetFlag();
495 item.timestamp = entry->GetTimestamp();
496 item.writeTimestamp = entry->GetWriteTimestamp();
497 entry->GetKey(item.key);
498 entry->GetValue(item.value);
499 entry->GetHashKey(item.hashKey);
500 dataItems.push_back(item);
501 }
502 }
503 return dataItems;
504 }
505 }
506
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)507 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
508 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
509 {
510 std::vector<DataItem> dataItems = ConvertEntries(entries);
511 return PutSyncData(object, dataItems, deviceName);
512 }
513
514 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)515 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
516 {
517 return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
518 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
519 }
520
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)521 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
522 {
523 return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
524 }
525 }
526
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)527 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
528 const std::string &deviceName)
529 {
530 int errCode = E_OK;
531 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
532 QueryObject query = object;
533 query.SetSchema(storageEngine_->GetSchema());
534
535 RelationalSchemaObject remoteSchema;
536 errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
537 if (errCode != E_OK) {
538 LOGE("Find remote schema failed. err=%d", errCode);
539 return errCode;
540 }
541
542 StoreInfo info = GetStoreInfo();
543 auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
544 remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
545 inserter.SetEntries(dataItems);
546
547 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
548 if (handle == nullptr) {
549 return errCode;
550 }
551
552 DBDfxAdapter::StartTracing();
553
554 errCode = handle->SaveSyncItems(inserter);
555
556 DBDfxAdapter::FinishTracing();
557 if (errCode == E_OK) {
558 // dataItems size > 0 now because already check before
559 // all dataItems will write into db now, so need to observer notify here
560 // if some dataItems will not write into db in the future, observer notify here need change
561 ChangedData data;
562 TriggerObserverAction(deviceName, std::move(data), false);
563 }
564
565 ReleaseHandle(handle);
566 return errCode;
567 }
568
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)569 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
570 const std::string &deviceName)
571 {
572 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
573 LOGW("Device length is invalid for sync put");
574 return -E_INVALID_ARGS;
575 }
576
577 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
578 if (errCode != E_OK) {
579 LOGE("[Relational] PutSyncData errCode:%d", errCode);
580 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
581 }
582 return errCode;
583 }
584
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)585 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
586 {
587 (void) deviceName;
588 (void) isNeedNotify;
589 return -E_NOT_SUPPORT;
590 }
591
GetSchemaInfo() const592 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
593 {
594 return storageEngine_->GetSchema();
595 }
596
GetSecurityOption(SecurityOption & option) const597 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
598 {
599 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
600 if (isCachedOption_) {
601 option = securityOption_;
602 return E_OK;
603 }
604 std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
605 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
606 if (errCode == E_OK) {
607 option = securityOption_;
608 isCachedOption_ = true;
609 }
610 return errCode;
611 }
612
NotifyRemotePushFinished(const std::string & deviceId) const613 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
614 {
615 return;
616 }
617
618 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const619 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
620 {
621 return OS::GetCurrentSysTimeInMicrosecond(outTime);
622 }
623
GetTablesQuery()624 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
625 {
626 auto tableNames = storageEngine_->GetSchema().GetTableNames();
627 std::vector<QuerySyncObject> queries;
628 queries.reserve(tableNames.size());
629 for (const auto &it : tableNames) {
630 queries.emplace_back(Query::Select(it));
631 }
632 return queries;
633 }
634
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)635 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
636 {
637 (void) queryObj;
638 return -E_NOT_SUPPORT;
639 }
640
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const641 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
642 const std::string &targetID) const
643 {
644 return E_OK;
645 }
646
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)647 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
648 const RelationalSyncStrategy &syncStrategy)
649 {
650 auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
651 DistributedTableMode::SPLIT_BY_DEVICE);
652 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
653 LOGD("No need create device table in COLLABORATION mode.");
654 return E_OK;
655 }
656
657 int errCode = E_OK;
658 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
659 if (handle == nullptr) {
660 return errCode;
661 }
662
663 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
664 if (errCode != E_OK) {
665 LOGE("Start transaction failed:%d", errCode);
666 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
667 ReleaseHandle(handle);
668 return errCode;
669 }
670
671 StoreInfo info = GetStoreInfo();
672 for (const auto &[table, strategy] : syncStrategy) {
673 if (!strategy.permitSync) {
674 continue;
675 }
676
677 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
678 if (errCode != E_OK) {
679 LOGE("Create distributed device table failed. %d", errCode);
680 break;
681 }
682 }
683
684 if (errCode == E_OK) {
685 errCode = handle->Commit();
686 } else {
687 (void)handle->Rollback();
688 }
689
690 ReleaseHandle(handle);
691 return errCode;
692 }
693
RegisterSchemaChangedCallback(const std::function<void ()> & callback)694 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
695 {
696 std::lock_guard lock(onSchemaChangedMutex_);
697 onSchemaChanged_ = callback;
698 return E_OK;
699 }
700
NotifySchemaChanged()701 void RelationalSyncAbleStorage::NotifySchemaChanged()
702 {
703 std::lock_guard lock(onSchemaChangedMutex_);
704 if (onSchemaChanged_) {
705 LOGD("Notify relational schema was changed");
706 onSchemaChanged_();
707 }
708 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const709 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
710 {
711 algorithmSet.clear();
712 DataCompression::GetCompressionAlgo(algorithmSet);
713 return E_OK;
714 }
715
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)716 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
717 const RelationalObserverAction &action)
718 {
719 int errCode = E_OK;
720 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, action, &errCode] () mutable {
721 ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
722 auto it = dataChangeCallbackMap_.find(connectionId);
723 if (it != dataChangeCallbackMap_.end()) {
724 if (it->second.find(observer) != it->second.end()) {
725 LOGE("obsever already registered");
726 errCode = -E_ALREADY_SET;
727 return;
728 }
729 if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
730 LOGE("The number of relational observers has been over limit");
731 errCode = -E_MAX_LIMITS;
732 return;
733 }
734 it->second[observer] = action;
735 } else {
736 dataChangeCallbackMap_[connectionId][observer] = action;
737 }
738 LOGI("register relational observer ok");
739 }, nullptr, &dataChangeCallbackMap_);
740 ADAPTER_WAIT(handle);
741 return errCode;
742 }
743
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)744 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
745 {
746 if (observer == nullptr) {
747 EraseDataChangeCallback(connectionId);
748 return E_OK;
749 }
750 int errCode = -E_NOT_FOUND;
751 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, &errCode] () mutable {
752 ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
753 auto it = dataChangeCallbackMap_.find(connectionId);
754 if (it == dataChangeCallbackMap_.end()) {
755 return;
756 }
757 auto action = it->second.find(observer);
758 if (action != it->second.end()) {
759 it->second.erase(action);
760 LOGI("unregister relational observer.");
761 if (it->second.empty()) {
762 dataChangeCallbackMap_.erase(it);
763 LOGI("observer for this delegate is zero now");
764 }
765 errCode = E_OK;
766 }
767 }, nullptr, &dataChangeCallbackMap_);
768 ADAPTER_WAIT(handle);
769 return errCode;
770 }
771
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,int & observerCnt,const std::string & deviceName,const ChangedData & changedData,bool isChangedData)772 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
773 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item, int &observerCnt,
774 const std::string &deviceName, const ChangedData &changedData, bool isChangedData)
775 {
776 for (auto &action : item.second) {
777 if (action.second == nullptr) {
778 continue;
779 }
780 observerCnt++;
781 ChangedData observerChangeData = changedData;
782 if (action.first != nullptr) {
783 FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
784 }
785 action.second(deviceName, std::move(observerChangeData), isChangedData);
786 }
787 }
788
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)789 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
790 ChangedData &&changedData, bool isChangedData)
791 {
792 IncObjRef(this);
793 int taskErrCode =
794 ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
795 LOGD("begin to trigger relational observer.");
796 int observerCnt = 0;
797 ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
798 for (const auto &item : dataChangeCallbackMap_) {
799 ExecuteDataChangeCallback(item, observerCnt, deviceName, changedData, isChangedData);
800 }
801 LOGD("relational observer size = %d", observerCnt);
802 DecObjRef(this);
803 }, &dataChangeCallbackMap_);
804 if (taskErrCode != E_OK) {
805 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
806 DecObjRef(this);
807 }
808 }
809
RegisterHeartBeatListener(const std::function<void ()> & listener)810 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
811 {
812 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
813 heartBeatListener_ = listener;
814 }
815
CheckAndInitQueryCondition(QueryObject & query) const816 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
817 {
818 RelationalSchemaObject schema = storageEngine_->GetSchema();
819 TableInfo table = schema.GetTable(query.GetTableName());
820 if (!table.IsValid()) {
821 LOGE("Query table is not a distributed table.");
822 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
823 }
824 if (table.GetTableSyncType() == CLOUD_COOPERATION) {
825 LOGE("cloud table mode is not support");
826 return -E_NOT_SUPPORT;
827 }
828 query.SetSchema(schema);
829
830 int errCode = E_OK;
831 auto *handle = GetHandle(true, errCode);
832 if (handle == nullptr) {
833 return errCode;
834 }
835
836 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
837 if (errCode != E_OK) {
838 LOGE("Check relational query condition failed. %d", errCode);
839 TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
840 }
841
842 ReleaseHandle(handle);
843 return errCode;
844 }
845
CheckCompatible(const std::string & schema,uint8_t type) const846 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
847 {
848 // return true if is relational schema.
849 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
850 }
851
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const852 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
853 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
854 {
855 if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
856 return -E_NOT_SUPPORT;
857 }
858 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
859 LOGE("[ExecuteQuery] invalid args");
860 return -E_INVALID_ARGS;
861 }
862 int errCode = E_OK;
863 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
864 if (handle == nullptr) {
865 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
866 return errCode;
867 }
868 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
869 if (errCode != E_OK) {
870 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
871 }
872 ReleaseHandle(handle);
873 return errCode;
874 }
875
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const876 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
877 RelationalRowDataSet &dataSet, ContinueToken &token) const
878 {
879 dataSet.Clear();
880 if (token == nullptr) {
881 // start query
882 std::vector<std::string> colNames;
883 std::vector<RelationalRowData *> data;
884 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
885
886 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
887 if (errCode != E_OK) {
888 return errCode;
889 }
890
891 // create one token
892 token = static_cast<ContinueToken>(
893 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
894 if (token == nullptr) {
895 LOGE("ExecuteQuery OOM");
896 return -E_OUT_OF_MEMORY;
897 }
898 }
899
900 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
901 if (!remoteToken->CheckValid()) {
902 LOGE("ExecuteQuery invalid token");
903 return -E_INVALID_ARGS;
904 }
905
906 int errCode = remoteToken->GetData(packetSize, dataSet);
907 if (errCode == -E_UNFINISHED) {
908 errCode = E_OK;
909 } else {
910 if (errCode != E_OK) {
911 dataSet.Clear();
912 }
913 delete remoteToken;
914 remoteToken = nullptr;
915 token = nullptr;
916 }
917 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
918 return errCode;
919 }
920
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)921 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
922 uint8_t type)
923 {
924 if (ReadSchemaType(type) != SchemaType::RELATIVE) {
925 return -E_INVALID_ARGS;
926 }
927
928 RelationalSchemaObject schemaObj;
929 int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
930 if (errCode != E_OK) {
931 LOGE("Parse remote schema failed. err=%d", errCode);
932 return errCode;
933 }
934
935 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
936 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
937 Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
938 errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
939 if (errCode != E_OK) {
940 LOGE("Save remote schema failed. err=%d", errCode);
941 return errCode;
942 }
943
944 return remoteDeviceSchema_.Put(deviceId, remoteSchema);
945 }
946
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)947 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
948 {
949 if (schemaObj.IsSchemaValid()) {
950 return -E_INVALID_ARGS;
951 }
952
953 std::string remoteSchema;
954 int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
955 if (errCode == -E_NOT_FOUND) {
956 LOGW("Get remote device schema miss cached.");
957 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
958 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
959 Value remoteSchemaBuff;
960 errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
961 if (errCode != E_OK) {
962 LOGE("Get remote device schema from meta failed. err=%d", errCode);
963 return errCode;
964 }
965 remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
966 errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
967 }
968
969 if (errCode != E_OK) {
970 LOGE("Get remote device schema failed. err=%d", errCode);
971 return errCode;
972 }
973
974 errCode = schemaObj.ParseFromSchemaString(remoteSchema);
975 if (errCode != E_OK) {
976 LOGE("Parse remote schema failed. err=%d", errCode);
977 }
978 return errCode;
979 }
980
SetReusedHandle(StorageExecutor * handle)981 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
982 {
983 std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
984 reusedHandle_ = handle;
985 }
986
ReleaseRemoteQueryContinueToken(ContinueToken & token) const987 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
988 {
989 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
990 delete remoteToken;
991 remoteToken = nullptr;
992 token = nullptr;
993 }
994
GetStoreInfo() const995 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
996 {
997 StoreInfo info = {
998 storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
999 storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1000 storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1001 };
1002 return info;
1003 }
1004
StartTransaction(TransactType type)1005 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1006 {
1007 CHECK_STORAGE_ENGINE;
1008 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1009 if (transactionHandle_ != nullptr) {
1010 LOGD("Transaction started already.");
1011 return -E_TRANSACT_STATE;
1012 }
1013 int errCode = E_OK;
1014 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1015 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1016 if (handle == nullptr) {
1017 ReleaseHandle(handle);
1018 return errCode;
1019 }
1020 errCode = handle->StartTransaction(type);
1021 if (errCode != E_OK) {
1022 ReleaseHandle(handle);
1023 return errCode;
1024 }
1025 transactionHandle_ = handle;
1026 return errCode;
1027 }
1028
Commit()1029 int RelationalSyncAbleStorage::Commit()
1030 {
1031 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1032 if (transactionHandle_ == nullptr) {
1033 LOGE("relation database is null or the transaction has not been started");
1034 return -E_INVALID_DB;
1035 }
1036 int errCode = transactionHandle_->Commit();
1037 ReleaseHandle(transactionHandle_);
1038 transactionHandle_ = nullptr;
1039 LOGD("connection commit transaction!");
1040 return errCode;
1041 }
1042
Rollback()1043 int RelationalSyncAbleStorage::Rollback()
1044 {
1045 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1046 if (transactionHandle_ == nullptr) {
1047 LOGE("Invalid handle for rollback or the transaction has not been started.");
1048 return -E_INVALID_DB;
1049 }
1050
1051 int errCode = transactionHandle_->Rollback();
1052 ReleaseHandle(transactionHandle_);
1053 transactionHandle_ = nullptr;
1054 LOGI("connection rollback transaction!");
1055 return errCode;
1056 }
1057
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,int64_t & count)1058 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp,
1059 bool isCloudForcePush, int64_t &count)
1060 {
1061 int errCode = E_OK;
1062 auto *handle = GetHandleExpectTransaction(false, errCode);
1063 if (handle == nullptr) {
1064 return errCode;
1065 }
1066 QuerySyncObject queryObj = query;
1067 queryObj.SetSchema(GetSchemaInfo());
1068 errCode = handle->GetUploadCount(timestamp, isCloudForcePush, queryObj, count);
1069 if (transactionHandle_ == nullptr) {
1070 ReleaseHandle(handle);
1071 }
1072 return errCode;
1073 }
1074
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1075 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1076 const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1077 {
1078 if (transactionHandle_ == nullptr) {
1079 LOGE("the transaction has not been started");
1080 return -E_INVALID_DB;
1081 }
1082 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1083 QuerySyncObject query = querySyncObject;
1084 query.SetSchema(GetSchemaInfo());
1085 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1086 if (token == nullptr) {
1087 LOGE("[SingleVerNStore] Allocate continue token failed.");
1088 return -E_OUT_OF_MEMORY;
1089 }
1090 token->SetCloudTableSchema(tableSchema);
1091 continueStmtToken = static_cast<ContinueToken>(token);
1092 return GetCloudDataNext(continueStmtToken, cloudDataResult);
1093 }
1094
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1095 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1096 CloudSyncData &cloudDataResult)
1097 {
1098 if (continueStmtToken == nullptr) {
1099 return -E_INVALID_ARGS;
1100 }
1101 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1102 if (!token->CheckValid()) {
1103 return -E_INVALID_ARGS;
1104 }
1105 if (transactionHandle_ == nullptr) {
1106 LOGE("the transaction has not been started, release the token");
1107 ReleaseCloudDataToken(continueStmtToken);
1108 return -E_INVALID_DB;
1109 }
1110 cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1111 int errCode = transactionHandle_->GetSyncCloudData(cloudDataResult, CloudDbConstant::MAX_UPLOAD_SIZE, *token);
1112 if (errCode != -E_UNFINISHED) {
1113 delete token;
1114 token = nullptr;
1115 }
1116 continueStmtToken = static_cast<ContinueToken>(token);
1117 if (errCode != E_OK && errCode != -E_UNFINISHED) {
1118 return errCode;
1119 }
1120 int fillRefGidCode = FillReferenceData(cloudDataResult);
1121 return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1122 }
1123
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,std::vector<std::string> & cloudGid)1124 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1125 bool isCloudForcePush, std::vector<std::string> &cloudGid)
1126 {
1127 int errCode = E_OK;
1128 auto *handle = GetHandle(false, errCode);
1129 if (handle == nullptr) {
1130 return errCode;
1131 }
1132 Timestamp beginTime = 0u;
1133 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1134 QuerySyncObject query = querySyncObject;
1135 query.SetSchema(GetSchemaInfo());
1136 errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, cloudGid);
1137 ReleaseHandle(handle);
1138 if (errCode != E_OK) {
1139 LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1140 }
1141 return errCode;
1142 }
1143
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1144 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1145 {
1146 if (continueStmtToken == nullptr) {
1147 return E_OK;
1148 }
1149 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1150 if (!token->CheckValid()) {
1151 return E_OK;
1152 }
1153 int errCode = token->ReleaseCloudStatement();
1154 delete token;
1155 token = nullptr;
1156 return errCode;
1157 }
1158
ChkSchema(const TableName & tableName)1159 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1160 {
1161 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1162 RelationalSchemaObject localSchema = GetSchemaInfo();
1163 return schemaMgr_.ChkSchema(tableName, localSchema);
1164 }
1165
SetCloudDbSchema(const DataBaseSchema & schema)1166 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1167 {
1168 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1169 schemaMgr_.SetCloudDbSchema(schema);
1170 return E_OK;
1171 }
1172
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1173 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1174 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1175 {
1176 if (transactionHandle_ == nullptr) {
1177 LOGE(" the transaction has not been started");
1178 return -E_INVALID_DB;
1179 }
1180
1181 return GetInfoByPrimaryKeyOrGidInner(transactionHandle_, tableName, vBucket, dataInfoWithLog, assetInfo);
1182 }
1183
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1184 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1185 const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1186 {
1187 TableSchema tableSchema;
1188 int errCode = GetCloudTableSchema(tableName, tableSchema);
1189 if (errCode != E_OK) {
1190 LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1191 return errCode;
1192 }
1193 RelationalSchemaObject localSchema = GetSchemaInfo();
1194 handle->SetLocalSchema(localSchema);
1195 return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1196 }
1197
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1198 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1199 {
1200 if (transactionHandle_ == nullptr) {
1201 LOGE(" the transaction has not been started");
1202 return -E_INVALID_DB;
1203 }
1204 return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1205 }
1206
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1207 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1208 const std::string &tableName, DownloadData &downloadData)
1209 {
1210 TableSchema tableSchema;
1211 int errCode = GetCloudTableSchema(tableName, tableSchema);
1212 if (errCode != E_OK) {
1213 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1214 return errCode;
1215 }
1216 RelationalSchemaObject localSchema = GetSchemaInfo();
1217 handle->SetLocalSchema(localSchema);
1218 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1219 handle->SetLogicDelete(IsCurrentLogicDelete());
1220 errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1221 handle->SetLogicDelete(false);
1222 return errCode;
1223 }
1224
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1225 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1226 {
1227 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1228 cloudSchema = schemaMgr_.GetCloudDbSchema();
1229 return E_OK;
1230 }
1231
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1232 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1233 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1234 {
1235 if (transactionHandle_ == nullptr) {
1236 LOGE("the transaction has not been started");
1237 return -E_INVALID_DB;
1238 }
1239 return transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets);
1240 }
1241
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1242 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1243 {
1244 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1245 return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1246 }
1247
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1248 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1249 bool isDownloadSuccess)
1250 {
1251 if (storageEngine_ == nullptr) {
1252 return -E_INVALID_DB;
1253 }
1254 if (transactionHandle_ == nullptr) {
1255 LOGE("the transaction has not been started when fill asset for download.");
1256 return -E_INVALID_DB;
1257 }
1258 TableSchema tableSchema;
1259 int errCode = GetCloudTableSchema(tableName, tableSchema);
1260 if (errCode != E_OK) {
1261 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1262 return errCode;
1263 }
1264 errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1265 if (errCode != E_OK) {
1266 LOGE("fill cloud asset for download failed.%d", errCode);
1267 }
1268 return errCode;
1269 }
1270
SetLogTriggerStatus(bool status)1271 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1272 {
1273 int errCode = E_OK;
1274 auto *handle = GetHandleExpectTransaction(false, errCode);
1275 if (handle == nullptr) {
1276 return errCode;
1277 }
1278 errCode = handle->SetLogTriggerStatus(status);
1279 if (transactionHandle_ == nullptr) {
1280 ReleaseHandle(handle);
1281 }
1282 return errCode;
1283 }
1284
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1285 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1286 bool ignoreEmptyGid)
1287 {
1288 CHECK_STORAGE_ENGINE;
1289 if (opType == OpType::UPDATE && data.updData.assets.empty()) {
1290 return E_OK;
1291 }
1292 int errCode = E_OK;
1293 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1294 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1295 if (writeHandle == nullptr) {
1296 return errCode;
1297 }
1298 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1299 if (errCode != E_OK) {
1300 ReleaseHandle(writeHandle);
1301 return errCode;
1302 }
1303 errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1304 if (errCode != E_OK) {
1305 LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1306 writeHandle->Rollback();
1307 ReleaseHandle(writeHandle);
1308 return errCode;
1309 }
1310 errCode = writeHandle->Commit();
1311 ReleaseHandle(writeHandle);
1312 return errCode;
1313 }
1314
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1315 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1316 {
1317 syncAbleEngine_ = syncAbleEngine;
1318 }
1319
GetIdentify() const1320 std::string RelationalSyncAbleStorage::GetIdentify() const
1321 {
1322 if (storageEngine_ == nullptr) {
1323 LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1324 return "";
1325 }
1326 return storageEngine_->GetIdentifier();
1327 }
1328
EraseDataChangeCallback(uint64_t connectionId)1329 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1330 {
1331 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1332 ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
1333 auto it = dataChangeCallbackMap_.find(connectionId);
1334 if (it != dataChangeCallbackMap_.end()) {
1335 dataChangeCallbackMap_.erase(it);
1336 LOGI("erase all observer for this delegate.");
1337 }
1338 }, nullptr, &dataChangeCallbackMap_);
1339 ADAPTER_WAIT(handle);
1340 }
1341
ReleaseContinueToken(ContinueToken & continueStmtToken) const1342 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1343 {
1344 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1345 if (token == nullptr || !(token->CheckValid())) {
1346 LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1347 return;
1348 }
1349 delete token;
1350 continueStmtToken = nullptr;
1351 }
1352
GetCloudDataGid(const QuerySyncObject & query,Timestamp beginTime,std::vector<std::string> & gid)1353 int RelationalSyncAbleStorage::GetCloudDataGid(const QuerySyncObject &query, Timestamp beginTime,
1354 std::vector<std::string> &gid)
1355 {
1356 return E_OK;
1357 }
1358
CheckQueryValid(const QuerySyncObject & query)1359 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1360 {
1361 int errCode = E_OK;
1362 auto *handle = GetHandle(false, errCode);
1363 if (handle == nullptr) {
1364 return errCode;
1365 }
1366 errCode = handle->CheckQueryObjectLegal(query);
1367 if (errCode != E_OK) {
1368 ReleaseHandle(handle);
1369 return errCode;
1370 }
1371 QuerySyncObject queryObj = query;
1372 queryObj.SetSchema(GetSchemaInfo());
1373 int64_t count = 0;
1374 errCode = handle->GetUploadCount(UINT64_MAX, false, queryObj, count);
1375 ReleaseHandle(handle);
1376 if (errCode != E_OK) {
1377 LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1378 return -E_INVALID_ARGS;
1379 }
1380 return errCode;
1381 }
1382
CreateTempSyncTrigger(const std::string & tableName)1383 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1384 {
1385 int errCode = E_OK;
1386 auto *handle = GetHandle(true, errCode);
1387 if (handle == nullptr) {
1388 return errCode;
1389 }
1390 errCode = CreateTempSyncTriggerInner(handle, tableName);
1391 ReleaseHandle(handle);
1392 if (errCode != E_OK) {
1393 LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1394 }
1395 return errCode;
1396 }
1397
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1398 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1399 ChangeProperties &changeProperties)
1400 {
1401 int errCode = E_OK;
1402 auto *handle = GetHandle(false, errCode);
1403 if (handle == nullptr) {
1404 return errCode;
1405 }
1406 errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1407 ReleaseHandle(handle);
1408 if (errCode != E_OK) {
1409 LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1410 }
1411 return errCode;
1412 }
1413
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1414 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1415 {
1416 if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1417 changedData.field = {};
1418 for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1419 changedData.primaryData[i].clear();
1420 }
1421 }
1422 if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1423 changedData.properties = {};
1424 }
1425 }
1426
ClearAllTempSyncTrigger()1427 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1428 {
1429 int errCode = E_OK;
1430 auto *handle = GetHandle(true, errCode);
1431 if (handle == nullptr) {
1432 return errCode;
1433 }
1434 errCode = handle->ClearAllTempSyncTrigger();
1435 ReleaseHandle(handle);
1436 if (errCode != E_OK) {
1437 LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1438 }
1439 return errCode;
1440 }
1441
FillReferenceData(CloudSyncData & syncData)1442 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1443 {
1444 std::map<int64_t, Entries> referenceGid;
1445 int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1446 if (errCode != E_OK) {
1447 LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1448 return errCode;
1449 }
1450 errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1451 if (errCode != E_OK) {
1452 return errCode;
1453 }
1454 referenceGid.clear();
1455 errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1456 if (errCode != E_OK) {
1457 LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1458 return errCode;
1459 }
1460 return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1461 }
1462
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1463 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1464 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1465 {
1466 if (referenceGid.empty()) {
1467 return E_OK;
1468 }
1469 int ignoredCount = 0;
1470 for (size_t index = 0u; index < rowid.size(); index++) {
1471 if (index >= extend.size()) {
1472 LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1473 return -E_UNEXPECTED_DATA;
1474 }
1475 int64_t rowId = rowid[index];
1476 if (referenceGid.find(rowId) == referenceGid.end()) {
1477 // current data miss match reference data, we ignored it
1478 ignoredCount++;
1479 continue;
1480 }
1481 extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1482 }
1483 if (ignoredCount != 0) {
1484 LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1485 }
1486 return E_OK;
1487 }
1488
IsSharedTable(const std::string & tableName)1489 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1490 {
1491 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1492 return schemaMgr_.IsSharedTable(tableName);
1493 }
1494
GetSharedTableOriginNames()1495 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1496 {
1497 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1498 return schemaMgr_.GetSharedTableOriginNames();
1499 }
1500
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1501 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1502 std::map<int64_t, Entries> &referenceGid)
1503 {
1504 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1505 int errCode = GetTableReference(tableName, tableReference);
1506 if (errCode != E_OK) {
1507 return errCode;
1508 }
1509 if (tableReference.empty()) {
1510 LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1511 return E_OK;
1512 }
1513 auto *handle = GetHandle(false, errCode);
1514 if (handle == nullptr) {
1515 return errCode;
1516 }
1517 errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1518 ReleaseHandle(handle);
1519 return errCode;
1520 }
1521
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1522 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1523 std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1524 {
1525 if (storageEngine_ == nullptr) {
1526 LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1527 return -E_INVALID_DB;
1528 }
1529 RelationalSchemaObject schema = storageEngine_->GetSchema();
1530 auto referenceProperty = schema.GetReferenceProperty();
1531 if (referenceProperty.empty()) {
1532 return E_OK;
1533 }
1534 auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1535 if (errCode != E_OK) {
1536 return errCode;
1537 }
1538 for (const auto &property : referenceProperty) {
1539 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1540 if (!IsSharedTable(tableName)) {
1541 reference[property.targetTableName].push_back(property);
1542 continue;
1543 }
1544 TableReferenceProperty tableReference;
1545 tableReference.sourceTableName = tableName;
1546 tableReference.columns = property.columns;
1547 tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1548 auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1549 if (ret != E_OK) {
1550 return ret;
1551 }
1552 tableReference.targetTableName = sharedTargetTable;
1553 reference[tableReference.targetTableName].push_back(tableReference);
1554 }
1555 }
1556 return E_OK;
1557 }
1558
GetSourceTableName(const std::string & tableName)1559 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1560 {
1561 std::pair<std::string, int> res = { "", E_OK };
1562 std::shared_ptr<DataBaseSchema> cloudSchema;
1563 (void) GetCloudDbSchema(cloudSchema);
1564 if (cloudSchema == nullptr) {
1565 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1566 return { "", -E_INTERNAL_ERROR };
1567 }
1568 for (const auto &table : cloudSchema->tables) {
1569 if (CloudStorageUtils::IsSharedTable(table)) {
1570 continue;
1571 }
1572 if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1573 DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1574 res.first = table.name;
1575 break;
1576 }
1577 }
1578 if (res.first.empty()) {
1579 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1580 res.second = -E_SCHEMA_MISMATCH;
1581 }
1582 return res;
1583 }
1584
GetSharedTargetTableName(const std::string & tableName)1585 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1586 {
1587 std::pair<std::string, int> res = { "", E_OK };
1588 std::shared_ptr<DataBaseSchema> cloudSchema;
1589 (void) GetCloudDbSchema(cloudSchema);
1590 if (cloudSchema == nullptr) {
1591 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1592 return { "", -E_INTERNAL_ERROR };
1593 }
1594 for (const auto &table : cloudSchema->tables) {
1595 if (CloudStorageUtils::IsSharedTable(table)) {
1596 continue;
1597 }
1598 if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1599 res.first = table.sharedTableName;
1600 break;
1601 }
1602 }
1603 if (res.first.empty()) {
1604 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1605 res.second = -E_SCHEMA_MISMATCH;
1606 }
1607 return res;
1608 }
1609
SetLogicDelete(bool logicDelete)1610 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1611 {
1612 logicDelete_ = logicDelete;
1613 LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1614 }
1615
SetCloudTaskConfig(const CloudTaskConfig & config)1616 void RelationalSyncAbleStorage::SetCloudTaskConfig(const CloudTaskConfig &config)
1617 {
1618 allowLogicDelete_ = config.allowLogicDelete;
1619 LOGD("[RelationalSyncAbleStorage] allow logic delete %d", static_cast<int>(config.allowLogicDelete));
1620 }
1621
IsCurrentLogicDelete() const1622 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1623 {
1624 return allowLogicDelete_ && logicDelete_;
1625 }
1626
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1627 int RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
1628 const Bytes &hashKey, VBucket &assets)
1629 {
1630 if (gid.empty() && hashKey.empty()) {
1631 LOGE("both gid and hashKey are empty.");
1632 return -E_INVALID_ARGS;
1633 }
1634 if (transactionHandle_ == nullptr) {
1635 LOGE("the transaction has not been started");
1636 return -E_INVALID_DB;
1637 }
1638 int errCode = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1639 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1640 LOGE("get assets by gid or hashKey failed. %d", errCode);
1641 }
1642 return errCode;
1643 }
1644
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1645 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1646 {
1647 int errCode = E_OK;
1648 auto *wHandle = GetHandle(true, errCode);
1649 if (wHandle == nullptr) {
1650 return errCode;
1651 }
1652 wHandle->SetIAssetLoader(loader);
1653 ReleaseHandle(wHandle);
1654 return errCode;
1655 }
1656
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1657 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1658 const std::vector<VBucket> &records)
1659 {
1660 int errCode = E_OK;
1661 auto *handle = GetHandle(true, errCode);
1662 if (errCode != E_OK) {
1663 return errCode;
1664 }
1665 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1666 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1667 errCode = UpsertDataInner(handle, tableName, records);
1668 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1669 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1670 ReleaseHandle(handle);
1671 return errCode;
1672 }
1673
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1674 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1675 const std::string &tableName, const std::vector<VBucket> &records)
1676 {
1677 int errCode = E_OK;
1678 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1679 if (errCode != E_OK) {
1680 return errCode;
1681 }
1682 errCode = CreateTempSyncTriggerInner(handle, tableName);
1683 if (errCode == E_OK) {
1684 errCode = UpsertDataInTransaction(handle, tableName, records);
1685 (void) handle->ClearAllTempSyncTrigger();
1686 }
1687 if (errCode == E_OK) {
1688 errCode = handle->Commit();
1689 if (errCode != E_OK) {
1690 LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1691 }
1692 } else {
1693 int ret = handle->Rollback();
1694 if (ret != E_OK) {
1695 LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1696 }
1697 }
1698 return errCode;
1699 }
1700
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1701 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1702 const std::string &tableName, const std::vector<VBucket> &records)
1703 {
1704 TableSchema tableSchema;
1705 int errCode = GetCloudTableSchema(tableName, tableSchema);
1706 if (errCode != E_OK) {
1707 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1708 return errCode;
1709 }
1710 TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1711 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1712 std::set<std::vector<uint8_t>> primaryKeys;
1713 DownloadData downloadData;
1714 for (const auto &record : records) {
1715 DataInfoWithLog dataInfoWithLog;
1716 VBucket assetInfo;
1717 auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1718 tableSchema, localTable, pkMap, false);
1719 if (errorCode != E_OK) {
1720 return errorCode;
1721 }
1722 errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1723 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1724 return errCode;
1725 }
1726 VBucket recordCopy = record;
1727 if ((errCode == -E_NOT_FOUND ||
1728 (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1729 primaryKeys.find(hashValue) == primaryKeys.end()) {
1730 downloadData.opType.push_back(OpType::INSERT);
1731 auto currentTime = TimeHelper::GetSysCurrentTime();
1732 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1733 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1734 primaryKeys.insert(hashValue);
1735 } else {
1736 downloadData.opType.push_back(OpType::UPDATE);
1737 recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1738 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1739 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1740 }
1741 downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1742 downloadData.data.push_back(std::move(recordCopy));
1743 }
1744 return PutCloudSyncDataInner(handle, tableName, downloadData);
1745 }
1746
UpdateRecordFlag(const std::string & tableName,const std::string & gid,bool recordConflict)1747 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, const std::string &gid,
1748 bool recordConflict)
1749 {
1750 if (transactionHandle_ == nullptr) {
1751 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1752 return -E_INVALID_DB;
1753 }
1754 if (gid.empty()) {
1755 LOGE("[RelationalSyncAbleStorage] gid is empty.");
1756 return -E_INVALID_ARGS;
1757 }
1758 return transactionHandle_->UpdateRecordFlag(tableName, recordConflict, gid);
1759 }
1760
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1761 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1762 OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1763 {
1764 TableSchema tableSchema;
1765 int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1766 if (errCode != E_OK) {
1767 LOGE("get table schema failed when fill log and asset. %d", errCode);
1768 return errCode;
1769 }
1770 errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1771 if (errCode != E_OK) {
1772 return errCode;
1773 }
1774 if (opType == OpType::INSERT) {
1775 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.insData);
1776 } else if (opType == OpType::UPDATE) {
1777 errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.updData);
1778 }
1779 return errCode;
1780 }
1781
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const CloudSyncBatch & updateData)1782 int RelationalSyncAbleStorage::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1783 const std::string &tableName, const CloudSyncBatch &updateData)
1784 {
1785 for (size_t i = 0; i < updateData.extend.size(); ++i) {
1786 const auto &record = updateData.extend[i];
1787 if (DBCommon::IsRecordError(record)) {
1788 continue;
1789 }
1790 const auto &rowId = updateData.rowid[i];
1791 int errCode = handle->UpdateRecordFlag(tableName, DBCommon::IsRecordIgnored(record), "", rowId);
1792 if (errCode != E_OK) {
1793 LOGE("[RDBStorage] Update record flag failed in index %zu", i);
1794 return errCode;
1795 }
1796 }
1797 return E_OK;
1798 }
1799
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery)1800 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery)
1801 {
1802 std::vector<TableSchema> tables;
1803 int errCode = GetCloudTableWithoutShared(tables);
1804 if (errCode != E_OK) {
1805 return errCode;
1806 }
1807 if (tables.empty()) {
1808 LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1809 return E_OK;
1810 }
1811 auto *handle = GetHandle(true, errCode);
1812 if (errCode != E_OK) {
1813 return errCode;
1814 }
1815 errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery);
1816 ReleaseHandle(handle);
1817 return errCode;
1818 }
1819
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1820 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1821 {
1822 const auto tableInfos = GetSchemaInfo().GetTables();
1823 for (const auto &[tableName, info] : tableInfos) {
1824 if (info.GetSharedTableMark()) {
1825 continue;
1826 }
1827 TableSchema schema;
1828 int errCode = GetCloudTableSchema(tableName, schema);
1829 if (errCode == -E_NOT_FOUND) {
1830 continue;
1831 }
1832 if (errCode != E_OK) {
1833 LOGW("[RDBStorage] Get cloud table failed %d", errCode);
1834 return errCode;
1835 }
1836 tables.push_back(schema);
1837 }
1838 return E_OK;
1839 }
1840
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery)1841 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1842 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery)
1843 {
1844 int errCode = E_OK;
1845 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1846 if (errCode != E_OK) {
1847 return errCode;
1848 }
1849 for (const auto &table : tables) {
1850 // check whether reference exist
1851 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1852 errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
1853 if (errCode != E_OK) {
1854 LOGW("[RelationalSyncAbleStorage] Get table reference failed, continue next! errCode = %d", errCode);
1855 errCode = E_OK;
1856 continue;
1857 }
1858 if (!tableReference.empty()) {
1859 LOGD("[RelationalSyncAbleStorage] current table exist reference property");
1860 continue;
1861 }
1862
1863 std::vector<VBucket> syncDataPk;
1864 errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk);
1865 if (errCode != E_OK) {
1866 LOGW("[GetWaitCompensatedSyncDataPk] Get wait compensated sync date failed, continue! errCode=%d", errCode);
1867 errCode = E_OK;
1868 continue;
1869 }
1870 if (syncDataPk.empty()) {
1871 // no data need to compensated sync
1872 continue;
1873 }
1874 QuerySyncObject syncObject;
1875 errCode = GetSyncQueryByPk(table.name, syncDataPk, syncObject);
1876 if (errCode != E_OK) {
1877 LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
1878 errCode = E_OK;
1879 continue;
1880 }
1881 syncQuery.push_back(syncObject);
1882 }
1883 if (errCode == E_OK) {
1884 errCode = handle->Commit();
1885 if (errCode != E_OK) {
1886 LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
1887 }
1888 } else {
1889 int ret = handle->Rollback();
1890 if (ret != E_OK) {
1891 LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
1892 }
1893 }
1894 return errCode;
1895 }
1896
GetSyncQueryByPk(const std::string & tableName,const std::vector<VBucket> & data,QuerySyncObject & querySyncObject)1897 int RelationalSyncAbleStorage::GetSyncQueryByPk(const std::string &tableName,
1898 const std::vector<VBucket> &data, QuerySyncObject &querySyncObject)
1899 {
1900 std::map<std::string, size_t> dataIndex;
1901 std::map<std::string, std::vector<Type>> syncPk;
1902 int ignoreCount = 0;
1903 for (const auto &oneRow : data) {
1904 if (oneRow.size() >= 2u) { // mean this data has more than 2 pk
1905 LOGW("[RDBStorageEngine] Not support compensated sync with composite primary key now");
1906 return -E_NOT_SUPPORT;
1907 }
1908 for (const auto &[col, value] : oneRow) {
1909 if (dataIndex.find(col) == dataIndex.end()) {
1910 dataIndex[col] = value.index();
1911 } else if (dataIndex[col] != value.index()) {
1912 ignoreCount++;
1913 continue;
1914 }
1915 syncPk[col].push_back(value);
1916 }
1917 }
1918 LOGD("[RDBStorageEngine] match %zu data for compensated sync, ignore %d", data.size(), ignoreCount);
1919 Query query = Query::Select().From(tableName);
1920 for (const auto &[col, pkList] : syncPk) {
1921 FillQueryInKeys(col, pkList, dataIndex[col], query);
1922 }
1923 auto objectList = QuerySyncObject::GetQuerySyncObject(query);
1924 if (objectList.size() != 1u) {
1925 return -E_INTERNAL_ERROR;
1926 }
1927 querySyncObject = objectList[0];
1928 return E_OK;
1929 }
1930
FillQueryInKeys(const std::string & col,const std::vector<Type> & data,size_t valueType,Query & query)1931 void RelationalSyncAbleStorage::FillQueryInKeys(const std::string &col, const std::vector<Type> &data, size_t valueType,
1932 Query &query)
1933 {
1934 switch (valueType) {
1935 case TYPE_INDEX<int64_t>: {
1936 std::vector<int64_t> pkList;
1937 for (const auto &pk : data) {
1938 pkList.push_back(std::get<int64_t>(pk));
1939 }
1940 query.In(col, pkList);
1941 break;
1942 }
1943 case TYPE_INDEX<std::string>: {
1944 std::vector<std::string> pkList;
1945 for (const auto &pk : data) {
1946 pkList.push_back(std::get<std::string>(pk));
1947 }
1948 query.In(col, pkList);
1949 break;
1950 }
1951 case TYPE_INDEX<double>: {
1952 std::vector<double> pkList;
1953 for (const auto &pk : data) {
1954 pkList.push_back(std::get<double>(pk));
1955 }
1956 query.In(col, pkList);
1957 break;
1958 }
1959 case TYPE_INDEX<bool>: {
1960 std::vector<bool> pkList;
1961 for (const auto &pk : data) {
1962 pkList.push_back(std::get<bool>(pk));
1963 }
1964 query.In(col, pkList);
1965 break;
1966 }
1967 default:
1968 break;
1969 }
1970 }
1971
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName)1972 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1973 const std::string &tableName)
1974 {
1975 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1976 if (trackerTable.IsEmpty()) {
1977 trackerTable.SetTableName(tableName);
1978 }
1979 return handle->CreateTempSyncTrigger(trackerTable);
1980 }
1981 }
1982 #endif