1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25 #include <string.h>
26 #include "src/core/ext/transport/inproc/inproc_transport.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/channel.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/server.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/error_utils.h"
36 #include "src/core/lib/transport/transport_impl.h"
37
// Log through gpr_log only when the inproc tracer is enabled; forwards all
// arguments verbatim.
#define INPROC_LOG(...)                                    \
  do {                                                     \
    if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)

// Shared slices (initialization is not visible in this chunk — presumably
// done in an init function elsewhere in the file). The fake path/authority
// pairs are used by fail_helper_locked to synthesize the server-side initial
// metadata expected by upper layers when a call is failed.
static grpc_slice g_empty_slice;
static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value;
static grpc_slice g_fake_auth_key;
static grpc_slice g_fake_auth_value;
48
// Mutex shared by the client-side and server-side transports of one inproc
// connection. It is refcounted so it outlives whichever transport is
// destroyed last (see really_destroy_transport).
typedef struct {
  gpr_mu mu;
  gpr_refcount refs;
} shared_mu;
53
// One side (client or server) of an inproc connection; the two sides point
// at each other through other_side.
typedef struct inproc_transport {
  grpc_transport base;  // first member so grpc_transport* casts to this type
  shared_mu* mu;        // mutex shared with the other side
  gpr_refcount refs;
  bool is_client;
  grpc_connectivity_state_tracker connectivity;
  // Server-side callback invoked (from the client's init_stream) when a
  // client initiates a new stream.
  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
                           const void* server_data);
  void* accept_stream_data;
  bool is_closed;
  struct inproc_transport* other_side;
  struct inproc_stream* stream_list;  // intrusive list of live streams
} inproc_transport;
67
// Per-call state for one side (client or server) of an inproc call.
typedef struct inproc_stream {
  inproc_transport* t;  // owning transport
  // Metadata deposited by the other side, waiting for a local recv op.
  grpc_metadata_batch to_read_initial_md;
  uint32_t to_read_initial_md_flags;
  bool to_read_initial_md_filled;
  grpc_metadata_batch to_read_trailing_md;
  bool to_read_trailing_md_filled;
  bool ops_needed;            // op_state_machine should run when data arrives
  bool op_closure_scheduled;  // op_closure is already queued
  grpc_closure op_closure;    // runs op_state_machine
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled;
  uint32_t write_buffer_initial_md_flags;
  grpc_millis write_buffer_deadline;
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled;
  grpc_error* write_buffer_cancel_error;

  struct inproc_stream* other_side;
  bool other_side_closed;               // won't talk anymore
  bool write_buffer_other_side_closed;  // on hold
  grpc_stream_refcount* refs;
  grpc_closure* closure_at_destroy;  // scheduled by really_destroy_stream

  gpr_arena* arena;  // call arena; metadata copies are allocated from here

  // Pending op batches, one slot per op kind; nullptr when not in flight.
  grpc_transport_stream_op_batch* send_message_op;
  grpc_transport_stream_op_batch* send_trailing_md_op;
  grpc_transport_stream_op_batch* recv_initial_md_op;
  grpc_transport_stream_op_batch* recv_message_op;
  grpc_transport_stream_op_batch* recv_trailing_md_op;

  grpc_slice_buffer recv_message;  // staging buffer for an incoming message
  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
  bool recv_inited;  // recv_message has been initialized

  // Progress flags for the metadata exchange on this side.
  bool initial_md_sent;
  bool trailing_md_sent;
  bool initial_md_recvd;
  bool trailing_md_recvd;

  bool closed;

  grpc_error* cancel_self_error;   // cancellation initiated by this side
  grpc_error* cancel_other_error;  // cancellation initiated by the other side

  grpc_millis deadline;

  // Intrusive membership in the owning transport's stream_list.
  bool listed;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
} inproc_stream;
122
// Forward declarations (both are defined later in this file).
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
125
// Take a reference on the transport, with trace logging.
static void ref_transport(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "ref_transport %p", t);
  gpr_ref(&t->refs);
}
130
really_destroy_transport(inproc_transport * t)131 static void really_destroy_transport(inproc_transport* t) {
132 INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t);
133 grpc_connectivity_state_destroy(&t->connectivity);
134 if (gpr_unref(&t->mu->refs)) {
135 gpr_free(t->mu);
136 }
137 gpr_free(t);
138 }
139
unref_transport(inproc_transport * t)140 static void unref_transport(inproc_transport* t) {
141 INPROC_LOG(GPR_INFO, "unref_transport %p", t);
142 if (gpr_unref(&t->refs)) {
143 really_destroy_transport(t);
144 }
145 }
146
// In debug builds grpc_stream_ref/unref take a reason string for tracing;
// in release builds they do not, so drop it here.
#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
154
// Take a stream reference, with trace logging of the reason.
static void ref_stream(inproc_stream* s, const char* reason) {
  INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason);
  STREAM_REF(s->refs, reason);
}
159
// Drop a stream reference, with trace logging of the reason.
static void unref_stream(inproc_stream* s, const char* reason) {
  INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason);
  STREAM_UNREF(s->refs, reason);
}
164
really_destroy_stream(inproc_stream * s)165 static void really_destroy_stream(inproc_stream* s) {
166 INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s);
167
168 GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
169 GRPC_ERROR_UNREF(s->cancel_self_error);
170 GRPC_ERROR_UNREF(s->cancel_other_error);
171
172 if (s->recv_inited) {
173 grpc_slice_buffer_destroy_internal(&s->recv_message);
174 }
175
176 unref_transport(s->t);
177
178 if (s->closure_at_destroy) {
179 GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE);
180 }
181 }
182
log_metadata(const grpc_metadata_batch * md_batch,bool is_client,bool is_initial)183 static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
184 bool is_initial) {
185 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
186 md = md->next) {
187 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
188 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
189 gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
190 is_client ? "CLI" : "SVR", key, value);
191 gpr_free(key);
192 gpr_free(value);
193 }
194 }
195
fill_in_metadata(inproc_stream * s,const grpc_metadata_batch * metadata,uint32_t flags,grpc_metadata_batch * out_md,uint32_t * outflags,bool * markfilled)196 static grpc_error* fill_in_metadata(inproc_stream* s,
197 const grpc_metadata_batch* metadata,
198 uint32_t flags, grpc_metadata_batch* out_md,
199 uint32_t* outflags, bool* markfilled) {
200 if (grpc_inproc_trace.enabled()) {
201 log_metadata(metadata, s->t->is_client, outflags != nullptr);
202 }
203
204 if (outflags != nullptr) {
205 *outflags = flags;
206 }
207 if (markfilled != nullptr) {
208 *markfilled = true;
209 }
210 grpc_error* error = GRPC_ERROR_NONE;
211 for (grpc_linked_mdelem* elem = metadata->list.head;
212 (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
213 grpc_linked_mdelem* nelem = static_cast<grpc_linked_mdelem*>(
214 gpr_arena_alloc(s->arena, sizeof(*nelem)));
215 nelem->md =
216 grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
217 grpc_slice_intern(GRPC_MDVALUE(elem->md)));
218
219 error = grpc_metadata_batch_link_tail(out_md, nelem);
220 }
221 return error;
222 }
223
// Transport vtable entry: initialize a newly-allocated stream. For a
// client-side stream (server_data == nullptr) this also kicks the server
// side via accept_stream_cb; for a server-side stream it links up with the
// already-existing client stream and drains the client's write buffers.
static int init_stream(grpc_transport* gt, grpc_stream* gs,
                       grpc_stream_refcount* refcount, const void* server_data,
                       gpr_arena* arena) {
  INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  s->arena = arena;

  s->refs = refcount;
  // Ref this stream right now
  ref_stream(s, "inproc_init_stream:init");

  // Zero/initialize all per-stream state.
  grpc_metadata_batch_init(&s->to_read_initial_md);
  s->to_read_initial_md_flags = 0;
  s->to_read_initial_md_filled = false;
  grpc_metadata_batch_init(&s->to_read_trailing_md);
  s->to_read_trailing_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_initial_md);
  s->write_buffer_initial_md_flags = 0;
  s->write_buffer_initial_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_trailing_md);
  s->write_buffer_trailing_md_filled = false;
  s->ops_needed = false;
  s->op_closure_scheduled = false;
  GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
                    grpc_schedule_on_exec_ctx);
  s->t = t;
  s->closure_at_destroy = nullptr;
  s->other_side_closed = false;

  s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd =
      s->trailing_md_recvd = false;

  s->closed = false;

  s->cancel_self_error = GRPC_ERROR_NONE;
  s->cancel_other_error = GRPC_ERROR_NONE;
  s->write_buffer_cancel_error = GRPC_ERROR_NONE;
  s->deadline = GRPC_MILLIS_INF_FUTURE;
  s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;

  // Add the stream to the head of the transport's stream list (under the
  // shared mutex), taking a list ref on it.
  s->stream_list_prev = nullptr;
  gpr_mu_lock(&t->mu->mu);
  s->listed = true;
  ref_stream(s, "inproc_init_stream:list");
  s->stream_list_next = t->stream_list;
  if (t->stream_list) {
    t->stream_list->stream_list_prev = s;
  }
  t->stream_list = s;
  gpr_mu_unlock(&t->mu->mu);

  if (!server_data) {
    // Client-side stream: tell the server-side transport to create its
    // matching stream, passing our address as server_data.
    ref_transport(t);
    inproc_transport* st = t->other_side;
    ref_transport(st);
    s->other_side = nullptr;  // will get filled in soon
    // Pass the client-side stream address to the server-side for a ref
    ref_stream(s, "inproc_init_stream:clt");  // ref it now on behalf of server
                                              // side to avoid destruction
    INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb,
               st->accept_stream_data);
    (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s);
  } else {
    // This is the server-side and is being called through accept_stream_cb
    inproc_stream* cs = (inproc_stream*)server_data;
    s->other_side = cs;
    // Ref the server-side stream on behalf of the client now
    ref_stream(s, "inproc_init_stream:srv");

    // Now we are about to affect the other side, so lock the transport
    // to make sure that it doesn't get destroyed
    gpr_mu_lock(&s->t->mu->mu);
    cs->other_side = s;
    // Now transfer from the other side's write_buffer if any to the to_read
    // buffer
    if (cs->write_buffer_initial_md_filled) {
      fill_in_metadata(s, &cs->write_buffer_initial_md,
                       cs->write_buffer_initial_md_flags,
                       &s->to_read_initial_md, &s->to_read_initial_md_flags,
                       &s->to_read_initial_md_filled);
      // Adopt the tighter of the two deadlines.
      s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
      grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
      cs->write_buffer_initial_md_filled = false;
    }
    if (cs->write_buffer_trailing_md_filled) {
      fill_in_metadata(s, &cs->write_buffer_trailing_md, 0,
                       &s->to_read_trailing_md, nullptr,
                       &s->to_read_trailing_md_filled);
      grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
      cs->write_buffer_trailing_md_filled = false;
    }
    // Transfer ownership of any buffered cancel error (no extra ref needed).
    if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
      s->cancel_other_error = cs->write_buffer_cancel_error;
      cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
    }

    gpr_mu_unlock(&s->t->mu->mu);
  }
  return 0;  // return value is not important
}
325
close_stream_locked(inproc_stream * s)326 static void close_stream_locked(inproc_stream* s) {
327 if (!s->closed) {
328 // Release the metadata that we would have written out
329 grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
330 grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
331
332 if (s->listed) {
333 inproc_stream* p = s->stream_list_prev;
334 inproc_stream* n = s->stream_list_next;
335 if (p != nullptr) {
336 p->stream_list_next = n;
337 } else {
338 s->t->stream_list = n;
339 }
340 if (n != nullptr) {
341 n->stream_list_prev = p;
342 }
343 s->listed = false;
344 unref_stream(s, "close_stream:list");
345 }
346 s->closed = true;
347 unref_stream(s, "close_stream:closing");
348 }
349 }
350
351 // This function means that we are done talking/listening to the other side
close_other_side_locked(inproc_stream * s,const char * reason)352 static void close_other_side_locked(inproc_stream* s, const char* reason) {
353 if (s->other_side != nullptr) {
354 // First release the metadata that came from the other side's arena
355 grpc_metadata_batch_destroy(&s->to_read_initial_md);
356 grpc_metadata_batch_destroy(&s->to_read_trailing_md);
357
358 unref_stream(s->other_side, reason);
359 s->other_side_closed = true;
360 s->other_side = nullptr;
361 } else if (!s->other_side_closed) {
362 s->write_buffer_other_side_closed = true;
363 }
364 }
365
366 // Call the on_complete closure associated with this stream_op_batch if
367 // this stream_op_batch is only one of the pending operations for this
368 // stream. This is called when one of the pending operations for the stream
369 // is done and about to be NULLed out
complete_if_batch_end_locked(inproc_stream * s,grpc_error * error,grpc_transport_stream_op_batch * op,const char * msg)370 static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
371 grpc_transport_stream_op_batch* op,
372 const char* msg) {
373 int is_sm = static_cast<int>(op == s->send_message_op);
374 int is_stm = static_cast<int>(op == s->send_trailing_md_op);
375 // TODO(vjpai): We should not consider the recv ops here, since they
376 // have their own callbacks. We should invoke a batch's on_complete
377 // as soon as all of the batch's send ops are complete, even if there
378 // are still recv ops pending.
379 int is_rim = static_cast<int>(op == s->recv_initial_md_op);
380 int is_rm = static_cast<int>(op == s->recv_message_op);
381 int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
382
383 if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
384 INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
385 GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
386 }
387 }
388
maybe_schedule_op_closure_locked(inproc_stream * s,grpc_error * error)389 static void maybe_schedule_op_closure_locked(inproc_stream* s,
390 grpc_error* error) {
391 if (s && s->ops_needed && !s->op_closure_scheduled) {
392 GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
393 s->op_closure_scheduled = true;
394 s->ops_needed = false;
395 }
396 }
397
// Fail the stream with `error`: fabricate trailing metadata for the other
// side (or buffer it if the other side doesn't exist yet), fail out every
// pending op on this stream, then close both directions. Takes ownership of
// `error` (unrefs it at the end). Caller must hold the shared mutex.
static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    // An empty batch is enough to signal end-of-stream.
    grpc_metadata_batch fake_md;
    grpc_metadata_batch_init(&fake_md);

    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&fake_md);

    if (other != nullptr) {
      // Propagate the cancellation to the other side and wake it up.
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(error);
      }
      maybe_schedule_op_closure_locked(other, error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      // No other side yet: park the error for init_stream to pick up.
      s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error* err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and authority
      // since it expects that as well as no error yet
      grpc_metadata_batch fake_md;
      grpc_metadata_batch_init(&fake_md);
      grpc_linked_mdelem* path_md = static_cast<grpc_linked_mdelem*>(
          gpr_arena_alloc(s->arena, sizeof(*path_md)));
      path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
                 GRPC_ERROR_NONE);
      grpc_linked_mdelem* auth_md = static_cast<grpc_linked_mdelem*>(
          gpr_arena_alloc(s->arena, sizeof(*auth_md)));
      auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
                 GRPC_ERROR_NONE);

      fill_in_metadata(
          s, &fake_md, 0,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      grpc_metadata_batch_destroy(&fake_md);
      err = GRPC_ERROR_NONE;
    } else {
      err = GRPC_ERROR_REF(error);
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %p %p", s,
               error, err);
    GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata_ready,
                       err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
               error);
    GRPC_CLOSURE_SCHED(
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_REF(error));
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    // Release the outgoing byte stream; it will never be delivered.
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailng-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
               s, error);
    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata_ready,
                       GRPC_ERROR_REF(error));
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
               s, error);
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);

  GRPC_ERROR_UNREF(error);
}
519
// TODO(vjpai): It should not be necessary to drain the incoming byte
// stream and create a new one; instead, we should simply pass the byte
// stream from the sender directly to the receiver as-is.
//
// Note that fixing this will also avoid the assumption in this code
// that the incoming byte stream's next() call will always return
// synchronously. That assumption is true today but may not always be
// true in the future.

// Copy the sender's pending send_message into the receiver's recv_message
// buffer, wrap it in a SliceBufferByteStream for the receiver, fire the
// receiver's recv_message_ready, and complete both batches if these were
// their last pending ops. Caller must hold the shared mutex.
static void message_transfer_locked(inproc_stream* sender,
                                    inproc_stream* receiver) {
  size_t remaining =
      sender->send_message_op->payload->send_message.send_message->length();
  // Replace any previous staging buffer on the receiver.
  if (receiver->recv_inited) {
    grpc_slice_buffer_destroy_internal(&receiver->recv_message);
  }
  grpc_slice_buffer_init(&receiver->recv_message);
  receiver->recv_inited = true;
  do {
    grpc_slice message_slice;
    grpc_closure unused;
    // Next() is assumed to complete synchronously (see TODO above), so the
    // closure is never actually invoked.
    GPR_ASSERT(
        sender->send_message_op->payload->send_message.send_message->Next(
            SIZE_MAX, &unused));
    grpc_error* error =
        sender->send_message_op->payload->send_message.send_message->Pull(
            &message_slice);
    if (error != GRPC_ERROR_NONE) {
      cancel_stream_locked(sender, GRPC_ERROR_REF(error));
      break;
    }
    GPR_ASSERT(error == GRPC_ERROR_NONE);
    remaining -= GRPC_SLICE_LENGTH(message_slice);
    grpc_slice_buffer_add(&receiver->recv_message, message_slice);
  } while (remaining > 0);
  // Done pulling from the sender's stream; release it.
  sender->send_message_op->payload->send_message.send_message.reset();

  receiver->recv_stream.Init(&receiver->recv_message, 0);
  receiver->recv_message_op->payload->recv_message.recv_message->reset(
      receiver->recv_stream.get());
  INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
             receiver);
  GRPC_CLOSURE_SCHED(
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      GRPC_ERROR_NONE);
  complete_if_batch_end_locked(
      sender, GRPC_ERROR_NONE, sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");

  receiver->recv_message_op = nullptr;
  sender->send_message_op = nullptr;
}
574
// Core per-stream state machine, run as a closure whenever this stream may
// have work to do (new ops queued, data arrived from the other side, or a
// cancellation). Matches pending send ops against the other side's recv ops
// and vice versa, delivers buffered metadata, and re-arms itself (via
// ops_needed) if any op is still unsatisfied.
static void op_state_machine(void* arg, grpc_error* error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  // Since this is a closure directly invoked by the combiner, it should not
  // unref the error parameter explicitly; the combiner will do that implicitly
  grpc_error* new_err = GRPC_ERROR_NONE;

  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", arg);
  inproc_stream* s = static_cast<inproc_stream*>(arg);
  gpr_mu* mu = &s->t->mu->mu;  // keep aside in case s gets closed
  gpr_mu_lock(mu);
  s->op_closure_scheduled = false;
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (s->cancel_self_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
    goto done;
  } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
    goto done;
  } else if (error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(error));
    goto done;
  }

  // Try to match our pending send_message with the other side's
  // pending recv_message.
  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(s, other);
      maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
    } else if (!s->t->is_client &&
               (s->trailing_md_sent || other->recv_trailing_md_op)) {
      // A server send will never be matched if the client is waiting
      // for trailing metadata already
      s->send_message_op->payload->send_message.send_message.reset();
      complete_if_batch_end_locked(
          s, GRPC_ERROR_NONE, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status.
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
    // Deliver into the other side's to_read buffer, or into our own
    // write buffer if the other side doesn't exist yet.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         0, dest, nullptr, destfilled);
      }
      s->trailing_md_sent = true;
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        // Server has now both sent and received trailing md, so the
        // delayed recv_trailing_md completion can fire (see below).
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready", s);
        GRPC_CLOSURE_SCHED(
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_NONE);
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                           GRPC_ERROR_NONE);
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, GRPC_ERROR_NONE, s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %p",
          s, new_err);
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      // Deliver the buffered initial metadata to the recv op.
      s->initial_md_recvd = true;
      new_err = fill_in_metadata(
          s, &s->to_read_initial_md, s->to_read_initial_md_flags,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      s->recv_initial_md_op->payload->recv_initial_metadata
          .recv_initial_metadata->deadline = s->deadline;
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      grpc_metadata_batch_clear(&s->to_read_initial_md);
      s->to_read_initial_md_filled = false;
      INPROC_LOG(GPR_INFO,
                 "op_state_machine %p scheduling initial-metadata-ready %p", s,
                 new_err);
      GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
                             .recv_initial_metadata_ready,
                         GRPC_ERROR_REF(new_err));
      complete_if_batch_end_locked(
          s, new_err, s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;

      if (new_err != GRPC_ERROR_NONE) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling on_complete errors2 %p", s,
                   new_err);
        fail_helper_locked(s, GRPC_ERROR_REF(new_err));
        goto done;
      }
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(other, s);
      maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
    }
  }
  if (s->recv_trailing_md_op && s->t->is_client && other &&
      other->send_message_op) {
    // Client wants trailing md but the server still has an unsent message;
    // let the recv_trailing_metadata_ready fire so the call can wind down.
    INPROC_LOG(GPR_INFO,
               "op_state_machine %p scheduling trailing-metadata-ready %p", s,
               GRPC_ERROR_NONE);
    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata_ready,
                       GRPC_ERROR_NONE);
    maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd trailing md %p",
          s, new_err);
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      GRPC_CLOSURE_SCHED(
          s->recv_message_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      s->send_message_op->payload->send_message.send_message.reset();
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      new_err =
          fill_in_metadata(s, &s->to_read_trailing_md, 0,
                           s->recv_trailing_md_op->payload
                               ->recv_trailing_metadata.recv_trailing_metadata,
                           nullptr, nullptr);
      grpc_metadata_batch_clear(&s->to_read_trailing_md);
      s->to_read_trailing_md_filled = false;

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing md
      // (If the server hasn't already sent its trailing md, it doesn't have
      // a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete %p",
                   s, new_err);
        GRPC_CLOSURE_SCHED(
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_REF(new_err));
        GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                           GRPC_ERROR_REF(new_err));
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      } else {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p server needs to delay handling "
                   "trailing-md-on-complete %p",
                   s, new_err);
      }
    } else {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    GRPC_CLOSURE_SCHED(
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
      s->send_message_op) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(new_err);
}
849
// Cancels stream |s| with |error|. Takes ownership of |error| (it is
// unref'ed before returning). Must be called with the transport's shared
// mutex held. Returns true if this call initiated the cancellation; false
// if the stream had already been self-canceled.
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
  if (s->cancel_self_error == GRPC_ERROR_NONE) {
    ret = true;
    s->cancel_self_error = GRPC_ERROR_REF(error);
    // Wake up any op closure parked on this stream so it observes the error.
    maybe_schedule_op_closure_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already sent trailing md on this stream earlier.
    s->trailing_md_sent = true;

    // An empty batch is enough: arrival of trailing metadata is itself the
    // cancellation signal for the peer.
    grpc_metadata_batch cancel_md;
    grpc_metadata_batch_init(&cancel_md);

    // Write into the peer's read buffer if the peer stream exists, otherwise
    // into our own write buffer for delivery when it is created.
    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&cancel_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
      }
      maybe_schedule_op_closure_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      // Peer stream doesn't exist yet; stash the error so it is picked up
      // when the other side comes up.
      s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
                             .recv_trailing_metadata_ready,
                         GRPC_ERROR_REF(s->cancel_self_error));
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  // Balance the caller's ref on |error| (ownership was transferred to us).
  GRPC_ERROR_UNREF(error);
  return ret;
}
902
do_nothing(void * arg,grpc_error * error)903 static void do_nothing(void* arg, grpc_error* error) {}
904
// Transport vtable entry: applies the stream op batch |op| to stream |gs|.
// Sends are written directly into the peer stream's read buffers (or parked
// in this side's write buffers if the peer doesn't exist yet); receives are
// recorded on the stream and completed later by the op state machine closure
// when matching data arrives. Errors (cancellation, shutdown, protocol
// misuse) fail the batch's closures immediately.
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                              grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (grpc_inproc_trace.enabled()) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed). This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  // NOTE(review): needs_close is never set to true anywhere in this function,
  // so the close block at the bottom is currently dead code.
  bool needs_close = false;

  inproc_stream* other = s->other_side;
  // Initial metadata is delivered eagerly here rather than via the op state
  // machine, so that deadlines and buffered md reach the peer immediately.
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      // Target the peer's read buffer if it exists; otherwise park the md in
      // our own write buffer until the peer stream is created.
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        if (!other || !other->closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        if (s->t->is_client) {
          // Propagate the tighter of the existing and newly-sent deadlines.
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      maybe_schedule_op_closure_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the closure
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the closure if:
    // 1. We want to send a message and the other side wants to receive or end
    // 2. We want to send trailing metadata and there isn't an unmatched send
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other &&
         ((other->recv_message_op != nullptr) ||
          (other->recv_trailing_md_op != nullptr))) ||
        (op->send_trailing_metadata && !op->send_message) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && (other->send_message_op != nullptr)) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      if (!s->op_closure_scheduled) {
        GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
        s->op_closure_scheduled = true;
      }
    } else {
      // Nothing can progress yet; the op state machine will be rescheduled
      // when matching activity arrives from the other side.
      s->ops_needed = true;
    }
  } else {
    if (error != GRPC_ERROR_NONE) {
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                           GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
  }
  if (needs_close) {
    close_other_side_locked(s, "perform_stream_op:other_side");
    close_stream_locked(s);
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}
1085
close_transport_locked(inproc_transport * t)1086 static void close_transport_locked(inproc_transport* t) {
1087 INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1088 grpc_connectivity_state_set(
1089 &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
1090 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."),
1091 "close transport");
1092 if (!t->is_closed) {
1093 t->is_closed = true;
1094 /* Also end all streams on this transport */
1095 while (t->stream_list != nullptr) {
1096 // cancel_stream_locked also adjusts stream list
1097 cancel_stream_locked(
1098 t->stream_list,
1099 grpc_error_set_int(
1100 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
1101 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1102 }
1103 }
1104 }
1105
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1106 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1107 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1108 INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1109 gpr_mu_lock(&t->mu->mu);
1110 if (op->on_connectivity_state_change) {
1111 grpc_connectivity_state_notify_on_state_change(
1112 &t->connectivity, op->connectivity_state,
1113 op->on_connectivity_state_change);
1114 }
1115 if (op->set_accept_stream) {
1116 t->accept_stream_cb = op->set_accept_stream_fn;
1117 t->accept_stream_data = op->set_accept_stream_user_data;
1118 }
1119 if (op->on_consumed) {
1120 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1121 }
1122
1123 bool do_close = false;
1124 if (op->goaway_error != GRPC_ERROR_NONE) {
1125 do_close = true;
1126 GRPC_ERROR_UNREF(op->goaway_error);
1127 }
1128 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1129 do_close = true;
1130 GRPC_ERROR_UNREF(op->disconnect_with_error);
1131 }
1132
1133 if (do_close) {
1134 close_transport_locked(t);
1135 }
1136 gpr_mu_unlock(&t->mu->mu);
1137 }
1138
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1139 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1140 grpc_closure* then_schedule_closure) {
1141 INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1142 inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1143 s->closure_at_destroy = then_schedule_closure;
1144 really_destroy_stream(s);
1145 }
1146
destroy_transport(grpc_transport * gt)1147 static void destroy_transport(grpc_transport* gt) {
1148 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1149 INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1150 gpr_mu_lock(&t->mu->mu);
1151 close_transport_locked(t);
1152 gpr_mu_unlock(&t->mu->mu);
1153 unref_transport(t->other_side);
1154 unref_transport(t);
1155 }
1156
1157 /*******************************************************************************
1158 * INTEGRATION GLUE
1159 */
1160
// Transport vtable entry: inproc has no fds to poll, so pollset
// registration is a no-op.
static void set_pollset(grpc_transport* gt, grpc_stream* gs,
                        grpc_pollset* pollset) {
  // Nothing to do here
}
1165
// Transport vtable entry: inproc has no fds to poll, so pollset_set
// registration is a no-op.
static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
                            grpc_pollset_set* pollset_set) {
  // Nothing to do here
}
1170
get_endpoint(grpc_transport * t)1171 static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
1172
1173 /*******************************************************************************
1174 * GLOBAL INIT AND DESTROY
1175 */
grpc_inproc_transport_init(void)1176 void grpc_inproc_transport_init(void) {
1177 grpc_core::ExecCtx exec_ctx;
1178 g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
1179
1180 grpc_slice key_tmp = grpc_slice_from_static_string(":path");
1181 g_fake_path_key = grpc_slice_intern(key_tmp);
1182 grpc_slice_unref_internal(key_tmp);
1183
1184 g_fake_path_value = grpc_slice_from_static_string("/");
1185
1186 grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
1187 g_fake_auth_key = grpc_slice_intern(auth_tmp);
1188 grpc_slice_unref_internal(auth_tmp);
1189
1190 g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
1191 }
1192
// Transport vtable wiring the functions above into the generic
// grpc_transport interface.
static const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc",        init_stream,
    set_pollset,           set_pollset_set, perform_stream_op,
    perform_transport_op,  destroy_stream,  destroy_transport,
    get_endpoint};
1198
1199 /*******************************************************************************
1200 * Main inproc transport functions
1201 */
inproc_transports_create(grpc_transport ** server_transport,const grpc_channel_args * server_args,grpc_transport ** client_transport,const grpc_channel_args * client_args)1202 static void inproc_transports_create(grpc_transport** server_transport,
1203 const grpc_channel_args* server_args,
1204 grpc_transport** client_transport,
1205 const grpc_channel_args* client_args) {
1206 INPROC_LOG(GPR_INFO, "inproc_transports_create");
1207 inproc_transport* st =
1208 static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st)));
1209 inproc_transport* ct =
1210 static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct)));
1211 // Share one lock between both sides since both sides get affected
1212 st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu)));
1213 gpr_mu_init(&st->mu->mu);
1214 gpr_ref_init(&st->mu->refs, 2);
1215 st->base.vtable = &inproc_vtable;
1216 ct->base.vtable = &inproc_vtable;
1217 // Start each side of transport with 2 refs since they each have a ref
1218 // to the other
1219 gpr_ref_init(&st->refs, 2);
1220 gpr_ref_init(&ct->refs, 2);
1221 st->is_client = false;
1222 ct->is_client = true;
1223 grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY,
1224 "inproc_server");
1225 grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY,
1226 "inproc_client");
1227 st->other_side = ct;
1228 ct->other_side = st;
1229 st->stream_list = nullptr;
1230 ct->stream_list = nullptr;
1231 *server_transport = reinterpret_cast<grpc_transport*>(st);
1232 *client_transport = reinterpret_cast<grpc_transport*>(ct);
1233 }
1234
// Public API: creates an in-process channel connected directly to |server|.
// |args| are the client channel args (may be null); |reserved| must be
// nullptr per the public API convention. Returns the new client channel;
// ownership of the two transports passes to the server and the channel
// respectively.
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
                                         grpc_channel_args* args,
                                         void* reserved) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_core::ExecCtx exec_ctx;

  const grpc_channel_args* server_args = grpc_server_get_channel_args(server);

  // Add a default authority channel argument for the client

  grpc_arg default_authority_arg;
  default_authority_arg.type = GRPC_ARG_STRING;
  default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
  default_authority_arg.value.string = (char*)"inproc.authority";
  grpc_channel_args* client_args =
      grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);

  grpc_transport* server_transport;
  grpc_transport* client_transport;
  inproc_transports_create(&server_transport, server_args, &client_transport,
                           client_args);

  // Hand the server side to the server before creating the client channel so
  // the server is ready to accept streams as soon as the channel is used.
  grpc_server_setup_transport(server, server_transport, nullptr, server_args);
  grpc_channel* channel = grpc_channel_create(
      "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);

  // Free up created channel args
  grpc_channel_args_destroy(client_args);

  // Now finish scheduled operations

  return channel;
}
1270
grpc_inproc_transport_shutdown(void)1271 void grpc_inproc_transport_shutdown(void) {
1272 grpc_core::ExecCtx exec_ctx;
1273 grpc_slice_unref_internal(g_empty_slice);
1274 grpc_slice_unref_internal(g_fake_path_key);
1275 grpc_slice_unref_internal(g_fake_path_value);
1276 grpc_slice_unref_internal(g_fake_auth_key);
1277 grpc_slice_unref_internal(g_fake_auth_value);
1278 }
1279