• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 #include "src/core/lib/iomgr/socket_utils.h"
25 
26 #include <string.h>
27 
28 #include <string>
29 
30 #include "absl/strings/str_cat.h"
31 
32 #include <grpc/grpc.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 
36 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/lib/iomgr/resolve_address.h"
39 #include "src/core/lib/iomgr/sockaddr.h"
40 #include "test/core/end2end/cq_verifier.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 
44 extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
45 static grpc_address_resolver_vtable* default_resolver;
46 
tag(intptr_t i)47 static void* tag(intptr_t i) { return (void*)i; }
48 
49 static gpr_mu g_mu;
50 static int g_resolve_port = -1;
51 
52 static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
53     const char* dns_server, const char* addr, const char* default_port,
54     grpc_pollset_set* interested_parties, grpc_closure* on_done,
55     std::unique_ptr<grpc_core::ServerAddressList>* addresses,
56     std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
57     char** service_config_json, int query_timeout_ms,
58     std::shared_ptr<grpc_core::WorkSerializer> combiner);
59 
60 static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request);
61 
set_resolve_port(int port)62 static void set_resolve_port(int port) {
63   gpr_mu_lock(&g_mu);
64   g_resolve_port = port;
65   gpr_mu_unlock(&g_mu);
66 }
67 
my_resolve_address(const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_resolved_addresses ** addrs)68 static void my_resolve_address(const char* addr, const char* default_port,
69                                grpc_pollset_set* interested_parties,
70                                grpc_closure* on_done,
71                                grpc_resolved_addresses** addrs) {
72   if (0 != strcmp(addr, "test")) {
73     default_resolver->resolve_address(addr, default_port, interested_parties,
74                                       on_done, addrs);
75     return;
76   }
77 
78   grpc_error* error = GRPC_ERROR_NONE;
79   gpr_mu_lock(&g_mu);
80   if (g_resolve_port < 0) {
81     gpr_mu_unlock(&g_mu);
82     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
83   } else {
84     *addrs = static_cast<grpc_resolved_addresses*>(gpr_malloc(sizeof(**addrs)));
85     (*addrs)->naddrs = 1;
86     (*addrs)->addrs = static_cast<grpc_resolved_address*>(
87         gpr_malloc(sizeof(*(*addrs)->addrs)));
88     memset((*addrs)->addrs, 0, sizeof(*(*addrs)->addrs));
89     grpc_sockaddr_in* sa =
90         reinterpret_cast<grpc_sockaddr_in*>((*addrs)->addrs[0].addr);
91     sa->sin_family = GRPC_AF_INET;
92     sa->sin_addr.s_addr = 0x100007f;
93     sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
94     (*addrs)->addrs[0].len = static_cast<socklen_t>(sizeof(*sa));
95     gpr_mu_unlock(&g_mu);
96   }
97   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
98 }
99 
my_blocking_resolve_address(const char * name,const char * default_port,grpc_resolved_addresses ** addresses)100 static grpc_error* my_blocking_resolve_address(
101     const char* name, const char* default_port,
102     grpc_resolved_addresses** addresses) {
103   return default_resolver->blocking_resolve_address(name, default_port,
104                                                     addresses);
105 }
106 
107 static grpc_address_resolver_vtable test_resolver = {
108     my_resolve_address, my_blocking_resolve_address};
109 
my_dns_lookup_ares_locked(const char * dns_server,const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,std::unique_ptr<grpc_core::ServerAddressList> * addresses,std::unique_ptr<grpc_core::ServerAddressList> * balancer_addresses,char ** service_config_json,int query_timeout_ms,std::shared_ptr<grpc_core::WorkSerializer> work_serializer)110 static grpc_ares_request* my_dns_lookup_ares_locked(
111     const char* dns_server, const char* addr, const char* default_port,
112     grpc_pollset_set* interested_parties, grpc_closure* on_done,
113     std::unique_ptr<grpc_core::ServerAddressList>* addresses,
114     std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
115     char** service_config_json, int query_timeout_ms,
116     std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
117   if (0 != strcmp(addr, "test")) {
118     return iomgr_dns_lookup_ares_locked(
119         dns_server, addr, default_port, interested_parties, on_done, addresses,
120         balancer_addresses, service_config_json, query_timeout_ms,
121         std::move(work_serializer));
122   }
123 
124   grpc_error* error = GRPC_ERROR_NONE;
125   gpr_mu_lock(&g_mu);
126   if (g_resolve_port < 0) {
127     gpr_mu_unlock(&g_mu);
128     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
129   } else {
130     *addresses = absl::make_unique<grpc_core::ServerAddressList>();
131     grpc_sockaddr_in sa;
132     sa.sin_family = GRPC_AF_INET;
133     sa.sin_addr.s_addr = 0x100007f;
134     sa.sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
135     (*addresses)->emplace_back(&sa, sizeof(sa), nullptr);
136     gpr_mu_unlock(&g_mu);
137   }
138   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
139   return nullptr;
140 }
141 
my_cancel_ares_request_locked(grpc_ares_request * request)142 static void my_cancel_ares_request_locked(grpc_ares_request* request) {
143   if (request != nullptr) {
144     iomgr_cancel_ares_request_locked(request);
145   }
146 }
147 
main(int argc,char ** argv)148 int main(int argc, char** argv) {
149   grpc_completion_queue* cq;
150   cq_verifier* cqv;
151   grpc_op ops[6];
152   grpc_op* op;
153 
154   grpc::testing::TestEnvironment env(argc, argv);
155 
156   gpr_mu_init(&g_mu);
157   grpc_init();
158   default_resolver = grpc_resolve_address_impl;
159   grpc_set_resolver_impl(&test_resolver);
160   iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
161   iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked;
162   grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
163   grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
164 
165   int was_cancelled1;
166   int was_cancelled2;
167 
168   grpc_metadata_array trailing_metadata_recv1;
169   grpc_metadata_array request_metadata1;
170   grpc_call_details request_details1;
171   grpc_status_code status1;
172   grpc_slice details1;
173   grpc_metadata_array_init(&trailing_metadata_recv1);
174   grpc_metadata_array_init(&request_metadata1);
175   grpc_call_details_init(&request_details1);
176 
177   grpc_metadata_array trailing_metadata_recv2;
178   grpc_metadata_array request_metadata2;
179   grpc_call_details request_details2;
180   grpc_status_code status2;
181   grpc_slice details2;
182   grpc_metadata_array_init(&trailing_metadata_recv2);
183   grpc_metadata_array_init(&request_metadata2);
184   grpc_call_details_init(&request_details2);
185 
186   cq = grpc_completion_queue_create_for_next(nullptr);
187   cqv = cq_verifier_create(cq);
188 
189   /* reserve two ports */
190   int port1 = grpc_pick_unused_port_or_die();
191   int port2 = grpc_pick_unused_port_or_die();
192 
193   std::string addr;
194 
195   grpc_channel_args client_args;
196   grpc_arg arg_array[2];
197   arg_array[0].type = GRPC_ARG_INTEGER;
198   arg_array[0].key =
199       const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
200   arg_array[0].value.integer = 1000;
201   /* When this test brings down server1 and then brings up server2,
202    * the targetted server port number changes, and the client channel
203    * needs to re-resolve to pick this up. This test requires that
204    * happen within 10 seconds, but gRPC's DNS resolvers rate limit
205    * resolution attempts to at most once every 30 seconds by default.
206    * So we tweak it for this test. */
207   arg_array[1].type = GRPC_ARG_INTEGER;
208   arg_array[1].key =
209       const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
210   arg_array[1].value.integer = 1000;
211   client_args.args = arg_array;
212   client_args.num_args = 2;
213 
214   /* create a channel that picks first amongst the servers */
215   grpc_channel* chan =
216       grpc_insecure_channel_create("test", &client_args, nullptr);
217   /* and an initial call to them */
218   grpc_slice host = grpc_slice_from_static_string("127.0.0.1");
219   grpc_call* call1 =
220       grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
221                                grpc_slice_from_static_string("/foo"), &host,
222                                grpc_timeout_seconds_to_deadline(20), nullptr);
223   /* send initial metadata to probe connectivity */
224   memset(ops, 0, sizeof(ops));
225   op = ops;
226   op->op = GRPC_OP_SEND_INITIAL_METADATA;
227   op->data.send_initial_metadata.count = 0;
228   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
229   op->reserved = nullptr;
230   op++;
231   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
232                                                    (size_t)(op - ops),
233                                                    tag(0x101), nullptr));
234   /* and receive status to probe termination */
235   memset(ops, 0, sizeof(ops));
236   op = ops;
237   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
238   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
239   op->data.recv_status_on_client.status = &status1;
240   op->data.recv_status_on_client.status_details = &details1;
241   op->flags = 0;
242   op->reserved = nullptr;
243   op++;
244   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
245                                                    (size_t)(op - ops),
246                                                    tag(0x102), nullptr));
247 
248   /* bring a server up on the first port */
249   grpc_server* server1 = grpc_server_create(nullptr, nullptr);
250   addr = absl::StrCat("127.0.0.1:", port1);
251   grpc_server_add_insecure_http2_port(server1, addr.c_str());
252   grpc_server_register_completion_queue(server1, cq, nullptr);
253   grpc_server_start(server1);
254 
255   /* request a call to the server */
256   grpc_call* server_call1;
257   GPR_ASSERT(GRPC_CALL_OK ==
258              grpc_server_request_call(server1, &server_call1, &request_details1,
259                                       &request_metadata1, cq, cq, tag(0x301)));
260 
261   set_resolve_port(port1);
262 
263   /* first call should now start */
264   CQ_EXPECT_COMPLETION(cqv, tag(0x101), 1);
265   CQ_EXPECT_COMPLETION(cqv, tag(0x301), 1);
266   cq_verify(cqv);
267 
268   GPR_ASSERT(GRPC_CHANNEL_READY ==
269              grpc_channel_check_connectivity_state(chan, 0));
270   grpc_channel_watch_connectivity_state(chan, GRPC_CHANNEL_READY,
271                                         gpr_inf_future(GPR_CLOCK_REALTIME), cq,
272                                         tag(0x9999));
273 
274   /* listen for close on the server call to probe for finishing */
275   memset(ops, 0, sizeof(ops));
276   op = ops;
277   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
278   op->data.recv_close_on_server.cancelled = &was_cancelled1;
279   op->flags = 0;
280   op++;
281   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call1, ops,
282                                                    (size_t)(op - ops),
283                                                    tag(0x302), nullptr));
284 
285   /* shutdown first server:
286    * we should see a connectivity change and then nothing */
287   set_resolve_port(-1);
288   grpc_server_shutdown_and_notify(server1, cq, tag(0xdead1));
289   CQ_EXPECT_COMPLETION(cqv, tag(0x9999), 1);
290   cq_verify(cqv);
291   cq_verify_empty(cqv);
292 
293   /* and a new call: should go through to server2 when we start it */
294   grpc_call* call2 =
295       grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
296                                grpc_slice_from_static_string("/foo"), &host,
297                                grpc_timeout_seconds_to_deadline(20), nullptr);
298   /* send initial metadata to probe connectivity */
299   memset(ops, 0, sizeof(ops));
300   op = ops;
301   op->op = GRPC_OP_SEND_INITIAL_METADATA;
302   op->data.send_initial_metadata.count = 0;
303   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
304   op->reserved = nullptr;
305   op++;
306   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
307                                                    (size_t)(op - ops),
308                                                    tag(0x201), nullptr));
309   /* and receive status to probe termination */
310   memset(ops, 0, sizeof(ops));
311   op = ops;
312   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
313   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
314   op->data.recv_status_on_client.status = &status2;
315   op->data.recv_status_on_client.status_details = &details2;
316   op->flags = 0;
317   op->reserved = nullptr;
318   op++;
319   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
320                                                    (size_t)(op - ops),
321                                                    tag(0x202), nullptr));
322 
323   /* and bring up second server */
324   set_resolve_port(port2);
325   grpc_server* server2 = grpc_server_create(nullptr, nullptr);
326   addr = absl::StrCat("127.0.0.1:", port2);
327   grpc_server_add_insecure_http2_port(server2, addr.c_str());
328   grpc_server_register_completion_queue(server2, cq, nullptr);
329   grpc_server_start(server2);
330 
331   /* request a call to the server */
332   grpc_call* server_call2;
333   GPR_ASSERT(GRPC_CALL_OK ==
334              grpc_server_request_call(server2, &server_call2, &request_details2,
335                                       &request_metadata2, cq, cq, tag(0x401)));
336 
337   /* second call should now start */
338   CQ_EXPECT_COMPLETION(cqv, tag(0x201), 1);
339   CQ_EXPECT_COMPLETION(cqv, tag(0x401), 1);
340   cq_verify(cqv);
341 
342   /* listen for close on the server call to probe for finishing */
343   memset(ops, 0, sizeof(ops));
344   op = ops;
345   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
346   op->data.recv_close_on_server.cancelled = &was_cancelled2;
347   op->flags = 0;
348   op++;
349   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call2, ops,
350                                                    (size_t)(op - ops),
351                                                    tag(0x402), nullptr));
352 
353   /* shutdown second server: we should see nothing */
354   grpc_server_shutdown_and_notify(server2, cq, tag(0xdead2));
355   cq_verify_empty(cqv);
356 
357   grpc_call_cancel(call1, nullptr);
358   grpc_call_cancel(call2, nullptr);
359 
360   /* now everything else should finish */
361   CQ_EXPECT_COMPLETION(cqv, tag(0x102), 1);
362   CQ_EXPECT_COMPLETION(cqv, tag(0x202), 1);
363   CQ_EXPECT_COMPLETION(cqv, tag(0x302), 1);
364   CQ_EXPECT_COMPLETION(cqv, tag(0x402), 1);
365   CQ_EXPECT_COMPLETION(cqv, tag(0xdead1), 1);
366   CQ_EXPECT_COMPLETION(cqv, tag(0xdead2), 1);
367   cq_verify(cqv);
368 
369   grpc_call_unref(call1);
370   grpc_call_unref(call2);
371   grpc_call_unref(server_call1);
372   grpc_call_unref(server_call2);
373   grpc_server_destroy(server1);
374   grpc_server_destroy(server2);
375   grpc_channel_destroy(chan);
376 
377   grpc_metadata_array_destroy(&trailing_metadata_recv1);
378   grpc_metadata_array_destroy(&request_metadata1);
379   grpc_call_details_destroy(&request_details1);
380   grpc_slice_unref(details1);
381   grpc_metadata_array_destroy(&trailing_metadata_recv2);
382   grpc_metadata_array_destroy(&request_metadata2);
383   grpc_call_details_destroy(&request_details2);
384   grpc_slice_unref(details2);
385 
386   cq_verifier_destroy(cqv);
387   grpc_completion_queue_destroy(cq);
388 
389   grpc_shutdown();
390   gpr_mu_destroy(&g_mu);
391 
392   return 0;
393 }
394