• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ipc_thread_pool.h"
17 
18 #include <unistd.h>
19 
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27 
28 #define PROTO_NUM 2
29 
30 static pthread_key_t g_localKey = -1;
31 static RemoteInvoker *g_invoker[PROTO_NUM];
32 
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35     ThreadContext *current = NULL;
36     void *curTLS = pthread_getspecific(g_localKey);
37     if (curTLS != NULL) {
38         current = (ThreadContext *)curTLS;
39     } else {
40         current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41         if (current == NULL) {
42             return NULL;
43         }
44         current->threadId = pthread_self();
45         current->proto = IF_PROT_DEFAULT;
46         current->callerPid = RpcGetPid();
47         current->callerUid = RpcGetUid();
48         pthread_setspecific(g_localKey, current);
49     }
50     return current;
51 }
52 
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55     ThreadContext *threadContext = (ThreadContext *)args;
56     RemoteInvoker *invoker = g_invoker[threadContext->proto];
57     free(threadContext);
58     if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59         (invoker->ExitCurrentThread)();
60     }
61 }
62 
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65     IpcSkeleton *ipcSkeleton = GetCurrentSkeleton();
66     if (ipcSkeleton == NULL || ipcSkeleton->threadPool == NULL) {
67         return;
68     }
69     ThreadPool *threadPool = ipcSkeleton->threadPool;
70     pthread_mutex_lock(&threadPool->lock);
71     if (proto == IF_PROT_BINDER) {
72         ++threadPool->idleThreadNum;
73     } else if (proto == IF_PROT_DATABUS) {
74         ++threadPool->idleSocketThreadNum;
75     }
76     pthread_mutex_unlock(&threadPool->lock);
77 }
78 
GetAndUpdateInvoker(int32_t proto)79 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
80 {
81     ThreadContext *threadContext = GetCurrentThreadContext();
82     if (threadContext == NULL) {
83         return NULL;
84     }
85     threadContext->proto = proto;
86     return g_invoker[proto];
87 }
88 
ThreadHandler(void * args)89 static void *ThreadHandler(void *args)
90 {
91     ThreadContext *threadContext = (ThreadContext *)args;
92     int32_t proto = threadContext->proto;
93     int32_t policy = threadContext->policy;
94     free(threadContext);
95     threadContext = NULL;
96     RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
97     if (invoker != NULL) {
98         switch (policy) {
99             case SPAWN_PASSIVE:
100                 invoker->JoinThread(false);
101                 break;
102             case SPAWN_ACTIVE:
103                 invoker->JoinThread(true);
104                 break;
105             default:
106                 break;
107         }
108     }
109     ThreadContextDestructor(proto);
110     return NULL;
111 }
112 
InitThreadPool(int32_t maxThreadNum)113 ThreadPool *InitThreadPool(int32_t maxThreadNum)
114 {
115     ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
116     if (threadPool == NULL) {
117         return NULL;
118     }
119     threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
120     threadPool->idleThreadNum = maxThreadNum;
121     threadPool->idleSocketThreadNum = maxThreadNum;
122     pthread_mutex_init(&threadPool->lock, NULL);
123     pthread_key_create(&g_localKey, TlsDestructor);
124     for (int32_t index = 0; index < PROTO_NUM; ++index) {
125         g_invoker[index] = InitRemoteInvoker(index);
126     }
127     return threadPool;
128 }
129 
DeinitThreadPool(ThreadPool * threadPool)130 void DeinitThreadPool(ThreadPool *threadPool)
131 {
132     if (threadPool == NULL) {
133         return;
134     }
135     pthread_mutex_destroy(&threadPool->lock);
136     pthread_key_delete(g_localKey);
137     free(threadPool);
138     threadPool = NULL;
139     for (int32_t index = 0; index < PROTO_NUM; ++index) {
140         DeinitRemoteInvoker(g_invoker[index], index);
141         g_invoker[index] = NULL;
142     }
143 }
144 
SpawnNewThread(ThreadPool * threadPool,int32_t policy,int32_t proto)145 int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
146 {
147     if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
148         !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
149         RPC_LOG_ERROR("thread pool is full.");
150         return ERR_INVALID_PARAM;
151     }
152     pthread_t threadId;
153     if (pthread_mutex_lock(&threadPool->lock) != 0) {
154         RPC_LOG_ERROR("get thread pool lock failed.");
155         return ERR_FAILED;
156     }
157     if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
158         !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
159         pthread_mutex_unlock(&threadPool->lock);
160         RPC_LOG_ERROR("thread pool is full.");
161         return ERR_INVALID_PARAM;
162     }
163     ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
164     if (threadContext == NULL) {
165         pthread_mutex_unlock(&threadPool->lock);
166         RPC_LOG_ERROR("create thread context failed.");
167         return ERR_FAILED;
168     }
169     threadContext->proto = proto;
170     threadContext->policy = policy;
171     int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
172     if (ret != 0) {
173         pthread_mutex_unlock(&threadPool->lock);
174         free(threadContext);
175         RPC_LOG_ERROR("spawn new thread failed.");
176         return ERR_FAILED;
177     }
178     pthread_detach(threadId);
179     if (proto == IF_PROT_BINDER) {
180         --threadPool->idleThreadNum;
181     } else if (proto == IF_PROT_DATABUS) {
182         --threadPool->idleSocketThreadNum;
183     }
184     pthread_mutex_unlock(&threadPool->lock);
185     return ERR_NONE;
186 }
187 
UpdateMaxThreadNum(ThreadPool * threadPool,int32_t maxThreadNum)188 void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
189 {
190     int32_t totalNum = maxThreadNum + maxThreadNum;
191     if (pthread_mutex_lock(&threadPool->lock) != 0) {
192         RPC_LOG_ERROR("get thread pool lock failed.");
193         return;
194     }
195     int32_t oldThreadNum = threadPool->maxThreadNum;
196     if (totalNum <= oldThreadNum) {
197         pthread_mutex_unlock(&threadPool->lock);
198         RPC_LOG_ERROR("not support set lower max thread num.");
199         return;
200     }
201     int32_t diff = totalNum - oldThreadNum;
202     threadPool->maxThreadNum = totalNum;
203     threadPool->idleThreadNum += diff / PROTO_NUM;
204     threadPool->idleSocketThreadNum += diff / PROTO_NUM;
205     pthread_mutex_unlock(&threadPool->lock);
206 }
207 
GetRemoteInvoker(void)208 RemoteInvoker *GetRemoteInvoker(void)
209 {
210     ThreadContext *threadContext = GetCurrentThreadContext();
211     if (threadContext == NULL) {
212         return NULL;
213     }
214     return g_invoker[threadContext->proto];
215 }