• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "data_compression.h"
21 #include "db_common.h"
22 #include "db_dfx_adapter.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "platform_specific.h"
25 #include "relational_remote_query_continue_token.h"
26 #include "res_finalizer.h"
27 #include "runtime_context.h"
28 
29 namespace DistributedDB {
30 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)31 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
32 {
33     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
34     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
35         std::string(CLOSE_CONN_TASK),
36         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBType::DB_RELATION, properties); }
37     );
38 }
39 }
40 
// Guard for member functions that return an error code:
// leaves the enclosing function with -E_INVALID_DB when no storage engine is attached.
#define CHECK_STORAGE_ENGINE do { \
    if (!storageEngine_) { \
        return -E_INVALID_DB; \
    } \
} while (0)
46 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48     : storageEngine_(std::move(engine)),
49       isCachedOption_(false)
50 {}
51 
~RelationalSyncAbleStorage()52 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
53 {}
54 
55 // Get interface type of this relational db.
GetInterfaceType() const56 int RelationalSyncAbleStorage::GetInterfaceType() const
57 {
58     return SYNC_RELATION;
59 }
60 
61 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()62 void RelationalSyncAbleStorage::IncRefCount()
63 {
64     LOGD("RelationalSyncAbleStorage ref +1");
65     IncObjRef(this);
66 }
67 
68 // Drop the interface ref-count.
DecRefCount()69 void RelationalSyncAbleStorage::DecRefCount()
70 {
71     LOGD("RelationalSyncAbleStorage ref -1");
72     DecObjRef(this);
73 }
74 
75 // Get the identifier of this rdb.
GetIdentifier() const76 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
77 {
78     std::string identifier = storageEngine_->GetIdentifier();
79     return std::vector<uint8_t>(identifier.begin(), identifier.end());
80 }
81 
GetDualTupleIdentifier() const82 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
83 {
84     std::string identifier = storageEngine_->GetProperties().GetStringProp(
85         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
86     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
87     return identifierVect;
88 }
89 
90 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const91 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
92 {
93     int errCode = E_OK;
94     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
95     if (handle == nullptr) {
96         return;
97     }
98     timestamp = 0;
99     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
100     if (errCode != E_OK) {
101         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
102         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
103     }
104     ReleaseHandle(handle);
105     return;
106 }
107 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const108 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
109 {
110     int errCode = E_OK;
111     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
112     if (handle == nullptr) {
113         return errCode;
114     }
115     timestamp = 0;
116     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
117     if (errCode != E_OK) {
118         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
119         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
120     }
121     ReleaseHandle(handle);
122     return errCode;
123 }
124 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const125 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
126     OperatePerm perm) const
127 {
128     if (storageEngine_ == nullptr) {
129         errCode = -E_INVALID_DB;
130         return nullptr;
131     }
132     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
133         storageEngine_->FindExecutor(isWrite, perm, errCode));
134     if (handle == nullptr) {
135         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
136     }
137     return handle;
138 }
139 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const140 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
141 {
142     if (storageEngine_ == nullptr) {
143         return;
144     }
145     StorageExecutor *databaseHandle = handle;
146     storageEngine_->Recycle(databaseHandle);
147     std::function<void()> listener = nullptr;
148     {
149         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
150         listener = heartBeatListener_;
151     }
152     if (listener) {
153         listener();
154     }
155 }
156 
157 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const158 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
159 {
160     CHECK_STORAGE_ENGINE;
161     if (key.size() > DBConstant::MAX_KEY_SIZE) {
162         return -E_INVALID_ARGS;
163     }
164 
165     int errCode = E_OK;
166     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
167     if (handle == nullptr) {
168         return errCode;
169     }
170     errCode = handle->GetKvData(key, value);
171     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
172         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
173     }
174     ReleaseHandle(handle);
175     return errCode;
176 }
177 
178 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)179 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
180 {
181     CHECK_STORAGE_ENGINE;
182     int errCode = E_OK;
183     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
184     if (handle == nullptr) {
185         return errCode;
186     }
187 
188     errCode = handle->PutKvData(key, value); // meta doesn't need time.
189     if (errCode != E_OK) {
190         LOGE("Put kv data err:%d", errCode);
191         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
192     }
193     ReleaseHandle(handle);
194     return errCode;
195 }
196 
197 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)198 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
199 {
200     for (const auto &key : keys) {
201         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
202             return -E_INVALID_ARGS;
203         }
204     }
205     int errCode = E_OK;
206     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
207     if (handle == nullptr) {
208         return errCode;
209     }
210 
211     handle->StartTransaction(TransactType::IMMEDIATE);
212     errCode = handle->DeleteMetaData(keys);
213     if (errCode != E_OK) {
214         handle->Rollback();
215         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
216         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217     } else {
218         handle->Commit();
219     }
220     ReleaseHandle(handle);
221     return errCode;
222 }
223 
224 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const225 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
226 {
227     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
228         return -E_INVALID_ARGS;
229     }
230 
231     int errCode = E_OK;
232     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
233     if (handle == nullptr) {
234         return errCode;
235     }
236 
237     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
238     if (errCode != E_OK) {
239         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
240         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
241     }
242     ReleaseHandle(handle);
243     return errCode;
244 }
245 
246 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const247 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
248 {
249     CHECK_STORAGE_ENGINE;
250     int errCode = E_OK;
251     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
252     if (handle == nullptr) {
253         return errCode;
254     }
255 
256     errCode = handle->GetAllMetaKeys(keys);
257     if (errCode != E_OK) {
258         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
259     }
260     ReleaseHandle(handle);
261     return errCode;
262 }
263 
GetDbProperties() const264 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
265 {
266     return storageEngine_->GetProperties();
267 }
268 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)269 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
270 {
271     int errCode = E_OK;
272     for (auto &item : dataItems) {
273         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
274         if (entry == nullptr) {
275             errCode = -E_OUT_OF_MEMORY;
276             LOGE("GetKvEntries failed, errCode:%d", errCode);
277             SingleVerKvEntry::Release(entries);
278             break;
279         }
280         entry->SetEntryData(std::move(item));
281         entries.push_back(entry);
282     }
283     return errCode;
284 }
285 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)286 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
287 {
288     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
289     // the size would not be very large.
290     static const size_t maxOrigDevLength = 40;
291     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
292     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
293                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
294     return dataSize;
295 }
296 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)297 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
298     size_t appendLen)
299 {
300     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
301     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
302         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
303         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
304     }
305     return !reachThreshold;
306 }
307 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)308 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
309     SQLiteSingleVerRelationalContinueToken *&token)
310 {
311     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
312         delete token;
313         token = nullptr;
314         return;
315     }
316 
317     if (dataItems.empty()) {
318         errCode = -E_INTERNAL_ERROR;
319         LOGE("Get data unfinished but data items is empty.");
320         delete token;
321         token = nullptr;
322         return;
323     }
324     token->SetNextBeginTime(dataItems.back());
325 }
326 
327 /**
328  * Caller must ensure that parameter token is valid.
329  * If error happened, token will be deleted here.
330  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const331 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
332     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
333 {
334     if (storageEngine_ == nullptr) {
335         return -E_INVALID_DB;
336     }
337 
338     int errCode = E_OK;
339     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
340         OperatePerm::NORMAL_PERM, errCode));
341     if (handle == nullptr) {
342         goto ERROR;
343     }
344 
345     do {
346         errCode = handle->GetSyncDataByQuery(dataItems,
347             Parcel::GetAppendedLen(),
348             dataSizeInfo,
349             std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
350                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
351             storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
352         if (errCode == -E_FINISHED) {
353             token->FinishGetData();
354             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
355         }
356     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
357 
358 ERROR:
359     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
360         dataItems.clear();
361     }
362     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
363     ReleaseHandle(handle);
364     return errCode;
365 }
366 
367 // use kv struct data to sync
368 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const369 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
370     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
371     std::vector<SingleVerKvEntry *> &entries) const
372 {
373     if (!timeRange.IsValid()) {
374         return -E_INVALID_ARGS;
375     }
376     query.SetSchema(storageEngine_->GetSchema());
377     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
378     if (token == nullptr) {
379         LOGE("[SingleVerNStore] Allocate continue token failed.");
380         return -E_OUT_OF_MEMORY;
381     }
382 
383     continueStmtToken = static_cast<ContinueToken>(token);
384     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
385 }
386 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const387 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
388     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
389 {
390     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
391     if (!token->CheckValid()) {
392         return -E_INVALID_ARGS;
393     }
394     RelationalSchemaObject schema = storageEngine_->GetSchema();
395     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
396     std::vector<std::string> fieldNames;
397     for (const auto &fieldInfo : fieldInfos) {
398         fieldNames.push_back(fieldInfo.GetFieldName());
399     }
400     token->SetFieldNames(fieldNames);
401 
402     std::vector<DataItem> dataItems;
403     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
404     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
405         continueStmtToken = static_cast<ContinueToken>(token);
406         return errCode;
407     }
408 
409     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
410     if (innerCode != E_OK) {
411         errCode = innerCode;
412         delete token;
413         token = nullptr;
414     }
415     continueStmtToken = static_cast<ContinueToken>(token);
416     return errCode;
417 }
418 
419 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)420 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
421 {
422     std::vector<DataItem> dataItems;
423     for (const auto &itemEntry : entries) {
424         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
425         if (entry != nullptr) {
426             DataItem item;
427             item.origDev = entry->GetOrigDevice();
428             item.flag = entry->GetFlag();
429             item.timestamp = entry->GetTimestamp();
430             item.writeTimestamp = entry->GetWriteTimestamp();
431             entry->GetKey(item.key);
432             entry->GetValue(item.value);
433             entry->GetHashKey(item.hashKey);
434             dataItems.push_back(item);
435         }
436     }
437     return dataItems;
438 }
439 }
440 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)441 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
442     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
443 {
444     std::vector<DataItem> dataItems = ConvertEntries(entries);
445     return PutSyncData(object, dataItems, deviceName);
446 }
447 
448 namespace {
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)449 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
450 {
451     return engine->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
452         DistributedTableMode::SPLIT_BY_DEVICE) == DistributedTableMode::COLLABORATION;
453 }
454 }
455 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)456 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
457     const std::string &deviceName)
458 {
459     int errCode = E_OK;
460     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
461     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
462     if (handle == nullptr) {
463         return errCode;
464     }
465     QueryObject query = object;
466     query.SetSchema(storageEngine_->GetSchema());
467 
468     TableInfo table = storageEngine_->GetSchema().GetTable(object.GetTableName());
469     StoreInfo info = {
470         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
471         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
472         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
473     };
474     if (!IsCollaborationMode(storageEngine_)) {
475         // Set table name for SPLIT_BY_DEVICE mode
476         table.SetTableName(DBCommon::GetDistributedTableName(deviceName, object.GetTableName(), info));
477     }
478     DBDfxAdapter::StartTraceSQL();
479     errCode = handle->SaveSyncItems(query, dataItems, deviceName, table);
480     DBDfxAdapter::FinishTraceSQL();
481     if (errCode == E_OK) {
482         // dataItems size > 0 now because already check before
483         // all dataItems will write into db now, so need to observer notify here
484         // if some dataItems will not write into db in the future, observer notify here need change
485         TriggerObserverAction(deviceName);
486     }
487 
488     ReleaseHandle(handle);
489     return errCode;
490 }
491 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)492 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
493     const std::string &deviceName)
494 {
495     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
496         LOGW("Device length is invalid for sync put");
497         return -E_INVALID_ARGS;
498     }
499 
500     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
501     if (errCode != E_OK) {
502         LOGE("[Relational] PutSyncData errCode:%d", errCode);
503         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
504     }
505     return errCode;
506 }
507 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)508 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
509 {
510     (void) deviceName;
511     (void) isNeedNotify;
512     return -E_NOT_SUPPORT;
513 }
514 
GetSchemaInfo() const515 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
516 {
517     return storageEngine_->GetSchema();
518 }
519 
GetSecurityOption(SecurityOption & option) const520 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
521 {
522     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
523     if (isCachedOption_) {
524         option = securityOption_;
525         return E_OK;
526     }
527     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
528     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
529     if (errCode == E_OK) {
530         option = securityOption_;
531         isCachedOption_ = true;
532     }
533     return errCode;
534 }
535 
NotifyRemotePushFinished(const std::string & deviceId) const536 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
537 {
538     return;
539 }
540 
541 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const542 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
543 {
544     return OS::GetCurrentSysTimeInMicrosecond(outTime);
545 }
546 
547 // Get batch meta data associated with the given key.
GetBatchMetaData(const std::vector<Key> & keys,std::vector<Entry> & entries) const548 int RelationalSyncAbleStorage::GetBatchMetaData(const std::vector<Key> &keys, std::vector<Entry> &entries) const
549 {
550     return -E_NOT_SUPPORT;
551 }
552 
553 // Put batch meta data as a key-value entry vector
PutBatchMetaData(std::vector<Entry> & entries)554 int RelationalSyncAbleStorage::PutBatchMetaData(std::vector<Entry> &entries)
555 {
556     return -E_NOT_SUPPORT;
557 }
558 
GetTablesQuery()559 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
560 {
561     return {};
562 }
563 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)564 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
565 {
566     (void) queryObj;
567     return -E_NOT_SUPPORT;
568 }
569 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)570 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
571     const RelationalSyncStrategy &syncStrategy)
572 {
573     auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
574         DistributedTableMode::SPLIT_BY_DEVICE);
575     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
576         LOGD("No need create device table in COLLABORATION mode.");
577         return E_OK;
578     }
579 
580     int errCode = E_OK;
581     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
582     if (handle == nullptr) {
583         return errCode;
584     }
585 
586     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
587     if (errCode != E_OK) {
588         LOGE("Start transaction failed:%d", errCode);
589         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
590         ReleaseHandle(handle);
591         return errCode;
592     }
593 
594     StoreInfo info = {
595         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
596         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
597         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
598     };
599     for (const auto &[table, strategy] : syncStrategy) {
600         if (!strategy.permitSync) {
601             continue;
602         }
603 
604         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
605         if (errCode != E_OK) {
606             LOGE("Create distributed device table failed. %d", errCode);
607             break;
608         }
609     }
610 
611     if (errCode == E_OK) {
612         errCode = handle->Commit();
613     } else {
614         (void)handle->Rollback();
615     }
616 
617     ReleaseHandle(handle);
618     return errCode;
619 }
620 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)621 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
622 {
623     std::lock_guard lock(onSchemaChangedMutex_);
624     onSchemaChanged_ = callback;
625     return E_OK;
626 }
627 
NotifySchemaChanged()628 void RelationalSyncAbleStorage::NotifySchemaChanged()
629 {
630     std::lock_guard lock(onSchemaChangedMutex_);
631     if (onSchemaChanged_) {
632         LOGD("Notify relational schema was changed");
633         onSchemaChanged_();
634     }
635 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const636 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
637 {
638     algorithmSet.clear();
639     DataCompression::GetCompressionAlgo(algorithmSet);
640     return E_OK;
641 }
642 
RegisterObserverAction(const RelationalObserverAction & action)643 void RelationalSyncAbleStorage::RegisterObserverAction(const RelationalObserverAction &action)
644 {
645     std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
646     dataChangeDeviceCallback_ = action;
647 }
648 
TriggerObserverAction(const std::string & deviceName)649 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName)
650 {
651     {
652         std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
653         if (!dataChangeDeviceCallback_) {
654             return;
655         }
656     }
657     IncObjRef(this);
658     int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask([this, deviceName] {
659         std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
660         if (dataChangeDeviceCallback_) {
661             dataChangeDeviceCallback_(deviceName);
662         }
663         DecObjRef(this);
664     });
665     if (taskErrCode != E_OK) {
666         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
667         DecObjRef(this);
668     }
669 }
670 
RegisterHeartBeatListener(const std::function<void ()> & listener)671 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
672 {
673     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
674     heartBeatListener_ = listener;
675 }
676 
CheckAndInitQueryCondition(QueryObject & query) const677 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
678 {
679     RelationalSchemaObject schema = storageEngine_->GetSchema();
680     TableInfo table = schema.GetTable(query.GetTableName());
681     if (table.GetTableName() != query.GetTableName()) {
682         LOGE("Query table is not a distributed table.");
683         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
684     }
685     query.SetSchema(schema);
686 
687     int errCode = E_OK;
688     auto *handle = GetHandle(false, errCode);
689     if (handle == nullptr) {
690         return errCode;
691     }
692 
693     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
694     if (errCode != E_OK) {
695         LOGE("Check relational query condition failed. %d", errCode);
696         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
697     }
698 
699     ReleaseHandle(handle);
700     return errCode;
701 }
702 
CheckCompatible(const std::string & schema,uint8_t type) const703 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
704 {
705     // return true if is relational schema.
706     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
707 }
708 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const709 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
710     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
711 {
712     if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
713         return -E_NOT_SUPPORT;
714     }
715     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
716         LOGE("[ExecuteQuery] invalid args");
717         return -E_INVALID_ARGS;
718     }
719     int errCode = E_OK;
720     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
721     if (handle == nullptr) {
722         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
723         return errCode;
724     }
725     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
726     if (errCode != E_OK) {
727         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
728     }
729     ReleaseHandle(handle);
730     return errCode;
731 }
732 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const733 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
734     RelationalRowDataSet &dataSet, ContinueToken &token) const
735 {
736     dataSet.Clear();
737     if (token == nullptr) {
738         // start query
739         std::vector<std::string> colNames;
740         std::vector<RelationalRowData *> data;
741         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
742 
743         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
744         if (errCode != E_OK) {
745             return errCode;
746         }
747 
748         // create one token
749         token = static_cast<ContinueToken>(
750             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
751         if (token == nullptr) {
752             LOGE("ExecuteQuery OOM");
753             return -E_OUT_OF_MEMORY;
754         }
755     }
756 
757     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
758     if (!remoteToken->CheckValid()) {
759         LOGE("ExecuteQuery invalid token");
760         return -E_INVALID_ARGS;
761     }
762 
763     int errCode = remoteToken->GetData(packetSize, dataSet);
764     if (errCode == -E_UNFINISHED) {
765         errCode = E_OK;
766     } else {
767         if (errCode != E_OK) {
768             dataSet.Clear();
769         }
770         delete remoteToken;
771         remoteToken = nullptr;
772         token = nullptr;
773     }
774     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
775     return errCode;
776 }
777 
GetRelationalDbProperties() const778 const RelationalDBProperties &RelationalSyncAbleStorage::GetRelationalDbProperties() const
779 {
780     return storageEngine_->GetProperties();
781 }
782 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const783 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
784 {
785     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
786     delete remoteToken;
787     remoteToken = nullptr;
788     token = nullptr;
789 }
790 }
791 #endif