1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22
23 #include <grpc/grpc.h>
24
25 #include <string.h>
26
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/string_util.h>
30
31 #include "src/core/ext/filters/client_channel/connector.h"
32 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
33 #include "src/core/ext/filters/client_channel/subchannel.h"
34 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/handshaker.h"
37 #include "src/core/lib/channel/handshaker_registry.h"
38 #include "src/core/lib/iomgr/tcp_client.h"
39 #include "src/core/lib/slice/slice_internal.h"
40
41 typedef struct {
42 grpc_connector base;
43
44 gpr_mu mu;
45 gpr_refcount refs;
46
47 bool shutdown;
48 bool connecting;
49
50 grpc_closure* notify;
51 grpc_connect_in_args args;
52 grpc_connect_out_args* result;
53
54 grpc_endpoint* endpoint; // Non-NULL until handshaking starts.
55
56 grpc_closure connected;
57
58 grpc_handshake_manager* handshake_mgr;
59 } chttp2_connector;
60
chttp2_connector_ref(grpc_connector * con)61 static void chttp2_connector_ref(grpc_connector* con) {
62 chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
63 gpr_ref(&c->refs);
64 }
65
chttp2_connector_unref(grpc_connector * con)66 static void chttp2_connector_unref(grpc_connector* con) {
67 chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
68 if (gpr_unref(&c->refs)) {
69 gpr_mu_destroy(&c->mu);
70 // If handshaking is not yet in progress, destroy the endpoint.
71 // Otherwise, the handshaker will do this for us.
72 if (c->endpoint != nullptr) grpc_endpoint_destroy(c->endpoint);
73 gpr_free(c);
74 }
75 }
76
chttp2_connector_shutdown(grpc_connector * con,grpc_error * why)77 static void chttp2_connector_shutdown(grpc_connector* con, grpc_error* why) {
78 chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
79 gpr_mu_lock(&c->mu);
80 c->shutdown = true;
81 if (c->handshake_mgr != nullptr) {
82 grpc_handshake_manager_shutdown(c->handshake_mgr, GRPC_ERROR_REF(why));
83 }
84 // If handshaking is not yet in progress, shutdown the endpoint.
85 // Otherwise, the handshaker will do this for us.
86 if (!c->connecting && c->endpoint != nullptr) {
87 grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(why));
88 }
89 gpr_mu_unlock(&c->mu);
90 GRPC_ERROR_UNREF(why);
91 }
92
on_handshake_done(void * arg,grpc_error * error)93 static void on_handshake_done(void* arg, grpc_error* error) {
94 grpc_handshaker_args* args = static_cast<grpc_handshaker_args*>(arg);
95 chttp2_connector* c = static_cast<chttp2_connector*>(args->user_data);
96 gpr_mu_lock(&c->mu);
97 if (error != GRPC_ERROR_NONE || c->shutdown) {
98 if (error == GRPC_ERROR_NONE) {
99 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
100 // We were shut down after handshaking completed successfully, so
101 // destroy the endpoint here.
102 // TODO(ctiller): It is currently necessary to shutdown endpoints
103 // before destroying them, even if we know that there are no
104 // pending read/write callbacks. This should be fixed, at which
105 // point this can be removed.
106 grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_REF(error));
107 grpc_endpoint_destroy(args->endpoint);
108 grpc_channel_args_destroy(args->args);
109 grpc_slice_buffer_destroy_internal(args->read_buffer);
110 gpr_free(args->read_buffer);
111 } else {
112 error = GRPC_ERROR_REF(error);
113 }
114 memset(c->result, 0, sizeof(*c->result));
115 } else {
116 grpc_endpoint_delete_from_pollset_set(args->endpoint,
117 c->args.interested_parties);
118 c->result->transport =
119 grpc_create_chttp2_transport(args->args, args->endpoint, true);
120 GPR_ASSERT(c->result->transport);
121 // TODO(roth): We ideally want to wait until we receive HTTP/2
122 // settings from the server before we consider the connection
123 // established. If that doesn't happen before the connection
124 // timeout expires, then we should consider the connection attempt a
125 // failure and feed that information back into the backoff code.
126 // We could pass a notify_on_receive_settings callback to
127 // grpc_chttp2_transport_start_reading() to let us know when
128 // settings are received, but we would need to figure out how to use
129 // that information here.
130 //
131 // Unfortunately, we don't currently have a way to split apart the two
132 // effects of scheduling c->notify: we start sending RPCs immediately
133 // (which we want to do) and we consider the connection attempt successful
134 // (which we don't want to do until we get the notify_on_receive_settings
135 // callback from the transport). If we could split those things
136 // apart, then we could start sending RPCs but then wait for our
137 // timeout before deciding if the connection attempt is successful.
138 // If the attempt is not successful, then we would tear down the
139 // transport and feed the failure back into the backoff code.
140 //
141 // In addition, even if we did that, we would probably not want to do
142 // so until after transparent retries is implemented. Otherwise, any
143 // RPC that we attempt to send on the connection before the timeout
144 // would fail instead of being retried on a subsequent attempt.
145 grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
146 nullptr);
147 c->result->channel_args = args->args;
148 }
149 grpc_closure* notify = c->notify;
150 c->notify = nullptr;
151 GRPC_CLOSURE_SCHED(notify, error);
152 grpc_handshake_manager_destroy(c->handshake_mgr);
153 c->handshake_mgr = nullptr;
154 gpr_mu_unlock(&c->mu);
155 chttp2_connector_unref(reinterpret_cast<grpc_connector*>(c));
156 }
157
start_handshake_locked(chttp2_connector * c)158 static void start_handshake_locked(chttp2_connector* c) {
159 c->handshake_mgr = grpc_handshake_manager_create();
160 grpc_handshakers_add(HANDSHAKER_CLIENT, c->args.channel_args,
161 c->handshake_mgr);
162 grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
163 grpc_handshake_manager_do_handshake(
164 c->handshake_mgr, c->args.interested_parties, c->endpoint,
165 c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
166 on_handshake_done, c);
167 c->endpoint = nullptr; // Endpoint handed off to handshake manager.
168 }
169
connected(void * arg,grpc_error * error)170 static void connected(void* arg, grpc_error* error) {
171 chttp2_connector* c = static_cast<chttp2_connector*>(arg);
172 gpr_mu_lock(&c->mu);
173 GPR_ASSERT(c->connecting);
174 c->connecting = false;
175 if (error != GRPC_ERROR_NONE || c->shutdown) {
176 if (error == GRPC_ERROR_NONE) {
177 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
178 } else {
179 error = GRPC_ERROR_REF(error);
180 }
181 memset(c->result, 0, sizeof(*c->result));
182 grpc_closure* notify = c->notify;
183 c->notify = nullptr;
184 GRPC_CLOSURE_SCHED(notify, error);
185 if (c->endpoint != nullptr) {
186 grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
187 }
188 gpr_mu_unlock(&c->mu);
189 chttp2_connector_unref(static_cast<grpc_connector*>(arg));
190 } else {
191 GPR_ASSERT(c->endpoint != nullptr);
192 start_handshake_locked(c);
193 gpr_mu_unlock(&c->mu);
194 }
195 }
196
chttp2_connector_connect(grpc_connector * con,const grpc_connect_in_args * args,grpc_connect_out_args * result,grpc_closure * notify)197 static void chttp2_connector_connect(grpc_connector* con,
198 const grpc_connect_in_args* args,
199 grpc_connect_out_args* result,
200 grpc_closure* notify) {
201 chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
202 grpc_resolved_address addr;
203 grpc_get_subchannel_address_arg(args->channel_args, &addr);
204 gpr_mu_lock(&c->mu);
205 GPR_ASSERT(c->notify == nullptr);
206 c->notify = notify;
207 c->args = *args;
208 c->result = result;
209 GPR_ASSERT(c->endpoint == nullptr);
210 chttp2_connector_ref(con); // Ref taken for callback.
211 GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
212 GPR_ASSERT(!c->connecting);
213 c->connecting = true;
214 grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties,
215 args->channel_args, &addr, args->deadline);
216 gpr_mu_unlock(&c->mu);
217 }
218
219 static const grpc_connector_vtable chttp2_connector_vtable = {
220 chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
221 chttp2_connector_connect};
222
grpc_chttp2_connector_create()223 grpc_connector* grpc_chttp2_connector_create() {
224 chttp2_connector* c = static_cast<chttp2_connector*>(gpr_zalloc(sizeof(*c)));
225 c->base.vtable = &chttp2_connector_vtable;
226 gpr_mu_init(&c->mu);
227 gpr_ref_init(&c->refs, 1);
228 return &c->base;
229 }
230