• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sync_able_engine.h"
16 
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25 
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28     : syncer_(),
29       started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       store_(store),
34       userChangeListener_(nullptr)
35 {}
36 
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39     if (userChangeListener_ != nullptr) {
40         userChangeListener_->Drop(true);
41         userChangeListener_ = nullptr;
42     }
43 }
44 
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48     if (!started_) {
49         int errCode = StartSyncer();
50         if (!started_) {
51             return errCode;
52         }
53     }
54     return syncer_.Sync(parm, connectionId);
55 }
56 
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59     StartSyncer();
60 }
61 
Close()62 void SyncAbleEngine::Close()
63 {
64     StopSyncer();
65 }
66 
67 // Get The current virtual timestamp
GetTimestamp()68 uint64_t SyncAbleEngine::GetTimestamp()
69 {
70     if (NeedStartSyncer()) {
71         StartSyncer();
72     }
73     return syncer_.GetTimestamp();
74 }
75 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)76 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
77 {
78     if (NeedStartSyncer()) {
79         int errCode = StartSyncer();
80         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
81             return errCode;
82         }
83     }
84     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
85 }
86 
87 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)88 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
89 {
90     int errCode = E_OK;
91     {
92         std::unique_lock<std::mutex> lock(syncerOperateLock_);
93         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
94         closed_ = false;
95     }
96     UserChangeHandle();
97     return errCode;
98 }
99 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)100 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
101 {
102     if (store_ == nullptr) {
103         LOGF("RDB got null sync interface.");
104         return -E_INVALID_ARGS;
105     }
106     if (!isCheckSyncActive) {
107         SetSyncModuleActive();
108         isNeedActive = GetSyncModuleActive();
109     }
110 
111     int errCode = syncer_.Initialize(store_, isNeedActive);
112     if (errCode == E_OK) {
113         started_ = true;
114     } else {
115         LOGE("RDB start syncer failed, err:'%d'.", errCode);
116     }
117 
118     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
119     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
120         // active to non_active
121         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
122             std::bind(&SyncAbleEngine::ChangeUserListener, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
123     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
124         EventType event = isNeedActive ?
125             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
126         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
127             std::bind(&SyncAbleEngine::UserChangeHandle, this), event);
128     }
129     return errCode;
130 }
131 
132 // Stop syncer
StopSyncer()133 void SyncAbleEngine::StopSyncer()
134 {
135     NotificationChain::Listener *userChangeListener = nullptr;
136     {
137         std::unique_lock<std::mutex> lock(syncerOperateLock_);
138         StopSyncerWithNoLock(true);
139         userChangeListener = userChangeListener_;
140         userChangeListener_ = nullptr;
141     }
142     if (userChangeListener != nullptr) {
143         userChangeListener->Drop(true);
144         userChangeListener = nullptr;
145     }
146 }
147 
StopSyncerWithNoLock(bool isClosedOperation)148 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
149 {
150     ReSetSyncModuleActive();
151     syncer_.Close(isClosedOperation);
152     if (started_) {
153         started_ = false;
154     }
155     closed_ = isClosedOperation;
156     if (!isClosedOperation && userChangeListener_ != nullptr) {
157         userChangeListener_->Drop(false);
158         userChangeListener_ = nullptr;
159     }
160 }
161 
UserChangeHandle()162 void SyncAbleEngine::UserChangeHandle()
163 {
164     if (store_ == nullptr) {
165         LOGD("[SyncAbleEngine] RDB got null sync interface in userChange.");
166         return;
167     }
168     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
169     if (!isSyncDualTupleMode) {
170         LOGD("[SyncAbleEngine] no use syncDualTupleMode, abort userChange");
171         return;
172     }
173     std::unique_lock<std::mutex> lock(syncerOperateLock_);
174     if (closed_) {
175         LOGI("RDB is already closed");
176         return;
177     }
178     bool isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
179     bool isNeedChange = (isNeedActive != isSyncNeedActive_);
180     // non_active to active or active to non_active
181     if (isNeedChange) {
182         StopSyncerWithNoLock(); // will drop userChangeListener
183         isSyncModuleActiveCheck_ = true;
184         isSyncNeedActive_ = isNeedActive;
185         StartSyncerWithNoLock(true, isNeedActive);
186     }
187 }
188 
ChangeUserListener()189 void SyncAbleEngine::ChangeUserListener()
190 {
191     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
192     if (userChangeListener_ != nullptr) {
193         userChangeListener_->Drop(false);
194         userChangeListener_ = nullptr;
195     }
196     userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
197         std::bind(&SyncAbleEngine::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
198 }
199 
SetSyncModuleActive()200 void SyncAbleEngine::SetSyncModuleActive()
201 {
202     if (isSyncModuleActiveCheck_) {
203         return;
204     }
205 
206     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
207     if (!isSyncDualTupleMode) {
208         isSyncNeedActive_ = true;
209         isSyncModuleActiveCheck_ = true;
210         return;
211     }
212     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
213     if (!isSyncNeedActive_) {
214         LOGI("syncer no need to active");
215     }
216     isSyncModuleActiveCheck_ = true;
217 }
218 
GetSyncModuleActive()219 bool SyncAbleEngine::GetSyncModuleActive()
220 {
221     return isSyncNeedActive_;
222 }
223 
ReSetSyncModuleActive()224 void SyncAbleEngine::ReSetSyncModuleActive()
225 {
226     isSyncModuleActiveCheck_ = false;
227     isSyncNeedActive_ = true;
228 }
229 
GetLocalIdentity(std::string & outTarget)230 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
231 {
232     if (!started_) {
233         StartSyncer();
234     }
235     return syncer_.GetLocalIdentity(outTarget);
236 }
237 
StopSync(uint64_t connectionId)238 void SyncAbleEngine::StopSync(uint64_t connectionId)
239 {
240     if (started_) {
241         syncer_.StopSync(connectionId);
242     }
243 }
244 
Dump(int fd)245 void SyncAbleEngine::Dump(int fd)
246 {
247     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
248     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
249         basicInfo.isAutoSync);
250     if (basicInfo.isSyncActive) {
251         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
252         syncer_.Dump(fd);
253     }
254 }
255 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)256 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
257     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
258 {
259     if (!started_) {
260         int errCode = StartSyncer();
261         if (!started_) {
262             return errCode;
263         }
264     }
265     return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
266 }
267 
NeedStartSyncer() const268 bool SyncAbleEngine::NeedStartSyncer() const
269 {
270     if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
271         LOGW("communicator not ready!");
272         return false;
273     }
274     // don't start when check callback got not active
275     // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
276     return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
277 }
278 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)279 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
280 {
281     if (NeedStartSyncer()) {
282         int errCode = StartSyncer();
283         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
284             return errCode;
285         }
286     }
287     return syncer_.GetHashDeviceId(clientId, hashDevId);
288 }
289 }
290