• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22 
23 #include <grpc/grpc.h>
24 
25 #include <string.h>
26 
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/string_util.h>
30 
31 #include "src/core/ext/filters/client_channel/connector.h"
32 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
33 #include "src/core/ext/filters/client_channel/subchannel.h"
34 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/handshaker.h"
37 #include "src/core/lib/channel/handshaker_registry.h"
38 #include "src/core/lib/iomgr/tcp_client.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 
41 typedef struct {
42   grpc_connector base;
43 
44   gpr_mu mu;
45   gpr_refcount refs;
46 
47   bool shutdown;
48   bool connecting;
49 
50   grpc_closure* notify;
51   grpc_connect_in_args args;
52   grpc_connect_out_args* result;
53 
54   grpc_endpoint* endpoint;  // Non-NULL until handshaking starts.
55 
56   grpc_closure connected;
57 
58   grpc_handshake_manager* handshake_mgr;
59 } chttp2_connector;
60 
chttp2_connector_ref(grpc_connector * con)61 static void chttp2_connector_ref(grpc_connector* con) {
62   chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
63   gpr_ref(&c->refs);
64 }
65 
chttp2_connector_unref(grpc_connector * con)66 static void chttp2_connector_unref(grpc_connector* con) {
67   chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
68   if (gpr_unref(&c->refs)) {
69     gpr_mu_destroy(&c->mu);
70     // If handshaking is not yet in progress, destroy the endpoint.
71     // Otherwise, the handshaker will do this for us.
72     if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint);
73     gpr_free(c);
74   }
75 }
76 
chttp2_connector_shutdown(grpc_connector * con,grpc_error * why)77 static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) {
78   chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
79   gpr_mu_lock(&c->mu);
80   c->shutdown = true;
81   if (c->handshake_mgr != nullptr) {
82     grpc_handshake_manager_shutdown(c->handshake_mgr, GRPC_ERROR_REF(why));
83   }
84   // If handshaking is not yet in progress, shutdown the endpoint.
85   // Otherwise, the handshaker will do this for us.
86   if (!c->connecting && c->endpoint != nullptr) {
87     grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why));
88   }
89   gpr_mu_unlock(&c->mu);
90   GRPC_ERROR_UNREF(why);
91 }
92 
on_handshake_done(void * arg,grpc_error * error)93 static void on_handshake_done(void* arg, grpc_error* error) {
94   grpc_handshaker_args* args = static_cast<grpc_handshaker_args*>(arg);
95   chttp2_connector* c = static_cast<chttp2_connector*>(args->user_data);
96   gpr_mu_lock(&c->mu);
97   if (error != GRPC_ERROR_NONE || c->shutdown) {
98     if (error == GRPC_ERROR_NONE) {
99       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
100       // We were shut down after handshaking completed successfully, so
101       // destroy the endpoint here.
102       // TODO(ctiller): It is currently necessary to shutdown endpoints
103       // before destroying them, even if we know that there are no
104       // pending read/write callbacks.  This should be fixed, at which
105       // point this can be removed.
106       grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
107       grpc_endpoint_destroy(args->endpoint);
108       grpc_channel_args_destroy(args->args);
109       grpc_slice_buffer_destroy_internal(args->read_buffer);
110       gpr_free(args->read_buffer);
111     } else {
112       error = GRPC_ERROR_REF(error);
113     }
114     memset(c->result, 0, sizeof(*c->result));
115   } else {
116     grpc_endpoint_delete_from_pollset_set(args->endpoint,
117                                           c->args.interested_parties);
118     c->result->transport =
119         grpc_create_chttp2_transport(args->args, args->endpoint, true);
120     GPR_ASSERT(c->result->transport);
121     // TODO(roth): We ideally want to wait until we receive HTTP/2
122     // settings from the server before we consider the connection
123     // established.  If that doesn't happen before the connection
124     // timeout expires, then we should consider the connection attempt a
125     // failure and feed that information back into the backoff code.
126     // We could pass a notify_on_receive_settings callback to
127     // grpc_chttp2_transport_start_reading() to let us know when
128     // settings are received, but we would need to figure out how to use
129     // that information here.
130     //
131     // Unfortunately, we don't currently have a way to split apart the two
132     // effects of scheduling c->notify: we start sending RPCs immediately
133     // (which we want to do) and we consider the connection attempt successful
134     // (which we don't want to do until we get the notify_on_receive_settings
135     // callback from the transport).  If we could split those things
136     // apart, then we could start sending RPCs but then wait for our
137     // timeout before deciding if the connection attempt is successful.
138     // If the attempt is not successful, then we would tear down the
139     // transport and feed the failure back into the backoff code.
140     //
141     // In addition, even if we did that, we would probably not want to do
142     // so until after transparent retries is implemented.  Otherwise, any
143     // RPC that we attempt to send on the connection before the timeout
144     // would fail instead of being retried on a subsequent attempt.
145     grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
146                                         nullptr);
147     c->result->channel_args = args->args;
148   }
149   grpc_closure* notify = c->notify;
150   c->notify = nullptr;
151   GRPC_CLOSURE_SCHED(notify, error);
152   grpc_handshake_manager_destroy(c->handshake_mgr);
153   c->handshake_mgr = nullptr;
154   gpr_mu_unlock(&c->mu);
155   chttp2_connector_unref(reinterpret_cast<grpc_connector*>(c));
156 }
157 
start_handshake_locked(chttp2_connector * c)158 static void start_handshake_locked(chttp2_connector* c) {
159   c->handshake_mgr = grpc_handshake_manager_create();
160   grpc_handshakers_add(HANDSHAKER_CLIENT, c->args.channel_args,
161                        c->handshake_mgr);
162   grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
163   grpc_handshake_manager_do_handshake(
164       c->handshake_mgr, c->args.interested_parties, c->endpoint,
165       c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
166       on_handshake_done, c);
167   c->endpoint = nullptr;  // Endpoint handed off to handshake manager.
168 }
169 
connected(void * arg,grpc_error * error)170 static void connected(void* arg, grpc_error* error) {
171   chttp2_connector* c = static_cast<chttp2_connector*>(arg);
172   gpr_mu_lock(&c->mu);
173   GPR_ASSERT(c->connecting);
174   c->connecting = false;
175   if (error != GRPC_ERROR_NONE || c->shutdown) {
176     if (error == GRPC_ERROR_NONE) {
177       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
178     } else {
179       error = GRPC_ERROR_REF(error);
180     }
181     memset(c->result, 0, sizeof(*c->result));
182     grpc_closure* notify = c->notify;
183     c->notify = nullptr;
184     GRPC_CLOSURE_SCHED(notify, error);
185     if (c->endpoint != nullptr) {
186       grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
187     }
188     gpr_mu_unlock(&c->mu);
189     chttp2_connector_unref(static_cast<grpc_connector*>(arg));
190   } else {
191     GPR_ASSERT(c->endpoint != nullptr);
192     start_handshake_locked(c);
193     gpr_mu_unlock(&c->mu);
194   }
195 }
196 
chttp2_connector_connect(grpc_connector * con,const grpc_connect_in_args * args,grpc_connect_out_args * result,grpc_closure * notify)197 static void chttp2_connector_connect(grpc_connector* con,
198                                      const grpc_connect_in_args* args,
199                                      grpc_connect_out_args* result,
200                                      grpc_closure* notify) {
201   chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
202   grpc_resolved_address addr;
203   grpc_get_subchannel_address_arg(args->channel_args, &addr);
204   gpr_mu_lock(&c->mu);
205   GPR_ASSERT(c->notify == nullptr);
206   c->notify = notify;
207   c->args = *args;
208   c->result = result;
209   GPR_ASSERT(c->endpoint == nullptr);
210   chttp2_connector_ref(con);  // Ref taken for callback.
211   GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
212   GPR_ASSERT(!c->connecting);
213   c->connecting = true;
214   grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties,
215                           args->channel_args, &addr, args->deadline);
216   gpr_mu_unlock(&c->mu);
217 }
218 
219 static const grpc_connector_vtable chttp2_connector_vtable = {
220     chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
221     chttp2_connector_connect};
222 
grpc_chttp2_connector_create()223 grpc_connector* grpc_chttp2_connector_create() {
224   chttp2_connector* c = static_cast<chttp2_connector*>(gpr_zalloc(sizeof(*c)));
225   c->base.vtable = &chttp2_connector_vtable;
226   gpr_mu_init(&c->mu);
227   gpr_ref_init(&c->refs, 1);
228   return &c->base;
229 }
230