• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "lru_map.h"
20 #include "relational_db_sync_interface.h"
21 #include "relationaldb_properties.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_relational_storage_engine.h"
24 #include "sqlite_single_ver_relational_continue_token.h"
25 
26 namespace DistributedDB {
27 using RelationalObserverAction = std::function<void(const std::string &device)>;
28 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public virtual RefObject {
29 public:
30     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
31     ~RelationalSyncAbleStorage() override;
32 
33     // Get interface type of this kvdb.
34     int GetInterfaceType() const override;
35 
36     // Get the interface ref-count, in order to access asynchronously.
37     void IncRefCount() override;
38 
39     // Drop the interface ref-count.
40     void DecRefCount() override;
41 
42     // Get the identifier of this rdb.
43     std::vector<uint8_t> GetIdentifier() const override;
44 
45     // Get the dual tuple identifier of this rdb.
46     std::vector<uint8_t> GetDualTupleIdentifier() const override;
47 
48     // Get the max timestamp of all entries in database.
49     void GetMaxTimestamp(Timestamp &stamp) const override;
50 
51     // Get the max timestamp of one table.
52     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
53 
54     // Get meta data associated with the given key.
55     int GetMetaData(const Key &key, Value &value) const override;
56 
57     // Put meta data as a key-value entry.
58     int PutMetaData(const Key &key, const Value &value) override;
59 
60     // Delete multiple meta data records in a transaction.
61     int DeleteMetaData(const std::vector<Key> &keys) override;
62 
63     // Delete multiple meta data records with key prefix in a transaction.
64     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
65 
66     // Get all meta data keys.
67     int GetAllMetaKeys(std::vector<Key> &keys) const override;
68 
69     const RelationalDBProperties &GetDbProperties() const override;
70 
71     // Get the data which would be synced with query condition
72     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
73         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
74         std::vector<SingleVerKvEntry *> &entries) const override;
75 
76     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
77         const DataSizeSpecInfo &dataSizeInfo) const override;
78 
79     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
80         const DeviceID &deviceName) override;
81 
82     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
83 
84     RelationalSchemaObject GetSchemaInfo() const override;
85 
86     int GetSecurityOption(SecurityOption &option) const override;
87 
88     void NotifyRemotePushFinished(const std::string &deviceId) const override;
89 
90     // Get the timestamp when database created or imported
91     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
92 
93     // Get batch meta data associated with the given key.
94     int GetBatchMetaData(const std::vector<Key> &keys, std::vector<Entry> &entries) const override;
95     // Put batch meta data as a key-value entry vector
96     int PutBatchMetaData(std::vector<Entry> &entries) override;
97 
98     std::vector<QuerySyncObject> GetTablesQuery() override;
99 
100     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
101 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID)102     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
103         const std::string &targetID) const override
104     {
105         return E_OK;
106     }
107 
108     int CheckAndInitQueryCondition(QueryObject &query) const override;
109     void RegisterObserverAction(const RelationalObserverAction &action);
110     void TriggerObserverAction(const std::string &deviceName);
111 
112     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
113 
114     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
115 
116     void NotifySchemaChanged();
117 
118     void RegisterHeartBeatListener(const std::function<void()> &listener);
119 
120     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
121 
122     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
123 
124     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
125         ContinueToken &token) const override;
126 
127     const RelationalDBProperties &GetRelationalDbProperties() const override;
128 
129     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
130 private:
131     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
132         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
133     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
134 
135     // get
136     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
137         const DataSizeSpecInfo &dataSizeInfo) const;
138     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
139         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
140 
141     // put
142     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
143     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
144 
145     // data
146     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
147     std::function<void()> onSchemaChanged_;
148     mutable std::mutex onSchemaChangedMutex_;
149     std::mutex dataChangeDeviceMutex_;
150     RelationalObserverAction dataChangeDeviceCallback_;
151     std::function<void()> heartBeatListener_;
152     mutable std::mutex heartBeatMutex_;
153 
154     // cache securityOption
155     mutable std::mutex securityOptionMutex_;
156     mutable SecurityOption securityOption_;
157     mutable bool isCachedOption_;
158 };
159 }  // namespace DistributedDB
160 #endif
161 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H