1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22
23 #include <grpc/grpc.h>
24
25 #include <string.h>
26
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/string_util.h>
30
31 #include "src/core/ext/filters/client_channel/connector.h"
32 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
33 #include "src/core/ext/filters/client_channel/subchannel.h"
34 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/handshaker.h"
37 #include "src/core/lib/channel/handshaker_registry.h"
38 #include "src/core/lib/iomgr/tcp_client.h"
39 #include "src/core/lib/slice/slice_internal.h"
40
41 namespace grpc_core {
42
Chttp2Connector()43 Chttp2Connector::Chttp2Connector() {
44 GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
45 }
46
~Chttp2Connector()47 Chttp2Connector::~Chttp2Connector() {
48 if (endpoint_ != nullptr) grpc_endpoint_destroy(endpoint_);
49 }
50
Connect(const Args & args,Result * result,grpc_closure * notify)51 void Chttp2Connector::Connect(const Args& args, Result* result,
52 grpc_closure* notify) {
53 grpc_resolved_address addr;
54 Subchannel::GetAddressFromSubchannelAddressArg(args.channel_args, &addr);
55 grpc_endpoint** ep;
56 {
57 MutexLock lock(&mu_);
58 GPR_ASSERT(notify_ == nullptr);
59 args_ = args;
60 result_ = result;
61 notify_ = notify;
62 GPR_ASSERT(!connecting_);
63 connecting_ = true;
64 GPR_ASSERT(endpoint_ == nullptr);
65 ep = &endpoint_;
66 }
67 // In some implementations, the closure can be flushed before
68 // grpc_tcp_client_connect() returns, and since the closure requires access
69 // to mu_, this can result in a deadlock (see
70 // https://github.com/grpc/grpc/issues/16427 for details).
71 // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
72 // make sure that we still exist at that point by taking a ref.
73 Ref().release(); // Ref held by callback.
74 grpc_tcp_client_connect(&connected_, ep, args.interested_parties,
75 args.channel_args, &addr, args.deadline);
76 }
77
Shutdown(grpc_error_handle error)78 void Chttp2Connector::Shutdown(grpc_error_handle error) {
79 MutexLock lock(&mu_);
80 shutdown_ = true;
81 if (handshake_mgr_ != nullptr) {
82 handshake_mgr_->Shutdown(GRPC_ERROR_REF(error));
83 }
84 // If handshaking is not yet in progress, shutdown the endpoint.
85 // Otherwise, the handshaker will do this for us.
86 if (!connecting_ && endpoint_ != nullptr) {
87 grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_REF(error));
88 }
89 GRPC_ERROR_UNREF(error);
90 }
91
Connected(void * arg,grpc_error_handle error)92 void Chttp2Connector::Connected(void* arg, grpc_error_handle error) {
93 Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
94 bool unref = false;
95 {
96 MutexLock lock(&self->mu_);
97 GPR_ASSERT(self->connecting_);
98 self->connecting_ = false;
99 if (error != GRPC_ERROR_NONE || self->shutdown_) {
100 if (error == GRPC_ERROR_NONE) {
101 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
102 } else {
103 error = GRPC_ERROR_REF(error);
104 }
105 if (self->endpoint_ != nullptr) {
106 grpc_endpoint_shutdown(self->endpoint_, GRPC_ERROR_REF(error));
107 }
108 self->result_->Reset();
109 grpc_closure* notify = self->notify_;
110 self->notify_ = nullptr;
111 ExecCtx::Run(DEBUG_LOCATION, notify, error);
112 unref = true;
113 } else {
114 GPR_ASSERT(self->endpoint_ != nullptr);
115 self->StartHandshakeLocked();
116 }
117 }
118 if (unref) self->Unref();
119 }
120
StartHandshakeLocked()121 void Chttp2Connector::StartHandshakeLocked() {
122 handshake_mgr_ = MakeRefCounted<HandshakeManager>();
123 HandshakerRegistry::AddHandshakers(HANDSHAKER_CLIENT, args_.channel_args,
124 args_.interested_parties,
125 handshake_mgr_.get());
126 grpc_endpoint_add_to_pollset_set(endpoint_, args_.interested_parties);
127 handshake_mgr_->DoHandshake(endpoint_, args_.channel_args, args_.deadline,
128 nullptr /* acceptor */, OnHandshakeDone, this);
129 endpoint_ = nullptr; // Endpoint handed off to handshake manager.
130 }
131
132 namespace {
NullThenSchedClosure(const DebugLocation & location,grpc_closure ** closure,grpc_error_handle error)133 void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
134 grpc_error_handle error) {
135 grpc_closure* c = *closure;
136 *closure = nullptr;
137 ExecCtx::Run(location, c, error);
138 }
139 } // namespace
140
OnHandshakeDone(void * arg,grpc_error_handle error)141 void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
142 auto* args = static_cast<HandshakerArgs*>(arg);
143 Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
144 {
145 MutexLock lock(&self->mu_);
146 if (error != GRPC_ERROR_NONE || self->shutdown_) {
147 if (error == GRPC_ERROR_NONE) {
148 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
149 // We were shut down after handshaking completed successfully, so
150 // destroy the endpoint here.
151 if (args->endpoint != nullptr) {
152 // TODO(ctiller): It is currently necessary to shutdown endpoints
153 // before destroying them, even if we know that there are no
154 // pending read/write callbacks. This should be fixed, at which
155 // point this can be removed.
156 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
157 grpc_endpoint_destroy(args->endpoint);
158 grpc_channel_args_destroy(args->args);
159 grpc_slice_buffer_destroy_internal(args->read_buffer);
160 gpr_free(args->read_buffer);
161 }
162 } else {
163 error = GRPC_ERROR_REF(error);
164 }
165 self->result_->Reset();
166 NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
167 } else if (args->endpoint != nullptr) {
168 self->result_->transport =
169 grpc_create_chttp2_transport(args->args, args->endpoint, true);
170 self->result_->socket_node =
171 grpc_chttp2_transport_get_socket_node(self->result_->transport);
172 self->result_->channel_args = args->args;
173 GPR_ASSERT(self->result_->transport != nullptr);
174 self->endpoint_ = args->endpoint;
175 self->Ref().release(); // Ref held by OnReceiveSettings()
176 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
177 grpc_schedule_on_exec_ctx);
178 self->Ref().release(); // Ref held by OnTimeout()
179 grpc_chttp2_transport_start_reading(self->result_->transport,
180 args->read_buffer,
181 &self->on_receive_settings_, nullptr);
182 GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
183 grpc_schedule_on_exec_ctx);
184 grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);
185 } else {
186 // If the handshaking succeeded but there is no endpoint, then the
187 // handshaker may have handed off the connection to some external
188 // code. Just verify that exit_early flag is set.
189 GPR_DEBUG_ASSERT(args->exit_early);
190 NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
191 }
192 self->handshake_mgr_.reset();
193 }
194 self->Unref();
195 }
196
OnReceiveSettings(void * arg,grpc_error_handle error)197 void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
198 Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
199 {
200 MutexLock lock(&self->mu_);
201 if (!self->notify_error_.has_value()) {
202 grpc_endpoint_delete_from_pollset_set(self->endpoint_,
203 self->args_.interested_parties);
204 if (error != GRPC_ERROR_NONE) {
205 // Transport got an error while waiting on SETTINGS frame.
206 // TODO(yashykt): The following two lines should be moved to
207 // SubchannelConnector::Result::Reset()
208 grpc_transport_destroy(self->result_->transport);
209 grpc_channel_args_destroy(self->result_->channel_args);
210 self->result_->Reset();
211 }
212 self->MaybeNotify(GRPC_ERROR_REF(error));
213 grpc_timer_cancel(&self->timer_);
214 } else {
215 // OnTimeout() was already invoked. Call Notify() again so that notify_
216 // can be invoked.
217 self->MaybeNotify(GRPC_ERROR_NONE);
218 }
219 }
220 self->Unref();
221 }
222
OnTimeout(void * arg,grpc_error_handle)223 void Chttp2Connector::OnTimeout(void* arg, grpc_error_handle /*error*/) {
224 Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
225 {
226 MutexLock lock(&self->mu_);
227 if (!self->notify_error_.has_value()) {
228 // The transport did not receive the settings frame in time. Destroy the
229 // transport.
230 grpc_endpoint_delete_from_pollset_set(self->endpoint_,
231 self->args_.interested_parties);
232 // TODO(yashykt): The following two lines should be moved to
233 // SubchannelConnector::Result::Reset()
234 grpc_transport_destroy(self->result_->transport);
235 grpc_channel_args_destroy(self->result_->channel_args);
236 self->result_->Reset();
237 self->MaybeNotify(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
238 "connection attempt timed out before receiving SETTINGS frame"));
239 } else {
240 // OnReceiveSettings() was already invoked. Call Notify() again so that
241 // notify_ can be invoked.
242 self->MaybeNotify(GRPC_ERROR_NONE);
243 }
244 }
245 self->Unref();
246 }
247
MaybeNotify(grpc_error_handle error)248 void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
249 if (notify_error_.has_value()) {
250 GRPC_ERROR_UNREF(error);
251 NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, notify_error_.value());
252 // Clear state for a new Connect().
253 // Clear out the endpoint_, since it is the responsibility of
254 // the transport to shut it down.
255 endpoint_ = nullptr;
256 notify_error_.reset();
257 } else {
258 notify_error_ = error;
259 }
260 }
261
262 } // namespace grpc_core
263