1 /*
2 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
3 *
4 * HDF is dual licensed: you can use it either under the terms of
5 * the GPL, or the BSD license, at your option.
6 * See the LICENSE file in the root of this repository for complete details.
7 */
8
9 #include "securec.h"
10 #include "osal/osal_thread.h"
11 #include "osal/osal_time.h"
12 #include "osal/osal_mutex.h"
13 #include "utils/hdf_log.h"
14 #include "message_dispatcher.h"
15 #include "hdf_wlan_priority_queue.h"
16 #include "message_router_inner.h"
17
18 #ifdef USERSPACE_CLIENT_SUPPORT
19 #define HDF_LOG_TAG UMsgEngine
20 #else
21 #define HDF_LOG_TAG KMsgEngine
22 #endif
23
24 typedef struct {
25 INHERT_MESSAGE_DISPATCHER;
26 OSAL_DECLARE_THREAD(dispatcherThread);
27 } LocalMessageDispatcher;
28
ReleaseMessageContext(MessageContext * context)29 void ReleaseMessageContext(MessageContext *context)
30 {
31 if (context == NULL) {
32 return;
33 }
34 if (context->rspData != NULL) {
35 HdfSbufRecycle(context->rspData);
36 context->rspData = NULL;
37 }
38
39 if (context->crossNode ||
40 (context->requestType != MESSAGE_TYPE_SYNC_REQ && context->requestType != MESSAGE_TYPE_SYNC_RSP)) {
41 // Sync request message may use stack mem.Memory is managed by user
42 if (context->reqData != NULL) {
43 HdfSbufRecycle(context->reqData);
44 context->reqData = NULL;
45 }
46 OsalMemFree(context);
47 }
48 }
49
ReleaseMessageMapper(struct ServiceDef * mapper)50 void ReleaseMessageMapper(struct ServiceDef *mapper)
51 {
52 if (mapper == NULL) {
53 return;
54 }
55 if (mapper->messages != NULL) {
56 OsalMemFree(mapper->messages);
57 mapper->messages = NULL;
58 }
59 OsalMemFree(mapper);
60 }
61
GetMsgDef(const struct ServiceDef * serviceDef,uint32_t commandId)62 struct MessageDef *GetMsgDef(const struct ServiceDef *serviceDef, uint32_t commandId)
63 {
64 struct MessageDef *msgDef = NULL;
65 if (serviceDef == NULL || serviceDef->messages == NULL) {
66 HDF_LOGE("%s:input is NULL!", __func__);
67 return NULL;
68 }
69 if (commandId >= serviceDef->messagesLength) {
70 HDF_LOGE("%s:commandId exceed service def!", __func__);
71 return NULL;
72 }
73
74 msgDef = serviceDef->messages + commandId;
75 if (msgDef->handler == NULL) {
76 HDF_LOGE("%s:command has no handler!", __func__);
77 return NULL;
78 }
79 return msgDef;
80 }
81
AppendToLocalDispatcher(MessageDispatcher * dispatcher,const uint8_t priority,MessageContext * context)82 ErrorCode AppendToLocalDispatcher(MessageDispatcher *dispatcher, const uint8_t priority, MessageContext *context)
83 {
84 if (context == NULL) {
85 HDF_LOGE("%s:Input context is NULL!", __func__);
86 return ME_ERROR_NULL_PTR;
87 }
88 if (dispatcher == NULL) {
89 HDF_LOGE("%s:Input dispatcher is NULL!", __func__);
90 return ME_ERROR_NULL_PTR;
91 }
92
93 if (dispatcher->messageQueue == NULL) {
94 HDF_LOGE("MessageQueue is NULL.");
95 return ME_ERROR_NULL_PTR;
96 }
97
98 if (dispatcher->status != ME_STATUS_RUNNING) {
99 HDF_LOGE("%s:dispatcher is not running", __func__);
100 return ME_ERROR_DISPATCHER_NOT_RUNNING;
101 }
102 return PushPriorityQueue(dispatcher->messageQueue, priority, context);
103 }
104
SetToResponse(MessageContext * context)105 void SetToResponse(MessageContext *context)
106 {
107 ServiceId senderId;
108 if (context->requestType != MESSAGE_TYPE_ASYNC_REQ && context->requestType != MESSAGE_TYPE_SYNC_REQ) {
109 HDF_LOGE("Only sync and async message can send response!type=%u", context->requestType);
110 return;
111 }
112 senderId = context->senderId;
113 context->senderId = context->receiverId;
114 context->receiverId = senderId;
115 context->requestType = MESSAGE_RSP_START + context->requestType - MESSAGE_REQ_START;
116 }
117
HandleAsyncResponse(MessageContext * context)118 static void HandleAsyncResponse(MessageContext *context)
119 {
120 if (context == NULL) {
121 return;
122 }
123
124 if (context->callback != NULL) {
125 context->callback((RequestContext *)context, context->reqData, context->rspData, context->responseStatus);
126 }
127
128 ReleaseMessageContext(context);
129 }
130
HandleSyncResponse(MessageContext * context)131 static void HandleSyncResponse(MessageContext *context)
132 {
133 HDF_STATUS status;
134 if (context == NULL) {
135 HDF_LOGE("Input context is NULL!");
136 return;
137 }
138 status = OsalSemPost(&context->rspSemaphore);
139 if (status != HDF_SUCCESS) {
140 HDF_LOGE("Send semaphore failed!CMD=%u,Sender=%u,Receiver=%u", context->commandId, context->senderId,
141 context->receiverId);
142 }
143 return;
144 }
145
HandleRequestMessage(MessageContext * context)146 static void HandleRequestMessage(MessageContext *context)
147 {
148 RemoteService *targetService = RefRemoteService(context->receiverId);
149 ErrorCode errCode = ME_SUCCESS;
150 RemoteService *rspService = NULL;
151 do {
152 if (targetService == NULL) {
153 HDF_LOGE("%s:Service %u is not available!", __func__, context->receiverId);
154 errCode = ME_ERROR_NULL_PTR;
155 break;
156 }
157
158 if (targetService->ExecRequestMsg == NULL) {
159 HDF_LOGE("%s:Service %u has no ExecMsg method!", __func__, context->receiverId);
160 errCode = ME_ERROR_NULL_PTR;
161 break;
162 }
163 targetService->ExecRequestMsg(targetService, context);
164
165 // Convert to response message
166 SetToResponse(context);
167
168 if (context->requestType == MESSAGE_TYPE_ASYNC_RSP && context->callback == NULL) {
169 ReleaseMessageContext(context);
170 break;
171 }
172
173 rspService = RefRemoteService(context->receiverId);
174 if (rspService == NULL) {
175 errCode = ME_ERROR_NO_SUCH_SERVICE;
176 break;
177 }
178 if (rspService->SendMessage == NULL) {
179 errCode = ME_ERROR_BAD_SERVICE;
180 break;
181 }
182 errCode = rspService->SendMessage(rspService, context);
183 } while (false);
184
185 if (errCode != ME_SUCCESS) {
186 if (context->requestType == MESSAGE_TYPE_SYNC_RSP || context->requestType == MESSAGE_TYPE_SYNC_REQ) {
187 (void)OsalSemPost(&context->rspSemaphore);
188 } else {
189 ReleaseMessageContext(context);
190 }
191 }
192
193 if (targetService != NULL && targetService->Disref != NULL) {
194 targetService->Disref(targetService);
195 targetService = NULL;
196 }
197
198 if (rspService != NULL && rspService->Disref != NULL) {
199 rspService->Disref(rspService);
200 rspService = NULL;
201 }
202 }
203
HandleMessage(MessageContext * context)204 static void HandleMessage(MessageContext *context)
205 {
206 if (context != NULL) {
207 switch (context->requestType) {
208 case MESSAGE_TYPE_SYNC_REQ:
209 case MESSAGE_TYPE_ASYNC_REQ:
210 HandleRequestMessage(context);
211 break;
212 case MESSAGE_TYPE_SYNC_RSP:
213 HandleSyncResponse(context);
214 break;
215 case MESSAGE_TYPE_ASYNC_RSP:
216 HandleAsyncResponse(context);
217 break;
218 default:
219 HDF_LOGE("Unsupported message type %u", context->requestType);
220 }
221 }
222 }
223
ReleaseAllMessage(MessageDispatcher * dispatcher)224 static void ReleaseAllMessage(MessageDispatcher *dispatcher)
225 {
226 MessageContext *context = NULL;
227 do {
228 context = PopPriorityQueue(dispatcher->messageQueue, 0);
229 ReleaseMessageContext(context);
230 } while (context != NULL);
231 }
232
RunDispatcher(void * para)233 static int RunDispatcher(void *para)
234 {
235 MessageDispatcher *dispatcher = NULL;
236 MessageContext *context = NULL;
237 if (para == NULL) {
238 HDF_LOGE("Start dispatcher failed! cause:%s\n", "input para is NULL");
239 return ME_ERROR_NULL_PTR;
240 }
241 dispatcher = (MessageDispatcher *)para;
242 if (dispatcher->messageQueue == NULL) {
243 HDF_LOGE("Start dispatcher failed! cause:%s\n", "message queue is NULL");
244 return ME_ERROR_NULL_PTR;
245 }
246
247 if (dispatcher->Ref != NULL) {
248 dispatcher = dispatcher->Ref(dispatcher);
249 }
250
251 if (dispatcher->status != ME_STATUS_STARTTING) {
252 if (dispatcher->Disref != NULL) {
253 dispatcher->Disref(dispatcher);
254 }
255 HDF_LOGE("Start dispatcher failed! cause:%s\n", "dispatcher is not stopped");
256 return ME_ERROR_WRONG_STATUS;
257 } else {
258 dispatcher->status = ME_STATUS_RUNNING;
259 }
260 while (dispatcher->status == ME_STATUS_RUNNING) {
261 context = PopPriorityQueue(dispatcher->messageQueue, QUEUE_OPER_TIMEOUT);
262 if (context == NULL) {
263 continue;
264 }
265 HandleMessage(context);
266 }
267
268 ReleaseAllMessage(dispatcher);
269 dispatcher->status = ME_STATUS_TODESTROY;
270 if (dispatcher->Disref != NULL) {
271 dispatcher->Disref(dispatcher);
272 dispatcher = NULL;
273 }
274
275 HDF_LOGW("Dispatcher shutdown!");
276 return ME_SUCCESS;
277 }
278
StartDispatcher(MessageDispatcher * dispatcher)279 static ErrorCode StartDispatcher(MessageDispatcher *dispatcher)
280 {
281 HDF_STATUS status;
282 ErrorCode errCode;
283 LocalMessageDispatcher *localDispatcher = NULL;
284 struct OsalThreadParam config;
285 if (dispatcher == NULL) {
286 return ME_ERROR_NULL_PTR;
287 }
288
289 status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
290 if (status != HDF_SUCCESS) {
291 return ME_ERROR_OPER_MUTEX_FAILED;
292 }
293
294 errCode = ME_SUCCESS;
295 do {
296 if (dispatcher->status != ME_STATUS_STOPPED) {
297 errCode = ME_ERROR_WRONG_STATUS;
298 break;
299 }
300 dispatcher->status = ME_STATUS_STARTTING;
301 config.name = "MessageDispatcher";
302 config.priority = OSAL_THREAD_PRI_DEFAULT;
303 config.stackSize = 0x2000;
304 localDispatcher = (LocalMessageDispatcher *)dispatcher;
305 status = OsalThreadCreate(&localDispatcher->dispatcherThread, RunDispatcher, localDispatcher);
306 if (status != HDF_SUCCESS) {
307 HDF_LOGE("%s:OsalThreadCreate failed!status=%d", __func__, status);
308 dispatcher->status = ME_STATUS_STOPPED;
309 errCode = ME_ERROR_CREATE_THREAD_FAILED;
310 break;
311 }
312
313 status = OsalThreadStart(&localDispatcher->dispatcherThread, &config);
314 if (status != HDF_SUCCESS) {
315 HDF_LOGE("%s:OsalThreadStart failed!status=%d", __func__, status);
316 dispatcher->status = ME_STATUS_STOPPED;
317 OsalThreadDestroy(&localDispatcher->dispatcherThread);
318 errCode = ME_ERROR_CREATE_THREAD_FAILED;
319 break;
320 }
321 } while (false);
322
323 status = OsalMutexUnlock(&dispatcher->mutex);
324 if (status != HDF_SUCCESS) {
325 HDF_LOGE("%s:Destroy mutex failed!", __func__);
326 }
327
328 if (errCode != ME_SUCCESS) {
329 return errCode;
330 }
331
332 do {
333 OsalMSleep(1);
334 } while (dispatcher->status == ME_STATUS_STARTTING);
335 return (dispatcher->status == ME_STATUS_RUNNING) ? ME_SUCCESS : ME_ERROR_WRONG_STATUS;
336 }
337
ShutdownDispatcher(MessageDispatcher * dispatcher)338 static void ShutdownDispatcher(MessageDispatcher *dispatcher)
339 {
340 HDF_STATUS status;
341 if (dispatcher == NULL) {
342 return;
343 }
344 status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
345 if (status != HDF_SUCCESS) {
346 HDF_LOGE("Get lock failed!status=%d", status);
347 return;
348 }
349
350 do {
351 if (dispatcher->status != ME_STATUS_RUNNING && dispatcher->status != ME_STATUS_STARTTING) {
352 HDF_LOGE("%s:wrong status.status=%d", __func__, dispatcher->status);
353 break;
354 }
355 dispatcher->status = ME_STATUS_STOPPING;
356 } while (false);
357
358 status = OsalMutexUnlock(&dispatcher->mutex);
359 if (status != HDF_SUCCESS) {
360 HDF_LOGE("%s:Destroy mutex failed!", __func__);
361 }
362 }
363
364 IMPLEMENT_SHARED_OBJ(MessageDispatcher);
DestroyLocalDispatcher(MessageDispatcher * dispatcher)365 static void DestroyLocalDispatcher(MessageDispatcher *dispatcher)
366 {
367 int32_t ret;
368 if (dispatcher == NULL) {
369 return;
370 }
371
372 ReleaseAllMessage(dispatcher);
373
374 if (dispatcher->messageQueue != NULL) {
375 DestroyPriorityQueue(dispatcher->messageQueue);
376 dispatcher->messageQueue = NULL;
377 }
378
379 ret = OsalMutexDestroy(&dispatcher->mutex);
380 if (ret != HDF_SUCCESS) {
381 HDF_LOGE("%s:Release mutex failed.ret=%d", __func__, ret);
382 }
383
384 DEINIT_SHARED_OBJ(MessageDispatcher, dispatcher);
385 }
386
CreateLocalDispatcher(MessageDispatcher ** dispatcher,const DispatcherConfig * config)387 ErrorCode CreateLocalDispatcher(MessageDispatcher **dispatcher, const DispatcherConfig *config)
388 {
389 LocalMessageDispatcher *localDispatcher = NULL;
390 int32_t ret;
391 ErrorCode errCode;
392 if (dispatcher == NULL || config == NULL) {
393 return ME_ERROR_NULL_PTR;
394 }
395
396 localDispatcher = (LocalMessageDispatcher *)OsalMemCalloc(sizeof(LocalMessageDispatcher));
397 if (localDispatcher == NULL) {
398 return ME_ERROR_RES_LAKE;
399 }
400 do {
401 localDispatcher->status = ME_STATUS_STOPPED;
402 localDispatcher->AppendMessage = AppendToLocalDispatcher;
403 localDispatcher->Shutdown = ShutdownDispatcher;
404 localDispatcher->Start = StartDispatcher;
405
406 localDispatcher->messageQueue = CreatePriorityQueue(config->queueSize, config->priorityLevelCount);
407 if (localDispatcher->messageQueue == NULL) {
408 errCode = ME_ERROR_OPER_QUEUE_FAILED;
409 break;
410 }
411
412 ret = OsalMutexInit(&localDispatcher->mutex);
413 if (ret != HDF_SUCCESS) {
414 errCode = ME_ERROR_OPER_MUTEX_FAILED;
415 break;
416 }
417
418 errCode = INIT_SHARED_OBJ(MessageDispatcher, (MessageDispatcher *)localDispatcher, DestroyLocalDispatcher);
419 if (errCode != ME_SUCCESS) {
420 break;
421 }
422 } while (false);
423
424 if (errCode == ME_SUCCESS) {
425 *dispatcher = (MessageDispatcher *)localDispatcher;
426 } else {
427 DestroyLocalDispatcher((MessageDispatcher *)localDispatcher);
428 OsalMemFree(localDispatcher);
429 }
430 return errCode;
431 }
432