• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 #include "src/core/lib/iomgr/socket_utils.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <string.h>
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/iomgr/resolve_address.h"
34 #include "src/core/lib/iomgr/sockaddr.h"
35 #include "test/core/end2end/cq_verifier.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 
39 extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
40 static grpc_address_resolver_vtable* default_resolver;
41 
/* Turns a small integer into the opaque void* "tag" that completion-queue
   operations carry, so events can be matched by value. */
static void* tag(intptr_t i) {
  void* as_pointer = (void*)i;
  return as_pointer;
}
43 
44 static gpr_mu g_mu;
45 static int g_resolve_port = -1;
46 
47 static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
48     const char* dns_server, const char* addr, const char* default_port,
49     grpc_pollset_set* interested_parties, grpc_closure* on_done,
50     grpc_lb_addresses** addresses, bool check_grpclb,
51     char** service_config_json, grpc_combiner* combiner);
52 
set_resolve_port(int port)53 static void set_resolve_port(int port) {
54   gpr_mu_lock(&g_mu);
55   g_resolve_port = port;
56   gpr_mu_unlock(&g_mu);
57 }
58 
my_resolve_address(const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_resolved_addresses ** addrs)59 static void my_resolve_address(const char* addr, const char* default_port,
60                                grpc_pollset_set* interested_parties,
61                                grpc_closure* on_done,
62                                grpc_resolved_addresses** addrs) {
63   if (0 != strcmp(addr, "test")) {
64     default_resolver->resolve_address(addr, default_port, interested_parties,
65                                       on_done, addrs);
66     return;
67   }
68 
69   grpc_error* error = GRPC_ERROR_NONE;
70   gpr_mu_lock(&g_mu);
71   if (g_resolve_port < 0) {
72     gpr_mu_unlock(&g_mu);
73     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
74   } else {
75     *addrs = static_cast<grpc_resolved_addresses*>(gpr_malloc(sizeof(**addrs)));
76     (*addrs)->naddrs = 1;
77     (*addrs)->addrs = static_cast<grpc_resolved_address*>(
78         gpr_malloc(sizeof(*(*addrs)->addrs)));
79     memset((*addrs)->addrs, 0, sizeof(*(*addrs)->addrs));
80     grpc_sockaddr_in* sa =
81         reinterpret_cast<grpc_sockaddr_in*>((*addrs)->addrs[0].addr);
82     sa->sin_family = GRPC_AF_INET;
83     sa->sin_addr.s_addr = 0x100007f;
84     sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
85     (*addrs)->addrs[0].len = static_cast<socklen_t>(sizeof(*sa));
86     gpr_mu_unlock(&g_mu);
87   }
88   GRPC_CLOSURE_SCHED(on_done, error);
89 }
90 
my_blocking_resolve_address(const char * name,const char * default_port,grpc_resolved_addresses ** addresses)91 static grpc_error* my_blocking_resolve_address(
92     const char* name, const char* default_port,
93     grpc_resolved_addresses** addresses) {
94   return default_resolver->blocking_resolve_address(name, default_port,
95                                                     addresses);
96 }
97 
98 static grpc_address_resolver_vtable test_resolver = {
99     my_resolve_address, my_blocking_resolve_address};
100 
my_dns_lookup_ares_locked(const char * dns_server,const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_lb_addresses ** lb_addrs,bool check_grpclb,char ** service_config_json,grpc_combiner * combiner)101 static grpc_ares_request* my_dns_lookup_ares_locked(
102     const char* dns_server, const char* addr, const char* default_port,
103     grpc_pollset_set* interested_parties, grpc_closure* on_done,
104     grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
105     grpc_combiner* combiner) {
106   if (0 != strcmp(addr, "test")) {
107     return iomgr_dns_lookup_ares_locked(
108         dns_server, addr, default_port, interested_parties, on_done, lb_addrs,
109         check_grpclb, service_config_json, combiner);
110   }
111 
112   grpc_error* error = GRPC_ERROR_NONE;
113   gpr_mu_lock(&g_mu);
114   if (g_resolve_port < 0) {
115     gpr_mu_unlock(&g_mu);
116     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
117   } else {
118     *lb_addrs = grpc_lb_addresses_create(1, nullptr);
119     grpc_sockaddr_in* sa =
120         static_cast<grpc_sockaddr_in*>(gpr_zalloc(sizeof(grpc_sockaddr_in)));
121     sa->sin_family = GRPC_AF_INET;
122     sa->sin_addr.s_addr = 0x100007f;
123     sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
124     grpc_lb_addresses_set_address(*lb_addrs, 0, sa, sizeof(*sa), false, nullptr,
125                                   nullptr);
126     gpr_free(sa);
127     gpr_mu_unlock(&g_mu);
128   }
129   GRPC_CLOSURE_SCHED(on_done, error);
130   return nullptr;
131 }
132 
main(int argc,char ** argv)133 int main(int argc, char** argv) {
134   grpc_completion_queue* cq;
135   cq_verifier* cqv;
136   grpc_op ops[6];
137   grpc_op* op;
138 
139   grpc_test_init(argc, argv);
140 
141   gpr_mu_init(&g_mu);
142   grpc_init();
143   default_resolver = grpc_resolve_address_impl;
144   grpc_set_resolver_impl(&test_resolver);
145   iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
146   grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
147 
148   int was_cancelled1;
149   int was_cancelled2;
150 
151   grpc_metadata_array trailing_metadata_recv1;
152   grpc_metadata_array request_metadata1;
153   grpc_call_details request_details1;
154   grpc_status_code status1;
155   grpc_slice details1;
156   grpc_metadata_array_init(&trailing_metadata_recv1);
157   grpc_metadata_array_init(&request_metadata1);
158   grpc_call_details_init(&request_details1);
159 
160   grpc_metadata_array trailing_metadata_recv2;
161   grpc_metadata_array request_metadata2;
162   grpc_call_details request_details2;
163   grpc_status_code status2;
164   grpc_slice details2;
165   grpc_metadata_array_init(&trailing_metadata_recv2);
166   grpc_metadata_array_init(&request_metadata2);
167   grpc_call_details_init(&request_details2);
168 
169   cq = grpc_completion_queue_create_for_next(nullptr);
170   cqv = cq_verifier_create(cq);
171 
172   /* reserve two ports */
173   int port1 = grpc_pick_unused_port_or_die();
174   int port2 = grpc_pick_unused_port_or_die();
175 
176   char* addr;
177 
178   grpc_channel_args client_args;
179   grpc_arg arg_array[1];
180   arg_array[0].type = GRPC_ARG_INTEGER;
181   arg_array[0].key =
182       const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
183   arg_array[0].value.integer = 1000;
184   client_args.args = arg_array;
185   client_args.num_args = 1;
186 
187   /* create a channel that picks first amongst the servers */
188   grpc_channel* chan =
189       grpc_insecure_channel_create("test", &client_args, nullptr);
190   /* and an initial call to them */
191   grpc_slice host = grpc_slice_from_static_string("127.0.0.1");
192   grpc_call* call1 =
193       grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
194                                grpc_slice_from_static_string("/foo"), &host,
195                                grpc_timeout_seconds_to_deadline(20), nullptr);
196   /* send initial metadata to probe connectivity */
197   memset(ops, 0, sizeof(ops));
198   op = ops;
199   op->op = GRPC_OP_SEND_INITIAL_METADATA;
200   op->data.send_initial_metadata.count = 0;
201   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
202   op->reserved = nullptr;
203   op++;
204   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
205                                                    (size_t)(op - ops),
206                                                    tag(0x101), nullptr));
207   /* and receive status to probe termination */
208   memset(ops, 0, sizeof(ops));
209   op = ops;
210   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
211   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
212   op->data.recv_status_on_client.status = &status1;
213   op->data.recv_status_on_client.status_details = &details1;
214   op->flags = 0;
215   op->reserved = nullptr;
216   op++;
217   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
218                                                    (size_t)(op - ops),
219                                                    tag(0x102), nullptr));
220 
221   /* bring a server up on the first port */
222   grpc_server* server1 = grpc_server_create(nullptr, nullptr);
223   gpr_asprintf(&addr, "127.0.0.1:%d", port1);
224   grpc_server_add_insecure_http2_port(server1, addr);
225   grpc_server_register_completion_queue(server1, cq, nullptr);
226   gpr_free(addr);
227   grpc_server_start(server1);
228 
229   /* request a call to the server */
230   grpc_call* server_call1;
231   GPR_ASSERT(GRPC_CALL_OK ==
232              grpc_server_request_call(server1, &server_call1, &request_details1,
233                                       &request_metadata1, cq, cq, tag(0x301)));
234 
235   set_resolve_port(port1);
236 
237   /* first call should now start */
238   CQ_EXPECT_COMPLETION(cqv, tag(0x101), 1);
239   CQ_EXPECT_COMPLETION(cqv, tag(0x301), 1);
240   cq_verify(cqv);
241 
242   GPR_ASSERT(GRPC_CHANNEL_READY ==
243              grpc_channel_check_connectivity_state(chan, 0));
244   grpc_channel_watch_connectivity_state(chan, GRPC_CHANNEL_READY,
245                                         gpr_inf_future(GPR_CLOCK_REALTIME), cq,
246                                         tag(0x9999));
247 
248   /* listen for close on the server call to probe for finishing */
249   memset(ops, 0, sizeof(ops));
250   op = ops;
251   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
252   op->data.recv_close_on_server.cancelled = &was_cancelled1;
253   op->flags = 0;
254   op++;
255   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call1, ops,
256                                                    (size_t)(op - ops),
257                                                    tag(0x302), nullptr));
258 
259   /* shutdown first server:
260    * we should see a connectivity change and then nothing */
261   set_resolve_port(-1);
262   grpc_server_shutdown_and_notify(server1, cq, tag(0xdead1));
263   CQ_EXPECT_COMPLETION(cqv, tag(0x9999), 1);
264   cq_verify(cqv);
265   cq_verify_empty(cqv);
266 
267   /* and a new call: should go through to server2 when we start it */
268   grpc_call* call2 =
269       grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
270                                grpc_slice_from_static_string("/foo"), &host,
271                                grpc_timeout_seconds_to_deadline(20), nullptr);
272   /* send initial metadata to probe connectivity */
273   memset(ops, 0, sizeof(ops));
274   op = ops;
275   op->op = GRPC_OP_SEND_INITIAL_METADATA;
276   op->data.send_initial_metadata.count = 0;
277   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
278   op->reserved = nullptr;
279   op++;
280   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
281                                                    (size_t)(op - ops),
282                                                    tag(0x201), nullptr));
283   /* and receive status to probe termination */
284   memset(ops, 0, sizeof(ops));
285   op = ops;
286   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
287   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
288   op->data.recv_status_on_client.status = &status2;
289   op->data.recv_status_on_client.status_details = &details2;
290   op->flags = 0;
291   op->reserved = nullptr;
292   op++;
293   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
294                                                    (size_t)(op - ops),
295                                                    tag(0x202), nullptr));
296 
297   /* and bring up second server */
298   set_resolve_port(port2);
299   grpc_server* server2 = grpc_server_create(nullptr, nullptr);
300   gpr_asprintf(&addr, "127.0.0.1:%d", port2);
301   grpc_server_add_insecure_http2_port(server2, addr);
302   grpc_server_register_completion_queue(server2, cq, nullptr);
303   gpr_free(addr);
304   grpc_server_start(server2);
305 
306   /* request a call to the server */
307   grpc_call* server_call2;
308   GPR_ASSERT(GRPC_CALL_OK ==
309              grpc_server_request_call(server2, &server_call2, &request_details2,
310                                       &request_metadata2, cq, cq, tag(0x401)));
311 
312   /* second call should now start */
313   CQ_EXPECT_COMPLETION(cqv, tag(0x201), 1);
314   CQ_EXPECT_COMPLETION(cqv, tag(0x401), 1);
315   cq_verify(cqv);
316 
317   /* listen for close on the server call to probe for finishing */
318   memset(ops, 0, sizeof(ops));
319   op = ops;
320   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
321   op->data.recv_close_on_server.cancelled = &was_cancelled2;
322   op->flags = 0;
323   op++;
324   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call2, ops,
325                                                    (size_t)(op - ops),
326                                                    tag(0x402), nullptr));
327 
328   /* shutdown second server: we should see nothing */
329   grpc_server_shutdown_and_notify(server2, cq, tag(0xdead2));
330   cq_verify_empty(cqv);
331 
332   grpc_call_cancel(call1, nullptr);
333   grpc_call_cancel(call2, nullptr);
334 
335   /* now everything else should finish */
336   CQ_EXPECT_COMPLETION(cqv, tag(0x102), 1);
337   CQ_EXPECT_COMPLETION(cqv, tag(0x202), 1);
338   CQ_EXPECT_COMPLETION(cqv, tag(0x302), 1);
339   CQ_EXPECT_COMPLETION(cqv, tag(0x402), 1);
340   CQ_EXPECT_COMPLETION(cqv, tag(0xdead1), 1);
341   CQ_EXPECT_COMPLETION(cqv, tag(0xdead2), 1);
342   cq_verify(cqv);
343 
344   grpc_call_unref(call1);
345   grpc_call_unref(call2);
346   grpc_call_unref(server_call1);
347   grpc_call_unref(server_call2);
348   grpc_server_destroy(server1);
349   grpc_server_destroy(server2);
350   grpc_channel_destroy(chan);
351 
352   grpc_metadata_array_destroy(&trailing_metadata_recv1);
353   grpc_metadata_array_destroy(&request_metadata1);
354   grpc_call_details_destroy(&request_details1);
355   grpc_slice_unref(details1);
356   grpc_metadata_array_destroy(&trailing_metadata_recv2);
357   grpc_metadata_array_destroy(&request_metadata2);
358   grpc_call_details_destroy(&request_details2);
359   grpc_slice_unref(details2);
360 
361   cq_verifier_destroy(cqv);
362   grpc_completion_queue_destroy(cq);
363 
364   grpc_shutdown();
365   gpr_mu_destroy(&g_mu);
366 
367   return 0;
368 }
369