1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/log.h>
22 #include <grpc/support/string_util.h>
23
24 #include <functional>
25 #include <memory>
26 #include <thread>
27
28 #include <gtest/gtest.h>
29
30 #include "src/core/lib/iomgr/endpoint.h"
31 #include "src/core/lib/iomgr/error.h"
32 #include "src/core/lib/iomgr/pollset.h"
33 #include "src/core/lib/iomgr/pollset_set.h"
34 #include "src/core/lib/iomgr/resolve_address.h"
35 #include "src/core/lib/iomgr/tcp_client.h"
36 #include "src/core/lib/slice/slice_internal.h"
37
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40
41 namespace grpc_core {
42 namespace test {
43 namespace {
44
45 // A gRPC server, running in its own thread.
46 class ServerThread {
47 public:
ServerThread(const char * address)48 explicit ServerThread(const char* address) : address_(address) {}
49
Start()50 void Start() {
51 // Start server with 1-second handshake timeout.
52 grpc_arg arg;
53 arg.type = GRPC_ARG_INTEGER;
54 arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
55 arg.value.integer = 1000;
56 grpc_channel_args args = {1, &arg};
57 server_ = grpc_server_create(&args, nullptr);
58 ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
59 cq_ = grpc_completion_queue_create_for_next(nullptr);
60 grpc_server_register_completion_queue(server_, cq_, nullptr);
61 grpc_server_start(server_);
62 thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
63 }
64
Shutdown()65 void Shutdown() {
66 grpc_completion_queue* shutdown_cq =
67 grpc_completion_queue_create_for_pluck(nullptr);
68 grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
69 GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
70 grpc_timeout_seconds_to_deadline(1),
71 nullptr)
72 .type == GRPC_OP_COMPLETE);
73 grpc_completion_queue_destroy(shutdown_cq);
74 grpc_server_destroy(server_);
75 grpc_completion_queue_destroy(cq_);
76 thread_->join();
77 }
78
79 private:
Serve()80 void Serve() {
81 // The completion queue should not return anything other than shutdown.
82 grpc_event ev = grpc_completion_queue_next(
83 cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
84 ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
85 }
86
87 const char* address_; // Do not own.
88 grpc_server* server_ = nullptr;
89 grpc_completion_queue* cq_ = nullptr;
90 std::unique_ptr<std::thread> thread_;
91 };
92
93 // A TCP client that connects to the server, reads data until the server
94 // closes, and then terminates.
95 class Client {
96 public:
Client(const char * server_address)97 explicit Client(const char* server_address)
98 : server_address_(server_address) {}
99
Connect()100 void Connect() {
101 grpc_core::ExecCtx exec_ctx;
102 grpc_resolved_addresses* server_addresses = nullptr;
103 grpc_error* error =
104 grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
105 ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
106 ASSERT_GE(server_addresses->naddrs, 1UL);
107 pollset_ = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
108 grpc_pollset_init(pollset_, &mu_);
109 grpc_pollset_set* pollset_set = grpc_pollset_set_create();
110 grpc_pollset_set_add_pollset(pollset_set, pollset_);
111 EventState state;
112 grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set,
113 nullptr /* channel_args */, server_addresses->addrs,
114 1000);
115 ASSERT_TRUE(PollUntilDone(
116 &state,
117 grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
118 ASSERT_EQ(GRPC_ERROR_NONE, state.error());
119 grpc_pollset_set_destroy(pollset_set);
120 grpc_endpoint_add_to_pollset(endpoint_, pollset_);
121 grpc_resolved_addresses_destroy(server_addresses);
122 }
123
124 // Reads until an error is returned.
125 // Returns true if an error was encountered before the deadline.
ReadUntilError()126 bool ReadUntilError() {
127 grpc_core::ExecCtx exec_ctx;
128 grpc_slice_buffer read_buffer;
129 grpc_slice_buffer_init(&read_buffer);
130 bool retval = true;
131 // Use a deadline of 3 seconds, which is a lot more than we should
132 // need for a 1-second timeout, but this helps avoid flakes.
133 grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
134 while (true) {
135 EventState state;
136 grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
137 if (!PollUntilDone(&state, deadline)) {
138 retval = false;
139 break;
140 }
141 if (state.error() != GRPC_ERROR_NONE) break;
142 gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
143 grpc_slice_buffer_reset_and_unref_internal(&read_buffer);
144 }
145 grpc_endpoint_shutdown(endpoint_,
146 GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
147 grpc_slice_buffer_destroy_internal(&read_buffer);
148 return retval;
149 }
150
Shutdown()151 void Shutdown() {
152 grpc_core::ExecCtx exec_ctx;
153 grpc_endpoint_destroy(endpoint_);
154 grpc_pollset_shutdown(pollset_,
155 GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
156 grpc_schedule_on_exec_ctx));
157 }
158
159 private:
160 // State used to wait for an I/O event.
161 class EventState {
162 public:
EventState()163 EventState() {
164 GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
165 grpc_schedule_on_exec_ctx);
166 }
167
~EventState()168 ~EventState() { GRPC_ERROR_UNREF(error_); }
169
closure()170 grpc_closure* closure() { return &closure_; }
171
done() const172 bool done() const { return gpr_atm_acq_load(&done_atm_) != 0; }
173
174 // Caller does NOT take ownership of the error.
error() const175 grpc_error* error() const { return error_; }
176
177 private:
OnEventDone(void * arg,grpc_error * error)178 static void OnEventDone(void* arg, grpc_error* error) {
179 gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
180 EventState* state = static_cast<EventState*>(arg);
181 state->error_ = GRPC_ERROR_REF(error);
182 gpr_atm_rel_store(&state->done_atm_, 1);
183 }
184
185 grpc_closure closure_;
186 gpr_atm done_atm_ = 0;
187 grpc_error* error_ = GRPC_ERROR_NONE;
188 };
189
190 // Returns true if done, or false if deadline exceeded.
PollUntilDone(EventState * state,grpc_millis deadline)191 bool PollUntilDone(EventState* state, grpc_millis deadline) {
192 while (true) {
193 grpc_pollset_worker* worker = nullptr;
194 gpr_mu_lock(mu_);
195 GRPC_LOG_IF_ERROR(
196 "grpc_pollset_work",
197 grpc_pollset_work(pollset_, &worker,
198 grpc_core::ExecCtx::Get()->Now() + 1000));
199 gpr_mu_unlock(mu_);
200 if (state != nullptr && state->done()) return true;
201 if (grpc_core::ExecCtx::Get()->Now() >= deadline) return false;
202 }
203 }
204
PollsetDestroy(void * arg,grpc_error * error)205 static void PollsetDestroy(void* arg, grpc_error* error) {
206 grpc_pollset* pollset = static_cast<grpc_pollset*>(arg);
207 grpc_pollset_destroy(pollset);
208 gpr_free(pollset);
209 }
210
211 const char* server_address_; // Do not own.
212 grpc_endpoint* endpoint_;
213 gpr_mu* mu_;
214 grpc_pollset* pollset_;
215 };
216
TEST(SettingsTimeout, Basic) {
  // Build a "localhost:<port>" address on a port that is currently unused.
  char* address;
  gpr_asprintf(&address, "localhost:%d", grpc_pick_unused_port_or_die());

  // Bring the server up on that address.
  gpr_log(GPR_INFO, "starting server on %s", address);
  ServerThread server(address);
  server.Start();

  // Open a raw TCP connection to the server.
  gpr_log(GPR_INFO, "starting client connect");
  Client tcp_client(address);
  tcp_client.Connect();

  // The read is expected to end in an error before the client's deadline,
  // because the server drops the connection.
  gpr_log(GPR_INFO, "starting client read");
  const bool saw_error = tcp_client.ReadUntilError();
  EXPECT_TRUE(saw_error);

  // Tear down the client first, then the server.
  gpr_log(GPR_INFO, "shutting down client");
  tcp_client.Shutdown();
  gpr_log(GPR_INFO, "shutting down server");
  server.Shutdown();

  // Release the address string allocated by gpr_asprintf().
  gpr_free(address);
}
242
243 } // namespace
244 } // namespace test
245 } // namespace grpc_core
246
main(int argc,char ** argv)247 int main(int argc, char** argv) {
248 ::testing::InitGoogleTest(&argc, argv);
249 grpc_test_init(argc, argv);
250 grpc_init();
251 int result = RUN_ALL_TESTS();
252 grpc_shutdown();
253 return result;
254 }
255