//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H

#include <grpc/impl/connectivity_state.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include <functional>
#include <string>
#include <utility>

#include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/call_spine.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted.h"

// Minimum and maximum protocol accepted versions.
#define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
#define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MIN_MINOR 1

#define GRPC_ARG_TRANSPORT "grpc.internal.transport"

namespace grpc_core {

// Move-only type that tracks call startup.
// Allows observation of when client_initial_metadata has been processed by the
// end of the local call stack.
// Interested observers can call Wait() to obtain a promise that will resolve
// when all local client_initial_metadata processing has completed.
// The promise resolves to true on successful completion, or to false if the
// metadata was not sent.
// To set a successful completion, call Complete(true). For failure, call
// Complete(false).
// If Complete is not called, the destructor of a still held token will complete
// with failure.
// Transports should hold this token until client_initial_metadata has passed
// any flow control (eg MAX_CONCURRENT_STREAMS for http2).
class ClientInitialMetadataOutstandingToken {
 public:
  static ClientInitialMetadataOutstandingToken Empty() {
    return ClientInitialMetadataOutstandingToken();
  }
  static ClientInitialMetadataOutstandingToken New(
      Arena* arena = GetContext<Arena>()) {
    ClientInitialMetadataOutstandingToken token;
    token.latch_ = arena->New<Latch<bool>>();
    return token;
  }

  ClientInitialMetadataOutstandingToken(
      const ClientInitialMetadataOutstandingToken&) = delete;
  ClientInitialMetadataOutstandingToken& operator=(
      const ClientInitialMetadataOutstandingToken&) = delete;
  ClientInitialMetadataOutstandingToken(
      ClientInitialMetadataOutstandingToken&& other) noexcept
      : latch_(std::exchange(other.latch_, nullptr)) {}
  ClientInitialMetadataOutstandingToken& operator=(
      ClientInitialMetadataOutstandingToken&& other) noexcept {
    latch_ = std::exchange(other.latch_, nullptr);
    return *this;
  }
  ~ClientInitialMetadataOutstandingToken() {
    if (latch_ != nullptr) latch_->Set(false);
  }
  void Complete(bool success) { std::exchange(latch_, nullptr)->Set(success); }

  // Returns a promise that will resolve when this object (or its moved-from
  // ancestor) is dropped.
  auto Wait() { return latch_->Wait(); }

 private:
  ClientInitialMetadataOutstandingToken() = default;

  Latch<bool>* latch_ = nullptr;
};
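
// Illustrative sketch only (the `arena` shown is whatever Arena the caller has
// available; by default New() pulls it from the promise context): a transport
// holds the token until client_initial_metadata clears local flow control,
// while an interested observer watches the outcome via Wait().
//
//   auto token = ClientInitialMetadataOutstandingToken::New(arena);
//   auto sent = token.Wait();          // promise resolving to bool
//   // ... once initial metadata has been written (or has failed):
//   token.Complete(/*success=*/true);
//
// Dropping a still-held token without calling Complete() resolves Wait() with
// false.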

using ClientInitialMetadataOutstandingTokenWaitType =
    decltype(std::declval<ClientInitialMetadataOutstandingToken>().Wait());

struct CallArgs {
  // Initial metadata from the client to the server.
  // During promise setup this can be manipulated by filters (and then
  // passed on to the next filter).
  ClientMetadataHandle client_initial_metadata;
  // Token indicating that client_initial_metadata is still being processed.
  // This should be moved around and only destroyed when the transport is
  // satisfied that the metadata has passed any flow control measures it has.
  ClientInitialMetadataOutstandingToken client_initial_metadata_outstanding;
  // Latch that will ultimately contain the polling entity for the call.
  // TODO(ctiller): remove once event engine lands
  Latch<grpc_polling_entity>* polling_entity;
  // Initial metadata from the server to the client.
  // Set once when it's available.
  // During promise setup filters can substitute their own latch for this
  // and consequently intercept the sent value and mutate/observe it.
  PipeSender<ServerMetadataHandle>* server_initial_metadata;
  // Messages travelling from the application to the transport.
  PipeReceiver<MessageHandle>* client_to_server_messages;
  // Messages travelling from the transport to the application.
  PipeSender<MessageHandle>* server_to_client_messages;
};

using NextPromiseFactory =
    std::function<ArenaPromise<ServerMetadataHandle>(CallArgs)>;
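
// Sketch of how a promise-based filter typically consumes these types (the
// MakeCallPromise signature below mirrors the promise filter API defined
// elsewhere in the tree; it is shown here only for orientation):
//
//   ArenaPromise<ServerMetadataHandle> MakeCallPromise(
//       CallArgs call_args, NextPromiseFactory next_promise_factory) {
//     // Inspect/mutate call_args.client_initial_metadata here, then delegate
//     // the rest of the call to the next filter or the transport.
//     return next_promise_factory(std::move(call_args));
//   }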

}  // namespace grpc_core

// forward declarations

// grpc_stream doesn't actually exist. It's used as a typesafe
// opaque pointer for whatever data the transport wants to track
// for a stream.
typedef struct grpc_stream grpc_stream;

typedef struct grpc_stream_refcount {
  grpc_core::RefCount refs;
  grpc_closure destroy;
#ifndef NDEBUG
  const char* object_type;
#endif
} grpc_stream_refcount;

#ifndef NDEBUG
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg,
                          const char* object_type);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  do {                                                    \
    grpc_stream_ref_init(rc, ir, cb, cb_arg);             \
    (void)(objtype);                                      \
  } while (0)
#endif

#ifndef NDEBUG
inline void grpc_stream_ref(grpc_stream_refcount* refcount,
                            const char* reason) {
  GRPC_TRACE_VLOG(stream_refcount, 2)
      << refcount->object_type << " " << refcount << ":"
      << refcount->destroy.cb_arg << " REF " << reason;
  refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
}
#else
inline void grpc_stream_ref(grpc_stream_refcount* refcount) {
  refcount->refs.RefNonZero();
}
#endif

void grpc_stream_destroy(grpc_stream_refcount* refcount);

#ifndef NDEBUG
inline void grpc_stream_unref(grpc_stream_refcount* refcount,
                              const char* reason) {
  GRPC_TRACE_VLOG(stream_refcount, 2)
      << refcount->object_type << " " << refcount << ":"
      << refcount->destroy.cb_arg << " UNREF " << reason;
  if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
    grpc_stream_destroy(refcount);
  }
}
#else
inline void grpc_stream_unref(grpc_stream_refcount* refcount) {
  if (GPR_UNLIKELY(refcount->refs.Unref())) {
    grpc_stream_destroy(refcount);
  }
}
#endif
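
// Typical usage pattern (illustrative only; destroy_stream_cb, stream and the
// object-type string are hypothetical names): a transport embeds a
// grpc_stream_refcount in its per-stream state, points the destroy closure at
// its own cleanup function, and balances ref/unref around asynchronous work.
//
//   GRPC_STREAM_REF_INIT(&stream->refcount, /*initial_refs=*/1,
//                        destroy_stream_cb, stream, "my_transport_stream");
//   grpc_stream_ref(&stream->refcount);    // plus a ", reason" in debug builds
//   ...                                    // hand off to async work
//   grpc_stream_unref(&stream->refcount);  // last unref runs destroy_stream_cb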

// Wrap a buffer that is owned by some stream object into a slice that shares
// the same refcount
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
                                               void* buffer, size_t length);

// This struct (which is present in both grpc_transport_stream_op_batch
// and grpc_transport_op) is a convenience to allow filters or
// transports to schedule a closure related to a particular batch without
// having to allocate memory.  The general pattern is to initialize the
// closure with the callback arg set to the batch and extra_arg set to
// whatever state is associated with the handler (e.g., the call element
// or the transport stream object).
//
// Note that this can only be used by the current handler of a given
// batch on the way down the stack (i.e., whichever filter or transport is
// currently handling the batch).  Once a filter or transport passes control
// of the batch to the next handler, it cannot depend on the contents of
// this struct anymore, because the next handler may reuse it.
struct grpc_handler_private_op_data {
  void* extra_arg = nullptr;
  grpc_closure closure;
  grpc_handler_private_op_data() { memset(&closure, 0, sizeof(closure)); }
};
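
// The general pattern described above, sketched with hypothetical names
// (my_callback, elem): the closure carries the batch, and extra_arg carries
// the handler's own state.
//
//   batch->handler_private.extra_arg = elem;
//   GRPC_CLOSURE_INIT(&batch->handler_private.closure, my_callback, batch,
//                     grpc_schedule_on_exec_ctx);
//   // ... later, schedule &batch->handler_private.closure; my_callback can
//   // recover the batch from its arg and the handler state from extra_arg.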

typedef struct grpc_transport_stream_op_batch_payload
    grpc_transport_stream_op_batch_payload;

// Transport stream op: a set of operations to perform on a transport
// against a single stream
struct grpc_transport_stream_op_batch {
  grpc_transport_stream_op_batch()
      : send_initial_metadata(false),
        send_trailing_metadata(false),
        send_message(false),
        recv_initial_metadata(false),
        recv_message(false),
        recv_trailing_metadata(false),
        cancel_stream(false),
        is_traced(false) {}

  /// Should be scheduled when all of the non-recv operations in the batch
  /// are complete.

  /// The recv ops (recv_initial_metadata, recv_message, and
  /// recv_trailing_metadata) each have their own callbacks.  If a batch
  /// contains both recv ops and non-recv ops, on_complete should be
  /// scheduled as soon as the non-recv ops are complete, regardless of
  /// whether or not the recv ops are complete.  If a batch contains
  /// only recv ops, on_complete can be null.
  grpc_closure* on_complete = nullptr;

  /// Values for the stream op (fields set are determined by flags above)
  grpc_transport_stream_op_batch_payload* payload = nullptr;

  /// Send initial metadata to the peer, from the provided metadata batch.
  bool send_initial_metadata : 1;

  /// Send trailing metadata to the peer, from the provided metadata batch.
  bool send_trailing_metadata : 1;

  /// Send message data to the peer, from the provided byte stream.
  bool send_message : 1;

  /// Receive initial metadata from the stream, into provided metadata batch.
  bool recv_initial_metadata : 1;

  /// Receive message data from the stream, into provided byte stream.
  bool recv_message : 1;

  /// Receive trailing metadata from the stream, into provided metadata batch.
  ///
  bool recv_trailing_metadata : 1;

  /// Cancel this stream with the provided error
  bool cancel_stream : 1;

  /// Is this stream traced
  bool is_traced : 1;

  bool HasOp() const {
    return send_initial_metadata || send_trailing_metadata || send_message ||
           recv_initial_metadata || recv_message || recv_trailing_metadata ||
           cancel_stream;
  }

  //**************************************************************************
  // remaining fields are initialized and used at the discretion of the
  // current handler of the op

  grpc_handler_private_op_data handler_private;
};

struct grpc_transport_stream_op_batch_payload {
  struct {
    grpc_metadata_batch* send_initial_metadata = nullptr;
  } send_initial_metadata;

  struct {
    grpc_metadata_batch* send_trailing_metadata = nullptr;
    // Set by the transport to true if the stream successfully wrote the
    // trailing metadata. If this is not set but there was a send trailing
    // metadata op present, this can indicate that a server call can be marked
    // as a cancellation (since the stream was write-closed before status could
    // be delivered).
    bool* sent = nullptr;
  } send_trailing_metadata;

  struct {
    // The transport (or a filter that decides to return a failure before
    // the op gets down to the transport) takes ownership.
    // The batch's on_complete will not be called until after the byte
    // stream is orphaned.
    grpc_core::SliceBuffer* send_message;
    uint32_t flags = 0;
    // Set by the transport if the stream has been closed for writes. If this
    // is set and a send message op is present, we set the operation to be a
    // failure without sending a cancel OP down the stack. This is so that the
    // status of the call does not get overwritten by the cancel OP, which
    // would be especially problematic if we had received a valid status from
    // the server.
    // For send_initial_metadata, it is fine for the status to be overwritten
    // because at that point, the client will not have received a status.
    // For send_trailing_metadata, we might overwrite the status if we have
    // non-zero metadata to send. This is fine because the API does not allow
    // the client to send trailing metadata.
    bool stream_write_closed = false;
  } send_message;

  struct {
    grpc_metadata_batch* recv_initial_metadata = nullptr;
    /// Should be enqueued when initial metadata is ready to be processed.
    grpc_closure* recv_initial_metadata_ready = nullptr;
    // If not NULL, will be set to true if trailing metadata is
    // immediately available. This may be a signal that we received a
    // Trailers-Only response. The retry filter checks this to know whether to
    // defer the decision to commit the call or not. The C++ callback API also
    // uses this to set the success flag of the OnReadInitialMetadataDone()
    // callback.
    bool* trailing_metadata_available = nullptr;
  } recv_initial_metadata;

  struct {
    // Will be set by the transport to point to the byte stream containing a
    // received message. Will be nullopt if trailing metadata is received
    // instead of a message.
    absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
    uint32_t* flags = nullptr;
    // Set to true if this recv_message failed for a reason other than a clean
    // end-of-stream.
    bool* call_failed_before_recv_message = nullptr;
    /// Should be enqueued when one message is ready to be processed.
    grpc_closure* recv_message_ready = nullptr;
  } recv_message;

  struct {
    grpc_metadata_batch* recv_trailing_metadata = nullptr;
    grpc_transport_stream_stats* collect_stats = nullptr;
    /// Should be enqueued when trailing metadata is ready to be processed.
    grpc_closure* recv_trailing_metadata_ready = nullptr;
  } recv_trailing_metadata;

  /// Forcefully close this stream.
  /// The HTTP2 semantics should be:
  /// - server side: if cancel_error has
  ///   grpc_core::StatusIntProperty::kRpcStatus, and trailing metadata has not
  ///   been sent, send trailing metadata with status and message from
  ///   cancel_error (use grpc_error_get_status) followed by a RST_STREAM with
  ///   error=GRPC_CHTTP2_NO_ERROR to force a full close
  /// - at all other times: use grpc_error_get_status to get a status code, and
  ///   convert to a HTTP2 error code using
  ///   grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
  ///   error.
  struct {
    // Error contract: the transport that gets this op must cause cancel_error
    //                 to be unref'ed after processing it
    grpc_error_handle cancel_error;
    // If true the transport should endeavor to delay sending the cancellation
    // notification for some small amount of time, in order to foil certain
    // exploits.
    // This should be set for cancellations that result from malformed client
    // initial metadata.
    bool tarpit = false;
  } cancel_stream;
};
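
// How the flag bits and the payload pair up (illustrative sketch only; the
// on_complete closure and error text are placeholders, and the helper
// grpc_make_transport_stream_op declared further below allocates a batch with
// its payload pointer already wired up):
//
//   grpc_transport_stream_op_batch* batch =
//       grpc_make_transport_stream_op(on_complete_closure);
//   batch->cancel_stream = true;
//   batch->payload->cancel_stream.cancel_error =
//       absl::CancelledError("example cancellation");
//   // hand the batch to the next element / the transport's PerformStreamOp.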

/// Transport op: a set of operations to perform on a transport as a whole
typedef struct grpc_transport_op {
  /// Called when processing of this op is done.
  grpc_closure* on_consumed = nullptr;
  /// connectivity monitoring: set start_connectivity_watch to begin watching,
  /// or stop_connectivity_watch to cancel an existing watcher
  grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>
      start_connectivity_watch;
  grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
      nullptr;
  /// should the transport be disconnected
  /// Error contract: the transport that gets this op must cause
  ///                 disconnect_with_error to be unref'ed after processing it
  grpc_error_handle disconnect_with_error;
  /// what should the goaway contain?
  /// Error contract: the transport that gets this op must cause
  ///                 goaway_error to be unref'ed after processing it
  grpc_error_handle goaway_error;
  void (*set_accept_stream_fn)(void* user_data, grpc_core::Transport* transport,
                               const void* server_data) = nullptr;
  void (*set_registered_method_matcher_fn)(
      void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
  void* set_accept_stream_user_data = nullptr;
  void (*set_make_promise_fn)(void* user_data, grpc_core::Transport* transport,
                              const void* server_data) = nullptr;
  void* set_make_promise_user_data = nullptr;
  /// add this transport to a pollset
  grpc_pollset* bind_pollset = nullptr;
  /// add this transport to a pollset_set
  grpc_pollset_set* bind_pollset_set = nullptr;
  /// send a ping, if either on_initiate or on_ack is not NULL
  struct {
    /// Ping may be delayed by the transport, on_initiate callback will be
    /// called when the ping is actually being sent.
    grpc_closure* on_initiate = nullptr;
    /// Called when the ping ack is received
    grpc_closure* on_ack = nullptr;
  } send_ping;
  grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
  // If true, will reset the channel's connection backoff.
  bool reset_connect_backoff = false;

  /// set the callback for accepting new streams;
  /// this is a permanent callback, unlike the other one-shot closures.
  /// If true, the callback is set to set_accept_stream_fn, with its
  /// user_data argument set to set_accept_stream_user_data.
  /// `set_registered_method_matcher_fn` is also set with its user_data argument
  /// set to set_accept_stream_user_data. The transport should invoke
  /// `set_registered_method_matcher_fn` after initial metadata is received but
  /// before the recv_initial_metadata_ready callback is invoked. If the
  /// transport detects an error in the stream, invoking
  /// `set_registered_method_matcher_fn` can be skipped.
  bool set_accept_stream = false;

  /// set the callback for accepting new streams based upon promises;
  /// this is a permanent callback, unlike the other one-shot closures.
  /// If true, the callback is set to set_make_promise_fn, with its
  /// user_data argument set to set_make_promise_user_data.
  bool set_make_promise = false;

  //**************************************************************************
  // remaining fields are initialized and used at the discretion of the
  // transport implementation

  grpc_handler_private_op_data handler_private;
} grpc_transport_op;

// Allocate a grpc_transport_op, and preconfigure its on_consumed closure so
// that it invokes \a on_complete and then deletes the returned transport op.
grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete);
// Allocate a grpc_transport_stream_op_batch, and preconfigure its on_complete
// closure so that it invokes \a on_complete and then deletes the returned
// batch.
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
    grpc_closure* on_complete);
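
// Example of driving a whole-transport operation (modelled on the
// StartConnectivityWatch/DisconnectWithError helpers on Transport below;
// goaway_error is just one field a caller might fill in, and the error text is
// a placeholder):
//
//   grpc_transport_op* op = grpc_make_transport_op(/*on_complete=*/nullptr);
//   op->goaway_error = GRPC_ERROR_CREATE("shutting down");
//   transport->PerformOp(op);  // per the contract above, the transport
//                              // unrefs goaway_error after processing it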

void grpc_transport_stream_op_batch_finish_with_failure(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error,
    grpc_core::CallCombiner* call_combiner);
void grpc_transport_stream_op_batch_queue_finish_with_failure(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error,
    grpc_core::CallCombinerClosureList* closures);
// Fail a batch from within the transport (i.e. without the activity lock/call
// combiner taken).
void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error);

std::string grpc_transport_stream_op_batch_string(
    grpc_transport_stream_op_batch* op, bool truncate);
std::string grpc_transport_op_string(grpc_transport_op* op);

namespace grpc_core {

class FilterStackTransport;
class ClientTransport;
class ServerTransport;

class Transport : public InternallyRefCounted<Transport> {
 public:
  struct RawPointerChannelArgTag {};
  static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }

  using InternallyRefCounted<Transport>::InternallyRefCounted;

  // Though internally ref counted, transports expose their "Ref" method to
  // create a RefCountedPtr to themselves. The OrphanablePtr owner is the
  // singleton decision maker on whether the transport should be destroyed or
  // not.
  // TODO(ctiller): consider moving to a DualRefCounted model (with the
  // disadvantage that we would accidentally have many strong owners which is
  // unnecessary for this type).
  RefCountedPtr<Transport> Ref() {
    return InternallyRefCounted<Transport>::Ref();
  }
  template <typename T>
  RefCountedPtr<T> RefAsSubclass() {
    return InternallyRefCounted<Transport>::RefAsSubclass<T>();
  }

  virtual FilterStackTransport* filter_stack_transport() = 0;
  virtual ClientTransport* client_transport() = 0;
  virtual ServerTransport* server_transport() = 0;

  // name of this transport implementation
  virtual absl::string_view GetTransportName() const = 0;

  // implementation of grpc_transport_set_pollset
  virtual void SetPollset(grpc_stream* stream, grpc_pollset* pollset) = 0;

  // implementation of grpc_transport_set_pollset_set
  virtual void SetPollsetSet(grpc_stream* stream,
                             grpc_pollset_set* pollset_set) = 0;

  void SetPollingEntity(grpc_stream* stream,
                        grpc_polling_entity* pollset_or_pollset_set);

  // implementation of grpc_transport_perform_op
  virtual void PerformOp(grpc_transport_op* op) = 0;

  void StartConnectivityWatch(
      OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
    grpc_transport_op* op = grpc_make_transport_op(nullptr);
    op->start_connectivity_watch = std::move(watcher);
    PerformOp(op);
  }

  void DisconnectWithError(grpc_error_handle error) {
    CHECK(!error.ok()) << error;
    grpc_transport_op* op = grpc_make_transport_op(nullptr);
    op->disconnect_with_error = error;
    PerformOp(op);
  }
};
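
// The RawPointerChannelArgTag/ChannelArgName pair above lets a Transport be
// carried in ChannelArgs as a raw pointer. A hedged sketch of the usual
// round-trip (ChannelArgs and its SetObject/GetObject helpers live elsewhere
// in the tree; the names here assume that API):
//
//   ChannelArgs args_with_transport = args.SetObject(transport.get());
//   Transport* t = args_with_transport.GetObject<Transport>();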

class FilterStackTransport : public Transport {
 public:
  // Memory required for a single stream element - this is allocated by upper
  // layers and initialized by the transport
  virtual size_t SizeOfStream() const = 0;

  // Initialize transport data for a stream.
  // May assume that stream contains all-zeros.
  // Arguments:
  //   stream      - a pointer to uninitialized memory to initialize
  //   server_data - either NULL for a client initiated stream, or a pointer
  //                 supplied from the accept_stream callback function
  virtual void InitStream(grpc_stream* stream, grpc_stream_refcount* refcount,
                          const void* server_data, Arena* arena) = 0;

  // HACK: inproc does not handle stream op batch callbacks correctly (receive
  // ops are required to complete prior to on_complete triggering).
  // This flag is used to disable coalescing of batches in connected_channel for
  // that specific transport.
  // TODO(ctiller): This ought not be necessary once we have promises complete.
  virtual bool HackyDisableStreamOpBatchCoalescingInConnectedChannel()
      const = 0;

  virtual void PerformStreamOp(grpc_stream* stream,
                               grpc_transport_stream_op_batch* op) = 0;

  // Destroy transport data for a stream.
  // Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
  // received by the up-layer. Must not be called in the same call stack as
  // recv_frame.
  // Arguments:
  //   stream    - the grpc_stream to destroy (memory is still owned by the
  //               caller, but any child memory must be cleaned up)
  virtual void DestroyStream(grpc_stream* stream,
                             grpc_closure* then_schedule_closure) = 0;

 protected:
  ~FilterStackTransport() override = default;
};
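
// Rough lifetime of a filter-stack stream, based on the contracts above
// (illustrative only; in practice the connected channel is the caller of these
// hooks): the caller allocates SizeOfStream() zero-initialized bytes, then
//
//   transport->InitStream(stream, &refcount, server_data, arena);
//   transport->PerformStreamOp(stream, batch);   // zero or more times
//   transport->DestroyStream(stream, then_schedule_closure);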

class ClientTransport : public Transport {
 public:
  using Transport::Transport;
  virtual void StartCall(CallHandler call_handler) = 0;

 protected:
  ~ClientTransport() override = default;
};

class ServerTransport : public Transport {
 public:
  using Transport::Transport;
  // Called once slightly after transport setup to register the accept function.
  virtual void SetCallDestination(
      RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) = 0;

 protected:
  ~ServerTransport() override = default;
};

}  // namespace grpc_core

namespace grpc_core {
// This is the key to be used for loading/storing keepalive_throttling in the
// absl::Status object.
constexpr const char* kKeepaliveThrottlingKey =
    "grpc.internal.keepalive_throttling";
}  // namespace grpc_core
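
// A hedged sketch of how this key is typically used: it is attached to and
// queried from an absl::Status through the standard payload API (the payload
// value shown is purely illustrative).
//
//   status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
//                     absl::Cord("keepalive throttled"));
//   absl::optional<absl::Cord> v =
//       status.GetPayload(grpc_core::kKeepaliveThrottlingKey);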

#endif  // GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H