• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Posix implementation for gpr threads.
20 
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 
24 #include <csignal>
25 #include <string>
26 
27 #ifdef GPR_POSIX_SYNC
28 
29 #include <grpc/support/sync.h>
30 #include <grpc/support/thd_id.h>
31 #include <grpc/support/time.h>
32 #include <pthread.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/fork.h"
41 #include "src/core/util/strerror.h"
42 #include "src/core/util/thd.h"
43 #include "src/core/util/useful.h"
44 
45 namespace grpc_core {
46 namespace {
47 
48 class ThreadInternalsPosix;
49 
50 struct thd_arg {
51   ThreadInternalsPosix* thread;
52   void (*body)(void* arg);  // body of a thread
53   void* arg;                // argument to a thread
54   const char* name;         // name of thread. Can be nullptr.
55   bool joinable;
56   bool tracked;
57 };
58 
RoundUpToPageSize(size_t size)59 size_t RoundUpToPageSize(size_t size) {
60   // TODO(yunjiaw): Change this variable (page_size) to a function-level static
61   // when possible
62   size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
63   return (size + page_size - 1) & ~(page_size - 1);
64 }
65 
66 // Returns the minimum valid stack size that can be passed to
67 // pthread_attr_setstacksize.
MinValidStackSize(size_t request_size)68 size_t MinValidStackSize(size_t request_size) {
69   size_t min_stacksize = sysconf(_SC_THREAD_STACK_MIN);
70   if (request_size < min_stacksize) {
71     request_size = min_stacksize;
72   }
73 
74   // On some systems, pthread_attr_setstacksize() can fail if stacksize is
75   // not a multiple of the system page size.
76   return RoundUpToPageSize(request_size);
77 }
78 
79 class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
80  public:
ThreadInternalsPosix(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Thread::Options & options)81   ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
82                        void* arg, bool* success, const Thread::Options& options)
83       : started_(false) {
84     gpr_mu_init(&mu_);
85     gpr_cv_init(&ready_);
86     pthread_attr_t attr;
87     // don't use gpr_malloc as we may cause an infinite recursion with
88     // the profiling code
89     thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
90     CHECK_NE(info, nullptr);
91     info->thread = this;
92     info->body = thd_body;
93     info->arg = arg;
94     info->name = thd_name;
95     info->joinable = options.joinable();
96     info->tracked = options.tracked();
97     if (options.tracked()) {
98       Fork::IncThreadCount();
99     }
100 
101     CHECK_EQ(pthread_attr_init(&attr), 0);
102     if (options.joinable()) {
103       CHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
104     } else {
105       CHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0);
106     }
107 
108     if (options.stack_size() != 0) {
109       size_t stack_size = MinValidStackSize(options.stack_size());
110       CHECK_EQ(pthread_attr_setstacksize(&attr, stack_size), 0);
111     }
112 
113     int pthread_create_err = pthread_create(
114         &pthread_id_, &attr,
115         [](void* v) -> void* {
116           thd_arg arg = *static_cast<thd_arg*>(v);
117           free(v);
118           if (arg.name != nullptr) {
119 #if GPR_APPLE_PTHREAD_NAME
120             // Apple supports 64 characters, and will
121             // truncate if it's longer.
122             pthread_setname_np(arg.name);
123 #elif GPR_LINUX_PTHREAD_NAME
124             // Linux supports 16 characters max, and will
125             // error if it's longer.
126             char buf[16];
127             size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
128             strncpy(buf, arg.name, buf_len);
129             buf[buf_len] = '\0';
130             pthread_setname_np(pthread_self(), buf);
131 #endif  // GPR_APPLE_PTHREAD_NAME
132           }
133 
134           gpr_mu_lock(&arg.thread->mu_);
135           while (!arg.thread->started_) {
136             gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
137                         gpr_inf_future(GPR_CLOCK_MONOTONIC));
138           }
139           gpr_mu_unlock(&arg.thread->mu_);
140 
141           if (!arg.joinable) {
142             delete arg.thread;
143           }
144 
145           (*arg.body)(arg.arg);
146           if (arg.tracked) {
147             Fork::DecThreadCount();
148           }
149           return nullptr;
150         },
151         info);
152     *success = (pthread_create_err == 0);
153 
154     CHECK_EQ(pthread_attr_destroy(&attr), 0);
155 
156     if (!(*success)) {
157       LOG(ERROR) << "pthread_create failed: " << StrError(pthread_create_err);
158       // don't use gpr_free, as this was allocated using malloc (see above)
159       free(info);
160       if (options.tracked()) {
161         Fork::DecThreadCount();
162       }
163     }
164   }
165 
~ThreadInternalsPosix()166   ~ThreadInternalsPosix() override {
167     gpr_mu_destroy(&mu_);
168     gpr_cv_destroy(&ready_);
169   }
170 
Start()171   void Start() override {
172     gpr_mu_lock(&mu_);
173     started_ = true;
174     gpr_cv_signal(&ready_);
175     gpr_mu_unlock(&mu_);
176   }
177 
Join()178   void Join() override {
179     int pthread_join_err = pthread_join(pthread_id_, nullptr);
180     if (pthread_join_err != 0) {
181       Crash("pthread_join failed: " + StrError(pthread_join_err));
182     }
183   }
184 
185  private:
186   gpr_mu mu_;
187   gpr_cv ready_;
188   bool started_;
189   pthread_t pthread_id_;
190 };
191 
192 }  // namespace
193 
Signal(gpr_thd_id tid,int sig)194 void Thread::Signal(gpr_thd_id tid, int sig) {
195   auto kill_err = pthread_kill((pthread_t)tid, sig);
196   if (kill_err != 0) {
197     LOG(ERROR) << "pthread_kill for tid " << tid
198                << " failed: " << StrError(kill_err);
199   }
200 }
201 
202 #ifndef GPR_ANDROID
Kill(gpr_thd_id tid)203 void Thread::Kill(gpr_thd_id tid) {
204   auto cancel_err = pthread_cancel((pthread_t)tid);
205   if (cancel_err != 0) {
206     LOG(ERROR) << "pthread_cancel for tid " << tid
207                << " failed: " << StrError(cancel_err);
208   }
209 }
210 #else  // GPR_ANDROID
Kill(gpr_thd_id)211 void Thread::Kill(gpr_thd_id /* tid */) {
212   VLOG(2) << "Thread::Kill is not supported on Android.";
213 }
214 #endif
215 
Thread(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Options & options)216 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
217                bool* success, const Options& options)
218     : options_(options) {
219   bool outcome = false;
220   impl_ = new ThreadInternalsPosix(thd_name, thd_body, arg, &outcome, options);
221   if (outcome) {
222     state_ = ALIVE;
223   } else {
224     state_ = FAILED;
225     delete impl_;
226     impl_ = nullptr;
227   }
228 
229   if (success != nullptr) {
230     *success = outcome;
231   }
232 }
233 }  // namespace grpc_core
234 
235 // The following is in the external namespace as it is exposed as C89 API
gpr_thd_currentid(void)236 gpr_thd_id gpr_thd_currentid(void) {
237   // Use C-style casting because Linux and OSX have different definitions
238   // of pthread_t so that a single C++ cast doesn't handle it.
239   // NOLINTNEXTLINE(google-readability-casting)
240   return (gpr_thd_id)pthread_self();
241 }
242 
243 #endif  // GPR_POSIX_SYNC
244