1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25 #include <string.h>
26 #include "src/core/ext/transport/inproc/inproc_transport.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/channel.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/server.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/error_utils.h"
36 #include "src/core/lib/transport/transport_impl.h"
37
// Forwards its arguments to gpr_log() only while the grpc_inproc_trace flag
// is enabled. The do/while(0) wrapper makes an invocation behave like one
// ordinary statement, so it is safe inside unbraced if/else bodies.
#define INPROC_LOG(...)                                 \
  do {                                                  \
    if (!GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {  \
      break; /* tracing is off: emit nothing */         \
    }                                                   \
    gpr_log(__VA_ARGS__);                               \
  } while (0)
44
45 namespace {
46 grpc_slice g_empty_slice;
47 grpc_slice g_fake_path_key;
48 grpc_slice g_fake_path_value;
49 grpc_slice g_fake_auth_key;
50 grpc_slice g_fake_auth_value;
51
52 struct inproc_stream;
53 bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
54 void maybe_process_ops_locked(inproc_stream* s, grpc_error* error);
55 void op_state_machine_locked(inproc_stream* s, grpc_error* error);
56 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
57 bool is_initial);
58 grpc_error* fill_in_metadata(inproc_stream* s,
59 const grpc_metadata_batch* metadata,
60 uint32_t flags, grpc_metadata_batch* out_md,
61 uint32_t* outflags, bool* markfilled);
62
63 struct shared_mu {
shared_mu__anone0c216ed0111::shared_mu64 shared_mu() {
65 // Share one lock between both sides since both sides get affected
66 gpr_mu_init(&mu);
67 gpr_ref_init(&refs, 2);
68 }
69
~shared_mu__anone0c216ed0111::shared_mu70 ~shared_mu() { gpr_mu_destroy(&mu); }
71
72 gpr_mu mu;
73 gpr_refcount refs;
74 };
75
76 struct inproc_transport {
inproc_transport__anone0c216ed0111::inproc_transport77 inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
78 bool is_client)
79 : mu(mu),
80 is_client(is_client),
81 state_tracker(is_client ? "inproc_client" : "inproc_server",
82 GRPC_CHANNEL_READY) {
83 base.vtable = vtable;
84 // Start each side of transport with 2 refs since they each have a ref
85 // to the other
86 gpr_ref_init(&refs, 2);
87 }
88
~inproc_transport__anone0c216ed0111::inproc_transport89 ~inproc_transport() {
90 if (gpr_unref(&mu->refs)) {
91 mu->~shared_mu();
92 gpr_free(mu);
93 }
94 }
95
ref__anone0c216ed0111::inproc_transport96 void ref() {
97 INPROC_LOG(GPR_INFO, "ref_transport %p", this);
98 gpr_ref(&refs);
99 }
100
unref__anone0c216ed0111::inproc_transport101 void unref() {
102 INPROC_LOG(GPR_INFO, "unref_transport %p", this);
103 if (!gpr_unref(&refs)) {
104 return;
105 }
106 INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
107 this->~inproc_transport();
108 gpr_free(this);
109 }
110
111 grpc_transport base;
112 shared_mu* mu;
113 gpr_refcount refs;
114 bool is_client;
115 grpc_core::ConnectivityStateTracker state_tracker;
116 void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
117 const void* server_data);
118 void* accept_stream_data;
119 bool is_closed = false;
120 struct inproc_transport* other_side;
121 struct inproc_stream* stream_list = nullptr;
122 };
123
124 struct inproc_stream {
inproc_stream__anone0c216ed0111::inproc_stream125 inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
126 const void* server_data, grpc_core::Arena* arena)
127 : t(t), refs(refcount), arena(arena) {
128 // Ref this stream right now for ctor and list.
129 ref("inproc_init_stream:init");
130 ref("inproc_init_stream:list");
131
132 grpc_metadata_batch_init(&to_read_initial_md);
133 grpc_metadata_batch_init(&to_read_trailing_md);
134 grpc_metadata_batch_init(&write_buffer_initial_md);
135 grpc_metadata_batch_init(&write_buffer_trailing_md);
136
137 stream_list_prev = nullptr;
138 gpr_mu_lock(&t->mu->mu);
139 stream_list_next = t->stream_list;
140 if (t->stream_list) {
141 t->stream_list->stream_list_prev = this;
142 }
143 t->stream_list = this;
144 gpr_mu_unlock(&t->mu->mu);
145
146 if (!server_data) {
147 t->ref();
148 inproc_transport* st = t->other_side;
149 st->ref();
150 other_side = nullptr; // will get filled in soon
151 // Pass the client-side stream address to the server-side for a ref
152 ref("inproc_init_stream:clt"); // ref it now on behalf of server
153 // side to avoid destruction
154 INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
155 st->accept_stream_cb, st->accept_stream_data);
156 (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
157 } else {
158 // This is the server-side and is being called through accept_stream_cb
159 inproc_stream* cs = const_cast<inproc_stream*>(
160 static_cast<const inproc_stream*>(server_data));
161 other_side = cs;
162 // Ref the server-side stream on behalf of the client now
163 ref("inproc_init_stream:srv");
164
165 // Now we are about to affect the other side, so lock the transport
166 // to make sure that it doesn't get destroyed
167 gpr_mu_lock(&t->mu->mu);
168 cs->other_side = this;
169 // Now transfer from the other side's write_buffer if any to the to_read
170 // buffer
171 if (cs->write_buffer_initial_md_filled) {
172 fill_in_metadata(this, &cs->write_buffer_initial_md,
173 cs->write_buffer_initial_md_flags, &to_read_initial_md,
174 &to_read_initial_md_flags, &to_read_initial_md_filled);
175 deadline = GPR_MIN(deadline, cs->write_buffer_deadline);
176 grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
177 cs->write_buffer_initial_md_filled = false;
178 }
179 if (cs->write_buffer_trailing_md_filled) {
180 fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
181 &to_read_trailing_md, nullptr,
182 &to_read_trailing_md_filled);
183 grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
184 cs->write_buffer_trailing_md_filled = false;
185 }
186 if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
187 cancel_other_error = cs->write_buffer_cancel_error;
188 cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
189 maybe_process_ops_locked(this, cancel_other_error);
190 }
191
192 gpr_mu_unlock(&t->mu->mu);
193 }
194 }
195
~inproc_stream__anone0c216ed0111::inproc_stream196 ~inproc_stream() {
197 GRPC_ERROR_UNREF(write_buffer_cancel_error);
198 GRPC_ERROR_UNREF(cancel_self_error);
199 GRPC_ERROR_UNREF(cancel_other_error);
200
201 if (recv_inited) {
202 grpc_slice_buffer_destroy_internal(&recv_message);
203 }
204
205 t->unref();
206 }
207
208 #ifndef NDEBUG
209 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
210 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
211 #else
212 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
213 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
214 #endif
ref__anone0c216ed0111::inproc_stream215 void ref(const char* reason) {
216 INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
217 STREAM_REF(refs, reason);
218 }
219
unref__anone0c216ed0111::inproc_stream220 void unref(const char* reason) {
221 INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
222 STREAM_UNREF(refs, reason);
223 }
224 #undef STREAM_REF
225 #undef STREAM_UNREF
226
227 inproc_transport* t;
228 grpc_metadata_batch to_read_initial_md;
229 uint32_t to_read_initial_md_flags = 0;
230 bool to_read_initial_md_filled = false;
231 grpc_metadata_batch to_read_trailing_md;
232 bool to_read_trailing_md_filled = false;
233 bool ops_needed = false;
234 // Write buffer used only during gap at init time when client-side
235 // stream is set up but server side stream is not yet set up
236 grpc_metadata_batch write_buffer_initial_md;
237 bool write_buffer_initial_md_filled = false;
238 uint32_t write_buffer_initial_md_flags = 0;
239 grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
240 grpc_metadata_batch write_buffer_trailing_md;
241 bool write_buffer_trailing_md_filled = false;
242 grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE;
243
244 struct inproc_stream* other_side;
245 bool other_side_closed = false; // won't talk anymore
246 bool write_buffer_other_side_closed = false; // on hold
247 grpc_stream_refcount* refs;
248
249 grpc_core::Arena* arena;
250
251 grpc_transport_stream_op_batch* send_message_op = nullptr;
252 grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
253 grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
254 grpc_transport_stream_op_batch* recv_message_op = nullptr;
255 grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
256
257 grpc_slice_buffer recv_message;
258 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
259 bool recv_inited = false;
260
261 bool initial_md_sent = false;
262 bool trailing_md_sent = false;
263 bool initial_md_recvd = false;
264 bool trailing_md_recvd = false;
265 // The following tracks if the server-side only pretends to have received
266 // trailing metadata since it no longer cares about the RPC. If that is the
267 // case, it is still ok for the client to send trailing metadata (in which
268 // case it will be ignored).
269 bool trailing_md_recvd_implicit_only = false;
270
271 bool closed = false;
272
273 grpc_error* cancel_self_error = GRPC_ERROR_NONE;
274 grpc_error* cancel_other_error = GRPC_ERROR_NONE;
275
276 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
277
278 bool listed = true;
279 struct inproc_stream* stream_list_prev;
280 struct inproc_stream* stream_list_next;
281 };
282
log_metadata(const grpc_metadata_batch * md_batch,bool is_client,bool is_initial)283 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
284 bool is_initial) {
285 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
286 md = md->next) {
287 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
288 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
289 gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
290 is_client ? "CLI" : "SVR", key, value);
291 gpr_free(key);
292 gpr_free(value);
293 }
294 }
295
fill_in_metadata(inproc_stream * s,const grpc_metadata_batch * metadata,uint32_t flags,grpc_metadata_batch * out_md,uint32_t * outflags,bool * markfilled)296 grpc_error* fill_in_metadata(inproc_stream* s,
297 const grpc_metadata_batch* metadata,
298 uint32_t flags, grpc_metadata_batch* out_md,
299 uint32_t* outflags, bool* markfilled) {
300 if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
301 log_metadata(metadata, s->t->is_client, outflags != nullptr);
302 }
303
304 if (outflags != nullptr) {
305 *outflags = flags;
306 }
307 if (markfilled != nullptr) {
308 *markfilled = true;
309 }
310 grpc_error* error = GRPC_ERROR_NONE;
311 for (grpc_linked_mdelem* elem = metadata->list.head;
312 (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
313 grpc_linked_mdelem* nelem =
314 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*nelem)));
315 nelem->md =
316 grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
317 grpc_slice_intern(GRPC_MDVALUE(elem->md)));
318
319 error = grpc_metadata_batch_link_tail(out_md, nelem);
320 }
321 return error;
322 }
323
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)324 int init_stream(grpc_transport* gt, grpc_stream* gs,
325 grpc_stream_refcount* refcount, const void* server_data,
326 grpc_core::Arena* arena) {
327 INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
328 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
329 new (gs) inproc_stream(t, refcount, server_data, arena);
330 return 0; // return value is not important
331 }
332
close_stream_locked(inproc_stream * s)333 void close_stream_locked(inproc_stream* s) {
334 if (!s->closed) {
335 // Release the metadata that we would have written out
336 grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
337 grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
338
339 if (s->listed) {
340 inproc_stream* p = s->stream_list_prev;
341 inproc_stream* n = s->stream_list_next;
342 if (p != nullptr) {
343 p->stream_list_next = n;
344 } else {
345 s->t->stream_list = n;
346 }
347 if (n != nullptr) {
348 n->stream_list_prev = p;
349 }
350 s->listed = false;
351 s->unref("close_stream:list");
352 }
353 s->closed = true;
354 s->unref("close_stream:closing");
355 }
356 }
357
358 // This function means that we are done talking/listening to the other side
close_other_side_locked(inproc_stream * s,const char * reason)359 void close_other_side_locked(inproc_stream* s, const char* reason) {
360 if (s->other_side != nullptr) {
361 // First release the metadata that came from the other side's arena
362 grpc_metadata_batch_destroy(&s->to_read_initial_md);
363 grpc_metadata_batch_destroy(&s->to_read_trailing_md);
364
365 s->other_side->unref(reason);
366 s->other_side_closed = true;
367 s->other_side = nullptr;
368 } else if (!s->other_side_closed) {
369 s->write_buffer_other_side_closed = true;
370 }
371 }
372
373 // Call the on_complete closure associated with this stream_op_batch if
374 // this stream_op_batch is only one of the pending operations for this
375 // stream. This is called when one of the pending operations for the stream
376 // is done and about to be NULLed out
complete_if_batch_end_locked(inproc_stream * s,grpc_error * error,grpc_transport_stream_op_batch * op,const char * msg)377 void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
378 grpc_transport_stream_op_batch* op,
379 const char* msg) {
380 int is_sm = static_cast<int>(op == s->send_message_op);
381 int is_stm = static_cast<int>(op == s->send_trailing_md_op);
382 // TODO(vjpai): We should not consider the recv ops here, since they
383 // have their own callbacks. We should invoke a batch's on_complete
384 // as soon as all of the batch's send ops are complete, even if there
385 // are still recv ops pending.
386 int is_rim = static_cast<int>(op == s->recv_initial_md_op);
387 int is_rm = static_cast<int>(op == s->recv_message_op);
388 int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
389
390 if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
391 INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
392 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
393 GRPC_ERROR_REF(error));
394 }
395 }
396
maybe_process_ops_locked(inproc_stream * s,grpc_error * error)397 void maybe_process_ops_locked(inproc_stream* s, grpc_error* error) {
398 if (s && (error != GRPC_ERROR_NONE || s->ops_needed)) {
399 s->ops_needed = false;
400 op_state_machine_locked(s, error);
401 }
402 }
403
fail_helper_locked(inproc_stream * s,grpc_error * error)404 void fail_helper_locked(inproc_stream* s, grpc_error* error) {
405 INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
406 // If we're failing this side, we need to make sure that
407 // we also send or have already sent trailing metadata
408 if (!s->trailing_md_sent) {
409 // Send trailing md to the other side indicating cancellation
410 s->trailing_md_sent = true;
411
412 grpc_metadata_batch fake_md;
413 grpc_metadata_batch_init(&fake_md);
414
415 inproc_stream* other = s->other_side;
416 grpc_metadata_batch* dest = (other == nullptr)
417 ? &s->write_buffer_trailing_md
418 : &other->to_read_trailing_md;
419 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
420 : &other->to_read_trailing_md_filled;
421 fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
422 grpc_metadata_batch_destroy(&fake_md);
423
424 if (other != nullptr) {
425 if (other->cancel_other_error == GRPC_ERROR_NONE) {
426 other->cancel_other_error = GRPC_ERROR_REF(error);
427 }
428 maybe_process_ops_locked(other, error);
429 } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
430 s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
431 }
432 }
433 if (s->recv_initial_md_op) {
434 grpc_error* err;
435 if (!s->t->is_client) {
436 // If this is a server, provide initial metadata with a path and authority
437 // since it expects that as well as no error yet
438 grpc_metadata_batch fake_md;
439 grpc_metadata_batch_init(&fake_md);
440 grpc_linked_mdelem* path_md =
441 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*path_md)));
442 path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
443 GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
444 GRPC_ERROR_NONE);
445 grpc_linked_mdelem* auth_md =
446 static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*auth_md)));
447 auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
448 GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
449 GRPC_ERROR_NONE);
450
451 fill_in_metadata(
452 s, &fake_md, 0,
453 s->recv_initial_md_op->payload->recv_initial_metadata
454 .recv_initial_metadata,
455 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
456 nullptr);
457 grpc_metadata_batch_destroy(&fake_md);
458 err = GRPC_ERROR_NONE;
459 } else {
460 err = GRPC_ERROR_REF(error);
461 }
462 if (s->recv_initial_md_op->payload->recv_initial_metadata
463 .trailing_metadata_available != nullptr) {
464 // Set to true unconditionally, because we're failing the call, so even
465 // if we haven't actually seen the send_trailing_metadata op from the
466 // other side, we're going to return trailing metadata anyway.
467 *s->recv_initial_md_op->payload->recv_initial_metadata
468 .trailing_metadata_available = true;
469 }
470 INPROC_LOG(GPR_INFO,
471 "fail_helper %p scheduling initial-metadata-ready %p %p", s,
472 error, err);
473 grpc_core::ExecCtx::Run(
474 DEBUG_LOCATION,
475 s->recv_initial_md_op->payload->recv_initial_metadata
476 .recv_initial_metadata_ready,
477 err);
478 // Last use of err so no need to REF and then UNREF it
479
480 complete_if_batch_end_locked(
481 s, error, s->recv_initial_md_op,
482 "fail_helper scheduling recv-initial-metadata-on-complete");
483 s->recv_initial_md_op = nullptr;
484 }
485 if (s->recv_message_op) {
486 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
487 error);
488 grpc_core::ExecCtx::Run(
489 DEBUG_LOCATION,
490 s->recv_message_op->payload->recv_message.recv_message_ready,
491 GRPC_ERROR_REF(error));
492 complete_if_batch_end_locked(
493 s, error, s->recv_message_op,
494 "fail_helper scheduling recv-message-on-complete");
495 s->recv_message_op = nullptr;
496 }
497 if (s->send_message_op) {
498 s->send_message_op->payload->send_message.send_message.reset();
499 complete_if_batch_end_locked(
500 s, error, s->send_message_op,
501 "fail_helper scheduling send-message-on-complete");
502 s->send_message_op = nullptr;
503 }
504 if (s->send_trailing_md_op) {
505 complete_if_batch_end_locked(
506 s, error, s->send_trailing_md_op,
507 "fail_helper scheduling send-trailng-md-on-complete");
508 s->send_trailing_md_op = nullptr;
509 }
510 if (s->recv_trailing_md_op) {
511 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
512 s, error);
513 grpc_core::ExecCtx::Run(
514 DEBUG_LOCATION,
515 s->recv_trailing_md_op->payload->recv_trailing_metadata
516 .recv_trailing_metadata_ready,
517 GRPC_ERROR_REF(error));
518 INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
519 s, error);
520 complete_if_batch_end_locked(
521 s, error, s->recv_trailing_md_op,
522 "fail_helper scheduling recv-trailing-metadata-on-complete");
523 s->recv_trailing_md_op = nullptr;
524 }
525 close_other_side_locked(s, "fail_helper:other_side");
526 close_stream_locked(s);
527
528 GRPC_ERROR_UNREF(error);
529 }
530
531 // TODO(vjpai): It should not be necessary to drain the incoming byte
532 // stream and create a new one; instead, we should simply pass the byte
533 // stream from the sender directly to the receiver as-is.
534 //
535 // Note that fixing this will also avoid the assumption in this code
536 // that the incoming byte stream's next() call will always return
537 // synchronously. That assumption is true today but may not always be
538 // true in the future.
message_transfer_locked(inproc_stream * sender,inproc_stream * receiver)539 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
540 size_t remaining =
541 sender->send_message_op->payload->send_message.send_message->length();
542 if (receiver->recv_inited) {
543 grpc_slice_buffer_destroy_internal(&receiver->recv_message);
544 }
545 grpc_slice_buffer_init(&receiver->recv_message);
546 receiver->recv_inited = true;
547 do {
548 grpc_slice message_slice;
549 grpc_closure unused;
550 GPR_ASSERT(
551 sender->send_message_op->payload->send_message.send_message->Next(
552 SIZE_MAX, &unused));
553 grpc_error* error =
554 sender->send_message_op->payload->send_message.send_message->Pull(
555 &message_slice);
556 if (error != GRPC_ERROR_NONE) {
557 cancel_stream_locked(sender, GRPC_ERROR_REF(error));
558 break;
559 }
560 GPR_ASSERT(error == GRPC_ERROR_NONE);
561 remaining -= GRPC_SLICE_LENGTH(message_slice);
562 grpc_slice_buffer_add(&receiver->recv_message, message_slice);
563 } while (remaining > 0);
564 sender->send_message_op->payload->send_message.send_message.reset();
565
566 receiver->recv_stream.Init(&receiver->recv_message, 0);
567 receiver->recv_message_op->payload->recv_message.recv_message->reset(
568 receiver->recv_stream.get());
569 INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
570 receiver);
571 grpc_core::ExecCtx::Run(
572 DEBUG_LOCATION,
573 receiver->recv_message_op->payload->recv_message.recv_message_ready,
574 GRPC_ERROR_NONE);
575 complete_if_batch_end_locked(
576 sender, GRPC_ERROR_NONE, sender->send_message_op,
577 "message_transfer scheduling sender on_complete");
578 complete_if_batch_end_locked(
579 receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
580 "message_transfer scheduling receiver on_complete");
581
582 receiver->recv_message_op = nullptr;
583 sender->send_message_op = nullptr;
584 }
585
op_state_machine_locked(inproc_stream * s,grpc_error * error)586 void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
587 // This function gets called when we have contents in the unprocessed reads
588 // Get what we want based on our ops wanted
589 // Schedule our appropriate closures
590 // and then return to ops_needed state if still needed
591
592 grpc_error* new_err = GRPC_ERROR_NONE;
593
594 bool needs_close = false;
595
596 INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
597 // cancellation takes precedence
598 inproc_stream* other = s->other_side;
599
600 if (s->cancel_self_error != GRPC_ERROR_NONE) {
601 fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
602 goto done;
603 } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
604 fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
605 goto done;
606 } else if (error != GRPC_ERROR_NONE) {
607 fail_helper_locked(s, GRPC_ERROR_REF(error));
608 goto done;
609 }
610
611 if (s->send_message_op && other) {
612 if (other->recv_message_op) {
613 message_transfer_locked(s, other);
614 maybe_process_ops_locked(other, GRPC_ERROR_NONE);
615 } else if (!s->t->is_client && s->trailing_md_sent) {
616 // A server send will never be matched if the server already sent status
617 s->send_message_op->payload->send_message.send_message.reset();
618 complete_if_batch_end_locked(
619 s, GRPC_ERROR_NONE, s->send_message_op,
620 "op_state_machine scheduling send-message-on-complete case 1");
621 s->send_message_op = nullptr;
622 }
623 }
624 // Pause a send trailing metadata if there is still an outstanding
625 // send message unless we know that the send message will never get
626 // matched to a receive. This happens on the client if the server has
627 // already sent status or on the server if the client has requested
628 // status
629 if (s->send_trailing_md_op &&
630 (!s->send_message_op ||
631 (s->t->is_client &&
632 (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
633 (!s->t->is_client && other &&
634 (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
635 other->recv_trailing_md_op)))) {
636 grpc_metadata_batch* dest = (other == nullptr)
637 ? &s->write_buffer_trailing_md
638 : &other->to_read_trailing_md;
639 bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
640 : &other->to_read_trailing_md_filled;
641 if (*destfilled || s->trailing_md_sent) {
642 // The buffer is already in use; that's an error!
643 INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
644 new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
645 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
646 goto done;
647 } else {
648 if (!other || !other->closed) {
649 fill_in_metadata(s,
650 s->send_trailing_md_op->payload->send_trailing_metadata
651 .send_trailing_metadata,
652 0, dest, nullptr, destfilled);
653 }
654 s->trailing_md_sent = true;
655 if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
656 *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
657 }
658 if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
659 INPROC_LOG(GPR_INFO,
660 "op_state_machine %p scheduling trailing-metadata-ready", s);
661 grpc_core::ExecCtx::Run(
662 DEBUG_LOCATION,
663 s->recv_trailing_md_op->payload->recv_trailing_metadata
664 .recv_trailing_metadata_ready,
665 GRPC_ERROR_NONE);
666 INPROC_LOG(GPR_INFO,
667 "op_state_machine %p scheduling trailing-md-on-complete", s);
668 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
669 s->recv_trailing_md_op->on_complete,
670 GRPC_ERROR_NONE);
671 s->recv_trailing_md_op = nullptr;
672 needs_close = true;
673 }
674 }
675 maybe_process_ops_locked(other, GRPC_ERROR_NONE);
676 complete_if_batch_end_locked(
677 s, GRPC_ERROR_NONE, s->send_trailing_md_op,
678 "op_state_machine scheduling send-trailing-metadata-on-complete");
679 s->send_trailing_md_op = nullptr;
680 }
681 if (s->recv_initial_md_op) {
682 if (s->initial_md_recvd) {
683 new_err =
684 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
685 INPROC_LOG(
686 GPR_INFO,
687 "op_state_machine %p scheduling on_complete errors for already "
688 "recvd initial md %p",
689 s, new_err);
690 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
691 goto done;
692 }
693
694 if (s->to_read_initial_md_filled) {
695 s->initial_md_recvd = true;
696 new_err = fill_in_metadata(
697 s, &s->to_read_initial_md, s->to_read_initial_md_flags,
698 s->recv_initial_md_op->payload->recv_initial_metadata
699 .recv_initial_metadata,
700 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
701 nullptr);
702 s->recv_initial_md_op->payload->recv_initial_metadata
703 .recv_initial_metadata->deadline = s->deadline;
704 if (s->recv_initial_md_op->payload->recv_initial_metadata
705 .trailing_metadata_available != nullptr) {
706 *s->recv_initial_md_op->payload->recv_initial_metadata
707 .trailing_metadata_available =
708 (other != nullptr && other->send_trailing_md_op != nullptr);
709 }
710 grpc_metadata_batch_clear(&s->to_read_initial_md);
711 s->to_read_initial_md_filled = false;
712 INPROC_LOG(GPR_INFO,
713 "op_state_machine %p scheduling initial-metadata-ready %p", s,
714 new_err);
715 grpc_core::ExecCtx::Run(
716 DEBUG_LOCATION,
717 s->recv_initial_md_op->payload->recv_initial_metadata
718 .recv_initial_metadata_ready,
719 GRPC_ERROR_REF(new_err));
720 complete_if_batch_end_locked(
721 s, new_err, s->recv_initial_md_op,
722 "op_state_machine scheduling recv-initial-metadata-on-complete");
723 s->recv_initial_md_op = nullptr;
724
725 if (new_err != GRPC_ERROR_NONE) {
726 INPROC_LOG(GPR_INFO,
727 "op_state_machine %p scheduling on_complete errors2 %p", s,
728 new_err);
729 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
730 goto done;
731 }
732 }
733 }
734 if (s->recv_message_op) {
735 if (other && other->send_message_op) {
736 message_transfer_locked(other, s);
737 maybe_process_ops_locked(other, GRPC_ERROR_NONE);
738 }
739 }
740 if (s->to_read_trailing_md_filled) {
741 if (s->trailing_md_recvd) {
742 if (s->trailing_md_recvd_implicit_only) {
743 INPROC_LOG(GPR_INFO,
744 "op_state_machine %p already implicitly received trailing "
745 "metadata, so ignoring new trailing metadata from client",
746 s);
747 grpc_metadata_batch_clear(&s->to_read_trailing_md);
748 s->to_read_trailing_md_filled = false;
749 s->trailing_md_recvd_implicit_only = false;
750 } else {
751 new_err =
752 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
753 INPROC_LOG(
754 GPR_INFO,
755 "op_state_machine %p scheduling on_complete errors for already "
756 "recvd trailing md %p",
757 s, new_err);
758 fail_helper_locked(s, GRPC_ERROR_REF(new_err));
759 goto done;
760 }
761 }
762 if (s->recv_message_op != nullptr) {
763 // This message needs to be wrapped up because it will never be
764 // satisfied
765 *s->recv_message_op->payload->recv_message.recv_message = nullptr;
766 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
767 grpc_core::ExecCtx::Run(
768 DEBUG_LOCATION,
769 s->recv_message_op->payload->recv_message.recv_message_ready,
770 GRPC_ERROR_NONE);
771 complete_if_batch_end_locked(
772 s, new_err, s->recv_message_op,
773 "op_state_machine scheduling recv-message-on-complete");
774 s->recv_message_op = nullptr;
775 }
776 if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
777 // Nothing further will try to receive from this stream, so finish off
778 // any outstanding send_message op
779 s->send_message_op->payload->send_message.send_message.reset();
780 s->send_message_op->payload->send_message.stream_write_closed = true;
781 complete_if_batch_end_locked(
782 s, new_err, s->send_message_op,
783 "op_state_machine scheduling send-message-on-complete case 2");
784 s->send_message_op = nullptr;
785 }
786 if (s->recv_trailing_md_op != nullptr) {
787 // We wanted trailing metadata and we got it
788 s->trailing_md_recvd = true;
789 new_err =
790 fill_in_metadata(s, &s->to_read_trailing_md, 0,
791 s->recv_trailing_md_op->payload
792 ->recv_trailing_metadata.recv_trailing_metadata,
793 nullptr, nullptr);
794 grpc_metadata_batch_clear(&s->to_read_trailing_md);
795 s->to_read_trailing_md_filled = false;
796
797 // We should schedule the recv_trailing_md_op completion if
798 // 1. this stream is the client-side
799 // 2. this stream is the server-side AND has already sent its trailing md
800 // (If the server hasn't already sent its trailing md, it doesn't have
801 // a final status, so don't mark this op complete)
802 if (s->t->is_client || s->trailing_md_sent) {
803 INPROC_LOG(GPR_INFO,
804 "op_state_machine %p scheduling trailing-md-on-complete %p",
805 s, new_err);
806 grpc_core::ExecCtx::Run(
807 DEBUG_LOCATION,
808 s->recv_trailing_md_op->payload->recv_trailing_metadata
809 .recv_trailing_metadata_ready,
810 GRPC_ERROR_REF(new_err));
811 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
812 s->recv_trailing_md_op->on_complete,
813 GRPC_ERROR_REF(new_err));
814 s->recv_trailing_md_op = nullptr;
815 needs_close = s->trailing_md_sent;
816 } else {
817 INPROC_LOG(GPR_INFO,
818 "op_state_machine %p server needs to delay handling "
819 "trailing-md-on-complete %p",
820 s, new_err);
821 }
822 } else if (!s->trailing_md_recvd) {
823 INPROC_LOG(
824 GPR_INFO,
825 "op_state_machine %p has trailing md but not yet waiting for it", s);
826 }
827 }
828 if (!s->t->is_client && s->trailing_md_sent &&
829 (s->recv_trailing_md_op != nullptr)) {
830 // In this case, we don't care to receive the write-close from the client
831 // because we have already sent status and the RPC is over as far as we
832 // are concerned.
833 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %p",
834 s, new_err);
835 grpc_core::ExecCtx::Run(
836 DEBUG_LOCATION,
837 s->recv_trailing_md_op->payload->recv_trailing_metadata
838 .recv_trailing_metadata_ready,
839 GRPC_ERROR_REF(new_err));
840 complete_if_batch_end_locked(
841 s, new_err, s->recv_trailing_md_op,
842 "op_state_machine scheduling recv-trailing-md-on-complete");
843 s->trailing_md_recvd = true;
844 s->recv_trailing_md_op = nullptr;
845 // Since we are only pretending to have received the trailing MD, it would
846 // be ok (not an error) if the client actually sends it later.
847 s->trailing_md_recvd_implicit_only = true;
848 }
849 if (s->trailing_md_recvd && s->recv_message_op) {
850 // No further message will come on this stream, so finish off the
851 // recv_message_op
852 INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
853 *s->recv_message_op->payload->recv_message.recv_message = nullptr;
854 grpc_core::ExecCtx::Run(
855 DEBUG_LOCATION,
856 s->recv_message_op->payload->recv_message.recv_message_ready,
857 GRPC_ERROR_NONE);
858 complete_if_batch_end_locked(
859 s, new_err, s->recv_message_op,
860 "op_state_machine scheduling recv-message-on-complete");
861 s->recv_message_op = nullptr;
862 }
863 if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
864 // Nothing further will try to receive from this stream, so finish off
865 // any outstanding send_message op
866 s->send_message_op->payload->send_message.send_message.reset();
867 complete_if_batch_end_locked(
868 s, new_err, s->send_message_op,
869 "op_state_machine scheduling send-message-on-complete case 3");
870 s->send_message_op = nullptr;
871 }
872 if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
873 s->recv_message_op || s->recv_trailing_md_op) {
874 // Didn't get the item we wanted so we still need to get
875 // rescheduled
876 INPROC_LOG(
877 GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
878 s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
879 s->recv_message_op, s->recv_trailing_md_op);
880 s->ops_needed = true;
881 }
882 done:
883 if (needs_close) {
884 close_other_side_locked(s, "op_state_machine");
885 close_stream_locked(s);
886 }
887 GRPC_ERROR_UNREF(new_err);
888 }
889
// Cancels stream `s` with `error`.
//
// Returns true when this call is the one that recorded the cancellation
// (i.e. s->cancel_self_error was still GRPC_ERROR_NONE on entry) and false
// when the stream had already been self-cancelled. Either way the stream and
// its peer link are closed before returning.
//
// Ownership: the function takes its own ref on `error` when it stores it and
// unconditionally unrefs the caller's `error` at the end, so the caller hands
// over one ref.
//
// Locking: the visible callers (perform_stream_op, close_transport_locked)
// invoke this with the transport mutex held.
bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
  if (s->cancel_self_error == GRPC_ERROR_NONE) {
    ret = true;
    s->cancel_self_error = GRPC_ERROR_REF(error);
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    // Give this stream's pending ops a chance to observe the cancellation
    // error before anything is pushed to the peer.
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    // cancel_md is a freshly initialized batch; nothing is added to it before
    // it is handed to fill_in_metadata below.
    grpc_metadata_batch cancel_md;
    grpc_metadata_batch_init(&cancel_md);

    // With no peer stream attached, the trailing metadata is staged in this
    // stream's own write buffer; otherwise it goes straight to the peer's
    // to-read slot.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&cancel_md);

    if (other != nullptr) {
      // Record the cancellation on the peer (first cancellation wins) and let
      // the peer's pending ops react to it.
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      // No peer yet: stash the error alongside the buffered writes.
      s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          GRPC_ERROR_REF(s->cancel_self_error));
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  // These run even when the cancel was not accepted (ret == false).
  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  // Drop the ref handed in by the caller.
  GRPC_ERROR_UNREF(error);
  return ret;
}
945
do_nothing(void *,grpc_error *)946 void do_nothing(void* /*arg*/, grpc_error* /*error*/) {}
947
// Transport vtable entry that starts the stream op batch `op` on stream `gs`.
//
// Everything between gpr_mu_lock and gpr_mu_unlock runs under the transport's
// mutex. In order, the function:
//   1. handles a cancel_stream op, or turns a prior self-cancellation into an
//      error for this batch;
//   2. pushes send_initial_metadata to the peer stream (or, when no peer is
//      attached, into this stream's write buffer);
//   3. records the remaining ops on the stream and either runs the op state
//      machine now or marks the stream as needing it later;
//   4. when the batch failed, or has nothing left for the state machine,
//      schedules the affected callbacks directly.
//
// `gt` is only used for logging; the transport is reached through s->t.
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  // Trace-only: dump outgoing metadata when inproc tracing is enabled.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  // `error` is an owned ref for the rest of the function; it is released
  // after the mutex is dropped.
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  // Read after the cancel handling above, so a cancel in this same batch is
  // already reflected in the peer pointer.
  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    // Sending metadata on a closed transport fails the whole batch.
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      // Target is the peer's to-read slot when a peer exists, otherwise this
      // stream's write buffer.
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        // Skip the copy when the peer is already closed.
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        // Only the client side propagates a deadline and records that
        // initial metadata has been sent.
        if (s->t->is_client) {
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      // NOTE(review): `other` may be nullptr here; this relies on
      // maybe_process_ops_locked tolerating a null stream -- confirm.
      maybe_process_ops_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      // Nothing can progress yet; leave a marker so a later event re-runs
      // the state machine for this stream.
      s->ops_needed = true;
    }
  } else {
    // Reached when the batch failed, or when it carried nothing for the state
    // machine (e.g. only cancel_stream and/or send_initial_metadata).
    if (error != GRPC_ERROR_NONE) {
      // Consume any send message that was sent here but that we are not pushing
      // to the other side
      if (op->send_message) {
        op->payload->send_message.send_message.reset();
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    // on_complete always runs on this path, carrying the batch error (or
    // GRPC_ERROR_NONE when there was none).
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
  }
  // Unlock through the saved pointer rather than through `s`.
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}
1127
close_transport_locked(inproc_transport * t)1128 void close_transport_locked(inproc_transport* t) {
1129 INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1130 t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
1131 "close transport");
1132 if (!t->is_closed) {
1133 t->is_closed = true;
1134 /* Also end all streams on this transport */
1135 while (t->stream_list != nullptr) {
1136 // cancel_stream_locked also adjusts stream list
1137 cancel_stream_locked(
1138 t->stream_list,
1139 grpc_error_set_int(
1140 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
1141 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1142 }
1143 }
1144 }
1145
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1146 void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1147 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1148 INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1149 gpr_mu_lock(&t->mu->mu);
1150 if (op->start_connectivity_watch != nullptr) {
1151 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1152 std::move(op->start_connectivity_watch));
1153 }
1154 if (op->stop_connectivity_watch != nullptr) {
1155 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1156 }
1157 if (op->set_accept_stream) {
1158 t->accept_stream_cb = op->set_accept_stream_fn;
1159 t->accept_stream_data = op->set_accept_stream_user_data;
1160 }
1161 if (op->on_consumed) {
1162 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1163 }
1164
1165 bool do_close = false;
1166 if (op->goaway_error != GRPC_ERROR_NONE) {
1167 do_close = true;
1168 GRPC_ERROR_UNREF(op->goaway_error);
1169 }
1170 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1171 do_close = true;
1172 GRPC_ERROR_UNREF(op->disconnect_with_error);
1173 }
1174
1175 if (do_close) {
1176 close_transport_locked(t);
1177 }
1178 gpr_mu_unlock(&t->mu->mu);
1179 }
1180
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1181 void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1182 grpc_closure* then_schedule_closure) {
1183 INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1184 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1185 inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1186 gpr_mu_lock(&t->mu->mu);
1187 close_stream_locked(s);
1188 gpr_mu_unlock(&t->mu->mu);
1189 s->~inproc_stream();
1190 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1191 GRPC_ERROR_NONE);
1192 }
1193
destroy_transport(grpc_transport * gt)1194 void destroy_transport(grpc_transport* gt) {
1195 inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1196 INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1197 gpr_mu_lock(&t->mu->mu);
1198 close_transport_locked(t);
1199 gpr_mu_unlock(&t->mu->mu);
1200 t->other_side->unref();
1201 t->unref();
1202 }
1203
1204 /*******************************************************************************
1205 * INTEGRATION GLUE
1206 */
1207
set_pollset(grpc_transport *,grpc_stream *,grpc_pollset *)1208 void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1209 grpc_pollset* /*pollset*/) {
1210 // Nothing to do here
1211 }
1212
set_pollset_set(grpc_transport *,grpc_stream *,grpc_pollset_set *)1213 void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1214 grpc_pollset_set* /*pollset_set*/) {
1215 // Nothing to do here
1216 }
1217
get_endpoint(grpc_transport *)1218 grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }
1219
1220 const grpc_transport_vtable inproc_vtable = {
1221 sizeof(inproc_stream), "inproc", init_stream,
1222 set_pollset, set_pollset_set, perform_stream_op,
1223 perform_transport_op, destroy_stream, destroy_transport,
1224 get_endpoint};
1225
1226 /*******************************************************************************
1227 * Main inproc transport functions
1228 */
inproc_transports_create(grpc_transport ** server_transport,const grpc_channel_args *,grpc_transport ** client_transport,const grpc_channel_args *)1229 void inproc_transports_create(grpc_transport** server_transport,
1230 const grpc_channel_args* /*server_args*/,
1231 grpc_transport** client_transport,
1232 const grpc_channel_args* /*client_args*/) {
1233 INPROC_LOG(GPR_INFO, "inproc_transports_create");
1234 shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1235 inproc_transport* st = new (gpr_malloc(sizeof(*st)))
1236 inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
1237 inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
1238 inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
1239 st->other_side = ct;
1240 ct->other_side = st;
1241 *server_transport = reinterpret_cast<grpc_transport*>(st);
1242 *client_transport = reinterpret_cast<grpc_transport*>(ct);
1243 }
1244 } // namespace
1245
1246 /*******************************************************************************
1247 * GLOBAL INIT AND DESTROY
1248 */
grpc_inproc_transport_init(void)1249 void grpc_inproc_transport_init(void) {
1250 grpc_core::ExecCtx exec_ctx;
1251 g_empty_slice = grpc_core::ExternallyManagedSlice();
1252
1253 grpc_slice key_tmp = grpc_slice_from_static_string(":path");
1254 g_fake_path_key = grpc_slice_intern(key_tmp);
1255 grpc_slice_unref_internal(key_tmp);
1256
1257 g_fake_path_value = grpc_slice_from_static_string("/");
1258
1259 grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
1260 g_fake_auth_key = grpc_slice_intern(auth_tmp);
1261 grpc_slice_unref_internal(auth_tmp);
1262
1263 g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
1264 }
1265
grpc_inproc_channel_create(grpc_server * server,grpc_channel_args * args,void *)1266 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
1267 grpc_channel_args* args,
1268 void* /*reserved*/) {
1269 GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1270 (server, args));
1271
1272 grpc_core::ExecCtx exec_ctx;
1273
1274 // Remove max_connection_idle and max_connection_age channel arguments since
1275 // those do not apply to inproc transports.
1276 const char* args_to_remove[] = {GRPC_ARG_MAX_CONNECTION_IDLE_MS,
1277 GRPC_ARG_MAX_CONNECTION_AGE_MS};
1278 const grpc_channel_args* server_args = grpc_channel_args_copy_and_remove(
1279 server->core_server->channel_args(), args_to_remove,
1280 GPR_ARRAY_SIZE(args_to_remove));
1281
1282 // Add a default authority channel argument for the client
1283 grpc_arg default_authority_arg;
1284 default_authority_arg.type = GRPC_ARG_STRING;
1285 default_authority_arg.key = const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY);
1286 default_authority_arg.value.string = const_cast<char*>("inproc.authority");
1287 grpc_channel_args* client_args =
1288 grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
1289
1290 grpc_transport* server_transport;
1291 grpc_transport* client_transport;
1292 inproc_transports_create(&server_transport, server_args, &client_transport,
1293 client_args);
1294
1295 // TODO(ncteisen): design and support channelz GetSocket for inproc.
1296 grpc_error* error = server->core_server->SetupTransport(
1297 server_transport, nullptr, server_args, nullptr);
1298 grpc_channel* channel = nullptr;
1299 if (error == GRPC_ERROR_NONE) {
1300 channel =
1301 grpc_channel_create("inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL,
1302 client_transport, nullptr, &error);
1303 if (error != GRPC_ERROR_NONE) {
1304 GPR_ASSERT(!channel);
1305 gpr_log(GPR_ERROR, "Failed to create client channel: %s",
1306 grpc_error_string(error));
1307 intptr_t integer;
1308 grpc_status_code status = GRPC_STATUS_INTERNAL;
1309 if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
1310 status = static_cast<grpc_status_code>(integer);
1311 }
1312 GRPC_ERROR_UNREF(error);
1313 // client_transport was destroyed when grpc_channel_create saw an error.
1314 grpc_transport_destroy(server_transport);
1315 channel = grpc_lame_client_channel_create(
1316 nullptr, status, "Failed to create client channel");
1317 }
1318 } else {
1319 GPR_ASSERT(!channel);
1320 gpr_log(GPR_ERROR, "Failed to create server channel: %s",
1321 grpc_error_string(error));
1322 intptr_t integer;
1323 grpc_status_code status = GRPC_STATUS_INTERNAL;
1324 if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
1325 status = static_cast<grpc_status_code>(integer);
1326 }
1327 GRPC_ERROR_UNREF(error);
1328 grpc_transport_destroy(client_transport);
1329 grpc_transport_destroy(server_transport);
1330 channel = grpc_lame_client_channel_create(
1331 nullptr, status, "Failed to create server channel");
1332 }
1333
1334 // Free up created channel args
1335 grpc_channel_args_destroy(server_args);
1336 grpc_channel_args_destroy(client_args);
1337
1338 // Now finish scheduled operations
1339
1340 return channel;
1341 }
1342
grpc_inproc_transport_shutdown(void)1343 void grpc_inproc_transport_shutdown(void) {
1344 grpc_core::ExecCtx exec_ctx;
1345 grpc_slice_unref_internal(g_empty_slice);
1346 grpc_slice_unref_internal(g_fake_path_key);
1347 grpc_slice_unref_internal(g_fake_path_value);
1348 grpc_slice_unref_internal(g_fake_auth_key);
1349 grpc_slice_unref_internal(g_fake_auth_value);
1350 }
1351