1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 #include "src/core/lib/iomgr/socket_utils.h"
25
26 #include <string.h>
27
28 #include <string>
29
30 #include "absl/strings/str_cat.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35
36 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/lib/iomgr/resolve_address.h"
39 #include "src/core/lib/iomgr/sockaddr.h"
40 #include "test/core/end2end/cq_verifier.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43
44 extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
45 static grpc_address_resolver_vtable* default_resolver;
46
tag(intptr_t i)47 static void* tag(intptr_t i) { return (void*)i; }
48
49 static gpr_mu g_mu;
50 static int g_resolve_port = -1;
51
52 static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
53 const char* dns_server, const char* addr, const char* default_port,
54 grpc_pollset_set* interested_parties, grpc_closure* on_done,
55 std::unique_ptr<grpc_core::ServerAddressList>* addresses,
56 std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
57 char** service_config_json, int query_timeout_ms,
58 std::shared_ptr<grpc_core::WorkSerializer> combiner);
59
60 static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request);
61
set_resolve_port(int port)62 static void set_resolve_port(int port) {
63 gpr_mu_lock(&g_mu);
64 g_resolve_port = port;
65 gpr_mu_unlock(&g_mu);
66 }
67
my_resolve_address(const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,grpc_resolved_addresses ** addrs)68 static void my_resolve_address(const char* addr, const char* default_port,
69 grpc_pollset_set* interested_parties,
70 grpc_closure* on_done,
71 grpc_resolved_addresses** addrs) {
72 if (0 != strcmp(addr, "test")) {
73 default_resolver->resolve_address(addr, default_port, interested_parties,
74 on_done, addrs);
75 return;
76 }
77
78 grpc_error* error = GRPC_ERROR_NONE;
79 gpr_mu_lock(&g_mu);
80 if (g_resolve_port < 0) {
81 gpr_mu_unlock(&g_mu);
82 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
83 } else {
84 *addrs = static_cast<grpc_resolved_addresses*>(gpr_malloc(sizeof(**addrs)));
85 (*addrs)->naddrs = 1;
86 (*addrs)->addrs = static_cast<grpc_resolved_address*>(
87 gpr_malloc(sizeof(*(*addrs)->addrs)));
88 memset((*addrs)->addrs, 0, sizeof(*(*addrs)->addrs));
89 grpc_sockaddr_in* sa =
90 reinterpret_cast<grpc_sockaddr_in*>((*addrs)->addrs[0].addr);
91 sa->sin_family = GRPC_AF_INET;
92 sa->sin_addr.s_addr = 0x100007f;
93 sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
94 (*addrs)->addrs[0].len = static_cast<socklen_t>(sizeof(*sa));
95 gpr_mu_unlock(&g_mu);
96 }
97 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
98 }
99
my_blocking_resolve_address(const char * name,const char * default_port,grpc_resolved_addresses ** addresses)100 static grpc_error* my_blocking_resolve_address(
101 const char* name, const char* default_port,
102 grpc_resolved_addresses** addresses) {
103 return default_resolver->blocking_resolve_address(name, default_port,
104 addresses);
105 }
106
107 static grpc_address_resolver_vtable test_resolver = {
108 my_resolve_address, my_blocking_resolve_address};
109
my_dns_lookup_ares_locked(const char * dns_server,const char * addr,const char * default_port,grpc_pollset_set * interested_parties,grpc_closure * on_done,std::unique_ptr<grpc_core::ServerAddressList> * addresses,std::unique_ptr<grpc_core::ServerAddressList> * balancer_addresses,char ** service_config_json,int query_timeout_ms,std::shared_ptr<grpc_core::WorkSerializer> work_serializer)110 static grpc_ares_request* my_dns_lookup_ares_locked(
111 const char* dns_server, const char* addr, const char* default_port,
112 grpc_pollset_set* interested_parties, grpc_closure* on_done,
113 std::unique_ptr<grpc_core::ServerAddressList>* addresses,
114 std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
115 char** service_config_json, int query_timeout_ms,
116 std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
117 if (0 != strcmp(addr, "test")) {
118 return iomgr_dns_lookup_ares_locked(
119 dns_server, addr, default_port, interested_parties, on_done, addresses,
120 balancer_addresses, service_config_json, query_timeout_ms,
121 std::move(work_serializer));
122 }
123
124 grpc_error* error = GRPC_ERROR_NONE;
125 gpr_mu_lock(&g_mu);
126 if (g_resolve_port < 0) {
127 gpr_mu_unlock(&g_mu);
128 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Forced Failure");
129 } else {
130 *addresses = absl::make_unique<grpc_core::ServerAddressList>();
131 grpc_sockaddr_in sa;
132 sa.sin_family = GRPC_AF_INET;
133 sa.sin_addr.s_addr = 0x100007f;
134 sa.sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
135 (*addresses)->emplace_back(&sa, sizeof(sa), nullptr);
136 gpr_mu_unlock(&g_mu);
137 }
138 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
139 return nullptr;
140 }
141
my_cancel_ares_request_locked(grpc_ares_request * request)142 static void my_cancel_ares_request_locked(grpc_ares_request* request) {
143 if (request != nullptr) {
144 iomgr_cancel_ares_request_locked(request);
145 }
146 }
147
main(int argc,char ** argv)148 int main(int argc, char** argv) {
149 grpc_completion_queue* cq;
150 cq_verifier* cqv;
151 grpc_op ops[6];
152 grpc_op* op;
153
154 grpc::testing::TestEnvironment env(argc, argv);
155
156 gpr_mu_init(&g_mu);
157 grpc_init();
158 default_resolver = grpc_resolve_address_impl;
159 grpc_set_resolver_impl(&test_resolver);
160 iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
161 iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked;
162 grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
163 grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
164
165 int was_cancelled1;
166 int was_cancelled2;
167
168 grpc_metadata_array trailing_metadata_recv1;
169 grpc_metadata_array request_metadata1;
170 grpc_call_details request_details1;
171 grpc_status_code status1;
172 grpc_slice details1;
173 grpc_metadata_array_init(&trailing_metadata_recv1);
174 grpc_metadata_array_init(&request_metadata1);
175 grpc_call_details_init(&request_details1);
176
177 grpc_metadata_array trailing_metadata_recv2;
178 grpc_metadata_array request_metadata2;
179 grpc_call_details request_details2;
180 grpc_status_code status2;
181 grpc_slice details2;
182 grpc_metadata_array_init(&trailing_metadata_recv2);
183 grpc_metadata_array_init(&request_metadata2);
184 grpc_call_details_init(&request_details2);
185
186 cq = grpc_completion_queue_create_for_next(nullptr);
187 cqv = cq_verifier_create(cq);
188
189 /* reserve two ports */
190 int port1 = grpc_pick_unused_port_or_die();
191 int port2 = grpc_pick_unused_port_or_die();
192
193 std::string addr;
194
195 grpc_channel_args client_args;
196 grpc_arg arg_array[2];
197 arg_array[0].type = GRPC_ARG_INTEGER;
198 arg_array[0].key =
199 const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
200 arg_array[0].value.integer = 1000;
201 /* When this test brings down server1 and then brings up server2,
202 * the targetted server port number changes, and the client channel
203 * needs to re-resolve to pick this up. This test requires that
204 * happen within 10 seconds, but gRPC's DNS resolvers rate limit
205 * resolution attempts to at most once every 30 seconds by default.
206 * So we tweak it for this test. */
207 arg_array[1].type = GRPC_ARG_INTEGER;
208 arg_array[1].key =
209 const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
210 arg_array[1].value.integer = 1000;
211 client_args.args = arg_array;
212 client_args.num_args = 2;
213
214 /* create a channel that picks first amongst the servers */
215 grpc_channel* chan =
216 grpc_insecure_channel_create("test", &client_args, nullptr);
217 /* and an initial call to them */
218 grpc_slice host = grpc_slice_from_static_string("127.0.0.1");
219 grpc_call* call1 =
220 grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
221 grpc_slice_from_static_string("/foo"), &host,
222 grpc_timeout_seconds_to_deadline(20), nullptr);
223 /* send initial metadata to probe connectivity */
224 memset(ops, 0, sizeof(ops));
225 op = ops;
226 op->op = GRPC_OP_SEND_INITIAL_METADATA;
227 op->data.send_initial_metadata.count = 0;
228 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
229 op->reserved = nullptr;
230 op++;
231 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
232 (size_t)(op - ops),
233 tag(0x101), nullptr));
234 /* and receive status to probe termination */
235 memset(ops, 0, sizeof(ops));
236 op = ops;
237 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
238 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
239 op->data.recv_status_on_client.status = &status1;
240 op->data.recv_status_on_client.status_details = &details1;
241 op->flags = 0;
242 op->reserved = nullptr;
243 op++;
244 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call1, ops,
245 (size_t)(op - ops),
246 tag(0x102), nullptr));
247
248 /* bring a server up on the first port */
249 grpc_server* server1 = grpc_server_create(nullptr, nullptr);
250 addr = absl::StrCat("127.0.0.1:", port1);
251 grpc_server_add_insecure_http2_port(server1, addr.c_str());
252 grpc_server_register_completion_queue(server1, cq, nullptr);
253 grpc_server_start(server1);
254
255 /* request a call to the server */
256 grpc_call* server_call1;
257 GPR_ASSERT(GRPC_CALL_OK ==
258 grpc_server_request_call(server1, &server_call1, &request_details1,
259 &request_metadata1, cq, cq, tag(0x301)));
260
261 set_resolve_port(port1);
262
263 /* first call should now start */
264 CQ_EXPECT_COMPLETION(cqv, tag(0x101), 1);
265 CQ_EXPECT_COMPLETION(cqv, tag(0x301), 1);
266 cq_verify(cqv);
267
268 GPR_ASSERT(GRPC_CHANNEL_READY ==
269 grpc_channel_check_connectivity_state(chan, 0));
270 grpc_channel_watch_connectivity_state(chan, GRPC_CHANNEL_READY,
271 gpr_inf_future(GPR_CLOCK_REALTIME), cq,
272 tag(0x9999));
273
274 /* listen for close on the server call to probe for finishing */
275 memset(ops, 0, sizeof(ops));
276 op = ops;
277 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
278 op->data.recv_close_on_server.cancelled = &was_cancelled1;
279 op->flags = 0;
280 op++;
281 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call1, ops,
282 (size_t)(op - ops),
283 tag(0x302), nullptr));
284
285 /* shutdown first server:
286 * we should see a connectivity change and then nothing */
287 set_resolve_port(-1);
288 grpc_server_shutdown_and_notify(server1, cq, tag(0xdead1));
289 CQ_EXPECT_COMPLETION(cqv, tag(0x9999), 1);
290 cq_verify(cqv);
291 cq_verify_empty(cqv);
292
293 /* and a new call: should go through to server2 when we start it */
294 grpc_call* call2 =
295 grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
296 grpc_slice_from_static_string("/foo"), &host,
297 grpc_timeout_seconds_to_deadline(20), nullptr);
298 /* send initial metadata to probe connectivity */
299 memset(ops, 0, sizeof(ops));
300 op = ops;
301 op->op = GRPC_OP_SEND_INITIAL_METADATA;
302 op->data.send_initial_metadata.count = 0;
303 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
304 op->reserved = nullptr;
305 op++;
306 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
307 (size_t)(op - ops),
308 tag(0x201), nullptr));
309 /* and receive status to probe termination */
310 memset(ops, 0, sizeof(ops));
311 op = ops;
312 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
313 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
314 op->data.recv_status_on_client.status = &status2;
315 op->data.recv_status_on_client.status_details = &details2;
316 op->flags = 0;
317 op->reserved = nullptr;
318 op++;
319 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call2, ops,
320 (size_t)(op - ops),
321 tag(0x202), nullptr));
322
323 /* and bring up second server */
324 set_resolve_port(port2);
325 grpc_server* server2 = grpc_server_create(nullptr, nullptr);
326 addr = absl::StrCat("127.0.0.1:", port2);
327 grpc_server_add_insecure_http2_port(server2, addr.c_str());
328 grpc_server_register_completion_queue(server2, cq, nullptr);
329 grpc_server_start(server2);
330
331 /* request a call to the server */
332 grpc_call* server_call2;
333 GPR_ASSERT(GRPC_CALL_OK ==
334 grpc_server_request_call(server2, &server_call2, &request_details2,
335 &request_metadata2, cq, cq, tag(0x401)));
336
337 /* second call should now start */
338 CQ_EXPECT_COMPLETION(cqv, tag(0x201), 1);
339 CQ_EXPECT_COMPLETION(cqv, tag(0x401), 1);
340 cq_verify(cqv);
341
342 /* listen for close on the server call to probe for finishing */
343 memset(ops, 0, sizeof(ops));
344 op = ops;
345 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
346 op->data.recv_close_on_server.cancelled = &was_cancelled2;
347 op->flags = 0;
348 op++;
349 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(server_call2, ops,
350 (size_t)(op - ops),
351 tag(0x402), nullptr));
352
353 /* shutdown second server: we should see nothing */
354 grpc_server_shutdown_and_notify(server2, cq, tag(0xdead2));
355 cq_verify_empty(cqv);
356
357 grpc_call_cancel(call1, nullptr);
358 grpc_call_cancel(call2, nullptr);
359
360 /* now everything else should finish */
361 CQ_EXPECT_COMPLETION(cqv, tag(0x102), 1);
362 CQ_EXPECT_COMPLETION(cqv, tag(0x202), 1);
363 CQ_EXPECT_COMPLETION(cqv, tag(0x302), 1);
364 CQ_EXPECT_COMPLETION(cqv, tag(0x402), 1);
365 CQ_EXPECT_COMPLETION(cqv, tag(0xdead1), 1);
366 CQ_EXPECT_COMPLETION(cqv, tag(0xdead2), 1);
367 cq_verify(cqv);
368
369 grpc_call_unref(call1);
370 grpc_call_unref(call2);
371 grpc_call_unref(server_call1);
372 grpc_call_unref(server_call2);
373 grpc_server_destroy(server1);
374 grpc_server_destroy(server2);
375 grpc_channel_destroy(chan);
376
377 grpc_metadata_array_destroy(&trailing_metadata_recv1);
378 grpc_metadata_array_destroy(&request_metadata1);
379 grpc_call_details_destroy(&request_details1);
380 grpc_slice_unref(details1);
381 grpc_metadata_array_destroy(&trailing_metadata_recv2);
382 grpc_metadata_array_destroy(&request_metadata2);
383 grpc_call_details_destroy(&request_details2);
384 grpc_slice_unref(details2);
385
386 cq_verifier_destroy(cqv);
387 grpc_completion_queue_destroy(cq);
388
389 grpc_shutdown();
390 gpr_mu_destroy(&g_mu);
391
392 return 0;
393 }
394