• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 
25 #include <string.h>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/slice.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 
32 #include "src/core/lib/gpr/host_port.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/memory.h"
35 #include "src/core/lib/gprpp/thd.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "test/core/end2end/cq_verifier.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 #include "test/core/util/test_tcp_server.h"
43 
44 #define HTTP1_RESP                           \
45   "HTTP/1.0 400 Bad Request\n"               \
46   "Content-Type: text/html; charset=UTF-8\n" \
47   "Content-Length: 0\n"                      \
48   "Date: Tue, 07 Jun 2016 17:43:20 GMT\n\n"
49 
50 #define HTTP2_RESP(STATUS_CODE)          \
51   "\x00\x00\x00\x04\x00\x00\x00\x00\x00" \
52   "\x00\x00>\x01\x04\x00\x00\x00\x01"    \
53   "\x10\x0e"                             \
54   "content-length\x01"                   \
55   "0"                                    \
56   "\x10\x0c"                             \
57   "content-type\x10"                     \
58   "application/grpc"                     \
59   "\x10\x07:status\x03" #STATUS_CODE
60 
61 #define UNPARSEABLE_RESP "Bad Request\n"
62 
63 #define HTTP2_DETAIL_MSG(STATUS_CODE) \
64   "Received http2 header with status: " #STATUS_CODE
65 
66 #define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
67 
68 /* TODO(zyc) Check the content of incomming data instead of using this length */
69 /* The 'bad' server will start sending responses after reading this amount of
70  * data from the client. */
71 #define SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD (size_t)200
72 
73 struct rpc_state {
74   char* target;
75   grpc_completion_queue* cq;
76   grpc_channel* channel;
77   grpc_call* call;
78   size_t incoming_data_length;
79   grpc_slice_buffer temp_incoming_buffer;
80   grpc_slice_buffer outgoing_buffer;
81   grpc_endpoint* tcp;
82   gpr_atm done_atm;
83   bool write_done;
84   const char* response_payload;
85   size_t response_payload_length;
86 };
87 
88 static int server_port;
89 static struct rpc_state state;
90 static grpc_closure on_read;
91 static grpc_closure on_write;
92 
tag(intptr_t t)93 static void* tag(intptr_t t) { return (void*)t; }
94 
done_write(void * arg,grpc_error * error)95 static void done_write(void* arg, grpc_error* error) {
96   GPR_ASSERT(error == GRPC_ERROR_NONE);
97 
98   gpr_atm_rel_store(&state.done_atm, 1);
99 }
100 
handle_write()101 static void handle_write() {
102   grpc_slice slice = grpc_slice_from_copied_buffer(
103       state.response_payload, state.response_payload_length);
104 
105   grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
106   grpc_slice_buffer_add(&state.outgoing_buffer, slice);
107   grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
108 }
109 
handle_read(void * arg,grpc_error * error)110 static void handle_read(void* arg, grpc_error* error) {
111   GPR_ASSERT(error == GRPC_ERROR_NONE);
112   state.incoming_data_length += state.temp_incoming_buffer.length;
113 
114   size_t i;
115   for (i = 0; i < state.temp_incoming_buffer.count; i++) {
116     char* dump = grpc_dump_slice(state.temp_incoming_buffer.slices[i],
117                                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
118     gpr_log(GPR_DEBUG, "Server received: %s", dump);
119     gpr_free(dump);
120   }
121 
122   gpr_log(GPR_DEBUG, "got %" PRIuPTR " bytes, expected %" PRIuPTR " bytes",
123           state.incoming_data_length,
124           SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD);
125   if (state.incoming_data_length >=
126       SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) {
127     handle_write();
128   } else {
129     grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
130   }
131 }
132 
on_connect(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)133 static void on_connect(void* arg, grpc_endpoint* tcp,
134                        grpc_pollset* accepting_pollset,
135                        grpc_tcp_server_acceptor* acceptor) {
136   gpr_free(acceptor);
137   test_tcp_server* server = static_cast<test_tcp_server*>(arg);
138   GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx);
139   GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx);
140   grpc_slice_buffer_init(&state.temp_incoming_buffer);
141   grpc_slice_buffer_init(&state.outgoing_buffer);
142   state.tcp = tcp;
143   state.incoming_data_length = 0;
144   grpc_endpoint_add_to_pollset(tcp, server->pollset);
145   grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read);
146 }
147 
n_sec_deadline(int seconds)148 static gpr_timespec n_sec_deadline(int seconds) {
149   return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
150                       gpr_time_from_seconds(seconds, GPR_TIMESPAN));
151 }
152 
start_rpc(int target_port,grpc_status_code expected_status,const char * expected_detail)153 static void start_rpc(int target_port, grpc_status_code expected_status,
154                       const char* expected_detail) {
155   grpc_op ops[6];
156   grpc_op* op;
157   grpc_metadata_array initial_metadata_recv;
158   grpc_metadata_array trailing_metadata_recv;
159   grpc_status_code status;
160   grpc_call_error error;
161   cq_verifier* cqv;
162   grpc_slice details;
163 
164   state.cq = grpc_completion_queue_create_for_next(nullptr);
165   cqv = cq_verifier_create(state.cq);
166   gpr_join_host_port(&state.target, "127.0.0.1", target_port);
167   state.channel = grpc_insecure_channel_create(state.target, nullptr, nullptr);
168   grpc_slice host = grpc_slice_from_static_string("localhost");
169   state.call = grpc_channel_create_call(
170       state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq,
171       grpc_slice_from_static_string("/Service/Method"), &host,
172       gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
173 
174   grpc_metadata_array_init(&initial_metadata_recv);
175   grpc_metadata_array_init(&trailing_metadata_recv);
176 
177   memset(ops, 0, sizeof(ops));
178   op = ops;
179   op->op = GRPC_OP_SEND_INITIAL_METADATA;
180   op->data.send_initial_metadata.count = 0;
181   op->flags = 0;
182   op->reserved = nullptr;
183   op++;
184   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
185   op->flags = 0;
186   op->reserved = nullptr;
187   op++;
188   op->op = GRPC_OP_RECV_INITIAL_METADATA;
189   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
190   op->flags = 0;
191   op->reserved = nullptr;
192   op++;
193   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
194   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
195   op->data.recv_status_on_client.status = &status;
196   op->data.recv_status_on_client.status_details = &details;
197   op->flags = 0;
198   op->reserved = nullptr;
199   op++;
200   error = grpc_call_start_batch(state.call, ops, static_cast<size_t>(op - ops),
201                                 tag(1), nullptr);
202 
203   GPR_ASSERT(GRPC_CALL_OK == error);
204 
205   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
206   cq_verify(cqv);
207 
208   GPR_ASSERT(status == expected_status);
209   if (expected_detail != nullptr) {
210     GPR_ASSERT(-1 != grpc_slice_slice(details, grpc_slice_from_static_string(
211                                                    expected_detail)));
212   }
213 
214   grpc_metadata_array_destroy(&initial_metadata_recv);
215   grpc_metadata_array_destroy(&trailing_metadata_recv);
216   grpc_slice_unref(details);
217   cq_verifier_destroy(cqv);
218 }
219 
cleanup_rpc()220 static void cleanup_rpc() {
221   grpc_event ev;
222   grpc_slice_buffer_destroy_internal(&state.temp_incoming_buffer);
223   grpc_slice_buffer_destroy_internal(&state.outgoing_buffer);
224   grpc_call_unref(state.call);
225   grpc_completion_queue_shutdown(state.cq);
226   do {
227     ev = grpc_completion_queue_next(state.cq, n_sec_deadline(1), nullptr);
228   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
229   grpc_completion_queue_destroy(state.cq);
230   grpc_channel_destroy(state.channel);
231   gpr_free(state.target);
232 }
233 
234 typedef struct {
235   test_tcp_server* server;
236   gpr_event* signal_when_done;
237 } poll_args;
238 
actually_poll_server(void * arg)239 static void actually_poll_server(void* arg) {
240   poll_args* pa = static_cast<poll_args*>(arg);
241   gpr_timespec deadline = n_sec_deadline(10);
242   while (true) {
243     bool done = gpr_atm_acq_load(&state.done_atm) != 0;
244     gpr_timespec time_left =
245         gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME));
246     gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", done,
247             time_left.tv_sec, time_left.tv_nsec);
248     if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) {
249       break;
250     }
251     test_tcp_server_poll(pa->server, 1);
252   }
253   gpr_event_set(pa->signal_when_done, (void*)1);
254   gpr_free(pa);
255 }
256 
poll_server_until_read_done(test_tcp_server * server,gpr_event * signal_when_done)257 static grpc_core::Thread* poll_server_until_read_done(
258     test_tcp_server* server, gpr_event* signal_when_done) {
259   gpr_atm_rel_store(&state.done_atm, 0);
260   state.write_done = 0;
261   poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa)));
262   pa->server = server;
263   pa->signal_when_done = signal_when_done;
264   auto* th = grpc_core::New<grpc_core::Thread>("grpc_poll_server",
265                                                actually_poll_server, pa);
266   th->Start();
267   return th;
268 }
269 
run_test(const char * response_payload,size_t response_payload_length,grpc_status_code expected_status,const char * expected_detail)270 static void run_test(const char* response_payload,
271                      size_t response_payload_length,
272                      grpc_status_code expected_status,
273                      const char* expected_detail) {
274   test_tcp_server test_server;
275   grpc_core::ExecCtx exec_ctx;
276   gpr_event ev;
277 
278   grpc_init();
279   gpr_event_init(&ev);
280   server_port = grpc_pick_unused_port_or_die();
281   test_tcp_server_init(&test_server, on_connect, &test_server);
282   test_tcp_server_start(&test_server, server_port);
283   state.response_payload = response_payload;
284   state.response_payload_length = response_payload_length;
285 
286   /* poll server until sending out the response */
287   grpc_core::UniquePtr<grpc_core::Thread> thdptr(
288       poll_server_until_read_done(&test_server, &ev));
289   start_rpc(server_port, expected_status, expected_detail);
290   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
291   thdptr->Join();
292 
293   /* clean up */
294   grpc_endpoint_shutdown(state.tcp,
295                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
296   grpc_endpoint_destroy(state.tcp);
297   cleanup_rpc();
298   grpc_core::ExecCtx::Get()->Flush();
299   test_tcp_server_destroy(&test_server);
300 
301   grpc_shutdown();
302 }
303 
main(int argc,char ** argv)304 int main(int argc, char** argv) {
305   grpc_test_init(argc, argv);
306   grpc_init();
307 
308   /* status defined in hpack static table */
309   run_test(HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, GRPC_STATUS_CANCELLED,
310            HTTP2_DETAIL_MSG(204));
311 
312   run_test(HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1, GRPC_STATUS_CANCELLED,
313            HTTP2_DETAIL_MSG(206));
314 
315   run_test(HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1, GRPC_STATUS_CANCELLED,
316            HTTP2_DETAIL_MSG(304));
317 
318   run_test(HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1, GRPC_STATUS_CANCELLED,
319            HTTP2_DETAIL_MSG(400));
320 
321   run_test(HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, GRPC_STATUS_CANCELLED,
322            HTTP2_DETAIL_MSG(404));
323 
324   run_test(HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1, GRPC_STATUS_CANCELLED,
325            HTTP2_DETAIL_MSG(500));
326 
327   /* status not defined in hpack static table */
328   run_test(HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1, GRPC_STATUS_CANCELLED,
329            HTTP2_DETAIL_MSG(401));
330 
331   run_test(HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1, GRPC_STATUS_CANCELLED,
332            HTTP2_DETAIL_MSG(403));
333 
334   run_test(HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1, GRPC_STATUS_CANCELLED,
335            HTTP2_DETAIL_MSG(502));
336 
337   /* unparseable response */
338   run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, GRPC_STATUS_UNKNOWN,
339            nullptr);
340 
341   /* http1 response */
342   run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,
343            HTTP1_DETAIL_MSG);
344 
345   grpc_shutdown();
346   return 0;
347 }
348