• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "subscribe_manager.h"
16 #include "db_common.h"
17 #include "sync_types.h"
18 
19 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)20 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
21 {
22     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
23     ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
24 }
25 
ClearAllRemoteQuery()26 void SubscribeManager::ClearAllRemoteQuery()
27 {
28     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
29     remoteSubscribedMap_.clear();
30     remoteSubscribedTotalMap_.clear();
31 }
32 
ClearLocalSubscribeQuery(const std::string & device)33 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
34 {
35     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
36     unFinishedLocalAutoSubMap_.erase(device);
37     ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
38 }
39 
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)40 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
41 {
42     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
43     int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
44     LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
45         errCode);
46     return errCode;
47 }
48 
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)49 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
50 {
51     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
52     std::string queryId = query.GetIdentify();
53     int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
54     LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
55     return errCode;
56 }
57 
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)58 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
59 {
60     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
61     int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
62     LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
63         errCode);
64     return errCode;
65 }
66 
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)67 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
68 {
69     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
70     std::string queryId = query.GetIdentify();
71     int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
72     LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
73     if (errCode != E_OK) {
74         return errCode;
75     }
76     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
77         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
78             unFinishedLocalAutoSubMap_[device].erase(queryId);
79     }
80     return errCode;
81 }
82 
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)83 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
84 {
85     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
86     std::string queryId = query.GetIdentify();
87     DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
88 }
89 
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)90 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
91 {
92     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
93     std::string queryId = query.GetIdentify();
94     DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
95 }
96 
PutLocalUnFiniedSubQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries)97 void SubscribeManager::PutLocalUnFiniedSubQueries(const std::string &device,
98     std::vector<QuerySyncObject> &subscribeQueries)
99 {
100     LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%d", subscribeQueries.size());
101     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
102     if (subscribeQueries.size() == 0) {
103         unFinishedLocalAutoSubMap_.erase(device);
104         return;
105     }
106     unFinishedLocalAutoSubMap_[device].clear();
107     auto iter = unFinishedLocalAutoSubMap_.find(device);
108     for (const auto &query : subscribeQueries) {
109         iter->second.insert(query.GetIdentify());
110     }
111 }
112 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const113 void SubscribeManager::GetAllUnFinishSubQueries(
114     std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
115 {
116     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
117     for (auto &item : unFinishedLocalAutoSubMap_) {
118         if (item.second.size() == 0) {
119             continue;
120         }
121         allSyncQueries[item.first] = {};
122         auto iter = allSyncQueries.find(item.first);
123         for (const auto &queryId : item.second) {
124             auto iterTmp = localSubscribeTotalMap_.find(queryId);
125             if (iterTmp == localSubscribeTotalMap_.end()) {
126                 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
127                 continue;
128             }
129             iter->second.push_back(iterTmp->second.first);
130         }
131     }
132 }
133 
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)134 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
135 {
136     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
137     std::string queryId = query.GetIdentify();
138     RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
139 }
140 
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)141 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
142 {
143     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
144     std::string queryId = query.GetIdentify();
145     RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
146     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
147         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
148             unFinishedLocalAutoSubMap_[device].erase(queryId);
149             LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
150             if (unFinishedLocalAutoSubMap_[device].size() == 0) {
151                 LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
152                 unFinishedLocalAutoSubMap_.erase(device);
153             }
154     }
155 }
156 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const157 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
158     std::vector<QuerySyncObject> &subscribeQueries) const
159 {
160     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
161     GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
162 }
163 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const164 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
165     std::vector<QuerySyncObject> &subscribeQueries) const
166 {
167     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
168     GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
169 }
170 
IsRemoteContainSubscribe(const std::string & device,const QuerySyncObject & query) const171 bool SubscribeManager::IsRemoteContainSubscribe(const std::string &device, const QuerySyncObject &query) const
172 {
173     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
174     auto iter = remoteSubscribedMap_.find(device);
175     if (iter == remoteSubscribedMap_.end()) {
176         LOGD("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
177         return false;
178     }
179     std::string queryId = query.GetIdentify();
180     auto subIter = iter->second.find(queryId);
181     if (subIter == iter->second.end()) {
182         LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryId));
183         return false;
184     }
185     return true;
186 }
187 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const188 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
189     std::vector<std::string> &subscribeQueryIds) const
190 {
191     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
192     auto iter = remoteSubscribedMap_.find(device);
193     if (iter == remoteSubscribedMap_.end()) {
194         LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
195         return;
196     }
197     for (const auto &queryInfo : iter->second) {
198         if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
199             LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
200             continue;
201         }
202         subscribeQueryIds.push_back(queryInfo.first);
203     }
204 }
205 
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const206 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
207 {
208     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
209     size_t devNum = localSubscribeMap_.size();
210     for (const auto &device : devices) {
211         if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
212             continue;
213         }
214         devNum++;
215         if (devNum > MAX_DEVICES_NUM) {
216             LOGE("[SubscribeManager] local subscribe devices is over limit");
217             return -E_MAX_LIMITS;
218         }
219     }
220     std::string queryId = query.GetIdentify();
221     auto allIter = localSubscribeTotalMap_.find(queryId);
222     if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
223         LOGE("[SubscribeManager] all local subscribe sums is over limit");
224         return -E_MAX_LIMITS;
225     }
226     return E_OK;
227 }
228 
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)229 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
230     SubscribedTotalMap &subscribedTotalMap)
231 {
232     if (subscribeMap.find(device) == subscribeMap.end()) {
233         LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
234         return;
235     }
236     for (const auto &queryInfo : subscribeMap[device]) {
237         if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
238             if (subscribedTotalMap[queryInfo.first].second > 0) {
239                 subscribedTotalMap[queryInfo.first].second--;
240             }
241             if (subscribedTotalMap[queryInfo.first].second == 0) {
242                 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
243                 subscribedTotalMap.erase(queryInfo.first);
244             }
245         }
246     }
247     subscribeMap.erase(device);
248     LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
249 }
250 
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)251 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
252     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
253 {
254     std::string queryId = query.GetIdentify();
255     auto iter = subscribeMap.find(device);
256     auto allIter = subscribedTotalMap.find(queryId);
257     // limit check
258     if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
259         LOGE("[SubscribeManager] all subscribe sums is over limit");
260         return -E_MAX_LIMITS;
261     }
262     if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
263         LOGE("[SubscribeManager] subscribe devices is over limit");
264         return -E_MAX_LIMITS;
265     }
266     if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
267         iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
268         LOGE("[SubscribeManager] subscribe sums is over limit");
269         return -E_MAX_LIMITS;
270     }
271     if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
272         iter->second[queryId] == SubscribeStatus::ACTIVE) {
273         LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
274         return E_OK;
275     }
276 
277     if (iter == subscribeMap.end()) {
278         subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
279     }
280     bool isNeedInc = false;
281     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
282         subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
283         isNeedInc = true;
284     }
285     if (allIter == subscribedTotalMap.end()) {
286         subscribedTotalMap[queryId] = {query, 1};
287     } else if (isNeedInc) {
288         subscribedTotalMap[queryId].second++;
289     }
290     return E_OK;
291 }
292 
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)293 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
294     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
295 {
296     if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
297         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
298         return -E_INTERNAL_ERROR;
299     }
300     if (subscribeMap.find(device) == subscribeMap.end()) {
301         LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
302         return -E_INTERNAL_ERROR;
303     }
304     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
305         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
306         return -E_INTERNAL_ERROR;
307     }
308     subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
309     return E_OK;
310 }
311 
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)312 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
313     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
314 {
315     if (subscribeMap.find(device) == subscribeMap.end()) {
316         LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
317         return;
318     }
319     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
320         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
321         return;
322     }
323     SubscribeStatus queryStatus = subscribeMap[device][queryId];
324     // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
325     if (queryStatus == SubscribeStatus::ACTIVE) {
326         LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
327         return;
328     }
329     subscribeMap[device].erase(queryId);
330     auto iter = subscribedTotalMap.find(queryId);
331     if (iter == subscribedTotalMap.end()) {
332         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
333         return;
334     }
335     iter->second.second--;
336     if (iter->second.second <= 0) {
337         LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
338         subscribedTotalMap.erase(queryId);
339     }
340     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
341 }
342 
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)343 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
344     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
345 {
346     auto iter = subscribeMap.find(device);
347     if (iter == subscribeMap.end()) {
348         LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
349         return;
350     }
351     if (iter->second.find(queryId) == subscribeMap[device].end()) {
352         LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
353         return;
354     }
355     iter->second.erase(queryId);
356     auto allIter = subscribedTotalMap.find(queryId);
357     if (allIter == subscribedTotalMap.end()) {
358         LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
359         return;
360     }
361     allIter->second.second--;
362     if (allIter->second.second <= 0) {
363         subscribedTotalMap.erase(queryId);
364         LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
365     }
366     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
367 }
368 
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const369 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
370     const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
371 {
372     auto iter = subscribeMap.find(device);
373     if (iter == subscribeMap.end()) {
374         LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
375         return;
376     }
377     for (const auto &queryInfo : iter->second) {
378         auto iterTmp = subscribedTotalMap.find(queryInfo.first);
379         if (iterTmp == subscribedTotalMap.end()) {
380             LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
381             continue;
382         }
383         subscribeQueries.push_back(iterTmp->second.first);
384     }
385 }
386 } // namespace DistributedDB