• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/channel/connected_channel.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/status.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 #include <inttypes.h>
26 
27 #include <functional>
28 #include <memory>
29 #include <string>
30 #include <type_traits>
31 #include <utility>
32 
33 #include "absl/log/check.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/types/optional.h"
37 #include "src/core/config/core_configuration.h"
38 #include "src/core/lib/channel/call_finalization.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/channel/channel_fwd.h"
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/experiments/experiments.h"
44 #include "src/core/lib/iomgr/call_combiner.h"
45 #include "src/core/lib/iomgr/closure.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/polling_entity.h"
48 #include "src/core/lib/promise/activity.h"
49 #include "src/core/lib/promise/arena_promise.h"
50 #include "src/core/lib/promise/context.h"
51 #include "src/core/lib/promise/detail/status.h"
52 #include "src/core/lib/promise/for_each.h"
53 #include "src/core/lib/promise/if.h"
54 #include "src/core/lib/promise/latch.h"
55 #include "src/core/lib/promise/loop.h"
56 #include "src/core/lib/promise/map.h"
57 #include "src/core/lib/promise/party.h"
58 #include "src/core/lib/promise/pipe.h"
59 #include "src/core/lib/promise/poll.h"
60 #include "src/core/lib/promise/promise.h"
61 #include "src/core/lib/promise/race.h"
62 #include "src/core/lib/promise/seq.h"
63 #include "src/core/lib/promise/try_seq.h"
64 #include "src/core/lib/resource_quota/arena.h"
65 #include "src/core/lib/slice/slice.h"
66 #include "src/core/lib/slice/slice_buffer.h"
67 #include "src/core/lib/surface/call.h"
68 #include "src/core/lib/surface/channel_stack_type.h"
69 #include "src/core/lib/transport/error_utils.h"
70 #include "src/core/lib/transport/metadata_batch.h"
71 #include "src/core/lib/transport/transport.h"
72 #include "src/core/util/alloc.h"
73 #include "src/core/util/debug_location.h"
74 #include "src/core/util/orphanable.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/time.h"
77 
78 typedef struct connected_channel_channel_data {
79   grpc_core::Transport* transport;
80 } channel_data;
81 
82 struct callback_state {
83   grpc_closure closure;
84   grpc_closure* original_closure;
85   grpc_core::CallCombiner* call_combiner;
86   const char* reason;
87 };
88 typedef struct connected_channel_call_data {
89   grpc_core::CallCombiner* call_combiner;
90   // Closures used for returning results on the call combiner.
91   callback_state on_complete[6];  // Max number of pending batches.
92   callback_state recv_initial_metadata_ready;
93   callback_state recv_message_ready;
94   callback_state recv_trailing_metadata_ready;
95 } call_data;
96 
run_in_call_combiner(void * arg,grpc_error_handle error)97 static void run_in_call_combiner(void* arg, grpc_error_handle error) {
98   callback_state* state = static_cast<callback_state*>(arg);
99   GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure, error,
100                            state->reason);
101 }
102 
run_cancel_in_call_combiner(void * arg,grpc_error_handle error)103 static void run_cancel_in_call_combiner(void* arg, grpc_error_handle error) {
104   run_in_call_combiner(arg, error);
105   gpr_free(arg);
106 }
107 
intercept_callback(call_data * calld,callback_state * state,bool free_when_done,const char * reason,grpc_closure ** original_closure)108 static void intercept_callback(call_data* calld, callback_state* state,
109                                bool free_when_done, const char* reason,
110                                grpc_closure** original_closure) {
111   state->original_closure = *original_closure;
112   state->call_combiner = calld->call_combiner;
113   state->reason = reason;
114   *original_closure = GRPC_CLOSURE_INIT(
115       &state->closure,
116       free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
117       state, grpc_schedule_on_exec_ctx);
118 }
119 
get_state_for_batch(call_data * calld,grpc_transport_stream_op_batch * batch)120 static callback_state* get_state_for_batch(
121     call_data* calld, grpc_transport_stream_op_batch* batch) {
122   if (batch->send_initial_metadata) return &calld->on_complete[0];
123   if (batch->send_message) return &calld->on_complete[1];
124   if (batch->send_trailing_metadata) return &calld->on_complete[2];
125   if (batch->recv_initial_metadata) return &calld->on_complete[3];
126   if (batch->recv_message) return &calld->on_complete[4];
127   if (batch->recv_trailing_metadata) return &calld->on_complete[5];
128   GPR_UNREACHABLE_CODE(return nullptr);
129 }
130 
// The transport's stream data is placed directly behind the connected
// channel's call_data in the call allocation (a small hack), so that both can
// be pulled in with a minimal number of cache line requests. These macros
// convert between the two addresses using the alignment-rounded size of
// call_data as the offset.
#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) \
  (reinterpret_cast<grpc_stream*>(             \
      reinterpret_cast<char*>(calld) +         \
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
  (reinterpret_cast<call_data*>(                          \
      reinterpret_cast<char*>(transport_stream) -         \
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
140 
141 // Intercept a call operation and either push it directly up or translate it
142 // into transport stream operations
connected_channel_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)143 static void connected_channel_start_transport_stream_op_batch(
144     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
145   call_data* calld = static_cast<call_data*>(elem->call_data);
146   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
147   if (batch->recv_initial_metadata) {
148     callback_state* state = &calld->recv_initial_metadata_ready;
149     intercept_callback(
150         calld, state, false, "recv_initial_metadata_ready",
151         &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
152   }
153   if (batch->recv_message) {
154     callback_state* state = &calld->recv_message_ready;
155     intercept_callback(calld, state, false, "recv_message_ready",
156                        &batch->payload->recv_message.recv_message_ready);
157   }
158   if (batch->recv_trailing_metadata) {
159     callback_state* state = &calld->recv_trailing_metadata_ready;
160     intercept_callback(
161         calld, state, false, "recv_trailing_metadata_ready",
162         &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
163   }
164   if (batch->cancel_stream) {
165     // There can be more than one cancellation batch in flight at any
166     // given time, so we can't just pick out a fixed index into
167     // calld->on_complete like we can for the other ops.  However,
168     // cancellation isn't in the fast path, so we just allocate a new
169     // closure for each one.
170     callback_state* state =
171         static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
172     intercept_callback(calld, state, true, "on_complete (cancel_stream)",
173                        &batch->on_complete);
174   } else if (batch->on_complete != nullptr) {
175     callback_state* state = get_state_for_batch(calld, batch);
176     intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
177   }
178   chand->transport->filter_stack_transport()->PerformStreamOp(
179       TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);
180   GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport");
181 }
182 
connected_channel_start_transport_op(grpc_channel_element * elem,grpc_transport_op * op)183 static void connected_channel_start_transport_op(grpc_channel_element* elem,
184                                                  grpc_transport_op* op) {
185   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
186   chand->transport->PerformOp(op);
187 }
188 
189 // Constructor for call_data
connected_channel_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)190 static grpc_error_handle connected_channel_init_call_elem(
191     grpc_call_element* elem, const grpc_call_element_args* args) {
192   call_data* calld = static_cast<call_data*>(elem->call_data);
193   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
194   calld->call_combiner = args->call_combiner;
195   chand->transport->filter_stack_transport()->InitStream(
196       TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount,
197       args->server_transport_data, args->arena);
198   return absl::OkStatus();
199 }
200 
set_pollset_or_pollset_set(grpc_call_element * elem,grpc_polling_entity * pollent)201 static void set_pollset_or_pollset_set(grpc_call_element* elem,
202                                        grpc_polling_entity* pollent) {
203   call_data* calld = static_cast<call_data*>(elem->call_data);
204   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
205   chand->transport->SetPollingEntity(TRANSPORT_STREAM_FROM_CALL_DATA(calld),
206                                      pollent);
207 }
208 
209 // Destructor for call_data
connected_channel_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)210 static void connected_channel_destroy_call_elem(
211     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
212     grpc_closure* then_schedule_closure) {
213   call_data* calld = static_cast<call_data*>(elem->call_data);
214   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
215   chand->transport->filter_stack_transport()->DestroyStream(
216       TRANSPORT_STREAM_FROM_CALL_DATA(calld), then_schedule_closure);
217 }
218 
219 // Constructor for channel_data
connected_channel_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)220 static grpc_error_handle connected_channel_init_channel_elem(
221     grpc_channel_element* elem, grpc_channel_element_args* args) {
222   channel_data* cd = static_cast<channel_data*>(elem->channel_data);
223   CHECK(args->is_last);
224   cd->transport = args->channel_args.GetObject<grpc_core::Transport>();
225   return absl::OkStatus();
226 }
227 
228 // Destructor for channel_data
connected_channel_destroy_channel_elem(grpc_channel_element * elem)229 static void connected_channel_destroy_channel_elem(grpc_channel_element* elem) {
230   channel_data* cd = static_cast<channel_data*>(elem->channel_data);
231   if (cd->transport) {
232     cd->transport->Orphan();
233   }
234 }
235 
236 // No-op.
connected_channel_get_channel_info(grpc_channel_element *,const grpc_channel_info *)237 static void connected_channel_get_channel_info(
238     grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {
239 }
240 
241 namespace grpc_core {
242 namespace {
243 const grpc_channel_filter kConnectedFilter{
244     connected_channel_start_transport_stream_op_batch,
245     connected_channel_start_transport_op,
246     sizeof(call_data),
247     connected_channel_init_call_elem,
248     set_pollset_or_pollset_set,
249     connected_channel_destroy_call_elem,
250     sizeof(channel_data),
251     connected_channel_init_channel_elem,
__anonc8181c9b0202() 252     +[](grpc_channel_stack* channel_stack, grpc_channel_element* elem) {
253       // HACK(ctiller): increase call stack size for the channel to make
254       // space for channel data. We need a cleaner (but performant) way to
255       // do this, and I'm not sure what that is yet. This is only "safe"
256       // because call stacks place no additional data after the last call
257       // element, and the last call element MUST be the connected channel.
258       auto* transport =
259           static_cast<channel_data*>(elem->channel_data)->transport;
260       if (transport->filter_stack_transport() != nullptr) {
261         channel_stack->call_stack_size +=
262             transport->filter_stack_transport()->SizeOfStream();
263       }
264     },
265     connected_channel_destroy_channel_elem,
266     connected_channel_get_channel_info,
267     GRPC_UNIQUE_TYPE_NAME_HERE("connected"),
268 };
269 
270 // noop filter for the v3 stack: placeholder for now because other code requires
271 // we have a terminator.
272 // TODO(ctiller): delete when v3 transition is complete.
273 const grpc_channel_filter kPromiseBasedTransportFilter = {
274     nullptr,
275     connected_channel_start_transport_op,
276     0,
277     nullptr,
278     set_pollset_or_pollset_set,
279     nullptr,
280     sizeof(channel_data),
__anonc8181c9b0302() 281     +[](grpc_channel_element*, grpc_channel_element_args*) {
282       return absl::InternalError(
283           "Cannot use filter based stack with promise based transports");
284     },
__anonc8181c9b0402() 285     +[](grpc_channel_stack*, grpc_channel_element*) {},
286     connected_channel_destroy_channel_elem,
287     connected_channel_get_channel_info,
288     GRPC_UNIQUE_TYPE_NAME_HERE("connected"),
289 };
290 
TransportSupportsClientPromiseBasedCalls(const ChannelArgs & args)291 bool TransportSupportsClientPromiseBasedCalls(const ChannelArgs& args) {
292   auto* transport = args.GetObject<Transport>();
293   return transport->client_transport() != nullptr;
294 }
295 
TransportSupportsServerPromiseBasedCalls(const ChannelArgs & args)296 bool TransportSupportsServerPromiseBasedCalls(const ChannelArgs& args) {
297   auto* transport = args.GetObject<Transport>();
298   return transport->server_transport() != nullptr;
299 }
300 }  // namespace
301 
RegisterConnectedChannel(CoreConfiguration::Builder * builder)302 void RegisterConnectedChannel(CoreConfiguration::Builder* builder) {
303   // We can't know promise based call or not here (that decision needs the
304   // collaboration of all of the filters on the channel, and we don't want
305   // ordering constraints on when we add filters).
306   // We can know if this results in a promise based call how we'll create
307   // our promise (if indeed we can), and so that is the choice made here.
308 
309   // Option 1, and our ideal: the transport supports promise based calls,
310   // and so we simply use the transport directly.
311   builder->channel_init()
312       ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kPromiseBasedTransportFilter)
313       .Terminal()
314       .If(TransportSupportsClientPromiseBasedCalls);
315   builder->channel_init()
316       ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
317                        &kPromiseBasedTransportFilter)
318       .Terminal()
319       .If(TransportSupportsClientPromiseBasedCalls);
320   builder->channel_init()
321       ->RegisterFilter(GRPC_SERVER_CHANNEL, &kPromiseBasedTransportFilter)
322       .Terminal()
323       .If(TransportSupportsServerPromiseBasedCalls);
324 
325   // Option 2: the transport does not support promise based calls.
326   builder->channel_init()
327       ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kConnectedFilter)
328       .Terminal()
329       .IfNot(TransportSupportsClientPromiseBasedCalls);
330   builder->channel_init()
331       ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &kConnectedFilter)
332       .Terminal()
333       .IfNot(TransportSupportsClientPromiseBasedCalls);
334   builder->channel_init()
335       ->RegisterFilter(GRPC_SERVER_CHANNEL, &kConnectedFilter)
336       .Terminal()
337       .IfNot(TransportSupportsServerPromiseBasedCalls);
338 }
339 
340 }  // namespace grpc_core
341