• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Posix implementation for gpr threads.
20 
21 #include <grpc/support/port_platform.h>
22 
23 #include <inttypes.h>
24 
25 #include <csignal>
26 #include <string>
27 
28 #ifdef GPR_POSIX_SYNC
29 
30 #include <pthread.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37 #include <grpc/support/thd_id.h>
38 #include <grpc/support/time.h>
39 
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/crash.h"
42 #include "src/core/lib/gprpp/fork.h"
43 #include "src/core/lib/gprpp/strerror.h"
44 #include "src/core/lib/gprpp/thd.h"
45 
46 namespace grpc_core {
47 namespace {
48 
49 class ThreadInternalsPosix;
50 
51 struct thd_arg {
52   ThreadInternalsPosix* thread;
53   void (*body)(void* arg);  // body of a thread
54   void* arg;                // argument to a thread
55   const char* name;         // name of thread. Can be nullptr.
56   bool joinable;
57   bool tracked;
58 };
59 
RoundUpToPageSize(size_t size)60 size_t RoundUpToPageSize(size_t size) {
61   // TODO(yunjiaw): Change this variable (page_size) to a function-level static
62   // when possible
63   size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
64   return (size + page_size - 1) & ~(page_size - 1);
65 }
66 
67 // Returns the minimum valid stack size that can be passed to
68 // pthread_attr_setstacksize.
MinValidStackSize(size_t request_size)69 size_t MinValidStackSize(size_t request_size) {
70   size_t min_stacksize = sysconf(_SC_THREAD_STACK_MIN);
71   if (request_size < min_stacksize) {
72     request_size = min_stacksize;
73   }
74 
75   // On some systems, pthread_attr_setstacksize() can fail if stacksize is
76   // not a multiple of the system page size.
77   return RoundUpToPageSize(request_size);
78 }
79 
80 class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
81  public:
ThreadInternalsPosix(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Thread::Options & options)82   ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
83                        void* arg, bool* success, const Thread::Options& options)
84       : started_(false) {
85     gpr_mu_init(&mu_);
86     gpr_cv_init(&ready_);
87     pthread_attr_t attr;
88     // don't use gpr_malloc as we may cause an infinite recursion with
89     // the profiling code
90     thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
91     GPR_ASSERT(info != nullptr);
92     info->thread = this;
93     info->body = thd_body;
94     info->arg = arg;
95     info->name = thd_name;
96     info->joinable = options.joinable();
97     info->tracked = options.tracked();
98     if (options.tracked()) {
99       Fork::IncThreadCount();
100     }
101 
102     GPR_ASSERT(pthread_attr_init(&attr) == 0);
103     if (options.joinable()) {
104       GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
105                  0);
106     } else {
107       GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
108                  0);
109     }
110 
111     if (options.stack_size() != 0) {
112       size_t stack_size = MinValidStackSize(options.stack_size());
113       GPR_ASSERT(pthread_attr_setstacksize(&attr, stack_size) == 0);
114     }
115 
116     int pthread_create_err = pthread_create(
117         &pthread_id_, &attr,
118         [](void* v) -> void* {
119           thd_arg arg = *static_cast<thd_arg*>(v);
120           free(v);
121           if (arg.name != nullptr) {
122 #if GPR_APPLE_PTHREAD_NAME
123             // Apple supports 64 characters, and will
124             // truncate if it's longer.
125             pthread_setname_np(arg.name);
126 #elif GPR_LINUX_PTHREAD_NAME
127             // Linux supports 16 characters max, and will
128             // error if it's longer.
129             char buf[16];
130             size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
131             strncpy(buf, arg.name, buf_len);
132             buf[buf_len] = '\0';
133             pthread_setname_np(pthread_self(), buf);
134 #endif  // GPR_APPLE_PTHREAD_NAME
135           }
136 
137           gpr_mu_lock(&arg.thread->mu_);
138           while (!arg.thread->started_) {
139             gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
140                         gpr_inf_future(GPR_CLOCK_MONOTONIC));
141           }
142           gpr_mu_unlock(&arg.thread->mu_);
143 
144           if (!arg.joinable) {
145             delete arg.thread;
146           }
147 
148           (*arg.body)(arg.arg);
149           if (arg.tracked) {
150             Fork::DecThreadCount();
151           }
152           return nullptr;
153         },
154         info);
155     *success = (pthread_create_err == 0);
156 
157     GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
158 
159     if (!(*success)) {
160       gpr_log(GPR_ERROR, "pthread_create failed: %s",
161               StrError(pthread_create_err).c_str());
162       // don't use gpr_free, as this was allocated using malloc (see above)
163       free(info);
164       if (options.tracked()) {
165         Fork::DecThreadCount();
166       }
167     }
168   }
169 
~ThreadInternalsPosix()170   ~ThreadInternalsPosix() override {
171     gpr_mu_destroy(&mu_);
172     gpr_cv_destroy(&ready_);
173   }
174 
Start()175   void Start() override {
176     gpr_mu_lock(&mu_);
177     started_ = true;
178     gpr_cv_signal(&ready_);
179     gpr_mu_unlock(&mu_);
180   }
181 
Join()182   void Join() override {
183     int pthread_join_err = pthread_join(pthread_id_, nullptr);
184     if (pthread_join_err != 0) {
185       Crash("pthread_join failed: " + StrError(pthread_join_err));
186     }
187   }
188 
189  private:
190   gpr_mu mu_;
191   gpr_cv ready_;
192   bool started_;
193   pthread_t pthread_id_;
194 };
195 
196 }  // namespace
197 
Signal(gpr_thd_id tid,int sig)198 void Thread::Signal(gpr_thd_id tid, int sig) {
199   auto kill_err = pthread_kill((pthread_t)tid, sig);
200   if (kill_err != 0) {
201     gpr_log(GPR_ERROR, "pthread_kill for tid %" PRIdPTR " failed: %s", tid,
202             StrError(kill_err).c_str());
203   }
204 }
205 
206 #ifndef GPR_ANDROID
Kill(gpr_thd_id tid)207 void Thread::Kill(gpr_thd_id tid) {
208   auto cancel_err = pthread_cancel((pthread_t)tid);
209   if (cancel_err != 0) {
210     gpr_log(GPR_ERROR, "pthread_cancel for tid %" PRIdPTR " failed: %s", tid,
211             StrError(cancel_err).c_str());
212   }
213 }
214 #else  // GPR_ANDROID
Kill(gpr_thd_id)215 void Thread::Kill(gpr_thd_id /* tid */) {
216   gpr_log(GPR_DEBUG, "Thread::Kill is not supported on Android.");
217 }
218 #endif
219 
Thread(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Options & options)220 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
221                bool* success, const Options& options)
222     : options_(options) {
223   bool outcome = false;
224   impl_ = new ThreadInternalsPosix(thd_name, thd_body, arg, &outcome, options);
225   if (outcome) {
226     state_ = ALIVE;
227   } else {
228     state_ = FAILED;
229     delete impl_;
230     impl_ = nullptr;
231   }
232 
233   if (success != nullptr) {
234     *success = outcome;
235   }
236 }
237 }  // namespace grpc_core
238 
239 // The following is in the external namespace as it is exposed as C89 API
gpr_thd_currentid(void)240 gpr_thd_id gpr_thd_currentid(void) {
241   // Use C-style casting because Linux and OSX have different definitions
242   // of pthread_t so that a single C++ cast doesn't handle it.
243   // NOLINTNEXTLINE(google-readability-casting)
244   return (gpr_thd_id)pthread_self();
245 }
246 
247 #endif  // GPR_POSIX_SYNC
248