• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sync_able_engine.h"
16 
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25 
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28     : syncer_(),
29       started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       store_(store),
34       userChangeListener_(nullptr)
35 {}
36 
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39     if (userChangeListener_ != nullptr) {
40         userChangeListener_->Drop(true);
41         userChangeListener_ = nullptr;
42     }
43 }
44 
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48     if (!started_) {
49         int errCode = StartSyncer();
50         if (!started_) {
51             return errCode;
52         }
53     }
54     return syncer_.Sync(parm, connectionId);
55 }
56 
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59     StartSyncer();
60 }
61 
Close()62 void SyncAbleEngine::Close()
63 {
64     StopSyncer();
65 }
66 
EnableAutoSync(bool enable)67 void SyncAbleEngine::EnableAutoSync(bool enable)
68 {
69     if (!started_) {
70         StartSyncer();
71     }
72     return syncer_.EnableAutoSync(enable);
73 }
74 
EnableManualSync(void)75 int SyncAbleEngine::EnableManualSync(void)
76 {
77     return syncer_.EnableManualSync();
78 }
79 
DisableManualSync(void)80 int SyncAbleEngine::DisableManualSync(void)
81 {
82     return syncer_.DisableManualSync();
83 }
84 
85 // Get The current virtual timestamp
GetTimestamp()86 uint64_t SyncAbleEngine::GetTimestamp()
87 {
88     if (!started_) {
89         StartSyncer();
90     }
91     return syncer_.GetTimestamp();
92 }
93 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)94 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
95 {
96     if (!started_) {
97         StartSyncer();
98     }
99     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
100 }
101 
102 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)103 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
104 {
105     int errCode = E_OK;
106     {
107         std::unique_lock<std::mutex> lock(syncerOperateLock_);
108         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
109         closed_ = false;
110     }
111     UserChangeHandle();
112     return errCode;
113 }
114 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)115 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
116 {
117     if (store_ == nullptr) {
118         LOGF("RDB got null sync interface.");
119         return -E_INVALID_ARGS;
120     }
121     if (!isCheckSyncActive) {
122         SetSyncModuleActive();
123         isNeedActive = GetSyncModuleActive();
124     }
125 
126     int errCode = syncer_.Initialize(store_, isNeedActive);
127     if (errCode == E_OK) {
128         started_ = true;
129     } else {
130         LOGE("RDB start syncer failed, err:'%d'.", errCode);
131     }
132 
133     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
134     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
135         // active to non_active
136         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
137             std::bind(&SyncAbleEngine::ChangeUserListerner, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
138     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
139         EventType event = isNeedActive ?
140             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
141         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
142             std::bind(&SyncAbleEngine::UserChangeHandle, this), event);
143     }
144     return errCode;
145 }
146 
147 // Stop syncer
StopSyncer()148 void SyncAbleEngine::StopSyncer()
149 {
150     NotificationChain::Listener *userChangeListener = nullptr;
151     {
152         std::unique_lock<std::mutex> lock(syncerOperateLock_);
153         StopSyncerWithNoLock(true);
154         userChangeListener = userChangeListener_;
155         userChangeListener_ = nullptr;
156     }
157     if (userChangeListener != nullptr) {
158         userChangeListener->Drop(true);
159         userChangeListener = nullptr;
160     }
161 }
162 
StopSyncerWithNoLock(bool isClosedOperation)163 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
164 {
165     ReSetSyncModuleActive();
166     syncer_.Close(isClosedOperation);
167     if (started_) {
168         started_ = false;
169     }
170     closed_ = isClosedOperation;
171     if (!isClosedOperation && userChangeListener_ != nullptr) {
172         userChangeListener_->Drop(false);
173         userChangeListener_ = nullptr;
174     }
175 }
176 
UserChangeHandle()177 void SyncAbleEngine::UserChangeHandle()
178 {
179     bool isNeedChange = false;
180     bool isNeedActive = true;
181     std::unique_lock<std::mutex> lock(syncerOperateLock_);
182     if (closed_) {
183         LOGI("RDB is already closed");
184         return;
185     }
186     isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
187     isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false;
188     // non_active to active or active to non_active
189     if (isNeedChange) {
190         StopSyncerWithNoLock(); // will drop userChangeListener
191         isSyncModuleActiveCheck_ = true;
192         isSyncNeedActive_ = isNeedActive;
193         StartSyncerWithNoLock(true, isNeedActive);
194     }
195 }
196 
ChangeUserListerner()197 void SyncAbleEngine::ChangeUserListerner()
198 {
199     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
200     if (userChangeListener_ != nullptr) {
201         userChangeListener_->Drop(false);
202         userChangeListener_ = nullptr;
203     }
204     if (userChangeListener_ == nullptr) {
205         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
206             std::bind(&SyncAbleEngine::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
207     }
208 }
209 
SetSyncModuleActive()210 void SyncAbleEngine::SetSyncModuleActive()
211 {
212     if (isSyncModuleActiveCheck_) {
213         return;
214     }
215 
216     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
217     if (!isSyncDualTupleMode) {
218         isSyncNeedActive_ = true;
219         isSyncModuleActiveCheck_ = true;
220         return;
221     }
222     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
223     if (!isSyncNeedActive_) {
224         LOGI("syncer no need to active");
225     }
226     isSyncModuleActiveCheck_ = true;
227 }
228 
GetSyncModuleActive()229 bool SyncAbleEngine::GetSyncModuleActive()
230 {
231     return isSyncNeedActive_;
232 }
233 
ReSetSyncModuleActive()234 void SyncAbleEngine::ReSetSyncModuleActive()
235 {
236     isSyncModuleActiveCheck_ = false;
237     isSyncNeedActive_ = true;
238 }
239 
TriggerSync(int notifyEvent)240 void SyncAbleEngine::TriggerSync(int notifyEvent)
241 {
242     if (!started_) {
243         StartSyncer();
244     }
245     if (started_) {
246         int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, notifyEvent] {
247             syncer_.LocalDataChanged(notifyEvent);
248         });
249         if (errCode != E_OK) {
250             LOGE("[TriggerSync] SyncAbleEngine TriggerSync LocalDataChanged retCode:%d", errCode);
251         }
252     }
253 }
254 
GetLocalIdentity(std::string & outTarget)255 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
256 {
257     if (!started_) {
258         StartSyncer();
259     }
260     return syncer_.GetLocalIdentity(outTarget);
261 }
262 
StopSync(uint64_t connectionId)263 void SyncAbleEngine::StopSync(uint64_t connectionId)
264 {
265     if (started_) {
266         syncer_.StopSync(connectionId);
267     }
268 }
269 
Dump(int fd)270 void SyncAbleEngine::Dump(int fd)
271 {
272     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
273     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
274         basicInfo.isAutoSync);
275     if (basicInfo.isSyncActive) {
276         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
277         syncer_.Dump(fd);
278     }
279 }
280 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)281 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
282     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
283 {
284     if (!started_) {
285         int errCode = StartSyncer();
286         if (!started_) {
287             return errCode;
288         }
289     }
290     return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
291 }
292 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)293 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
294 {
295     if (!started_) {
296         StartSyncer();
297     }
298     return syncer_.GetHashDeviceId(clientId, hashDevId);
299 }
300 }
301