1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "sync_able_engine.h"
16
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28 : syncer_(),
29 started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 store_(store),
34 userChangeListener_(nullptr)
35 {}
36
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39 if (userChangeListener_ != nullptr) {
40 userChangeListener_->Drop(true);
41 userChangeListener_ = nullptr;
42 }
43 }
44
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48 if (!started_) {
49 int errCode = StartSyncer();
50 if (!started_) {
51 return errCode;
52 }
53 }
54 return syncer_.Sync(parm, connectionId);
55 }
56
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59 StartSyncer();
60 }
61
Close()62 void SyncAbleEngine::Close()
63 {
64 StopSyncer();
65 }
66
67 // Get The current virtual timestamp
GetTimestamp()68 uint64_t SyncAbleEngine::GetTimestamp()
69 {
70 if (NeedStartSyncer()) {
71 StartSyncer();
72 }
73 return syncer_.GetTimestamp();
74 }
75
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)76 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
77 {
78 if (NeedStartSyncer()) {
79 int errCode = StartSyncer();
80 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
81 return errCode;
82 }
83 }
84 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
85 }
86
87 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)88 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
89 {
90 int errCode = E_OK;
91 {
92 std::unique_lock<std::mutex> lock(syncerOperateLock_);
93 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
94 closed_ = false;
95 }
96 UserChangeHandle();
97 return errCode;
98 }
99
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)100 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
101 {
102 if (store_ == nullptr) {
103 LOGF("RDB got null sync interface.");
104 return -E_INVALID_ARGS;
105 }
106 if (!isCheckSyncActive) {
107 SetSyncModuleActive();
108 isNeedActive = GetSyncModuleActive();
109 }
110
111 int errCode = syncer_.Initialize(store_, isNeedActive);
112 if (errCode == E_OK) {
113 started_ = true;
114 } else {
115 LOGE("RDB start syncer failed, err:'%d'.", errCode);
116 }
117
118 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
119 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
120 // active to non_active
121 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
122 std::bind(&SyncAbleEngine::ChangeUserListener, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
123 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
124 EventType event = isNeedActive ?
125 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
126 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
127 std::bind(&SyncAbleEngine::UserChangeHandle, this), event);
128 }
129 return errCode;
130 }
131
132 // Stop syncer
StopSyncer()133 void SyncAbleEngine::StopSyncer()
134 {
135 NotificationChain::Listener *userChangeListener = nullptr;
136 {
137 std::unique_lock<std::mutex> lock(syncerOperateLock_);
138 StopSyncerWithNoLock(true);
139 userChangeListener = userChangeListener_;
140 userChangeListener_ = nullptr;
141 }
142 if (userChangeListener != nullptr) {
143 userChangeListener->Drop(true);
144 userChangeListener = nullptr;
145 }
146 }
147
StopSyncerWithNoLock(bool isClosedOperation)148 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
149 {
150 ReSetSyncModuleActive();
151 syncer_.Close(isClosedOperation);
152 if (started_) {
153 started_ = false;
154 }
155 closed_ = isClosedOperation;
156 if (!isClosedOperation && userChangeListener_ != nullptr) {
157 userChangeListener_->Drop(false);
158 userChangeListener_ = nullptr;
159 }
160 }
161
UserChangeHandle()162 void SyncAbleEngine::UserChangeHandle()
163 {
164 if (store_ == nullptr) {
165 LOGD("[SyncAbleEngine] RDB got null sync interface in userChange.");
166 return;
167 }
168 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
169 if (!isSyncDualTupleMode) {
170 LOGD("[SyncAbleEngine] no use syncDualTupleMode, abort userChange");
171 return;
172 }
173 std::unique_lock<std::mutex> lock(syncerOperateLock_);
174 if (closed_) {
175 LOGI("RDB is already closed");
176 return;
177 }
178 bool isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
179 bool isNeedChange = (isNeedActive != isSyncNeedActive_);
180 // non_active to active or active to non_active
181 if (isNeedChange) {
182 StopSyncerWithNoLock(); // will drop userChangeListener
183 isSyncModuleActiveCheck_ = true;
184 isSyncNeedActive_ = isNeedActive;
185 StartSyncerWithNoLock(true, isNeedActive);
186 }
187 }
188
ChangeUserListener()189 void SyncAbleEngine::ChangeUserListener()
190 {
191 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
192 if (userChangeListener_ != nullptr) {
193 userChangeListener_->Drop(false);
194 userChangeListener_ = nullptr;
195 }
196 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
197 std::bind(&SyncAbleEngine::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
198 }
199
SetSyncModuleActive()200 void SyncAbleEngine::SetSyncModuleActive()
201 {
202 if (isSyncModuleActiveCheck_) {
203 return;
204 }
205
206 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
207 if (!isSyncDualTupleMode) {
208 isSyncNeedActive_ = true;
209 isSyncModuleActiveCheck_ = true;
210 return;
211 }
212 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
213 if (!isSyncNeedActive_) {
214 LOGI("syncer no need to active");
215 }
216 isSyncModuleActiveCheck_ = true;
217 }
218
GetSyncModuleActive()219 bool SyncAbleEngine::GetSyncModuleActive()
220 {
221 return isSyncNeedActive_;
222 }
223
ReSetSyncModuleActive()224 void SyncAbleEngine::ReSetSyncModuleActive()
225 {
226 isSyncModuleActiveCheck_ = false;
227 isSyncNeedActive_ = true;
228 }
229
GetLocalIdentity(std::string & outTarget)230 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
231 {
232 if (!started_) {
233 StartSyncer();
234 }
235 return syncer_.GetLocalIdentity(outTarget);
236 }
237
StopSync(uint64_t connectionId)238 void SyncAbleEngine::StopSync(uint64_t connectionId)
239 {
240 if (started_) {
241 syncer_.StopSync(connectionId);
242 }
243 }
244
Dump(int fd)245 void SyncAbleEngine::Dump(int fd)
246 {
247 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
248 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
249 basicInfo.isAutoSync);
250 if (basicInfo.isSyncActive) {
251 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
252 syncer_.Dump(fd);
253 }
254 }
255
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)256 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
257 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
258 {
259 if (!started_) {
260 int errCode = StartSyncer();
261 if (!started_) {
262 return errCode;
263 }
264 }
265 return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
266 }
267
NeedStartSyncer() const268 bool SyncAbleEngine::NeedStartSyncer() const
269 {
270 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
271 LOGW("communicator not ready!");
272 return false;
273 }
274 // don't start when check callback got not active
275 // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
276 return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
277 }
278
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)279 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
280 {
281 if (NeedStartSyncer()) {
282 int errCode = StartSyncer();
283 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
284 return errCode;
285 }
286 }
287 return syncer_.GetHashDeviceId(clientId, hashDevId);
288 }
289 }
290