• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 
26 #include "src/core/lib/channel/context.h"
27 #include "src/core/lib/gprpp/arena.h"
28 #include "src/core/lib/gprpp/orphanable.h"
29 #include "src/core/lib/iomgr/call_combiner.h"
30 #include "src/core/lib/iomgr/endpoint.h"
31 #include "src/core/lib/iomgr/polling_entity.h"
32 #include "src/core/lib/iomgr/pollset.h"
33 #include "src/core/lib/iomgr/pollset_set.h"
34 #include "src/core/lib/slice/slice_internal.h"
35 #include "src/core/lib/transport/byte_stream.h"
36 #include "src/core/lib/transport/connectivity_state.h"
37 #include "src/core/lib/transport/metadata_batch.h"
38 
39 /* Minimum and maximum protocol accepted versions. */
40 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
41 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
42 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
43 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1
44 
45 /* forward declarations */
46 
47 typedef struct grpc_transport grpc_transport;
48 
49 /* grpc_stream doesn't actually exist. It's used as a typesafe
50    opaque pointer for whatever data the transport wants to track
51    for a stream. */
52 typedef struct grpc_stream grpc_stream;
53 
54 extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount;
55 
56 typedef struct grpc_stream_refcount {
57   grpc_core::RefCount refs;
58   grpc_closure destroy;
59 #ifndef NDEBUG
60   const char* object_type;
61 #endif
62   grpc_slice_refcount slice_refcount;
63 } grpc_stream_refcount;
64 
65 #ifndef NDEBUG
66 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
67                           grpc_iomgr_cb_func cb, void* cb_arg,
68                           const char* object_type);
69 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
70   grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
71 #else
72 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
73                           grpc_iomgr_cb_func cb, void* cb_arg);
74 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
75   do {                                                    \
76     grpc_stream_ref_init(rc, ir, cb, cb_arg);             \
77     (void)(objtype);                                      \
78   } while (0)
79 #endif
80 
81 #ifndef NDEBUG
grpc_stream_ref(grpc_stream_refcount * refcount,const char * reason)82 inline void grpc_stream_ref(grpc_stream_refcount* refcount,
83                             const char* reason) {
84   if (grpc_trace_stream_refcount.enabled()) {
85     gpr_log(GPR_DEBUG, "%s %p:%p REF %s", refcount->object_type, refcount,
86             refcount->destroy.cb_arg, reason);
87   }
88   refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
89 }
90 #else
grpc_stream_ref(grpc_stream_refcount * refcount)91 inline void grpc_stream_ref(grpc_stream_refcount* refcount) {
92   refcount->refs.RefNonZero();
93 }
94 #endif
95 
96 void grpc_stream_destroy(grpc_stream_refcount* refcount);
97 
98 #ifndef NDEBUG
grpc_stream_unref(grpc_stream_refcount * refcount,const char * reason)99 inline void grpc_stream_unref(grpc_stream_refcount* refcount,
100                               const char* reason) {
101   if (grpc_trace_stream_refcount.enabled()) {
102     gpr_log(GPR_DEBUG, "%s %p:%p UNREF %s", refcount->object_type, refcount,
103             refcount->destroy.cb_arg, reason);
104   }
105   if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
106     grpc_stream_destroy(refcount);
107   }
108 }
109 #else
grpc_stream_unref(grpc_stream_refcount * refcount)110 inline void grpc_stream_unref(grpc_stream_refcount* refcount) {
111   if (GPR_UNLIKELY(refcount->refs.Unref())) {
112     grpc_stream_destroy(refcount);
113   }
114 }
115 #endif
116 
117 /* Wrap a buffer that is owned by some stream object into a slice that shares
118    the same refcount */
119 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
120                                                void* buffer, size_t length);
121 
122 struct grpc_transport_one_way_stats {
123   uint64_t framing_bytes = 0;
124   uint64_t data_bytes = 0;
125   uint64_t header_bytes = 0;
126 };
127 
128 struct grpc_transport_stream_stats {
129   grpc_transport_one_way_stats incoming;
130   grpc_transport_one_way_stats outgoing;
131 };
132 
133 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
134                                        grpc_transport_one_way_stats* to);
135 
136 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
137                                grpc_transport_stream_stats* to);
138 
139 // This struct (which is present in both grpc_transport_stream_op_batch
140 // and grpc_transport_op_batch) is a convenience to allow filters or
141 // transports to schedule a closure related to a particular batch without
142 // having to allocate memory.  The general pattern is to initialize the
143 // closure with the callback arg set to the batch and extra_arg set to
144 // whatever state is associated with the handler (e.g., the call element
145 // or the transport stream object).
146 //
147 // Note that this can only be used by the current handler of a given
148 // batch on the way down the stack (i.e., whichever filter or transport is
149 // currently handling the batch).  Once a filter or transport passes control
150 // of the batch to the next handler, it cannot depend on the contents of
151 // this struct anymore, because the next handler may reuse it.
152 struct grpc_handler_private_op_data {
153   void* extra_arg = nullptr;
154   grpc_closure closure;
grpc_handler_private_op_datagrpc_handler_private_op_data155   grpc_handler_private_op_data() { memset(&closure, 0, sizeof(closure)); }
156 };
157 
158 typedef struct grpc_transport_stream_op_batch_payload
159     grpc_transport_stream_op_batch_payload;
160 
161 /* Transport stream op: a set of operations to perform on a transport
162    against a single stream */
163 struct grpc_transport_stream_op_batch {
grpc_transport_stream_op_batchgrpc_transport_stream_op_batch164   grpc_transport_stream_op_batch()
165       : send_initial_metadata(false),
166         send_trailing_metadata(false),
167         send_message(false),
168         recv_initial_metadata(false),
169         recv_message(false),
170         recv_trailing_metadata(false),
171         cancel_stream(false),
172         is_traced(false) {}
173 
174   /** Should be scheduled when all of the non-recv operations in the batch
175       are complete.
176 
177       The recv ops (recv_initial_metadata, recv_message, and
178       recv_trailing_metadata) each have their own callbacks.  If a batch
179       contains both recv ops and non-recv ops, on_complete should be
180       scheduled as soon as the non-recv ops are complete, regardless of
181       whether or not the recv ops are complete.  If a batch contains
182       only recv ops, on_complete can be null. */
183   grpc_closure* on_complete = nullptr;
184 
185   /** Values for the stream op (fields set are determined by flags above) */
186   grpc_transport_stream_op_batch_payload* payload = nullptr;
187 
188   /** Send initial metadata to the peer, from the provided metadata batch. */
189   bool send_initial_metadata : 1;
190 
191   /** Send trailing metadata to the peer, from the provided metadata batch. */
192   bool send_trailing_metadata : 1;
193 
194   /** Send message data to the peer, from the provided byte stream. */
195   bool send_message : 1;
196 
197   /** Receive initial metadata from the stream, into provided metadata batch. */
198   bool recv_initial_metadata : 1;
199 
200   /** Receive message data from the stream, into provided byte stream. */
201   bool recv_message : 1;
202 
203   /** Receive trailing metadata from the stream, into provided metadata batch.
204    */
205   bool recv_trailing_metadata : 1;
206 
207   /** Cancel this stream with the provided error */
208   bool cancel_stream : 1;
209 
210   /** Is this stream traced */
211   bool is_traced : 1;
212 
213   /***************************************************************************
214    * remaining fields are initialized and used at the discretion of the
215    * current handler of the op */
216 
217   grpc_handler_private_op_data handler_private;
218 };
219 
220 struct grpc_transport_stream_op_batch_payload {
grpc_transport_stream_op_batch_payloadgrpc_transport_stream_op_batch_payload221   explicit grpc_transport_stream_op_batch_payload(
222       grpc_call_context_element* context)
223       : context(context) {}
~grpc_transport_stream_op_batch_payloadgrpc_transport_stream_op_batch_payload224   ~grpc_transport_stream_op_batch_payload() {
225     // We don't really own `send_message`, so release ownership and let the
226     // owner clean the data.
227     (void)send_message.send_message.release();
228   }
229 
230   struct {
231     grpc_metadata_batch* send_initial_metadata = nullptr;
232     /** Iff send_initial_metadata != NULL, flags associated with
233         send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
234     uint32_t send_initial_metadata_flags = 0;
235     // If non-NULL, will be set by the transport to the peer string (a char*).
236     // The transport retains ownership of the string.
237     // Note: This pointer may be used by the transport after the
238     // send_initial_metadata op is completed.  It must remain valid
239     // until the call is destroyed.
240     gpr_atm* peer_string = nullptr;
241   } send_initial_metadata;
242 
243   struct {
244     grpc_metadata_batch* send_trailing_metadata = nullptr;
245     // Set by the transport to true if the stream successfully wrote the
246     // trailing metadata. If this is not set but there was a send trailing
247     // metadata op present, this can indicate that a server call can be marked
248     // as  a cancellation (since the stream was write-closed before status could
249     // be delivered).
250     bool* sent = nullptr;
251   } send_trailing_metadata;
252 
253   struct {
254     // The transport (or a filter that decides to return a failure before
255     // the op gets down to the transport) takes ownership.
256     // The batch's on_complete will not be called until after the byte
257     // stream is orphaned.
258     grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
259     // Set by the transport if the stream has been closed for writes. If this
260     // is set and send message op is present, we set the operation to be a
261     // failure without sending a cancel OP down the stack. This is so that the
262     // status of the call does not get overwritten by the Cancel OP, which would
263     // be especially problematic if we had received a valid status from the
264     // server.
265     // For send_initial_metadata, it is fine for the status to be overwritten
266     // because at that point, the client will not have received a status.
267     // For send_trailing_metadata, we might overwrite the status if we have
268     // non-zero metadata to send. This is fine because the API does not allow
269     // the client to send trailing metadata.
270     bool stream_write_closed = false;
271   } send_message;
272 
273   struct {
274     grpc_metadata_batch* recv_initial_metadata = nullptr;
275     // Flags are used only on the server side.  If non-null, will be set to
276     // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
277     // indicate if the call is idempotent).
278     uint32_t* recv_flags = nullptr;
279     /** Should be enqueued when initial metadata is ready to be processed. */
280     grpc_closure* recv_initial_metadata_ready = nullptr;
281     // If not NULL, will be set to true if trailing metadata is
282     // immediately available.  This may be a signal that we received a
283     // Trailers-Only response.
284     bool* trailing_metadata_available = nullptr;
285     // If non-NULL, will be set by the transport to the peer string (a char*).
286     // The transport retains ownership of the string.
287     // Note: This pointer may be used by the transport after the
288     // recv_initial_metadata op is completed.  It must remain valid
289     // until the call is destroyed.
290     gpr_atm* peer_string = nullptr;
291   } recv_initial_metadata;
292 
293   struct {
294     // Will be set by the transport to point to the byte stream
295     // containing a received message.
296     // Will be NULL if trailing metadata is received instead of a message.
297     grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
298     /** Should be enqueued when one message is ready to be processed. */
299     grpc_closure* recv_message_ready = nullptr;
300   } recv_message;
301 
302   struct {
303     grpc_metadata_batch* recv_trailing_metadata = nullptr;
304     grpc_transport_stream_stats* collect_stats = nullptr;
305     /** Should be enqueued when trailing metadata is ready to be processed. */
306     grpc_closure* recv_trailing_metadata_ready = nullptr;
307   } recv_trailing_metadata;
308 
309   /** Forcefully close this stream.
310       The HTTP2 semantics should be:
311       - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
312         trailing metadata has not been sent, send trailing metadata with status
313         and message from cancel_error (use grpc_error_get_status) followed by
314         a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
315       - at all other times: use grpc_error_get_status to get a status code, and
316         convert to a HTTP2 error code using
317         grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
318         error. */
319   struct {
320     // Error contract: the transport that gets this op must cause cancel_error
321     //                 to be unref'ed after processing it
322     grpc_error* cancel_error = GRPC_ERROR_NONE;
323   } cancel_stream;
324 
325   /* Indexes correspond to grpc_context_index enum values */
326   grpc_call_context_element* context;
327 };
328 
329 /** Transport op: a set of operations to perform on a transport as a whole */
330 typedef struct grpc_transport_op {
331   /** Called when processing of this op is done. */
332   grpc_closure* on_consumed = nullptr;
333   /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
334   grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>
335       start_connectivity_watch;
336   grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
337   grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
338       nullptr;
339   /** should the transport be disconnected
340    * Error contract: the transport that gets this op must cause
341    *                 disconnect_with_error to be unref'ed after processing it */
342   grpc_error* disconnect_with_error = nullptr;
343   /** what should the goaway contain?
344    * Error contract: the transport that gets this op must cause
345    *                 goaway_error to be unref'ed after processing it */
346   grpc_error* goaway_error = nullptr;
347   /** set the callback for accepting new streams;
348       this is a permanent callback, unlike the other one-shot closures.
349       If true, the callback is set to set_accept_stream_fn, with its
350       user_data argument set to set_accept_stream_user_data */
351   bool set_accept_stream = false;
352   void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
353                                const void* server_data) = nullptr;
354   void* set_accept_stream_user_data = nullptr;
355   /** add this transport to a pollset */
356   grpc_pollset* bind_pollset = nullptr;
357   /** add this transport to a pollset_set */
358   grpc_pollset_set* bind_pollset_set = nullptr;
359   /** send a ping, if either on_initiate or on_ack is not NULL */
360   struct {
361     /** Ping may be delayed by the transport, on_initiate callback will be
362         called when the ping is actually being sent. */
363     grpc_closure* on_initiate = nullptr;
364     /** Called when the ping ack is received */
365     grpc_closure* on_ack = nullptr;
366   } send_ping;
367   // If true, will reset the channel's connection backoff.
368   bool reset_connect_backoff = false;
369 
370   /***************************************************************************
371    * remaining fields are initialized and used at the discretion of the
372    * transport implementation */
373 
374   grpc_handler_private_op_data handler_private;
375 } grpc_transport_op;
376 
377 /* Returns the amount of memory required to store a grpc_stream for this
378    transport */
379 size_t grpc_transport_stream_size(grpc_transport* transport);
380 
381 /* Initialize transport data for a stream.
382 
383    Returns 0 on success, any other (transport-defined) value for failure.
384    May assume that stream contains all-zeros.
385 
386    Arguments:
387      transport   - the transport on which to create this stream
388      stream      - a pointer to uninitialized memory to initialize
389      server_data - either NULL for a client initiated stream, or a pointer
390                    supplied from the accept_stream callback function */
391 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
392                                grpc_stream_refcount* refcount,
393                                const void* server_data,
394                                grpc_core::Arena* arena);
395 
396 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
397                              grpc_polling_entity* pollent);
398 
399 /* Destroy transport data for a stream.
400 
401    Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
402    received by the up-layer. Must not be called in the same call stack as
403    recv_frame.
404 
405    Arguments:
406      transport - the transport on which to create this stream
407      stream    - the grpc_stream to destroy (memory is still owned by the
408                  caller, but any child memory must be cleaned up) */
409 void grpc_transport_destroy_stream(grpc_transport* transport,
410                                    grpc_stream* stream,
411                                    grpc_closure* then_schedule_closure);
412 
413 void grpc_transport_stream_op_batch_finish_with_failure(
414     grpc_transport_stream_op_batch* batch, grpc_error* error,
415     grpc_core::CallCombiner* call_combiner);
416 
417 std::string grpc_transport_stream_op_batch_string(
418     grpc_transport_stream_op_batch* op);
419 std::string grpc_transport_op_string(grpc_transport_op* op);
420 
421 /* Send a batch of operations on a transport
422 
423    Takes ownership of any objects contained in ops.
424 
425    Arguments:
426      transport - the transport on which to initiate the stream
427      stream    - the stream on which to send the operations. This must be
428                  non-NULL and previously initialized by the same transport.
429      op        - a grpc_transport_stream_op_batch specifying the op to perform
430    */
431 void grpc_transport_perform_stream_op(grpc_transport* transport,
432                                       grpc_stream* stream,
433                                       grpc_transport_stream_op_batch* op);
434 
435 void grpc_transport_perform_op(grpc_transport* transport,
436                                grpc_transport_op* op);
437 
438 /* Send a ping on a transport
439 
440    Calls cb with user data when a response is received. */
441 void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
442 
443 /* Advise peer of pending connection termination. */
444 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
445                            grpc_slice debug_data);
446 
447 /* Destroy the transport */
448 void grpc_transport_destroy(grpc_transport* transport);
449 
450 /* Get the endpoint used by \a transport */
451 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);
452 
453 /* Allocate a grpc_transport_op, and preconfigure the on_complete closure to
454    \a on_complete and then delete the returned transport op */
455 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete);
456 /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_complete
457    closure
458    to \a on_complete and then delete the returned transport op */
459 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
460     grpc_closure* on_complete);
461 
462 namespace grpc_core {
463 // This is the key to be used for loading/storing keepalive_throttling in the
464 // absl::Status object.
465 constexpr const char* kKeepaliveThrottlingKey =
466     "grpc.internal.keepalive_throttling";
467 }  // namespace grpc_core
468 
469 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
470