1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H
20 #define GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H
21
22 #include <grpc/impl/connectivity_state.h>
23 #include <grpc/slice.h>
24 #include <grpc/status.h>
25 #include <grpc/support/port_platform.h>
26 #include <grpc/support/time.h>
27 #include <stddef.h>
28 #include <stdint.h>
29 #include <string.h>
30
31 #include <functional>
32 #include <string>
33 #include <utility>
34
35 #include "absl/functional/any_invocable.h"
36 #include "absl/log/log.h"
37 #include "absl/status/status.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "src/core/lib/debug/trace.h"
41 #include "src/core/lib/iomgr/call_combiner.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/iomgr_fwd.h"
46 #include "src/core/lib/iomgr/polling_entity.h"
47 #include "src/core/lib/promise/arena_promise.h"
48 #include "src/core/lib/promise/context.h"
49 #include "src/core/lib/promise/latch.h"
50 #include "src/core/lib/promise/pipe.h"
51 #include "src/core/lib/resource_quota/arena.h"
52 #include "src/core/lib/slice/slice_buffer.h"
53 #include "src/core/lib/transport/call_destination.h"
54 #include "src/core/lib/transport/call_final_info.h"
55 #include "src/core/lib/transport/call_spine.h"
56 #include "src/core/lib/transport/connectivity_state.h"
57 #include "src/core/lib/transport/message.h"
58 #include "src/core/lib/transport/metadata.h"
59 #include "src/core/lib/transport/metadata_batch.h"
60 #include "src/core/lib/transport/transport_fwd.h"
61 #include "src/core/util/orphanable.h"
62 #include "src/core/util/ref_counted.h"
63
// Minimum and maximum accepted protocol versions, expressed as separate
// major/minor components.
#define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
#define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MIN_MINOR 1

// Channel arg name associated with the transport; this is the value returned
// by grpc_core::Transport::ChannelArgName().
#define GRPC_ARG_TRANSPORT "grpc.internal.transport"
71
72 namespace grpc_core {
73
74 // Move only type that tracks call startup.
75 // Allows observation of when client_initial_metadata has been processed by the
76 // end of the local call stack.
77 // Interested observers can call Wait() to obtain a promise that will resolve
78 // when all local client_initial_metadata processing has completed.
79 // The result of this token is either true on successful completion, or false
80 // if the metadata was not sent.
81 // To set a successful completion, call Complete(true). For failure, call
82 // Complete(false).
83 // If Complete is not called, the destructor of a still held token will complete
84 // with failure.
85 // Transports should hold this token until client_initial_metadata has passed
86 // any flow control (eg MAX_CONCURRENT_STREAMS for http2).
87 class ClientInitialMetadataOutstandingToken {
88 public:
Empty()89 static ClientInitialMetadataOutstandingToken Empty() {
90 return ClientInitialMetadataOutstandingToken();
91 }
92 static ClientInitialMetadataOutstandingToken New(
93 Arena* arena = GetContext<Arena>()) {
94 ClientInitialMetadataOutstandingToken token;
95 token.latch_ = arena->New<Latch<bool>>();
96 return token;
97 }
98
99 ClientInitialMetadataOutstandingToken(
100 const ClientInitialMetadataOutstandingToken&) = delete;
101 ClientInitialMetadataOutstandingToken& operator=(
102 const ClientInitialMetadataOutstandingToken&) = delete;
ClientInitialMetadataOutstandingToken(ClientInitialMetadataOutstandingToken && other)103 ClientInitialMetadataOutstandingToken(
104 ClientInitialMetadataOutstandingToken&& other) noexcept
105 : latch_(std::exchange(other.latch_, nullptr)) {}
106 ClientInitialMetadataOutstandingToken& operator=(
107 ClientInitialMetadataOutstandingToken&& other) noexcept {
108 latch_ = std::exchange(other.latch_, nullptr);
109 return *this;
110 }
~ClientInitialMetadataOutstandingToken()111 ~ClientInitialMetadataOutstandingToken() {
112 if (latch_ != nullptr) latch_->Set(false);
113 }
Complete(bool success)114 void Complete(bool success) { std::exchange(latch_, nullptr)->Set(success); }
115
116 // Returns a promise that will resolve when this object (or its moved-from
117 // ancestor) is dropped.
Wait()118 auto Wait() { return latch_->Wait(); }
119
120 private:
121 ClientInitialMetadataOutstandingToken() = default;
122
123 Latch<bool>* latch_ = nullptr;
124 };
125
// The type returned by ClientInitialMetadataOutstandingToken::Wait(), deduced
// so that callers can name it without spelling out the latch's promise type.
using ClientInitialMetadataOutstandingTokenWaitType =
    decltype(std::declval<ClientInitialMetadataOutstandingToken>().Wait());
128
// Bundle of per-call state handed to a promise factory (see
// NextPromiseFactory, which takes a CallArgs by value).
// Note: the pointer members have no default initializers, so they hold
// indeterminate values unless the creator of a CallArgs sets them.
struct CallArgs {
  // Initial metadata from the client to the server.
  // During promise setup this can be manipulated by filters (and then
  // passed on to the next filter).
  ClientMetadataHandle client_initial_metadata;
  // Token indicating that client_initial_metadata is still being processed.
  // This should be moved around and only destroyed when the transport is
  // satisfied that the metadata has passed any flow control measures it has.
  ClientInitialMetadataOutstandingToken client_initial_metadata_outstanding;
  // Latch that will ultimately contain the polling entity for the call.
  // TODO(ctiller): remove once event engine lands
  Latch<grpc_polling_entity>* polling_entity;
  // Initial metadata from the server to the client.
  // Set once when it's available.
  // During promise setup filters can substitute their own sender for this
  // and consequently intercept the sent value and mutate/observe it.
  PipeSender<ServerMetadataHandle>* server_initial_metadata;
  // Messages travelling from the application to the transport.
  PipeReceiver<MessageHandle>* client_to_server_messages;
  // Messages travelling from the transport to the application.
  PipeSender<MessageHandle>* server_to_client_messages;
};
151
// Callable that consumes a CallArgs and produces an arena-backed promise
// yielding a ServerMetadataHandle.
// NOTE(review): the name suggests this is how a filter invokes the next
// element in the stack — confirm against the callers.
using NextPromiseFactory =
    std::function<ArenaPromise<ServerMetadataHandle>(CallArgs)>;
154
155 } // namespace grpc_core
156
157 // forward declarations
158
// grpc_stream doesn't actually exist (it is never defined). It's used as a
// typesafe opaque pointer for whatever data the transport wants to track
// for a stream.
typedef struct grpc_stream grpc_stream;
163
// Reference count for a stream, manipulated through grpc_stream_ref() and
// grpc_stream_unref().
typedef struct grpc_stream_refcount {
  // The reference count itself.
  grpc_core::RefCount refs;
  // Closure associated with destruction; its cb_arg is what the debug trace
  // output prints to identify the stream.
  // NOTE(review): presumably this is the closure run by grpc_stream_destroy()
  // — confirm in the implementation file.
  grpc_closure destroy;
#ifndef NDEBUG
  // Debug builds only: label printed in ref/unref trace output.
  const char* object_type;
#endif
} grpc_stream_refcount;
171
// Initializes a grpc_stream_refcount. Callers should use GRPC_STREAM_REF_INIT,
// which has the same argument list in both build modes:
//  - debug builds (NDEBUG not defined) forward `objtype` to
//    grpc_stream_ref_init as the object_type label;
//  - release builds call the four-argument overload and merely evaluate and
//    discard `objtype`.
#ifndef NDEBUG
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg,
                          const char* object_type);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  do {                                                    \
    grpc_stream_ref_init(rc, ir, cb, cb_arg);             \
    (void)(objtype);                                      \
  } while (0)
#endif
187
// Takes an additional reference on a stream refcount via RefNonZero().
// Debug builds require a `reason` string, which is written to the
// stream_refcount trace (together with the object type, the refcount address
// and the destroy closure's cb_arg) and passed on to the RefCount; release
// builds take no reason and do no tracing.
#ifndef NDEBUG
inline void grpc_stream_ref(grpc_stream_refcount* refcount,
                            const char* reason) {
  GRPC_TRACE_VLOG(stream_refcount, 2)
      << refcount->object_type << " " << refcount << ":"
      << refcount->destroy.cb_arg << " REF " << reason;
  refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
}
#else
inline void grpc_stream_ref(grpc_stream_refcount* refcount) {
  refcount->refs.RefNonZero();
}
#endif
201
202 void grpc_stream_destroy(grpc_stream_refcount* refcount);
203
204 #ifndef NDEBUG
grpc_stream_unref(grpc_stream_refcount * refcount,const char * reason)205 inline void grpc_stream_unref(grpc_stream_refcount* refcount,
206 const char* reason) {
207 GRPC_TRACE_VLOG(stream_refcount, 2)
208 << refcount->object_type << " " << refcount << ":"
209 << refcount->destroy.cb_arg << " UNREF " << reason;
210 if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
211 grpc_stream_destroy(refcount);
212 }
213 }
214 #else
grpc_stream_unref(grpc_stream_refcount * refcount)215 inline void grpc_stream_unref(grpc_stream_refcount* refcount) {
216 if (GPR_UNLIKELY(refcount->refs.Unref())) {
217 grpc_stream_destroy(refcount);
218 }
219 }
220 #endif
221
// Wrap a buffer that is owned by some stream object into a slice that shares
// the same refcount.
//   refcount - the owning stream's refcount
//   buffer   - start of the buffer to wrap
//   length   - size of the buffer (presumably in bytes — confirm in the
//              implementation)
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
                                               void* buffer, size_t length);
226
// This struct (which is present in both grpc_transport_stream_op_batch
// and grpc_transport_op) is a convenience to allow filters or
// transports to schedule a closure related to a particular batch without
// having to allocate memory.  The general pattern is to initialize the
// closure with the callback arg set to the batch and extra_arg set to
// whatever state is associated with the handler (e.g., the call element
// or the transport stream object).
//
// Note that this can only be used by the current handler of a given
// batch on the way down the stack (i.e., whichever filter or transport is
// currently handling the batch).  Once a filter or transport passes control
// of the batch to the next handler, it cannot depend on the contents of
// this struct anymore, because the next handler may reuse it.
struct grpc_handler_private_op_data {
  // Handler-defined state; starts out null.
  void* extra_arg = nullptr;
  // Closure storage for the current handler.
  grpc_closure closure;
  // Zero-fills `closure` so that it starts out in a known state.
  grpc_handler_private_op_data() { memset(&closure, 0, sizeof(closure)); }
};
245
// Forward declaration; the payload struct is defined after the batch struct.
typedef struct grpc_transport_stream_op_batch_payload
    grpc_transport_stream_op_batch_payload;

// Transport stream op: a set of operations to perform on a transport
// against a single stream
struct grpc_transport_stream_op_batch {
  // Clears every op flag. The flags are bit-fields, which are initialized
  // here in the constructor rather than with default member initializers.
  grpc_transport_stream_op_batch()
      : send_initial_metadata(false),
        send_trailing_metadata(false),
        send_message(false),
        recv_initial_metadata(false),
        recv_message(false),
        recv_trailing_metadata(false),
        cancel_stream(false),
        is_traced(false) {}

  /// Should be scheduled when all of the non-recv operations in the batch
  /// are complete.

  /// The recv ops (recv_initial_metadata, recv_message, and
  /// recv_trailing_metadata) each have their own callbacks.  If a batch
  /// contains both recv ops and non-recv ops, on_complete should be
  /// scheduled as soon as the non-recv ops are complete, regardless of
  /// whether or not the recv ops are complete.  If a batch contains
  /// only recv ops, on_complete can be null.
  grpc_closure* on_complete = nullptr;

  /// Values for the stream op (fields set are determined by flags below)
  grpc_transport_stream_op_batch_payload* payload = nullptr;

  /// Send initial metadata to the peer, from the provided metadata batch.
  bool send_initial_metadata : 1;

  /// Send trailing metadata to the peer, from the provided metadata batch.
  bool send_trailing_metadata : 1;

  /// Send message data to the peer, from the provided byte stream.
  bool send_message : 1;

  /// Receive initial metadata from the stream, into provided metadata batch.
  bool recv_initial_metadata : 1;

  /// Receive message data from the stream, into provided byte stream.
  bool recv_message : 1;

  /// Receive trailing metadata from the stream, into provided metadata batch.
  ///
  bool recv_trailing_metadata : 1;

  /// Cancel this stream with the provided error
  bool cancel_stream : 1;

  /// Is this stream traced
  bool is_traced : 1;

  /// Returns true if at least one send, recv or cancel op flag is set.
  /// is_traced is not an op and is deliberately not consulted.
  bool HasOp() const {
    return send_initial_metadata || send_trailing_metadata || send_message ||
           recv_initial_metadata || recv_message || recv_trailing_metadata ||
           cancel_stream;
  }

  //**************************************************************************
  // remaining fields are initialized and used at the discretion of the
  // current handler of the op

  grpc_handler_private_op_data handler_private;
};
313
314 struct grpc_transport_stream_op_batch_payload {
315 struct {
316 grpc_metadata_batch* send_initial_metadata = nullptr;
317 } send_initial_metadata;
318
319 struct {
320 grpc_metadata_batch* send_trailing_metadata = nullptr;
321 // Set by the transport to true if the stream successfully wrote the
322 // trailing metadata. If this is not set but there was a send trailing
323 // metadata op present, this can indicate that a server call can be marked
324 // as a cancellation (since the stream was write-closed before status could
325 // be delivered).
326 bool* sent = nullptr;
327 } send_trailing_metadata;
328
329 struct {
330 // The transport (or a filter that decides to return a failure before
331 // the op gets down to the transport) takes ownership.
332 // The batch's on_complete will not be called until after the byte
333 // stream is orphaned.
334 grpc_core::SliceBuffer* send_message;
335 uint32_t flags = 0;
336 // Set by the transport if the stream has been closed for writes. If this
337 // is set and send message op is present, we set the operation to be a
338 // failure without sending a cancel OP down the stack. This is so that the
339 // status of the call does not get overwritten by the Cancel OP, which would
340 // be especially problematic if we had received a valid status from the
341 // server.
342 // For send_initial_metadata, it is fine for the status to be overwritten
343 // because at that point, the client will not have received a status.
344 // For send_trailing_metadata, we might overwrite the status if we have
345 // non-zero metadata to send. This is fine because the API does not allow
346 // the client to send trailing metadata.
347 bool stream_write_closed = false;
348 } send_message;
349
350 struct {
351 grpc_metadata_batch* recv_initial_metadata = nullptr;
352 /// Should be enqueued when initial metadata is ready to be processed.
353 grpc_closure* recv_initial_metadata_ready = nullptr;
354 // If not NULL, will be set to true if trailing metadata is
355 // immediately available. This may be a signal that we received a
356 // Trailers-Only response. The retry filter checks this to know whether to
357 // defer the decision to commit the call or not. The C++ callback API also
358 // uses this to set the success flag of OnReadInitialMetadataDone()
359 // callback.
360 bool* trailing_metadata_available = nullptr;
361 } recv_initial_metadata;
362
363 struct {
364 // Will be set by the transport to point to the byte stream containing a
365 // received message. Will be nullopt if trailing metadata is received
366 // instead of a message.
367 absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
368 uint32_t* flags = nullptr;
369 // Was this recv_message failed for reasons other than a clean end-of-stream
370 bool* call_failed_before_recv_message = nullptr;
371 /// Should be enqueued when one message is ready to be processed.
372 grpc_closure* recv_message_ready = nullptr;
373 } recv_message;
374
375 struct {
376 grpc_metadata_batch* recv_trailing_metadata = nullptr;
377 grpc_transport_stream_stats* collect_stats = nullptr;
378 /// Should be enqueued when trailing metadata is ready to be processed.
379 grpc_closure* recv_trailing_metadata_ready = nullptr;
380 } recv_trailing_metadata;
381
382 /// Forcefully close this stream.
383 /// The HTTP2 semantics should be:
384 /// - server side: if cancel_error has
385 /// grpc_core::StatusIntProperty::kRpcStatus, and trailing metadata has not
386 /// been sent, send trailing metadata with status and message from
387 /// cancel_error (use grpc_error_get_status) followed by a RST_STREAM with
388 /// error=GRPC_CHTTP2_NO_ERROR to force a full close
389 /// - at all other times: use grpc_error_get_status to get a status code, and
390 /// convert to a HTTP2 error code using
391 /// grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
392 /// error.
393 struct {
394 // Error contract: the transport that gets this op must cause cancel_error
395 // to be unref'ed after processing it
396 grpc_error_handle cancel_error;
397 // If true the transport should endeavor to delay sending the cancellation
398 // notification for some small amount of time, in order to foil certain
399 // exploits.
400 // This should be set for cancellations that result from malformed client
401 // initial metadata.
402 bool tarpit = false;
403 } cancel_stream;
404 };
405
/// Transport op: a set of operations to perform on a transport as a whole
typedef struct grpc_transport_op {
  /// Called when processing of this op is done.
  grpc_closure* on_consumed = nullptr;
  /// connectivity monitoring: start_connectivity_watch carries a watcher to
  /// register, stop_connectivity_watch identifies a watcher to remove.
  /// NOTE(review): the previous wording ("set connectivity_state to NULL to
  /// unsubscribe") referred to a field that does not exist in this struct;
  /// confirm the exact unsubscribe contract against the transports.
  grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>
      start_connectivity_watch;
  grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
      nullptr;
  /// should the transport be disconnected
  /// Error contract: the transport that gets this op must cause
  ///                 disconnect_with_error to be unref'ed after processing it
  grpc_error_handle disconnect_with_error;
  /// what should the goaway contain?
  /// Error contract: the transport that gets this op must cause
  ///                 goaway_error to be unref'ed after processing it
  grpc_error_handle goaway_error;
  /// Callback used when set_accept_stream is true (see below).
  void (*set_accept_stream_fn)(void* user_data, grpc_core::Transport* transport,
                               const void* server_data) = nullptr;
  /// Registered-method matcher callback used when set_accept_stream is true
  /// (see below).
  void (*set_registered_method_matcher_fn)(
      void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
  /// user_data passed to both set_accept_stream_fn and
  /// set_registered_method_matcher_fn.
  void* set_accept_stream_user_data = nullptr;
  /// Callback used when set_make_promise is true (see below).
  void (*set_make_promise_fn)(void* user_data, grpc_core::Transport* transport,
                              const void* server_data) = nullptr;
  /// user_data passed to set_make_promise_fn.
  void* set_make_promise_user_data = nullptr;
  /// add this transport to a pollset
  grpc_pollset* bind_pollset = nullptr;
  /// add this transport to a pollset_set
  grpc_pollset_set* bind_pollset_set = nullptr;
  /// send a ping, if either on_initiate or on_ack is not NULL
  struct {
    /// Ping may be delayed by the transport, on_initiate callback will be
    /// called when the ping is actually being sent.
    grpc_closure* on_initiate = nullptr;
    /// Called when the ping ack is received
    grpc_closure* on_ack = nullptr;
  } send_ping;
  /// Connectivity state that accompanies start_connectivity_watch.
  /// NOTE(review): presumably the state the new watcher considers current —
  /// confirm against the transports.
  grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
  // If true, will reset the channel's connection backoff.
  bool reset_connect_backoff = false;

  /// set the callback for accepting new streams;
  /// this is a permanent callback, unlike the other one-shot closures.
  /// If true, the callback is set to set_accept_stream_fn, with its
  /// user_data argument set to set_accept_stream_user_data.
  /// `set_registered_method_matcher_fn` is also set with its user_data argument
  /// set to set_accept_stream_user_data. The transport should invoke
  /// `set_registered_method_matcher_fn` after initial metadata is received but
  /// before recv_initial_metadata_ready callback is invoked. If the transport
  /// detects an error in the stream, invoking
  /// `set_registered_method_matcher_fn` can be skipped.
  bool set_accept_stream = false;

  /// set the callback for accepting new streams based upon promises;
  /// this is a permanent callback, unlike the other one-shot closures.
  /// If true, the callback is set to set_make_promise_fn, with its
  /// user_data argument set to set_make_promise_user_data
  bool set_make_promise = false;

  //**************************************************************************
  // remaining fields are initialized and used at the discretion of the
  // transport implementation

  grpc_handler_private_op_data handler_private;
} grpc_transport_op;
471
// Allocate a grpc_transport_op and preconfigure its completion closure so
// that \a on_complete is invoked and the returned transport op is then
// deleted. \a on_complete may be nullptr (the Transport helpers in this file
// pass nullptr).
// NOTE(review): the allocation and self-deletion happen in the implementation
// file — confirm the exact ordering there.
grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete);
// Allocate a grpc_transport_stream_op_batch and preconfigure its completion
// closure so that \a on_complete is invoked and the returned batch is then
// deleted.
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
    grpc_closure* on_complete);
480
// Fail a batch with \a error, using the supplied call combiner.
// NOTE(review): exactly which of the batch's closures are scheduled is decided
// in the implementation file — confirm there.
void grpc_transport_stream_op_batch_finish_with_failure(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error,
    grpc_core::CallCombiner* call_combiner);
// As above, but takes a CallCombinerClosureList instead of a call combiner.
// NOTE(review): presumably the closures are appended to \a closures rather
// than run immediately — confirm in the implementation.
void grpc_transport_stream_op_batch_queue_finish_with_failure(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error,
    grpc_core::CallCombinerClosureList* closures);
// Fail a batch from within the transport (i.e. without the activity lock/call
// combiner taken).
void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
    grpc_transport_stream_op_batch* batch, grpc_error_handle error);

// Render a stream op batch / transport op as a string.
// NOTE(review): the effect of \a truncate is not visible in this header —
// see the implementation.
std::string grpc_transport_stream_op_batch_string(
    grpc_transport_stream_op_batch* op, bool truncate);
std::string grpc_transport_op_string(grpc_transport_op* op);
495
496 namespace grpc_core {
497
498 class FilterStackTransport;
499 class ClientTransport;
500 class ServerTransport;
501
502 class Transport : public InternallyRefCounted<Transport> {
503 public:
504 struct RawPointerChannelArgTag {};
ChannelArgName()505 static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }
506
507 using InternallyRefCounted<Transport>::InternallyRefCounted;
508
509 // Though internally ref counted transports expose their "Ref" method to
510 // create a RefCountedPtr to themselves. The OrphanablePtr owner is the
511 // singleton decision maker on whether the transport should be destroyed or
512 // not.
513 // TODO(ctiller): consider moving to a DualRefCounted model (with the
514 // disadvantage that we would accidentally have many strong owners which is
515 // unnecessary for this type).
Ref()516 RefCountedPtr<Transport> Ref() {
517 return InternallyRefCounted<Transport>::Ref();
518 }
519 template <typename T>
RefAsSubclass()520 RefCountedPtr<T> RefAsSubclass() {
521 return InternallyRefCounted<Transport>::RefAsSubclass<T>();
522 }
523
524 virtual FilterStackTransport* filter_stack_transport() = 0;
525 virtual ClientTransport* client_transport() = 0;
526 virtual ServerTransport* server_transport() = 0;
527
528 // name of this transport implementation
529 virtual absl::string_view GetTransportName() const = 0;
530
531 // implementation of grpc_transport_set_pollset
532 virtual void SetPollset(grpc_stream* stream, grpc_pollset* pollset) = 0;
533
534 // implementation of grpc_transport_set_pollset
535 virtual void SetPollsetSet(grpc_stream* stream,
536 grpc_pollset_set* pollset_set) = 0;
537
538 void SetPollingEntity(grpc_stream* stream,
539 grpc_polling_entity* pollset_or_pollset_set);
540
541 // implementation of grpc_transport_perform_op
542 virtual void PerformOp(grpc_transport_op* op) = 0;
543
StartConnectivityWatch(OrphanablePtr<ConnectivityStateWatcherInterface> watcher)544 void StartConnectivityWatch(
545 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
546 grpc_transport_op* op = grpc_make_transport_op(nullptr);
547 op->start_connectivity_watch = std::move(watcher);
548 PerformOp(op);
549 }
550
DisconnectWithError(grpc_error_handle error)551 void DisconnectWithError(grpc_error_handle error) {
552 CHECK(!error.ok()) << error;
553 grpc_transport_op* op = grpc_make_transport_op(nullptr);
554 op->disconnect_with_error = error;
555 PerformOp(op);
556 }
557 };
558
// Transport interface driven through grpc_stream objects and
// grpc_transport_stream_op_batch batches.
class FilterStackTransport : public Transport {
 public:
  // Memory required for a single stream element - this is allocated by upper
  // layers and initialized by the transport
  virtual size_t SizeOfStream() const = 0;

  // Initialize transport data for a stream.
  // May assume that stream contains all-zeros.
  // NOTE(review): earlier wording described a "0 on success" return value,
  // but this method returns void — confirm how failures are reported.
  // Arguments:
  //   stream      - a pointer to uninitialized memory to initialize
  //   refcount    - the refcount object for the stream
  //   server_data - either NULL for a client initiated stream, or a pointer
  //                 supplied from the accept_stream callback function
  //   arena       - arena passed through to the transport (presumably the
  //                 call's arena — confirm with callers)
  virtual void InitStream(grpc_stream* stream, grpc_stream_refcount* refcount,
                          const void* server_data, Arena* arena) = 0;

  // HACK: inproc does not handle stream op batch callbacks correctly (receive
  // ops are required to complete prior to on_complete triggering).
  // This flag is used to disable coalescing of batches in connected_channel for
  // that specific transport.
  // TODO(ctiller): This ought not be necessary once we have promises complete.
  virtual bool HackyDisableStreamOpBatchCoalescingInConnectedChannel()
      const = 0;

  // Perform the ops described by `op` against `stream`.
  virtual void PerformStreamOp(grpc_stream* stream,
                               grpc_transport_stream_op_batch* op) = 0;

  // Destroy transport data for a stream.
  // Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
  // received by the up-layer. Must not be called in the same call stack as
  // recv_frame.
  // Arguments:
  //   stream                - the grpc_stream to destroy (memory is still
  //                           owned by the caller, but any child memory must
  //                           be cleaned up)
  //   then_schedule_closure - closure to schedule afterwards (presumably once
  //                           destruction has finished — confirm)
  virtual void DestroyStream(grpc_stream* stream,
                             grpc_closure* then_schedule_closure) = 0;

 protected:
  // Protected: instances are not meant to be deleted through this interface.
  ~FilterStackTransport() override = default;
};
599
// Client-side transport interface based on CallHandler objects.
class ClientTransport : public Transport {
 public:
  using Transport::Transport;
  // Hands a call to the transport.
  // NOTE(review): presumably the transport then drives the call over the
  // wire — confirm against an implementation.
  virtual void StartCall(CallHandler call_handler) = 0;

 protected:
  // Protected: instances are not meant to be deleted through this interface.
  ~ClientTransport() override = default;
};
608
// Server-side transport interface based on an UnstartedCallDestination.
class ServerTransport : public Transport {
 public:
  using Transport::Transport;
  // Called once slightly after transport setup to register the destination
  // that accepts new calls.
  virtual void SetCallDestination(
      RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) = 0;

 protected:
  // Protected: instances are not meant to be deleted through this interface.
  ~ServerTransport() override = default;
};
619
620 } // namespace grpc_core
621
namespace grpc_core {
// This is the key to be used for loading/storing keepalive_throttling in the
// absl::Status object (i.e. the name under which that value is attached to a
// status).
constexpr const char* kKeepaliveThrottlingKey =
    "grpc.internal.keepalive_throttling";
}  // namespace grpc_core
628
629 #endif // GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_H
630