1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "subscribe_manager.h"
16 #include "db_common.h"
17 #include "sync_types.h"
18
19 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)20 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
21 {
22 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
23 ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
24 }
25
ClearAllRemoteQuery()26 void SubscribeManager::ClearAllRemoteQuery()
27 {
28 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
29 remoteSubscribedMap_.clear();
30 remoteSubscribedTotalMap_.clear();
31 }
32
ClearLocalSubscribeQuery(const std::string & device)33 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
34 {
35 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
36 unFinishedLocalAutoSubMap_.erase(device);
37 ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
38 }
39
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)40 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
41 {
42 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
43 int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
44 LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
45 errCode);
46 return errCode;
47 }
48
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)49 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
50 {
51 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
52 std::string queryId = query.GetIdentify();
53 int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
54 LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
55 return errCode;
56 }
57
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)58 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
59 {
60 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
61 int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
62 LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
63 errCode);
64 return errCode;
65 }
66
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)67 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
68 {
69 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
70 std::string queryId = query.GetIdentify();
71 int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
72 LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
73 if (errCode != E_OK) {
74 return errCode;
75 }
76 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
77 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
78 unFinishedLocalAutoSubMap_[device].erase(queryId);
79 }
80 return errCode;
81 }
82
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)83 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
84 {
85 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
86 std::string queryId = query.GetIdentify();
87 DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
88 }
89
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)90 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
91 {
92 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
93 std::string queryId = query.GetIdentify();
94 DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
95 }
96
PutLocalUnFiniedSubQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)97 void SubscribeManager::PutLocalUnFiniedSubQueries(const std::string &device,
98 std::vector<QuerySyncObject> &subscribeQueries)
99 {
100 LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%d", subscribeQueries.size());
101 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
102 if (subscribeQueries.size() == 0) {
103 unFinishedLocalAutoSubMap_.erase(device);
104 return;
105 }
106 unFinishedLocalAutoSubMap_[device].clear();
107 auto iter = unFinishedLocalAutoSubMap_.find(device);
108 for (const auto &query : subscribeQueries) {
109 iter->second.insert(query.GetIdentify());
110 }
111 }
112
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const113 void SubscribeManager::GetAllUnFinishSubQueries(
114 std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
115 {
116 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
117 for (auto &item : unFinishedLocalAutoSubMap_) {
118 if (item.second.size() == 0) {
119 continue;
120 }
121 allSyncQueries[item.first] = {};
122 auto iter = allSyncQueries.find(item.first);
123 for (const auto &queryId : item.second) {
124 auto iterTmp = localSubscribeTotalMap_.find(queryId);
125 if (iterTmp == localSubscribeTotalMap_.end()) {
126 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
127 continue;
128 }
129 iter->second.push_back(iterTmp->second.first);
130 }
131 }
132 }
133
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)134 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
135 {
136 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
137 std::string queryId = query.GetIdentify();
138 RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
139 }
140
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)141 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
142 {
143 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
144 std::string queryId = query.GetIdentify();
145 RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
146 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
147 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
148 unFinishedLocalAutoSubMap_[device].erase(queryId);
149 LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
150 if (unFinishedLocalAutoSubMap_[device].size() == 0) {
151 LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
152 unFinishedLocalAutoSubMap_.erase(device);
153 }
154 }
155 }
156
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const157 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
158 std::vector<QuerySyncObject> &subscribeQueries) const
159 {
160 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
161 GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
162 }
163
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const164 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
165 std::vector<QuerySyncObject> &subscribeQueries) const
166 {
167 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
168 GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
169 }
170
IsRemoteContainSubscribe(const std::string & device,const QuerySyncObject & query) const171 bool SubscribeManager::IsRemoteContainSubscribe(const std::string &device, const QuerySyncObject &query) const
172 {
173 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
174 auto iter = remoteSubscribedMap_.find(device);
175 if (iter == remoteSubscribedMap_.end()) {
176 LOGD("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
177 return false;
178 }
179 std::string queryId = query.GetIdentify();
180 auto subIter = iter->second.find(queryId);
181 if (subIter == iter->second.end()) {
182 LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryId));
183 return false;
184 }
185 return true;
186 }
187
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const188 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
189 std::vector<std::string> &subscribeQueryIds) const
190 {
191 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
192 auto iter = remoteSubscribedMap_.find(device);
193 if (iter == remoteSubscribedMap_.end()) {
194 LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
195 return;
196 }
197 for (const auto &queryInfo : iter->second) {
198 if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
199 LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
200 continue;
201 }
202 subscribeQueryIds.push_back(queryInfo.first);
203 }
204 }
205
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const206 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
207 {
208 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
209 size_t devNum = localSubscribeMap_.size();
210 for (const auto &device : devices) {
211 if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
212 continue;
213 }
214 devNum++;
215 if (devNum > MAX_DEVICES_NUM) {
216 LOGE("[SubscribeManager] local subscribe devices is over limit");
217 return -E_MAX_LIMITS;
218 }
219 }
220 std::string queryId = query.GetIdentify();
221 auto allIter = localSubscribeTotalMap_.find(queryId);
222 if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
223 LOGE("[SubscribeManager] all local subscribe sums is over limit");
224 return -E_MAX_LIMITS;
225 }
226 return E_OK;
227 }
228
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)229 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
230 SubscribedTotalMap &subscribedTotalMap)
231 {
232 if (subscribeMap.find(device) == subscribeMap.end()) {
233 LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
234 return;
235 }
236 for (const auto &queryInfo : subscribeMap[device]) {
237 if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
238 if (subscribedTotalMap[queryInfo.first].second > 0) {
239 subscribedTotalMap[queryInfo.first].second--;
240 }
241 if (subscribedTotalMap[queryInfo.first].second == 0) {
242 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
243 subscribedTotalMap.erase(queryInfo.first);
244 }
245 }
246 }
247 subscribeMap.erase(device);
248 LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
249 }
250
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)251 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
252 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
253 {
254 std::string queryId = query.GetIdentify();
255 auto iter = subscribeMap.find(device);
256 auto allIter = subscribedTotalMap.find(queryId);
257 // limit check
258 if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
259 LOGE("[SubscribeManager] all subscribe sums is over limit");
260 return -E_MAX_LIMITS;
261 }
262 if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
263 LOGE("[SubscribeManager] subscribe devices is over limit");
264 return -E_MAX_LIMITS;
265 }
266 if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
267 iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
268 LOGE("[SubscribeManager] subscribe sums is over limit");
269 return -E_MAX_LIMITS;
270 }
271 if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
272 iter->second[queryId] == SubscribeStatus::ACTIVE) {
273 LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
274 return E_OK;
275 }
276
277 if (iter == subscribeMap.end()) {
278 subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
279 }
280 bool isNeedInc = false;
281 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
282 subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
283 isNeedInc = true;
284 }
285 if (allIter == subscribedTotalMap.end()) {
286 subscribedTotalMap[queryId] = {query, 1};
287 } else if (isNeedInc) {
288 subscribedTotalMap[queryId].second++;
289 }
290 return E_OK;
291 }
292
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)293 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
294 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
295 {
296 if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
297 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
298 return -E_INTERNAL_ERROR;
299 }
300 if (subscribeMap.find(device) == subscribeMap.end()) {
301 LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
302 return -E_INTERNAL_ERROR;
303 }
304 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
305 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
306 return -E_INTERNAL_ERROR;
307 }
308 subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
309 return E_OK;
310 }
311
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)312 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
313 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
314 {
315 if (subscribeMap.find(device) == subscribeMap.end()) {
316 LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
317 return;
318 }
319 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
320 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
321 return;
322 }
323 SubscribeStatus queryStatus = subscribeMap[device][queryId];
324 // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
325 if (queryStatus == SubscribeStatus::ACTIVE) {
326 LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
327 return;
328 }
329 subscribeMap[device].erase(queryId);
330 auto iter = subscribedTotalMap.find(queryId);
331 if (iter == subscribedTotalMap.end()) {
332 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
333 return;
334 }
335 iter->second.second--;
336 if (iter->second.second <= 0) {
337 LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
338 subscribedTotalMap.erase(queryId);
339 }
340 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
341 }
342
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)343 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
344 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
345 {
346 auto iter = subscribeMap.find(device);
347 if (iter == subscribeMap.end()) {
348 LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
349 return;
350 }
351 if (iter->second.find(queryId) == subscribeMap[device].end()) {
352 LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
353 return;
354 }
355 iter->second.erase(queryId);
356 auto allIter = subscribedTotalMap.find(queryId);
357 if (allIter == subscribedTotalMap.end()) {
358 LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
359 return;
360 }
361 allIter->second.second--;
362 if (allIter->second.second <= 0) {
363 subscribedTotalMap.erase(queryId);
364 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
365 }
366 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
367 }
368
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const369 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
370 const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
371 {
372 auto iter = subscribeMap.find(device);
373 if (iter == subscribeMap.end()) {
374 LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
375 return;
376 }
377 for (const auto &queryInfo : iter->second) {
378 auto iterTmp = subscribedTotalMap.find(queryInfo.first);
379 if (iterTmp == subscribedTotalMap.end()) {
380 LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
381 continue;
382 }
383 subscribeQueries.push_back(iterTmp->second.first);
384 }
385 }
386 } // namespace DistributedDB