1 /*
2 * Copyright (c) 2020 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "pub_sub_implement.h"
16 #include "securec.h"
17 #include "ohos_errno.h"
18 #include "memory_adapter.h"
19 #include "thread_adapter.h"
20
21 static int AddTopic(IUnknown *iUnknown, const Topic *topic);
22 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer);
23 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer);
24 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer);
25 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len);
26 static void DefaultHandle(const Request *request, const Response *response);
27 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request);
28
29 static PubSubImplement g_pubSubImplement = {
30 DEFAULT_IUNKNOWN_ENTRY_BEGIN,
31 .subscriber.AddTopic = AddTopic,
32 .subscriber.Subscribe = Subscribe,
33 .subscriber.ModifyConsumer = ModifyConsumer,
34 .subscriber.Unsubscribe = Unsubscribe,
35 .provider.Publish = Publish,
36 DEFAULT_IUNKNOWN_ENTRY_END,
37 .feature = NULL
38 };
BCE_CreateInstance(Feature * feature)39 PubSubImplement *BCE_CreateInstance(Feature *feature)
40 {
41 g_pubSubImplement.feature = (PubSubFeature *)feature;
42 return &g_pubSubImplement;
43 }
44
45
AddTopic(IUnknown * iUnknown,const Topic * topic)46 static int AddTopic(IUnknown *iUnknown, const Topic *topic)
47 {
48 if (iUnknown == NULL || topic == NULL) {
49 return EC_INVALID;
50 }
51
52 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
53 if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
54 return EC_FAILURE;
55 }
56
57 if (broadcast->feature->GetRelation(broadcast->feature, topic) != NULL) {
58 return EC_FAILURE;
59 }
60
61 Relation *head = &broadcast->feature->relations;
62 Relation *newRelation = (Relation *)SAMGR_Malloc(sizeof(Relation));
63 if (newRelation == NULL) {
64 return EC_NOMEMORY;
65 }
66 newRelation->topic = *topic;
67 newRelation->callbacks.consumer = NULL;
68 UtilsListInit(&newRelation->callbacks.node);
69
70 MUTEX_Lock(broadcast->feature->mutex);
71 UtilsListAdd(&head->node, &(newRelation->node));
72 MUTEX_Unlock(broadcast->feature->mutex);
73 return EC_SUCCESS;
74 }
75
Subscribe(IUnknown * iUnknown,const Topic * topic,Consumer * consumer)76 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer)
77 {
78 if (iUnknown == NULL || topic == NULL || consumer == NULL) {
79 return EC_INVALID;
80 }
81
82 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
83 if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
84 return EC_FAILURE;
85 }
86
87 Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
88 if (relation == NULL) {
89 return EC_FAILURE;
90 }
91
92 MUTEX_Lock(broadcast->feature->mutex);
93 ConsumerNode *item = NULL;
94 UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
95 if (item->consumer->Equal(item->consumer, consumer)) {
96 MUTEX_Unlock(broadcast->feature->mutex);
97 return EC_ALREADY_SUBSCRIBED;
98 }
99 }
100 MUTEX_Unlock(broadcast->feature->mutex);
101 ConsumerNode *consumerNode = (ConsumerNode *)SAMGR_Malloc(sizeof(ConsumerNode));
102 if (consumerNode == NULL) {
103 return EC_NOMEMORY;
104 }
105
106 UtilsListInit(&consumerNode->node);
107 consumerNode->consumer = consumer;
108 MUTEX_Lock(broadcast->feature->mutex);
109 ConsumerNode *head = &relation->callbacks;
110 UtilsListAdd(&head->node, &consumerNode->node);
111 MUTEX_Unlock(broadcast->feature->mutex);
112 return EC_SUCCESS;
113 }
114
ModifyConsumer(IUnknown * iUnknown,const Topic * topic,Consumer * oldConsumer,Consumer * newConsumer)115 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer)
116 {
117 if (iUnknown == NULL || topic == NULL || oldConsumer == NULL || newConsumer == NULL) {
118 return NULL;
119 }
120
121 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
122 if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
123 return NULL;
124 }
125
126 Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
127 if (relation == NULL) {
128 return NULL;
129 }
130
131 MUTEX_Lock(broadcast->feature->mutex);
132 ConsumerNode *item = NULL;
133 UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
134 if (item->consumer->Equal(item->consumer, oldConsumer)) {
135 Consumer *older = item->consumer;
136 item->consumer = newConsumer;
137 MUTEX_Unlock(broadcast->feature->mutex);
138 return older;
139 }
140 }
141 MUTEX_Unlock(broadcast->feature->mutex);
142 return NULL;
143 }
144
Unsubscribe(IUnknown * iUnknown,const Topic * topic,const Consumer * consumer)145 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer)
146 {
147 if (iUnknown == NULL || topic == NULL || consumer == NULL) {
148 return NULL;
149 }
150
151 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
152 if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
153 return NULL;
154 }
155
156 Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
157 if (relation == NULL) {
158 return NULL;
159 }
160 MUTEX_Lock(broadcast->feature->mutex);
161 ConsumerNode *item = NULL;
162 UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
163 if (item->consumer->Equal(item->consumer, consumer)) {
164 UtilsListDelete(&item->node);
165 break;
166 }
167 }
168 MUTEX_Unlock(broadcast->feature->mutex);
169 if (item == &relation->callbacks || item == NULL) {
170 return NULL;
171 }
172 Consumer *oldConsumer = item->consumer;
173 SAMGR_Free(item);
174 return oldConsumer;
175 }
176
Publish(IUnknown * iUnknown,const Topic * topic,uint8 * data,int16 len)177 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len)
178 {
179 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
180 PubSubFeature *feature = broadcast->feature;
181 if (feature == NULL) {
182 return FALSE;
183 }
184
185 Request request = {MSG_PUBLISH, 0, NULL, *(uint32 *)topic};
186 if (data != NULL && len > 0) {
187 request.data = (uint8 *)SAMGR_Malloc(len);
188 if (request.data == NULL) {
189 return FALSE;
190 }
191 request.len = len;
192 // There is no problem, the request.data length is equal the input data length.
193 (void)memcpy_s(request.data, request.len, data, len);
194 }
195
196 if (!ImmediatelyPublish(feature, topic, &request)) {
197 (void)SAMGR_Free(request.data);
198 request.data = NULL;
199 request.len = 0;
200 return FALSE;
201 }
202 return TRUE;
203 }
204
ImmediatelyPublish(PubSubFeature * feature,const Topic * topic,const Request * request)205 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request)
206 {
207 if (feature->GetRelation == NULL) {
208 return FALSE;
209 }
210
211 Relation *relation = feature->GetRelation(feature, topic);
212 if (relation == NULL) {
213 return FALSE;
214 }
215
216 if (UtilsListEmpty(&relation->callbacks.node)) {
217 return FALSE;
218 }
219
220 BOOL needAync = FALSE;
221 ConsumerNode *item = NULL;
222 uint32 *token = NULL;
223 MUTEX_Lock(feature->mutex);
224 UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
225 if (item->consumer->identity == NULL) {
226 needAync = TRUE;
227 continue;
228 }
229
230 Response response = {item->consumer, 0};
231 int ret = SAMGR_SendSharedDirectRequest(item->consumer->identity, request, &response, &token, DefaultHandle);
232 if (ret != EC_SUCCESS) {
233 needAync = FALSE;
234 break;
235 }
236 }
237 if (needAync) {
238 token = SAMGR_SendSharedRequest(&feature->identity, request, token, NULL);
239 }
240 MUTEX_Unlock(feature->mutex);
241 return (token != NULL);
242 }
243
DefaultHandle(const Request * request,const Response * response)244 static void DefaultHandle(const Request *request, const Response *response)
245 {
246 Consumer *consumer = (Consumer *)response->data;
247 if (consumer == NULL || consumer->Notify == NULL || g_pubSubImplement.feature == NULL) {
248 return;
249 }
250
251 // wait ImmediatelyPublish finished.
252 MUTEX_Lock(g_pubSubImplement.feature->mutex);
253 MUTEX_Unlock(g_pubSubImplement.feature->mutex);
254 Topic topic = request->msgValue;
255 consumer->Notify(consumer, &topic, request);
256 }