1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/credentials.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/sync.h>
29 #include <grpc/support/time.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <string.h>
33
34 #include <memory>
35 #include <string>
36 #include <vector>
37
38 #include "absl/log/check.h"
39 #include "absl/log/log.h"
40 #include "src/core/lib/event_engine/shim.h"
41 #include "src/core/lib/iomgr/closure.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/error.h"
44 #include "src/core/lib/iomgr/exec_ctx.h"
45 #include "src/core/lib/iomgr/iomgr_fwd.h"
46 #include "src/core/lib/iomgr/tcp_server.h"
47 #include "src/core/lib/slice/slice_string_helpers.h"
48 #include "src/core/util/host_port.h"
49 #include "src/core/util/notification.h"
50 #include "src/core/util/status_helper.h"
51 #include "src/core/util/string.h"
52 #include "src/core/util/thd.h"
53 #include "test/core/end2end/cq_verifier.h"
54 #include "test/core/test_util/port.h"
55 #include "test/core/test_util/test_config.h"
56 #include "test/core/test_util/test_tcp_server.h"
57
// Canned HTTP/1.0 "400 Bad Request" reply (status line, three headers and the
// terminating blank line). main() uses it as the payload for the test case
// where the server answers with an HTTP/1 response.
#define HTTP1_RESP_400                        \
  "HTTP/1.0 400 Bad Request\n"                \
  "Content-Type: text/html; charset=UTF-8\n"  \
  "Content-Length: 0\n"                       \
  "Date: Tue, 07 Jun 2016 17:43:20 GMT\n"     \
  "\n"
63
// Nine-byte HTTP/2 frame header with a zero-length payload and frame type 0x04
// (SETTINGS), i.e. an empty SETTINGS frame. Because the bytes contain NULs,
// users must take the length from sizeof(HTTP2_SETTINGS_FRAME) - 1 rather than
// strlen().
#define HTTP2_SETTINGS_FRAME                         \
  "\x00\x00\x00"     /* 24-bit payload length: 0 */  \
  "\x04"             /* frame type */                \
  "\x00"             /* flags */                     \
  "\x00\x00\x00\x00" /* stream identifier: 0 */
65
// Builds the raw bytes of an HTTP/2 frame (type 0x01, flags 0x04, stream 1)
// whose payload is three length-prefixed header fields: content-length "0",
// content-type "application/grpc" and :status CODE (stringified).
//
// The payload length byte is hard-coded to '>' (0x3e == 62), which matches the
// payload only when CODE is exactly three characters long.
//
// The literal is deliberately split wherever a hex escape is followed by a
// character that is itself a hex digit ('c', 'a', '0', or the digits of CODE);
// merging those pieces would change the escape's value.
#define HTTP2_RESP(CODE)                                         \
  "\x00\x00>"        /* 24-bit payload length: 0x00003e */       \
  "\x01"             /* frame type */                            \
  "\x04"             /* flags */                                 \
  "\x00\x00\x00\x01" /* stream identifier: 1 */                  \
  "\x10\x0e"         /* field prefix, name length 14 */          \
  "content-length"                                               \
  "\x01"             /* value length 1 */                        \
  "0"                                                            \
  "\x10\x0c"         /* field prefix, name length 12 */          \
  "content-type"                                                 \
  "\x10"             /* value length 16 */                       \
  "application/grpc"                                             \
  "\x10\x07"         /* field prefix, name length 7 */           \
  ":status"                                                      \
  "\x03"             /* value length 3 */                        \
  #CODE
75
// Payload for the test case in main() where the server replies with text that
// is not an HTTP response at all.
#define UNPARSABLE_RESP "Bad Request\n"

// Expands to the text that start_rpc() expects to find inside the RPC's status
// details when the server answered with HTTP status CODE (stringified).
#define HTTP2_DETAIL_MSG(CODE) "Received http2 header with status: " #CODE
80
// TODO(zyc): Check the content of incoming data instead of relying on its
// length.
// The 'bad' server starts sending its response once it has read at least this
// many bytes from the client. The value has type size_t so it compares cleanly
// against rpc_state::incoming_data_length; a named cast is used instead of a
// C-style cast so the expansion is a single, self-delimiting expression.
#define SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD static_cast<size_t>(200)
85
86 struct rpc_state {
87 std::string target;
88 grpc_completion_queue* cq;
89 grpc_channel* channel;
90 grpc_call* call;
91 size_t incoming_data_length;
92 grpc_slice_buffer temp_incoming_buffer;
93 grpc_slice_buffer outgoing_buffer;
94 grpc_endpoint* tcp;
95 gpr_atm done_atm;
96 bool http2_response;
97 bool send_settings;
98 const char* response_payload;
99 size_t response_payload_length;
100 bool connection_attempt_made;
101 std::unique_ptr<grpc_core::Notification> on_connect_done;
102 };
103
104 static int server_port;
105 static struct rpc_state state;
106 static grpc_closure on_read;
107 static grpc_closure on_writing_settings_frame;
108 static grpc_closure on_write;
109
// Reinterprets the integer `t` as an opaque pointer so it can be used as a
// completion-queue tag.
static void* tag(intptr_t t) {
  void* const as_pointer = reinterpret_cast<void*>(t);
  return as_pointer;
}
111
done_write(void *,grpc_error_handle error)112 static void done_write(void* /*arg*/, grpc_error_handle error) {
113 CHECK_OK(error);
114 gpr_atm_rel_store(&state.done_atm, 1);
115 }
116
done_writing_settings_frame(void *,grpc_error_handle error)117 static void done_writing_settings_frame(void* /* arg */,
118 grpc_error_handle error) {
119 CHECK_OK(error);
120 grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
121 /*urgent=*/false, /*min_progress_size=*/1);
122 }
123
handle_write()124 static void handle_write() {
125 grpc_slice slice = grpc_slice_from_copied_buffer(
126 state.response_payload, state.response_payload_length);
127
128 grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
129 grpc_slice_buffer_add(&state.outgoing_buffer, slice);
130 grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr,
131 /*max_frame_size=*/INT_MAX);
132 }
133
handle_read(void *,grpc_error_handle error)134 static void handle_read(void* /*arg*/, grpc_error_handle error) {
135 if (!error.ok()) {
136 LOG(ERROR) << "handle_read error: " << grpc_core::StatusToString(error);
137 return;
138 }
139 state.incoming_data_length += state.temp_incoming_buffer.length;
140
141 size_t i;
142 for (i = 0; i < state.temp_incoming_buffer.count; i++) {
143 char* dump = grpc_dump_slice(state.temp_incoming_buffer.slices[i],
144 GPR_DUMP_HEX | GPR_DUMP_ASCII);
145 VLOG(2) << "Server received: " << dump;
146 gpr_free(dump);
147 }
148
149 VLOG(2) << "got " << state.incoming_data_length << " bytes, expected "
150 << SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD
151 << " bytes or a non-HTTP2 response to be sent";
152 if (state.incoming_data_length >=
153 SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD ||
154 !state.http2_response) {
155 handle_write();
156 } else {
157 grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
158 /*urgent=*/false, /*min_progress_size=*/1);
159 }
160 }
161
on_connect(void * arg,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)162 static void on_connect(void* arg, grpc_endpoint* tcp,
163 grpc_pollset* /*accepting_pollset*/,
164 grpc_tcp_server_acceptor* acceptor) {
165 gpr_free(acceptor);
166 test_tcp_server* server = static_cast<test_tcp_server*>(arg);
167 GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx);
168 GRPC_CLOSURE_INIT(&on_writing_settings_frame, done_writing_settings_frame,
169 nullptr, grpc_schedule_on_exec_ctx);
170 GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx);
171 grpc_slice_buffer_init(&state.temp_incoming_buffer);
172 grpc_slice_buffer_init(&state.outgoing_buffer);
173 state.connection_attempt_made = true;
174 state.tcp = tcp;
175 state.incoming_data_length = 0;
176 grpc_endpoint_add_to_pollset(tcp, server->pollset[0]);
177 if (state.send_settings) {
178 // Send settings frame from server
179 grpc_slice slice = grpc_slice_from_static_buffer(
180 HTTP2_SETTINGS_FRAME, sizeof(HTTP2_SETTINGS_FRAME) - 1);
181 grpc_slice_buffer_add(&state.outgoing_buffer, slice);
182 grpc_endpoint_write(state.tcp, &state.outgoing_buffer,
183 &on_writing_settings_frame, nullptr,
184 /*max_frame_size=*/INT_MAX);
185 } else {
186 grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
187 /*urgent=*/false, /*min_progress_size=*/1);
188 }
189 state.on_connect_done->Notify();
190 }
191
n_sec_deadline(int seconds)192 static gpr_timespec n_sec_deadline(int seconds) {
193 return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
194 gpr_time_from_seconds(seconds, GPR_TIMESPAN));
195 }
196
start_rpc(int target_port,grpc_status_code expected_status,const char * expected_detail)197 static void start_rpc(int target_port, grpc_status_code expected_status,
198 const char* expected_detail) {
199 grpc_op ops[6];
200 grpc_op* op;
201 grpc_metadata_array initial_metadata_recv;
202 grpc_metadata_array trailing_metadata_recv;
203 grpc_status_code status;
204 grpc_call_error error;
205 grpc_slice details;
206
207 state.cq = grpc_completion_queue_create_for_next(nullptr);
208 grpc_core::CqVerifier cqv(state.cq);
209 state.target = grpc_core::JoinHostPort("127.0.0.1", target_port);
210
211 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
212 state.channel = grpc_channel_create(state.target.c_str(), creds, nullptr);
213 grpc_channel_credentials_release(creds);
214 grpc_slice host = grpc_slice_from_static_string("localhost");
215 // The default connect deadline is 20 seconds, so reduce the RPC deadline to 1
216 // second. This helps us verify - a) If the server responded with a non-HTTP2
217 // response, the connect fails immediately resulting in
218 // GRPC_STATUS_UNAVAILABLE instead of GRPC_STATUS_DEADLINE_EXCEEDED. b) If the
219 // server does not send a HTTP2 SETTINGs frame, the RPC fails with a
220 // DEADLINE_EXCEEDED.
221 state.call = grpc_channel_create_call(
222 state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq,
223 grpc_slice_from_static_string("/Service/Method"), &host,
224 n_sec_deadline(5), nullptr);
225
226 grpc_metadata_array_init(&initial_metadata_recv);
227 grpc_metadata_array_init(&trailing_metadata_recv);
228
229 memset(ops, 0, sizeof(ops));
230 op = ops;
231 op->op = GRPC_OP_SEND_INITIAL_METADATA;
232 op->data.send_initial_metadata.count = 0;
233 op->flags = 0;
234 op->reserved = nullptr;
235 op++;
236 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
237 op->flags = 0;
238 op->reserved = nullptr;
239 op++;
240 op->op = GRPC_OP_RECV_INITIAL_METADATA;
241 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
242 op->flags = 0;
243 op->reserved = nullptr;
244 op++;
245 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
246 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
247 op->data.recv_status_on_client.status = &status;
248 op->data.recv_status_on_client.status_details = &details;
249 op->flags = 0;
250 op->reserved = nullptr;
251 op++;
252 error = grpc_call_start_batch(state.call, ops, static_cast<size_t>(op - ops),
253 tag(1), nullptr);
254
255 CHECK_EQ(error, GRPC_CALL_OK);
256
257 cqv.Expect(tag(1), true);
258 cqv.Verify();
259
260 CHECK_EQ(status, expected_status);
261 if (expected_detail != nullptr) {
262 CHECK_NE(-1, grpc_slice_slice(
263 details, grpc_slice_from_static_string(expected_detail)));
264 }
265
266 grpc_metadata_array_destroy(&initial_metadata_recv);
267 grpc_metadata_array_destroy(&trailing_metadata_recv);
268 grpc_slice_unref(details);
269 }
270
cleanup_rpc()271 static void cleanup_rpc() {
272 grpc_event ev;
273 grpc_slice_buffer_destroy(&state.temp_incoming_buffer);
274 grpc_slice_buffer_destroy(&state.outgoing_buffer);
275 grpc_call_unref(state.call);
276 grpc_completion_queue_shutdown(state.cq);
277 do {
278 ev = grpc_completion_queue_next(state.cq, n_sec_deadline(1), nullptr);
279 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
280 grpc_completion_queue_destroy(state.cq);
281 grpc_channel_destroy(state.channel);
282 state.target.clear();
283 }
284
285 typedef struct {
286 test_tcp_server* server;
287 gpr_event* signal_when_done;
288 } poll_args;
289
actually_poll_server(void * arg)290 static void actually_poll_server(void* arg) {
291 poll_args* pa = static_cast<poll_args*>(arg);
292 gpr_timespec deadline = n_sec_deadline(5);
293 while (true) {
294 bool done = gpr_atm_acq_load(&state.done_atm) != 0;
295 gpr_timespec time_left =
296 gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME));
297 VLOG(2) << "done=" << done << ", time_left=" << time_left.tv_sec << "."
298 << time_left.tv_nsec;
299 if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) {
300 break;
301 }
302 int milliseconds = 1000;
303 if (grpc_event_engine::experimental::UseEventEngineListener()) {
304 milliseconds = 10;
305 }
306 test_tcp_server_poll(pa->server, milliseconds);
307 }
308 gpr_event_set(pa->signal_when_done, reinterpret_cast<void*>(1));
309 gpr_free(pa);
310 }
311
poll_server_until_read_done(test_tcp_server * server,gpr_event * signal_when_done)312 static grpc_core::Thread* poll_server_until_read_done(
313 test_tcp_server* server, gpr_event* signal_when_done) {
314 gpr_atm_rel_store(&state.done_atm, 0);
315 state.connection_attempt_made = false;
316 poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa)));
317 pa->server = server;
318 pa->signal_when_done = signal_when_done;
319 auto* th =
320 new grpc_core::Thread("grpc_poll_server", actually_poll_server, pa);
321 th->Start();
322 return th;
323 }
324
run_test(bool http2_response,bool send_settings,const char * response_payload,size_t response_payload_length,grpc_status_code expected_status,const char * expected_detail)325 static void run_test(bool http2_response, bool send_settings,
326 const char* response_payload,
327 size_t response_payload_length,
328 grpc_status_code expected_status,
329 const char* expected_detail) {
330 test_tcp_server test_server;
331 grpc_core::ExecCtx exec_ctx;
332 gpr_event ev;
333
334 grpc_init();
335 gpr_event_init(&ev);
336 server_port = grpc_pick_unused_port_or_die();
337 test_tcp_server_init(&test_server, on_connect, &test_server);
338 test_tcp_server_start(&test_server, server_port);
339 state.on_connect_done = std::make_unique<grpc_core::Notification>();
340 state.http2_response = http2_response;
341 state.send_settings = send_settings;
342 state.response_payload = response_payload;
343 state.response_payload_length = response_payload_length;
344
345 // poll server until sending out the response
346 std::unique_ptr<grpc_core::Thread> thdptr(
347 poll_server_until_read_done(&test_server, &ev));
348 start_rpc(server_port, expected_status, expected_detail);
349 gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
350 thdptr->Join();
351 state.on_connect_done->WaitForNotification();
352 // Proof that the server accepted the TCP connection.
353 CHECK_EQ(state.connection_attempt_made, true);
354 // clean up
355 grpc_endpoint_destroy(state.tcp);
356 cleanup_rpc();
357 grpc_core::ExecCtx::Get()->Flush();
358 test_tcp_server_destroy(&test_server);
359
360 grpc_shutdown();
361 }
362
main(int argc,char ** argv)363 int main(int argc, char** argv) {
364 grpc::testing::TestEnvironment env(&argc, argv);
365 grpc_init();
366 // status defined in hpack static table
367 run_test(true, true, HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1,
368 GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(204));
369 run_test(true, true, HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1,
370 GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(206));
371 run_test(true, true, HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1,
372 GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(304));
373 run_test(true, true, HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1,
374 GRPC_STATUS_INTERNAL, HTTP2_DETAIL_MSG(400));
375 run_test(true, true, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1,
376 GRPC_STATUS_UNIMPLEMENTED, HTTP2_DETAIL_MSG(404));
377 run_test(true, true, HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1,
378 GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(500));
379
380 // status not defined in hpack static table
381 run_test(true, true, HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1,
382 GRPC_STATUS_UNAUTHENTICATED, HTTP2_DETAIL_MSG(401));
383 run_test(true, true, HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1,
384 GRPC_STATUS_PERMISSION_DENIED, HTTP2_DETAIL_MSG(403));
385 run_test(true, true, HTTP2_RESP(429), sizeof(HTTP2_RESP(429)) - 1,
386 GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(429));
387 run_test(true, true, HTTP2_RESP(499), sizeof(HTTP2_RESP(499)) - 1,
388 GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(499));
389 run_test(true, true, HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1,
390 GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(502));
391 run_test(true, true, HTTP2_RESP(503), sizeof(HTTP2_RESP(503)) - 1,
392 GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(503));
393 run_test(true, true, HTTP2_RESP(504), sizeof(HTTP2_RESP(504)) - 1,
394 GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(504));
395 // unparsable response. RPC should fail immediately due to a connect
396 // failure.
397 //
398 run_test(false, false, UNPARSABLE_RESP, sizeof(UNPARSABLE_RESP) - 1,
399 GRPC_STATUS_UNAVAILABLE, nullptr);
400
401 // http1 response. RPC should fail immediately due to a connect failure.
402 run_test(false, false, HTTP1_RESP_400, sizeof(HTTP1_RESP_400) - 1,
403 GRPC_STATUS_UNAVAILABLE, nullptr);
404
405 // http2 response without sending a SETTINGs frame. RPC should fail with
406 // DEADLINE_EXCEEDED since the RPC deadline is lower than the connection
407 // attempt deadline.
408 run_test(true, false, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1,
409 GRPC_STATUS_DEADLINE_EXCEEDED, nullptr);
410 grpc_shutdown();
411 return 0;
412 }
413