1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/channel/connected_channel.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/status.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 #include <inttypes.h>
26
27 #include <functional>
28 #include <memory>
29 #include <string>
30 #include <type_traits>
31 #include <utility>
32
33 #include "absl/log/check.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/types/optional.h"
37 #include "src/core/config/core_configuration.h"
38 #include "src/core/lib/channel/call_finalization.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/channel/channel_fwd.h"
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/experiments/experiments.h"
44 #include "src/core/lib/iomgr/call_combiner.h"
45 #include "src/core/lib/iomgr/closure.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/polling_entity.h"
48 #include "src/core/lib/promise/activity.h"
49 #include "src/core/lib/promise/arena_promise.h"
50 #include "src/core/lib/promise/context.h"
51 #include "src/core/lib/promise/detail/status.h"
52 #include "src/core/lib/promise/for_each.h"
53 #include "src/core/lib/promise/if.h"
54 #include "src/core/lib/promise/latch.h"
55 #include "src/core/lib/promise/loop.h"
56 #include "src/core/lib/promise/map.h"
57 #include "src/core/lib/promise/party.h"
58 #include "src/core/lib/promise/pipe.h"
59 #include "src/core/lib/promise/poll.h"
60 #include "src/core/lib/promise/promise.h"
61 #include "src/core/lib/promise/race.h"
62 #include "src/core/lib/promise/seq.h"
63 #include "src/core/lib/promise/try_seq.h"
64 #include "src/core/lib/resource_quota/arena.h"
65 #include "src/core/lib/slice/slice.h"
66 #include "src/core/lib/slice/slice_buffer.h"
67 #include "src/core/lib/surface/call.h"
68 #include "src/core/lib/surface/channel_stack_type.h"
69 #include "src/core/lib/transport/error_utils.h"
70 #include "src/core/lib/transport/metadata_batch.h"
71 #include "src/core/lib/transport/transport.h"
72 #include "src/core/util/alloc.h"
73 #include "src/core/util/debug_location.h"
74 #include "src/core/util/orphanable.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/time.h"
77
78 typedef struct connected_channel_channel_data {
79 grpc_core::Transport* transport;
80 } channel_data;
81
82 struct callback_state {
83 grpc_closure closure;
84 grpc_closure* original_closure;
85 grpc_core::CallCombiner* call_combiner;
86 const char* reason;
87 };
88 typedef struct connected_channel_call_data {
89 grpc_core::CallCombiner* call_combiner;
90 // Closures used for returning results on the call combiner.
91 callback_state on_complete[6]; // Max number of pending batches.
92 callback_state recv_initial_metadata_ready;
93 callback_state recv_message_ready;
94 callback_state recv_trailing_metadata_ready;
95 } call_data;
96
run_in_call_combiner(void * arg,grpc_error_handle error)97 static void run_in_call_combiner(void* arg, grpc_error_handle error) {
98 callback_state* state = static_cast<callback_state*>(arg);
99 GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure, error,
100 state->reason);
101 }
102
run_cancel_in_call_combiner(void * arg,grpc_error_handle error)103 static void run_cancel_in_call_combiner(void* arg, grpc_error_handle error) {
104 run_in_call_combiner(arg, error);
105 gpr_free(arg);
106 }
107
intercept_callback(call_data * calld,callback_state * state,bool free_when_done,const char * reason,grpc_closure ** original_closure)108 static void intercept_callback(call_data* calld, callback_state* state,
109 bool free_when_done, const char* reason,
110 grpc_closure** original_closure) {
111 state->original_closure = *original_closure;
112 state->call_combiner = calld->call_combiner;
113 state->reason = reason;
114 *original_closure = GRPC_CLOSURE_INIT(
115 &state->closure,
116 free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
117 state, grpc_schedule_on_exec_ctx);
118 }
119
get_state_for_batch(call_data * calld,grpc_transport_stream_op_batch * batch)120 static callback_state* get_state_for_batch(
121 call_data* calld, grpc_transport_stream_op_batch* batch) {
122 if (batch->send_initial_metadata) return &calld->on_complete[0];
123 if (batch->send_message) return &calld->on_complete[1];
124 if (batch->send_trailing_metadata) return &calld->on_complete[2];
125 if (batch->recv_initial_metadata) return &calld->on_complete[3];
126 if (batch->recv_message) return &calld->on_complete[4];
127 if (batch->recv_trailing_metadata) return &calld->on_complete[5];
128 GPR_UNREACHABLE_CODE(return nullptr);
129 }
130
131 // We perform a small hack to locate transport data alongside the connected
132 // channel data in call allocations, to allow everything to be pulled in minimal
133 // cache line requests
134 #define TRANSPORT_STREAM_FROM_CALL_DATA(calld) \
135 ((grpc_stream*)(((char*)(calld)) + \
136 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
137 #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
138 ((call_data*)(((char*)(transport_stream)) - \
139 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(call_data))))
140
141 // Intercept a call operation and either push it directly up or translate it
142 // into transport stream operations
connected_channel_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)143 static void connected_channel_start_transport_stream_op_batch(
144 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
145 call_data* calld = static_cast<call_data*>(elem->call_data);
146 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
147 if (batch->recv_initial_metadata) {
148 callback_state* state = &calld->recv_initial_metadata_ready;
149 intercept_callback(
150 calld, state, false, "recv_initial_metadata_ready",
151 &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
152 }
153 if (batch->recv_message) {
154 callback_state* state = &calld->recv_message_ready;
155 intercept_callback(calld, state, false, "recv_message_ready",
156 &batch->payload->recv_message.recv_message_ready);
157 }
158 if (batch->recv_trailing_metadata) {
159 callback_state* state = &calld->recv_trailing_metadata_ready;
160 intercept_callback(
161 calld, state, false, "recv_trailing_metadata_ready",
162 &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
163 }
164 if (batch->cancel_stream) {
165 // There can be more than one cancellation batch in flight at any
166 // given time, so we can't just pick out a fixed index into
167 // calld->on_complete like we can for the other ops. However,
168 // cancellation isn't in the fast path, so we just allocate a new
169 // closure for each one.
170 callback_state* state =
171 static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
172 intercept_callback(calld, state, true, "on_complete (cancel_stream)",
173 &batch->on_complete);
174 } else if (batch->on_complete != nullptr) {
175 callback_state* state = get_state_for_batch(calld, batch);
176 intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
177 }
178 chand->transport->filter_stack_transport()->PerformStreamOp(
179 TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);
180 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport");
181 }
182
connected_channel_start_transport_op(grpc_channel_element * elem,grpc_transport_op * op)183 static void connected_channel_start_transport_op(grpc_channel_element* elem,
184 grpc_transport_op* op) {
185 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
186 chand->transport->PerformOp(op);
187 }
188
189 // Constructor for call_data
connected_channel_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)190 static grpc_error_handle connected_channel_init_call_elem(
191 grpc_call_element* elem, const grpc_call_element_args* args) {
192 call_data* calld = static_cast<call_data*>(elem->call_data);
193 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
194 calld->call_combiner = args->call_combiner;
195 chand->transport->filter_stack_transport()->InitStream(
196 TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount,
197 args->server_transport_data, args->arena);
198 return absl::OkStatus();
199 }
200
set_pollset_or_pollset_set(grpc_call_element * elem,grpc_polling_entity * pollent)201 static void set_pollset_or_pollset_set(grpc_call_element* elem,
202 grpc_polling_entity* pollent) {
203 call_data* calld = static_cast<call_data*>(elem->call_data);
204 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
205 chand->transport->SetPollingEntity(TRANSPORT_STREAM_FROM_CALL_DATA(calld),
206 pollent);
207 }
208
209 // Destructor for call_data
connected_channel_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)210 static void connected_channel_destroy_call_elem(
211 grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
212 grpc_closure* then_schedule_closure) {
213 call_data* calld = static_cast<call_data*>(elem->call_data);
214 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
215 chand->transport->filter_stack_transport()->DestroyStream(
216 TRANSPORT_STREAM_FROM_CALL_DATA(calld), then_schedule_closure);
217 }
218
219 // Constructor for channel_data
connected_channel_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)220 static grpc_error_handle connected_channel_init_channel_elem(
221 grpc_channel_element* elem, grpc_channel_element_args* args) {
222 channel_data* cd = static_cast<channel_data*>(elem->channel_data);
223 CHECK(args->is_last);
224 cd->transport = args->channel_args.GetObject<grpc_core::Transport>();
225 return absl::OkStatus();
226 }
227
228 // Destructor for channel_data
connected_channel_destroy_channel_elem(grpc_channel_element * elem)229 static void connected_channel_destroy_channel_elem(grpc_channel_element* elem) {
230 channel_data* cd = static_cast<channel_data*>(elem->channel_data);
231 if (cd->transport) {
232 cd->transport->Orphan();
233 }
234 }
235
236 // No-op.
connected_channel_get_channel_info(grpc_channel_element *,const grpc_channel_info *)237 static void connected_channel_get_channel_info(
238 grpc_channel_element* /*elem*/, const grpc_channel_info* /*channel_info*/) {
239 }
240
241 namespace grpc_core {
242 namespace {
243 const grpc_channel_filter kConnectedFilter{
244 connected_channel_start_transport_stream_op_batch,
245 connected_channel_start_transport_op,
246 sizeof(call_data),
247 connected_channel_init_call_elem,
248 set_pollset_or_pollset_set,
249 connected_channel_destroy_call_elem,
250 sizeof(channel_data),
251 connected_channel_init_channel_elem,
__anonc8181c9b0202() 252 +[](grpc_channel_stack* channel_stack, grpc_channel_element* elem) {
253 // HACK(ctiller): increase call stack size for the channel to make
254 // space for channel data. We need a cleaner (but performant) way to
255 // do this, and I'm not sure what that is yet. This is only "safe"
256 // because call stacks place no additional data after the last call
257 // element, and the last call element MUST be the connected channel.
258 auto* transport =
259 static_cast<channel_data*>(elem->channel_data)->transport;
260 if (transport->filter_stack_transport() != nullptr) {
261 channel_stack->call_stack_size +=
262 transport->filter_stack_transport()->SizeOfStream();
263 }
264 },
265 connected_channel_destroy_channel_elem,
266 connected_channel_get_channel_info,
267 GRPC_UNIQUE_TYPE_NAME_HERE("connected"),
268 };
269
270 // noop filter for the v3 stack: placeholder for now because other code requires
271 // we have a terminator.
272 // TODO(ctiller): delete when v3 transition is complete.
273 const grpc_channel_filter kPromiseBasedTransportFilter = {
274 nullptr,
275 connected_channel_start_transport_op,
276 0,
277 nullptr,
278 set_pollset_or_pollset_set,
279 nullptr,
280 sizeof(channel_data),
__anonc8181c9b0302() 281 +[](grpc_channel_element*, grpc_channel_element_args*) {
282 return absl::InternalError(
283 "Cannot use filter based stack with promise based transports");
284 },
__anonc8181c9b0402() 285 +[](grpc_channel_stack*, grpc_channel_element*) {},
286 connected_channel_destroy_channel_elem,
287 connected_channel_get_channel_info,
288 GRPC_UNIQUE_TYPE_NAME_HERE("connected"),
289 };
290
TransportSupportsClientPromiseBasedCalls(const ChannelArgs & args)291 bool TransportSupportsClientPromiseBasedCalls(const ChannelArgs& args) {
292 auto* transport = args.GetObject<Transport>();
293 return transport->client_transport() != nullptr;
294 }
295
TransportSupportsServerPromiseBasedCalls(const ChannelArgs & args)296 bool TransportSupportsServerPromiseBasedCalls(const ChannelArgs& args) {
297 auto* transport = args.GetObject<Transport>();
298 return transport->server_transport() != nullptr;
299 }
300 } // namespace
301
RegisterConnectedChannel(CoreConfiguration::Builder * builder)302 void RegisterConnectedChannel(CoreConfiguration::Builder* builder) {
303 // We can't know promise based call or not here (that decision needs the
304 // collaboration of all of the filters on the channel, and we don't want
305 // ordering constraints on when we add filters).
306 // We can know if this results in a promise based call how we'll create
307 // our promise (if indeed we can), and so that is the choice made here.
308
309 // Option 1, and our ideal: the transport supports promise based calls,
310 // and so we simply use the transport directly.
311 builder->channel_init()
312 ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kPromiseBasedTransportFilter)
313 .Terminal()
314 .If(TransportSupportsClientPromiseBasedCalls);
315 builder->channel_init()
316 ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
317 &kPromiseBasedTransportFilter)
318 .Terminal()
319 .If(TransportSupportsClientPromiseBasedCalls);
320 builder->channel_init()
321 ->RegisterFilter(GRPC_SERVER_CHANNEL, &kPromiseBasedTransportFilter)
322 .Terminal()
323 .If(TransportSupportsServerPromiseBasedCalls);
324
325 // Option 2: the transport does not support promise based calls.
326 builder->channel_init()
327 ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kConnectedFilter)
328 .Terminal()
329 .IfNot(TransportSupportsClientPromiseBasedCalls);
330 builder->channel_init()
331 ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &kConnectedFilter)
332 .Terminal()
333 .IfNot(TransportSupportsClientPromiseBasedCalls);
334 builder->channel_init()
335 ->RegisterFilter(GRPC_SERVER_CHANNEL, &kConnectedFilter)
336 .Terminal()
337 .IfNot(TransportSupportsServerPromiseBasedCalls);
338 }
339
340 } // namespace grpc_core
341