1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ipc_thread_pool.h"
17
18 #include <unistd.h>
19
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27
28 #define PROTO_NUM 2
29
30 static pthread_key_t g_localKey = -1;
31 static RemoteInvoker *g_invoker[PROTO_NUM];
32
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35 ThreadContext *current = NULL;
36 void *curTLS = pthread_getspecific(g_localKey);
37 if (curTLS != NULL) {
38 current = (ThreadContext *)curTLS;
39 } else {
40 current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41 if (current == NULL) {
42 return NULL;
43 }
44 current->threadId = pthread_self();
45 current->proto = IF_PROT_DEFAULT;
46 current->callerPid = RpcGetPid();
47 current->callerUid = RpcGetUid();
48 pthread_setspecific(g_localKey, current);
49 }
50 return current;
51 }
52
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55 ThreadContext *threadContext = (ThreadContext *)args;
56 RemoteInvoker *invoker = g_invoker[threadContext->proto];
57 free(threadContext);
58 if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59 (invoker->ExitCurrentThread)();
60 }
61 }
62
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65 ThreadPool *threadPool = GetCurrentSkeleton()->threadPool;
66 pthread_mutex_lock(&threadPool->lock);
67 if (proto == IF_PROT_BINDER) {
68 ++threadPool->idleThreadNum;
69 } else if (proto == IF_PROT_DATABUS) {
70 ++threadPool->idleSocketThreadNum;
71 }
72 pthread_mutex_unlock(&threadPool->lock);
73 }
74
GetAndUpdateInvoker(int32_t proto)75 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
76 {
77 ThreadContext *threadContext = GetCurrentThreadContext();
78 if (threadContext == NULL) {
79 return NULL;
80 }
81 threadContext->proto = proto;
82 return g_invoker[proto];
83 }
84
ThreadHandler(void * args)85 static void *ThreadHandler(void *args)
86 {
87 ThreadContext *threadContext = (ThreadContext *)args;
88 int32_t proto = threadContext->proto;
89 int32_t policy = threadContext->policy;
90 free(threadContext);
91 threadContext = NULL;
92 RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
93 if (invoker != NULL) {
94 switch (policy) {
95 case SPAWN_PASSIVE:
96 invoker->JoinThread(false);
97 break;
98 case SPAWN_ACTIVE:
99 invoker->JoinThread(true);
100 break;
101 default:
102 break;
103 }
104 }
105 ThreadContextDestructor(proto);
106 return NULL;
107 }
108
InitThreadPool(int32_t maxThreadNum)109 ThreadPool *InitThreadPool(int32_t maxThreadNum)
110 {
111 ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
112 if (threadPool == NULL) {
113 return NULL;
114 }
115 threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
116 threadPool->idleThreadNum = maxThreadNum;
117 threadPool->idleSocketThreadNum = maxThreadNum;
118 pthread_mutex_init(&threadPool->lock, NULL);
119 pthread_key_create(&g_localKey, TlsDestructor);
120 for (int32_t index = 0; index < PROTO_NUM; ++index) {
121 g_invoker[index] = InitRemoteInvoker(index);
122 }
123 return threadPool;
124 }
125
DeinitThreadPool(ThreadPool * threadPool)126 void DeinitThreadPool(ThreadPool *threadPool)
127 {
128 if (threadPool == NULL) {
129 return;
130 }
131 pthread_mutex_destroy(&threadPool->lock);
132 pthread_key_delete(g_localKey);
133 free(threadPool);
134 for (int32_t index = 0; index < PROTO_NUM; ++index) {
135 DeinitRemoteInvoker(g_invoker[index], index);
136 g_invoker[index] = NULL;
137 }
138 }
139
SpawnNewThread(ThreadPool * threadPool,int32_t policy,int32_t proto)140 int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
141 {
142 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
143 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
144 RPC_LOG_ERROR("thread pool is full.");
145 return ERR_INVALID_PARAM;
146 }
147 pthread_t threadId;
148 if (pthread_mutex_lock(&threadPool->lock) != 0) {
149 RPC_LOG_ERROR("get thread pool lock failed.");
150 return ERR_FAILED;
151 }
152 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
153 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
154 pthread_mutex_unlock(&threadPool->lock);
155 RPC_LOG_ERROR("thread pool is full.");
156 return ERR_INVALID_PARAM;
157 }
158 ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
159 if (threadContext == NULL) {
160 pthread_mutex_unlock(&threadPool->lock);
161 RPC_LOG_ERROR("create thread context failed.");
162 return ERR_FAILED;
163 }
164 threadContext->proto = proto;
165 threadContext->policy = policy;
166 int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
167 if (ret != 0) {
168 pthread_mutex_unlock(&threadPool->lock);
169 free(threadContext);
170 RPC_LOG_ERROR("spawn new thread failed.");
171 return ERR_FAILED;
172 }
173 pthread_detach(threadId);
174 if (proto == IF_PROT_BINDER) {
175 --threadPool->idleThreadNum;
176 } else if (proto == IF_PROT_DATABUS) {
177 --threadPool->idleSocketThreadNum;
178 }
179 pthread_mutex_unlock(&threadPool->lock);
180 return ERR_NONE;
181 }
182
UpdateMaxThreadNum(ThreadPool * threadPool,int32_t maxThreadNum)183 void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
184 {
185 int32_t totalNum = maxThreadNum + maxThreadNum;
186 if (pthread_mutex_lock(&threadPool->lock) != 0) {
187 RPC_LOG_ERROR("get thread pool lock failed.");
188 return;
189 }
190 int32_t oldThreadNum = threadPool->maxThreadNum;
191 if (totalNum <= oldThreadNum) {
192 pthread_mutex_unlock(&threadPool->lock);
193 RPC_LOG_ERROR("not support set lower max thread num.");
194 return;
195 }
196 int32_t diff = totalNum - oldThreadNum;
197 threadPool->maxThreadNum = totalNum;
198 threadPool->idleThreadNum += diff / PROTO_NUM;
199 threadPool->idleSocketThreadNum += diff / PROTO_NUM;
200 pthread_mutex_unlock(&threadPool->lock);
201 }
202
GetRemoteInvoker(void)203 RemoteInvoker *GetRemoteInvoker(void)
204 {
205 ThreadContext *threadContext = GetCurrentThreadContext();
206 if (threadContext == NULL) {
207 return NULL;
208 }
209 return g_invoker[threadContext->proto];
210 }