/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <string.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/transport_impl.h"

#define INPROC_LOG(...)                               \
  do {                                                \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
      gpr_log(__VA_ARGS__);                           \
    }                                                 \
  } while (0)

namespace {
grpc_slice g_empty_slice;
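// Fake :path and :authority metadata elements, used to fabricate minimal
// initial metadata when a server-side stream has to be failed before the
// client's real initial metadata is available (see fail_helper_locked below).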
grpc_slice g_fake_path_key;
grpc_slice g_fake_path_value;
grpc_slice g_fake_auth_key;
grpc_slice g_fake_auth_value;

struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
void maybe_process_ops_locked(inproc_stream* s, grpc_error* error);
void op_state_machine_locked(inproc_stream* s, grpc_error* error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial);
grpc_error* fill_in_metadata(inproc_stream* s,
                             const grpc_metadata_batch* metadata,
                             uint32_t flags, grpc_metadata_batch* out_md,
                             uint32_t* outflags, bool* markfilled);

struct shared_mu {
  shared_mu() {
    // Share one lock between both sides since both sides get affected
    gpr_mu_init(&mu);
    gpr_ref_init(&refs, 2);
  }

  ~shared_mu() { gpr_mu_destroy(&mu); }

  gpr_mu mu;
  gpr_refcount refs;
};

struct inproc_transport {
  inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
                   bool is_client)
      : mu(mu),
        is_client(is_client),
        state_tracker(is_client ? "inproc_client" : "inproc_server",
                      GRPC_CHANNEL_READY) {
    base.vtable = vtable;
    // Start each side of transport with 2 refs since they each have a ref
    // to the other
    gpr_ref_init(&refs, 2);
  }

  ~inproc_transport() {
    if (gpr_unref(&mu->refs)) {
      mu->~shared_mu();
      gpr_free(mu);
    }
  }

  void ref() {
    INPROC_LOG(GPR_INFO, "ref_transport %p", this);
    gpr_ref(&refs);
  }

  void unref() {
    INPROC_LOG(GPR_INFO, "unref_transport %p", this);
    if (!gpr_unref(&refs)) {
      return;
    }
    INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
    this->~inproc_transport();
    gpr_free(this);
  }

  grpc_transport base;
  shared_mu* mu;
  gpr_refcount refs;
  bool is_client;
  grpc_core::ConnectivityStateTracker state_tracker;
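  // Stream-accept callback registered by the server side via
  // perform_transport_op (set_accept_stream); invoked when a client-side
  // stream is constructed.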
  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
                           const void* server_data);
  void* accept_stream_data;
  bool is_closed = false;
  struct inproc_transport* other_side;
  struct inproc_stream* stream_list = nullptr;
};

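// One side (client or server) of an in-process call; the two sides point at
// each other through other_side.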
struct inproc_stream {
  inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
                const void* server_data, grpc_core::Arena* arena)
      : t(t), refs(refcount), arena(arena) {
    // Ref this stream right now for ctor and list.
    ref("inproc_init_stream:init");
    ref("inproc_init_stream:list");

    grpc_metadata_batch_init(&to_read_initial_md);
    grpc_metadata_batch_init(&to_read_trailing_md);
    grpc_metadata_batch_init(&write_buffer_initial_md);
    grpc_metadata_batch_init(&write_buffer_trailing_md);

    stream_list_prev = nullptr;
    gpr_mu_lock(&t->mu->mu);
    stream_list_next = t->stream_list;
    if (t->stream_list) {
      t->stream_list->stream_list_prev = this;
    }
    t->stream_list = this;
    gpr_mu_unlock(&t->mu->mu);

    if (!server_data) {
      t->ref();
      inproc_transport* st = t->other_side;
      st->ref();
      other_side = nullptr;  // will get filled in soon
      // Pass the client-side stream address to the server-side for a ref
      ref("inproc_init_stream:clt");  // ref it now on behalf of server
                                      // side to avoid destruction
      INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
                 st->accept_stream_cb, st->accept_stream_data);
      (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
    } else {
      // This is the server-side and is being called through accept_stream_cb
      inproc_stream* cs = const_cast<inproc_stream*>(
          static_cast<const inproc_stream*>(server_data));
      other_side = cs;
      // Ref the server-side stream on behalf of the client now
      ref("inproc_init_stream:srv");

      // Now we are about to affect the other side, so lock the transport
      // to make sure that it doesn't get destroyed
      gpr_mu_lock(&t->mu->mu);
      cs->other_side = this;
      // Now transfer from the other side's write_buffer if any to the to_read
      // buffer
      if (cs->write_buffer_initial_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_initial_md,
                         cs->write_buffer_initial_md_flags, &to_read_initial_md,
                         &to_read_initial_md_flags, &to_read_initial_md_filled);
        deadline = GPR_MIN(deadline, cs->write_buffer_deadline);
        grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
        cs->write_buffer_initial_md_filled = false;
      }
      if (cs->write_buffer_trailing_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
                         &to_read_trailing_md, nullptr,
                         &to_read_trailing_md_filled);
        grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
        cs->write_buffer_trailing_md_filled = false;
      }
      if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
        cancel_other_error = cs->write_buffer_cancel_error;
        cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
        maybe_process_ops_locked(this, cancel_other_error);
      }

      gpr_mu_unlock(&t->mu->mu);
    }
  }

  ~inproc_stream() {
    GRPC_ERROR_UNREF(write_buffer_cancel_error);
    GRPC_ERROR_UNREF(cancel_self_error);
    GRPC_ERROR_UNREF(cancel_other_error);

    if (recv_inited) {
      grpc_slice_buffer_destroy_internal(&recv_message);
    }

    t->unref();
  }

#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
  void ref(const char* reason) {
    INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
    STREAM_REF(refs, reason);
  }

  void unref(const char* reason) {
    INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
    STREAM_UNREF(refs, reason);
  }
#undef STREAM_REF
#undef STREAM_UNREF

  inproc_transport* t;
  grpc_metadata_batch to_read_initial_md;
  uint32_t to_read_initial_md_flags = 0;
  bool to_read_initial_md_filled = false;
  grpc_metadata_batch to_read_trailing_md;
  bool to_read_trailing_md_filled = false;
  bool ops_needed = false;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled = false;
  uint32_t write_buffer_initial_md_flags = 0;
  grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled = false;
  grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE;

  struct inproc_stream* other_side;
  bool other_side_closed = false;               // won't talk anymore
  bool write_buffer_other_side_closed = false;  // on hold
  grpc_stream_refcount* refs;

  grpc_core::Arena* arena;

  grpc_transport_stream_op_batch* send_message_op = nullptr;
  grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_message_op = nullptr;
  grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;

  grpc_slice_buffer recv_message;
  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
  bool recv_inited = false;

  bool initial_md_sent = false;
  bool trailing_md_sent = false;
  bool initial_md_recvd = false;
  bool trailing_md_recvd = false;
  // The following tracks if the server-side only pretends to have received
  // trailing metadata since it no longer cares about the RPC. If that is the
  // case, it is still ok for the client to send trailing metadata (in which
  // case it will be ignored).
  bool trailing_md_recvd_implicit_only = false;

  bool closed = false;

  grpc_error* cancel_self_error = GRPC_ERROR_NONE;
  grpc_error* cancel_other_error = GRPC_ERROR_NONE;

  grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;

  bool listed = true;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
};

void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial) {
  for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
       md = md->next) {
    char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
    char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
    gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
            is_client ? "CLI" : "SVR", key, value);
    gpr_free(key);
    gpr_free(value);
  }
}

grpc_error* fill_in_metadata(inproc_stream* s,
                             const grpc_metadata_batch* metadata,
                             uint32_t flags, grpc_metadata_batch* out_md,
                             uint32_t* outflags, bool* markfilled) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    log_metadata(metadata, s->t->is_client, outflags != nullptr);
  }

  if (outflags != nullptr) {
    *outflags = flags;
  }
  if (markfilled != nullptr) {
    *markfilled = true;
  }
  grpc_error* error = GRPC_ERROR_NONE;
  for (grpc_linked_mdelem* elem = metadata->list.head;
       (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
    grpc_linked_mdelem* nelem =
        static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*nelem)));
    nelem->md =
        grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
                                grpc_slice_intern(GRPC_MDVALUE(elem->md)));

    error = grpc_metadata_batch_link_tail(out_md, nelem);
  }
  return error;
}

int init_stream(grpc_transport* gt, grpc_stream* gs,
                grpc_stream_refcount* refcount, const void* server_data,
                grpc_core::Arena* arena) {
  INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  new (gs) inproc_stream(t, refcount, server_data, arena);
  return 0;  // return value is not important
}

void close_stream_locked(inproc_stream* s) {
  if (!s->closed) {
    // Release the metadata that we would have written out
    grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
    grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);

    if (s->listed) {
      inproc_stream* p = s->stream_list_prev;
      inproc_stream* n = s->stream_list_next;
      if (p != nullptr) {
        p->stream_list_next = n;
      } else {
        s->t->stream_list = n;
      }
      if (n != nullptr) {
        n->stream_list_prev = p;
      }
      s->listed = false;
      s->unref("close_stream:list");
    }
    s->closed = true;
    s->unref("close_stream:closing");
  }
}

// This function means that we are done talking/listening to the other side
void close_other_side_locked(inproc_stream* s, const char* reason) {
  if (s->other_side != nullptr) {
    // First release the metadata that came from the other side's arena
    grpc_metadata_batch_destroy(&s->to_read_initial_md);
    grpc_metadata_batch_destroy(&s->to_read_trailing_md);

    s->other_side->unref(reason);
    s->other_side_closed = true;
    s->other_side = nullptr;
  } else if (!s->other_side_closed) {
    s->write_buffer_other_side_closed = true;
  }
}

// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
                                  grpc_transport_stream_op_batch* op,
                                  const char* msg) {
  int is_sm = static_cast<int>(op == s->send_message_op);
  int is_stm = static_cast<int>(op == s->send_trailing_md_op);
  // TODO(vjpai): We should not consider the recv ops here, since they
  // have their own callbacks.  We should invoke a batch's on_complete
  // as soon as all of the batch's send ops are complete, even if there
  // are still recv ops pending.
  int is_rim = static_cast<int>(op == s->recv_initial_md_op);
  int is_rm = static_cast<int>(op == s->recv_message_op);
  int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);

  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
    INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
                            GRPC_ERROR_REF(error));
  }
}

void maybe_process_ops_locked(inproc_stream* s, grpc_error* error) {
  if (s && (error != GRPC_ERROR_NONE || s->ops_needed)) {
    s->ops_needed = false;
    op_state_machine_locked(s, error);
  }
}

void fail_helper_locked(inproc_stream* s, grpc_error* error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md;
    grpc_metadata_batch_init(&fake_md);

    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&fake_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(error);
      }
      maybe_process_ops_locked(other, error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error* err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and authority
      // since it expects that as well as no error yet
      grpc_metadata_batch fake_md;
      grpc_metadata_batch_init(&fake_md);
      grpc_linked_mdelem* path_md =
          static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*path_md)));
      path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
                 GRPC_ERROR_NONE);
      grpc_linked_mdelem* auth_md =
          static_cast<grpc_linked_mdelem*>(s->arena->Alloc(sizeof(*auth_md)));
      auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
                 GRPC_ERROR_NONE);

      fill_in_metadata(
          s, &fake_md, 0,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      grpc_metadata_batch_destroy(&fake_md);
      err = GRPC_ERROR_NONE;
    } else {
      err = GRPC_ERROR_REF(error);
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %p %p", s,
               error, err);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata_ready,
        err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
               error);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_REF(error));
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailing-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
               s, error);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        GRPC_ERROR_REF(error));
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
               s, error);
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);

  GRPC_ERROR_UNREF(error);
}

// TODO(vjpai): It should not be necessary to drain the incoming byte
// stream and create a new one; instead, we should simply pass the byte
// stream from the sender directly to the receiver as-is.
//
// Note that fixing this will also avoid the assumption in this code
// that the incoming byte stream's next() call will always return
// synchronously.  That assumption is true today but may not always be
// true in the future.
void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
  size_t remaining =
      sender->send_message_op->payload->send_message.send_message->length();
  if (receiver->recv_inited) {
    grpc_slice_buffer_destroy_internal(&receiver->recv_message);
  }
  grpc_slice_buffer_init(&receiver->recv_message);
  receiver->recv_inited = true;
  do {
    grpc_slice message_slice;
    grpc_closure unused;
    GPR_ASSERT(
        sender->send_message_op->payload->send_message.send_message->Next(
            SIZE_MAX, &unused));
    grpc_error* error =
        sender->send_message_op->payload->send_message.send_message->Pull(
            &message_slice);
    if (error != GRPC_ERROR_NONE) {
      cancel_stream_locked(sender, GRPC_ERROR_REF(error));
      break;
    }
    GPR_ASSERT(error == GRPC_ERROR_NONE);
    remaining -= GRPC_SLICE_LENGTH(message_slice);
    grpc_slice_buffer_add(&receiver->recv_message, message_slice);
  } while (remaining > 0);
  sender->send_message_op->payload->send_message.send_message.reset();

  receiver->recv_stream.Init(&receiver->recv_message, 0);
  receiver->recv_message_op->payload->recv_message.recv_message->reset(
      receiver->recv_stream.get());
  INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
             receiver);
  grpc_core::ExecCtx::Run(
      DEBUG_LOCATION,
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      GRPC_ERROR_NONE);
  complete_if_batch_end_locked(
      sender, GRPC_ERROR_NONE, sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");

  receiver->recv_message_op = nullptr;
  sender->send_message_op = nullptr;
}

void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  grpc_error* new_err = GRPC_ERROR_NONE;

  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (s->cancel_self_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
    goto done;
  } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
    goto done;
  } else if (error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(error));
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(s, other);
      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    } else if (!s->t->is_client && s->trailing_md_sent) {
      // A server send will never be matched if the server already sent status
      s->send_message_op->payload->send_message.send_message.reset();
      complete_if_batch_end_locked(
          s, GRPC_ERROR_NONE, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 1");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status or on the server if the client has requested
  // status
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
       (!s->t->is_client && other &&
        (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
         other->recv_trailing_md_op)))) {
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         0, dest, nullptr, destfilled);
      }
      s->trailing_md_sent = true;
      if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
        *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
      }
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready", s);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_NONE);
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                GRPC_ERROR_NONE);
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, GRPC_ERROR_NONE, s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %p",
          s, new_err);
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      new_err = fill_in_metadata(
          s, &s->to_read_initial_md, s->to_read_initial_md_flags,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      s->recv_initial_md_op->payload->recv_initial_metadata
          .recv_initial_metadata->deadline = s->deadline;
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      grpc_metadata_batch_clear(&s->to_read_initial_md);
      s->to_read_initial_md_filled = false;
      INPROC_LOG(GPR_INFO,
                 "op_state_machine %p scheduling initial-metadata-ready %p", s,
                 new_err);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata_ready,
          GRPC_ERROR_REF(new_err));
      complete_if_batch_end_locked(
          s, new_err, s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;

      if (new_err != GRPC_ERROR_NONE) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling on_complete errors2 %p", s,
                   new_err);
        fail_helper_locked(s, GRPC_ERROR_REF(new_err));
        goto done;
      }
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(other, s);
      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    }
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      if (s->trailing_md_recvd_implicit_only) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p already implicitly received trailing "
                   "metadata, so ignoring new trailing metadata from client",
                   s);
        grpc_metadata_batch_clear(&s->to_read_trailing_md);
        s->to_read_trailing_md_filled = false;
        s->trailing_md_recvd_implicit_only = false;
      } else {
        new_err =
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
        INPROC_LOG(
            GPR_INFO,
            "op_state_machine %p scheduling on_complete errors for already "
            "recvd trailing md %p",
            s, new_err);
        fail_helper_locked(s, GRPC_ERROR_REF(new_err));
        goto done;
      }
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      *s->recv_message_op->payload->recv_message.recv_message = nullptr;
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      s->send_message_op->payload->send_message.send_message.reset();
      s->send_message_op->payload->send_message.stream_write_closed = true;
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 2");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      new_err =
          fill_in_metadata(s, &s->to_read_trailing_md, 0,
                           s->recv_trailing_md_op->payload
                               ->recv_trailing_metadata.recv_trailing_metadata,
                           nullptr, nullptr);
      grpc_metadata_batch_clear(&s->to_read_trailing_md);
      s->to_read_trailing_md_filled = false;

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing md
      //    (If the server hasn't already sent its trailing md, it doesn't have
      //     a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete %p",
                   s, new_err);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_REF(new_err));
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                GRPC_ERROR_REF(new_err));
        s->recv_trailing_md_op = nullptr;
        needs_close = s->trailing_md_sent;
      } else {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p server needs to delay handling "
                   "trailing-md-on-complete %p",
                   s, new_err);
      }
    } else if (!s->trailing_md_recvd) {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (!s->t->is_client && s->trailing_md_sent &&
      (s->recv_trailing_md_op != nullptr)) {
    // In this case, we don't care to receive the write-close from the client
    // because we have already sent status and the RPC is over as far as we
    // are concerned.
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %p",
               s, new_err);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        GRPC_ERROR_REF(new_err));
    complete_if_batch_end_locked(
        s, new_err, s->recv_trailing_md_op,
        "op_state_machine scheduling recv-trailing-md-on-complete");
    s->trailing_md_recvd = true;
    s->recv_trailing_md_op = nullptr;
    // Since we are only pretending to have received the trailing MD, it would
    // be ok (not an error) if the client actually sends it later.
    s->trailing_md_recvd_implicit_only = true;
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    *s->recv_message_op->payload->recv_message.recv_message = nullptr;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete case 3");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
  GRPC_ERROR_UNREF(new_err);
}

bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
  if (s->cancel_self_error == GRPC_ERROR_NONE) {
    ret = true;
    s->cancel_self_error = GRPC_ERROR_REF(error);
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    grpc_metadata_batch cancel_md;
    grpc_metadata_batch_init(&cancel_md);

    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&cancel_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          GRPC_ERROR_REF(s->cancel_self_error));
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  GRPC_ERROR_UNREF(error);
  return ret;
}

void do_nothing(void* /*arg*/, grpc_error* /*error*/) {}

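// Entry point for a batch of stream ops; all processing below happens while
// holding the shared transport mutex.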
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        if (s->t->is_client) {
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      maybe_process_ops_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      s->ops_needed = true;
    }
  } else {
    if (error != GRPC_ERROR_NONE) {
      // Consume any send message that was sent here but that we are not pushing
      // to the other side
      if (op->send_message) {
        op->payload->send_message.send_message.reset();
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}

void close_transport_locked(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
  t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
                            "close transport");
  if (!t->is_closed) {
    t->is_closed = true;
    /* Also end all streams on this transport */
    while (t->stream_list != nullptr) {
      // cancel_stream_locked also adjusts stream list
      cancel_stream_locked(
          t->stream_list,
          grpc_error_set_int(
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
    }
  }
}

void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
  gpr_mu_lock(&t->mu->mu);
  if (op->start_connectivity_watch != nullptr) {
    t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
                                std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
  }
  if (op->set_accept_stream) {
    t->accept_stream_cb = op->set_accept_stream_fn;
    t->accept_stream_data = op->set_accept_stream_user_data;
  }
  if (op->on_consumed) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
  }

  bool do_close = false;
  if (op->goaway_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->goaway_error);
  }
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (do_close) {
    close_transport_locked(t);
  }
  gpr_mu_unlock(&t->mu->mu);
}

void destroy_stream(grpc_transport* gt, grpc_stream* gs,
                    grpc_closure* then_schedule_closure) {
  INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu_lock(&t->mu->mu);
  close_stream_locked(s);
  gpr_mu_unlock(&t->mu->mu);
  s->~inproc_stream();
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
                          GRPC_ERROR_NONE);
}

void destroy_transport(grpc_transport* gt) {
  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
  INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
  gpr_mu_lock(&t->mu->mu);
  close_transport_locked(t);
  gpr_mu_unlock(&t->mu->mu);
  t->other_side->unref();
  t->unref();
}

/*******************************************************************************
 * INTEGRATION GLUE
 */

void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
                 grpc_pollset* /*pollset*/) {
  // Nothing to do here
}

void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
                     grpc_pollset_set* /*pollset_set*/) {
  // Nothing to do here
}

grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }

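// Vtable that plugs the functions above into the generic grpc_transport
// interface.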
const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc",        init_stream,
    set_pollset,           set_pollset_set, perform_stream_op,
    perform_transport_op,  destroy_stream,  destroy_transport,
    get_endpoint};

/*******************************************************************************
 * Main inproc transport functions
 */
void inproc_transports_create(grpc_transport** server_transport,
                              const grpc_channel_args* /*server_args*/,
                              grpc_transport** client_transport,
                              const grpc_channel_args* /*client_args*/) {
  INPROC_LOG(GPR_INFO, "inproc_transports_create");
  shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
  inproc_transport* st = new (gpr_malloc(sizeof(*st)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
  inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
      inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
  st->other_side = ct;
  ct->other_side = st;
  *server_transport = reinterpret_cast<grpc_transport*>(st);
  *client_transport = reinterpret_cast<grpc_transport*>(ct);
}
}  // namespace

/*******************************************************************************
 * GLOBAL INIT AND DESTROY
 */
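// Creates the global slices used by this transport (including the fake
// :path/:authority metadata used in fail_helper_locked).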
void grpc_inproc_transport_init(void) {
  grpc_core::ExecCtx exec_ctx;
  g_empty_slice = grpc_core::ExternallyManagedSlice();

  grpc_slice key_tmp = grpc_slice_from_static_string(":path");
  g_fake_path_key = grpc_slice_intern(key_tmp);
  grpc_slice_unref_internal(key_tmp);

  g_fake_path_value = grpc_slice_from_static_string("/");

  grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
  g_fake_auth_key = grpc_slice_intern(auth_tmp);
  grpc_slice_unref_internal(auth_tmp);

  g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
}

grpc_channel* grpc_inproc_channel_create(grpc_server* server,
                                         grpc_channel_args* args,
                                         void* /*reserved*/) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_core::ExecCtx exec_ctx;

  // Remove max_connection_idle and max_connection_age channel arguments since
  // those do not apply to inproc transports.
  const char* args_to_remove[] = {GRPC_ARG_MAX_CONNECTION_IDLE_MS,
                                  GRPC_ARG_MAX_CONNECTION_AGE_MS};
  const grpc_channel_args* server_args = grpc_channel_args_copy_and_remove(
      server->core_server->channel_args(), args_to_remove,
      GPR_ARRAY_SIZE(args_to_remove));

  // Add a default authority channel argument for the client
  grpc_arg default_authority_arg;
  default_authority_arg.type = GRPC_ARG_STRING;
  default_authority_arg.key = const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY);
  default_authority_arg.value.string = const_cast<char*>("inproc.authority");
  grpc_channel_args* client_args =
      grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);

  grpc_transport* server_transport;
  grpc_transport* client_transport;
  inproc_transports_create(&server_transport, server_args, &client_transport,
                           client_args);

  // TODO(ncteisen): design and support channelz GetSocket for inproc.
  grpc_error* error = server->core_server->SetupTransport(
      server_transport, nullptr, server_args, nullptr);
  grpc_channel* channel = nullptr;
  if (error == GRPC_ERROR_NONE) {
    channel =
        grpc_channel_create("inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL,
                            client_transport, nullptr, &error);
    if (error != GRPC_ERROR_NONE) {
      GPR_ASSERT(!channel);
      gpr_log(GPR_ERROR, "Failed to create client channel: %s",
              grpc_error_string(error));
      intptr_t integer;
      grpc_status_code status = GRPC_STATUS_INTERNAL;
      if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
        status = static_cast<grpc_status_code>(integer);
      }
      GRPC_ERROR_UNREF(error);
      // client_transport was destroyed when grpc_channel_create saw an error.
      grpc_transport_destroy(server_transport);
      channel = grpc_lame_client_channel_create(
          nullptr, status, "Failed to create client channel");
    }
  } else {
    GPR_ASSERT(!channel);
    gpr_log(GPR_ERROR, "Failed to create server channel: %s",
            grpc_error_string(error));
    intptr_t integer;
    grpc_status_code status = GRPC_STATUS_INTERNAL;
    if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
      status = static_cast<grpc_status_code>(integer);
    }
    GRPC_ERROR_UNREF(error);
    grpc_transport_destroy(client_transport);
    grpc_transport_destroy(server_transport);
    channel = grpc_lame_client_channel_create(
        nullptr, status, "Failed to create server channel");
  }

  // Free up created channel args
  grpc_channel_args_destroy(server_args);
  grpc_channel_args_destroy(client_args);

  // Now finish scheduled operations

  return channel;
}

void grpc_inproc_transport_shutdown(void) {
  grpc_core::ExecCtx exec_ctx;
  grpc_slice_unref_internal(g_empty_slice);
  grpc_slice_unref_internal(g_fake_path_key);
  grpc_slice_unref_internal(g_fake_path_value);
  grpc_slice_unref_internal(g_fake_auth_key);
  grpc_slice_unref_internal(g_fake_auth_value);
}