1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H 20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stddef.h> 25 26 #include "src/core/lib/channel/context.h" 27 #include "src/core/lib/gpr/arena.h" 28 #include "src/core/lib/iomgr/call_combiner.h" 29 #include "src/core/lib/iomgr/endpoint.h" 30 #include "src/core/lib/iomgr/polling_entity.h" 31 #include "src/core/lib/iomgr/pollset.h" 32 #include "src/core/lib/iomgr/pollset_set.h" 33 #include "src/core/lib/transport/byte_stream.h" 34 #include "src/core/lib/transport/metadata_batch.h" 35 36 /* Minimum and maximum protocol accepted versions. */ 37 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2 38 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1 39 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2 40 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1 41 42 /* forward declarations */ 43 44 typedef struct grpc_transport grpc_transport; 45 46 /* grpc_stream doesn't actually exist. It's used as a typesafe 47 opaque pointer for whatever data the transport wants to track 48 for a stream. */ 49 typedef struct grpc_stream grpc_stream; 50 51 extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount; 52 53 typedef struct grpc_stream_refcount { 54 gpr_refcount refs; 55 grpc_closure destroy; 56 #ifndef NDEBUG 57 const char* object_type; 58 #endif 59 grpc_slice_refcount slice_refcount; 60 } grpc_stream_refcount; 61 62 #ifndef NDEBUG 63 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, 64 grpc_iomgr_cb_func cb, void* cb_arg, 65 const char* object_type); 66 void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason); 67 void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason); 68 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ 69 grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) 70 #else 71 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, 72 grpc_iomgr_cb_func cb, void* cb_arg); 73 void grpc_stream_ref(grpc_stream_refcount* refcount); 74 void grpc_stream_unref(grpc_stream_refcount* refcount); 75 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ 76 grpc_stream_ref_init(rc, ir, cb, cb_arg) 77 #endif 78 79 /* Wrap a buffer that is owned by some stream object into a slice that shares 80 the same refcount */ 81 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount, 82 void* buffer, size_t length); 83 84 typedef struct { 85 uint64_t framing_bytes; 86 uint64_t data_bytes; 87 uint64_t header_bytes; 88 } grpc_transport_one_way_stats; 89 90 typedef struct grpc_transport_stream_stats { 91 grpc_transport_one_way_stats incoming; 92 grpc_transport_one_way_stats outgoing; 93 } grpc_transport_stream_stats; 94 95 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from, 96 grpc_transport_one_way_stats* to); 97 98 void grpc_transport_move_stats(grpc_transport_stream_stats* from, 99 grpc_transport_stream_stats* to); 100 101 // This struct (which is present in both grpc_transport_stream_op_batch 102 // and grpc_transport_op_batch) is a convenience to allow filters or 103 // transports to schedule a closure related to a particular batch without 104 // having to allocate memory. The general pattern is to initialize the 105 // closure with the callback arg set to the batch and extra_arg set to 106 // whatever state is associated with the handler (e.g., the call element 107 // or the transport stream object). 108 // 109 // Note that this can only be used by the current handler of a given 110 // batch on the way down the stack (i.e., whichever filter or transport is 111 // currently handling the batch). Once a filter or transport passes control 112 // of the batch to the next handler, it cannot depend on the contents of 113 // this struct anymore, because the next handler may reuse it. 114 typedef struct { 115 void* extra_arg; 116 grpc_closure closure; 117 } grpc_handler_private_op_data; 118 119 typedef struct grpc_transport_stream_op_batch_payload 120 grpc_transport_stream_op_batch_payload; 121 122 /* Transport stream op: a set of operations to perform on a transport 123 against a single stream */ 124 typedef struct grpc_transport_stream_op_batch { 125 /** Should be scheduled when all of the non-recv operations in the batch 126 are complete. 127 128 The recv ops (recv_initial_metadata, recv_message, and 129 recv_trailing_metadata) each have their own callbacks. If a batch 130 contains both recv ops and non-recv ops, on_complete should be 131 scheduled as soon as the non-recv ops are complete, regardless of 132 whether or not the recv ops are complete. If a batch contains 133 only recv ops, on_complete can be null. */ 134 grpc_closure* on_complete; 135 136 /** Values for the stream op (fields set are determined by flags above) */ 137 grpc_transport_stream_op_batch_payload* payload; 138 139 /** Send initial metadata to the peer, from the provided metadata batch. */ 140 bool send_initial_metadata : 1; 141 142 /** Send trailing metadata to the peer, from the provided metadata batch. */ 143 bool send_trailing_metadata : 1; 144 145 /** Send message data to the peer, from the provided byte stream. */ 146 bool send_message : 1; 147 148 /** Receive initial metadata from the stream, into provided metadata batch. */ 149 bool recv_initial_metadata : 1; 150 151 /** Receive message data from the stream, into provided byte stream. */ 152 bool recv_message : 1; 153 154 /** Receive trailing metadata from the stream, into provided metadata batch. 155 */ 156 bool recv_trailing_metadata : 1; 157 158 /** Cancel this stream with the provided error */ 159 bool cancel_stream : 1; 160 161 /*************************************************************************** 162 * remaining fields are initialized and used at the discretion of the 163 * current handler of the op */ 164 165 grpc_handler_private_op_data handler_private; 166 } grpc_transport_stream_op_batch; 167 168 struct grpc_transport_stream_op_batch_payload { 169 struct { 170 grpc_metadata_batch* send_initial_metadata; 171 /** Iff send_initial_metadata != NULL, flags associated with 172 send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ 173 uint32_t send_initial_metadata_flags; 174 // If non-NULL, will be set by the transport to the peer string (a char*). 175 // The transport retains ownership of the string. 176 // Note: This pointer may be used by the transport after the 177 // send_initial_metadata op is completed. It must remain valid 178 // until the call is destroyed. 179 gpr_atm* peer_string; 180 } send_initial_metadata; 181 182 struct { 183 grpc_metadata_batch* send_trailing_metadata; 184 } send_trailing_metadata; 185 186 struct { 187 // The transport (or a filter that decides to return a failure before 188 // the op gets down to the transport) takes ownership. 189 // The batch's on_complete will not be called until after the byte 190 // stream is orphaned. 191 grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message; 192 } send_message; 193 194 struct { 195 grpc_metadata_batch* recv_initial_metadata; 196 // Flags are used only on the server side. If non-null, will be set to 197 // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to 198 // indicate if the call is idempotent). 199 uint32_t* recv_flags; 200 /** Should be enqueued when initial metadata is ready to be processed. */ 201 grpc_closure* recv_initial_metadata_ready; 202 // If not NULL, will be set to true if trailing metadata is 203 // immediately available. This may be a signal that we received a 204 // Trailers-Only response. 205 bool* trailing_metadata_available; 206 // If non-NULL, will be set by the transport to the peer string (a char*). 207 // The transport retains ownership of the string. 208 // Note: This pointer may be used by the transport after the 209 // recv_initial_metadata op is completed. It must remain valid 210 // until the call is destroyed. 211 gpr_atm* peer_string; 212 } recv_initial_metadata; 213 214 struct { 215 // Will be set by the transport to point to the byte stream 216 // containing a received message. 217 // Will be NULL if trailing metadata is received instead of a message. 218 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; 219 /** Should be enqueued when one message is ready to be processed. */ 220 grpc_closure* recv_message_ready; 221 } recv_message; 222 223 struct { 224 grpc_metadata_batch* recv_trailing_metadata; 225 grpc_transport_stream_stats* collect_stats; 226 /** Should be enqueued when initial metadata is ready to be processed. */ 227 grpc_closure* recv_trailing_metadata_ready; 228 } recv_trailing_metadata; 229 230 /** Forcefully close this stream. 231 The HTTP2 semantics should be: 232 - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and 233 trailing metadata has not been sent, send trailing metadata with status 234 and message from cancel_error (use grpc_error_get_status) followed by 235 a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close 236 - at all other times: use grpc_error_get_status to get a status code, and 237 convert to a HTTP2 error code using 238 grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this 239 error. */ 240 struct { 241 // Error contract: the transport that gets this op must cause cancel_error 242 // to be unref'ed after processing it 243 grpc_error* cancel_error; 244 } cancel_stream; 245 246 /* Indexes correspond to grpc_context_index enum values */ 247 grpc_call_context_element* context; 248 }; 249 250 /** Transport op: a set of operations to perform on a transport as a whole */ 251 typedef struct grpc_transport_op { 252 /** Called when processing of this op is done. */ 253 grpc_closure* on_consumed; 254 /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ 255 grpc_closure* on_connectivity_state_change; 256 grpc_connectivity_state* connectivity_state; 257 /** should the transport be disconnected 258 * Error contract: the transport that gets this op must cause 259 * disconnect_with_error to be unref'ed after processing it */ 260 grpc_error* disconnect_with_error; 261 /** what should the goaway contain? 262 * Error contract: the transport that gets this op must cause 263 * goaway_error to be unref'ed after processing it */ 264 grpc_error* goaway_error; 265 /** set the callback for accepting new streams; 266 this is a permanent callback, unlike the other one-shot closures. 267 If true, the callback is set to set_accept_stream_fn, with its 268 user_data argument set to set_accept_stream_user_data */ 269 bool set_accept_stream; 270 void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport, 271 const void* server_data); 272 void* set_accept_stream_user_data; 273 /** add this transport to a pollset */ 274 grpc_pollset* bind_pollset; 275 /** add this transport to a pollset_set */ 276 grpc_pollset_set* bind_pollset_set; 277 /** send a ping, if either on_initiate or on_ack is not NULL */ 278 struct { 279 /** Ping may be delayed by the transport, on_initiate callback will be 280 called when the ping is actually being sent. */ 281 grpc_closure* on_initiate; 282 /** Called when the ping ack is received */ 283 grpc_closure* on_ack; 284 } send_ping; 285 // If true, will reset the channel's connection backoff. 286 bool reset_connect_backoff; 287 288 /*************************************************************************** 289 * remaining fields are initialized and used at the discretion of the 290 * transport implementation */ 291 292 grpc_handler_private_op_data handler_private; 293 } grpc_transport_op; 294 295 /* Returns the amount of memory required to store a grpc_stream for this 296 transport */ 297 size_t grpc_transport_stream_size(grpc_transport* transport); 298 299 /* Initialize transport data for a stream. 300 301 Returns 0 on success, any other (transport-defined) value for failure. 302 May assume that stream contains all-zeros. 303 304 Arguments: 305 transport - the transport on which to create this stream 306 stream - a pointer to uninitialized memory to initialize 307 server_data - either NULL for a client initiated stream, or a pointer 308 supplied from the accept_stream callback function */ 309 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, 310 grpc_stream_refcount* refcount, 311 const void* server_data, gpr_arena* arena); 312 313 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, 314 grpc_polling_entity* pollent); 315 316 /* Destroy transport data for a stream. 317 318 Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been 319 received by the up-layer. Must not be called in the same call stack as 320 recv_frame. 321 322 Arguments: 323 transport - the transport on which to create this stream 324 stream - the grpc_stream to destroy (memory is still owned by the 325 caller, but any child memory must be cleaned up) */ 326 void grpc_transport_destroy_stream(grpc_transport* transport, 327 grpc_stream* stream, 328 grpc_closure* then_schedule_closure); 329 330 void grpc_transport_stream_op_batch_finish_with_failure( 331 grpc_transport_stream_op_batch* op, grpc_error* error, 332 grpc_call_combiner* call_combiner); 333 334 char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op); 335 char* grpc_transport_op_string(grpc_transport_op* op); 336 337 /* Send a batch of operations on a transport 338 339 Takes ownership of any objects contained in ops. 340 341 Arguments: 342 transport - the transport on which to initiate the stream 343 stream - the stream on which to send the operations. This must be 344 non-NULL and previously initialized by the same transport. 345 op - a grpc_transport_stream_op_batch specifying the op to perform 346 */ 347 void grpc_transport_perform_stream_op(grpc_transport* transport, 348 grpc_stream* stream, 349 grpc_transport_stream_op_batch* op); 350 351 void grpc_transport_perform_op(grpc_transport* transport, 352 grpc_transport_op* op); 353 354 /* Send a ping on a transport 355 356 Calls cb with user data when a response is received. */ 357 void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb); 358 359 /* Advise peer of pending connection termination. */ 360 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status, 361 grpc_slice debug_data); 362 363 /* Destroy the transport */ 364 void grpc_transport_destroy(grpc_transport* transport); 365 366 /* Get the endpoint used by \a transport */ 367 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport); 368 369 /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to 370 \a on_consumed and then delete the returned transport op */ 371 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed); 372 /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed 373 closure 374 to \a on_consumed and then delete the returned transport op */ 375 grpc_transport_stream_op_batch* grpc_make_transport_stream_op( 376 grpc_closure* on_consumed); 377 378 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ 379