• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "data_compression.h"
21 #include "db_common.h"
22 #include "db_dfx_adapter.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "platform_specific.h"
25 #include "relational_remote_query_continue_token.h"
26 #include "res_finalizer.h"
27 #include "runtime_context.h"
28 
29 namespace DistributedDB {
30 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)31 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
32 {
33     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
34     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
35         std::string(CLOSE_CONN_TASK),
36         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBType::DB_RELATION, properties); }
37     );
38 }
39 }
40 
41 #define CHECK_STORAGE_ENGINE do { \
42     if (storageEngine_ == nullptr) { \
43         return -E_INVALID_DB; \
44     } \
45 } while (0)
46 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48     : storageEngine_(std::move(engine)),
49       isCachedOption_(false)
50 {}
51 
~RelationalSyncAbleStorage()52 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
53 {}
54 
55 // Get interface type of this relational db.
GetInterfaceType() const56 int RelationalSyncAbleStorage::GetInterfaceType() const
57 {
58     return SYNC_RELATION;
59 }
60 
61 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()62 void RelationalSyncAbleStorage::IncRefCount()
63 {
64     LOGD("RelationalSyncAbleStorage ref +1");
65     IncObjRef(this);
66 }
67 
68 // Drop the interface ref-count.
DecRefCount()69 void RelationalSyncAbleStorage::DecRefCount()
70 {
71     LOGD("RelationalSyncAbleStorage ref -1");
72     DecObjRef(this);
73 }
74 
75 // Get the identifier of this rdb.
GetIdentifier() const76 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
77 {
78     std::string identifier = storageEngine_->GetIdentifier();
79     return std::vector<uint8_t>(identifier.begin(), identifier.end());
80 }
81 
GetDualTupleIdentifier() const82 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
83 {
84     std::string identifier = storageEngine_->GetProperties().GetStringProp(
85         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
86     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
87     return identifierVect;
88 }
89 
90 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const91 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
92 {
93     int errCode = E_OK;
94     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
95     if (handle == nullptr) {
96         return;
97     }
98     timestamp = 0;
99     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
100     if (errCode != E_OK) {
101         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
102         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
103     }
104     ReleaseHandle(handle);
105     return;
106 }
107 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const108 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
109 {
110     int errCode = E_OK;
111     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
112     if (handle == nullptr) {
113         return errCode;
114     }
115     timestamp = 0;
116     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
117     if (errCode != E_OK) {
118         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
119         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
120     }
121     ReleaseHandle(handle);
122     return errCode;
123 }
124 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const125 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
126     OperatePerm perm) const
127 {
128     if (storageEngine_ == nullptr) {
129         errCode = -E_INVALID_DB;
130         return nullptr;
131     }
132     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
133         storageEngine_->FindExecutor(isWrite, perm, errCode));
134     if (handle == nullptr) {
135         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
136     }
137     return handle;
138 }
139 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const140 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
141 {
142     if (storageEngine_ == nullptr) {
143         return;
144     }
145     StorageExecutor *databaseHandle = handle;
146     storageEngine_->Recycle(databaseHandle);
147     std::function<void()> listener = nullptr;
148     {
149         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
150         listener = heartBeatListener_;
151     }
152     if (listener) {
153         listener();
154     }
155 }
156 
157 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const158 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
159 {
160     CHECK_STORAGE_ENGINE;
161     if (key.size() > DBConstant::MAX_KEY_SIZE) {
162         return -E_INVALID_ARGS;
163     }
164 
165     int errCode = E_OK;
166     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
167     if (handle == nullptr) {
168         return errCode;
169     }
170     errCode = handle->GetKvData(key, value);
171     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
172         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
173     }
174     ReleaseHandle(handle);
175     return errCode;
176 }
177 
178 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)179 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
180 {
181     CHECK_STORAGE_ENGINE;
182     int errCode = E_OK;
183     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
184     if (handle == nullptr) {
185         return errCode;
186     }
187 
188     errCode = handle->PutKvData(key, value); // meta doesn't need time.
189     if (errCode != E_OK) {
190         LOGE("Put kv data err:%d", errCode);
191         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
192     }
193     ReleaseHandle(handle);
194     return errCode;
195 }
196 
197 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)198 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
199 {
200     for (const auto &key : keys) {
201         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
202             return -E_INVALID_ARGS;
203         }
204     }
205     int errCode = E_OK;
206     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
207     if (handle == nullptr) {
208         return errCode;
209     }
210 
211     handle->StartTransaction(TransactType::IMMEDIATE);
212     errCode = handle->DeleteMetaData(keys);
213     if (errCode != E_OK) {
214         handle->Rollback();
215         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
216         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217     } else {
218         handle->Commit();
219     }
220     ReleaseHandle(handle);
221     return errCode;
222 }
223 
224 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const225 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
226 {
227     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
228         return -E_INVALID_ARGS;
229     }
230 
231     int errCode = E_OK;
232     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
233     if (handle == nullptr) {
234         return errCode;
235     }
236 
237     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
238     if (errCode != E_OK) {
239         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
240         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
241     }
242     ReleaseHandle(handle);
243     return errCode;
244 }
245 
246 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const247 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
248 {
249     CHECK_STORAGE_ENGINE;
250     int errCode = E_OK;
251     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
252     if (handle == nullptr) {
253         return errCode;
254     }
255 
256     errCode = handle->GetAllMetaKeys(keys);
257     if (errCode != E_OK) {
258         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
259     }
260     ReleaseHandle(handle);
261     return errCode;
262 }
263 
GetDbProperties() const264 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
265 {
266     return storageEngine_->GetProperties();
267 }
268 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)269 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
270 {
271     int errCode = E_OK;
272     for (auto &item : dataItems) {
273         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
274         if (entry == nullptr) {
275             errCode = -E_OUT_OF_MEMORY;
276             LOGE("GetKvEntries failed, errCode:%d", errCode);
277             SingleVerKvEntry::Release(entries);
278             break;
279         }
280         entry->SetEntryData(std::move(item));
281         entries.push_back(entry);
282     }
283     return errCode;
284 }
285 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)286 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
287 {
288     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
289     // the size would not be very large.
290     static const size_t maxOrigDevLength = 40;
291     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
292     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
293                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
294     return dataSize;
295 }
296 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)297 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
298     size_t appendLen)
299 {
300     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
301     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
302         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
303         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
304     }
305     return !reachThreshold;
306 }
307 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)308 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
309     SQLiteSingleVerRelationalContinueToken *&token)
310 {
311     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
312         delete token;
313         token = nullptr;
314         return;
315     }
316 
317     if (dataItems.empty()) {
318         errCode = -E_INTERNAL_ERROR;
319         LOGE("Get data unfinished but data items is empty.");
320         delete token;
321         token = nullptr;
322         return;
323     }
324     token->SetNextBeginTime(dataItems.back());
325 }
326 
327 /**
328  * Caller must ensure that parameter token is valid.
329  * If error happened, token will be deleted here.
330  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const331 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
332     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
333 {
334     if (storageEngine_ == nullptr) {
335         return -E_INVALID_DB;
336     }
337 
338     int errCode = E_OK;
339     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
340         OperatePerm::NORMAL_PERM, errCode));
341     if (handle == nullptr) {
342         goto ERROR;
343     }
344 
345     do {
346         errCode = handle->GetSyncDataByQuery(dataItems,
347             Parcel::GetAppendedLen(),
348             dataSizeInfo,
349             std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
350                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
351             storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
352         if (errCode == -E_FINISHED) {
353             token->FinishGetData();
354             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
355         }
356     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
357 
358 ERROR:
359     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
360         dataItems.clear();
361     }
362     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
363     ReleaseHandle(handle);
364     return errCode;
365 }
366 
367 // use kv struct data to sync
368 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const369 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
370     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
371     std::vector<SingleVerKvEntry *> &entries) const
372 {
373     if (!timeRange.IsValid()) {
374         return -E_INVALID_ARGS;
375     }
376     query.SetSchema(storageEngine_->GetSchema());
377     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
378     if (token == nullptr) {
379         LOGE("[SingleVerNStore] Allocate continue token failed.");
380         return -E_OUT_OF_MEMORY;
381     }
382 
383     continueStmtToken = static_cast<ContinueToken>(token);
384     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
385 }
386 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const387 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
388     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
389 {
390     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
391     if (!token->CheckValid()) {
392         return -E_INVALID_ARGS;
393     }
394     RelationalSchemaObject schema = storageEngine_->GetSchema();
395     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
396     std::vector<std::string> fieldNames;
397     for (const auto &fieldInfo : fieldInfos) {
398         fieldNames.push_back(fieldInfo.GetFieldName());
399     }
400     token->SetFieldNames(fieldNames);
401 
402     std::vector<DataItem> dataItems;
403     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
404     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
405         continueStmtToken = static_cast<ContinueToken>(token);
406         return errCode;
407     }
408 
409     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
410     if (innerCode != E_OK) {
411         errCode = innerCode;
412         delete token;
413         token = nullptr;
414     }
415     continueStmtToken = static_cast<ContinueToken>(token);
416     return errCode;
417 }
418 
419 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)420 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
421 {
422     std::vector<DataItem> dataItems;
423     for (const auto &itemEntry : entries) {
424         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
425         if (entry != nullptr) {
426             DataItem item;
427             item.origDev = entry->GetOrigDevice();
428             item.flag = entry->GetFlag();
429             item.timestamp = entry->GetTimestamp();
430             item.writeTimestamp = entry->GetWriteTimestamp();
431             entry->GetKey(item.key);
432             entry->GetValue(item.value);
433             entry->GetHashKey(item.hashKey);
434             dataItems.push_back(item);
435         }
436     }
437     return dataItems;
438 }
439 }
440 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)441 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
442     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
443 {
444     std::vector<DataItem> dataItems = ConvertEntries(entries);
445     return PutSyncData(object, dataItems, deviceName);
446 }
447 
448 namespace {
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)449 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
450 {
451     return engine->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
452         DistributedTableMode::SPLIT_BY_DEVICE) == DistributedTableMode::COLLABORATION;
453 }
454 }
455 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)456 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
457     const std::string &deviceName)
458 {
459     int errCode = E_OK;
460     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
461     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
462     if (handle == nullptr) {
463         return errCode;
464     }
465     QueryObject query = object;
466     query.SetSchema(storageEngine_->GetSchema());
467 
468     TableInfo table = storageEngine_->GetSchema().GetTable(object.GetTableName());
469     if (!IsCollaborationMode(storageEngine_)) {
470         // Set table name for SPLIT_BY_DEVICE mode
471         table.SetTableName(DBCommon::GetDistributedTableName(deviceName, object.GetTableName()));
472     }
473     DBDfxAdapter::StartTraceSQL();
474     errCode = handle->SaveSyncItems(query, dataItems, deviceName, table);
475     DBDfxAdapter::FinishTraceSQL();
476     if (errCode == E_OK) {
477         // dataItems size > 0 now because already check before
478         // all dataItems will write into db now, so need to observer notify here
479         // if some dataItems will not write into db in the future, observer notify here need change
480         TriggerObserverAction(deviceName);
481     }
482 
483     ReleaseHandle(handle);
484     return errCode;
485 }
486 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)487 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
488     const std::string &deviceName)
489 {
490     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
491         LOGW("Device length is invalid for sync put");
492         return -E_INVALID_ARGS;
493     }
494 
495     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
496     if (errCode != E_OK) {
497         LOGE("[Relational] PutSyncData errCode:%d", errCode);
498         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
499     }
500     return errCode;
501 }
502 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)503 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
504 {
505     (void) deviceName;
506     (void) isNeedNotify;
507     return -E_NOT_SUPPORT;
508 }
509 
GetSchemaInfo() const510 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
511 {
512     return storageEngine_->GetSchema();
513 }
514 
GetSecurityOption(SecurityOption & option) const515 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
516 {
517     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
518     if (isCachedOption_) {
519         option = securityOption_;
520         return E_OK;
521     }
522     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
523     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
524     if (errCode == E_OK) {
525         option = securityOption_;
526         isCachedOption_ = true;
527     }
528     return errCode;
529 }
530 
NotifyRemotePushFinished(const std::string & deviceId) const531 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
532 {
533     return;
534 }
535 
536 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const537 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
538 {
539     return OS::GetCurrentSysTimeInMicrosecond(outTime);
540 }
541 
542 // Get batch meta data associated with the given key.
GetBatchMetaData(const std::vector<Key> & keys,std::vector<Entry> & entries) const543 int RelationalSyncAbleStorage::GetBatchMetaData(const std::vector<Key> &keys, std::vector<Entry> &entries) const
544 {
545     return -E_NOT_SUPPORT;
546 }
547 
548 // Put batch meta data as a key-value entry vector
PutBatchMetaData(std::vector<Entry> & entries)549 int RelationalSyncAbleStorage::PutBatchMetaData(std::vector<Entry> &entries)
550 {
551     return -E_NOT_SUPPORT;
552 }
553 
GetTablesQuery()554 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
555 {
556     return {};
557 }
558 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)559 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
560 {
561     (void) queryObj;
562     return -E_NOT_SUPPORT;
563 }
564 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)565 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
566     const RelationalSyncStrategy &syncStrategy)
567 {
568     auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
569         DistributedTableMode::SPLIT_BY_DEVICE);
570     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
571         LOGD("No need create device table in COLLABORATION mode.");
572         return E_OK;
573     }
574 
575     int errCode = E_OK;
576     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
577     if (handle == nullptr) {
578         return errCode;
579     }
580 
581     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
582     if (errCode != E_OK) {
583         LOGE("Start transaction failed:%d", errCode);
584         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
585         ReleaseHandle(handle);
586         return errCode;
587     }
588 
589     for (const auto &[table, strategy] : syncStrategy) {
590         if (!strategy.permitSync) {
591             continue;
592         }
593 
594         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table));
595         if (errCode != E_OK) {
596             LOGE("Create distributed device table failed. %d", errCode);
597             break;
598         }
599     }
600 
601     if (errCode == E_OK) {
602         errCode = handle->Commit();
603     } else {
604         (void)handle->Rollback();
605     }
606 
607     ReleaseHandle(handle);
608     return errCode;
609 }
610 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)611 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
612 {
613     std::lock_guard lock(onSchemaChangedMutex_);
614     onSchemaChanged_ = callback;
615     return E_OK;
616 }
617 
NotifySchemaChanged()618 void RelationalSyncAbleStorage::NotifySchemaChanged()
619 {
620     std::lock_guard lock(onSchemaChangedMutex_);
621     if (onSchemaChanged_) {
622         LOGD("Notify relational schema was changed");
623         onSchemaChanged_();
624     }
625 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const626 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
627 {
628     algorithmSet.clear();
629     DataCompression::GetCompressionAlgo(algorithmSet);
630     return E_OK;
631 }
632 
RegisterObserverAction(const RelationalObserverAction & action)633 void RelationalSyncAbleStorage::RegisterObserverAction(const RelationalObserverAction &action)
634 {
635     std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
636     dataChangeDeviceCallback_ = action;
637 }
638 
TriggerObserverAction(const std::string & deviceName)639 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName)
640 {
641     {
642         std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
643         if (!dataChangeDeviceCallback_) {
644             return;
645         }
646     }
647     IncObjRef(this);
648     int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask([this, deviceName] {
649         std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
650         if (dataChangeDeviceCallback_) {
651             dataChangeDeviceCallback_(deviceName);
652         }
653         DecObjRef(this);
654     });
655     if (taskErrCode != E_OK) {
656         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
657         DecObjRef(this);
658     }
659 }
660 
RegisterHeartBeatListener(const std::function<void ()> & listener)661 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
662 {
663     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
664     heartBeatListener_ = listener;
665 }
666 
CheckAndInitQueryCondition(QueryObject & query) const667 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
668 {
669     RelationalSchemaObject schema = storageEngine_->GetSchema();
670     TableInfo table = schema.GetTable(query.GetTableName());
671     if (table.GetTableName() != query.GetTableName()) {
672         LOGE("Query table is not a distributed table.");
673         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
674     }
675     query.SetSchema(schema);
676 
677     int errCode = E_OK;
678     auto *handle = GetHandle(false, errCode);
679     if (handle == nullptr) {
680         return errCode;
681     }
682 
683     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
684     if (errCode != E_OK) {
685         LOGE("Check relational query condition failed. %d", errCode);
686         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
687     }
688 
689     ReleaseHandle(handle);
690     return errCode;
691 }
692 
CheckCompatible(const std::string & schema,uint8_t type) const693 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
694 {
695     // return true if is relational schema.
696     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
697 }
698 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const699 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
700     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
701 {
702     if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
703         return -E_NOT_SUPPORT;
704     }
705     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
706         LOGE("[ExecuteQuery] invalid args");
707         return -E_INVALID_ARGS;
708     }
709     int errCode = E_OK;
710     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
711     if (handle == nullptr) {
712         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
713         return errCode;
714     }
715     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
716     if (errCode != E_OK) {
717         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
718     }
719     ReleaseHandle(handle);
720     return errCode;
721 }
722 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const723 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
724     RelationalRowDataSet &dataSet, ContinueToken &token) const
725 {
726     dataSet.Clear();
727     if (token == nullptr) {
728         // start query
729         std::vector<std::string> colNames;
730         std::vector<RelationalRowData *> data;
731         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
732 
733         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
734         if (errCode != E_OK) {
735             return errCode;
736         }
737 
738         // create one token
739         token = static_cast<ContinueToken>(
740             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
741         if (token == nullptr) {
742             LOGE("ExecuteQuery OOM");
743             return -E_OUT_OF_MEMORY;
744         }
745     }
746 
747     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
748     if (!remoteToken->CheckValid()) {
749         LOGE("ExecuteQuery invalid token");
750         return -E_INVALID_ARGS;
751     }
752 
753     int errCode = remoteToken->GetData(packetSize, dataSet);
754     if (errCode == -E_UNFINISHED) {
755         errCode = E_OK;
756     } else {
757         if (errCode != E_OK) {
758             dataSet.Clear();
759         }
760         delete remoteToken;
761         remoteToken = nullptr;
762         token = nullptr;
763     }
764     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
765     return errCode;
766 }
767 
GetRelationalDbProperties() const768 const RelationalDBProperties &RelationalSyncAbleStorage::GetRelationalDbProperties() const
769 {
770     return storageEngine_->GetProperties();
771 }
772 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const773 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
774 {
775     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
776     delete remoteToken;
777     remoteToken = nullptr;
778     token = nullptr;
779 }
780 }
781 #endif