1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H 20 #define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H 21 22 ////////////////////////////////////////////////////////////////////////////// 23 // IMPORTANT NOTE: 24 // 25 // When you update this API, please make the corresponding changes to 26 // the C++ API in src/cpp/common/channel_filter.{h,cc} 27 ////////////////////////////////////////////////////////////////////////////// 28 29 /* A channel filter defines how operations on a channel are implemented. 30 Channel filters are chained together to create full channels, and if those 31 chains are linear, then channel stacks provide a mechanism to minimize 32 allocations for that chain. 33 Call stacks are created by channel stacks and represent the per-call data 34 for that stack. 35 36 Implementations should take care of the following details for a batch - 37 1. Synchronization is achieved with a CallCombiner. View 38 src/core/lib/iomgr/call_combiner.h for more details. 39 2. If the filter wants to inject an error on the way down, it needs to call 40 grpc_transport_stream_op_batch_finish_with_failure from within the call 41 combiner. This will cause any batch callbacks to be called with that error. 42 3. If the filter wants to inject an error on the way up (from a callback), it 43 should also inject that error in the recv_trailing_metadata callback so that 44 it can have an effect on the call status. 45 */ 46 47 #include <grpc/support/port_platform.h> 48 49 #include <stddef.h> 50 51 #include <grpc/grpc.h> 52 #include <grpc/support/log.h> 53 #include <grpc/support/time.h> 54 55 #include "src/core/lib/debug/trace.h" 56 #include "src/core/lib/gpr/time_precise.h" 57 #include "src/core/lib/gprpp/arena.h" 58 #include "src/core/lib/iomgr/call_combiner.h" 59 #include "src/core/lib/iomgr/polling_entity.h" 60 #include "src/core/lib/transport/transport.h" 61 62 typedef struct grpc_channel_element grpc_channel_element; 63 typedef struct grpc_call_element grpc_call_element; 64 65 typedef struct grpc_channel_stack grpc_channel_stack; 66 typedef struct grpc_call_stack grpc_call_stack; 67 68 struct grpc_channel_element_args { 69 grpc_channel_stack* channel_stack; 70 const grpc_channel_args* channel_args; 71 /** Transport, iff it is known */ 72 grpc_transport* optional_transport; 73 int is_first; 74 int is_last; 75 }; 76 struct grpc_call_element_args { 77 grpc_call_stack* call_stack; 78 const void* server_transport_data; 79 grpc_call_context_element* context; 80 const grpc_slice& path; 81 gpr_cycle_counter start_time; 82 grpc_millis deadline; 83 grpc_core::Arena* arena; 84 grpc_core::CallCombiner* call_combiner; 85 }; 86 struct grpc_call_stats { 87 grpc_transport_stream_stats transport_stream_stats; 88 gpr_timespec latency; /* From call creating to enqueing of received status */ 89 }; 90 /** Information about the call upon completion. */ 91 struct grpc_call_final_info { 92 grpc_call_stats stats; 93 grpc_status_code final_status = GRPC_STATUS_OK; 94 const char* error_string = nullptr; 95 }; 96 97 /* Channel filters specify: 98 1. the amount of memory needed in the channel & call (via the sizeof_XXX 99 members) 100 2. functions to initialize and destroy channel & call data 101 (init_XXX, destroy_XXX) 102 3. functions to implement call operations and channel operations (call_op, 103 channel_op) 104 4. a name, which is useful when debugging 105 106 Members are laid out in approximate frequency of use order. */ 107 struct grpc_channel_filter { 108 /* Called to eg. send/receive data on a call. 109 See grpc_call_next_op on how to call the next element in the stack */ 110 void (*start_transport_stream_op_batch)(grpc_call_element* elem, 111 grpc_transport_stream_op_batch* op); 112 /* Called to handle channel level operations - e.g. new calls, or transport 113 closure. 114 See grpc_channel_next_op on how to call the next element in the stack */ 115 void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op); 116 117 /* sizeof(per call data) */ 118 size_t sizeof_call_data; 119 /* Initialize per call data. 120 elem is initialized at the start of the call, and elem->call_data is what 121 needs initializing. 122 The filter does not need to do any chaining. 123 server_transport_data is an opaque pointer. If it is NULL, this call is 124 on a client; if it is non-NULL, then it points to memory owned by the 125 transport and is on the server. Most filters want to ignore this 126 argument. 127 Implementations may assume that elem->call_data is all zeros. */ 128 grpc_error* (*init_call_elem)(grpc_call_element* elem, 129 const grpc_call_element_args* args); 130 void (*set_pollset_or_pollset_set)(grpc_call_element* elem, 131 grpc_polling_entity* pollent); 132 /* Destroy per call data. 133 The filter does not need to do any chaining. 134 The bottom filter of a stack will be passed a non-NULL pointer to 135 \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when 136 destruction is complete. \a final_info contains data about the completed 137 call, mainly for reporting purposes. */ 138 void (*destroy_call_elem)(grpc_call_element* elem, 139 const grpc_call_final_info* final_info, 140 grpc_closure* then_schedule_closure); 141 142 /* sizeof(per channel data) */ 143 size_t sizeof_channel_data; 144 /* Initialize per-channel data. 145 elem is initialized at the creating of the channel, and elem->channel_data 146 is what needs initializing. 147 is_first, is_last designate this elements position in the stack, and are 148 useful for asserting correct configuration by upper layer code. 149 The filter does not need to do any chaining. 150 Implementations may assume that elem->channel_data is all zeros. */ 151 grpc_error* (*init_channel_elem)(grpc_channel_element* elem, 152 grpc_channel_element_args* args); 153 /* Destroy per channel data. 154 The filter does not need to do any chaining */ 155 void (*destroy_channel_elem)(grpc_channel_element* elem); 156 157 /* Implement grpc_channel_get_info() */ 158 void (*get_channel_info)(grpc_channel_element* elem, 159 const grpc_channel_info* channel_info); 160 161 /* The name of this filter */ 162 const char* name; 163 }; 164 /* A channel_element tracks its filter and the filter requested memory within 165 a channel allocation */ 166 struct grpc_channel_element { 167 const grpc_channel_filter* filter; 168 void* channel_data; 169 }; 170 171 /* A call_element tracks its filter, the filter requested memory within 172 a channel allocation, and the filter requested memory within a call 173 allocation */ 174 struct grpc_call_element { 175 const grpc_channel_filter* filter; 176 void* channel_data; 177 void* call_data; 178 }; 179 180 /* A channel stack tracks a set of related filters for one channel, and 181 guarantees they live within a single malloc() allocation */ 182 struct grpc_channel_stack { 183 grpc_stream_refcount refcount; 184 size_t count; 185 /* Memory required for a call stack (computed at channel stack 186 initialization) */ 187 size_t call_stack_size; 188 }; 189 190 /* A call stack tracks a set of related filters for one call, and guarantees 191 they live within a single malloc() allocation */ 192 struct grpc_call_stack { 193 /* shared refcount for this channel stack. 194 MUST be the first element: the underlying code calls destroy 195 with the address of the refcount, but higher layers prefer to think 196 about the address of the call stack itself. */ 197 grpc_stream_refcount refcount; 198 size_t count; 199 }; 200 201 /* Get a channel element given a channel stack and its index */ 202 grpc_channel_element* grpc_channel_stack_element(grpc_channel_stack* stack, 203 size_t i); 204 /* Get the last channel element in a channel stack */ 205 grpc_channel_element* grpc_channel_stack_last_element( 206 grpc_channel_stack* stack); 207 /* Get a call stack element given a call stack and an index */ 208 grpc_call_element* grpc_call_stack_element(grpc_call_stack* stack, size_t i); 209 210 /* Determine memory required for a channel stack containing a set of filters */ 211 size_t grpc_channel_stack_size(const grpc_channel_filter** filters, 212 size_t filter_count); 213 /* Initialize a channel stack given some filters */ 214 grpc_error* grpc_channel_stack_init( 215 int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, 216 const grpc_channel_filter** filters, size_t filter_count, 217 const grpc_channel_args* args, grpc_transport* optional_transport, 218 const char* name, grpc_channel_stack* stack); 219 /* Destroy a channel stack */ 220 void grpc_channel_stack_destroy(grpc_channel_stack* stack); 221 222 /* Initialize a call stack given a channel stack. transport_server_data is 223 expected to be NULL on a client, or an opaque transport owned pointer on the 224 server. */ 225 grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, 226 int initial_refs, grpc_iomgr_cb_func destroy, 227 void* destroy_arg, 228 const grpc_call_element_args* elem_args); 229 /* Set a pollset or a pollset_set for a call stack: must occur before the first 230 * op is started */ 231 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, 232 grpc_polling_entity* pollent); 233 234 #ifndef NDEBUG 235 #define GRPC_CALL_STACK_REF(call_stack, reason) \ 236 grpc_stream_ref(&(call_stack)->refcount, reason) 237 #define GRPC_CALL_STACK_UNREF(call_stack, reason) \ 238 grpc_stream_unref(&(call_stack)->refcount, reason) 239 #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ 240 grpc_stream_ref(&(channel_stack)->refcount, reason) 241 #define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \ 242 grpc_stream_unref(&(channel_stack)->refcount, reason) 243 #else 244 #define GRPC_CALL_STACK_REF(call_stack, reason) \ 245 do { \ 246 grpc_stream_ref(&(call_stack)->refcount); \ 247 (void)(reason); \ 248 } while (0); 249 #define GRPC_CALL_STACK_UNREF(call_stack, reason) \ 250 do { \ 251 grpc_stream_unref(&(call_stack)->refcount); \ 252 (void)(reason); \ 253 } while (0); 254 #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ 255 do { \ 256 grpc_stream_ref(&(channel_stack)->refcount); \ 257 (void)(reason); \ 258 } while (0); 259 #define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \ 260 do { \ 261 grpc_stream_unref(&(channel_stack)->refcount); \ 262 (void)(reason); \ 263 } while (0); 264 #endif 265 266 /* Destroy a call stack */ 267 void grpc_call_stack_destroy(grpc_call_stack* stack, 268 const grpc_call_final_info* final_info, 269 grpc_closure* then_schedule_closure); 270 271 /* Ignore set pollset{_set} - used by filters if they don't care about pollsets 272 * at all. Does nothing. */ 273 void grpc_call_stack_ignore_set_pollset_or_pollset_set( 274 grpc_call_element* elem, grpc_polling_entity* pollent); 275 /* Call the next operation in a call stack */ 276 void grpc_call_next_op(grpc_call_element* elem, 277 grpc_transport_stream_op_batch* op); 278 /* Call the next operation (depending on call directionality) in a channel 279 stack */ 280 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op); 281 /* Pass through a request to get_channel_info() to the next child element */ 282 void grpc_channel_next_get_info(grpc_channel_element* elem, 283 const grpc_channel_info* channel_info); 284 285 /* Given the top element of a channel stack, get the channel stack itself */ 286 grpc_channel_stack* grpc_channel_stack_from_top_element( 287 grpc_channel_element* elem); 288 /* Given the top element of a call stack, get the call stack itself */ 289 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem); 290 291 void grpc_call_log_op(const char* file, int line, gpr_log_severity severity, 292 grpc_call_element* elem, 293 grpc_transport_stream_op_batch* op); 294 295 extern grpc_core::TraceFlag grpc_trace_channel; 296 297 #define GRPC_CALL_LOG_OP(sev, elem, op) \ 298 do { \ 299 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { \ 300 grpc_call_log_op(sev, elem, op); \ 301 } \ 302 } while (0) 303 304 #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */ 305