1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 #include "src/core/lib/iomgr/socket_utils.h"
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <string.h>
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/iomgr/resolve_address.h"
34 #include "src/core/lib/iomgr/sockaddr.h"
35 #include "test/core/end2end/cq_verifier.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38
39 extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
40 static grpc_address_resolver_vtable* default_resolver;
41
/* Converts an integer into the opaque pointer value used as a
   completion-queue tag. */
static void* tag(intptr_t i) {
  void* as_pointer = (void*)i;
  return as_pointer;
}
43
44 static gpr_mu g_mu;
45 static int g_resolve_port = -1;
46
47 static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
48 const char* dns_server, const char* addr, const char* default_port,
49 grpc_pollset_set* interested_parties, grpc_closure* on_done,
50 grpc_lb_addresses** addresses, bool check_grpclb,
51 char** service_config_json, grpc_combiner* combiner);
52
set_resolve_port(int port)53 static void set_resolve_port(int port) {
54 gpr_mu_lock(&g_mu);
55 g_resolve_port = port;
56 gpr_mu_unlock(&g_mu);
57 }
58
my_resolve_address(const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_resolved_addresses ** addrs)59 static void my_resolve_address(const char* addr, const char* default_port,
60 grpc_pollset_set* interested_parties,
61 grpc_closure* on_done,
62 grpc_resolved_addresses** addrs) {
63 if (0 != strcmp(addr, "test")) {
64 default_resolver->resolve_address(addr, default_port, interested_parties,
65 on_done, addrs);
66 return;
67 }
68
69 grpc_error* error = GRPC_ERROR_NONE;
70 gpr_mu_lock(&g_mu);
71 if (g_resolve_port < 0) {
72 gpr_mu_unlock(&g_mu);
73 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
74 } else {
75 *addrs = static_cast<grpc_resolved_addresses*>(gpr_malloc(sizeof(**addrs)));
76 (*addrs)->naddrs = 1;
77 (*addrs)->addrs = static_cast<grpc_resolved_address*>(
78 gpr_malloc(sizeof(*(*addrs)->addrs)));
79 memset((*addrs)->addrs, 0, sizeof(*(*addrs)->addrs));
80 grpc_sockaddr_in* sa =
81 reinterpret_cast<grpc_sockaddr_in*>((*addrs)->addrs[0].addr);
82 sa->sin_family = GRPC_AF_INET;
83 sa->sin_addr.s_addr = 0x100007f;
84 sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
85 (*addrs)->addrs[0].len = static_cast<socklen_t>(sizeof(*sa));
86 gpr_mu_unlock(&g_mu);
87 }
88 GRPC_CLOSURE_SCHED(on_done, error);
89 }
90
my_blocking_resolve_address(const char * name,const char * default_port,grpc_resolved_addresses ** addresses)91 static grpc_error* my_blocking_resolve_address(
92 const char* name, const char* default_port,
93 grpc_resolved_addresses** addresses) {
94 return default_resolver->blocking_resolve_address(name, default_port,
95 addresses);
96 }
97
98 static grpc_address_resolver_vtable test_resolver = {
99 my_resolve_address, my_blocking_resolve_address};
100
my_dns_lookup_ares_locked(const char * dns_server,const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_lb_addresses ** lb_addrs,bool check_grpclb,char ** service_config_json,grpc_combiner * combiner)101 static grpc_ares_request* my_dns_lookup_ares_locked(
102 const char* dns_server, const char* addr, const char* default_port,
103 grpc_pollset_set* interested_parties, grpc_closure* on_done,
104 grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
105 grpc_combiner* combiner) {
106 if (0 != strcmp(addr, "test")) {
107 return iomgr_dns_lookup_ares_locked(
108 dns_server, addr, default_port, interested_parties, on_done, lb_addrs,
109 check_grpclb, service_config_json, combiner);
110 }
111
112 grpc_error* error = GRPC_ERROR_NONE;
113 gpr_mu_lock(&g_mu);
114 if (g_resolve_port < 0) {
115 gpr_mu_unlock(&g_mu);
116 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
117 } else {
118 *lb_addrs = grpc_lb_addresses_create(1, nullptr);
119 grpc_sockaddr_in* sa =
120 static_cast<grpc_sockaddr_in*>(gpr_zalloc(sizeof(grpc_sockaddr_in)));
121 sa->sin_family = GRPC_AF_INET;
122 sa->sin_addr.s_addr = 0x100007f;
123 sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
124 grpc_lb_addresses_set_address(*lb_addrs, 0, sa, sizeof(*sa), false, nullptr,
125 nullptr);
126 gpr_free(sa);
127 gpr_mu_unlock(&g_mu);
128 }
129 GRPC_CLOSURE_SCHED(on_done, error);
130 return nullptr;
131 }
132
main(int argc,char ** argv)133 int main(int argc, char** argv) {
134 grpc_completion_queue* cq;
135 cq_verifier* cqv;
136 grpc_op ops[6];
137 grpc_op* op;
138
139 grpc_test_init(argc, argv);
140
141 gpr_mu_init(&g_mu);
142 grpc_init();
143 default_resolver = grpc_resolve_address_impl;
144 grpc_set_resolver_impl(&test_resolver);
145 iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
146 grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
147
148 int was_cancelled1;
149 int was_cancelled2;
150
151 grpc_metadata_array trailing_metadata_recv1;
152 grpc_metadata_array request_metadata1;
153 grpc_call_details request_details1;
154 grpc_status_code status1;
155 grpc_slice details1;
156 grpc_metadata_array_init(&trailing_metadata_recv1);
157 grpc_metadata_array_init(&request_metadata1);
158 grpc_call_details_init(&request_details1);
159
160 grpc_metadata_array trailing_metadata_recv2;
161 grpc_metadata_array request_metadata2;
162 grpc_call_details request_details2;
163 grpc_status_code status2;
164 grpc_slice details2;
165 grpc_metadata_array_init(&trailing_metadata_recv2);
166 grpc_metadata_array_init(&request_metadata2);
167 grpc_call_details_init(&request_details2);
168
169 cq = grpc_completion_queue_create_for_next(nullptr);
170 cqv = cq_verifier_create(cq);
171
172 /* reserve two ports */
173 int port1 = grpc_pick_unused_port_or_die();
174 int port2 = grpc_pick_unused_port_or_die();
175
176 char* addr;
177
178 grpc_channel_args client_args;
179 grpc_arg arg_array[1];
180 arg_array[0].type = GRPC_ARG_INTEGER;
181 arg_array[0].key =
182 const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
183 arg_array[0].value.integer = 1000;
184 client_args.args = arg_array;
185 client_args.num_args = 1;
186
187 /* create a channel that picks first amongst the servers */
188 grpc_channel* chan =
189 grpc_insecure_channel_create("test", &client_args, nullptr);
190 /* and an initial call to them */
191 grpc_slice host = grpc_slice_from_static_string("127.0.0.1");
192 grpc_call* call1 =
193 grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
194 grpc_slice_from_static_string("/foo"), &host,
195 grpc_timeout_seconds_to_deadline(20), nullptr);
196 /* send initial metadata to probe connectivity */
197 memset(ops, 0, sizeof(ops));
198 op = ops;
199 op->op = GRPC_OP_SEND_INITIAL_METADATA;
200 op->data.send_initial_metadata.count = 0;
201 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
202 op->reserved = nullptr;
203 op++;
204 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
205 (size_t)(op - ops),
206 tag(0x101), nullptr));
207 /* and receive status to probe termination */
208 memset(ops, 0, sizeof(ops));
209 op = ops;
210 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
211 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
212 op->data.recv_status_on_client.status = &status1;
213 op->data.recv_status_on_client.status_details = &details1;
214 op->flags = 0;
215 op->reserved = nullptr;
216 op++;
217 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
218 (size_t)(op - ops),
219 tag(0x102), nullptr));
220
221 /* bring a server up on the first port */
222 grpc_server* server1 = grpc_server_create(nullptr, nullptr);
223 gpr_asprintf(&addr, "127.0.0.1:%d", port1);
224 grpc_server_add_insecure_http2_port(server1, addr);
225 grpc_server_register_completion_queue(server1, cq, nullptr);
226 gpr_free(addr);
227 grpc_server_start(server1);
228
229 /* request a call to the server */
230 grpc_call* server_call1;
231 GPR_ASSERT(GRPC_CALL_OK ==
232 grpc_server_request_call(server1, &server_call1, &request_details1,
233 &request_metadata1, cq, cq, tag(0x301)));
234
235 set_resolve_port(port1);
236
237 /* first call should now start */
238 CQ_EXPECT_COMPLETION(cqv, tag(0x101), 1);
239 CQ_EXPECT_COMPLETION(cqv, tag(0x301), 1);
240 cq_verify(cqv);
241
242 GPR_ASSERT(GRPC_CHANNEL_READY ==
243 grpc_channel_check_connectivity_state(chan, 0));
244 grpc_channel_watch_connectivity_state(chan, GRPC_CHANNEL_READY,
245 gpr_inf_future(GPR_CLOCK_REALTIME), cq,
246 tag(0x9999));
247
248 /* listen for close on the server call to probe for finishing */
249 memset(ops, 0, sizeof(ops));
250 op = ops;
251 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
252 op->data.recv_close_on_server.cancelled = &was_cancelled1;
253 op->flags = 0;
254 op++;
255 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call1, ops,
256 (size_t)(op - ops),
257 tag(0x302), nullptr));
258
259 /* shutdown first server:
260 * we should see a connectivity change and then nothing */
261 set_resolve_port(-1);
262 grpc_server_shutdown_and_notify(server1, cq, tag(0xdead1));
263 CQ_EXPECT_COMPLETION(cqv, tag(0x9999), 1);
264 cq_verify(cqv);
265 cq_verify_empty(cqv);
266
267 /* and a new call: should go through to server2 when we start it */
268 grpc_call* call2 =
269 grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
270 grpc_slice_from_static_string("/foo"), &host,
271 grpc_timeout_seconds_to_deadline(20), nullptr);
272 /* send initial metadata to probe connectivity */
273 memset(ops, 0, sizeof(ops));
274 op = ops;
275 op->op = GRPC_OP_SEND_INITIAL_METADATA;
276 op->data.send_initial_metadata.count = 0;
277 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
278 op->reserved = nullptr;
279 op++;
280 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
281 (size_t)(op - ops),
282 tag(0x201), nullptr));
283 /* and receive status to probe termination */
284 memset(ops, 0, sizeof(ops));
285 op = ops;
286 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
287 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
288 op->data.recv_status_on_client.status = &status2;
289 op->data.recv_status_on_client.status_details = &details2;
290 op->flags = 0;
291 op->reserved = nullptr;
292 op++;
293 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
294 (size_t)(op - ops),
295 tag(0x202), nullptr));
296
297 /* and bring up second server */
298 set_resolve_port(port2);
299 grpc_server* server2 = grpc_server_create(nullptr, nullptr);
300 gpr_asprintf(&addr, "127.0.0.1:%d", port2);
301 grpc_server_add_insecure_http2_port(server2, addr);
302 grpc_server_register_completion_queue(server2, cq, nullptr);
303 gpr_free(addr);
304 grpc_server_start(server2);
305
306 /* request a call to the server */
307 grpc_call* server_call2;
308 GPR_ASSERT(GRPC_CALL_OK ==
309 grpc_server_request_call(server2, &server_call2, &request_details2,
310 &request_metadata2, cq, cq, tag(0x401)));
311
312 /* second call should now start */
313 CQ_EXPECT_COMPLETION(cqv, tag(0x201), 1);
314 CQ_EXPECT_COMPLETION(cqv, tag(0x401), 1);
315 cq_verify(cqv);
316
317 /* listen for close on the server call to probe for finishing */
318 memset(ops, 0, sizeof(ops));
319 op = ops;
320 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
321 op->data.recv_close_on_server.cancelled = &was_cancelled2;
322 op->flags = 0;
323 op++;
324 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call2, ops,
325 (size_t)(op - ops),
326 tag(0x402), nullptr));
327
328 /* shutdown second server: we should see nothing */
329 grpc_server_shutdown_and_notify(server2, cq, tag(0xdead2));
330 cq_verify_empty(cqv);
331
332 grpc_call_cancel(call1, nullptr);
333 grpc_call_cancel(call2, nullptr);
334
335 /* now everything else should finish */
336 CQ_EXPECT_COMPLETION(cqv, tag(0x102), 1);
337 CQ_EXPECT_COMPLETION(cqv, tag(0x202), 1);
338 CQ_EXPECT_COMPLETION(cqv, tag(0x302), 1);
339 CQ_EXPECT_COMPLETION(cqv, tag(0x402), 1);
340 CQ_EXPECT_COMPLETION(cqv, tag(0xdead1), 1);
341 CQ_EXPECT_COMPLETION(cqv, tag(0xdead2), 1);
342 cq_verify(cqv);
343
344 grpc_call_unref(call1);
345 grpc_call_unref(call2);
346 grpc_call_unref(server_call1);
347 grpc_call_unref(server_call2);
348 grpc_server_destroy(server1);
349 grpc_server_destroy(server2);
350 grpc_channel_destroy(chan);
351
352 grpc_metadata_array_destroy(&trailing_metadata_recv1);
353 grpc_metadata_array_destroy(&request_metadata1);
354 grpc_call_details_destroy(&request_details1);
355 grpc_slice_unref(details1);
356 grpc_metadata_array_destroy(&trailing_metadata_recv2);
357 grpc_metadata_array_destroy(&request_metadata2);
358 grpc_call_details_destroy(&request_details2);
359 grpc_slice_unref(details2);
360
361 cq_verifier_destroy(cqv);
362 grpc_completion_queue_destroy(cq);
363
364 grpc_shutdown();
365 gpr_mu_destroy(&g_mu);
366
367 return 0;
368 }
369