1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dm_common_event_manager.h"
17
18 #include <thread>
19
20 #include "dm_constants.h"
21
22 using namespace OHOS::EventFwk;
23
24 namespace OHOS {
25 namespace DistributedHardware {
26 std::mutex DmCommonEventManager::callbackQueueMutex_;
27 std::mutex DmCommonEventManager::eventSubscriberMutex_;
28 std::condition_variable DmCommonEventManager::notEmpty_;
29 std::list<CommomEventCallbackNode> DmCommonEventManager::callbackQueue_;
30
GetInstance()31 DmCommonEventManager &DmCommonEventManager::GetInstance()
32 {
33 static DmCommonEventManager instance;
34 return instance;
35 }
36
DmCommonEventManager()37 DmCommonEventManager::DmCommonEventManager()
38 {
39 std::thread th(DealCallback);
40 th.detach();
41 }
42
~DmCommonEventManager()43 DmCommonEventManager::~DmCommonEventManager()
44 {
45 std::unique_lock<std::mutex> mutexLock(eventSubscriberMutex_);
46 for (auto iter = dmEventSubscriber_.begin(); iter != dmEventSubscriber_.end(); iter++) {
47 if (!CommonEventManager::UnSubscribeCommonEvent(iter->second)) {
48 LOGI("Unsubscribe service event failed: %s", iter->first.c_str());
49 }
50 }
51 }
52
DealCallback(void)53 void DmCommonEventManager::DealCallback(void)
54 {
55 while (1) {
56 std::unique_lock<std::mutex> callbackQueueMutexLock(callbackQueueMutex_);
57 notEmpty_.wait(callbackQueueMutexLock, [] { return !callbackQueue_.empty(); });
58 CommomEventCallbackNode node = callbackQueue_.front();
59 int32_t input = node.input_;
60 CommomEventCallback funcPrt = node.callback_;
61 funcPrt(input);
62 callbackQueue_.pop_front();
63 }
64 }
65
SubscribeServiceEvent(const std::string & event,const CommomEventCallback callback)66 bool DmCommonEventManager::SubscribeServiceEvent(const std::string &event, const CommomEventCallback callback)
67 {
68 LOGI("Subscribe event: %s", event.c_str());
69 if (dmEventSubscriber_.find(event) != dmEventSubscriber_.end() || callback == nullptr) {
70 LOGE("Subscribe event:%s has been exist or callback is nullptr", event.c_str());
71 return false;
72 }
73
74 MatchingSkills matchingSkills;
75 matchingSkills.AddEvent(event);
76 CommonEventSubscribeInfo subscriberInfo(matchingSkills);
77 std::shared_ptr<EventSubscriber> subscriber =
78 std::make_shared<EventSubscriber>(subscriberInfo, callback, event);
79 if (subscriber == nullptr) {
80 LOGE("subscriber is nullptr %s", event.c_str());
81 return false;
82 }
83
84 if (!CommonEventManager::SubscribeCommonEvent(subscriber)) {
85 LOGE("Subscribe service event failed: %s", event.c_str());
86 return false;
87 }
88
89 std::unique_lock<std::mutex> mutexLock(eventSubscriberMutex_);
90 dmEventSubscriber_[event] = subscriber;
91 return true;
92 }
93
UnsubscribeServiceEvent(const std::string & event)94 bool DmCommonEventManager::UnsubscribeServiceEvent(const std::string &event)
95 {
96 LOGI("UnSubscribe event: %s", event.c_str());
97 if (dmEventSubscriber_.find(event) == dmEventSubscriber_.end()) {
98 LOGE("UnSubscribe event: %s not been exist", event.c_str());
99 return false;
100 }
101
102 if (!CommonEventManager::UnSubscribeCommonEvent(dmEventSubscriber_[event])) {
103 LOGE("Unsubscribe service event failed: %s", event.c_str());
104 return false;
105 }
106
107 std::unique_lock<std::mutex> mutexLock(eventSubscriberMutex_);
108 dmEventSubscriber_.erase(event);
109 return true;
110 }
111
OnReceiveEvent(const CommonEventData & data)112 void DmCommonEventManager::EventSubscriber::OnReceiveEvent(const CommonEventData &data)
113 {
114 std::string receiveEvent = data.GetWant().GetAction();
115 LOGI("Received event: %s", receiveEvent.c_str());
116 if (receiveEvent != event_) {
117 LOGE("Received event is error");
118 return;
119 }
120
121 int32_t userId = data.GetCode();
122 if (userId <= 0) {
123 LOGE("userId is less zero");
124 return;
125 }
126
127 std::unique_lock<std::mutex> callbackQueueMutexLock(callbackQueueMutex_);
128 if (callbackQueue_.size() > COMMON_CALLBACK_MAX_SIZE) {
129 LOGE("event callback Queue is too long");
130 return;
131 }
132
133 CommomEventCallbackNode node {userId, callback_};
134 callbackQueue_.push_back(node);
135 notEmpty_.notify_one();
136 }
137 } // namespace DistributedHardware
138 } // namespace OHOS