1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/util/fork.h"
20
21 #include <grpc/support/atm.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25
26 #include <utility>
27
28 #include "src/core/config/config_vars.h"
29 #include "src/core/lib/event_engine/thread_local.h"
30 #include "src/core/util/no_destruct.h"
31
32 //
33 // NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
34 // AROUND VERY SPECIFIC USE CASES.
35 //
36
37 namespace grpc_core {
38 namespace {
// The ExecCtx counter operates in one of two modes: blocked or unblocked.
//
// Unblocked: the stored value is offset by 2, so a stored value of 2 means
// 0 active ExecCtxs, a stored value of 3 means 1 active ExecCtx, and so on.
//
// Blocked: the stored value is the plain, 0-based count. ExecCtx creation can
// only be blocked while exactly 1 ExecCtx is outstanding, which means the
// BLOCKED and UNBLOCKED encodings partition the integers (they never
// overlap).
#define UNBLOCKED(n) ((n) + 2)
#define BLOCKED(n) (n)
48
49 class ExecCtxState {
50 public:
ExecCtxState()51 ExecCtxState() : fork_complete_(true) {
52 gpr_mu_init(&mu_);
53 gpr_cv_init(&cv_);
54 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
55 }
56
IncExecCtxCount()57 void IncExecCtxCount() {
58 // EventEngine is expected to terminate all threads before fork, and so this
59 // extra work is unnecessary
60 if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
61 return;
62 }
63 gpr_atm count = gpr_atm_no_barrier_load(&count_);
64 while (true) {
65 if (count <= BLOCKED(1)) {
66 // This only occurs if we are trying to fork. Wait until the fork()
67 // operation completes before allowing new ExecCtxs.
68 gpr_mu_lock(&mu_);
69 if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
70 while (!fork_complete_) {
71 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
72 }
73 }
74 gpr_mu_unlock(&mu_);
75 } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
76 break;
77 }
78 count = gpr_atm_no_barrier_load(&count_);
79 }
80 }
81
DecExecCtxCount()82 void DecExecCtxCount() {
83 if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
84 return;
85 }
86 gpr_atm_no_barrier_fetch_add(&count_, -1);
87 }
88
BlockExecCtx()89 bool BlockExecCtx() {
90 // Assumes there is an active ExecCtx when this function is called
91 if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
92 gpr_mu_lock(&mu_);
93 fork_complete_ = false;
94 gpr_mu_unlock(&mu_);
95 return true;
96 }
97 return false;
98 }
99
AllowExecCtx()100 void AllowExecCtx() {
101 gpr_mu_lock(&mu_);
102 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
103 fork_complete_ = true;
104 gpr_cv_broadcast(&cv_);
105 gpr_mu_unlock(&mu_);
106 }
107
~ExecCtxState()108 ~ExecCtxState() {
109 gpr_mu_destroy(&mu_);
110 gpr_cv_destroy(&cv_);
111 }
112
113 private:
114 bool fork_complete_;
115 gpr_mu mu_;
116 gpr_cv cv_;
117 gpr_atm count_;
118 };
119
120 class ThreadState {
121 public:
ThreadState()122 ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
123 gpr_mu_init(&mu_);
124 gpr_cv_init(&cv_);
125 }
126
IncThreadCount()127 void IncThreadCount() {
128 gpr_mu_lock(&mu_);
129 count_++;
130 gpr_mu_unlock(&mu_);
131 }
132
DecThreadCount()133 void DecThreadCount() {
134 gpr_mu_lock(&mu_);
135 count_--;
136 if (awaiting_threads_ && count_ == 0) {
137 threads_done_ = true;
138 gpr_cv_signal(&cv_);
139 }
140 gpr_mu_unlock(&mu_);
141 }
AwaitThreads()142 void AwaitThreads() {
143 gpr_mu_lock(&mu_);
144 awaiting_threads_ = true;
145 threads_done_ = (count_ == 0);
146 while (!threads_done_) {
147 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
148 }
149 awaiting_threads_ = true;
150 gpr_mu_unlock(&mu_);
151 }
152
~ThreadState()153 ~ThreadState() {
154 gpr_mu_destroy(&mu_);
155 gpr_cv_destroy(&cv_);
156 }
157
158 private:
159 bool awaiting_threads_;
160 bool threads_done_;
161 gpr_mu mu_;
162 gpr_cv cv_;
163 int count_;
164 };
165
166 } // namespace
167
GlobalInit()168 void Fork::GlobalInit() {
169 if (!override_enabled_) {
170 support_enabled_.store(ConfigVars::Get().EnableForkSupport(),
171 std::memory_order_relaxed);
172 }
173 }
174
Enabled()175 bool Fork::Enabled() {
176 return support_enabled_.load(std::memory_order_relaxed);
177 }
178
179 // Testing Only
Enable(bool enable)180 void Fork::Enable(bool enable) {
181 override_enabled_ = true;
182 support_enabled_.store(enable, std::memory_order_relaxed);
183 }
184
DoIncExecCtxCount()185 void Fork::DoIncExecCtxCount() {
186 NoDestructSingleton<ExecCtxState>::Get()->IncExecCtxCount();
187 }
188
DoDecExecCtxCount()189 void Fork::DoDecExecCtxCount() {
190 NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
191 }
192
RegisterResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)193 bool Fork::RegisterResetChildPollingEngineFunc(
194 Fork::child_postfork_func reset_child_polling_engine) {
195 if (reset_child_polling_engine_ == nullptr) {
196 reset_child_polling_engine_ = new std::set<Fork::child_postfork_func>();
197 }
198 auto ret = reset_child_polling_engine_->insert(reset_child_polling_engine);
199 return ret.second;
200 }
201
202 const std::set<Fork::child_postfork_func>&
GetResetChildPollingEngineFunc()203 Fork::GetResetChildPollingEngineFunc() {
204 return *reset_child_polling_engine_;
205 }
206
BlockExecCtx()207 bool Fork::BlockExecCtx() {
208 if (support_enabled_.load(std::memory_order_relaxed)) {
209 return NoDestructSingleton<ExecCtxState>::Get()->BlockExecCtx();
210 }
211 return false;
212 }
213
AllowExecCtx()214 void Fork::AllowExecCtx() {
215 if (support_enabled_.load(std::memory_order_relaxed)) {
216 NoDestructSingleton<ExecCtxState>::Get()->AllowExecCtx();
217 }
218 }
219
IncThreadCount()220 void Fork::IncThreadCount() {
221 if (support_enabled_.load(std::memory_order_relaxed)) {
222 NoDestructSingleton<ThreadState>::Get()->IncThreadCount();
223 }
224 }
225
DecThreadCount()226 void Fork::DecThreadCount() {
227 if (support_enabled_.load(std::memory_order_relaxed)) {
228 NoDestructSingleton<ThreadState>::Get()->DecThreadCount();
229 }
230 }
AwaitThreads()231 void Fork::AwaitThreads() {
232 if (support_enabled_.load(std::memory_order_relaxed)) {
233 NoDestructSingleton<ThreadState>::Get()->AwaitThreads();
234 }
235 }
236
237 std::atomic<bool> Fork::support_enabled_(false);
238 bool Fork::override_enabled_ = false;
239 std::set<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
240 nullptr;
241 } // namespace grpc_core
242