• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ipc_thread_pool.h"
17 
18 #include <unistd.h>
19 
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27 
/* Number of entries in g_invoker; protocol ids are used as indices into that table. */
#define PROTO_NUM 2

/* Thread-specific-data key under which each thread's ThreadContext is stored; created in InitThreadPool(). */
static pthread_key_t g_localKey = -1;
/* One invoker per protocol id; filled by InitThreadPool() and reset to NULL by DeinitThreadPool(). */
static RemoteInvoker *g_invoker[PROTO_NUM];
32 
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35     ThreadContext *current = NULL;
36     void *curTLS = pthread_getspecific(g_localKey);
37     if (curTLS != NULL) {
38         current = (ThreadContext *)curTLS;
39     } else {
40         current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41         if (current == NULL) {
42             return NULL;
43         }
44         current->threadId = pthread_self();
45         current->proto = IF_PROT_DEFAULT;
46         current->callerPid = RpcGetPid();
47         current->callerUid = RpcGetUid();
48         pthread_setspecific(g_localKey, current);
49     }
50     return current;
51 }
52 
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55     ThreadContext *threadContext = (ThreadContext *)args;
56     RemoteInvoker *invoker = g_invoker[threadContext->proto];
57     free(threadContext);
58     if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59         (invoker->ExitCurrentThread)();
60     }
61 }
62 
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65     ThreadPool *threadPool = GetCurrentSkeleton()->threadPool;
66     pthread_mutex_lock(&threadPool->lock);
67     if (proto == IF_PROT_BINDER) {
68         ++threadPool->idleThreadNum;
69     } else if (proto == IF_PROT_DATABUS) {
70         ++threadPool->idleSocketThreadNum;
71     }
72     pthread_mutex_unlock(&threadPool->lock);
73 }
74 
GetAndUpdateInvoker(int32_t proto)75 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
76 {
77     ThreadContext *threadContext = GetCurrentThreadContext();
78     if (threadContext == NULL) {
79         return NULL;
80     }
81     threadContext->proto = proto;
82     return g_invoker[proto];
83 }
84 
ThreadHandler(void * args)85 static void *ThreadHandler(void *args)
86 {
87     ThreadContext *threadContext = (ThreadContext *)args;
88     int32_t proto = threadContext->proto;
89     int32_t policy = threadContext->policy;
90     free(threadContext);
91     threadContext = NULL;
92     RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
93     if (invoker != NULL) {
94         switch (policy) {
95             case SPAWN_PASSIVE:
96                 invoker->JoinThread(false);
97                 break;
98             case SPAWN_ACTIVE:
99                 invoker->JoinThread(true);
100                 break;
101             default:
102                 break;
103         }
104     }
105     ThreadContextDestructor(proto);
106     return NULL;
107 }
108 
InitThreadPool(int32_t maxThreadNum)109 ThreadPool *InitThreadPool(int32_t maxThreadNum)
110 {
111     ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
112     if (threadPool == NULL) {
113         return NULL;
114     }
115     threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
116     threadPool->idleThreadNum = maxThreadNum;
117     threadPool->idleSocketThreadNum = maxThreadNum;
118     pthread_mutex_init(&threadPool->lock, NULL);
119     pthread_key_create(&g_localKey, TlsDestructor);
120     for (int32_t index = 0; index < PROTO_NUM; ++index) {
121         g_invoker[index] = InitRemoteInvoker(index);
122     }
123     return threadPool;
124 }
125 
/*
 * Tears down a pool obtained from InitThreadPool(); a NULL pool is ignored.
 *
 * Destroys the pool lock, deletes the g_localKey thread-specific key, frees the
 * pool and finally passes every g_invoker entry to DeinitRemoteInvoker() before
 * resetting it to NULL.
 */
void DeinitThreadPool(ThreadPool *threadPool)
{
    if (threadPool == NULL) {
        return;
    }

    pthread_mutex_destroy(&threadPool->lock);
    pthread_key_delete(g_localKey);
    free(threadPool);

    int32_t slot = 0;
    while (slot < PROTO_NUM) {
        DeinitRemoteInvoker(g_invoker[slot], slot);
        g_invoker[slot] = NULL;
        ++slot;
    }
}
139 
/*
 * Starts one detached worker thread for the given protocol if the pool still
 * has an idle slot for it.
 *
 * threadPool  pool whose counters are consulted and updated; must not be NULL.
 * policy      spawn policy forwarded to the new thread (see ThreadHandler).
 * proto       IF_PROT_BINDER or IF_PROT_DATABUS; any other value has no slot
 *             and is rejected the same way as a full pool.
 *
 * Returns ERR_NONE on success, ERR_INVALID_PARAM when threadPool is NULL or no
 * slot is available for proto, and ERR_FAILED when the lock cannot be taken,
 * the start argument cannot be allocated or the thread cannot be created.
 */
int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
{
    if (threadPool == NULL) {
        RPC_LOG_ERROR("thread pool is null.");
        return ERR_INVALID_PARAM;
    }
    if (pthread_mutex_lock(&threadPool->lock) != 0) {
        RPC_LOG_ERROR("get thread pool lock failed.");
        return ERR_FAILED;
    }
    /*
     * The idle counters are modified under this lock by finishing workers and
     * by UpdateMaxThreadNum(), so they are only inspected while it is held.
     */
    if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
        !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
        pthread_mutex_unlock(&threadPool->lock);
        RPC_LOG_ERROR("thread pool is full.");
        return ERR_INVALID_PARAM;
    }

    /* Ownership of this argument passes to ThreadHandler once the thread exists. */
    ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
    if (threadContext == NULL) {
        pthread_mutex_unlock(&threadPool->lock);
        RPC_LOG_ERROR("create thread context failed.");
        return ERR_FAILED;
    }
    threadContext->proto = proto;
    threadContext->policy = policy;

    pthread_t threadId;
    int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
    if (ret != 0) {
        pthread_mutex_unlock(&threadPool->lock);
        free(threadContext);
        RPC_LOG_ERROR("spawn new thread failed.");
        return ERR_FAILED;
    }
    pthread_detach(threadId);

    /*
     * Still under the lock: a worker that finishes immediately cannot give its
     * slot back before the slot has been taken here.
     */
    if (proto == IF_PROT_BINDER) {
        --threadPool->idleThreadNum;
    } else if (proto == IF_PROT_DATABUS) {
        --threadPool->idleSocketThreadNum;
    }
    pthread_mutex_unlock(&threadPool->lock);
    return ERR_NONE;
}
182 
/*
 * Raises the pool's thread limit; lowering it (or keeping it equal) is refused.
 *
 * maxThreadNum is the new quota per protocol, so the pool-wide maximum becomes
 * twice that value. When this exceeds the current maximum, the increase is
 * split evenly (divided by PROTO_NUM) between the two idle counters. All
 * updates happen under the pool lock; if the lock cannot be taken nothing is
 * changed.
 */
void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
{
    int32_t requestedTotal = maxThreadNum + maxThreadNum;

    if (pthread_mutex_lock(&threadPool->lock) != 0) {
        RPC_LOG_ERROR("get thread pool lock failed.");
        return;
    }

    int32_t currentTotal = threadPool->maxThreadNum;
    if (requestedTotal > currentTotal) {
        int32_t growth = requestedTotal - currentTotal;
        threadPool->maxThreadNum = requestedTotal;
        threadPool->idleThreadNum += growth / PROTO_NUM;
        threadPool->idleSocketThreadNum += growth / PROTO_NUM;
        pthread_mutex_unlock(&threadPool->lock);
        return;
    }

    pthread_mutex_unlock(&threadPool->lock);
    RPC_LOG_ERROR("not support set lower max thread num.");
}
202 
GetRemoteInvoker(void)203 RemoteInvoker *GetRemoteInvoker(void)
204 {
205     ThreadContext *threadContext = GetCurrentThreadContext();
206     if (threadContext == NULL) {
207         return NULL;
208     }
209     return g_invoker[threadContext->proto];
210 }