1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ipc_thread_pool.h"
17
18 #include <unistd.h>
19
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27
28 #define PROTO_NUM 2
29
30 static pthread_key_t g_localKey = -1;
31 static RemoteInvoker *g_invoker[PROTO_NUM];
32
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35 ThreadContext *current = NULL;
36 void *curTLS = pthread_getspecific(g_localKey);
37 if (curTLS != NULL) {
38 current = (ThreadContext *)curTLS;
39 } else {
40 current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41 if (current == NULL) {
42 return NULL;
43 }
44 current->threadId = pthread_self();
45 current->proto = IF_PROT_DEFAULT;
46 current->callerPid = RpcGetPid();
47 current->callerUid = RpcGetUid();
48 pthread_setspecific(g_localKey, current);
49 }
50 return current;
51 }
52
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55 ThreadContext *threadContext = (ThreadContext *)args;
56 RemoteInvoker *invoker = g_invoker[threadContext->proto];
57 free(threadContext);
58 if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59 (invoker->ExitCurrentThread)();
60 }
61 }
62
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65 IpcSkeleton *ipcSkeleton = GetCurrentSkeleton();
66 if (ipcSkeleton == NULL || ipcSkeleton->threadPool == NULL) {
67 return;
68 }
69 ThreadPool *threadPool = ipcSkeleton->threadPool;
70 pthread_mutex_lock(&threadPool->lock);
71 if (proto == IF_PROT_BINDER) {
72 ++threadPool->idleThreadNum;
73 } else if (proto == IF_PROT_DATABUS) {
74 ++threadPool->idleSocketThreadNum;
75 }
76 pthread_mutex_unlock(&threadPool->lock);
77 }
78
GetAndUpdateInvoker(int32_t proto)79 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
80 {
81 ThreadContext *threadContext = GetCurrentThreadContext();
82 if (threadContext == NULL) {
83 return NULL;
84 }
85 threadContext->proto = proto;
86 return g_invoker[proto];
87 }
88
ThreadHandler(void * args)89 static void *ThreadHandler(void *args)
90 {
91 ThreadContext *threadContext = (ThreadContext *)args;
92 int32_t proto = threadContext->proto;
93 int32_t policy = threadContext->policy;
94 free(threadContext);
95 threadContext = NULL;
96 RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
97 if (invoker != NULL) {
98 switch (policy) {
99 case SPAWN_PASSIVE:
100 invoker->JoinThread(false);
101 break;
102 case SPAWN_ACTIVE:
103 invoker->JoinThread(true);
104 break;
105 default:
106 break;
107 }
108 }
109 ThreadContextDestructor(proto);
110 return NULL;
111 }
112
InitThreadPool(int32_t maxThreadNum)113 ThreadPool *InitThreadPool(int32_t maxThreadNum)
114 {
115 ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
116 if (threadPool == NULL) {
117 return NULL;
118 }
119 threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
120 threadPool->idleThreadNum = maxThreadNum;
121 threadPool->idleSocketThreadNum = maxThreadNum;
122 pthread_mutex_init(&threadPool->lock, NULL);
123 pthread_key_create(&g_localKey, TlsDestructor);
124 for (int32_t index = 0; index < PROTO_NUM; ++index) {
125 g_invoker[index] = InitRemoteInvoker(index);
126 }
127 return threadPool;
128 }
129
DeinitThreadPool(ThreadPool * threadPool)130 void DeinitThreadPool(ThreadPool *threadPool)
131 {
132 if (threadPool == NULL) {
133 return;
134 }
135 pthread_mutex_destroy(&threadPool->lock);
136 pthread_key_delete(g_localKey);
137 free(threadPool);
138 threadPool = NULL;
139 for (int32_t index = 0; index < PROTO_NUM; ++index) {
140 DeinitRemoteInvoker(g_invoker[index], index);
141 g_invoker[index] = NULL;
142 }
143 }
144
SpawnNewThread(ThreadPool * threadPool,int32_t policy,int32_t proto)145 int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
146 {
147 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
148 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
149 RPC_LOG_ERROR("thread pool is full.");
150 return ERR_INVALID_PARAM;
151 }
152 pthread_t threadId;
153 if (pthread_mutex_lock(&threadPool->lock) != 0) {
154 RPC_LOG_ERROR("get thread pool lock failed.");
155 return ERR_FAILED;
156 }
157 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
158 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
159 pthread_mutex_unlock(&threadPool->lock);
160 RPC_LOG_ERROR("thread pool is full.");
161 return ERR_INVALID_PARAM;
162 }
163 ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
164 if (threadContext == NULL) {
165 pthread_mutex_unlock(&threadPool->lock);
166 RPC_LOG_ERROR("create thread context failed.");
167 return ERR_FAILED;
168 }
169 threadContext->proto = proto;
170 threadContext->policy = policy;
171 int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
172 if (ret != 0) {
173 pthread_mutex_unlock(&threadPool->lock);
174 free(threadContext);
175 RPC_LOG_ERROR("spawn new thread failed.");
176 return ERR_FAILED;
177 }
178 pthread_detach(threadId);
179 if (proto == IF_PROT_BINDER) {
180 --threadPool->idleThreadNum;
181 } else if (proto == IF_PROT_DATABUS) {
182 --threadPool->idleSocketThreadNum;
183 }
184 pthread_mutex_unlock(&threadPool->lock);
185 return ERR_NONE;
186 }
187
UpdateMaxThreadNum(ThreadPool * threadPool,int32_t maxThreadNum)188 void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
189 {
190 int32_t totalNum = maxThreadNum + maxThreadNum;
191 if (pthread_mutex_lock(&threadPool->lock) != 0) {
192 RPC_LOG_ERROR("get thread pool lock failed.");
193 return;
194 }
195 int32_t oldThreadNum = threadPool->maxThreadNum;
196 if (totalNum <= oldThreadNum) {
197 pthread_mutex_unlock(&threadPool->lock);
198 RPC_LOG_ERROR("not support set lower max thread num.");
199 return;
200 }
201 int32_t diff = totalNum - oldThreadNum;
202 threadPool->maxThreadNum = totalNum;
203 threadPool->idleThreadNum += diff / PROTO_NUM;
204 threadPool->idleSocketThreadNum += diff / PROTO_NUM;
205 pthread_mutex_unlock(&threadPool->lock);
206 }
207
GetRemoteInvoker(void)208 RemoteInvoker *GetRemoteInvoker(void)
209 {
210 ThreadContext *threadContext = GetCurrentThreadContext();
211 if (threadContext == NULL) {
212 return NULL;
213 }
214 return g_invoker[threadContext->proto];
215 }