• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/util/fork.h"
20 
21 #include <grpc/support/atm.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25 
26 #include <utility>
27 
28 #include "src/core/config/config_vars.h"
29 #include "src/core/lib/event_engine/thread_local.h"
30 #include "src/core/util/no_destruct.h"
31 
32 //
33 // NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
34 //       AROUND VERY SPECIFIC USE CASES.
35 //
36 
37 namespace grpc_core {
38 namespace {
// The exec_ctx_count has 2 modes, blocked and unblocked.
// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
// 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx, and so on.

// When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
// meaning that BLOCKED and UNBLOCKED counts partition the integers.
// Encoding of `n` active ExecCtxs while ExecCtx creation is unblocked: the
// stored value is offset by 2.
#define UNBLOCKED(n) ((n) + 2)
// Encoding of `n` active ExecCtxs while ExecCtx creation is blocked: the
// stored value is `n` itself.
#define BLOCKED(n) (n)
48 
49 class ExecCtxState {
50  public:
ExecCtxState()51   ExecCtxState() : fork_complete_(true) {
52     gpr_mu_init(&mu_);
53     gpr_cv_init(&cv_);
54     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
55   }
56 
IncExecCtxCount()57   void IncExecCtxCount() {
58     // EventEngine is expected to terminate all threads before fork, and so this
59     // extra work is unnecessary
60     if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
61       return;
62     }
63     gpr_atm count = gpr_atm_no_barrier_load(&count_);
64     while (true) {
65       if (count <= BLOCKED(1)) {
66         // This only occurs if we are trying to fork.  Wait until the fork()
67         // operation completes before allowing new ExecCtxs.
68         gpr_mu_lock(&mu_);
69         if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
70           while (!fork_complete_) {
71             gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
72           }
73         }
74         gpr_mu_unlock(&mu_);
75       } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
76         break;
77       }
78       count = gpr_atm_no_barrier_load(&count_);
79     }
80   }
81 
DecExecCtxCount()82   void DecExecCtxCount() {
83     if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
84       return;
85     }
86     gpr_atm_no_barrier_fetch_add(&count_, -1);
87   }
88 
BlockExecCtx()89   bool BlockExecCtx() {
90     // Assumes there is an active ExecCtx when this function is called
91     if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
92       gpr_mu_lock(&mu_);
93       fork_complete_ = false;
94       gpr_mu_unlock(&mu_);
95       return true;
96     }
97     return false;
98   }
99 
AllowExecCtx()100   void AllowExecCtx() {
101     gpr_mu_lock(&mu_);
102     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
103     fork_complete_ = true;
104     gpr_cv_broadcast(&cv_);
105     gpr_mu_unlock(&mu_);
106   }
107 
~ExecCtxState()108   ~ExecCtxState() {
109     gpr_mu_destroy(&mu_);
110     gpr_cv_destroy(&cv_);
111   }
112 
113  private:
114   bool fork_complete_;
115   gpr_mu mu_;
116   gpr_cv cv_;
117   gpr_atm count_;
118 };
119 
120 class ThreadState {
121  public:
ThreadState()122   ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
123     gpr_mu_init(&mu_);
124     gpr_cv_init(&cv_);
125   }
126 
IncThreadCount()127   void IncThreadCount() {
128     gpr_mu_lock(&mu_);
129     count_++;
130     gpr_mu_unlock(&mu_);
131   }
132 
DecThreadCount()133   void DecThreadCount() {
134     gpr_mu_lock(&mu_);
135     count_--;
136     if (awaiting_threads_ && count_ == 0) {
137       threads_done_ = true;
138       gpr_cv_signal(&cv_);
139     }
140     gpr_mu_unlock(&mu_);
141   }
AwaitThreads()142   void AwaitThreads() {
143     gpr_mu_lock(&mu_);
144     awaiting_threads_ = true;
145     threads_done_ = (count_ == 0);
146     while (!threads_done_) {
147       gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
148     }
149     awaiting_threads_ = true;
150     gpr_mu_unlock(&mu_);
151   }
152 
~ThreadState()153   ~ThreadState() {
154     gpr_mu_destroy(&mu_);
155     gpr_cv_destroy(&cv_);
156   }
157 
158  private:
159   bool awaiting_threads_;
160   bool threads_done_;
161   gpr_mu mu_;
162   gpr_cv cv_;
163   int count_;
164 };
165 
166 }  // namespace
167 
GlobalInit()168 void Fork::GlobalInit() {
169   if (!override_enabled_) {
170     support_enabled_.store(ConfigVars::Get().EnableForkSupport(),
171                            std::memory_order_relaxed);
172   }
173 }
174 
Enabled()175 bool Fork::Enabled() {
176   return support_enabled_.load(std::memory_order_relaxed);
177 }
178 
179 // Testing Only
Enable(bool enable)180 void Fork::Enable(bool enable) {
181   override_enabled_ = true;
182   support_enabled_.store(enable, std::memory_order_relaxed);
183 }
184 
DoIncExecCtxCount()185 void Fork::DoIncExecCtxCount() {
186   NoDestructSingleton<ExecCtxState>::Get()->IncExecCtxCount();
187 }
188 
DoDecExecCtxCount()189 void Fork::DoDecExecCtxCount() {
190   NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
191 }
192 
RegisterResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)193 bool Fork::RegisterResetChildPollingEngineFunc(
194     Fork::child_postfork_func reset_child_polling_engine) {
195   if (reset_child_polling_engine_ == nullptr) {
196     reset_child_polling_engine_ = new std::set<Fork::child_postfork_func>();
197   }
198   auto ret = reset_child_polling_engine_->insert(reset_child_polling_engine);
199   return ret.second;
200 }
201 
202 const std::set<Fork::child_postfork_func>&
GetResetChildPollingEngineFunc()203 Fork::GetResetChildPollingEngineFunc() {
204   return *reset_child_polling_engine_;
205 }
206 
BlockExecCtx()207 bool Fork::BlockExecCtx() {
208   if (support_enabled_.load(std::memory_order_relaxed)) {
209     return NoDestructSingleton<ExecCtxState>::Get()->BlockExecCtx();
210   }
211   return false;
212 }
213 
AllowExecCtx()214 void Fork::AllowExecCtx() {
215   if (support_enabled_.load(std::memory_order_relaxed)) {
216     NoDestructSingleton<ExecCtxState>::Get()->AllowExecCtx();
217   }
218 }
219 
IncThreadCount()220 void Fork::IncThreadCount() {
221   if (support_enabled_.load(std::memory_order_relaxed)) {
222     NoDestructSingleton<ThreadState>::Get()->IncThreadCount();
223   }
224 }
225 
DecThreadCount()226 void Fork::DecThreadCount() {
227   if (support_enabled_.load(std::memory_order_relaxed)) {
228     NoDestructSingleton<ThreadState>::Get()->DecThreadCount();
229   }
230 }
AwaitThreads()231 void Fork::AwaitThreads() {
232   if (support_enabled_.load(std::memory_order_relaxed)) {
233     NoDestructSingleton<ThreadState>::Get()->AwaitThreads();
234   }
235 }
236 
// Whether fork support is enabled. Starts out false; written by GlobalInit()
// and Enable().
std::atomic<bool> Fork::support_enabled_(false);
// Set to true by Enable(); while true, GlobalInit() leaves support_enabled_
// untouched.
bool Fork::override_enabled_ = false;
// Registry of child post-fork callbacks. Null until the first call to
// RegisterResetChildPollingEngineFunc() allocates it.
std::set<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
    nullptr;
241 }  // namespace grpc_core
242