1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_kv_syncer.h"
17
18 #include <functional>
19 #include <map>
20 #include <mutex>
21
22 #include "db_common.h"
23 #include "ikvdb_sync_interface.h"
24 #include "log_print.h"
25 #include "meta_data.h"
26 #include "single_ver_sync_engine.h"
27 #include "sqlite_single_ver_natural_store.h"
28
29 namespace DistributedDB {
SingleVerKVSyncer()30 SingleVerKVSyncer::SingleVerKVSyncer()
31 : autoSyncEnable_(false), triggerSyncTask_(true)
32 {
33 }
34
~SingleVerKVSyncer()35 SingleVerKVSyncer::~SingleVerKVSyncer()
36 {
37 }
38
EnableAutoSync(bool enable)39 void SingleVerKVSyncer::EnableAutoSync(bool enable)
40 {
41 LOGI("[Syncer] EnableAutoSync enable = %d, Label=%s", enable, label_.c_str());
42 if (autoSyncEnable_ == enable) {
43 return;
44 }
45
46 autoSyncEnable_ = enable;
47 if (!enable) {
48 return;
49 }
50
51 if (!initialized_) {
52 LOGI("[Syncer] Syncer has not Init");
53 return;
54 }
55
56 std::vector<std::string> devices;
57 GetOnlineDevices(devices);
58 if (devices.empty()) {
59 LOGI("[Syncer] EnableAutoSync no online devices");
60 return;
61 }
62 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
63 if (errCode != E_OK) {
64 LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
65 }
66 }
67
68 // Local data changed callback
LocalDataChanged(int notifyEvent)69 void SingleVerKVSyncer::LocalDataChanged(int notifyEvent)
70 {
71 if (!initialized_) {
72 LOGE("[Syncer] Syncer has not Init");
73 return;
74 }
75
76 if (notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT) &&
77 notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT)) {
78 LOGD("[Syncer] ignore event:%d", notifyEvent);
79 return;
80 }
81 if (!triggerSyncTask_) {
82 LOGI("[Syncer] some sync task is scheduling");
83 return;
84 }
85 triggerSyncTask_ = false;
86 std::vector<std::string> devices;
87 GetOnlineDevices(devices);
88 if (devices.empty()) {
89 LOGI("[Syncer] LocalDataChanged no online standard devices, Label=%s", label_.c_str());
90 triggerSyncTask_ = true;
91 return;
92 }
93 RefObject::IncObjRef(syncEngine_);
94 // To avoid many task were produced and waiting in the queue. For example, put value in a loop.
95 // It will consume thread pool resources, so other task will delay until these task finish.
96 // In extreme situation, 10 thread run the localDataChanged task and 1 task waiting in queue.
97 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devices] {
98 triggerSyncTask_ = true;
99 if (!TryFullSync(devices)) {
100 TriggerSubQuerySync(devices);
101 }
102 RefObject::DecObjRef(syncEngine_);
103 });
104 // if task schedule failed, but triggerSyncTask_ is not set to true, other thread may skip the schedule time
105 // when task schedule failed, it means unormal status, it is unable to schedule next time probably
106 // so it is ok if other thread skip the schedule if last task schedule failed
107 if (errCode != E_OK) {
108 triggerSyncTask_ = true;
109 LOGE("[TriggerSync] LocalDataChanged retCode:%d", errCode);
110 RefObject::DecObjRef(syncEngine_);
111 }
112 return;
113 }
114
115 // remote device online callback
RemoteDataChanged(const std::string & device)116 void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
117 {
118 LOGI("[SingleVerKVSyncer] device online dev %s", STR_MASK(device));
119 if (!initialized_) {
120 LOGE("[Syncer] Syncer has not Init");
121 return;
122 }
123 std::string userId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
124 std::string appId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
125 std::string storeId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
126 RuntimeContext::GetInstance()->NotifyDatabaseStatusChange(userId, appId, storeId, device, true);
127 SingleVerSyncer::RemoteDataChanged(device);
128 if (autoSyncEnable_) {
129 RefObject::IncObjRef(syncEngine_);
130 syncInterface_->IncRefCount();
131 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device] {
132 std::vector<std::string> devices;
133 devices.push_back(device);
134 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
135 if (errCode != E_OK) {
136 LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
137 }
138 RefObject::DecObjRef(syncEngine_);
139 syncInterface_->DecRefCount();
140 });
141 if (retCode != E_OK) {
142 LOGE("[AutoLaunch] RemoteDataChanged triggler sync retCode:%d", retCode);
143 RefObject::DecObjRef(syncEngine_);
144 syncInterface_->DecRefCount();
145 }
146 }
147 // db online again ,trigger subscribe
148 // if remote device online, subscribequery num is 0
149 std::vector<QuerySyncObject> syncQueries;
150 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetLocalSubscribeQueries(device, syncQueries);
151 if (syncQueries.empty()) {
152 LOGI("no need to trigger auto subscribe");
153 return;
154 }
155 LOGI("[SingleVerKVSyncer] trigger local subscribe sync, queryNums=%zu", syncQueries.size());
156 for (const auto &query : syncQueries) {
157 TriggerSubscribe(device, query);
158 }
159 static_cast<SingleVerSyncEngine *>(syncEngine_)->PutUnfinishedSubQueries(device, syncQueries);
160 }
161
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const162 int SingleVerKVSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine,
163 ISyncInterface *storage) const
164 {
165 if (!param.isQuerySync) {
166 return E_OK;
167 }
168 QuerySyncObject query = param.syncQuery;
169 int errCode = static_cast<SingleVerKvDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
170 if (errCode != E_OK) {
171 LOGE("[SingleVerKVSyncer] QuerySyncObject check failed");
172 return errCode;
173 }
174 if (param.mode != SUBSCRIBE_QUERY) {
175 return E_OK;
176 }
177 if (query.HasLimit() || query.HasOrderBy()) {
178 LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
179 return -E_NOT_SUPPORT;
180 }
181 if (param.devices.size() > MAX_DEVICES_NUM) {
182 LOGE("[SingleVerKVSyncer] devices is overlimit");
183 return -E_MAX_LIMITS;
184 }
185 return engine->SubscribeLimitCheck(param.devices, query);
186 }
187
TriggerSubscribe(const std::string & device,const QuerySyncObject & query)188 void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
189 {
190 if (!initialized_) {
191 LOGE("[Syncer] Syncer has not Init");
192 return;
193 }
194 RefObject::IncObjRef(syncEngine_);
195 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device, query] {
196 std::vector<std::string> devices;
197 devices.push_back(device);
198 SyncParma param;
199 param.devices = devices;
200 param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
201 param.onComplete = nullptr;
202 param.onFinalize = nullptr;
203 param.wait = false;
204 param.isQuerySync = true;
205 param.syncQuery = query;
206 int errCode = Sync(param);
207 if (errCode != E_OK) {
208 LOGE("[SingleVerKVSyncer] subscribe start by RemoteDataChanged failed err %d", errCode);
209 }
210 RefObject::DecObjRef(syncEngine_);
211 });
212 if (retCode != E_OK) {
213 LOGE("[Syncer] triggler query subscribe start failed err %d", retCode);
214 RefObject::DecObjRef(syncEngine_);
215 }
216 }
217
TryFullSync(const std::vector<std::string> & devices)218 bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
219 {
220 if (!initialized_) {
221 LOGE("[Syncer] Syncer has not Init");
222 return true;
223 }
224 if (!autoSyncEnable_) {
225 LOGD("[Syncer] autoSync no enable");
226 return false;
227 }
228 int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
229 if (errCode != E_OK) {
230 LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
231 return false;
232 }
233 return true;
234 }
235
TriggerSubQuerySync(const std::vector<std::string> & devices)236 void SingleVerKVSyncer::TriggerSubQuerySync(const std::vector<std::string> &devices)
237 {
238 if (!initialized_) {
239 LOGE("[Syncer] Syncer has not Init");
240 return;
241 }
242 std::shared_ptr<Metadata> metadata = nullptr;
243 ISyncInterface *syncInterface = nullptr;
244 {
245 std::lock_guard<std::mutex> lock(syncerLock_);
246 if (metadata_ == nullptr || syncInterface_ == nullptr) {
247 return;
248 }
249 metadata = metadata_;
250 syncInterface = syncInterface_;
251 syncInterface->IncRefCount();
252 }
253 int errCode;
254 for (auto &device : devices) {
255 std::vector<QuerySyncObject> queries;
256 static_cast<SingleVerSyncEngine *>(syncEngine_)->GetRemoteSubscribeQueries(device, queries);
257 for (auto &query : queries) {
258 std::string queryId = query.GetIdentify();
259 WaterMark queryWaterMark = 0;
260 uint64_t lastTimestamp = metadata->GetQueryLastTimestamp(device, queryId);
261 errCode = metadata->GetSendQueryWaterMark(queryId, device, queryWaterMark, false);
262 if (errCode != E_OK) {
263 LOGE("[Syncer] get queryId=%s,dev=%s watermark failed", STR_MASK(queryId), STR_MASK(device));
264 continue;
265 }
266 if (lastTimestamp < queryWaterMark || lastTimestamp == 0) {
267 continue;
268 }
269 LOGD("[Syncer] lastTime=%" PRIu64 " vs WaterMark=%" PRIu64 ",trigger queryId=%s,dev=%s", lastTimestamp,
270 queryWaterMark, STR_MASK(queryId), STR_MASK(device));
271 InternalSyncParma param;
272 std::vector<std::string> targetDevices;
273 targetDevices.push_back(device);
274 param.devices = targetDevices;
275 param.mode = SyncModeType::AUTO_PUSH;
276 param.isQuerySync = true;
277 param.syncQuery = query;
278 QueryAutoSync(param);
279 }
280 }
281 syncInterface->DecRefCount();
282 }
283
DumpSyncerBasicInfo()284 SyncerBasicInfo SingleVerKVSyncer::DumpSyncerBasicInfo()
285 {
286 SyncerBasicInfo basicInfo = GenericSyncer::DumpSyncerBasicInfo();
287 basicInfo.isAutoSync = autoSyncEnable_;
288 return basicInfo;
289 }
290 } // namespace DistributedDB
291