• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/core/end2end/fixtures/proxy.h"
20 
21 #include <grpc/byte_buffer.h>
22 #include <grpc/impl/channel_arg_names.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/sync.h>
28 #include <grpc/support/time.h>
29 #include <string.h>
30 
31 #include <cstddef>
32 #include <string>
33 #include <utility>
34 
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/surface/call.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/host_port.h"
41 #include "src/core/util/thd.h"
42 #include "test/core/test_util/port.h"
43 
44 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy45   grpc_end2end_proxy()
46       : cq(nullptr),
47         server(nullptr),
48         client(nullptr),
49         shutdown(false),
50         new_call(nullptr) {
51     memset(&new_call_details, 0, sizeof(new_call_details));
52     memset(&new_call_metadata, 0, sizeof(new_call_metadata));
53   }
54   grpc_core::Thread thd;
55   std::string proxy_port;
56   std::string server_port;
57   grpc_completion_queue* cq;
58   grpc_server* server;
59   grpc_channel* client;
60 
61   int shutdown;
62 
63   // requested call
64   grpc_call* new_call;
65   grpc_call_details new_call_details;
66   grpc_metadata_array new_call_metadata;
67 };
68 
// Completion-queue tag: a callback together with the opaque argument that is
// passed back to it. `success` receives the event's success flag.
struct closure {
  void (*func)(void* arg, int success);
  void* arg;
};
73 
74 typedef struct {
75   gpr_refcount refs;
76   grpc_end2end_proxy* proxy;
77 
78   grpc_call* c2p;
79   grpc_call* p2s;
80 
81   grpc_metadata_array c2p_initial_metadata;
82   grpc_metadata_array p2s_initial_metadata;
83 
84   grpc_core::Mutex* initial_metadata_mu;
85   bool p2s_initial_metadata_received ABSL_GUARDED_BY(initial_metadata_mu);
86   grpc_op* deferred_trailing_metadata_op ABSL_GUARDED_BY(initial_metadata_mu);
87 
88   grpc_byte_buffer* c2p_msg;
89   grpc_byte_buffer* p2s_msg;
90 
91   grpc_metadata_array p2s_trailing_metadata;
92   grpc_status_code p2s_status;
93   grpc_slice p2s_status_details;
94 
95   int c2p_server_cancelled;
96 } proxy_call;
97 
98 static void thread_main(void* arg);
99 static void request_call(grpc_end2end_proxy* proxy);
100 
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,const grpc_channel_args * client_args,const grpc_channel_args * server_args)101 grpc_end2end_proxy* grpc_end2end_proxy_create(
102     const grpc_end2end_proxy_def* def, const grpc_channel_args* client_args,
103     const grpc_channel_args* server_args) {
104   int proxy_port = grpc_pick_unused_port_or_die();
105   int server_port = grpc_pick_unused_port_or_die();
106 
107   grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
108 
109   proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
110   proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
111 
112   VLOG(2) << "PROXY ADDR:" << proxy->proxy_port
113           << " BACKEND:" << proxy->server_port;
114 
115   proxy->cq = grpc_completion_queue_create_for_next(nullptr);
116   proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
117 
118   const char* arg_to_remove = GRPC_ARG_ENABLE_RETRIES;
119   grpc_arg arg_to_add = grpc_channel_arg_integer_create(
120       const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
121   const grpc_channel_args* proxy_client_args =
122       grpc_channel_args_copy_and_add_and_remove(client_args, &arg_to_remove, 1,
123                                                 &arg_to_add, 1);
124   proxy->client =
125       def->create_client(proxy->server_port.c_str(), proxy_client_args);
126   grpc_channel_args_destroy(proxy_client_args);
127 
128   grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
129   grpc_server_start(proxy->server);
130 
131   grpc_call_details_init(&proxy->new_call_details);
132   proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
133   proxy->thd.Start();
134 
135   request_call(proxy);
136 
137   return proxy;
138 }
139 
new_closure(void (* func)(void * arg,int success),void * arg)140 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
141   closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
142   cl->func = func;
143   cl->arg = arg;
144   return cl;
145 }
146 
shutdown_complete(void * arg,int)147 static void shutdown_complete(void* arg, int /*success*/) {
148   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
149   proxy->shutdown = 1;
150   grpc_completion_queue_shutdown(proxy->cq);
151 }
152 
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)153 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
154   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
155                                   new_closure(shutdown_complete, proxy));
156   proxy->thd.Join();
157   grpc_server_destroy(proxy->server);
158   grpc_channel_destroy(proxy->client);
159   grpc_completion_queue_destroy(proxy->cq);
160   grpc_call_details_destroy(&proxy->new_call_details);
161   delete proxy;
162 }
163 
unrefpc(proxy_call * pc,const char *)164 static void unrefpc(proxy_call* pc, const char* /*reason*/) {
165   if (gpr_unref(&pc->refs)) {
166     grpc_call_unref(pc->c2p);
167     grpc_call_unref(pc->p2s);
168     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
169     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
170     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
171     grpc_slice_unref(pc->p2s_status_details);
172     {
173       grpc_core::MutexLock lock(pc->initial_metadata_mu);
174       if (pc->deferred_trailing_metadata_op != nullptr) {
175         gpr_free(pc->deferred_trailing_metadata_op);
176       }
177     }
178     delete pc->initial_metadata_mu;
179     gpr_free(pc);
180   }
181 }
182 
refpc(proxy_call * pc,const char *)183 static void refpc(proxy_call* pc, const char* /*reason*/) {
184   gpr_ref(&pc->refs);
185 }
186 
on_c2p_sent_initial_metadata(void * arg,int)187 static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
188   proxy_call* pc = static_cast<proxy_call*>(arg);
189   unrefpc(pc, "on_c2p_sent_initial_metadata");
190 }
191 
on_c2p_sent_status(void * arg,int)192 static void on_c2p_sent_status(void* arg, int /*success*/) {
193   proxy_call* pc = static_cast<proxy_call*>(arg);
194   unrefpc(pc, "on_c2p_sent_status");
195 }
196 
on_p2s_recv_initial_metadata(void * arg,int)197 static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
198   proxy_call* pc = static_cast<proxy_call*>(arg);
199   grpc_op op;
200   grpc_call_error err;
201   memset(&op, 0, sizeof(op));
202   if (!pc->proxy->shutdown) {
203     if (!grpc_call_is_trailers_only(pc->p2s)) {
204       op.op = GRPC_OP_SEND_INITIAL_METADATA;
205       op.flags = 0;
206       op.reserved = nullptr;
207       op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
208       op.data.send_initial_metadata.metadata =
209           pc->p2s_initial_metadata.metadata;
210       refpc(pc, "on_c2p_sent_initial_metadata");
211       err = grpc_call_start_batch(pc->c2p, &op, 1,
212                                   new_closure(on_c2p_sent_initial_metadata, pc),
213                                   nullptr);
214       CHECK_EQ(err, GRPC_CALL_OK);
215     }
216     grpc_op* deferred_trailing_metadata_op = nullptr;
217     {
218       grpc_core::MutexLock lock(pc->initial_metadata_mu);
219       // Start the batch without the mutex held, just in case.
220       // This will be nullptr if the trailing metadata has not yet been seen.
221       deferred_trailing_metadata_op = pc->deferred_trailing_metadata_op;
222       pc->p2s_initial_metadata_received = true;
223     }
224     if (deferred_trailing_metadata_op != nullptr) {
225       refpc(pc, "on_c2p_sent_status");
226       err = grpc_call_start_batch(pc->c2p, deferred_trailing_metadata_op, 1,
227                                   new_closure(on_c2p_sent_status, pc), nullptr);
228       CHECK_EQ(err, GRPC_CALL_OK);
229     }
230   }
231   unrefpc(pc, "on_p2s_recv_initial_metadata");
232 }
233 
on_p2s_sent_initial_metadata(void * arg,int)234 static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
235   proxy_call* pc = static_cast<proxy_call*>(arg);
236   unrefpc(pc, "on_p2s_sent_initial_metadata");
237 }
238 
239 static void on_c2p_recv_msg(void* arg, int success);
240 
on_p2s_sent_message(void * arg,int success)241 static void on_p2s_sent_message(void* arg, int success) {
242   proxy_call* pc = static_cast<proxy_call*>(arg);
243   grpc_op op;
244   grpc_call_error err;
245 
246   grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr));
247   if (!pc->proxy->shutdown && success) {
248     op.op = GRPC_OP_RECV_MESSAGE;
249     op.flags = 0;
250     op.reserved = nullptr;
251     op.data.recv_message.recv_message = &pc->c2p_msg;
252     refpc(pc, "on_c2p_recv_msg");
253     err = grpc_call_start_batch(pc->c2p, &op, 1,
254                                 new_closure(on_c2p_recv_msg, pc), nullptr);
255     CHECK_EQ(err, GRPC_CALL_OK);
256   }
257 
258   unrefpc(pc, "on_p2s_sent_message");
259 }
260 
on_p2s_sent_close(void * arg,int)261 static void on_p2s_sent_close(void* arg, int /*success*/) {
262   proxy_call* pc = static_cast<proxy_call*>(arg);
263   unrefpc(pc, "on_p2s_sent_close");
264 }
265 
on_c2p_recv_msg(void * arg,int success)266 static void on_c2p_recv_msg(void* arg, int success) {
267   proxy_call* pc = static_cast<proxy_call*>(arg);
268   grpc_op op;
269   grpc_call_error err;
270 
271   if (!pc->proxy->shutdown && success) {
272     if (pc->c2p_msg != nullptr) {
273       op.op = GRPC_OP_SEND_MESSAGE;
274       op.flags = 0;
275       op.reserved = nullptr;
276       op.data.send_message.send_message = pc->c2p_msg;
277       refpc(pc, "on_p2s_sent_message");
278       err = grpc_call_start_batch(
279           pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
280       CHECK_EQ(err, GRPC_CALL_OK);
281     } else {
282       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
283       op.flags = 0;
284       op.reserved = nullptr;
285       refpc(pc, "on_p2s_sent_close");
286       err = grpc_call_start_batch(pc->p2s, &op, 1,
287                                   new_closure(on_p2s_sent_close, pc), nullptr);
288       CHECK_EQ(err, GRPC_CALL_OK);
289     }
290   } else {
291     if (pc->c2p_msg != nullptr) {
292       grpc_byte_buffer_destroy(pc->c2p_msg);
293     }
294   }
295 
296   unrefpc(pc, "on_c2p_recv_msg");
297 }
298 
299 static void on_p2s_recv_msg(void* arg, int success);
300 
on_c2p_sent_message(void * arg,int success)301 static void on_c2p_sent_message(void* arg, int success) {
302   proxy_call* pc = static_cast<proxy_call*>(arg);
303   grpc_op op;
304   grpc_call_error err;
305 
306   grpc_byte_buffer_destroy(pc->p2s_msg);
307   if (!pc->proxy->shutdown && success) {
308     op.op = GRPC_OP_RECV_MESSAGE;
309     op.flags = 0;
310     op.reserved = nullptr;
311     op.data.recv_message.recv_message = &pc->p2s_msg;
312     refpc(pc, "on_p2s_recv_msg");
313     err = grpc_call_start_batch(pc->p2s, &op, 1,
314                                 new_closure(on_p2s_recv_msg, pc), nullptr);
315     CHECK_EQ(err, GRPC_CALL_OK);
316   }
317 
318   unrefpc(pc, "on_c2p_sent_message");
319 }
320 
on_p2s_recv_msg(void * arg,int success)321 static void on_p2s_recv_msg(void* arg, int success) {
322   proxy_call* pc = static_cast<proxy_call*>(arg);
323   grpc_op op;
324   grpc_call_error err;
325 
326   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
327     op.op = GRPC_OP_SEND_MESSAGE;
328     op.flags = 0;
329     op.reserved = nullptr;
330     op.data.send_message.send_message = pc->p2s_msg;
331     refpc(pc, "on_c2p_sent_message");
332     err = grpc_call_start_batch(pc->c2p, &op, 1,
333                                 new_closure(on_c2p_sent_message, pc), nullptr);
334     CHECK_EQ(err, GRPC_CALL_OK);
335   } else {
336     grpc_byte_buffer_destroy(pc->p2s_msg);
337   }
338   unrefpc(pc, "on_p2s_recv_msg");
339 }
340 
on_p2s_status(void * arg,int success)341 static void on_p2s_status(void* arg, int success) {
342   proxy_call* pc = static_cast<proxy_call*>(arg);
343   grpc_op op[2];  // Possibly send empty initial metadata also if trailers-only
344   grpc_call_error err;
345 
346   memset(op, 0, sizeof(op));
347 
348   if (!pc->proxy->shutdown) {
349     CHECK(success);
350 
351     int op_count = 0;
352     if (grpc_call_is_trailers_only(pc->p2s)) {
353       op[op_count].op = GRPC_OP_SEND_INITIAL_METADATA;
354       op_count++;
355     }
356 
357     op[op_count].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
358     op[op_count].flags = 0;
359     op[op_count].reserved = nullptr;
360     op[op_count].data.send_status_from_server.trailing_metadata_count =
361         pc->p2s_trailing_metadata.count;
362     op[op_count].data.send_status_from_server.trailing_metadata =
363         pc->p2s_trailing_metadata.metadata;
364     op[op_count].data.send_status_from_server.status = pc->p2s_status;
365     op[op_count].data.send_status_from_server.status_details =
366         &pc->p2s_status_details;
367     op_count++;
368 
369     // TODO(ctiller): The current core implementation requires initial
370     // metadata batches to be started *after* initial metadata batches have
371     // been completed. The C++ Callback API does this accounting too, for
372     // example.
373     //
374     // This entire fixture will need a redesign when the batch API goes away.
375     bool op_deferred = false;
376     {
377       grpc_core::MutexLock lock(pc->initial_metadata_mu);
378       if (!pc->p2s_initial_metadata_received) {
379         op_deferred = true;
380         pc->deferred_trailing_metadata_op =
381             static_cast<grpc_op*>(gpr_malloc(sizeof(op)));
382         memcpy(pc->deferred_trailing_metadata_op, &op, sizeof(op));
383       }
384     }
385     if (!op_deferred) {
386       refpc(pc, "on_c2p_sent_status");
387       err = grpc_call_start_batch(pc->c2p, op, op_count,
388                                   new_closure(on_c2p_sent_status, pc), nullptr);
389       CHECK_EQ(err, GRPC_CALL_OK);
390     }
391   }
392 
393   unrefpc(pc, "on_p2s_status");
394 }
395 
on_c2p_closed(void * arg,int)396 static void on_c2p_closed(void* arg, int /*success*/) {
397   proxy_call* pc = static_cast<proxy_call*>(arg);
398   unrefpc(pc, "on_c2p_closed");
399 }
400 
on_new_call(void * arg,int success)401 static void on_new_call(void* arg, int success) {
402   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
403   grpc_call_error err;
404 
405   if (success) {
406     grpc_op op;
407     memset(&op, 0, sizeof(op));
408     proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
409     memset(pc, 0, sizeof(*pc));
410     pc->proxy = proxy;
411     std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata);
412     pc->initial_metadata_mu = new grpc_core::Mutex();
413     {
414       grpc_core::MutexLock lock(pc->initial_metadata_mu);
415       pc->p2s_initial_metadata_received = false;
416       pc->deferred_trailing_metadata_op = nullptr;
417     }
418     pc->c2p = proxy->new_call;
419     pc->p2s = grpc_channel_create_call(
420         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
421         proxy->new_call_details.method, &proxy->new_call_details.host,
422         proxy->new_call_details.deadline, nullptr);
423     gpr_ref_init(&pc->refs, 1);
424 
425     op.reserved = nullptr;
426 
427     // Proxy: receive initial metadata from the server
428     op.op = GRPC_OP_RECV_INITIAL_METADATA;
429     op.flags = 0;
430     op.data.recv_initial_metadata.recv_initial_metadata =
431         &pc->p2s_initial_metadata;
432     refpc(pc, "on_p2s_recv_initial_metadata");
433     err = grpc_call_start_batch(pc->p2s, &op, 1,
434                                 new_closure(on_p2s_recv_initial_metadata, pc),
435                                 nullptr);
436     CHECK_EQ(err, GRPC_CALL_OK);
437 
438     // Proxy: send initial metadata to the server
439     op.op = GRPC_OP_SEND_INITIAL_METADATA;
440     op.flags = 0;
441     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
442     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
443     refpc(pc, "on_p2s_sent_initial_metadata");
444     err = grpc_call_start_batch(pc->p2s, &op, 1,
445                                 new_closure(on_p2s_sent_initial_metadata, pc),
446                                 nullptr);
447     CHECK_EQ(err, GRPC_CALL_OK);
448 
449     // Client: receive message from the proxy
450     op.op = GRPC_OP_RECV_MESSAGE;
451     op.flags = 0;
452     op.data.recv_message.recv_message = &pc->c2p_msg;
453     refpc(pc, "on_c2p_recv_msg");
454     err = grpc_call_start_batch(pc->c2p, &op, 1,
455                                 new_closure(on_c2p_recv_msg, pc), nullptr);
456     CHECK_EQ(err, GRPC_CALL_OK);
457 
458     // Proxy: receive message from the server
459     op.op = GRPC_OP_RECV_MESSAGE;
460     op.flags = 0;
461     op.data.recv_message.recv_message = &pc->p2s_msg;
462     refpc(pc, "on_p2s_recv_msg");
463     err = grpc_call_start_batch(pc->p2s, &op, 1,
464                                 new_closure(on_p2s_recv_msg, pc), nullptr);
465     CHECK_EQ(err, GRPC_CALL_OK);
466 
467     // Proxy: receive status from the server
468     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
469     op.flags = 0;
470     op.data.recv_status_on_client.trailing_metadata =
471         &pc->p2s_trailing_metadata;
472     op.data.recv_status_on_client.status = &pc->p2s_status;
473     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
474     refpc(pc, "on_p2s_status");
475     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
476                                 nullptr);
477     CHECK_EQ(err, GRPC_CALL_OK);
478 
479     // Client: receive close-ack from the proxy
480     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
481     op.flags = 0;
482     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
483     refpc(pc, "on_c2p_closed");
484     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
485                                 nullptr);
486     CHECK_EQ(err, GRPC_CALL_OK);
487 
488     request_call(proxy);
489 
490     grpc_call_details_destroy(&proxy->new_call_details);
491     grpc_call_details_init(&proxy->new_call_details);
492 
493     unrefpc(pc, "init");
494   } else {
495     CHECK_EQ(proxy->new_call, nullptr);
496   }
497 }
498 
request_call(grpc_end2end_proxy * proxy)499 static void request_call(grpc_end2end_proxy* proxy) {
500   proxy->new_call = nullptr;
501   CHECK(GRPC_CALL_OK ==
502         grpc_server_request_call(proxy->server, &proxy->new_call,
503                                  &proxy->new_call_details,
504                                  &proxy->new_call_metadata, proxy->cq,
505                                  proxy->cq, new_closure(on_new_call, proxy)));
506 }
507 
thread_main(void * arg)508 static void thread_main(void* arg) {
509   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
510   closure* cl;
511   for (;;) {
512     grpc_event ev = grpc_completion_queue_next(
513         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
514     switch (ev.type) {
515       case GRPC_QUEUE_TIMEOUT:
516         grpc_core::Crash("Should never reach here");
517       case GRPC_QUEUE_SHUTDOWN:
518         return;
519       case GRPC_OP_COMPLETE:
520         cl = static_cast<closure*>(ev.tag);
521         cl->func(cl->arg, ev.success);
522         gpr_free(cl);
523         break;
524     }
525   }
526 }
527 
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)528 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
529   return proxy->proxy_port.c_str();
530 }
531 
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)532 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
533   return proxy->server_port.c_str();
534 }
535