• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/transport.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29 
30 #include "src/core/lib/gpr/alloc.h"
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/iomgr/executor.h"
34 #include "src/core/lib/iomgr/iomgr.h"
35 #include "src/core/lib/slice/slice_internal.h"
36 #include "src/core/lib/slice/slice_string_helpers.h"
37 #include "src/core/lib/transport/transport_impl.h"
38 
39 grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
40                                                          "stream_refcount");
41 
grpc_stream_destroy(grpc_stream_refcount * refcount)42 void grpc_stream_destroy(grpc_stream_refcount* refcount) {
43   if (!grpc_iomgr_is_any_background_poller_thread() &&
44       (grpc_core::ExecCtx::Get()->flags() &
45        GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
46     /* Ick.
47        The thread we're running on MAY be owned (indirectly) by a call-stack.
48        If that's the case, destroying the call-stack MAY try to destroy the
49        thread, which is a tangled mess that we just don't want to ever have to
50        cope with.
51        Throw this over to the executor (on a core-owned thread) and process it
52        there. */
53     grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE);
54   } else {
55     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
56                             GRPC_ERROR_NONE);
57   }
58 }
59 
slice_stream_destroy(void * arg)60 void slice_stream_destroy(void* arg) {
61   grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
62 }
63 
64 #define STREAM_REF_FROM_SLICE_REF(p)         \
65   ((grpc_stream_refcount*)(((uint8_t*)(p)) - \
66                            offsetof(grpc_stream_refcount, slice_refcount)))
67 
grpc_slice_from_stream_owned_buffer(grpc_stream_refcount * refcount,void * buffer,size_t length)68 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
69                                                void* buffer, size_t length) {
70 #ifndef NDEBUG
71   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(&refcount->slice_refcount),
72                   "slice");
73 #else
74   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(&refcount->slice_refcount));
75 #endif
76   grpc_slice res;
77   res.refcount = &refcount->slice_refcount;
78   res.data.refcounted.bytes = static_cast<uint8_t*>(buffer);
79   res.data.refcounted.length = length;
80   return res;
81 }
82 
83 #ifndef NDEBUG
grpc_stream_ref_init(grpc_stream_refcount * refcount,int,grpc_iomgr_cb_func cb,void * cb_arg,const char * object_type)84 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
85                           grpc_iomgr_cb_func cb, void* cb_arg,
86                           const char* object_type) {
87   refcount->object_type = object_type;
88 #else
89 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
90                           grpc_iomgr_cb_func cb, void* cb_arg) {
91 #endif
92   GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
93 
94   new (&refcount->refs) grpc_core::RefCount(
95       1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_stream_refcount) ? "stream_refcount"
96                                                              : nullptr);
97   new (&refcount->slice_refcount) grpc_slice_refcount(
98       grpc_slice_refcount::Type::REGULAR, &refcount->refs, slice_stream_destroy,
99       refcount, &refcount->slice_refcount);
100 }
101 
102 static void move64(uint64_t* from, uint64_t* to) {
103   *to += *from;
104   *from = 0;
105 }
106 
107 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
108                                        grpc_transport_one_way_stats* to) {
109   move64(&from->framing_bytes, &to->framing_bytes);
110   move64(&from->data_bytes, &to->data_bytes);
111   move64(&from->header_bytes, &to->header_bytes);
112 }
113 
114 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
115                                grpc_transport_stream_stats* to) {
116   grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
117   grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
118 }
119 
120 size_t grpc_transport_stream_size(grpc_transport* transport) {
121   return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream);
122 }
123 
124 void grpc_transport_destroy(grpc_transport* transport) {
125   transport->vtable->destroy(transport);
126 }
127 
128 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
129                                grpc_stream_refcount* refcount,
130                                const void* server_data,
131                                grpc_core::Arena* arena) {
132   return transport->vtable->init_stream(transport, stream, refcount,
133                                         server_data, arena);
134 }
135 
136 void grpc_transport_perform_stream_op(grpc_transport* transport,
137                                       grpc_stream* stream,
138                                       grpc_transport_stream_op_batch* op) {
139   transport->vtable->perform_stream_op(transport, stream, op);
140 }
141 
142 void grpc_transport_perform_op(grpc_transport* transport,
143                                grpc_transport_op* op) {
144   transport->vtable->perform_op(transport, op);
145 }
146 
147 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
148                              grpc_polling_entity* pollent) {
149   grpc_pollset* pollset;
150   grpc_pollset_set* pollset_set;
151   if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
152     transport->vtable->set_pollset(transport, stream, pollset);
153   } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
154              nullptr) {
155     transport->vtable->set_pollset_set(transport, stream, pollset_set);
156   } else {
157     // No-op for empty pollset. Empty pollset is possible when using
158     // non-fd-based event engines such as CFStream.
159   }
160 }
161 
162 void grpc_transport_destroy_stream(grpc_transport* transport,
163                                    grpc_stream* stream,
164                                    grpc_closure* then_schedule_closure) {
165   transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
166 }
167 
168 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
169   return transport->vtable->get_endpoint(transport);
170 }
171 
172 // This comment should be sung to the tune of
173 // "Supercalifragilisticexpialidocious":
174 //
175 // grpc_transport_stream_op_batch_finish_with_failure
176 // is a function that must always unref cancel_error
177 // though it lives in lib, it handles transport stream ops sure
178 // it's grpc_transport_stream_op_batch_finish_with_failure
179 void grpc_transport_stream_op_batch_finish_with_failure(
180     grpc_transport_stream_op_batch* batch, grpc_error* error,
181     grpc_core::CallCombiner* call_combiner) {
182   if (batch->send_message) {
183     batch->payload->send_message.send_message.reset();
184   }
185   if (batch->cancel_stream) {
186     GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
187   }
188   // Construct a list of closures to execute.
189   grpc_core::CallCombinerClosureList closures;
190   if (batch->recv_initial_metadata) {
191     closures.Add(
192         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
193         GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
194   }
195   if (batch->recv_message) {
196     closures.Add(batch->payload->recv_message.recv_message_ready,
197                  GRPC_ERROR_REF(error), "failing recv_message_ready");
198   }
199   if (batch->recv_trailing_metadata) {
200     closures.Add(
201         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
202         GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
203   }
204   if (batch->on_complete != nullptr) {
205     closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
206                  "failing on_complete");
207   }
208   // Execute closures.
209   closures.RunClosures(call_combiner);
210   GRPC_ERROR_UNREF(error);
211 }
212 
213 struct made_transport_op {
214   grpc_closure outer_on_complete;
215   grpc_closure* inner_on_complete = nullptr;
216   grpc_transport_op op;
217   made_transport_op() {
218     memset(&outer_on_complete, 0, sizeof(outer_on_complete));
219   }
220 };
221 
222 static void destroy_made_transport_op(void* arg, grpc_error* error) {
223   made_transport_op* op = static_cast<made_transport_op*>(arg);
224   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete,
225                           GRPC_ERROR_REF(error));
226   delete op;
227 }
228 
229 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
230   made_transport_op* op = new made_transport_op();
231   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
232                     grpc_schedule_on_exec_ctx);
233   op->inner_on_complete = on_complete;
234   op->op.on_consumed = &op->outer_on_complete;
235   return &op->op;
236 }
237 
238 struct made_transport_stream_op {
239   grpc_closure outer_on_complete;
240   grpc_closure* inner_on_complete;
241   grpc_transport_stream_op_batch op;
242   grpc_transport_stream_op_batch_payload payload;
243 };
244 static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
245   made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
246   grpc_closure* c = op->inner_on_complete;
247   gpr_free(op);
248   grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error));
249 }
250 
251 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
252     grpc_closure* on_complete) {
253   made_transport_stream_op* op =
254       static_cast<made_transport_stream_op*>(gpr_zalloc(sizeof(*op)));
255   op->op.payload = &op->payload;
256   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
257                     op, grpc_schedule_on_exec_ctx);
258   op->inner_on_complete = on_complete;
259   op->op.on_complete = &op->outer_on_complete;
260   return &op->op;
261 }
262