1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/fork.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28
29 #include "src/core/lib/gpr/env.h"
30 #include "src/core/lib/gpr/useful.h"
31 #include "src/core/lib/gprpp/memory.h"
32
33 /*
34 * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
35 * AROUND VERY SPECIFIC USE CASES.
36 */
37
38 namespace grpc_core {
39 namespace internal {
// The exec_ctx_count has 2 modes, blocked and unblocked.
// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
// 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx...

// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
// meaning that BLOCKED and UNBLOCKED counts partition the integers.
// Encodes an ExecCtx count `n` in the unblocked (2-indexed) representation.
// The argument is parenthesized so that expressions containing operators of
// lower precedence than `+` (e.g. `a << b`, `c ? x : y`) expand correctly.
#define UNBLOCKED(n) ((n) + 2)
// Encodes an ExecCtx count `n` in the blocked (0-indexed) representation.
#define BLOCKED(n) (n)
49
50 class ExecCtxState {
51 public:
ExecCtxState()52 ExecCtxState() : fork_complete_(true) {
53 gpr_mu_init(&mu_);
54 gpr_cv_init(&cv_);
55 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
56 }
57
IncExecCtxCount()58 void IncExecCtxCount() {
59 gpr_atm count = gpr_atm_no_barrier_load(&count_);
60 while (true) {
61 if (count <= BLOCKED(1)) {
62 // This only occurs if we are trying to fork. Wait until the fork()
63 // operation completes before allowing new ExecCtxs.
64 gpr_mu_lock(&mu_);
65 if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
66 while (!fork_complete_) {
67 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
68 }
69 }
70 gpr_mu_unlock(&mu_);
71 } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
72 break;
73 }
74 count = gpr_atm_no_barrier_load(&count_);
75 }
76 }
77
DecExecCtxCount()78 void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
79
BlockExecCtx()80 bool BlockExecCtx() {
81 // Assumes there is an active ExecCtx when this function is called
82 if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
83 gpr_mu_lock(&mu_);
84 fork_complete_ = false;
85 gpr_mu_unlock(&mu_);
86 return true;
87 }
88 return false;
89 }
90
AllowExecCtx()91 void AllowExecCtx() {
92 gpr_mu_lock(&mu_);
93 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
94 fork_complete_ = true;
95 gpr_cv_broadcast(&cv_);
96 gpr_mu_unlock(&mu_);
97 }
98
~ExecCtxState()99 ~ExecCtxState() {
100 gpr_mu_destroy(&mu_);
101 gpr_cv_destroy(&cv_);
102 }
103
104 private:
105 bool fork_complete_;
106 gpr_mu mu_;
107 gpr_cv cv_;
108 gpr_atm count_;
109 };
110
111 class ThreadState {
112 public:
ThreadState()113 ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
114 gpr_mu_init(&mu_);
115 gpr_cv_init(&cv_);
116 }
117
IncThreadCount()118 void IncThreadCount() {
119 gpr_mu_lock(&mu_);
120 count_++;
121 gpr_mu_unlock(&mu_);
122 }
123
DecThreadCount()124 void DecThreadCount() {
125 gpr_mu_lock(&mu_);
126 count_--;
127 if (awaiting_threads_ && count_ == 0) {
128 threads_done_ = true;
129 gpr_cv_signal(&cv_);
130 }
131 gpr_mu_unlock(&mu_);
132 }
AwaitThreads()133 void AwaitThreads() {
134 gpr_mu_lock(&mu_);
135 awaiting_threads_ = true;
136 threads_done_ = (count_ == 0);
137 while (!threads_done_) {
138 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
139 }
140 awaiting_threads_ = true;
141 gpr_mu_unlock(&mu_);
142 }
143
~ThreadState()144 ~ThreadState() {
145 gpr_mu_destroy(&mu_);
146 gpr_cv_destroy(&cv_);
147 }
148
149 private:
150 bool awaiting_threads_;
151 bool threads_done_;
152 gpr_mu mu_;
153 gpr_cv cv_;
154 int count_;
155 };
156
}  // namespace internal
158
GlobalInit()159 void Fork::GlobalInit() {
160 if (!override_enabled_) {
161 #ifdef GRPC_ENABLE_FORK_SUPPORT
162 support_enabled_ = true;
163 #else
164 support_enabled_ = false;
165 #endif
166 bool env_var_set = false;
167 char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
168 if (env != nullptr) {
169 static const char* truthy[] = {"yes", "Yes", "YES", "true",
170 "True", "TRUE", "1"};
171 static const char* falsey[] = {"no", "No", "NO", "false",
172 "False", "FALSE", "0"};
173 for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
174 if (0 == strcmp(env, truthy[i])) {
175 support_enabled_ = true;
176 env_var_set = true;
177 break;
178 }
179 }
180 if (!env_var_set) {
181 for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
182 if (0 == strcmp(env, falsey[i])) {
183 support_enabled_ = false;
184 env_var_set = true;
185 break;
186 }
187 }
188 }
189 gpr_free(env);
190 }
191 }
192 if (support_enabled_) {
193 exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>();
194 thread_state_ = grpc_core::New<internal::ThreadState>();
195 }
196 }
197
GlobalShutdown()198 void Fork::GlobalShutdown() {
199 if (support_enabled_) {
200 grpc_core::Delete(exec_ctx_state_);
201 grpc_core::Delete(thread_state_);
202 }
203 }
204
Enabled()205 bool Fork::Enabled() { return support_enabled_; }
206
207 // Testing Only
Enable(bool enable)208 void Fork::Enable(bool enable) {
209 override_enabled_ = true;
210 support_enabled_ = enable;
211 }
212
IncExecCtxCount()213 void Fork::IncExecCtxCount() {
214 if (support_enabled_) {
215 exec_ctx_state_->IncExecCtxCount();
216 }
217 }
218
DecExecCtxCount()219 void Fork::DecExecCtxCount() {
220 if (support_enabled_) {
221 exec_ctx_state_->DecExecCtxCount();
222 }
223 }
224
SetResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)225 void Fork::SetResetChildPollingEngineFunc(
226 Fork::child_postfork_func reset_child_polling_engine) {
227 reset_child_polling_engine_ = reset_child_polling_engine;
228 }
GetResetChildPollingEngineFunc()229 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
230 return reset_child_polling_engine_;
231 }
232
BlockExecCtx()233 bool Fork::BlockExecCtx() {
234 if (support_enabled_) {
235 return exec_ctx_state_->BlockExecCtx();
236 }
237 return false;
238 }
239
AllowExecCtx()240 void Fork::AllowExecCtx() {
241 if (support_enabled_) {
242 exec_ctx_state_->AllowExecCtx();
243 }
244 }
245
IncThreadCount()246 void Fork::IncThreadCount() {
247 if (support_enabled_) {
248 thread_state_->IncThreadCount();
249 }
250 }
251
DecThreadCount()252 void Fork::DecThreadCount() {
253 if (support_enabled_) {
254 thread_state_->DecThreadCount();
255 }
256 }
AwaitThreads()257 void Fork::AwaitThreads() {
258 if (support_enabled_) {
259 thread_state_->AwaitThreads();
260 }
261 }
262
263 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
264 internal::ThreadState* Fork::thread_state_ = nullptr;
265 bool Fork::support_enabled_ = false;
266 bool Fork::override_enabled_ = false;
267 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
268 } // namespace grpc_core
269