1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_kv_syncer.h"
17
18 #include <functional>
19 #include <map>
20 #include <mutex>
21
22 #include "db_common.h"
23 #include "ikvdb_sync_interface.h"
24 #include "log_print.h"
25 #include "meta_data.h"
26 #include "single_ver_sync_engine.h"
27 #include "sqlite_single_ver_natural_store.h"
28
29 namespace DistributedDB {
SingleVerKVSyncer()30 SingleVerKVSyncer::SingleVerKVSyncer()
31 : autoSyncEnable_(false), triggerSyncTask_(true)
32 {
33 }
34
~SingleVerKVSyncer()35 SingleVerKVSyncer::~SingleVerKVSyncer()
36 {
37 }
38
EnableAutoSync(bool enable)39 void SingleVerKVSyncer::EnableAutoSync(bool enable)
40 {
41 LOGI("[Syncer] EnableAutoSync enable = %d, Label=%s", enable, label_.c_str());
42 if (autoSyncEnable_ == enable) {
43 return;
44 }
45
46 autoSyncEnable_ = enable;
47 if (!enable) {
48 return;
49 }
50
51 if (!initialized_) {
52 LOGE("[Syncer] Syncer has not Init");
53 return;
54 }
55
56 std::vector<std::string> devices;
57 GetOnlineDevices(devices);
58 if (devices.empty()) {
59 LOGI("[Syncer] EnableAutoSync no online devices");
60 return;
61 }
62 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
63 if (errCode != E_OK) {
64 LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
65 }
66 }
67
68 // Local data changed callback
LocalDataChanged(int notifyEvent)69 void SingleVerKVSyncer::LocalDataChanged(int notifyEvent)
70 {
71 if (!initialized_) {
72 LOGE("[Syncer] Syncer has not Init");
73 return;
74 }
75
76 if (notifyEvent != SQLITE_GENERAL_FINISH_MIGRATE_EVENT &&
77 notifyEvent != SQLITE_GENERAL_NS_PUT_EVENT) {
78 LOGD("[Syncer] ignore event:%d", notifyEvent);
79 return;
80 }
81 if (!triggerSyncTask_) {
82 LOGI("[Syncer] some sync task is scheduling");
83 return;
84 }
85 triggerSyncTask_ = false;
86 RefObject::IncObjRef(syncEngine_);
87 // To avoid many task were produced and waiting in the queue. For example, put value in a loop.
88 // It will consume thread pool resources, so other task will delay until these task finish.
89 // In extreme situation, 10 thread run the localDataChanged task and 1 task waiting in queue.
90 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] {
91 triggerSyncTask_ = true;
92 std::vector<std::string> devices;
93 GetOnlineDevices(devices);
94 if (devices.empty()) {
95 LOGI("[Syncer] LocalDataChanged no online devices, Label=%s", label_.c_str());
96 RefObject::DecObjRef(syncEngine_);
97 return;
98 }
99 if (!TryFullSync(devices)) {
100 TriggerSubQuerySync(devices);
101 }
102 RefObject::DecObjRef(syncEngine_);
103 });
104 // if task schedule failed, but triggerSyncTask_ is not set to true, other thread may skip the schedule time
105 // when task schedule failed, it means unormal status, it is unable to schedule next time probably
106 // so it is ok if other thread skip the schedule if last task schedule failed
107 if (errCode != E_OK) {
108 triggerSyncTask_ = true;
109 LOGE("[TriggerSync] LocalDataChanged retCode:%d", errCode);
110 RefObject::DecObjRef(syncEngine_);
111 }
112 return;
113 }
114
115 // remote device online callback
RemoteDataChanged(const std::string & device)116 void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
117 {
118 LOGI("[SingleVerKVSyncer] device online dev %s", STR_MASK(device));
119 if (!initialized_) {
120 LOGE("[Syncer] Syncer has not Init");
121 return;
122 }
123 std::string userId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
124 std::string appId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
125 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
126 RuntimeContext::GetInstance()->NotifyDatabaseStatusChange(userId, appId, storeId, device, true);
127 SingleVerSyncer::RemoteDataChanged(device);
128 if (autoSyncEnable_) {
129 RefObject::IncObjRef(syncEngine_);
130 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device] {
131 std::vector<std::string> devices;
132 devices.push_back(device);
133 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
134 if (errCode != E_OK) {
135 LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
136 }
137 RefObject::DecObjRef(syncEngine_);
138 });
139 if (retCode != E_OK) {
140 LOGE("[AutoLaunch] RemoteDataChanged triggler sync retCode:%d", retCode);
141 RefObject::DecObjRef(syncEngine_);
142 }
143 }
144 // db online again ,trigger subscribe
145 // if remote device online, subscribequery num is 0
146 std::vector<QuerySyncObject> syncQueries;
147 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetLocalSubscribeQueries(device, syncQueries);
148 if (syncQueries.size() == 0) {
149 LOGI("no need to trigger auto subscribe");
150 return;
151 }
152 LOGI("[SingleVerKVSyncer] trigger local subscribe sync, queryNums=%zu", syncQueries.size());
153 for (const auto &query : syncQueries) {
154 TriggerSubscribe(device, query);
155 }
156 static_cast<SingleVerSyncEngine *>(syncEngine_)->PutUnfiniedSubQueries(device, syncQueries);
157 }
158
QueryAutoSync(const InternalSyncParma & param)159 void SingleVerKVSyncer::QueryAutoSync(const InternalSyncParma ¶m)
160 {
161 if (!initialized_) {
162 LOGE("[Syncer] Syncer has not Init");
163 return;
164 }
165 LOGI("[SingleVerKVSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
166 RefObject::IncObjRef(syncEngine_);
167 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param] {
168 int errCode = Sync(param);
169 if (errCode != E_OK) {
170 LOGE("[SingleVerKVSyncer] sync start by QueryAutoSync failed err %d", errCode);
171 }
172 RefObject::DecObjRef(syncEngine_);
173 });
174 if (retCode != E_OK) {
175 LOGE("[SingleVerKVSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
176 RefObject::DecObjRef(syncEngine_);
177 }
178 }
179
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const180 int SingleVerKVSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
181 const std::vector<std::string> &devices) const
182 {
183 if (!isQuerySync) {
184 return E_OK;
185 }
186 int errCode = static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->CheckAndInitQueryCondition(query);
187 if (errCode != E_OK) {
188 LOGE("[SingleVerKVSyncer] QuerySyncObject check failed");
189 return errCode;
190 }
191 if (mode != SUBSCRIBE_QUERY) {
192 return E_OK;
193 }
194 if (query.HasLimit() || query.HasOrderBy()) {
195 LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
196 return -E_NOT_SUPPORT;
197 }
198 if (devices.size() > MAX_DEVICES_NUM) {
199 LOGE("[SingleVerKVSyncer] devices is overlimit");
200 return -E_MAX_LIMITS;
201 }
202 return syncEngine_->SubscribeLimitCheck(devices, query);
203 }
204
TriggerSubscribe(const std::string & device,const QuerySyncObject & query)205 void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
206 {
207 if (!initialized_) {
208 LOGE("[Syncer] Syncer has not Init");
209 return;
210 }
211 RefObject::IncObjRef(syncEngine_);
212 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device, query] {
213 std::vector<std::string> devices;
214 devices.push_back(device);
215 SyncParma param;
216 param.devices = devices;
217 param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
218 param.onComplete = nullptr;
219 param.onFinalize = nullptr;
220 param.wait = false;
221 param.isQuerySync = true;
222 param.syncQuery = query;
223 int errCode = Sync(param);
224 if (errCode != E_OK) {
225 LOGE("[SingleVerKVSyncer] subscribe start by RemoteDataChanged failed err %d", errCode);
226 }
227 RefObject::DecObjRef(syncEngine_);
228 });
229 if (retCode != E_OK) {
230 LOGE("[Syncer] triggler query subscribe start failed err %d", retCode);
231 RefObject::DecObjRef(syncEngine_);
232 }
233 }
234
TryFullSync(const std::vector<std::string> & devices)235 bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
236 {
237 if (!initialized_) {
238 LOGE("[Syncer] Syncer has not Init");
239 return true;
240 }
241 if (!autoSyncEnable_) {
242 LOGD("[Syncer] autoSync no enable");
243 return false;
244 }
245 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
246 if (errCode != E_OK) {
247 LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
248 return false;
249 }
250 return true;
251 }
252
TriggerSubQuerySync(const std::vector<std::string> & devices)253 void SingleVerKVSyncer::TriggerSubQuerySync(const std::vector<std::string> &devices)
254 {
255 if (!initialized_) {
256 LOGE("[Syncer] Syncer has not Init");
257 return;
258 }
259 int errCode;
260 for (auto &device : devices) {
261 std::vector<QuerySyncObject> queries;
262 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetRemoteSubscribeQueries(device, queries);
263 for (auto &query : queries) {
264 std::string queryId = query.GetIdentify();
265 uint64_t lastTimestamp = metadata_->GetQueryLastTimestamp(device, queryId);
266 WaterMark queryWaterMark = 0;
267 errCode = metadata_->GetSendQueryWaterMark(queryId, device, queryWaterMark, false);
268 if (errCode != E_OK) {
269 LOGE("[Syncer] get queryId=%s,dev=%s watermark failed", STR_MASK(queryId), STR_MASK(device));
270 continue;
271 }
272 if (lastTimestamp < queryWaterMark || lastTimestamp == 0) {
273 continue;
274 }
275 LOGD("[Syncer] lastTime=%" PRIu64 " vs WaterMark=%" PRIu64 ",trigger queryId=%s,dev=%s", lastTimestamp,
276 queryWaterMark, STR_MASK(queryId), STR_MASK(device));
277 InternalSyncParma param;
278 std::vector<std::string> targetDevices;
279 targetDevices.push_back(device);
280 param.devices = targetDevices;
281 param.mode = SyncModeType::AUTO_PUSH;
282 param.isQuerySync = true;
283 param.syncQuery = query;
284 QueryAutoSync(param);
285 }
286 }
287 }
288
DumpSyncerBasicInfo()289 SyncerBasicInfo SingleVerKVSyncer::DumpSyncerBasicInfo()
290 {
291 SyncerBasicInfo basicInfo = GenericSyncer::DumpSyncerBasicInfo();
292 basicInfo.isAutoSync = autoSyncEnable_;
293 return basicInfo;
294 }
295 } // namespace DistributedDB
296