• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_OPERATION_H
17 #define SYNC_OPERATION_H
18 
19 #include <functional>
20 #include <string>
21 #include <vector>
22 #include <map>
23 #include <mutex>
24 
25 #include "ikvdb_sync_interface.h"
26 #include "notification_chain.h"
27 #include "query_sync_object.h"
28 #include "ref_object.h"
29 #include "runtime_context.h"
30 #include "semaphore_utils.h"
31 #include "sync_types.h"
32 
33 namespace DistributedDB {
34 class SyncOperation : public RefObject { // Describes one sync operation: sync id, target devices, mode, callbacks and per-device status.
35 public:
36     enum Status { // Sync status codes (presumably the int passed to SetStatus / returned by GetStatus; confirm in the implementation).
37         OP_WAITING = 0,
38         OP_SYNCING,
39         OP_SEND_FINISHED,
40         OP_RECV_FINISHED,
41         OP_FINISHED_ALL, // Any status >= OP_FINISHED_ALL is a final status.
42         OP_FAILED,
43         OP_TIMEOUT,
44         OP_PERMISSION_CHECK_FAILED,
45         OP_COMM_ABNORMAL,
46         OP_SECURITY_OPTION_CHECK_FAILURE, // The remote device's SecurityOption is not equal to the local one.
47         OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error
48         OP_BUSY_FAILURE,
49         OP_SCHEMA_INCOMPATIBLE,
50         OP_QUERY_FORMAT_FAILURE,
51         OP_QUERY_FIELD_FAILURE,
52         OP_NOT_SUPPORT,
53         OP_INTERCEPT_DATA_FAIL,
54         OP_MAX_LIMITS,
55         OP_SCHEMA_CHANGED,
56         OP_INVALID_ARGS,
57         OP_USER_CHANGED
58     };
59 
60     using UserCallback = std::function<void(std::map<std::string, int>)>; // Receives a string -> int map by value (presumably device id -> status; confirm).
61     using OnSyncFinished = std::function<void(int)>; // Takes a single int; its meaning is not visible in this header.
62     using OnSyncFinalize = std::function<void(void)>; // Takes no arguments and returns nothing.
63 
64     SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
65         const UserCallback &userCallback, bool isBlockSync); // Arguments correspond by name to syncId_, devices_, mode_, userCallback_ and isBlockSync_.
66 
67     DISABLE_COPY_ASSIGN_MOVE(SyncOperation); // Project macro; judging by its name it disables copy, assignment and move (confirm at its definition).
68 
69     // Initialize the status used for the callback; returns an int (presumably an error code; confirm).
70     int Initialize();
71 
72     // Set the OnSyncFinalize callback.
73     void SetOnSyncFinalize(const OnSyncFinalize &callback);
74 
75     // Set the OnSyncFinished callback; it is called whether the sync succeeded or failed.
76     void SetOnSyncFinished(const OnSyncFinished &callback);
77 
78     // Set the sync status (running or finished) of the given device.
79     void SetStatus(const std::string &deviceId, int status);
80 
81     // Set the sync status (running or finished) of the devices that have not finished yet.
82     void SetUnfinishedDevStatus(int status);
83 
84     // Set the identifier that is used in SyncOperation::Finished.
85     void SetIdentifier(const std::vector<uint8_t> &identifier);
86 
87     // Get the sync status (running or finished) of the given device.
88     int GetStatus(const std::string &deviceId) const;
89 
90     // Get the sync id.
91     uint32_t GetSyncId() const;
92 
93     // Get the sync mode.
94     int GetMode() const;
95 
96     // Call the onFinished callback and the caller's completion callback.
97     void Finished();
98 
99     // Get the device ids of this sync operation (returned by const reference to a member).
100     const std::vector<std::string> &GetDevices() const;
101 
102     // Wait if this is a block sync.
103     void WaitIfNeed();
104 
105     // Notify if this is a block sync.
106     void NotifyIfNeed();
107 
108     // Return whether this sync is an auto sync.
109     bool IsAutoSync() const;
110 
111     // Return whether this sync is a block sync.
112     bool IsBlockSync() const;
113 
114     // Return whether this sync is AUTO_SUBSCRIBE_QUERY.
115     bool IsAutoControlCmd() const;
116 
117     // Check whether all devices have finished syncing.
118     bool CheckIsAllFinished() const;
119 
120     // Query sync support: set/get the query object, and test for or identify a query sync.
121     void SetQuery(const QuerySyncObject &query);
122     QuerySyncObject GetQuery() const;
123     bool IsQuerySync() const;
124     std::string GetQueryId() const;
125     static SyncType GetSyncType(int mode); // Static: derives a SyncType from a sync mode value.
126     static int TransferSyncMode(int mode); // Static: converts one int sync mode into another (mapping not visible here).
127 
128     static const std::map<int, DBStatus> &DBStatusTransMap(); // Static: int -> DBStatus table (presumably Status -> DBStatus; confirm).
129 
130 protected:
131     virtual ~SyncOperation(); // Protected, so outside code cannot delete the object directly (presumably released through RefObject; confirm).
132 
133 private:
134     DECLARE_OBJECT_TAG(SyncOperation);
135 
136     // Called on destruction.
137     void Finalize();
138 
139     // Transfer the sync mode from the interface value to the inner value.
140     void TransferQuerySyncMode();
141 
142     // The device list (const: fixed at construction).
143     const std::vector<std::string> devices_;
144 
145     // The sync id.
146     uint32_t syncId_;
147 
148     // The sync mode, see SyncMode.
149     int mode_;
150 
151     // The callback registered by the caller.
152     UserCallback userCallback_;
153 
154     // The OnSyncFinished callback registered by the caller (e.g. invoked when the sync times out).
155     OnSyncFinished onFinished_;
156 
157     // The OnSyncFinalize callback registered by the caller; it will be called on destruction.
158     OnSyncFinalize onFinalize_;
159 
160     // Sync status per device (presumably keyed by the id of each device we sync with; confirm).
161     std::map<std::string, int> statuses_;
162 
163     // Whether this operation is a block sync.
164     bool isBlockSync_;
165 
166     // Whether this operation is an auto sync.
167     bool isAutoSync_;
168 
169     // Whether this operation has finished.
170     bool isFinished_;
171 
172     // Used for block sync. NOTE(review): std::unique_ptr needs <memory>, which this header does not include directly.
173     std::unique_ptr<SemaphoreUtils> semaphore_;
174 
175     QuerySyncObject query_; // The query object, see SetQuery/GetQuery.
176     bool isQuerySync_; // Presumably the value reported by IsQuerySync (confirm).
177 
178     bool isAutoSubscribe_; // Presumably the value reported by IsAutoControlCmd (confirm).
179 
180     // Identifier used to call ScheduleQueuedTask in SyncOperation::Finished. NOTE(review): a std::string here, while SetIdentifier takes std::vector<uint8_t>.
181     std::string identifier_;
182 };
183 } // namespace DistributedDB
184 
185 #endif  // SYNC_OPERATION_H
186