1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_kv_syncer.h"
17
18 #include <functional>
19 #include <map>
20 #include <mutex>
21
22 #include "db_common.h"
23 #include "ikvdb_sync_interface.h"
24 #include "log_print.h"
25 #include "meta_data.h"
26 #include "single_ver_sync_engine.h"
27 #include "sqlite_single_ver_natural_store.h"
28
29 namespace DistributedDB {
SingleVerKVSyncer()30 SingleVerKVSyncer::SingleVerKVSyncer()
31 : autoSyncEnable_(false), triggerSyncTask_(true)
32 {
33 }
34
~SingleVerKVSyncer()35 SingleVerKVSyncer::~SingleVerKVSyncer()
36 {
37 }
38
EnableAutoSync(bool enable)39 void SingleVerKVSyncer::EnableAutoSync(bool enable)
40 {
41 LOGI("[Syncer] EnableAutoSync enable = %d, Label=%s", enable, label_.c_str());
42 if (autoSyncEnable_ == enable) {
43 return;
44 }
45
46 autoSyncEnable_ = enable;
47 if (!enable) {
48 return;
49 }
50
51 if (!initialized_) {
52 LOGI("[Syncer] Syncer has not Init");
53 return;
54 }
55
56 std::vector<std::string> devices;
57 GetOnlineDevices(devices);
58 if (devices.empty()) {
59 LOGI("[Syncer] EnableAutoSync no online devices");
60 return;
61 }
62 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
63 if (errCode != E_OK) {
64 LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
65 }
66 }
67
68 // Local data changed callback
LocalDataChanged(int notifyEvent)69 void SingleVerKVSyncer::LocalDataChanged(int notifyEvent)
70 {
71 if (!initialized_) {
72 LOGE("[Syncer] Syncer has not Init");
73 return;
74 }
75
76 if (notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT) &&
77 notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT)) {
78 LOGD("[Syncer] ignore event:%d", notifyEvent);
79 return;
80 }
81 if (!triggerSyncTask_) {
82 LOGI("[Syncer] some sync task is scheduling");
83 return;
84 }
85 triggerSyncTask_ = false;
86 std::vector<std::string> devices;
87 GetOnlineDevices(devices);
88 if (devices.empty()) {
89 LOGI("[Syncer] LocalDataChanged no online standard devices, Label=%s", label_.c_str());
90 triggerSyncTask_ = true;
91 return;
92 }
93 ISyncEngine *engine = syncEngine_;
94 ISyncInterface *storage = syncInterface_;
95 RefObject::IncObjRef(engine);
96 storage->IncRefCount();
97 // To avoid many task were produced and waiting in the queue. For example, put value in a loop.
98 // It will consume thread pool resources, so other task will delay until these task finish.
99 // In extreme situation, 10 thread run the localDataChanged task and 1 task waiting in queue.
100 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devices, engine, storage] {
101 triggerSyncTask_ = true;
102 if (!TryFullSync(devices)) {
103 TriggerSubQuerySync(devices);
104 }
105 RefObject::DecObjRef(engine);
106 storage->DecRefCount();
107 });
108 // if task schedule failed, but triggerSyncTask_ is not set to true, other thread may skip the schedule time
109 // when task schedule failed, it means unormal status, it is unable to schedule next time probably
110 // so it is ok if other thread skip the schedule if last task schedule failed
111 if (errCode != E_OK) {
112 triggerSyncTask_ = true;
113 LOGE("[TriggerSync] LocalDataChanged retCode:%d", errCode);
114 RefObject::DecObjRef(engine);
115 storage->DecRefCount();
116 }
117 }
118
119 // remote device online callback
RemoteDataChanged(const std::string & device)120 void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
121 {
122 LOGI("[SingleVerKVSyncer] device online dev %s", STR_MASK(device));
123 if (!initialized_) {
124 LOGE("[Syncer] Syncer has not Init");
125 return;
126 }
127 std::string userId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
128 std::string appId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
129 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
130 std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::SUB_USER, "");
131 RuntimeContext::GetInstance()->NotifyDatabaseStatusChange({userId, appId, storeId, subUserId, device}, true);
132 SingleVerSyncer::RemoteDataChanged(device);
133 if (autoSyncEnable_) {
134 RefObject::IncObjRef(syncEngine_);
135 syncInterface_->IncRefCount();
136 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, userId, appId, storeId, device] {
137 std::vector<std::string> devices;
138 devices.push_back(device);
139 int errCode = E_OK;
140 if (RuntimeContext::GetInstance()->IsNeedAutoSync(userId, appId, storeId, device)) {
141 errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
142 }
143 if (errCode != E_OK) {
144 LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
145 }
146 RefObject::DecObjRef(syncEngine_);
147 syncInterface_->DecRefCount();
148 });
149 if (retCode != E_OK) {
150 LOGE("[AutoLaunch] RemoteDataChanged triggler sync retCode:%d", retCode);
151 RefObject::DecObjRef(syncEngine_);
152 syncInterface_->DecRefCount();
153 }
154 }
155 // db online again ,trigger subscribe
156 // if remote device online, subscribequery num is 0
157 std::vector<QuerySyncObject> syncQueries;
158 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetLocalSubscribeQueries(device, syncQueries);
159 if (syncQueries.empty()) {
160 LOGI("no need to trigger auto subscribe");
161 return;
162 }
163 LOGI("[SingleVerKVSyncer] trigger local subscribe sync, queryNums=%zu", syncQueries.size());
164 for (const auto &query : syncQueries) {
165 TriggerSubscribe(device, query);
166 }
167 static_cast<SingleVerSyncEngine *>(syncEngine_)->PutUnfinishedSubQueries(device, syncQueries);
168 }
169
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const170 int SingleVerKVSyncer::SyncConditionCheck(const SyncParam ¶m, const ISyncEngine *engine,
171 ISyncInterface *storage) const
172 {
173 if (!param.isQuerySync) {
174 return E_OK;
175 }
176 QuerySyncObject query = param.syncQuery;
177 int errCode = static_cast<SingleVerKvDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
178 if (errCode != E_OK) {
179 LOGE("[SingleVerKVSyncer] QuerySyncObject check failed");
180 return errCode;
181 }
182 if (param.mode != SUBSCRIBE_QUERY) {
183 return E_OK;
184 }
185 if (query.HasLimit() || query.HasOrderBy()) {
186 LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
187 return -E_NOT_SUPPORT;
188 }
189 if (param.devices.size() > MAX_DEVICES_NUM) {
190 LOGE("[SingleVerKVSyncer] devices is overlimit");
191 return -E_MAX_LIMITS;
192 }
193 return engine->SubscribeLimitCheck(param.devices, query);
194 }
195
TriggerSubscribe(const std::string & device,const QuerySyncObject & query)196 void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
197 {
198 if (!initialized_) { // LCOV_EXCL_BR_LINE
199 LOGE("[Syncer] Syncer has not Init");
200 return;
201 }
202 RefObject::IncObjRef(syncEngine_);
203 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device, query] {
204 std::vector<std::string> devices;
205 devices.push_back(device);
206 SyncParam param;
207 param.devices = devices;
208 param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
209 param.onComplete = nullptr;
210 param.onFinalize = nullptr;
211 param.wait = false;
212 param.isQuerySync = true;
213 param.syncQuery = query;
214 int errCode = Sync(param);
215 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
216 LOGE("[SingleVerKVSyncer] subscribe start by RemoteDataChanged failed err %d", errCode);
217 }
218 RefObject::DecObjRef(syncEngine_);
219 });
220 if (retCode != E_OK) { // LCOV_EXCL_BR_LINE
221 LOGE("[Syncer] triggler query subscribe start failed err %d", retCode);
222 RefObject::DecObjRef(syncEngine_);
223 }
224 }
225
TryFullSync(const std::vector<std::string> & devices)226 bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
227 {
228 if (!initialized_) {
229 LOGE("[Syncer] Syncer has not Init");
230 return true;
231 }
232 if (!autoSyncEnable_) {
233 LOGD("[Syncer] autoSync no enable");
234 return false;
235 }
236 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
237 if (errCode != E_OK) {
238 LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
239 return false;
240 }
241 return true;
242 }
243
TriggerSubQuerySync(const std::vector<std::string> & devices)244 void SingleVerKVSyncer::TriggerSubQuerySync(const std::vector<std::string> &devices)
245 {
246 if (!initialized_) {
247 LOGE("[Syncer] Syncer has not Init");
248 return;
249 }
250 std::shared_ptr<Metadata> metadata = nullptr;
251 ISyncInterface *syncInterface = nullptr;
252 {
253 std::lock_guard<std::mutex> lock(syncerLock_);
254 if (metadata_ == nullptr || syncInterface_ == nullptr) {
255 return;
256 }
257 metadata = metadata_;
258 syncInterface = syncInterface_;
259 syncInterface->IncRefCount();
260 }
261 int errCode;
262 for (auto &device : devices) {
263 std::vector<QuerySyncObject> queries;
264 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetRemoteSubscribeQueries(device, queries);
265 for (auto &query : queries) {
266 std::string queryId = query.GetIdentify();
267 WaterMark queryWaterMark = 0;
268 uint64_t lastTimestamp = metadata->GetQueryLastTimestamp(device, queryId);
269 // Auto sync does not support multi-user sync, The userId param is set to "".
270 errCode = metadata->GetSendQueryWaterMark(queryId, device, "", queryWaterMark, false);
271 if (errCode != E_OK) {
272 LOGE("[Syncer] get queryId=%s,dev=%s watermark failed", STR_MASK(queryId), STR_MASK(device));
273 continue;
274 }
275 if (lastTimestamp < queryWaterMark || lastTimestamp == 0) {
276 continue;
277 }
278 LOGD("[Syncer] lastTime=%" PRIu64 " vs WaterMark=%" PRIu64 ",trigger queryId=%s,dev=%s", lastTimestamp,
279 queryWaterMark, STR_MASK(queryId), STR_MASK(device));
280 InternalSyncParma param;
281 std::vector<std::string> targetDevices;
282 targetDevices.push_back(device);
283 param.devices = targetDevices;
284 param.mode = SyncModeType::AUTO_PUSH;
285 param.isQuerySync = true;
286 param.syncQuery = query;
287 QueryAutoSync(param);
288 }
289 }
290 syncInterface->DecRefCount();
291 }
292
DumpSyncerBasicInfo()293 SyncerBasicInfo SingleVerKVSyncer::DumpSyncerBasicInfo()
294 {
295 SyncerBasicInfo basicInfo = GenericSyncer::DumpSyncerBasicInfo();
296 basicInfo.isAutoSync = autoSyncEnable_;
297 return basicInfo;
298 }
299
InitSyncEngine(DistributedDB::ISyncInterface * syncInterface)300 int SingleVerKVSyncer::InitSyncEngine(DistributedDB::ISyncInterface *syncInterface)
301 {
302 int errCode = GenericSyncer::InitSyncEngine(syncInterface);
303 if (errCode != E_OK) {
304 return errCode;
305 }
306 TriggerAddSubscribeAsync(syncInterface);
307 return E_OK;
308 }
309
TriggerAddSubscribeAsync(ISyncInterface * syncInterface)310 void SingleVerKVSyncer::TriggerAddSubscribeAsync(ISyncInterface *syncInterface)
311 {
312 if (syncInterface == nullptr || syncEngine_ == nullptr) {
313 return;
314 }
315 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_SVD) {
316 return;
317 }
318 DBInfo dbInfo;
319 auto storage = static_cast<SyncGenericInterface *>(syncInterface);
320 storage->GetDBInfo(dbInfo);
321 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
322 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
323 if (subscribeQuery.empty()) {
324 LOGD("[SingleVerKVSyncer][TriggerAddSubscribeAsync] Subscribe cache is empty");
325 return;
326 }
327 storage->IncRefCount();
328 ISyncEngine *engine = syncEngine_;
329 RefObject::IncObjRef(engine);
330 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, engine, storage, subscribeQuery]() {
331 engine->AddSubscribe(storage, subscribeQuery);
332 // try to trigger query sync after add trigger
333 LocalDataChanged(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
334 RefObject::DecObjRef(engine);
335 storage->DecRefCount();
336 });
337 if (errCode != E_OK) {
338 LOGW("[SingleVerKVSyncer] TriggerAddSubscribeAsync failed errCode = %d", errCode);
339 syncInterface->DecRefCount();
340 RefObject::DecObjRef(engine);
341 }
342 }
343 } // namespace DistributedDB
344