1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/transport/transport.h"
20
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/grpc.h>
23 #include <grpc/support/port_platform.h>
24 #include <string.h>
25
26 #include <memory>
27 #include <new>
28
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "src/core/lib/event_engine/default_event_engine.h"
33 #include "src/core/lib/iomgr/exec_ctx.h"
34 #include "src/core/lib/promise/for_each.h"
35 #include "src/core/lib/promise/promise.h"
36 #include "src/core/lib/promise/try_seq.h"
37 #include "src/core/lib/slice/slice.h"
38 #include "src/core/lib/transport/error_utils.h"
39 #include "src/core/util/time.h"
40
grpc_stream_destroy(grpc_stream_refcount * refcount)41 void grpc_stream_destroy(grpc_stream_refcount* refcount) {
42 if ((grpc_core::ExecCtx::Get()->flags() &
43 GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
44 // Ick.
45 // The thread we're running on MAY be owned (indirectly) by a call-stack.
46 // If that's the case, destroying the call-stack MAY try to destroy the
47 // thread, which is a tangled mess that we just don't want to ever have to
48 // cope with.
49 // Throw this over to the executor (on a core-owned thread) and process it
50 // there.
51 grpc_event_engine::experimental::GetDefaultEventEngine()->Run([refcount] {
52 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
53 grpc_core::ExecCtx exec_ctx;
54 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
55 absl::OkStatus());
56 });
57 } else {
58 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
59 absl::OkStatus());
60 }
61 }
62
slice_stream_destroy(void * arg)63 void slice_stream_destroy(void* arg) {
64 grpc_stream_destroy(static_cast<grpc_stream_refcount*>(arg));
65 }
66
67 #ifndef NDEBUG
grpc_stream_ref_init(grpc_stream_refcount * refcount,int,grpc_iomgr_cb_func cb,void * cb_arg,const char * object_type)68 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
69 grpc_iomgr_cb_func cb, void* cb_arg,
70 const char* object_type) {
71 refcount->object_type = object_type;
72 #else
73 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int /*initial_refs*/,
74 grpc_iomgr_cb_func cb, void* cb_arg) {
75 #endif
76 GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
77
78 new (&refcount->refs) grpc_core::RefCount(
79 1,
80 GRPC_TRACE_FLAG_ENABLED(stream_refcount) ? "stream_refcount" : nullptr);
81 }
82
83 namespace grpc_core {
84 void Transport::SetPollingEntity(grpc_stream* stream,
85 grpc_polling_entity* pollset_or_pollset_set) {
86 if (auto* pollset = grpc_polling_entity_pollset(pollset_or_pollset_set)) {
87 SetPollset(stream, pollset);
88 } else if (auto* pollset_set =
89 grpc_polling_entity_pollset_set(pollset_or_pollset_set)) {
90 SetPollsetSet(stream, pollset_set);
91 } else {
92 // No-op for empty pollset. Empty pollset is possible when using
93 // non-fd-based event engines such as CFStream.
94 }
95 }
96 } // namespace grpc_core
97
98 // This comment should be sung to the tune of
99 // "Supercalifragilisticexpialidocious":
100 //
101 // grpc_transport_stream_op_batch_finish_with_failure
102 // is a function that must always unref cancel_error
103 // though it lives in lib, it handles transport stream ops sure
104 // it's grpc_transport_stream_op_batch_finish_with_failure
105 void grpc_transport_stream_op_batch_finish_with_failure(
106 grpc_transport_stream_op_batch* batch, grpc_error_handle error,
107 grpc_core::CallCombiner* call_combiner) {
108 grpc_core::CallCombinerClosureList closures;
109 grpc_transport_stream_op_batch_queue_finish_with_failure(batch, error,
110 &closures);
111 // Execute closures.
112 closures.RunClosures(call_combiner);
113 }
114
115 void grpc_transport_stream_op_batch_queue_finish_with_failure(
116 grpc_transport_stream_op_batch* batch, grpc_error_handle error,
117 grpc_core::CallCombinerClosureList* closures) {
118 // Construct a list of closures to execute.
119 if (batch->recv_initial_metadata) {
120 closures->Add(
121 batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
122 error, "failing recv_initial_metadata_ready");
123 }
124 if (batch->recv_message) {
125 closures->Add(batch->payload->recv_message.recv_message_ready, error,
126 "failing recv_message_ready");
127 }
128 if (batch->recv_trailing_metadata) {
129 closures->Add(
130 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
131 error, "failing recv_trailing_metadata_ready");
132 }
133 if (batch->on_complete != nullptr) {
134 closures->Add(batch->on_complete, error, "failing on_complete");
135 }
136 }
137
138 void grpc_transport_stream_op_batch_finish_with_failure_from_transport(
139 grpc_transport_stream_op_batch* batch, grpc_error_handle error) {
140 // Construct a list of closures to execute.
141 if (batch->recv_initial_metadata) {
142 grpc_core::ExecCtx::Run(
143 DEBUG_LOCATION,
144 batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
145 error);
146 }
147 if (batch->recv_message) {
148 grpc_core::ExecCtx::Run(
149 DEBUG_LOCATION, batch->payload->recv_message.recv_message_ready, error);
150 }
151 if (batch->recv_trailing_metadata) {
152 grpc_core::ExecCtx::Run(
153 DEBUG_LOCATION,
154 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
155 error);
156 }
157 if (batch->on_complete != nullptr) {
158 grpc_core::ExecCtx::Run(DEBUG_LOCATION, batch->on_complete, error);
159 }
160 }
161
162 struct made_transport_op {
163 grpc_closure outer_on_complete;
164 grpc_closure* inner_on_complete = nullptr;
165 grpc_transport_op op;
166 made_transport_op() {
167 memset(&outer_on_complete, 0, sizeof(outer_on_complete));
168 }
169 };
170
171 static void destroy_made_transport_op(void* arg, grpc_error_handle error) {
172 made_transport_op* op = static_cast<made_transport_op*>(arg);
173 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete, error);
174 delete op;
175 }
176
177 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
178 made_transport_op* op = new made_transport_op();
179 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
180 grpc_schedule_on_exec_ctx);
181 op->inner_on_complete = on_complete;
182 op->op.on_consumed = &op->outer_on_complete;
183 return &op->op;
184 }
185
186 struct made_transport_stream_op {
187 grpc_closure outer_on_complete;
188 grpc_closure* inner_on_complete = nullptr;
189 grpc_transport_stream_op_batch op;
190 grpc_transport_stream_op_batch_payload payload;
191 };
192 static void destroy_made_transport_stream_op(void* arg,
193 grpc_error_handle error) {
194 made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
195 grpc_closure* c = op->inner_on_complete;
196 delete op;
197 if (c != nullptr) {
198 grpc_core::Closure::Run(DEBUG_LOCATION, c, error);
199 }
200 }
201
202 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
203 grpc_closure* on_complete) {
204 made_transport_stream_op* op = new made_transport_stream_op();
205 op->op.payload = &op->payload;
206 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
207 op, grpc_schedule_on_exec_ctx);
208 op->inner_on_complete = on_complete;
209 op->op.on_complete = &op->outer_on_complete;
210 return &op->op;
211 }
212