• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22 
23 #include <grpc/grpc.h>
24 
25 #include <string.h>
26 
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/string_util.h>
30 
31 #include "src/core/ext/filters/client_channel/connector.h"
32 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
33 #include "src/core/ext/filters/client_channel/subchannel.h"
34 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/handshaker.h"
37 #include "src/core/lib/channel/handshaker_registry.h"
38 #include "src/core/lib/iomgr/tcp_client.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 
41 namespace grpc_core {
42 
Chttp2Connector()43 Chttp2Connector::Chttp2Connector() {
44   GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
45 }
46 
~Chttp2Connector()47 Chttp2Connector::~Chttp2Connector() {
48   if (endpoint_ != nullptr) grpc_endpoint_destroy(endpoint_);
49 }
50 
Connect(const Args & args,Result * result,grpc_closure * notify)51 void Chttp2Connector::Connect(const Args& args, Result* result,
52                               grpc_closure* notify) {
53   grpc_resolved_address addr;
54   Subchannel::GetAddressFromSubchannelAddressArg(args.channel_args, &addr);
55   grpc_endpoint** ep;
56   {
57     MutexLock lock(&mu_);
58     GPR_ASSERT(notify_ == nullptr);
59     args_ = args;
60     result_ = result;
61     notify_ = notify;
62     GPR_ASSERT(!connecting_);
63     connecting_ = true;
64     GPR_ASSERT(endpoint_ == nullptr);
65     ep = &endpoint_;
66   }
67   // In some implementations, the closure can be flushed before
68   // grpc_tcp_client_connect() returns, and since the closure requires access
69   // to mu_, this can result in a deadlock (see
70   // https://github.com/grpc/grpc/issues/16427 for details).
71   // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
72   // make sure that we still exist at that point by taking a ref.
73   Ref().release();  // Ref held by callback.
74   grpc_tcp_client_connect(&connected_, ep, args.interested_parties,
75                           args.channel_args, &addr, args.deadline);
76 }
77 
Shutdown(grpc_error_handle error)78 void Chttp2Connector::Shutdown(grpc_error_handle error) {
79   MutexLock lock(&mu_);
80   shutdown_ = true;
81   if (handshake_mgr_ != nullptr) {
82     handshake_mgr_->Shutdown(GRPC_ERROR_REF(error));
83   }
84   // If handshaking is not yet in progress, shutdown the endpoint.
85   // Otherwise, the handshaker will do this for us.
86   if (!connecting_ && endpoint_ != nullptr) {
87     grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_REF(error));
88   }
89   GRPC_ERROR_UNREF(error);
90 }
91 
Connected(void * arg,grpc_error_handle error)92 void Chttp2Connector::Connected(void* arg, grpc_error_handle error) {
93   Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
94   bool unref = false;
95   {
96     MutexLock lock(&self->mu_);
97     GPR_ASSERT(self->connecting_);
98     self->connecting_ = false;
99     if (error != GRPC_ERROR_NONE || self->shutdown_) {
100       if (error == GRPC_ERROR_NONE) {
101         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
102       } else {
103         error = GRPC_ERROR_REF(error);
104       }
105       if (self->endpoint_ != nullptr) {
106         grpc_endpoint_shutdown(self->endpoint_, GRPC_ERROR_REF(error));
107       }
108       self->result_->Reset();
109       grpc_closure* notify = self->notify_;
110       self->notify_ = nullptr;
111       ExecCtx::Run(DEBUG_LOCATION, notify, error);
112       unref = true;
113     } else {
114       GPR_ASSERT(self->endpoint_ != nullptr);
115       self->StartHandshakeLocked();
116     }
117   }
118   if (unref) self->Unref();
119 }
120 
StartHandshakeLocked()121 void Chttp2Connector::StartHandshakeLocked() {
122   handshake_mgr_ = MakeRefCounted<HandshakeManager>();
123   HandshakerRegistry::AddHandshakers(HANDSHAKER_CLIENT, args_.channel_args,
124                                      args_.interested_parties,
125                                      handshake_mgr_.get());
126   grpc_endpoint_add_to_pollset_set(endpoint_, args_.interested_parties);
127   handshake_mgr_->DoHandshake(endpoint_, args_.channel_args, args_.deadline,
128                               nullptr /* acceptor */, OnHandshakeDone, this);
129   endpoint_ = nullptr;  // Endpoint handed off to handshake manager.
130 }
131 
132 namespace {
NullThenSchedClosure(const DebugLocation & location,grpc_closure ** closure,grpc_error_handle error)133 void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
134                           grpc_error_handle error) {
135   grpc_closure* c = *closure;
136   *closure = nullptr;
137   ExecCtx::Run(location, c, error);
138 }
139 }  // namespace
140 
OnHandshakeDone(void * arg,grpc_error_handle error)141 void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
142   auto* args = static_cast<HandshakerArgs*>(arg);
143   Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
144   {
145     MutexLock lock(&self->mu_);
146     if (error != GRPC_ERROR_NONE || self->shutdown_) {
147       if (error == GRPC_ERROR_NONE) {
148         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
149         // We were shut down after handshaking completed successfully, so
150         // destroy the endpoint here.
151         if (args->endpoint != nullptr) {
152           // TODO(ctiller): It is currently necessary to shutdown endpoints
153           // before destroying them, even if we know that there are no
154           // pending read/write callbacks.  This should be fixed, at which
155           // point this can be removed.
156           grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
157           grpc_endpoint_destroy(args->endpoint);
158           grpc_channel_args_destroy(args->args);
159           grpc_slice_buffer_destroy_internal(args->read_buffer);
160           gpr_free(args->read_buffer);
161         }
162       } else {
163         error = GRPC_ERROR_REF(error);
164       }
165       self->result_->Reset();
166       NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
167     } else if (args->endpoint != nullptr) {
168       self->result_->transport =
169           grpc_create_chttp2_transport(args->args, args->endpoint, true);
170       self->result_->socket_node =
171           grpc_chttp2_transport_get_socket_node(self->result_->transport);
172       self->result_->channel_args = args->args;
173       GPR_ASSERT(self->result_->transport != nullptr);
174       self->endpoint_ = args->endpoint;
175       self->Ref().release();  // Ref held by OnReceiveSettings()
176       GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
177                         grpc_schedule_on_exec_ctx);
178       self->Ref().release();  // Ref held by OnTimeout()
179       grpc_chttp2_transport_start_reading(self->result_->transport,
180                                           args->read_buffer,
181                                           &self->on_receive_settings_, nullptr);
182       GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
183                         grpc_schedule_on_exec_ctx);
184       grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);
185     } else {
186       // If the handshaking succeeded but there is no endpoint, then the
187       // handshaker may have handed off the connection to some external
188       // code. Just verify that exit_early flag is set.
189       GPR_DEBUG_ASSERT(args->exit_early);
190       NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
191     }
192     self->handshake_mgr_.reset();
193   }
194   self->Unref();
195 }
196 
OnReceiveSettings(void * arg,grpc_error_handle error)197 void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
198   Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
199   {
200     MutexLock lock(&self->mu_);
201     if (!self->notify_error_.has_value()) {
202       grpc_endpoint_delete_from_pollset_set(self->endpoint_,
203                                             self->args_.interested_parties);
204       if (error != GRPC_ERROR_NONE) {
205         // Transport got an error while waiting on SETTINGS frame.
206         // TODO(yashykt): The following two lines should be moved to
207         // SubchannelConnector::Result::Reset()
208         grpc_transport_destroy(self->result_->transport);
209         grpc_channel_args_destroy(self->result_->channel_args);
210         self->result_->Reset();
211       }
212       self->MaybeNotify(GRPC_ERROR_REF(error));
213       grpc_timer_cancel(&self->timer_);
214     } else {
215       // OnTimeout() was already invoked. Call Notify() again so that notify_
216       // can be invoked.
217       self->MaybeNotify(GRPC_ERROR_NONE);
218     }
219   }
220   self->Unref();
221 }
222 
OnTimeout(void * arg,grpc_error_handle)223 void Chttp2Connector::OnTimeout(void* arg, grpc_error_handle /*error*/) {
224   Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
225   {
226     MutexLock lock(&self->mu_);
227     if (!self->notify_error_.has_value()) {
228       // The transport did not receive the settings frame in time. Destroy the
229       // transport.
230       grpc_endpoint_delete_from_pollset_set(self->endpoint_,
231                                             self->args_.interested_parties);
232       // TODO(yashykt): The following two lines should be moved to
233       // SubchannelConnector::Result::Reset()
234       grpc_transport_destroy(self->result_->transport);
235       grpc_channel_args_destroy(self->result_->channel_args);
236       self->result_->Reset();
237       self->MaybeNotify(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
238           "connection attempt timed out before receiving SETTINGS frame"));
239     } else {
240       // OnReceiveSettings() was already invoked. Call Notify() again so that
241       // notify_ can be invoked.
242       self->MaybeNotify(GRPC_ERROR_NONE);
243     }
244   }
245   self->Unref();
246 }
247 
MaybeNotify(grpc_error_handle error)248 void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
249   if (notify_error_.has_value()) {
250     GRPC_ERROR_UNREF(error);
251     NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
252     // Clear state for a new Connect().
253     // Clear out the endpoint_, since it is the responsibility of
254     // the transport to shut it down.
255     endpoint_ = nullptr;
256     notify_error_.reset();
257   } else {
258     notify_error_ = error;
259   }
260 }
261 
262 }  // namespace grpc_core
263