• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #include "test/core/end2end/end2end_tests.h"
20 
21 #include <grpc/support/log.h>
22 #include <grpc/support/sync.h>
23 #include <grpc/support/time.h>
24 
25 #include "src/core/lib/gprpp/thd.h"
26 #include "test/core/end2end/cq_verifier.h"
27 
/* Converts an integer into the opaque pointer form used for completion-queue
 * tags in this file. */
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
29 
30 typedef struct {
31   gpr_event started;
32   grpc_channel* channel;
33   grpc_completion_queue* cq;
34 } child_events;
35 
36 struct CallbackContext {
37   grpc_experimental_completion_queue_functor functor;
38   gpr_event finished;
CallbackContextCallbackContext39   explicit CallbackContext(void (*cb)(
40       grpc_experimental_completion_queue_functor* functor, int success)) {
41     functor.functor_run = cb;
42     functor.inlineable = false;
43     gpr_event_init(&finished);
44   }
45 };
46 
child_thread(void * arg)47 static void child_thread(void* arg) {
48   child_events* ce = static_cast<child_events*>(arg);
49   grpc_event ev;
50   gpr_event_set(&ce->started, (void*)1);
51   gpr_log(GPR_DEBUG, "verifying");
52   ev = grpc_completion_queue_next(ce->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC),
53                                   nullptr);
54   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
55   GPR_ASSERT(ev.tag == tag(1));
56   GPR_ASSERT(ev.success == 0);
57 }
58 
test_connectivity(grpc_end2end_test_config config)59 static void test_connectivity(grpc_end2end_test_config config) {
60   grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr);
61   grpc_connectivity_state state;
62   cq_verifier* cqv = cq_verifier_create(f.cq);
63   child_events ce;
64 
65   grpc_channel_args client_args;
66   grpc_arg arg_array[1];
67   arg_array[0].type = GRPC_ARG_INTEGER;
68   arg_array[0].key =
69       const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms");
70   arg_array[0].value.integer = 1000;
71   client_args.args = arg_array;
72   client_args.num_args = 1;
73 
74   config.init_client(&f, &client_args);
75 
76   ce.channel = f.client;
77   ce.cq = f.cq;
78   gpr_event_init(&ce.started);
79   grpc_core::Thread thd("grpc_connectivity", child_thread, &ce);
80   thd.Start();
81 
82   gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC));
83 
84   /* channels should start life in IDLE, and stay there */
85   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
86              GRPC_CHANNEL_IDLE);
87   gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
88   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
89              GRPC_CHANNEL_IDLE);
90 
91   /* start watching for a change */
92   gpr_log(GPR_DEBUG, "watching");
93   grpc_channel_watch_connectivity_state(
94       f.client, GRPC_CHANNEL_IDLE, gpr_now(GPR_CLOCK_MONOTONIC), f.cq, tag(1));
95 
96   /* eventually the child thread completion should trigger */
97   thd.Join();
98 
99   /* check that we're still in idle, and start connecting */
100   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) ==
101              GRPC_CHANNEL_IDLE);
102   /* start watching for a change */
103   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
104                                         grpc_timeout_seconds_to_deadline(3),
105                                         f.cq, tag(2));
106 
107   /* and now the watch should trigger */
108   CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
109   cq_verify(cqv);
110   state = grpc_channel_check_connectivity_state(f.client, 0);
111   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
112              state == GRPC_CHANNEL_CONNECTING);
113 
114   /* quickly followed by a transition to TRANSIENT_FAILURE */
115   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_CONNECTING,
116                                         grpc_timeout_seconds_to_deadline(3),
117                                         f.cq, tag(3));
118   CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
119   cq_verify(cqv);
120   state = grpc_channel_check_connectivity_state(f.client, 0);
121   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
122              state == GRPC_CHANNEL_CONNECTING);
123 
124   gpr_log(GPR_DEBUG, "*** STARTING SERVER ***");
125 
126   /* now let's bring up a server to connect to */
127   config.init_server(&f, nullptr);
128 
129   gpr_log(GPR_DEBUG, "*** STARTED SERVER ***");
130 
131   /* we'll go through some set of transitions (some might be missed), until
132      READY is reached */
133   while (state != GRPC_CHANNEL_READY) {
134     grpc_channel_watch_connectivity_state(
135         f.client, state, grpc_timeout_seconds_to_deadline(3), f.cq, tag(4));
136     CQ_EXPECT_COMPLETION(cqv, tag(4), 1);
137     cq_verify(cqv);
138     state = grpc_channel_check_connectivity_state(f.client, 0);
139     GPR_ASSERT(state == GRPC_CHANNEL_READY ||
140                state == GRPC_CHANNEL_CONNECTING ||
141                state == GRPC_CHANNEL_TRANSIENT_FAILURE);
142   }
143 
144   /* bring down the server again */
145   /* we should go immediately to TRANSIENT_FAILURE */
146   gpr_log(GPR_DEBUG, "*** SHUTTING DOWN SERVER ***");
147 
148   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_READY,
149                                         grpc_timeout_seconds_to_deadline(3),
150                                         f.cq, tag(5));
151 
152   grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
153 
154   CQ_EXPECT_COMPLETION(cqv, tag(5), 1);
155   CQ_EXPECT_COMPLETION(cqv, tag(0xdead), 1);
156   cq_verify(cqv);
157   state = grpc_channel_check_connectivity_state(f.client, 0);
158   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
159              state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_IDLE);
160 
161   /* cleanup server */
162   grpc_server_destroy(f.server);
163 
164   gpr_log(GPR_DEBUG, "*** SHUTDOWN SERVER ***");
165 
166   grpc_channel_destroy(f.client);
167   grpc_completion_queue_shutdown(f.cq);
168   grpc_completion_queue_destroy(f.cq);
169 
170   /* shutdown_cq is not used in this test */
171   grpc_completion_queue_destroy(f.shutdown_cq);
172   config.tear_down_data(&f);
173 
174   cq_verifier_destroy(cqv);
175 }
176 
cb_watch_connectivity(grpc_experimental_completion_queue_functor * functor,int success)177 static void cb_watch_connectivity(
178     grpc_experimental_completion_queue_functor* functor, int success) {
179   CallbackContext* cb_ctx = (CallbackContext*)functor;
180 
181   gpr_log(GPR_DEBUG, "cb_watch_connectivity called, verifying");
182 
183   /* callback must not have errors */
184   GPR_ASSERT(success != 0);
185 
186   gpr_event_set(&cb_ctx->finished, (void*)1);
187 }
188 
cb_shutdown(grpc_experimental_completion_queue_functor * functor,int)189 static void cb_shutdown(grpc_experimental_completion_queue_functor* functor,
190                         int /*success*/) {
191   CallbackContext* cb_ctx = (CallbackContext*)functor;
192 
193   gpr_log(GPR_DEBUG, "cb_shutdown called, nothing to do");
194   gpr_event_set(&cb_ctx->finished, (void*)1);
195 }
196 
test_watch_connectivity_cq_callback(grpc_end2end_test_config config)197 static void test_watch_connectivity_cq_callback(
198     grpc_end2end_test_config config) {
199   CallbackContext cb_ctx(cb_watch_connectivity);
200   CallbackContext cb_shutdown_ctx(cb_shutdown);
201   grpc_completion_queue* cq;
202   grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr);
203 
204   config.init_client(&f, nullptr);
205 
206   /* start connecting */
207   grpc_channel_check_connectivity_state(f.client, 1);
208 
209   /* create the cq callback */
210   cq = grpc_completion_queue_create_for_callback(&cb_shutdown_ctx.functor,
211                                                  nullptr);
212 
213   /* start watching for any change, cb is immediately called
214    * and no dead lock should be raised */
215   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
216                                         grpc_timeout_seconds_to_deadline(3), cq,
217                                         &cb_ctx.functor);
218 
219   /* we just check that the callback was executed once notifying a connection
220    * transition */
221   GPR_ASSERT(gpr_event_wait(&cb_ctx.finished,
222                             gpr_inf_future(GPR_CLOCK_MONOTONIC)) != nullptr);
223 
224   /* shutdown, since shutdown cb might be executed in a background thread
225    * we actively wait till is executed. */
226   grpc_completion_queue_shutdown(cq);
227   gpr_event_wait(&cb_shutdown_ctx.finished,
228                  gpr_inf_future(GPR_CLOCK_MONOTONIC));
229 
230   /* cleanup */
231   grpc_channel_destroy(f.client);
232   grpc_completion_queue_destroy(cq);
233 
234   /* shutdown_cq and cq are not used in this test */
235   grpc_completion_queue_destroy(f.cq);
236   grpc_completion_queue_destroy(f.shutdown_cq);
237 
238   config.tear_down_data(&f);
239 }
240 
connectivity(grpc_end2end_test_config config)241 void connectivity(grpc_end2end_test_config config) {
242   GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
243   test_connectivity(config);
244   test_watch_connectivity_cq_callback(config);
245 }
246 
/* Pre-initialization hook for the connectivity tests; intentionally empty. */
void connectivity_pre_init(void) {}
248