• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_OPERATION_H
17 #define SYNC_OPERATION_H
18 
19 #include <functional>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include "ikvdb_sync_interface.h"
26 #include "notification_chain.h"
27 #include "query_sync_object.h"
28 #include "ref_object.h"
29 #include "runtime_context.h"
30 #include "semaphore_utils.h"
31 #include "sync_types.h"
32 
33 namespace DistributedDB {
34 class SyncOperation : public RefObject {
35 public:
36     enum Status {
37         OP_WAITING = 0,
38         OP_SYNCING,
39         OP_SEND_FINISHED,
40         OP_RECV_FINISHED,
41         OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status.
42         OP_FAILED,
43         OP_TIMEOUT,
44         OP_PERMISSION_CHECK_FAILED,
45         OP_COMM_ABNORMAL,
46         OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local
47         OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error
48         OP_BUSY_FAILURE,
49         OP_SCHEMA_INCOMPATIBLE,
50         OP_QUERY_FORMAT_FAILURE,
51         OP_QUERY_FIELD_FAILURE,
52         OP_NOT_SUPPORT,
53         OP_INTERCEPT_DATA_FAIL,
54         OP_MAX_LIMITS,
55         OP_SCHEMA_CHANGED,
56         OP_INVALID_ARGS,
57         OP_USER_CHANGED,
58         OP_DENIED_SQL,
59         OP_NOTADB_OR_CORRUPTED,
60     };
61 
62     using UserCallback = std::function<void(std::map<std::string, int>)>;
63     using OnSyncFinished = std::function<void(int)>;
64     using OnSyncFinalize = std::function<void(void)>;
65 
66     SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
67         const UserCallback &userCallback, bool isBlockSync);
68 
69     DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
70 
71     // Init the status for callback
72     int Initialize();
73 
74     // Set the OnSyncFinalize callback
75     void SetOnSyncFinalize(const OnSyncFinalize &callback);
76 
77     // Set the OnSyncFinished callback, it will be called either success or failed.
78     void SetOnSyncFinished(const OnSyncFinished &callback);
79 
80     // Set the sync status, running or finished
81     void SetStatus(const std::string &deviceId, int status);
82 
83     // Set the unfinished devices sync status, running or finished
84     void SetUnfinishedDevStatus(int status);
85 
86     // Set the identifier, used in SyncOperation::Finished
87     void SetIdentifier(const std::vector<uint8_t> &identifier);
88 
89     // Get the sync status, running or finished
90     int GetStatus(const std::string &deviceId) const;
91 
92     // Get the sync id.
93     uint32_t GetSyncId() const;
94 
95     // Get the sync mode
96     int GetMode() const;
97 
98     // Used to call the onFinished and caller's on complete
99     void Finished();
100 
101     // Get the deviceId of this sync status
102     const std::vector<std::string> &GetDevices() const;
103 
104     // Wait if it's a block sync
105     void WaitIfNeed();
106 
107     // Notify if it's a block sync
108     void NotifyIfNeed();
109 
110     // Return if this sync is auto sync
111     bool IsAutoSync() const;
112 
113     // Return if this sync is block sync
114     bool IsBlockSync() const;
115 
116     // Return if this sync is AUTO_SUBSCRIBE_QUERY
117     bool IsAutoControlCmd() const;
118 
119     // Check if All devices sync finished.
120     bool CheckIsAllFinished() const;
121 
122     // For query sync
123     void SetQuery(const QuerySyncObject &query);
124     void GetQuery(QuerySyncObject &targetObject) const;
125     bool IsQuerySync() const;
126     std::string GetQueryId() const;
127     static SyncType GetSyncType(int mode);
128     static int TransferSyncMode(int mode);
129 
130     static const std::map<int, DBStatus> &DBStatusTransMap();
131 
132 protected:
133     virtual ~SyncOperation();
134 
135 private:
136     DECLARE_OBJECT_TAG(SyncOperation);
137 
138     // called by destruction
139     void Finalize();
140 
141     // The device list
142     const std::vector<std::string> devices_;
143 
144     // The Syncid
145     uint32_t syncId_;
146 
147     // The sync mode_ see SyncMode
148     int mode_;
149 
150     // The callback caller registered
151     UserCallback userCallback_;
152 
153     // The callback caller registered, when sync timeout, call
154     OnSyncFinished onFinished_;
155 
156     // The callback caller registered, will be called when destruction.
157     OnSyncFinalize onFinalize_;
158 
159     // The device id we sync with
160     std::map<std::string, int> statuses_;
161 
162     // Is this operation is a block sync
163     bool isBlockSync_;
164 
165     // Is this operation is an auto sync
166     bool isAutoSync_;
167 
168     // Is this operation has finished
169     bool isFinished_;
170 
171     // Used for block sync
172     std::unique_ptr<SemaphoreUtils> semaphore_;
173 
174     mutable std::mutex queryMutex_;
175     QuerySyncObject query_;
176     bool isQuerySync_;
177 
178     bool isAutoSubscribe_;
179 
180     // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished
181     std::string identifier_;
182 };
183 } // namespace DistributedDB
184 
185 #endif  // SYNC_OPERATION_H
186