1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "sync_able_engine.h"
16
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28 : syncer_(),
29 started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 store_(store),
34 userChangeListener_(nullptr)
35 {}
36
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39 if (userChangeListener_ != nullptr) {
40 userChangeListener_->Drop(true);
41 userChangeListener_ = nullptr;
42 }
43 }
44
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48 if (!started_) {
49 int errCode = StartSyncer();
50 if (!started_) {
51 return errCode;
52 }
53 }
54 return syncer_.Sync(parm, connectionId);
55 }
56
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59 StartSyncer();
60 }
61
Close()62 void SyncAbleEngine::Close()
63 {
64 StopSyncer();
65 }
66
EnableAutoSync(bool enable)67 void SyncAbleEngine::EnableAutoSync(bool enable)
68 {
69 if (!started_) {
70 StartSyncer();
71 }
72 return syncer_.EnableAutoSync(enable);
73 }
74
EnableManualSync(void)75 int SyncAbleEngine::EnableManualSync(void)
76 {
77 return syncer_.EnableManualSync();
78 }
79
DisableManualSync(void)80 int SyncAbleEngine::DisableManualSync(void)
81 {
82 return syncer_.DisableManualSync();
83 }
84
85 // Get The current virtual timestamp
GetTimestamp()86 uint64_t SyncAbleEngine::GetTimestamp()
87 {
88 if (!started_) {
89 StartSyncer();
90 }
91 return syncer_.GetTimestamp();
92 }
93
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)94 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
95 {
96 if (!started_) {
97 StartSyncer();
98 }
99 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
100 }
101
102 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)103 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
104 {
105 int errCode = E_OK;
106 {
107 std::unique_lock<std::mutex> lock(syncerOperateLock_);
108 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
109 closed_ = false;
110 }
111 UserChangeHandle();
112 return errCode;
113 }
114
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)115 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
116 {
117 if (store_ == nullptr) {
118 LOGF("RDB got null sync interface.");
119 return -E_INVALID_ARGS;
120 }
121 if (!isCheckSyncActive) {
122 SetSyncModuleActive();
123 isNeedActive = GetSyncModuleActive();
124 }
125
126 int errCode = syncer_.Initialize(store_, isNeedActive);
127 if (errCode == E_OK) {
128 started_ = true;
129 } else {
130 LOGE("RDB start syncer failed, err:'%d'.", errCode);
131 }
132
133 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
134 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
135 // active to non_active
136 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
137 std::bind(&SyncAbleEngine::ChangeUserListerner, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
138 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
139 EventType event = isNeedActive ?
140 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
141 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
142 std::bind(&SyncAbleEngine::UserChangeHandle, this), event);
143 }
144 return errCode;
145 }
146
147 // Stop syncer
StopSyncer()148 void SyncAbleEngine::StopSyncer()
149 {
150 NotificationChain::Listener *userChangeListener = nullptr;
151 {
152 std::unique_lock<std::mutex> lock(syncerOperateLock_);
153 StopSyncerWithNoLock(true);
154 userChangeListener = userChangeListener_;
155 userChangeListener_ = nullptr;
156 }
157 if (userChangeListener != nullptr) {
158 userChangeListener->Drop(true);
159 userChangeListener = nullptr;
160 }
161 }
162
StopSyncerWithNoLock(bool isClosedOperation)163 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
164 {
165 ReSetSyncModuleActive();
166 syncer_.Close(isClosedOperation);
167 if (started_) {
168 started_ = false;
169 }
170 closed_ = isClosedOperation;
171 if (!isClosedOperation && userChangeListener_ != nullptr) {
172 userChangeListener_->Drop(false);
173 userChangeListener_ = nullptr;
174 }
175 }
176
UserChangeHandle()177 void SyncAbleEngine::UserChangeHandle()
178 {
179 bool isNeedChange = false;
180 bool isNeedActive = true;
181 std::unique_lock<std::mutex> lock(syncerOperateLock_);
182 if (closed_) {
183 LOGI("RDB is already closed");
184 return;
185 }
186 isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
187 isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false;
188 // non_active to active or active to non_active
189 if (isNeedChange) {
190 StopSyncerWithNoLock(); // will drop userChangeListener
191 isSyncModuleActiveCheck_ = true;
192 isSyncNeedActive_ = isNeedActive;
193 StartSyncerWithNoLock(true, isNeedActive);
194 }
195 }
196
ChangeUserListerner()197 void SyncAbleEngine::ChangeUserListerner()
198 {
199 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
200 if (userChangeListener_ != nullptr) {
201 userChangeListener_->Drop(false);
202 userChangeListener_ = nullptr;
203 }
204 if (userChangeListener_ == nullptr) {
205 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
206 std::bind(&SyncAbleEngine::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
207 }
208 }
209
SetSyncModuleActive()210 void SyncAbleEngine::SetSyncModuleActive()
211 {
212 if (isSyncModuleActiveCheck_) {
213 return;
214 }
215
216 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
217 if (!isSyncDualTupleMode) {
218 isSyncNeedActive_ = true;
219 isSyncModuleActiveCheck_ = true;
220 return;
221 }
222 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
223 if (!isSyncNeedActive_) {
224 LOGI("syncer no need to active");
225 }
226 isSyncModuleActiveCheck_ = true;
227 }
228
GetSyncModuleActive()229 bool SyncAbleEngine::GetSyncModuleActive()
230 {
231 return isSyncNeedActive_;
232 }
233
ReSetSyncModuleActive()234 void SyncAbleEngine::ReSetSyncModuleActive()
235 {
236 isSyncModuleActiveCheck_ = false;
237 isSyncNeedActive_ = true;
238 }
239
TriggerSync(int notifyEvent)240 void SyncAbleEngine::TriggerSync(int notifyEvent)
241 {
242 if (!started_) {
243 StartSyncer();
244 }
245 if (started_) {
246 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, notifyEvent] {
247 syncer_.LocalDataChanged(notifyEvent);
248 });
249 if (errCode != E_OK) {
250 LOGE("[TriggerSync] SyncAbleEngine TriggerSync LocalDataChanged retCode:%d", errCode);
251 }
252 }
253 }
254
GetLocalIdentity(std::string & outTarget)255 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
256 {
257 if (!started_) {
258 StartSyncer();
259 }
260 return syncer_.GetLocalIdentity(outTarget);
261 }
262
StopSync(uint64_t connectionId)263 void SyncAbleEngine::StopSync(uint64_t connectionId)
264 {
265 if (started_) {
266 syncer_.StopSync(connectionId);
267 }
268 }
269
Dump(int fd)270 void SyncAbleEngine::Dump(int fd)
271 {
272 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
273 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
274 basicInfo.isAutoSync);
275 if (basicInfo.isSyncActive) {
276 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
277 syncer_.Dump(fd);
278 }
279 }
280
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)281 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
282 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
283 {
284 if (!started_) {
285 int errCode = StartSyncer();
286 if (!started_) {
287 return errCode;
288 }
289 }
290 return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
291 }
292
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)293 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
294 {
295 if (!started_) {
296 StartSyncer();
297 }
298 return syncer_.GetHashDeviceId(clientId, hashDevId);
299 }
300 }
301