• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
3  *
4  * HDF is dual licensed: you can use it either under the terms of
5  * the GPL, or the BSD license, at your option.
6  * See the LICENSE file in the root of this repository for complete details.
7  */
8 
9 #include "securec.h"
10 #include "osal/osal_thread.h"
11 #include "osal/osal_time.h"
12 #include "osal/osal_mutex.h"
13 #include "utils/hdf_log.h"
14 #include "message_dispatcher.h"
15 #include "hdf_wlan_priority_queue.h"
16 #include "message_router_inner.h"
17 
18 #ifdef USERSPACE_CLIENT_SUPPORT
19 #define HDF_LOG_TAG UMsgEngine
20 #else
21 #define HDF_LOG_TAG KMsgEngine
22 #endif
23 
24 typedef struct {
25     INHERT_MESSAGE_DISPATCHER;
26     OSAL_DECLARE_THREAD(dispatcherThread);
27 } LocalMessageDispatcher;
28 
ReleaseMessageContext(MessageContext * context)29 void ReleaseMessageContext(MessageContext *context)
30 {
31     if (context == NULL) {
32         return;
33     }
34     if (context->rspData != NULL) {
35         HdfSbufRecycle(context->rspData);
36         context->rspData = NULL;
37     }
38 
39     if (context->crossNode ||
40         (context->requestType != MESSAGE_TYPE_SYNC_REQ && context->requestType != MESSAGE_TYPE_SYNC_RSP)) {
41         // Sync request message may use stack mem.Memory is managed by user
42         if (context->reqData != NULL) {
43             HdfSbufRecycle(context->reqData);
44             context->reqData = NULL;
45         }
46         OsalMemFree(context);
47     }
48 }
49 
ReleaseMessageMapper(struct ServiceDef * mapper)50 void ReleaseMessageMapper(struct ServiceDef *mapper)
51 {
52     if (mapper == NULL) {
53         return;
54     }
55     if (mapper->messages != NULL) {
56         OsalMemFree(mapper->messages);
57         mapper->messages = NULL;
58     }
59     OsalMemFree(mapper);
60 }
61 
GetMsgDef(const struct ServiceDef * serviceDef,uint32_t commandId)62 struct MessageDef *GetMsgDef(const struct ServiceDef *serviceDef, uint32_t commandId)
63 {
64     struct MessageDef *msgDef = NULL;
65     if (serviceDef == NULL || serviceDef->messages == NULL) {
66         HDF_LOGE("%s:input is NULL!", __func__);
67         return NULL;
68     }
69     if (commandId >= serviceDef->messagesLength) {
70         HDF_LOGE("%s:commandId exceed service def!", __func__);
71         return NULL;
72     }
73 
74     msgDef = serviceDef->messages + commandId;
75     if (msgDef->handler == NULL) {
76         HDF_LOGE("%s:command has no handler!", __func__);
77         return NULL;
78     }
79     return msgDef;
80 }
81 
AppendToLocalDispatcher(MessageDispatcher * dispatcher,const uint8_t priority,MessageContext * context)82 ErrorCode AppendToLocalDispatcher(MessageDispatcher *dispatcher, const uint8_t priority, MessageContext *context)
83 {
84     if (context == NULL) {
85         HDF_LOGE("%s:Input context is NULL!", __func__);
86         return ME_ERROR_NULL_PTR;
87     }
88     if (dispatcher == NULL) {
89         HDF_LOGE("%s:Input dispatcher is NULL!", __func__);
90         return ME_ERROR_NULL_PTR;
91     }
92 
93     if (dispatcher->messageQueue == NULL) {
94         HDF_LOGE("MessageQueue is NULL.");
95         return ME_ERROR_NULL_PTR;
96     }
97 
98     if (dispatcher->status != ME_STATUS_RUNNING) {
99         HDF_LOGE("%s:dispatcher is not running", __func__);
100         return ME_ERROR_DISPATCHER_NOT_RUNNING;
101     }
102     return PushPriorityQueue(dispatcher->messageQueue, priority, context);
103 }
104 
SetToResponse(MessageContext * context)105 void SetToResponse(MessageContext *context)
106 {
107     ServiceId senderId;
108     if (context->requestType != MESSAGE_TYPE_ASYNC_REQ && context->requestType != MESSAGE_TYPE_SYNC_REQ) {
109         HDF_LOGE("Only sync and async message can send response!type=%u", context->requestType);
110         return;
111     }
112     senderId = context->senderId;
113     context->senderId = context->receiverId;
114     context->receiverId = senderId;
115     context->requestType = MESSAGE_RSP_START + context->requestType - MESSAGE_REQ_START;
116 }
117 
HandleAsyncResponse(MessageContext * context)118 static void HandleAsyncResponse(MessageContext *context)
119 {
120     if (context == NULL) {
121         return;
122     }
123 
124     if (context->callback != NULL) {
125         context->callback((RequestContext *)context, context->reqData, context->rspData, context->responseStatus);
126     }
127 
128     ReleaseMessageContext(context);
129 }
130 
HandleSyncResponse(MessageContext * context)131 static void HandleSyncResponse(MessageContext *context)
132 {
133     HDF_STATUS status;
134     if (context == NULL) {
135         HDF_LOGE("Input context is NULL!");
136         return;
137     }
138     status = OsalSemPost(&context->rspSemaphore);
139     if (status != HDF_SUCCESS) {
140         HDF_LOGE("Send semaphore failed!CMD=%u,Sender=%u,Receiver=%u", context->commandId, context->senderId,
141             context->receiverId);
142     }
143     return;
144 }
145 
HandleRequestMessage(MessageContext * context)146 static void HandleRequestMessage(MessageContext *context)
147 {
148     RemoteService *targetService = RefRemoteService(context->receiverId);
149     ErrorCode errCode = ME_SUCCESS;
150     RemoteService *rspService = NULL;
151     do {
152         if (targetService == NULL) {
153             HDF_LOGE("%s:Service %u is not available!", __func__, context->receiverId);
154             errCode = ME_ERROR_NULL_PTR;
155             break;
156         }
157 
158         if (targetService->ExecRequestMsg == NULL) {
159             HDF_LOGE("%s:Service %u has no ExecMsg method!", __func__, context->receiverId);
160             errCode = ME_ERROR_NULL_PTR;
161             break;
162         }
163         targetService->ExecRequestMsg(targetService, context);
164 
165         // Convert to response message
166         SetToResponse(context);
167 
168         if (context->requestType == MESSAGE_TYPE_ASYNC_RSP && context->callback == NULL) {
169             ReleaseMessageContext(context);
170             break;
171         }
172 
173         rspService = RefRemoteService(context->receiverId);
174         if (rspService == NULL) {
175             errCode = ME_ERROR_NO_SUCH_SERVICE;
176             break;
177         }
178         if (rspService->SendMessage == NULL) {
179             errCode = ME_ERROR_BAD_SERVICE;
180             break;
181         }
182         errCode = rspService->SendMessage(rspService, context);
183     } while (false);
184 
185     if (errCode != ME_SUCCESS) {
186         if (context->requestType == MESSAGE_TYPE_SYNC_RSP || context->requestType == MESSAGE_TYPE_SYNC_REQ) {
187             (void)OsalSemPost(&context->rspSemaphore);
188         } else {
189             ReleaseMessageContext(context);
190         }
191     }
192 
193     if (targetService != NULL && targetService->Disref != NULL) {
194         targetService->Disref(targetService);
195         targetService = NULL;
196     }
197 
198     if (rspService != NULL && rspService->Disref != NULL) {
199         rspService->Disref(rspService);
200         rspService = NULL;
201     }
202 }
203 
HandleMessage(MessageContext * context)204 static void HandleMessage(MessageContext *context)
205 {
206     if (context != NULL) {
207         switch (context->requestType) {
208             case MESSAGE_TYPE_SYNC_REQ:
209             case MESSAGE_TYPE_ASYNC_REQ:
210                 HandleRequestMessage(context);
211                 break;
212             case MESSAGE_TYPE_SYNC_RSP:
213                 HandleSyncResponse(context);
214                 break;
215             case MESSAGE_TYPE_ASYNC_RSP:
216                 HandleAsyncResponse(context);
217                 break;
218             default:
219                 HDF_LOGE("Unsupported message type %u", context->requestType);
220         }
221     }
222 }
223 
ReleaseAllMessage(MessageDispatcher * dispatcher)224 static void ReleaseAllMessage(MessageDispatcher *dispatcher)
225 {
226     MessageContext *context = NULL;
227     do {
228         context = PopPriorityQueue(dispatcher->messageQueue, 0);
229         ReleaseMessageContext(context);
230     } while (context != NULL);
231 }
232 
RunDispatcher(void * para)233 static int RunDispatcher(void *para)
234 {
235     MessageDispatcher *dispatcher = NULL;
236     MessageContext *context = NULL;
237     if (para == NULL) {
238         HDF_LOGE("Start dispatcher failed! cause:%s\n", "input para is NULL");
239         return ME_ERROR_NULL_PTR;
240     }
241     dispatcher = (MessageDispatcher *)para;
242     if (dispatcher->messageQueue == NULL) {
243         HDF_LOGE("Start dispatcher failed! cause:%s\n", "message queue is NULL");
244         return ME_ERROR_NULL_PTR;
245     }
246 
247     if (dispatcher->Ref != NULL) {
248         dispatcher = dispatcher->Ref(dispatcher);
249     }
250 
251     if (dispatcher->status != ME_STATUS_STARTTING) {
252         if (dispatcher->Disref != NULL) {
253             dispatcher->Disref(dispatcher);
254         }
255         HDF_LOGE("Start dispatcher failed! cause:%s\n", "dispatcher is not stopped");
256         return ME_ERROR_WRONG_STATUS;
257     } else {
258         dispatcher->status = ME_STATUS_RUNNING;
259     }
260     while (dispatcher->status == ME_STATUS_RUNNING) {
261         context = PopPriorityQueue(dispatcher->messageQueue, QUEUE_OPER_TIMEOUT);
262         if (context == NULL) {
263             continue;
264         }
265         HandleMessage(context);
266     }
267 
268     ReleaseAllMessage(dispatcher);
269     dispatcher->status = ME_STATUS_TODESTROY;
270     if (dispatcher->Disref != NULL) {
271         dispatcher->Disref(dispatcher);
272         dispatcher = NULL;
273     }
274 
275     HDF_LOGW("Dispatcher shutdown!");
276     return ME_SUCCESS;
277 }
278 
StartDispatcher(MessageDispatcher * dispatcher)279 static ErrorCode StartDispatcher(MessageDispatcher *dispatcher)
280 {
281     HDF_STATUS status;
282     ErrorCode errCode;
283     LocalMessageDispatcher *localDispatcher = NULL;
284     struct OsalThreadParam config;
285     if (dispatcher == NULL) {
286         return ME_ERROR_NULL_PTR;
287     }
288 
289     status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
290     if (status != HDF_SUCCESS) {
291         return ME_ERROR_OPER_MUTEX_FAILED;
292     }
293 
294     errCode = ME_SUCCESS;
295     do {
296         if (dispatcher->status != ME_STATUS_STOPPED) {
297             errCode = ME_ERROR_WRONG_STATUS;
298             break;
299         }
300         dispatcher->status = ME_STATUS_STARTTING;
301         config.name = "MessageDispatcher";
302         config.priority = OSAL_THREAD_PRI_DEFAULT;
303         config.stackSize = 0x2000;
304         localDispatcher = (LocalMessageDispatcher *)dispatcher;
305         status = OsalThreadCreate(&localDispatcher->dispatcherThread, RunDispatcher, localDispatcher);
306         if (status != HDF_SUCCESS) {
307             HDF_LOGE("%s:OsalThreadCreate failed!status=%d", __func__, status);
308             dispatcher->status = ME_STATUS_STOPPED;
309             errCode = ME_ERROR_CREATE_THREAD_FAILED;
310             break;
311         }
312 
313         status = OsalThreadStart(&localDispatcher->dispatcherThread, &config);
314         if (status != HDF_SUCCESS) {
315             HDF_LOGE("%s:OsalThreadStart failed!status=%d", __func__, status);
316             dispatcher->status = ME_STATUS_STOPPED;
317             OsalThreadDestroy(&localDispatcher->dispatcherThread);
318             errCode = ME_ERROR_CREATE_THREAD_FAILED;
319             break;
320         }
321     } while (false);
322 
323     status = OsalMutexUnlock(&dispatcher->mutex);
324     if (status != HDF_SUCCESS) {
325         HDF_LOGE("%s:Destroy mutex failed!", __func__);
326     }
327 
328     if (errCode != ME_SUCCESS) {
329         return errCode;
330     }
331 
332     do {
333         OsalMSleep(1);
334     } while (dispatcher->status == ME_STATUS_STARTTING);
335     return (dispatcher->status == ME_STATUS_RUNNING) ? ME_SUCCESS : ME_ERROR_WRONG_STATUS;
336 }
337 
ShutdownDispatcher(MessageDispatcher * dispatcher)338 static void ShutdownDispatcher(MessageDispatcher *dispatcher)
339 {
340     HDF_STATUS status;
341     if (dispatcher == NULL) {
342         return;
343     }
344     status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
345     if (status != HDF_SUCCESS) {
346         HDF_LOGE("Get lock failed!status=%d", status);
347         return;
348     }
349 
350     do {
351         if (dispatcher->status != ME_STATUS_RUNNING && dispatcher->status != ME_STATUS_STARTTING) {
352             HDF_LOGE("%s:wrong status.status=%d", __func__, dispatcher->status);
353             break;
354         }
355         dispatcher->status = ME_STATUS_STOPPING;
356     } while (false);
357 
358     status = OsalMutexUnlock(&dispatcher->mutex);
359     if (status != HDF_SUCCESS) {
360         HDF_LOGE("%s:Destroy mutex failed!", __func__);
361     }
362 }
363 
364 IMPLEMENT_SHARED_OBJ(MessageDispatcher);
DestroyLocalDispatcher(MessageDispatcher * dispatcher)365 static void DestroyLocalDispatcher(MessageDispatcher *dispatcher)
366 {
367     int32_t ret;
368     if (dispatcher == NULL) {
369         return;
370     }
371 
372     ReleaseAllMessage(dispatcher);
373 
374     if (dispatcher->messageQueue != NULL) {
375         DestroyPriorityQueue(dispatcher->messageQueue);
376         dispatcher->messageQueue = NULL;
377     }
378 
379     ret = OsalMutexDestroy(&dispatcher->mutex);
380     if (ret != HDF_SUCCESS) {
381         HDF_LOGE("%s:Release mutex failed.ret=%d", __func__, ret);
382     }
383 
384     DEINIT_SHARED_OBJ(MessageDispatcher, dispatcher);
385 }
386 
CreateLocalDispatcher(MessageDispatcher ** dispatcher,const DispatcherConfig * config)387 ErrorCode CreateLocalDispatcher(MessageDispatcher **dispatcher, const DispatcherConfig *config)
388 {
389     LocalMessageDispatcher *localDispatcher = NULL;
390     int32_t ret;
391     ErrorCode errCode;
392     if (dispatcher == NULL || config == NULL) {
393         return ME_ERROR_NULL_PTR;
394     }
395 
396     localDispatcher = (LocalMessageDispatcher *)OsalMemCalloc(sizeof(LocalMessageDispatcher));
397     if (localDispatcher == NULL) {
398         return ME_ERROR_RES_LAKE;
399     }
400     do {
401         localDispatcher->status = ME_STATUS_STOPPED;
402         localDispatcher->AppendMessage = AppendToLocalDispatcher;
403         localDispatcher->Shutdown = ShutdownDispatcher;
404         localDispatcher->Start = StartDispatcher;
405 
406         localDispatcher->messageQueue = CreatePriorityQueue(config->queueSize, config->priorityLevelCount);
407         if (localDispatcher->messageQueue == NULL) {
408             errCode = ME_ERROR_OPER_QUEUE_FAILED;
409             break;
410         }
411 
412         ret = OsalMutexInit(&localDispatcher->mutex);
413         if (ret != HDF_SUCCESS) {
414             errCode = ME_ERROR_OPER_MUTEX_FAILED;
415             break;
416         }
417 
418         errCode = INIT_SHARED_OBJ(MessageDispatcher, (MessageDispatcher *)localDispatcher, DestroyLocalDispatcher);
419         if (errCode != ME_SUCCESS) {
420             break;
421         }
422     } while (false);
423 
424     if (errCode == ME_SUCCESS) {
425         *dispatcher = (MessageDispatcher *)localDispatcher;
426     } else {
427         DestroyLocalDispatcher((MessageDispatcher *)localDispatcher);
428         OsalMemFree(localDispatcher);
429     }
430     return errCode;
431 }
432