1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include <memory.h>
26 #include <stdio.h>
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32
33 #include "src/core/lib/gprpp/thd.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "src/core/lib/iomgr/resolve_address.h"
37 #include "src/core/lib/iomgr/sockaddr_utils.h"
38 #include "src/core/lib/iomgr/tcp_server.h"
39
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42
43 #define NUM_THREADS 100
44 #define NUM_OUTER_LOOPS 10
45 #define NUM_INNER_LOOPS 10
46 #define DELAY_MILLIS 10
47 #define POLL_MILLIS 3000
48
49 #define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
50 #define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
51 #define DELAY_MILLIS_SHORT_TIMEOUTS 1
52 // in a successful test run, POLL_MILLIS should never be reached because all
53 // runs should end after the shorter delay_millis
54 #define POLL_MILLIS_SHORT_TIMEOUTS 30000
55 // it should never take longer that this to shutdown the server
56 #define SERVER_SHUTDOWN_TIMEOUT 30000
57
tag(int n)58 static void* tag(int n) { return (void*)static_cast<uintptr_t>(n); }
detag(void * p)59 static int detag(void* p) { return static_cast<int>((uintptr_t)p); }
60
create_loop_destroy(void * addr)61 void create_loop_destroy(void* addr) {
62 for (int i = 0; i < NUM_OUTER_LOOPS; ++i) {
63 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
64 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
65 nullptr, nullptr);
66
67 for (int j = 0; j < NUM_INNER_LOOPS; ++j) {
68 gpr_timespec later_time =
69 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS);
70 grpc_connectivity_state state =
71 grpc_channel_check_connectivity_state(chan, 1);
72 grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
73 nullptr);
74 gpr_timespec poll_time =
75 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
76 GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type ==
77 GRPC_OP_COMPLETE);
78 /* check that the watcher from "watch state" was free'd */
79 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
80 }
81 grpc_channel_destroy(chan);
82 grpc_completion_queue_destroy(cq);
83 }
84 }
85
86 struct server_thread_args {
87 char* addr;
88 grpc_server* server;
89 grpc_completion_queue* cq;
90 grpc_pollset* pollset;
91 gpr_mu* mu;
92 gpr_event ready;
93 gpr_atm stop;
94 };
95
server_thread(void * vargs)96 void server_thread(void* vargs) {
97 struct server_thread_args* args =
98 static_cast<struct server_thread_args*>(vargs);
99 grpc_event ev;
100 gpr_timespec deadline =
101 grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
102 ev = grpc_completion_queue_next(args->cq, deadline, nullptr);
103 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
104 GPR_ASSERT(detag(ev.tag) == 0xd1e);
105 }
106
on_connect(void * vargs,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)107 static void on_connect(void* vargs, grpc_endpoint* tcp,
108 grpc_pollset* accepting_pollset,
109 grpc_tcp_server_acceptor* acceptor) {
110 gpr_free(acceptor);
111 struct server_thread_args* args =
112 static_cast<struct server_thread_args*>(vargs);
113 grpc_endpoint_shutdown(tcp,
114 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
115 grpc_endpoint_destroy(tcp);
116 gpr_mu_lock(args->mu);
117 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
118 gpr_mu_unlock(args->mu);
119 }
120
bad_server_thread(void * vargs)121 void bad_server_thread(void* vargs) {
122 struct server_thread_args* args =
123 static_cast<struct server_thread_args*>(vargs);
124
125 grpc_core::ExecCtx exec_ctx;
126 grpc_resolved_address resolved_addr;
127 grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr);
128 int port;
129 grpc_tcp_server* s;
130 grpc_error* error = grpc_tcp_server_create(nullptr, nullptr, &s);
131 GPR_ASSERT(error == GRPC_ERROR_NONE);
132 memset(&resolved_addr, 0, sizeof(resolved_addr));
133 addr->sa_family = GRPC_AF_INET;
134 error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
135 GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
136 GPR_ASSERT(port > 0);
137 gpr_asprintf(&args->addr, "localhost:%d", port);
138
139 grpc_tcp_server_start(s, &args->pollset, 1, on_connect, args);
140 gpr_event_set(&args->ready, (void*)1);
141
142 gpr_mu_lock(args->mu);
143 while (gpr_atm_acq_load(&args->stop) == 0) {
144 grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100;
145
146 grpc_pollset_worker* worker = nullptr;
147 if (!GRPC_LOG_IF_ERROR(
148 "pollset_work",
149 grpc_pollset_work(args->pollset, &worker, deadline))) {
150 gpr_atm_rel_store(&args->stop, 1);
151 }
152 gpr_mu_unlock(args->mu);
153
154 gpr_mu_lock(args->mu);
155 }
156 gpr_mu_unlock(args->mu);
157
158 grpc_tcp_server_unref(s);
159
160 gpr_free(args->addr);
161 }
162
done_pollset_shutdown(void * pollset,grpc_error * error)163 static void done_pollset_shutdown(void* pollset, grpc_error* error) {
164 grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset));
165 gpr_free(pollset);
166 }
167
run_concurrent_connectivity_test()168 int run_concurrent_connectivity_test() {
169 struct server_thread_args args;
170 memset(&args, 0, sizeof(args));
171
172 grpc_init();
173
174 /* First round, no server */
175 {
176 gpr_log(GPR_DEBUG, "Wave 1");
177 char* localhost = gpr_strdup("localhost:54321");
178 grpc_core::Thread threads[NUM_THREADS];
179 for (auto& th : threads) {
180 th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
181 th.Start();
182 }
183 for (auto& th : threads) {
184 th.Join();
185 }
186 gpr_free(localhost);
187 }
188
189 {
190 /* Second round, actual grpc server */
191 gpr_log(GPR_DEBUG, "Wave 2");
192 int port = grpc_pick_unused_port_or_die();
193 gpr_asprintf(&args.addr, "localhost:%d", port);
194 args.server = grpc_server_create(nullptr, nullptr);
195 grpc_server_add_insecure_http2_port(args.server, args.addr);
196 args.cq = grpc_completion_queue_create_for_next(nullptr);
197 grpc_server_register_completion_queue(args.server, args.cq, nullptr);
198 grpc_server_start(args.server);
199 grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
200 server2.Start();
201
202 grpc_core::Thread threads[NUM_THREADS];
203 for (auto& th : threads) {
204 th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
205 th.Start();
206 }
207 for (auto& th : threads) {
208 th.Join();
209 }
210 grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
211
212 server2.Join();
213 grpc_server_destroy(args.server);
214 grpc_completion_queue_destroy(args.cq);
215 gpr_free(args.addr);
216 }
217
218 {
219 /* Third round, bogus tcp server */
220 gpr_log(GPR_DEBUG, "Wave 3");
221 args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
222 grpc_pollset_init(args.pollset, &args.mu);
223 gpr_event_init(&args.ready);
224 grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
225 server3.Start();
226 gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
227
228 grpc_core::Thread threads[NUM_THREADS];
229 for (auto& th : threads) {
230 th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
231 th.Start();
232 }
233 for (auto& th : threads) {
234 th.Join();
235 }
236
237 gpr_atm_rel_store(&args.stop, 1);
238 server3.Join();
239 {
240 grpc_core::ExecCtx exec_ctx;
241 grpc_pollset_shutdown(
242 args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset,
243 grpc_schedule_on_exec_ctx));
244 }
245 }
246
247 grpc_shutdown();
248 return 0;
249 }
250
watches_with_short_timeouts(void * addr)251 void watches_with_short_timeouts(void* addr) {
252 for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) {
253 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
254 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
255 nullptr, nullptr);
256
257 for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) {
258 gpr_timespec later_time =
259 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS);
260 grpc_connectivity_state state =
261 grpc_channel_check_connectivity_state(chan, 0);
262 GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
263 grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
264 nullptr);
265 gpr_timespec poll_time =
266 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS);
267 grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
268 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
269 GPR_ASSERT(ev.success == false);
270 /* check that the watcher from "watch state" was free'd */
271 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
272 }
273 grpc_channel_destroy(chan);
274 grpc_completion_queue_destroy(cq);
275 }
276 }
277
278 // This test tries to catch deadlock situations.
279 // With short timeouts on "watches" and long timeouts on cq next calls,
280 // so that a QUEUE_TIMEOUT likely means that something is stuck.
run_concurrent_watches_with_short_timeouts_test()281 int run_concurrent_watches_with_short_timeouts_test() {
282 grpc_init();
283
284 grpc_core::Thread threads[NUM_THREADS];
285
286 char* localhost = gpr_strdup("localhost:54321");
287
288 for (auto& th : threads) {
289 th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
290 localhost);
291 th.Start();
292 }
293 for (auto& th : threads) {
294 th.Join();
295 }
296 gpr_free(localhost);
297
298 grpc_shutdown();
299 return 0;
300 }
301
main(int argc,char ** argv)302 int main(int argc, char** argv) {
303 grpc_test_init(argc, argv);
304
305 run_concurrent_connectivity_test();
306 run_concurrent_watches_with_short_timeouts_test();
307 }
308