• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_kv_syncer.h"
17 
18 #include <functional>
19 #include <map>
20 #include <mutex>
21 
22 #include "db_common.h"
23 #include "ikvdb_sync_interface.h"
24 #include "log_print.h"
25 #include "meta_data.h"
26 #include "single_ver_sync_engine.h"
27 #include "sqlite_single_ver_natural_store.h"
28 
29 namespace DistributedDB {
SingleVerKVSyncer()30 SingleVerKVSyncer::SingleVerKVSyncer()
31     : autoSyncEnable_(false), triggerSyncTask_(true)
32 {
33 }
34 
~SingleVerKVSyncer()35 SingleVerKVSyncer::~SingleVerKVSyncer()
36 {
37 }
38 
EnableAutoSync(bool enable)39 void SingleVerKVSyncer::EnableAutoSync(bool enable)
40 {
41     LOGI("[Syncer] EnableAutoSync enable = %d, Label=%s", enable, label_.c_str());
42     if (autoSyncEnable_ == enable) {
43         return;
44     }
45 
46     autoSyncEnable_ = enable;
47     if (!enable) {
48         return;
49     }
50 
51     if (!initialized_) {
52         LOGI("[Syncer] Syncer has not Init");
53         return;
54     }
55 
56     std::vector<std::string> devices;
57     GetOnlineDevices(devices);
58     if (devices.empty()) {
59         LOGI("[Syncer] EnableAutoSync no online devices");
60         return;
61     }
62     int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
63     if (errCode != E_OK) {
64         LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
65     }
66 }
67 
68 // Local data changed callback
LocalDataChanged(int notifyEvent)69 void SingleVerKVSyncer::LocalDataChanged(int notifyEvent)
70 {
71     if (!initialized_) {
72         LOGE("[Syncer] Syncer has not Init");
73         return;
74     }
75 
76     if (notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT) &&
77         notifyEvent != static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT)) {
78         LOGD("[Syncer] ignore event:%d", notifyEvent);
79         return;
80     }
81     if (!triggerSyncTask_) {
82         LOGI("[Syncer] some sync task is scheduling");
83         return;
84     }
85     triggerSyncTask_ = false;
86     std::vector<std::string> devices;
87     GetOnlineDevices(devices);
88     if (devices.empty()) {
89         LOGI("[Syncer] LocalDataChanged no online standard devices, Label=%s", label_.c_str());
90         triggerSyncTask_ = true;
91         return;
92     }
93     ISyncEngine *engine = syncEngine_;
94     ISyncInterface *storage = syncInterface_;
95     RefObject::IncObjRef(engine);
96     storage->IncRefCount();
97     // To avoid many task were produced and waiting in the queue. For example, put value in a loop.
98     // It will consume thread pool resources, so other task will delay until these task finish.
99     // In extreme situation, 10 thread run the localDataChanged task and 1 task waiting in queue.
100     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devices, engine, storage] {
101         triggerSyncTask_ = true;
102         if (!TryFullSync(devices)) {
103             TriggerSubQuerySync(devices);
104         }
105         RefObject::DecObjRef(engine);
106         storage->DecRefCount();
107     });
108     // if task schedule failed, but triggerSyncTask_ is not set to true, other thread may skip the schedule time
109     // when task schedule failed, it means unormal status, it is unable to schedule next time probably
110     // so it is ok if other thread skip the schedule if last task schedule failed
111     if (errCode != E_OK) {
112         triggerSyncTask_ = true;
113         LOGE("[TriggerSync] LocalDataChanged retCode:%d", errCode);
114         RefObject::DecObjRef(engine);
115         storage->DecRefCount();
116     }
117 }
118 
119 // remote device online callback
RemoteDataChanged(const std::string & device)120 void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
121 {
122     LOGI("[SingleVerKVSyncer] device online dev %s", STR_MASK(device));
123     if (!initialized_) {
124         LOGE("[Syncer] Syncer has not Init");
125         return;
126     }
127     std::string userId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
128     std::string appId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
129     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
130     std::string subUserId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::SUB_USER, "");
131     RuntimeContext::GetInstance()->NotifyDatabaseStatusChange({userId, appId, storeId, subUserId, device}, true);
132     SingleVerSyncer::RemoteDataChanged(device);
133     if (autoSyncEnable_) {
134         RefObject::IncObjRef(syncEngine_);
135         syncInterface_->IncRefCount();
136         int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, userId, appId, storeId, device] {
137             std::vector<std::string> devices;
138             devices.push_back(device);
139             int errCode = E_OK;
140             if (RuntimeContext::GetInstance()->IsNeedAutoSync(userId, appId, storeId, device)) {
141                 errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
142             }
143             if (errCode != E_OK) {
144                 LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
145             }
146             RefObject::DecObjRef(syncEngine_);
147             syncInterface_->DecRefCount();
148         });
149         if (retCode != E_OK) {
150             LOGE("[AutoLaunch] RemoteDataChanged triggler sync retCode:%d", retCode);
151             RefObject::DecObjRef(syncEngine_);
152             syncInterface_->DecRefCount();
153         }
154     }
155     // db online again ,trigger subscribe
156     // if remote device online, subscribequery num is 0
157     std::vector<QuerySyncObject> syncQueries;
158     static_cast<SingleVerSyncEngine *>(syncEngine_)->GetLocalSubscribeQueries(device, syncQueries);
159     if (syncQueries.empty()) {
160         LOGI("no need to trigger auto subscribe");
161         return;
162     }
163     LOGI("[SingleVerKVSyncer] trigger local subscribe sync, queryNums=%zu", syncQueries.size());
164     for (const auto &query : syncQueries) {
165         TriggerSubscribe(device, query);
166     }
167     static_cast<SingleVerSyncEngine *>(syncEngine_)->PutUnfinishedSubQueries(device, syncQueries);
168 }
169 
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const170 int SingleVerKVSyncer::SyncConditionCheck(const SyncParam &param, const ISyncEngine *engine,
171     ISyncInterface *storage) const
172 {
173     if (!param.isQuerySync) {
174         return E_OK;
175     }
176     QuerySyncObject query = param.syncQuery;
177     int errCode = static_cast<SingleVerKvDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
178     if (errCode != E_OK) {
179         LOGE("[SingleVerKVSyncer] QuerySyncObject check failed");
180         return errCode;
181     }
182     if (param.mode != SUBSCRIBE_QUERY) {
183         return E_OK;
184     }
185     if (query.HasLimit() || query.HasOrderBy()) {
186         LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
187         return -E_NOT_SUPPORT;
188     }
189     if (param.devices.size() > MAX_DEVICES_NUM) {
190         LOGE("[SingleVerKVSyncer] devices is overlimit");
191         return -E_MAX_LIMITS;
192     }
193     return engine->SubscribeLimitCheck(param.devices, query);
194 }
195 
TriggerSubscribe(const std::string & device,const QuerySyncObject & query)196 void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
197 {
198     if (!initialized_) { // LCOV_EXCL_BR_LINE
199         LOGE("[Syncer] Syncer has not Init");
200         return;
201     }
202     RefObject::IncObjRef(syncEngine_);
203     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device, query] {
204         std::vector<std::string> devices;
205         devices.push_back(device);
206         SyncParam param;
207         param.devices = devices;
208         param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
209         param.onComplete = nullptr;
210         param.onFinalize = nullptr;
211         param.wait = false;
212         param.isQuerySync = true;
213         param.syncQuery = query;
214         int errCode = Sync(param);
215         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
216             LOGE("[SingleVerKVSyncer] subscribe start by RemoteDataChanged failed err %d", errCode);
217         }
218         RefObject::DecObjRef(syncEngine_);
219     });
220     if (retCode != E_OK) { // LCOV_EXCL_BR_LINE
221         LOGE("[Syncer] triggler query subscribe start failed err %d", retCode);
222         RefObject::DecObjRef(syncEngine_);
223     }
224 }
225 
TryFullSync(const std::vector<std::string> & devices)226 bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
227 {
228     if (!initialized_) {
229         LOGE("[Syncer] Syncer has not Init");
230         return true;
231     }
232     if (!autoSyncEnable_) {
233         LOGD("[Syncer] autoSync no enable");
234         return false;
235     }
236     int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
237     if (errCode != E_OK) {
238         LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
239         return false;
240     }
241     return true;
242 }
243 
TriggerSubQuerySync(const std::vector<std::string> & devices)244 void SingleVerKVSyncer::TriggerSubQuerySync(const std::vector<std::string> &devices)
245 {
246     if (!initialized_) {
247         LOGE("[Syncer] Syncer has not Init");
248         return;
249     }
250     std::shared_ptr<Metadata> metadata = nullptr;
251     ISyncInterface *syncInterface = nullptr;
252     {
253         std::lock_guard<std::mutex> lock(syncerLock_);
254         if (metadata_ == nullptr || syncInterface_ == nullptr) {
255             return;
256         }
257         metadata = metadata_;
258         syncInterface = syncInterface_;
259         syncInterface->IncRefCount();
260     }
261     int errCode;
262     for (auto &device : devices) {
263         std::vector<QuerySyncObject> queries;
264         static_cast<SingleVerSyncEngine *>(syncEngine_)->GetRemoteSubscribeQueries(device, queries);
265         for (auto &query : queries) {
266             std::string queryId = query.GetIdentify();
267             WaterMark queryWaterMark = 0;
268             uint64_t lastTimestamp = metadata->GetQueryLastTimestamp(device, queryId);
269             // Auto sync does not support multi-user sync, The userId param is set to "".
270             errCode = metadata->GetSendQueryWaterMark(queryId, device, "", queryWaterMark, false);
271             if (errCode != E_OK) {
272                 LOGE("[Syncer] get queryId=%s,dev=%s watermark failed", STR_MASK(queryId), STR_MASK(device));
273                 continue;
274             }
275             if (lastTimestamp < queryWaterMark || lastTimestamp == 0) {
276                 continue;
277             }
278             LOGD("[Syncer] lastTime=%" PRIu64 " vs WaterMark=%" PRIu64 ",trigger queryId=%s,dev=%s", lastTimestamp,
279                 queryWaterMark, STR_MASK(queryId), STR_MASK(device));
280             InternalSyncParma param;
281             std::vector<std::string> targetDevices;
282             targetDevices.push_back(device);
283             param.devices = targetDevices;
284             param.mode = SyncModeType::AUTO_PUSH;
285             param.isQuerySync = true;
286             param.syncQuery = query;
287             QueryAutoSync(param);
288         }
289     }
290     syncInterface->DecRefCount();
291 }
292 
DumpSyncerBasicInfo()293 SyncerBasicInfo SingleVerKVSyncer::DumpSyncerBasicInfo()
294 {
295     SyncerBasicInfo basicInfo = GenericSyncer::DumpSyncerBasicInfo();
296     basicInfo.isAutoSync = autoSyncEnable_;
297     return basicInfo;
298 }
299 
InitSyncEngine(DistributedDB::ISyncInterface * syncInterface)300 int SingleVerKVSyncer::InitSyncEngine(DistributedDB::ISyncInterface *syncInterface)
301 {
302     int errCode = GenericSyncer::InitSyncEngine(syncInterface);
303     if (errCode != E_OK) {
304         return errCode;
305     }
306     TriggerAddSubscribeAsync(syncInterface);
307     return E_OK;
308 }
309 
TriggerAddSubscribeAsync(ISyncInterface * syncInterface)310 void SingleVerKVSyncer::TriggerAddSubscribeAsync(ISyncInterface *syncInterface)
311 {
312     if (syncInterface == nullptr || syncEngine_ == nullptr) {
313         return;
314     }
315     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_SVD) {
316         return;
317     }
318     DBInfo dbInfo;
319     auto storage = static_cast<SyncGenericInterface *>(syncInterface);
320     storage->GetDBInfo(dbInfo);
321     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
322     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
323     if (subscribeQuery.empty()) {
324         LOGD("[SingleVerKVSyncer][TriggerAddSubscribeAsync] Subscribe cache is empty");
325         return;
326     }
327     storage->IncRefCount();
328     ISyncEngine *engine = syncEngine_;
329     RefObject::IncObjRef(engine);
330     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, engine, storage, subscribeQuery]() {
331         engine->AddSubscribe(storage, subscribeQuery);
332         // try to trigger query sync after add trigger
333         LocalDataChanged(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
334         RefObject::DecObjRef(engine);
335         storage->DecRefCount();
336     });
337     if (errCode != E_OK) {
338         LOGW("[SingleVerKVSyncer] TriggerAddSubscribeAsync failed errCode = %d", errCode);
339         syncInterface->DecRefCount();
340         RefObject::DecObjRef(engine);
341     }
342 }
343 } // namespace DistributedDB
344