• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/gprpp/fork.h"
22 
23 #include <utility>
24 
25 #include <grpc/support/atm.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28 
29 #include "src/core/lib/config/config_vars.h"
30 #include "src/core/lib/event_engine/thread_local.h"
31 #include "src/core/lib/gprpp/no_destruct.h"
32 
33 //
34 // NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
35 //       AROUND VERY SPECIFIC USE CASES.
36 //
37 
38 namespace grpc_core {
39 namespace {
// The exec_ctx_count has 2 modes, blocked and unblocked.
// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
// 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx...

// When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
// meaning that BLOCKED and UNBLOCKED counts partition the integers:
// values <= BLOCKED(1) are treated as blocked, values >= UNBLOCKED(0) as
// unblocked.

// Encodes a count of n active ExecCtxs in the unblocked mode (offset by 2).
#define UNBLOCKED(n) ((n) + 2)
// Encodes a count of n active ExecCtxs in the blocked mode (no offset).
#define BLOCKED(n) (n)
49 
50 class ExecCtxState {
51  public:
ExecCtxState()52   ExecCtxState() : fork_complete_(true) {
53     gpr_mu_init(&mu_);
54     gpr_cv_init(&cv_);
55     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
56   }
57 
IncExecCtxCount()58   void IncExecCtxCount() {
59     // EventEngine is expected to terminate all threads before fork, and so this
60     // extra work is unnecessary
61     if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
62       return;
63     }
64     gpr_atm count = gpr_atm_no_barrier_load(&count_);
65     while (true) {
66       if (count <= BLOCKED(1)) {
67         // This only occurs if we are trying to fork.  Wait until the fork()
68         // operation completes before allowing new ExecCtxs.
69         gpr_mu_lock(&mu_);
70         if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
71           while (!fork_complete_) {
72             gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
73           }
74         }
75         gpr_mu_unlock(&mu_);
76       } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
77         break;
78       }
79       count = gpr_atm_no_barrier_load(&count_);
80     }
81   }
82 
DecExecCtxCount()83   void DecExecCtxCount() {
84     if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
85       return;
86     }
87     gpr_atm_no_barrier_fetch_add(&count_, -1);
88   }
89 
BlockExecCtx()90   bool BlockExecCtx() {
91     // Assumes there is an active ExecCtx when this function is called
92     if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
93       gpr_mu_lock(&mu_);
94       fork_complete_ = false;
95       gpr_mu_unlock(&mu_);
96       return true;
97     }
98     return false;
99   }
100 
AllowExecCtx()101   void AllowExecCtx() {
102     gpr_mu_lock(&mu_);
103     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
104     fork_complete_ = true;
105     gpr_cv_broadcast(&cv_);
106     gpr_mu_unlock(&mu_);
107   }
108 
~ExecCtxState()109   ~ExecCtxState() {
110     gpr_mu_destroy(&mu_);
111     gpr_cv_destroy(&cv_);
112   }
113 
114  private:
115   bool fork_complete_;
116   gpr_mu mu_;
117   gpr_cv cv_;
118   gpr_atm count_;
119 };
120 
121 class ThreadState {
122  public:
ThreadState()123   ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
124     gpr_mu_init(&mu_);
125     gpr_cv_init(&cv_);
126   }
127 
IncThreadCount()128   void IncThreadCount() {
129     gpr_mu_lock(&mu_);
130     count_++;
131     gpr_mu_unlock(&mu_);
132   }
133 
DecThreadCount()134   void DecThreadCount() {
135     gpr_mu_lock(&mu_);
136     count_--;
137     if (awaiting_threads_ && count_ == 0) {
138       threads_done_ = true;
139       gpr_cv_signal(&cv_);
140     }
141     gpr_mu_unlock(&mu_);
142   }
AwaitThreads()143   void AwaitThreads() {
144     gpr_mu_lock(&mu_);
145     awaiting_threads_ = true;
146     threads_done_ = (count_ == 0);
147     while (!threads_done_) {
148       gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
149     }
150     awaiting_threads_ = true;
151     gpr_mu_unlock(&mu_);
152   }
153 
~ThreadState()154   ~ThreadState() {
155     gpr_mu_destroy(&mu_);
156     gpr_cv_destroy(&cv_);
157   }
158 
159  private:
160   bool awaiting_threads_;
161   bool threads_done_;
162   gpr_mu mu_;
163   gpr_cv cv_;
164   int count_;
165 };
166 
167 }  // namespace
168 
GlobalInit()169 void Fork::GlobalInit() {
170   if (!override_enabled_) {
171     support_enabled_.store(ConfigVars::Get().EnableForkSupport(),
172                            std::memory_order_relaxed);
173   }
174 }
175 
Enabled()176 bool Fork::Enabled() {
177   return support_enabled_.load(std::memory_order_relaxed);
178 }
179 
180 // Testing Only
Enable(bool enable)181 void Fork::Enable(bool enable) {
182   override_enabled_ = true;
183   support_enabled_.store(enable, std::memory_order_relaxed);
184 }
185 
DoIncExecCtxCount()186 void Fork::DoIncExecCtxCount() {
187   NoDestructSingleton<ExecCtxState>::Get()->IncExecCtxCount();
188 }
189 
DoDecExecCtxCount()190 void Fork::DoDecExecCtxCount() {
191   NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
192 }
193 
RegisterResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)194 bool Fork::RegisterResetChildPollingEngineFunc(
195     Fork::child_postfork_func reset_child_polling_engine) {
196   if (reset_child_polling_engine_ == nullptr) {
197     reset_child_polling_engine_ = new std::set<Fork::child_postfork_func>();
198   }
199   auto ret = reset_child_polling_engine_->insert(reset_child_polling_engine);
200   return ret.second;
201 }
202 
203 const std::set<Fork::child_postfork_func>&
GetResetChildPollingEngineFunc()204 Fork::GetResetChildPollingEngineFunc() {
205   return *reset_child_polling_engine_;
206 }
207 
BlockExecCtx()208 bool Fork::BlockExecCtx() {
209   if (support_enabled_.load(std::memory_order_relaxed)) {
210     return NoDestructSingleton<ExecCtxState>::Get()->BlockExecCtx();
211   }
212   return false;
213 }
214 
AllowExecCtx()215 void Fork::AllowExecCtx() {
216   if (support_enabled_.load(std::memory_order_relaxed)) {
217     NoDestructSingleton<ExecCtxState>::Get()->AllowExecCtx();
218   }
219 }
220 
IncThreadCount()221 void Fork::IncThreadCount() {
222   if (support_enabled_.load(std::memory_order_relaxed)) {
223     NoDestructSingleton<ThreadState>::Get()->IncThreadCount();
224   }
225 }
226 
DecThreadCount()227 void Fork::DecThreadCount() {
228   if (support_enabled_.load(std::memory_order_relaxed)) {
229     NoDestructSingleton<ThreadState>::Get()->DecThreadCount();
230   }
231 }
AwaitThreads()232 void Fork::AwaitThreads() {
233   if (support_enabled_.load(std::memory_order_relaxed)) {
234     NoDestructSingleton<ThreadState>::Get()->AwaitThreads();
235   }
236 }
237 
238 std::atomic<bool> Fork::support_enabled_(false);
239 bool Fork::override_enabled_ = false;
240 std::set<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
241     nullptr;
242 }  // namespace grpc_core
243