1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/core/end2end/fixtures/proxy.h"
20
21 #include <grpc/byte_buffer.h>
22 #include <grpc/impl/channel_arg_names.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/sync.h>
28 #include <grpc/support/time.h>
29 #include <string.h>
30
31 #include <cstddef>
32 #include <string>
33 #include <utility>
34
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/surface/call.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/host_port.h"
41 #include "src/core/util/thd.h"
42 #include "test/core/test_util/port.h"
43
44 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy45 grpc_end2end_proxy()
46 : cq(nullptr),
47 server(nullptr),
48 client(nullptr),
49 shutdown(false),
50 new_call(nullptr) {
51 memset(&new_call_details, 0, sizeof(new_call_details));
52 memset(&new_call_metadata, 0, sizeof(new_call_metadata));
53 }
54 grpc_core::Thread thd;
55 std::string proxy_port;
56 std::string server_port;
57 grpc_completion_queue* cq;
58 grpc_server* server;
59 grpc_channel* client;
60
61 int shutdown;
62
63 // requested call
64 grpc_call* new_call;
65 grpc_call_details new_call_details;
66 grpc_metadata_array new_call_metadata;
67 };
68
69 typedef struct {
70 void (*func)(void* arg, int success);
71 void* arg;
72 } closure;
73
74 typedef struct {
75 gpr_refcount refs;
76 grpc_end2end_proxy* proxy;
77
78 grpc_call* c2p;
79 grpc_call* p2s;
80
81 grpc_metadata_array c2p_initial_metadata;
82 grpc_metadata_array p2s_initial_metadata;
83
84 grpc_core::Mutex* initial_metadata_mu;
85 bool p2s_initial_metadata_received ABSL_GUARDED_BY(initial_metadata_mu);
86 grpc_op* deferred_trailing_metadata_op ABSL_GUARDED_BY(initial_metadata_mu);
87
88 grpc_byte_buffer* c2p_msg;
89 grpc_byte_buffer* p2s_msg;
90
91 grpc_metadata_array p2s_trailing_metadata;
92 grpc_status_code p2s_status;
93 grpc_slice p2s_status_details;
94
95 int c2p_server_cancelled;
96 } proxy_call;
97
98 static void thread_main(void* arg);
99 static void request_call(grpc_end2end_proxy* proxy);
100
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,const grpc_channel_args * client_args,const grpc_channel_args * server_args)101 grpc_end2end_proxy* grpc_end2end_proxy_create(
102 const grpc_end2end_proxy_def* def, const grpc_channel_args* client_args,
103 const grpc_channel_args* server_args) {
104 int proxy_port = grpc_pick_unused_port_or_die();
105 int server_port = grpc_pick_unused_port_or_die();
106
107 grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
108
109 proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
110 proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
111
112 VLOG(2) << "PROXY ADDR:" << proxy->proxy_port
113 << " BACKEND:" << proxy->server_port;
114
115 proxy->cq = grpc_completion_queue_create_for_next(nullptr);
116 proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
117
118 const char* arg_to_remove = GRPC_ARG_ENABLE_RETRIES;
119 grpc_arg arg_to_add = grpc_channel_arg_integer_create(
120 const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
121 const grpc_channel_args* proxy_client_args =
122 grpc_channel_args_copy_and_add_and_remove(client_args, &arg_to_remove, 1,
123 &arg_to_add, 1);
124 proxy->client =
125 def->create_client(proxy->server_port.c_str(), proxy_client_args);
126 grpc_channel_args_destroy(proxy_client_args);
127
128 grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
129 grpc_server_start(proxy->server);
130
131 grpc_call_details_init(&proxy->new_call_details);
132 proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
133 proxy->thd.Start();
134
135 request_call(proxy);
136
137 return proxy;
138 }
139
new_closure(void (* func)(void * arg,int success),void * arg)140 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
141 closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
142 cl->func = func;
143 cl->arg = arg;
144 return cl;
145 }
146
shutdown_complete(void * arg,int)147 static void shutdown_complete(void* arg, int /*success*/) {
148 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
149 proxy->shutdown = 1;
150 grpc_completion_queue_shutdown(proxy->cq);
151 }
152
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)153 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
154 grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
155 new_closure(shutdown_complete, proxy));
156 proxy->thd.Join();
157 grpc_server_destroy(proxy->server);
158 grpc_channel_destroy(proxy->client);
159 grpc_completion_queue_destroy(proxy->cq);
160 grpc_call_details_destroy(&proxy->new_call_details);
161 delete proxy;
162 }
163
unrefpc(proxy_call * pc,const char *)164 static void unrefpc(proxy_call* pc, const char* /*reason*/) {
165 if (gpr_unref(&pc->refs)) {
166 grpc_call_unref(pc->c2p);
167 grpc_call_unref(pc->p2s);
168 grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
169 grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
170 grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
171 grpc_slice_unref(pc->p2s_status_details);
172 {
173 grpc_core::MutexLock lock(pc->initial_metadata_mu);
174 if (pc->deferred_trailing_metadata_op != nullptr) {
175 gpr_free(pc->deferred_trailing_metadata_op);
176 }
177 }
178 delete pc->initial_metadata_mu;
179 gpr_free(pc);
180 }
181 }
182
refpc(proxy_call * pc,const char *)183 static void refpc(proxy_call* pc, const char* /*reason*/) {
184 gpr_ref(&pc->refs);
185 }
186
on_c2p_sent_initial_metadata(void * arg,int)187 static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
188 proxy_call* pc = static_cast<proxy_call*>(arg);
189 unrefpc(pc, "on_c2p_sent_initial_metadata");
190 }
191
on_c2p_sent_status(void * arg,int)192 static void on_c2p_sent_status(void* arg, int /*success*/) {
193 proxy_call* pc = static_cast<proxy_call*>(arg);
194 unrefpc(pc, "on_c2p_sent_status");
195 }
196
on_p2s_recv_initial_metadata(void * arg,int)197 static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
198 proxy_call* pc = static_cast<proxy_call*>(arg);
199 grpc_op op;
200 grpc_call_error err;
201 memset(&op, 0, sizeof(op));
202 if (!pc->proxy->shutdown) {
203 if (!grpc_call_is_trailers_only(pc->p2s)) {
204 op.op = GRPC_OP_SEND_INITIAL_METADATA;
205 op.flags = 0;
206 op.reserved = nullptr;
207 op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
208 op.data.send_initial_metadata.metadata =
209 pc->p2s_initial_metadata.metadata;
210 refpc(pc, "on_c2p_sent_initial_metadata");
211 err = grpc_call_start_batch(pc->c2p, &op, 1,
212 new_closure(on_c2p_sent_initial_metadata, pc),
213 nullptr);
214 CHECK_EQ(err, GRPC_CALL_OK);
215 }
216 grpc_op* deferred_trailing_metadata_op = nullptr;
217 {
218 grpc_core::MutexLock lock(pc->initial_metadata_mu);
219 // Start the batch without the mutex held, just in case.
220 // This will be nullptr if the trailing metadata has not yet been seen.
221 deferred_trailing_metadata_op = pc->deferred_trailing_metadata_op;
222 pc->p2s_initial_metadata_received = true;
223 }
224 if (deferred_trailing_metadata_op != nullptr) {
225 refpc(pc, "on_c2p_sent_status");
226 err = grpc_call_start_batch(pc->c2p, deferred_trailing_metadata_op, 1,
227 new_closure(on_c2p_sent_status, pc), nullptr);
228 CHECK_EQ(err, GRPC_CALL_OK);
229 }
230 }
231 unrefpc(pc, "on_p2s_recv_initial_metadata");
232 }
233
on_p2s_sent_initial_metadata(void * arg,int)234 static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
235 proxy_call* pc = static_cast<proxy_call*>(arg);
236 unrefpc(pc, "on_p2s_sent_initial_metadata");
237 }
238
239 static void on_c2p_recv_msg(void* arg, int success);
240
on_p2s_sent_message(void * arg,int success)241 static void on_p2s_sent_message(void* arg, int success) {
242 proxy_call* pc = static_cast<proxy_call*>(arg);
243 grpc_op op;
244 grpc_call_error err;
245
246 grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr));
247 if (!pc->proxy->shutdown && success) {
248 op.op = GRPC_OP_RECV_MESSAGE;
249 op.flags = 0;
250 op.reserved = nullptr;
251 op.data.recv_message.recv_message = &pc->c2p_msg;
252 refpc(pc, "on_c2p_recv_msg");
253 err = grpc_call_start_batch(pc->c2p, &op, 1,
254 new_closure(on_c2p_recv_msg, pc), nullptr);
255 CHECK_EQ(err, GRPC_CALL_OK);
256 }
257
258 unrefpc(pc, "on_p2s_sent_message");
259 }
260
on_p2s_sent_close(void * arg,int)261 static void on_p2s_sent_close(void* arg, int /*success*/) {
262 proxy_call* pc = static_cast<proxy_call*>(arg);
263 unrefpc(pc, "on_p2s_sent_close");
264 }
265
on_c2p_recv_msg(void * arg,int success)266 static void on_c2p_recv_msg(void* arg, int success) {
267 proxy_call* pc = static_cast<proxy_call*>(arg);
268 grpc_op op;
269 grpc_call_error err;
270
271 if (!pc->proxy->shutdown && success) {
272 if (pc->c2p_msg != nullptr) {
273 op.op = GRPC_OP_SEND_MESSAGE;
274 op.flags = 0;
275 op.reserved = nullptr;
276 op.data.send_message.send_message = pc->c2p_msg;
277 refpc(pc, "on_p2s_sent_message");
278 err = grpc_call_start_batch(
279 pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
280 CHECK_EQ(err, GRPC_CALL_OK);
281 } else {
282 op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
283 op.flags = 0;
284 op.reserved = nullptr;
285 refpc(pc, "on_p2s_sent_close");
286 err = grpc_call_start_batch(pc->p2s, &op, 1,
287 new_closure(on_p2s_sent_close, pc), nullptr);
288 CHECK_EQ(err, GRPC_CALL_OK);
289 }
290 } else {
291 if (pc->c2p_msg != nullptr) {
292 grpc_byte_buffer_destroy(pc->c2p_msg);
293 }
294 }
295
296 unrefpc(pc, "on_c2p_recv_msg");
297 }
298
299 static void on_p2s_recv_msg(void* arg, int success);
300
on_c2p_sent_message(void * arg,int success)301 static void on_c2p_sent_message(void* arg, int success) {
302 proxy_call* pc = static_cast<proxy_call*>(arg);
303 grpc_op op;
304 grpc_call_error err;
305
306 grpc_byte_buffer_destroy(pc->p2s_msg);
307 if (!pc->proxy->shutdown && success) {
308 op.op = GRPC_OP_RECV_MESSAGE;
309 op.flags = 0;
310 op.reserved = nullptr;
311 op.data.recv_message.recv_message = &pc->p2s_msg;
312 refpc(pc, "on_p2s_recv_msg");
313 err = grpc_call_start_batch(pc->p2s, &op, 1,
314 new_closure(on_p2s_recv_msg, pc), nullptr);
315 CHECK_EQ(err, GRPC_CALL_OK);
316 }
317
318 unrefpc(pc, "on_c2p_sent_message");
319 }
320
on_p2s_recv_msg(void * arg,int success)321 static void on_p2s_recv_msg(void* arg, int success) {
322 proxy_call* pc = static_cast<proxy_call*>(arg);
323 grpc_op op;
324 grpc_call_error err;
325
326 if (!pc->proxy->shutdown && success && pc->p2s_msg) {
327 op.op = GRPC_OP_SEND_MESSAGE;
328 op.flags = 0;
329 op.reserved = nullptr;
330 op.data.send_message.send_message = pc->p2s_msg;
331 refpc(pc, "on_c2p_sent_message");
332 err = grpc_call_start_batch(pc->c2p, &op, 1,
333 new_closure(on_c2p_sent_message, pc), nullptr);
334 CHECK_EQ(err, GRPC_CALL_OK);
335 } else {
336 grpc_byte_buffer_destroy(pc->p2s_msg);
337 }
338 unrefpc(pc, "on_p2s_recv_msg");
339 }
340
on_p2s_status(void * arg,int success)341 static void on_p2s_status(void* arg, int success) {
342 proxy_call* pc = static_cast<proxy_call*>(arg);
343 grpc_op op[2]; // Possibly send empty initial metadata also if trailers-only
344 grpc_call_error err;
345
346 memset(op, 0, sizeof(op));
347
348 if (!pc->proxy->shutdown) {
349 CHECK(success);
350
351 int op_count = 0;
352 if (grpc_call_is_trailers_only(pc->p2s)) {
353 op[op_count].op = GRPC_OP_SEND_INITIAL_METADATA;
354 op_count++;
355 }
356
357 op[op_count].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
358 op[op_count].flags = 0;
359 op[op_count].reserved = nullptr;
360 op[op_count].data.send_status_from_server.trailing_metadata_count =
361 pc->p2s_trailing_metadata.count;
362 op[op_count].data.send_status_from_server.trailing_metadata =
363 pc->p2s_trailing_metadata.metadata;
364 op[op_count].data.send_status_from_server.status = pc->p2s_status;
365 op[op_count].data.send_status_from_server.status_details =
366 &pc->p2s_status_details;
367 op_count++;
368
369 // TODO(ctiller): The current core implementation requires initial
370 // metadata batches to be started *after* initial metadata batches have
371 // been completed. The C++ Callback API does this accounting too, for
372 // example.
373 //
374 // This entire fixture will need a redesign when the batch API goes away.
375 bool op_deferred = false;
376 {
377 grpc_core::MutexLock lock(pc->initial_metadata_mu);
378 if (!pc->p2s_initial_metadata_received) {
379 op_deferred = true;
380 pc->deferred_trailing_metadata_op =
381 static_cast<grpc_op*>(gpr_malloc(sizeof(op)));
382 memcpy(pc->deferred_trailing_metadata_op, &op, sizeof(op));
383 }
384 }
385 if (!op_deferred) {
386 refpc(pc, "on_c2p_sent_status");
387 err = grpc_call_start_batch(pc->c2p, op, op_count,
388 new_closure(on_c2p_sent_status, pc), nullptr);
389 CHECK_EQ(err, GRPC_CALL_OK);
390 }
391 }
392
393 unrefpc(pc, "on_p2s_status");
394 }
395
on_c2p_closed(void * arg,int)396 static void on_c2p_closed(void* arg, int /*success*/) {
397 proxy_call* pc = static_cast<proxy_call*>(arg);
398 unrefpc(pc, "on_c2p_closed");
399 }
400
on_new_call(void * arg,int success)401 static void on_new_call(void* arg, int success) {
402 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
403 grpc_call_error err;
404
405 if (success) {
406 grpc_op op;
407 memset(&op, 0, sizeof(op));
408 proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
409 memset(pc, 0, sizeof(*pc));
410 pc->proxy = proxy;
411 std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata);
412 pc->initial_metadata_mu = new grpc_core::Mutex();
413 {
414 grpc_core::MutexLock lock(pc->initial_metadata_mu);
415 pc->p2s_initial_metadata_received = false;
416 pc->deferred_trailing_metadata_op = nullptr;
417 }
418 pc->c2p = proxy->new_call;
419 pc->p2s = grpc_channel_create_call(
420 proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
421 proxy->new_call_details.method, &proxy->new_call_details.host,
422 proxy->new_call_details.deadline, nullptr);
423 gpr_ref_init(&pc->refs, 1);
424
425 op.reserved = nullptr;
426
427 // Proxy: receive initial metadata from the server
428 op.op = GRPC_OP_RECV_INITIAL_METADATA;
429 op.flags = 0;
430 op.data.recv_initial_metadata.recv_initial_metadata =
431 &pc->p2s_initial_metadata;
432 refpc(pc, "on_p2s_recv_initial_metadata");
433 err = grpc_call_start_batch(pc->p2s, &op, 1,
434 new_closure(on_p2s_recv_initial_metadata, pc),
435 nullptr);
436 CHECK_EQ(err, GRPC_CALL_OK);
437
438 // Proxy: send initial metadata to the server
439 op.op = GRPC_OP_SEND_INITIAL_METADATA;
440 op.flags = 0;
441 op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
442 op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
443 refpc(pc, "on_p2s_sent_initial_metadata");
444 err = grpc_call_start_batch(pc->p2s, &op, 1,
445 new_closure(on_p2s_sent_initial_metadata, pc),
446 nullptr);
447 CHECK_EQ(err, GRPC_CALL_OK);
448
449 // Client: receive message from the proxy
450 op.op = GRPC_OP_RECV_MESSAGE;
451 op.flags = 0;
452 op.data.recv_message.recv_message = &pc->c2p_msg;
453 refpc(pc, "on_c2p_recv_msg");
454 err = grpc_call_start_batch(pc->c2p, &op, 1,
455 new_closure(on_c2p_recv_msg, pc), nullptr);
456 CHECK_EQ(err, GRPC_CALL_OK);
457
458 // Proxy: receive message from the server
459 op.op = GRPC_OP_RECV_MESSAGE;
460 op.flags = 0;
461 op.data.recv_message.recv_message = &pc->p2s_msg;
462 refpc(pc, "on_p2s_recv_msg");
463 err = grpc_call_start_batch(pc->p2s, &op, 1,
464 new_closure(on_p2s_recv_msg, pc), nullptr);
465 CHECK_EQ(err, GRPC_CALL_OK);
466
467 // Proxy: receive status from the server
468 op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
469 op.flags = 0;
470 op.data.recv_status_on_client.trailing_metadata =
471 &pc->p2s_trailing_metadata;
472 op.data.recv_status_on_client.status = &pc->p2s_status;
473 op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
474 refpc(pc, "on_p2s_status");
475 err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
476 nullptr);
477 CHECK_EQ(err, GRPC_CALL_OK);
478
479 // Client: receive close-ack from the proxy
480 op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
481 op.flags = 0;
482 op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
483 refpc(pc, "on_c2p_closed");
484 err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
485 nullptr);
486 CHECK_EQ(err, GRPC_CALL_OK);
487
488 request_call(proxy);
489
490 grpc_call_details_destroy(&proxy->new_call_details);
491 grpc_call_details_init(&proxy->new_call_details);
492
493 unrefpc(pc, "init");
494 } else {
495 CHECK_EQ(proxy->new_call, nullptr);
496 }
497 }
498
request_call(grpc_end2end_proxy * proxy)499 static void request_call(grpc_end2end_proxy* proxy) {
500 proxy->new_call = nullptr;
501 CHECK(GRPC_CALL_OK ==
502 grpc_server_request_call(proxy->server, &proxy->new_call,
503 &proxy->new_call_details,
504 &proxy->new_call_metadata, proxy->cq,
505 proxy->cq, new_closure(on_new_call, proxy)));
506 }
507
thread_main(void * arg)508 static void thread_main(void* arg) {
509 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
510 closure* cl;
511 for (;;) {
512 grpc_event ev = grpc_completion_queue_next(
513 proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
514 switch (ev.type) {
515 case GRPC_QUEUE_TIMEOUT:
516 grpc_core::Crash("Should never reach here");
517 case GRPC_QUEUE_SHUTDOWN:
518 return;
519 case GRPC_OP_COMPLETE:
520 cl = static_cast<closure*>(ev.tag);
521 cl->func(cl->arg, ev.success);
522 gpr_free(cl);
523 break;
524 }
525 }
526 }
527
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)528 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
529 return proxy->proxy_port.c_str();
530 }
531
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)532 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
533 return proxy->server_port.c_str();
534 }
535