• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2020 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "pub_sub_implement.h"
16 #include "securec.h"
17 #include "ohos_errno.h"
18 #include "memory_adapter.h"
19 #include "thread_adapter.h"
20 
21 static int AddTopic(IUnknown *iUnknown, const Topic *topic);
22 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer);
23 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer);
24 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer);
25 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len);
26 static void DefaultHandle(const Request *request, const Response *response);
27 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request);
28 
29 static PubSubImplement g_pubSubImplement = {
30     DEFAULT_IUNKNOWN_ENTRY_BEGIN,
31     .subscriber.AddTopic = AddTopic,
32     .subscriber.Subscribe = Subscribe,
33     .subscriber.ModifyConsumer = ModifyConsumer,
34     .subscriber.Unsubscribe = Unsubscribe,
35     .provider.Publish = Publish,
36     DEFAULT_IUNKNOWN_ENTRY_END,
37     .feature = NULL
38 };
BCE_CreateInstance(Feature * feature)39 PubSubImplement *BCE_CreateInstance(Feature *feature)
40 {
41     g_pubSubImplement.feature = (PubSubFeature *)feature;
42     return &g_pubSubImplement;
43 }
44 
45 
/*
 * Registers a new topic by appending a fresh Relation to the feature's relation list.
 *
 * Returns:
 *   EC_INVALID  - iUnknown or topic is NULL.
 *   EC_FAILURE  - no feature is bound, the feature has no GetRelation hook,
 *                 or GetRelation already finds a relation for this topic.
 *   EC_NOMEMORY - the Relation could not be allocated.
 *   EC_SUCCESS  - the relation was linked into the list.
 */
static int AddTopic(IUnknown *iUnknown, const Topic *topic)
{
    if (iUnknown == NULL || topic == NULL) {
        return EC_INVALID;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return EC_FAILURE;
    }

    /* A topic may only be registered once. */
    if (feature->GetRelation(feature, topic) != NULL) {
        return EC_FAILURE;
    }

    Relation *entry = (Relation *)SAMGR_Malloc(sizeof(Relation));
    if (entry == NULL) {
        return EC_NOMEMORY;
    }

    /* Fully initialise the entry (empty consumer list) before it becomes visible. */
    entry->topic = *topic;
    entry->callbacks.consumer = NULL;
    UtilsListInit(&entry->callbacks.node);

    /* Only the list insertion itself is done under the feature mutex. */
    MUTEX_Lock(feature->mutex);
    UtilsListAdd(&feature->relations.node, &entry->node);
    MUTEX_Unlock(feature->mutex);
    return EC_SUCCESS;
}
75 
/*
 * Adds a consumer to the subscriber list of an already registered topic.
 *
 * The duplicate check and the insertion are performed inside ONE critical
 * section. Doing them under separate lock acquisitions would let two
 * concurrent callers with equal consumers both pass the check and both be
 * inserted.
 *
 * Returns:
 *   EC_INVALID            - iUnknown, topic or consumer is NULL.
 *   EC_FAILURE            - no feature is bound, the feature has no GetRelation
 *                           hook, or the topic has not been added.
 *   EC_ALREADY_SUBSCRIBED - an equal consumer (per Consumer::Equal) is already listed.
 *   EC_NOMEMORY           - the list node could not be allocated.
 *   EC_SUCCESS            - the consumer was added.
 *
 * The consumer pointer is stored, not copied; it must stay valid while subscribed.
 */
static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer)
{
    if (iUnknown == NULL || topic == NULL || consumer == NULL) {
        return EC_INVALID;
    }

    PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
        return EC_FAILURE;
    }

    Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
    if (relation == NULL) {
        return EC_FAILURE;
    }

    MUTEX_Lock(broadcast->feature->mutex);
    ConsumerNode *item = NULL;
    UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
        if (item->consumer->Equal(item->consumer, consumer)) {
            MUTEX_Unlock(broadcast->feature->mutex);
            return EC_ALREADY_SUBSCRIBED;
        }
    }

    /* Still holding the mutex: nobody can insert an equal consumer before we do. */
    ConsumerNode *consumerNode = (ConsumerNode *)SAMGR_Malloc(sizeof(ConsumerNode));
    if (consumerNode == NULL) {
        MUTEX_Unlock(broadcast->feature->mutex);
        return EC_NOMEMORY;
    }

    UtilsListInit(&consumerNode->node);
    consumerNode->consumer = consumer;
    UtilsListAdd(&relation->callbacks.node, &consumerNode->node);
    MUTEX_Unlock(broadcast->feature->mutex);
    return EC_SUCCESS;
}
114 
/*
 * Replaces a subscribed consumer of a topic with another one.
 *
 * The first list entry whose consumer reports Equal() to oldConsumer has its
 * consumer pointer swapped for newConsumer.
 *
 * Returns the consumer pointer that was stored in the list before the swap,
 * or NULL when an argument is NULL, no usable feature is bound, the topic is
 * unknown, or no matching consumer is subscribed.
 */
static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer)
{
    if (iUnknown == NULL || topic == NULL || oldConsumer == NULL || newConsumer == NULL) {
        return NULL;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return NULL;
    }

    Relation *relation = feature->GetRelation(feature, topic);
    if (relation == NULL) {
        return NULL;
    }

    Consumer *replaced = NULL;
    ConsumerNode *cursor = NULL;
    MUTEX_Lock(feature->mutex);
    UTILS_DL_LIST_FOR_EACH_ENTRY(cursor, &relation->callbacks.node, ConsumerNode, node) {
        if (cursor->consumer->Equal(cursor->consumer, oldConsumer)) {
            /* Hand back the stored pointer, which need not be oldConsumer itself. */
            replaced = cursor->consumer;
            cursor->consumer = newConsumer;
            break;
        }
    }
    MUTEX_Unlock(feature->mutex);
    return replaced;
}
144 
/*
 * Removes a consumer from a topic's subscriber list.
 *
 * The first entry whose consumer reports Equal() to the given consumer is
 * unlinked under the feature mutex; its list node is freed afterwards.
 *
 * Returns the consumer pointer that was stored in the removed entry, or NULL
 * when an argument is NULL, no usable feature is bound, the topic is unknown,
 * or no matching consumer is subscribed.
 */
static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer)
{
    if (iUnknown == NULL || topic == NULL || consumer == NULL) {
        return NULL;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return NULL;
    }

    Relation *relation = feature->GetRelation(feature, topic);
    if (relation == NULL) {
        return NULL;
    }

    ConsumerNode *removed = NULL;
    ConsumerNode *cursor = NULL;
    MUTEX_Lock(feature->mutex);
    UTILS_DL_LIST_FOR_EACH_ENTRY(cursor, &relation->callbacks.node, ConsumerNode, node) {
        if (cursor->consumer->Equal(cursor->consumer, consumer)) {
            UtilsListDelete(&cursor->node);
            removed = cursor;
            break;
        }
    }
    MUTEX_Unlock(feature->mutex);

    if (removed == NULL) {
        return NULL;
    }

    /* The node is already unlinked, so it can be released outside the lock. */
    Consumer *oldConsumer = removed->consumer;
    SAMGR_Free(removed);
    return oldConsumer;
}
176 
Publish(IUnknown * iUnknown,const Topic * topic,uint8 * data,int16 len)177 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len)
178 {
179     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
180     PubSubFeature *feature = broadcast->feature;
181     if (feature == NULL) {
182         return FALSE;
183     }
184 
185     Request request = {MSG_PUBLISH, 0, NULL, *(uint32 *)topic};
186     if (data != NULL && len > 0) {
187         request.data = (uint8 *)SAMGR_Malloc(len);
188         if (request.data == NULL) {
189             return FALSE;
190         }
191         request.len = len;
192         // There is no problem, the request.data length is equal the input data length.
193         (void)memcpy_s(request.data, request.len, data, len);
194     }
195 
196     if (!ImmediatelyPublish(feature, topic, &request)) {
197         (void)SAMGR_Free(request.data);
198         request.data = NULL;
199         request.len = 0;
200         return FALSE;
201     }
202     return TRUE;
203 }
204 
ImmediatelyPublish(PubSubFeature * feature,const Topic * topic,const Request * request)205 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request)
206 {
207     if (feature->GetRelation == NULL) {
208         return FALSE;
209     }
210 
211     Relation *relation = feature->GetRelation(feature, topic);
212     if (relation == NULL) {
213         return FALSE;
214     }
215 
216     if (UtilsListEmpty(&relation->callbacks.node)) {
217         return FALSE;
218     }
219 
220     BOOL needAync = FALSE;
221     ConsumerNode *item = NULL;
222     uint32 *token = NULL;
223     MUTEX_Lock(feature->mutex);
224     UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
225         if (item->consumer->identity == NULL) {
226             needAync = TRUE;
227             continue;
228         }
229 
230         Response response = {item->consumer, 0};
231         int ret = SAMGR_SendSharedDirectRequest(item->consumer->identity, request, &response, &token, DefaultHandle);
232         if (ret != EC_SUCCESS) {
233             needAync = FALSE;
234             break;
235         }
236     }
237     if (needAync) {
238         token = SAMGR_SendSharedRequest(&feature->identity, request, token, NULL);
239     }
240     MUTEX_Unlock(feature->mutex);
241     return (token != NULL);
242 }
243 
DefaultHandle(const Request * request,const Response * response)244 static void DefaultHandle(const Request *request, const Response *response)
245 {
246     Consumer *consumer = (Consumer *)response->data;
247     if (consumer == NULL || consumer->Notify == NULL || g_pubSubImplement.feature == NULL) {
248         return;
249     }
250 
251     // wait ImmediatelyPublish finished.
252     MUTEX_Lock(g_pubSubImplement.feature->mutex);
253     MUTEX_Unlock(g_pubSubImplement.feature->mutex);
254     Topic topic = request->msgValue;
255     consumer->Notify(consumer, &topic, request);
256 }