• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "subscribe_manager.h"
16 
17 #include <mutex>
18 #include "db_common.h"
19 #include "sync_types.h"
20 
21 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)22 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
23 {
24     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
25     ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
26 }
27 
ClearAllRemoteQuery()28 void SubscribeManager::ClearAllRemoteQuery()
29 {
30     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
31     remoteSubscribedMap_.clear();
32     remoteSubscribedTotalMap_.clear();
33 }
34 
ClearLocalSubscribeQuery(const std::string & device)35 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
36 {
37     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
38     unFinishedLocalAutoSubMap_.erase(device);
39     ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
40 }
41 
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)42 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
43 {
44     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
45     int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
46     LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
47         errCode);
48     return errCode;
49 }
50 
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)51 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
52 {
53     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
54     std::string queryId = query.GetIdentify();
55     int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
56     LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
57     return errCode;
58 }
59 
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)60 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
61 {
62     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
63     int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
64     LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
65         errCode);
66     return errCode;
67 }
68 
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)69 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
70 {
71     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
72     std::string queryId = query.GetIdentify();
73     int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
74     LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
75     if (errCode != E_OK) {
76         return errCode;
77     }
78     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
79         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
80         unFinishedLocalAutoSubMap_[device].erase(queryId);
81     }
82     return errCode;
83 }
84 
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)85 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
86 {
87     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
88     std::string queryId = query.GetIdentify();
89     DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
90 }
91 
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)92 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
93 {
94     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
95     std::string queryId = query.GetIdentify();
96     DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
97 }
98 
PutLocalUnFiniedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)99 void SubscribeManager::PutLocalUnFiniedSubQueries(const std::string &device,
100     const std::vector<QuerySyncObject> &subscribeQueries)
101 {
102     LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%zu", subscribeQueries.size());
103     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
104     if (subscribeQueries.size() == 0) {
105         unFinishedLocalAutoSubMap_.erase(device);
106         return;
107     }
108     unFinishedLocalAutoSubMap_[device].clear();
109     auto iter = unFinishedLocalAutoSubMap_.find(device);
110     for (const auto &query : subscribeQueries) {
111         iter->second.insert(query.GetIdentify());
112     }
113 }
114 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const115 void SubscribeManager::GetAllUnFinishSubQueries(
116     std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
117 {
118     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
119     for (auto &item : unFinishedLocalAutoSubMap_) {
120         if (item.second.size() == 0) {
121             continue;
122         }
123         allSyncQueries[item.first] = {};
124         auto iter = allSyncQueries.find(item.first);
125         for (const auto &queryId : item.second) {
126             auto iterTmp = localSubscribeTotalMap_.find(queryId);
127             if (iterTmp == localSubscribeTotalMap_.end()) {
128                 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
129                 continue;
130             }
131             iter->second.push_back(iterTmp->second.first);
132         }
133     }
134 }
135 
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)136 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
137 {
138     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
139     std::string queryId = query.GetIdentify();
140     RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
141 }
142 
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)143 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
144 {
145     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
146     std::string queryId = query.GetIdentify();
147     RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
148     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
149         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
150         unFinishedLocalAutoSubMap_[device].erase(queryId);
151         LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
152         if (unFinishedLocalAutoSubMap_[device].size() == 0) {
153             LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
154             unFinishedLocalAutoSubMap_.erase(device);
155         }
156     }
157 }
158 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const159 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
160     std::vector<QuerySyncObject> &subscribeQueries) const
161 {
162     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
163     GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
164 }
165 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const166 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
167     std::vector<QuerySyncObject> &subscribeQueries) const
168 {
169     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
170     GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
171 }
172 
IsLastRemoteContainSubscribe(const std::string & device,const std::string & queryId) const173 bool SubscribeManager::IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const
174 {
175     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
176     if (remoteSubscribedMap_.find(device) == remoteSubscribedMap_.end()) {
177         LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
178         return false;
179     }
180     auto iter = remoteSubscribedTotalMap_.find(queryId);
181     if (iter == remoteSubscribedTotalMap_.end()) {
182         LOGD("[SubscribeManager] queryId=%s not in remoteSubscribedTotalMap", STR_MASK(queryId));
183         return false;
184     }
185     return iter->second.second == 1;
186 }
187 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const188 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
189     std::vector<std::string> &subscribeQueryIds) const
190 {
191     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
192     auto iter = remoteSubscribedMap_.find(device);
193     if (iter == remoteSubscribedMap_.end()) {
194         LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
195         return;
196     }
197     for (const auto &queryInfo : iter->second) {
198         if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
199             LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
200             continue;
201         }
202         subscribeQueryIds.push_back(queryInfo.first);
203     }
204 }
205 
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const206 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
207 {
208     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
209     size_t devNum = localSubscribeMap_.size();
210     for (const auto &device : devices) {
211         if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
212             continue;
213         }
214         devNum++;
215         if (devNum > MAX_DEVICES_NUM) {
216             LOGE("[SubscribeManager] local subscribe devices is over limit");
217             return -E_MAX_LIMITS;
218         }
219     }
220     std::string queryId = query.GetIdentify();
221     auto allIter = localSubscribeTotalMap_.find(queryId);
222     if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
223         LOGE("[SubscribeManager] all local subscribe sums is over limit");
224         return -E_MAX_LIMITS;
225     }
226     return E_OK;
227 }
228 
IsQueryExistSubscribe(const std::string & queryId) const229 bool SubscribeManager::IsQueryExistSubscribe(const std::string &queryId) const
230 {
231     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
232     return remoteSubscribedTotalMap_.find(queryId) != remoteSubscribedTotalMap_.end();
233 }
234 
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)235 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
236     SubscribedTotalMap &subscribedTotalMap)
237 {
238     if (subscribeMap.find(device) == subscribeMap.end()) {
239         LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
240         return;
241     }
242     for (const auto &queryInfo : subscribeMap[device]) {
243         if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
244             if (subscribedTotalMap[queryInfo.first].second > 0) {
245                 subscribedTotalMap[queryInfo.first].second--;
246             }
247             if (subscribedTotalMap[queryInfo.first].second == 0) {
248                 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
249                 subscribedTotalMap.erase(queryInfo.first);
250             }
251         }
252     }
253     subscribeMap.erase(device);
254     LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
255 }
256 
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)257 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
258     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
259 {
260     std::string queryId = query.GetIdentify();
261     auto iter = subscribeMap.find(device);
262     auto allIter = subscribedTotalMap.find(queryId);
263     // limit check
264     if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
265         LOGE("[SubscribeManager] all subscribe sums is over limit");
266         return -E_MAX_LIMITS;
267     }
268     if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
269         LOGE("[SubscribeManager] subscribe devices is over limit");
270         return -E_MAX_LIMITS;
271     }
272     if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
273         iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
274         LOGE("[SubscribeManager] subscribe sums is over limit");
275         return -E_MAX_LIMITS;
276     }
277     if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
278         iter->second[queryId] == SubscribeStatus::ACTIVE) {
279         LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
280         return E_OK;
281     }
282 
283     if (iter == subscribeMap.end()) {
284         subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
285     }
286     bool isNeedInc = false;
287     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
288         subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
289         isNeedInc = true;
290     }
291     if (allIter == subscribedTotalMap.end()) {
292         subscribedTotalMap[queryId] = {query, 1};
293     } else if (isNeedInc) {
294         subscribedTotalMap[queryId].second++;
295     }
296     return E_OK;
297 }
298 
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)299 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
300     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
301 {
302     if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
303         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
304         return -E_INTERNAL_ERROR;
305     }
306     if (subscribeMap.find(device) == subscribeMap.end()) {
307         LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
308         return -E_INTERNAL_ERROR;
309     }
310     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
311         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
312         return -E_INTERNAL_ERROR;
313     }
314     subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
315     return E_OK;
316 }
317 
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)318 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
319     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
320 {
321     if (subscribeMap.find(device) == subscribeMap.end()) {
322         LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
323         return;
324     }
325     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
326         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
327         return;
328     }
329     SubscribeStatus queryStatus = subscribeMap[device][queryId];
330     // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
331     if (queryStatus == SubscribeStatus::ACTIVE) {
332         LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
333         return;
334     }
335     subscribeMap[device].erase(queryId);
336     auto iter = subscribedTotalMap.find(queryId);
337     if (iter == subscribedTotalMap.end()) {
338         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
339         return;
340     }
341     iter->second.second--;
342     if (iter->second.second <= 0) {
343         LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
344         subscribedTotalMap.erase(queryId);
345     }
346     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
347 }
348 
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)349 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
350     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
351 {
352     auto iter = subscribeMap.find(device);
353     if (iter == subscribeMap.end()) {
354         LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
355         return;
356     }
357     if (iter->second.find(queryId) == subscribeMap[device].end()) {
358         LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
359         return;
360     }
361     iter->second.erase(queryId);
362     auto allIter = subscribedTotalMap.find(queryId);
363     if (allIter == subscribedTotalMap.end()) {
364         LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
365         return;
366     }
367     allIter->second.second--;
368     if (allIter->second.second <= 0) {
369         subscribedTotalMap.erase(queryId);
370         LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
371     }
372     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
373 }
374 
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const375 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
376     const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
377 {
378     auto iter = subscribeMap.find(device);
379     if (iter == subscribeMap.end()) {
380         LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
381         return;
382     }
383     for (const auto &queryInfo : iter->second) {
384         auto iterTmp = subscribedTotalMap.find(queryInfo.first);
385         if (iterTmp == subscribedTotalMap.end()) {
386             LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
387             continue;
388         }
389         subscribeQueries.push_back(iterTmp->second.first);
390     }
391 }
392 } // namespace DistributedDB