//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18 
19 #include "src/core/lib/iomgr/combiner.h"
20 
#include <assert.h>
#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <string.h>

#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/util/crash.h"
#include "src/core/util/mpscq.h"
34 
// Combiner::state is a packed gpr_atm:
//   bit 0 (STATE_UNORPHANED)  -- set while the combiner has NOT been orphaned
//                                (i.e. its last external ref not yet dropped).
//   bits 1..N                 -- count of queued closures; each queued item
//                                adds STATE_ELEM_COUNT_LOW_BIT.
#define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2

// Forward declarations; definitions follow the exec_ctx list helpers below.
static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* closure,
                          grpc_error_handle error);
static void combiner_finally_exec(grpc_core::Combiner* lock,
                                  grpc_closure* closure,
                                  grpc_error_handle error);
43 
grpc_combiner_create(std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)44 grpc_core::Combiner* grpc_combiner_create(
45     std::shared_ptr<grpc_event_engine::experimental::EventEngine>
46         event_engine) {
47   grpc_core::Combiner* lock = new grpc_core::Combiner();
48   lock->event_engine = event_engine;
49   gpr_ref_init(&lock->refs, 1);
50   gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
51   grpc_closure_list_init(&lock->final_list);
52   GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " create";
53   return lock;
54 }
55 
// Frees the combiner. Only legal once it is both orphaned and fully drained:
// the CHECK asserts state == 0 (no unorphaned bit, no queued elements).
static void really_destroy(grpc_core::Combiner* lock) {
  GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " really_destroy";
  CHECK_EQ(gpr_atm_no_barrier_load(&lock->state), 0);
  delete lock;
}
61 
start_destroy(grpc_core::Combiner * lock)62 static void start_destroy(grpc_core::Combiner* lock) {
63   gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
64   GRPC_TRACE_LOG(combiner, INFO)
65       << "C:" << lock << " really_destroy old_state=" << old_state;
66   if (old_state == 1) {
67     really_destroy(lock);
68   }
69 }
70 
#ifndef NDEBUG
// Debug-only ref/unref tracing: logs refcount before and after applying
// `delta`. NOTE: expands references to `lock`, `file`, `line` and `reason`,
// which must be in scope at the expansion site (the latter three come from
// GRPC_COMBINER_DEBUG_ARGS on the enclosing function).
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)                          \
  GRPC_TRACE_VLOG(combiner, 2).AtLocation(file, line)                \
      << "C:" << lock << " " << (op) << " "                          \
      << gpr_atm_no_barrier_load(&lock->refs.count) << " --> "       \
      << gpr_atm_no_barrier_load(&lock->refs.count) + (delta) << " " \
      << reason;
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
81 
grpc_combiner_unref(grpc_core::Combiner * lock GRPC_COMBINER_DEBUG_ARGS)82 void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
83   GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
84   if (gpr_unref(&lock->refs)) {
85     start_destroy(lock);
86   }
87 }
88 
// Takes an additional ref on the combiner; returns it to allow call chaining.
grpc_core::Combiner* grpc_combiner_ref(
    grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
  GRPC_COMBINER_DEBUG_SPAM("  REF", 1);
  gpr_ref(&lock->refs);
  return lock;
}
95 
push_last_on_exec_ctx(grpc_core::Combiner * lock)96 static void push_last_on_exec_ctx(grpc_core::Combiner* lock) {
97   lock->next_combiner_on_this_exec_ctx = nullptr;
98   if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
99     grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
100         grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
101   } else {
102     grpc_core::ExecCtx::Get()
103         ->combiner_data()
104         ->last_combiner->next_combiner_on_this_exec_ctx = lock;
105     grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
106   }
107 }
108 
push_first_on_exec_ctx(grpc_core::Combiner * lock)109 static void push_first_on_exec_ctx(grpc_core::Combiner* lock) {
110   lock->next_combiner_on_this_exec_ctx =
111       grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
112   grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
113   if (lock->next_combiner_on_this_exec_ctx == nullptr) {
114     grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
115   }
116 }
117 
combiner_exec(grpc_core::Combiner * lock,grpc_closure * cl,grpc_error_handle error)118 static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
119                           grpc_error_handle error) {
120   gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
121   GRPC_TRACE_LOG(combiner, INFO)
122       << "C:" << lock << " grpc_combiner_execute c=" << cl << " last=" << last;
123   if (last == 1) {
124     gpr_atm_no_barrier_store(
125         &lock->initiating_exec_ctx_or_null,
126         reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get()));
127     // first element on this list: add it to the list of combiner locks
128     // executing within this exec_ctx
129     push_last_on_exec_ctx(lock);
130   } else {
131     // there may be a race with setting here: if that happens, we may delay
132     // offload for one or two actions, and that's fine
133     gpr_atm initiator =
134         gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
135     if (initiator != 0 &&
136         initiator != reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get())) {
137       gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
138     }
139   }
140   CHECK(last & STATE_UNORPHANED);  // ensure lock has not been destroyed
141   assert(cl->cb);
142   cl->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
143   lock->queue.Push(cl->next_data.mpscq_node.get());
144 }
145 
move_next()146 static void move_next() {
147   grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
148       grpc_core::ExecCtx::Get()
149           ->combiner_data()
150           ->active_combiner->next_combiner_on_this_exec_ctx;
151   if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
152     grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = nullptr;
153   }
154 }
155 
// Detaches the combiner from the current exec_ctx and reschedules its
// remaining work onto the EventEngine, where it resumes under a fresh
// exec_ctx.
static void queue_offload(grpc_core::Combiner* lock) {
  move_next();
  // Make the combiner look uncontended by storing a non-null value here, so
  // that we don't immediately offload again.
  gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 1);
  GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " queue_offload";
  // NOTE: the lambda captures `lock` as a raw pointer; the still-queued
  // element count in lock->state is what prevents really_destroy() from
  // running before this callback does.
  lock->event_engine->Run([lock] {
    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
    grpc_core::ExecCtx exec_ctx(0);
    push_last_on_exec_ctx(lock);
    exec_ctx.Flush();
  });
}
169 
// Runs one step of the active combiner on the current exec_ctx: executes one
// queued closure (or the whole final list), then updates the packed state
// counter and decides whether to keep going, stop, offload, or destroy.
// Returns false when there is no active combiner, true otherwise.
bool grpc_combiner_continue_exec_ctx() {
  grpc_core::Combiner* lock =
      grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
  if (lock == nullptr) {
    return false;
  }

  // A cleared initiator (see combiner_exec) means work was scheduled from
  // more than one exec_ctx: the combiner is contended.
  bool contended =
      gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0;

  GRPC_TRACE_LOG(combiner, INFO)
      << "C:" << lock << " grpc_combiner_continue_exec_ctx "
      << "contended=" << contended << " exec_ctx_ready_to_finish="
      << grpc_core::ExecCtx::Get()->IsReadyToFinish()
      << " time_to_execute_final_list=" << lock->time_to_execute_final_list;

  // offload only if both (1) the combiner is contended and has more than one
  // closure to execute, and (2) the current execution context needs to finish
  // as soon as possible
  if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish()) {
    // this execution context wants to move on: schedule remaining work to be
    // picked up on the executor
    queue_offload(lock);
    return true;
  }

  if (!lock->time_to_execute_final_list ||
      // peek to see if something new has shown up, and execute that with
      // priority
      (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
    // Normal path: pop and run one closure from the MPSC queue.
    grpc_core::MultiProducerSingleConsumerQueue::Node* n = lock->queue.Pop();
    GRPC_TRACE_LOG(combiner, INFO)
        << "C:" << lock << " maybe_finish_one n=" << n;

    if (n == nullptr) {
      // queue is in an inconsistent state: use this as a cue that we should
      // go off and do something else for a while (and come back later)
      queue_offload(lock);
      return true;
    }
    grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
#ifndef NDEBUG
    cl->scheduled = false;
#endif
    // Reclaim the heap-allocated status stashed by combiner_exec and run the
    // closure with it.
    grpc_error_handle cl_err =
        grpc_core::internal::StatusMoveFromHeapPtr(cl->error_data.error);
    cl->error_data.error = 0;
    cl->cb(cl->cb_arg, std::move(cl_err));
  } else {
    // Final-list path: drain every closure queued via FinallyRun. The list is
    // detached first so callbacks may append new "finally" closures safely.
    grpc_closure* c = lock->final_list.head;
    CHECK_NE(c, nullptr);
    grpc_closure_list_init(&lock->final_list);
    int loops = 0;
    while (c != nullptr) {
      GRPC_TRACE_LOG(combiner, INFO)
          << "C:" << lock << " execute_final[" << loops << "] c=" << c;
      grpc_closure* next = c->next_data.next;
#ifndef NDEBUG
      c->scheduled = false;
#endif
      grpc_error_handle error =
          grpc_core::internal::StatusMoveFromHeapPtr(c->error_data.error);
      c->error_data.error = 0;
      c->cb(c->cb_arg, std::move(error));
      c = next;
    }
  }

  // One unit of work done: detach from the exec_ctx list, then decrement the
  // queued-element count and inspect the previous state.
  move_next();
  lock->time_to_execute_final_list = false;
  gpr_atm old_state =
      gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
  GRPC_TRACE_LOG(combiner, INFO)
      << "C:" << lock << " finish old_state=" << old_state;
// Define a macro to ease readability of the following switch statement.
#define OLD_STATE_WAS(orphaned, elem_count) \
  (((orphaned) ? 0 : STATE_UNORPHANED) |    \
   ((elem_count) * STATE_ELEM_COUNT_LOW_BIT))
  // Depending on what the previous state was, we need to perform different
  // actions.
  switch (old_state) {
    default:
      // we have multiple queued work items: just continue executing them
      break;
    case OLD_STATE_WAS(false, 2):
    case OLD_STATE_WAS(true, 2):
      // we're down to one queued item: if it's the final list we should do that
      if (!grpc_closure_list_empty(lock->final_list)) {
        lock->time_to_execute_final_list = true;
      }
      break;
    case OLD_STATE_WAS(false, 1):
      // had one count, one unorphaned --> unlocked unorphaned
      return true;
    case OLD_STATE_WAS(true, 1):
      // and one count, one orphaned --> unlocked and orphaned
      really_destroy(lock);
      return true;
    case OLD_STATE_WAS(false, 0):
    case OLD_STATE_WAS(true, 0):
      // these values are illegal - representing an already unlocked or
      // deleted lock
      GPR_UNREACHABLE_CODE(return true);
  }
  // Still more work queued: re-register as the active combiner so the next
  // grpc_combiner_continue_exec_ctx() call services it first.
  push_first_on_exec_ctx(lock);
  return true;
}
277 
278 static void enqueue_finally(void* closure, grpc_error_handle error);
279 
// Schedules `closure` to run after all normally-queued closures on `lock`.
// If we are not currently executing inside this combiner, the request is
// bounced through enqueue_finally so it re-enters on the combiner itself.
static void combiner_finally_exec(grpc_core::Combiner* lock,
                                  grpc_closure* closure,
                                  grpc_error_handle error) {
  CHECK_NE(lock, nullptr);
  GRPC_TRACE_LOG(combiner, INFO)
      << "C:" << lock << " grpc_combiner_execute_finally c=" << closure
      << "; ac=" << grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
  if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
    // Using error_data.scratch to store the combiner so that it can be accessed
    // in enqueue_finally.
    closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);
    lock->Run(GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
    return;
  }

  // First "finally" closure: account for the final list as one queued element
  // in the packed state counter.
  if (grpc_closure_list_empty(lock->final_list)) {
    gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
  }
  grpc_closure_list_append(&lock->final_list, closure, error);
}
300 
enqueue_finally(void * closure,grpc_error_handle error)301 static void enqueue_finally(void* closure, grpc_error_handle error) {
302   grpc_closure* cl = static_cast<grpc_closure*>(closure);
303   grpc_core::Combiner* lock =
304       reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch);
305   cl->error_data.scratch = 0;
306   combiner_finally_exec(lock, cl, error);
307 }
308 
namespace grpc_core {
// Schedules `closure` to run serialized under this combiner.
void Combiner::Run(grpc_closure* closure, grpc_error_handle error) {
  combiner_exec(this, closure, error);
}

// Schedules `closure` to run under this combiner only after all
// normally-queued closures have executed.
void Combiner::FinallyRun(grpc_closure* closure, grpc_error_handle error) {
  combiner_finally_exec(this, closure, error);
}

// Forces the next grpc_combiner_continue_exec_ctx() pass to offload remaining
// work: clears the initiator (making the combiner appear contended) and marks
// the current exec_ctx ready to finish.
void Combiner::ForceOffload() {
  gpr_atm_no_barrier_store(&initiating_exec_ctx_or_null, 0);
  ExecCtx::Get()->SetReadyToFinishFlag();
}

}  // namespace grpc_core
324