1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/channel/connected_channel.h"
22
23 #include <stdarg.h>
24 #include <stdio.h>
25 #include <string.h>
26
27 #include <grpc/byte_buffer.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/profiling/timers.h"
33 #include "src/core/lib/transport/transport.h"
34
/* NOTE(review): nothing in the visible part of this file references this
   macro; presumably a leftover -- confirm before removing. */
#define MAX_BUFFER_LENGTH 8192
36
37 typedef struct connected_channel_channel_data {
38 grpc_transport* transport;
39 } channel_data;
40
41 typedef struct {
42 grpc_closure closure;
43 grpc_closure* original_closure;
44 grpc_call_combiner* call_combiner;
45 const char* reason;
46 } callback_state;
47
48 typedef struct connected_channel_call_data {
49 grpc_call_combiner* call_combiner;
50 // Closures used for returning results on the call combiner.
51 callback_state on_complete[6]; // Max number of pending batches.
52 callback_state recv_initial_metadata_ready;
53 callback_state recv_message_ready;
54 callback_state recv_trailing_metadata_ready;
55 } call_data;
56
run_in_call_combiner(void * arg,grpc_error * error)57 static void run_in_call_combiner(void* arg, grpc_error* error) {
58 callback_state* state = static_cast<callback_state*>(arg);
59 GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure,
60 GRPC_ERROR_REF(error), state->reason);
61 }
62
run_cancel_in_call_combiner(void * arg,grpc_error * error)63 static void run_cancel_in_call_combiner(void* arg, grpc_error* error) {
64 run_in_call_combiner(arg, error);
65 gpr_free(arg);
66 }
67
intercept_callback(call_data * calld,callback_state * state,bool free_when_done,const char * reason,grpc_closure ** original_closure)68 static void intercept_callback(call_data* calld, callback_state* state,
69 bool free_when_done, const char* reason,
70 grpc_closure** original_closure) {
71 state->original_closure = *original_closure;
72 state->call_combiner = calld->call_combiner;
73 state->reason = reason;
74 *original_closure = GRPC_CLOSURE_INIT(
75 &state->closure,
76 free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
77 state, grpc_schedule_on_exec_ctx);
78 }
79
get_state_for_batch(call_data * calld,grpc_transport_stream_op_batch * batch)80 static callback_state* get_state_for_batch(
81 call_data* calld, grpc_transport_stream_op_batch* batch) {
82 if (batch->send_initial_metadata) return &calld->on_complete[0];
83 if (batch->send_message) return &calld->on_complete[1];
84 if (batch->send_trailing_metadata) return &calld->on_complete[2];
85 if (batch->recv_initial_metadata) return &calld->on_complete[3];
86 if (batch->recv_message) return &calld->on_complete[4];
87 if (batch->recv_trailing_metadata) return &calld->on_complete[5];
88 GPR_UNREACHABLE_CODE(return nullptr);
89 }
90
/* We perform a small hack to locate transport data alongside the connected
   channel data in call allocations, to allow everything to be pulled in minimal
   cache line requests.

   The transport stream is addressed as the memory directly following this
   filter's call_data, so converting between the two is pointer arithmetic on
   a call_data*: one element forward yields the stream, one element back from
   the stream yields the call_data.  The room for the stream is reserved by
   bind_transport(), which grows the channel stack's call_stack_size by
   grpc_transport_stream_size(). */
/* call_data* -> the grpc_stream located immediately after it. */
#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream*)((calld) + 1))
/* grpc_stream* -> the call_data located immediately before it. */
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
  (((call_data*)(transport_stream)) - 1)
97
98 /* Intercept a call operation and either push it directly up or translate it
99 into transport stream operations */
con_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)100 static void con_start_transport_stream_op_batch(
101 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
102 call_data* calld = static_cast<call_data*>(elem->call_data);
103 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
104 if (batch->recv_initial_metadata) {
105 callback_state* state = &calld->recv_initial_metadata_ready;
106 intercept_callback(
107 calld, state, false, "recv_initial_metadata_ready",
108 &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
109 }
110 if (batch->recv_message) {
111 callback_state* state = &calld->recv_message_ready;
112 intercept_callback(calld, state, false, "recv_message_ready",
113 &batch->payload->recv_message.recv_message_ready);
114 }
115 if (batch->recv_trailing_metadata) {
116 callback_state* state = &calld->recv_trailing_metadata_ready;
117 intercept_callback(
118 calld, state, false, "recv_trailing_metadata_ready",
119 &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
120 }
121 if (batch->cancel_stream) {
122 // There can be more than one cancellation batch in flight at any
123 // given time, so we can't just pick out a fixed index into
124 // calld->on_complete like we can for the other ops. However,
125 // cancellation isn't in the fast path, so we just allocate a new
126 // closure for each one.
127 callback_state* state =
128 static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
129 intercept_callback(calld, state, true, "on_complete (cancel_stream)",
130 &batch->on_complete);
131 } else if (batch->on_complete != nullptr) {
132 callback_state* state = get_state_for_batch(calld, batch);
133 intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
134 }
135 grpc_transport_perform_stream_op(
136 chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);
137 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport");
138 }
139
con_start_transport_op(grpc_channel_element * elem,grpc_transport_op * op)140 static void con_start_transport_op(grpc_channel_element* elem,
141 grpc_transport_op* op) {
142 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
143 grpc_transport_perform_op(chand->transport, op);
144 }
145
146 /* Constructor for call_data */
init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)147 static grpc_error* init_call_elem(grpc_call_element* elem,
148 const grpc_call_element_args* args) {
149 call_data* calld = static_cast<call_data*>(elem->call_data);
150 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
151 calld->call_combiner = args->call_combiner;
152 int r = grpc_transport_init_stream(
153 chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
154 &args->call_stack->refcount, args->server_transport_data, args->arena);
155 return r == 0 ? GRPC_ERROR_NONE
156 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
157 "transport stream initialization failed");
158 }
159
set_pollset_or_pollset_set(grpc_call_element * elem,grpc_polling_entity * pollent)160 static void set_pollset_or_pollset_set(grpc_call_element* elem,
161 grpc_polling_entity* pollent) {
162 call_data* calld = static_cast<call_data*>(elem->call_data);
163 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
164 grpc_transport_set_pops(chand->transport,
165 TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent);
166 }
167
168 /* Destructor for call_data */
destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)169 static void destroy_call_elem(grpc_call_element* elem,
170 const grpc_call_final_info* final_info,
171 grpc_closure* then_schedule_closure) {
172 call_data* calld = static_cast<call_data*>(elem->call_data);
173 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
174 grpc_transport_destroy_stream(chand->transport,
175 TRANSPORT_STREAM_FROM_CALL_DATA(calld),
176 then_schedule_closure);
177 }
178
179 /* Constructor for channel_data */
init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)180 static grpc_error* init_channel_elem(grpc_channel_element* elem,
181 grpc_channel_element_args* args) {
182 channel_data* cd = static_cast<channel_data*>(elem->channel_data);
183 GPR_ASSERT(args->is_last);
184 cd->transport = nullptr;
185 return GRPC_ERROR_NONE;
186 }
187
188 /* Destructor for channel_data */
destroy_channel_elem(grpc_channel_element * elem)189 static void destroy_channel_elem(grpc_channel_element* elem) {
190 channel_data* cd = static_cast<channel_data*>(elem->channel_data);
191 if (cd->transport) {
192 grpc_transport_destroy(cd->transport);
193 }
194 }
195
196 /* No-op. */
con_get_channel_info(grpc_channel_element * elem,const grpc_channel_info * channel_info)197 static void con_get_channel_info(grpc_channel_element* elem,
198 const grpc_channel_info* channel_info) {}
199
/* Filter table for the connected channel.  The initializer is positional, so
   the order of entries must match the field order of grpc_channel_filter
   (declared outside this file). */
const grpc_channel_filter grpc_connected_filter = {
    con_start_transport_stream_op_batch,
    con_start_transport_op,
    /* Only the filter's own call state; bind_transport() separately grows the
       call stack so the transport stream fits directly behind it. */
    sizeof(call_data),
    init_call_elem,
    set_pollset_or_pollset_set,
    destroy_call_elem,
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    con_get_channel_info,
    "connected",
};
213
bind_transport(grpc_channel_stack * channel_stack,grpc_channel_element * elem,void * t)214 static void bind_transport(grpc_channel_stack* channel_stack,
215 grpc_channel_element* elem, void* t) {
216 channel_data* cd = static_cast<channel_data*>(elem->channel_data);
217 GPR_ASSERT(elem->filter == &grpc_connected_filter);
218 GPR_ASSERT(cd->transport == nullptr);
219 cd->transport = static_cast<grpc_transport*>(t);
220
221 /* HACK(ctiller): increase call stack size for the channel to make space
222 for channel data. We need a cleaner (but performant) way to do this,
223 and I'm not sure what that is yet.
224 This is only "safe" because call stacks place no additional data after
225 the last call element, and the last call element MUST be the connected
226 channel. */
227 channel_stack->call_stack_size +=
228 grpc_transport_stream_size(static_cast<grpc_transport*>(t));
229 }
230
grpc_add_connected_filter(grpc_channel_stack_builder * builder,void * arg_must_be_null)231 bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
232 void* arg_must_be_null) {
233 GPR_ASSERT(arg_must_be_null == nullptr);
234 grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
235 GPR_ASSERT(t != nullptr);
236 return grpc_channel_stack_builder_append_filter(
237 builder, &grpc_connected_filter, bind_transport, t);
238 }
239
grpc_connected_channel_get_stream(grpc_call_element * elem)240 grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem) {
241 call_data* calld = static_cast<call_data*>(elem->call_data);
242 return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
243 }
244