1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "subscribe_manager.h"
16
17 #include <mutex>
18 #include "db_common.h"
19 #include "sync_types.h"
20
21 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)22 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
23 {
24 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
25 ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
26 }
27
ClearAllRemoteQuery()28 void SubscribeManager::ClearAllRemoteQuery()
29 {
30 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
31 remoteSubscribedMap_.clear();
32 remoteSubscribedTotalMap_.clear();
33 }
34
ClearLocalSubscribeQuery(const std::string & device)35 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
36 {
37 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
38 unFinishedLocalAutoSubMap_.erase(device);
39 ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
40 }
41
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)42 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
43 {
44 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
45 int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
46 LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
47 errCode);
48 return errCode;
49 }
50
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)51 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
52 {
53 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
54 std::string queryId = query.GetIdentify();
55 int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
56 LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
57 return errCode;
58 }
59
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)60 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
61 {
62 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
63 int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
64 LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
65 errCode);
66 return errCode;
67 }
68
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)69 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
70 {
71 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
72 std::string queryId = query.GetIdentify();
73 int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
74 LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
75 if (errCode != E_OK) {
76 return errCode;
77 }
78 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
79 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
80 unFinishedLocalAutoSubMap_[device].erase(queryId);
81 }
82 return errCode;
83 }
84
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)85 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
86 {
87 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
88 std::string queryId = query.GetIdentify();
89 DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
90 }
91
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)92 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
93 {
94 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
95 std::string queryId = query.GetIdentify();
96 DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
97 }
98
PutLocalUnFiniedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)99 void SubscribeManager::PutLocalUnFiniedSubQueries(const std::string &device,
100 const std::vector<QuerySyncObject> &subscribeQueries)
101 {
102 LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%zu", subscribeQueries.size());
103 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
104 if (subscribeQueries.size() == 0) {
105 unFinishedLocalAutoSubMap_.erase(device);
106 return;
107 }
108 unFinishedLocalAutoSubMap_[device].clear();
109 auto iter = unFinishedLocalAutoSubMap_.find(device);
110 for (const auto &query : subscribeQueries) {
111 iter->second.insert(query.GetIdentify());
112 }
113 }
114
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const115 void SubscribeManager::GetAllUnFinishSubQueries(
116 std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
117 {
118 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
119 for (auto &item : unFinishedLocalAutoSubMap_) {
120 if (item.second.size() == 0) {
121 continue;
122 }
123 allSyncQueries[item.first] = {};
124 auto iter = allSyncQueries.find(item.first);
125 for (const auto &queryId : item.second) {
126 auto iterTmp = localSubscribeTotalMap_.find(queryId);
127 if (iterTmp == localSubscribeTotalMap_.end()) {
128 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
129 continue;
130 }
131 iter->second.push_back(iterTmp->second.first);
132 }
133 }
134 }
135
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)136 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
137 {
138 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
139 std::string queryId = query.GetIdentify();
140 RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
141 }
142
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)143 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
144 {
145 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
146 std::string queryId = query.GetIdentify();
147 RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
148 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
149 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
150 unFinishedLocalAutoSubMap_[device].erase(queryId);
151 LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
152 if (unFinishedLocalAutoSubMap_[device].size() == 0) {
153 LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
154 unFinishedLocalAutoSubMap_.erase(device);
155 }
156 }
157 }
158
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const159 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
160 std::vector<QuerySyncObject> &subscribeQueries) const
161 {
162 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
163 GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
164 }
165
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const166 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
167 std::vector<QuerySyncObject> &subscribeQueries) const
168 {
169 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
170 GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
171 }
172
IsLastRemoteContainSubscribe(const std::string & device,const std::string & queryId) const173 bool SubscribeManager::IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const
174 {
175 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
176 if (remoteSubscribedMap_.find(device) == remoteSubscribedMap_.end()) {
177 LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
178 return false;
179 }
180 auto iter = remoteSubscribedTotalMap_.find(queryId);
181 if (iter == remoteSubscribedTotalMap_.end()) {
182 LOGD("[SubscribeManager] queryId=%s not in remoteSubscribedTotalMap", STR_MASK(queryId));
183 return false;
184 }
185 return iter->second.second == 1;
186 }
187
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const188 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
189 std::vector<std::string> &subscribeQueryIds) const
190 {
191 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
192 auto iter = remoteSubscribedMap_.find(device);
193 if (iter == remoteSubscribedMap_.end()) {
194 LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
195 return;
196 }
197 for (const auto &queryInfo : iter->second) {
198 if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
199 LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
200 continue;
201 }
202 subscribeQueryIds.push_back(queryInfo.first);
203 }
204 }
205
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const206 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
207 {
208 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
209 size_t devNum = localSubscribeMap_.size();
210 for (const auto &device : devices) {
211 if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
212 continue;
213 }
214 devNum++;
215 if (devNum > MAX_DEVICES_NUM) {
216 LOGE("[SubscribeManager] local subscribe devices is over limit");
217 return -E_MAX_LIMITS;
218 }
219 }
220 std::string queryId = query.GetIdentify();
221 auto allIter = localSubscribeTotalMap_.find(queryId);
222 if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
223 LOGE("[SubscribeManager] all local subscribe sums is over limit");
224 return -E_MAX_LIMITS;
225 }
226 return E_OK;
227 }
228
IsQueryExistSubscribe(const std::string & queryId) const229 bool SubscribeManager::IsQueryExistSubscribe(const std::string &queryId) const
230 {
231 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
232 return remoteSubscribedTotalMap_.find(queryId) != remoteSubscribedTotalMap_.end();
233 }
234
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)235 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
236 SubscribedTotalMap &subscribedTotalMap)
237 {
238 if (subscribeMap.find(device) == subscribeMap.end()) {
239 LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
240 return;
241 }
242 for (const auto &queryInfo : subscribeMap[device]) {
243 if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
244 if (subscribedTotalMap[queryInfo.first].second > 0) {
245 subscribedTotalMap[queryInfo.first].second--;
246 }
247 if (subscribedTotalMap[queryInfo.first].second == 0) {
248 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
249 subscribedTotalMap.erase(queryInfo.first);
250 }
251 }
252 }
253 subscribeMap.erase(device);
254 LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
255 }
256
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)257 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
258 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
259 {
260 std::string queryId = query.GetIdentify();
261 auto iter = subscribeMap.find(device);
262 auto allIter = subscribedTotalMap.find(queryId);
263 // limit check
264 if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
265 LOGE("[SubscribeManager] all subscribe sums is over limit");
266 return -E_MAX_LIMITS;
267 }
268 if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
269 LOGE("[SubscribeManager] subscribe devices is over limit");
270 return -E_MAX_LIMITS;
271 }
272 if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
273 iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
274 LOGE("[SubscribeManager] subscribe sums is over limit");
275 return -E_MAX_LIMITS;
276 }
277 if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
278 iter->second[queryId] == SubscribeStatus::ACTIVE) {
279 LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
280 return E_OK;
281 }
282
283 if (iter == subscribeMap.end()) {
284 subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
285 }
286 bool isNeedInc = false;
287 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
288 subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
289 isNeedInc = true;
290 }
291 if (allIter == subscribedTotalMap.end()) {
292 subscribedTotalMap[queryId] = {query, 1};
293 } else if (isNeedInc) {
294 subscribedTotalMap[queryId].second++;
295 }
296 return E_OK;
297 }
298
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)299 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
300 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
301 {
302 if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
303 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
304 return -E_INTERNAL_ERROR;
305 }
306 if (subscribeMap.find(device) == subscribeMap.end()) {
307 LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
308 return -E_INTERNAL_ERROR;
309 }
310 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
311 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
312 return -E_INTERNAL_ERROR;
313 }
314 subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
315 return E_OK;
316 }
317
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)318 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
319 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
320 {
321 if (subscribeMap.find(device) == subscribeMap.end()) {
322 LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
323 return;
324 }
325 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
326 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
327 return;
328 }
329 SubscribeStatus queryStatus = subscribeMap[device][queryId];
330 // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
331 if (queryStatus == SubscribeStatus::ACTIVE) {
332 LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
333 return;
334 }
335 subscribeMap[device].erase(queryId);
336 auto iter = subscribedTotalMap.find(queryId);
337 if (iter == subscribedTotalMap.end()) {
338 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
339 return;
340 }
341 iter->second.second--;
342 if (iter->second.second <= 0) {
343 LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
344 subscribedTotalMap.erase(queryId);
345 }
346 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
347 }
348
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)349 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
350 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
351 {
352 auto iter = subscribeMap.find(device);
353 if (iter == subscribeMap.end()) {
354 LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
355 return;
356 }
357 if (iter->second.find(queryId) == subscribeMap[device].end()) {
358 LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
359 return;
360 }
361 iter->second.erase(queryId);
362 auto allIter = subscribedTotalMap.find(queryId);
363 if (allIter == subscribedTotalMap.end()) {
364 LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
365 return;
366 }
367 allIter->second.second--;
368 if (allIter->second.second <= 0) {
369 subscribedTotalMap.erase(queryId);
370 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
371 }
372 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
373 }
374
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const375 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
376 const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
377 {
378 auto iter = subscribeMap.find(device);
379 if (iter == subscribeMap.end()) {
380 LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
381 return;
382 }
383 for (const auto &queryInfo : iter->second) {
384 auto iterTmp = subscribedTotalMap.find(queryInfo.first);
385 if (iterTmp == subscribedTotalMap.end()) {
386 LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
387 continue;
388 }
389 subscribeQueries.push_back(iterTmp->second.first);
390 }
391 }
392 } // namespace DistributedDB