• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_kv_syncer.h"
17 
18 #include <functional>
19 #include <map>
20 #include <mutex>
21 
22 #include "db_common.h"
23 #include "ikvdb_sync_interface.h"
24 #include "log_print.h"
25 #include "meta_data.h"
26 #include "single_ver_sync_engine.h"
27 #include "sqlite_single_ver_natural_store.h"
28 
29 namespace DistributedDB {
SingleVerKVSyncer()30 SingleVerKVSyncer::SingleVerKVSyncer()
31     : autoSyncEnable_(false), triggerSyncTask_(true)
32 {
33 }
34 
~SingleVerKVSyncer()35 SingleVerKVSyncer::~SingleVerKVSyncer()
36 {
37 }
38 
EnableAutoSync(bool enable)39 void SingleVerKVSyncer::EnableAutoSync(bool enable)
40 {
41     LOGI("[Syncer] EnableAutoSync enable = %d, Label=%s", enable, label_.c_str());
42     if (autoSyncEnable_ == enable) {
43         return;
44     }
45 
46     autoSyncEnable_ = enable;
47     if (!enable) {
48         return;
49     }
50 
51     if (!initialized_) {
52         LOGE("[Syncer] Syncer has not Init");
53         return;
54     }
55 
56     std::vector<std::string> devices;
57     GetOnlineDevices(devices);
58     if (devices.empty()) {
59         LOGI("[Syncer] EnableAutoSync no online devices");
60         return;
61     }
62     int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
63     if (errCode != E_OK) {
64         LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
65     }
66 }
67 
68 // Local data changed callback
LocalDataChanged(int notifyEvent)69 void SingleVerKVSyncer::LocalDataChanged(int notifyEvent)
70 {
71     if (!initialized_) {
72         LOGE("[Syncer] Syncer has not Init");
73         return;
74     }
75 
76     if (notifyEvent != SQLITE_GENERAL_FINISH_MIGRATE_EVENT &&
77         notifyEvent != SQLITE_GENERAL_NS_PUT_EVENT) {
78         LOGD("[Syncer] ignore event:%d", notifyEvent);
79         return;
80     }
81     if (!triggerSyncTask_) {
82         LOGI("[Syncer] some sync task is scheduling");
83         return;
84     }
85     triggerSyncTask_ = false;
86     RefObject::IncObjRef(syncEngine_);
87     // To avoid many task were produced and waiting in the queue. For example, put value in a loop.
88     // It will consume thread pool resources, so other task will delay until these task finish.
89     // In extreme situation, 10 thread run the localDataChanged task and 1 task waiting in queue.
90     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] {
91         triggerSyncTask_ = true;
92         std::vector<std::string> devices;
93         GetOnlineDevices(devices);
94         if (devices.empty()) {
95             LOGI("[Syncer] LocalDataChanged no online devices, Label=%s", label_.c_str());
96             RefObject::DecObjRef(syncEngine_);
97             return;
98         }
99         if (!TryFullSync(devices)) {
100             TriggerSubQuerySync(devices);
101         }
102         RefObject::DecObjRef(syncEngine_);
103     });
104     // if task schedule failed, but triggerSyncTask_ is not set to true, other thread may skip the schedule time
105     // when task schedule failed, it means unormal status, it is unable to schedule next time probably
106     // so it is ok if other thread skip the schedule if last task schedule failed
107     if (errCode != E_OK) {
108         triggerSyncTask_ = true;
109         LOGE("[TriggerSync] LocalDataChanged retCode:%d", errCode);
110         RefObject::DecObjRef(syncEngine_);
111     }
112     return;
113 }
114 
115 // remote device online callback
RemoteDataChanged(const std::string & device)116 void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
117 {
118     LOGI("[SingleVerKVSyncer] device online dev %s", STR_MASK(device));
119     if (!initialized_) {
120         LOGE("[Syncer] Syncer has not Init");
121         return;
122     }
123     std::string userId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
124     std::string appId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
125     std::string storeId = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
126     RuntimeContext::GetInstance()->NotifyDatabaseStatusChange(userId, appId, storeId, device, true);
127     SingleVerSyncer::RemoteDataChanged(device);
128     if (autoSyncEnable_) {
129         RefObject::IncObjRef(syncEngine_);
130         int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device] {
131             std::vector<std::string> devices;
132             devices.push_back(device);
133             int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
134             if (errCode != E_OK) {
135                 LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
136             }
137             RefObject::DecObjRef(syncEngine_);
138         });
139         if (retCode != E_OK) {
140             LOGE("[AutoLaunch] RemoteDataChanged triggler sync retCode:%d", retCode);
141             RefObject::DecObjRef(syncEngine_);
142         }
143     }
144     // db online again ,trigger subscribe
145     // if remote device online, subscribequery num is 0
146     std::vector<QuerySyncObject> syncQueries;
147     static_cast<SingleVerSyncEngine *>(syncEngine_)->GetLocalSubscribeQueries(device, syncQueries);
148     if (syncQueries.size() == 0) {
149         LOGI("no need to trigger auto subscribe");
150         return;
151     }
152     LOGI("[SingleVerKVSyncer] trigger local subscribe sync, queryNums=%zu", syncQueries.size());
153     for (const auto &query : syncQueries) {
154         TriggerSubscribe(device, query);
155     }
156     static_cast<SingleVerSyncEngine *>(syncEngine_)->PutUnfiniedSubQueries(device, syncQueries);
157 }
158 
QueryAutoSync(const InternalSyncParma & param)159 void SingleVerKVSyncer::QueryAutoSync(const InternalSyncParma &param)
160 {
161     if (!initialized_) {
162         LOGE("[Syncer] Syncer has not Init");
163         return;
164     }
165     LOGI("[SingleVerKVSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
166     RefObject::IncObjRef(syncEngine_);
167     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param] {
168         int errCode = Sync(param);
169         if (errCode != E_OK) {
170             LOGE("[SingleVerKVSyncer] sync start by QueryAutoSync failed err %d", errCode);
171         }
172         RefObject::DecObjRef(syncEngine_);
173     });
174     if (retCode != E_OK) {
175         LOGE("[SingleVerKVSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
176         RefObject::DecObjRef(syncEngine_);
177     }
178 }
179 
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const180 int SingleVerKVSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
181     const std::vector<std::string> &devices) const
182 {
183     if (!isQuerySync) {
184         return E_OK;
185     }
186     int errCode = static_cast<SingleVerKvDBSyncInterface *>(syncInterface_)->CheckAndInitQueryCondition(query);
187     if (errCode != E_OK) {
188         LOGE("[SingleVerKVSyncer] QuerySyncObject check failed");
189         return errCode;
190     }
191     if (mode != SUBSCRIBE_QUERY) {
192         return E_OK;
193     }
194     if (query.HasLimit() || query.HasOrderBy()) {
195         LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
196         return -E_NOT_SUPPORT;
197     }
198     if (devices.size() > MAX_DEVICES_NUM) {
199         LOGE("[SingleVerKVSyncer] devices is overlimit");
200         return -E_MAX_LIMITS;
201     }
202     return syncEngine_->SubscribeLimitCheck(devices, query);
203 }
204 
TriggerSubscribe(const std::string & device,const QuerySyncObject & query)205 void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
206 {
207     if (!initialized_) {
208         LOGE("[Syncer] Syncer has not Init");
209         return;
210     }
211     RefObject::IncObjRef(syncEngine_);
212     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device, query] {
213         std::vector<std::string> devices;
214         devices.push_back(device);
215         SyncParma param;
216         param.devices = devices;
217         param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
218         param.onComplete = nullptr;
219         param.onFinalize = nullptr;
220         param.wait = false;
221         param.isQuerySync = true;
222         param.syncQuery = query;
223         int errCode = Sync(param);
224         if (errCode != E_OK) {
225             LOGE("[SingleVerKVSyncer] subscribe start by RemoteDataChanged failed err %d", errCode);
226         }
227         RefObject::DecObjRef(syncEngine_);
228     });
229     if (retCode != E_OK) {
230         LOGE("[Syncer] triggler query subscribe start failed err %d", retCode);
231         RefObject::DecObjRef(syncEngine_);
232     }
233 }
234 
TryFullSync(const std::vector<std::string> & devices)235 bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
236 {
237     if (!initialized_) {
238         LOGE("[Syncer] Syncer has not Init");
239         return true;
240     }
241     if (!autoSyncEnable_) {
242         LOGD("[Syncer] autoSync no enable");
243         return false;
244     }
245     int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
246     if (errCode != E_OK) {
247         LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
248         return false;
249     }
250     return true;
251 }
252 
TriggerSubQuerySync(const std::vector<std::string> & devices)253 void SingleVerKVSyncer::TriggerSubQuerySync(const std::vector<std::string> &devices)
254 {
255     if (!initialized_) {
256         LOGE("[Syncer] Syncer has not Init");
257         return;
258     }
259     int errCode;
260     for (auto &device : devices) {
261         std::vector<QuerySyncObject> queries;
262         static_cast<SingleVerSyncEngine *>(syncEngine_)->GetRemoteSubscribeQueries(device, queries);
263         for (auto &query : queries) {
264             std::string queryId = query.GetIdentify();
265             uint64_t lastTimestamp = metadata_->GetQueryLastTimestamp(device, queryId);
266             WaterMark queryWaterMark = 0;
267             errCode = metadata_->GetSendQueryWaterMark(queryId, device, queryWaterMark, false);
268             if (errCode != E_OK) {
269                 LOGE("[Syncer] get queryId=%s,dev=%s watermark failed", STR_MASK(queryId), STR_MASK(device));
270                 continue;
271             }
272             if (lastTimestamp < queryWaterMark || lastTimestamp == 0) {
273                 continue;
274             }
275             LOGD("[Syncer] lastTime=%" PRIu64 " vs WaterMark=%" PRIu64 ",trigger queryId=%s,dev=%s", lastTimestamp,
276                 queryWaterMark, STR_MASK(queryId), STR_MASK(device));
277             InternalSyncParma param;
278             std::vector<std::string> targetDevices;
279             targetDevices.push_back(device);
280             param.devices = targetDevices;
281             param.mode = SyncModeType::AUTO_PUSH;
282             param.isQuerySync = true;
283             param.syncQuery = query;
284             QueryAutoSync(param);
285         }
286     }
287 }
288 
DumpSyncerBasicInfo()289 SyncerBasicInfo SingleVerKVSyncer::DumpSyncerBasicInfo()
290 {
291     SyncerBasicInfo basicInfo = GenericSyncer::DumpSyncerBasicInfo();
292     basicInfo.isAutoSync = autoSyncEnable_;
293     return basicInfo;
294 }
295 } // namespace DistributedDB
296