• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/end2end/fixtures/proxy.h"
20 
21 #include <string.h>
22 
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 
27 #include "src/core/lib/gpr/host_port.h"
28 #include "src/core/lib/gpr/useful.h"
29 #include "src/core/lib/gprpp/thd.h"
30 #include "test/core/util/port.h"
31 
32 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy33   grpc_end2end_proxy()
34       : proxy_port(nullptr),
35         server_port(nullptr),
36         cq(nullptr),
37         server(nullptr),
38         client(nullptr),
39         shutdown(false),
40         new_call(nullptr) {
41     memset(&new_call_details, 0, sizeof(new_call_details));
42     memset(&new_call_metadata, 0, sizeof(new_call_metadata));
43   }
44   grpc_core::Thread thd;
45   char* proxy_port;
46   char* server_port;
47   grpc_completion_queue* cq;
48   grpc_server* server;
49   grpc_channel* client;
50 
51   int shutdown;
52 
53   /* requested call */
54   grpc_call* new_call;
55   grpc_call_details new_call_details;
56   grpc_metadata_array new_call_metadata;
57 };
58 
/* Completion-queue tag: a callback together with the argument it is invoked
   with. `success` receives the `success` field of the completed event. */
struct closure {
  void (*func)(void* arg, int success);
  void* arg;
};
63 
64 typedef struct {
65   gpr_refcount refs;
66   grpc_end2end_proxy* proxy;
67 
68   grpc_call* c2p;
69   grpc_call* p2s;
70 
71   grpc_metadata_array c2p_initial_metadata;
72   grpc_metadata_array p2s_initial_metadata;
73 
74   grpc_byte_buffer* c2p_msg;
75   grpc_byte_buffer* p2s_msg;
76 
77   grpc_metadata_array p2s_trailing_metadata;
78   grpc_status_code p2s_status;
79   grpc_slice p2s_status_details;
80 
81   int c2p_server_cancelled;
82 } proxy_call;
83 
84 static void thread_main(void* arg);
85 static void request_call(grpc_end2end_proxy* proxy);
86 
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,grpc_channel_args * client_args,grpc_channel_args * server_args)87 grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
88                                               grpc_channel_args* client_args,
89                                               grpc_channel_args* server_args) {
90   int proxy_port = grpc_pick_unused_port_or_die();
91   int server_port = grpc_pick_unused_port_or_die();
92 
93   grpc_end2end_proxy* proxy = grpc_core::New<grpc_end2end_proxy>();
94 
95   gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
96   gpr_join_host_port(&proxy->server_port, "localhost", server_port);
97 
98   gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port,
99           proxy->server_port);
100 
101   proxy->cq = grpc_completion_queue_create_for_next(nullptr);
102   proxy->server = def->create_server(proxy->proxy_port, server_args);
103   proxy->client = def->create_client(proxy->server_port, client_args);
104 
105   grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
106   grpc_server_start(proxy->server);
107 
108   grpc_call_details_init(&proxy->new_call_details);
109   proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
110   proxy->thd.Start();
111 
112   request_call(proxy);
113 
114   return proxy;
115 }
116 
new_closure(void (* func)(void * arg,int success),void * arg)117 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
118   closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
119   cl->func = func;
120   cl->arg = arg;
121   return cl;
122 }
123 
shutdown_complete(void * arg,int success)124 static void shutdown_complete(void* arg, int success) {
125   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
126   proxy->shutdown = 1;
127   grpc_completion_queue_shutdown(proxy->cq);
128 }
129 
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)130 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
131   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
132                                   new_closure(shutdown_complete, proxy));
133   proxy->thd.Join();
134   gpr_free(proxy->proxy_port);
135   gpr_free(proxy->server_port);
136   grpc_server_destroy(proxy->server);
137   grpc_channel_destroy(proxy->client);
138   grpc_completion_queue_destroy(proxy->cq);
139   grpc_call_details_destroy(&proxy->new_call_details);
140   grpc_core::Delete(proxy);
141 }
142 
unrefpc(proxy_call * pc,const char * reason)143 static void unrefpc(proxy_call* pc, const char* reason) {
144   if (gpr_unref(&pc->refs)) {
145     grpc_call_unref(pc->c2p);
146     grpc_call_unref(pc->p2s);
147     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
148     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
149     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
150     grpc_slice_unref(pc->p2s_status_details);
151     gpr_free(pc);
152   }
153 }
154 
refpc(proxy_call * pc,const char * reason)155 static void refpc(proxy_call* pc, const char* reason) { gpr_ref(&pc->refs); }
156 
on_c2p_sent_initial_metadata(void * arg,int success)157 static void on_c2p_sent_initial_metadata(void* arg, int success) {
158   proxy_call* pc = static_cast<proxy_call*>(arg);
159   unrefpc(pc, "on_c2p_sent_initial_metadata");
160 }
161 
on_p2s_recv_initial_metadata(void * arg,int success)162 static void on_p2s_recv_initial_metadata(void* arg, int success) {
163   proxy_call* pc = static_cast<proxy_call*>(arg);
164   grpc_op op;
165   grpc_call_error err;
166 
167   memset(&op, 0, sizeof(op));
168   if (!pc->proxy->shutdown) {
169     op.op = GRPC_OP_SEND_INITIAL_METADATA;
170     op.flags = 0;
171     op.reserved = nullptr;
172     op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
173     op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
174     refpc(pc, "on_c2p_sent_initial_metadata");
175     err = grpc_call_start_batch(pc->c2p, &op, 1,
176                                 new_closure(on_c2p_sent_initial_metadata, pc),
177                                 nullptr);
178     GPR_ASSERT(err == GRPC_CALL_OK);
179   }
180 
181   unrefpc(pc, "on_p2s_recv_initial_metadata");
182 }
183 
on_p2s_sent_initial_metadata(void * arg,int success)184 static void on_p2s_sent_initial_metadata(void* arg, int success) {
185   proxy_call* pc = static_cast<proxy_call*>(arg);
186   unrefpc(pc, "on_p2s_sent_initial_metadata");
187 }
188 
189 static void on_c2p_recv_msg(void* arg, int success);
190 
on_p2s_sent_message(void * arg,int success)191 static void on_p2s_sent_message(void* arg, int success) {
192   proxy_call* pc = static_cast<proxy_call*>(arg);
193   grpc_op op;
194   grpc_call_error err;
195 
196   grpc_byte_buffer_destroy(pc->c2p_msg);
197   if (!pc->proxy->shutdown && success) {
198     op.op = GRPC_OP_RECV_MESSAGE;
199     op.flags = 0;
200     op.reserved = nullptr;
201     op.data.recv_message.recv_message = &pc->c2p_msg;
202     refpc(pc, "on_c2p_recv_msg");
203     err = grpc_call_start_batch(pc->c2p, &op, 1,
204                                 new_closure(on_c2p_recv_msg, pc), nullptr);
205     GPR_ASSERT(err == GRPC_CALL_OK);
206   }
207 
208   unrefpc(pc, "on_p2s_sent_message");
209 }
210 
on_p2s_sent_close(void * arg,int success)211 static void on_p2s_sent_close(void* arg, int success) {
212   proxy_call* pc = static_cast<proxy_call*>(arg);
213   unrefpc(pc, "on_p2s_sent_close");
214 }
215 
on_c2p_recv_msg(void * arg,int success)216 static void on_c2p_recv_msg(void* arg, int success) {
217   proxy_call* pc = static_cast<proxy_call*>(arg);
218   grpc_op op;
219   grpc_call_error err;
220 
221   if (!pc->proxy->shutdown && success) {
222     if (pc->c2p_msg != nullptr) {
223       op.op = GRPC_OP_SEND_MESSAGE;
224       op.flags = 0;
225       op.reserved = nullptr;
226       op.data.send_message.send_message = pc->c2p_msg;
227       refpc(pc, "on_p2s_sent_message");
228       err = grpc_call_start_batch(
229           pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
230       GPR_ASSERT(err == GRPC_CALL_OK);
231     } else {
232       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
233       op.flags = 0;
234       op.reserved = nullptr;
235       refpc(pc, "on_p2s_sent_close");
236       err = grpc_call_start_batch(pc->p2s, &op, 1,
237                                   new_closure(on_p2s_sent_close, pc), nullptr);
238       GPR_ASSERT(err == GRPC_CALL_OK);
239     }
240   } else {
241     if (pc->c2p_msg != nullptr) {
242       grpc_byte_buffer_destroy(pc->c2p_msg);
243     }
244   }
245 
246   unrefpc(pc, "on_c2p_recv_msg");
247 }
248 
249 static void on_p2s_recv_msg(void* arg, int success);
250 
on_c2p_sent_message(void * arg,int success)251 static void on_c2p_sent_message(void* arg, int success) {
252   proxy_call* pc = static_cast<proxy_call*>(arg);
253   grpc_op op;
254   grpc_call_error err;
255 
256   grpc_byte_buffer_destroy(pc->p2s_msg);
257   if (!pc->proxy->shutdown && success) {
258     op.op = GRPC_OP_RECV_MESSAGE;
259     op.flags = 0;
260     op.reserved = nullptr;
261     op.data.recv_message.recv_message = &pc->p2s_msg;
262     refpc(pc, "on_p2s_recv_msg");
263     err = grpc_call_start_batch(pc->p2s, &op, 1,
264                                 new_closure(on_p2s_recv_msg, pc), nullptr);
265     GPR_ASSERT(err == GRPC_CALL_OK);
266   }
267 
268   unrefpc(pc, "on_c2p_sent_message");
269 }
270 
on_p2s_recv_msg(void * arg,int success)271 static void on_p2s_recv_msg(void* arg, int success) {
272   proxy_call* pc = static_cast<proxy_call*>(arg);
273   grpc_op op;
274   grpc_call_error err;
275 
276   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
277     op.op = GRPC_OP_SEND_MESSAGE;
278     op.flags = 0;
279     op.reserved = nullptr;
280     op.data.send_message.send_message = pc->p2s_msg;
281     refpc(pc, "on_c2p_sent_message");
282     err = grpc_call_start_batch(pc->c2p, &op, 1,
283                                 new_closure(on_c2p_sent_message, pc), nullptr);
284     GPR_ASSERT(err == GRPC_CALL_OK);
285   } else {
286     grpc_byte_buffer_destroy(pc->p2s_msg);
287   }
288   unrefpc(pc, "on_p2s_recv_msg");
289 }
290 
on_c2p_sent_status(void * arg,int success)291 static void on_c2p_sent_status(void* arg, int success) {
292   proxy_call* pc = static_cast<proxy_call*>(arg);
293   unrefpc(pc, "on_c2p_sent_status");
294 }
295 
on_p2s_status(void * arg,int success)296 static void on_p2s_status(void* arg, int success) {
297   proxy_call* pc = static_cast<proxy_call*>(arg);
298   grpc_op op;
299   grpc_call_error err;
300 
301   if (!pc->proxy->shutdown) {
302     GPR_ASSERT(success);
303     op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
304     op.flags = 0;
305     op.reserved = nullptr;
306     op.data.send_status_from_server.trailing_metadata_count =
307         pc->p2s_trailing_metadata.count;
308     op.data.send_status_from_server.trailing_metadata =
309         pc->p2s_trailing_metadata.metadata;
310     op.data.send_status_from_server.status = pc->p2s_status;
311     op.data.send_status_from_server.status_details = &pc->p2s_status_details;
312     refpc(pc, "on_c2p_sent_status");
313     err = grpc_call_start_batch(pc->c2p, &op, 1,
314                                 new_closure(on_c2p_sent_status, pc), nullptr);
315     GPR_ASSERT(err == GRPC_CALL_OK);
316   }
317 
318   unrefpc(pc, "on_p2s_status");
319 }
320 
on_c2p_closed(void * arg,int success)321 static void on_c2p_closed(void* arg, int success) {
322   proxy_call* pc = static_cast<proxy_call*>(arg);
323   unrefpc(pc, "on_c2p_closed");
324 }
325 
on_new_call(void * arg,int success)326 static void on_new_call(void* arg, int success) {
327   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
328   grpc_call_error err;
329 
330   if (success) {
331     grpc_op op;
332     memset(&op, 0, sizeof(op));
333     proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
334     memset(pc, 0, sizeof(*pc));
335     pc->proxy = proxy;
336     GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
337              proxy->new_call_metadata);
338     pc->c2p = proxy->new_call;
339     pc->p2s = grpc_channel_create_call(
340         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
341         proxy->new_call_details.method, &proxy->new_call_details.host,
342         proxy->new_call_details.deadline, nullptr);
343     gpr_ref_init(&pc->refs, 1);
344 
345     op.reserved = nullptr;
346 
347     op.op = GRPC_OP_RECV_INITIAL_METADATA;
348     op.flags = 0;
349     op.data.recv_initial_metadata.recv_initial_metadata =
350         &pc->p2s_initial_metadata;
351     refpc(pc, "on_p2s_recv_initial_metadata");
352     err = grpc_call_start_batch(pc->p2s, &op, 1,
353                                 new_closure(on_p2s_recv_initial_metadata, pc),
354                                 nullptr);
355     GPR_ASSERT(err == GRPC_CALL_OK);
356 
357     op.op = GRPC_OP_SEND_INITIAL_METADATA;
358     op.flags = proxy->new_call_details.flags;
359     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
360     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
361     refpc(pc, "on_p2s_sent_initial_metadata");
362     err = grpc_call_start_batch(pc->p2s, &op, 1,
363                                 new_closure(on_p2s_sent_initial_metadata, pc),
364                                 nullptr);
365     GPR_ASSERT(err == GRPC_CALL_OK);
366 
367     op.op = GRPC_OP_RECV_MESSAGE;
368     op.flags = 0;
369     op.data.recv_message.recv_message = &pc->c2p_msg;
370     refpc(pc, "on_c2p_recv_msg");
371     err = grpc_call_start_batch(pc->c2p, &op, 1,
372                                 new_closure(on_c2p_recv_msg, pc), nullptr);
373     GPR_ASSERT(err == GRPC_CALL_OK);
374 
375     op.op = GRPC_OP_RECV_MESSAGE;
376     op.flags = 0;
377     op.data.recv_message.recv_message = &pc->p2s_msg;
378     refpc(pc, "on_p2s_recv_msg");
379     err = grpc_call_start_batch(pc->p2s, &op, 1,
380                                 new_closure(on_p2s_recv_msg, pc), nullptr);
381     GPR_ASSERT(err == GRPC_CALL_OK);
382 
383     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
384     op.flags = 0;
385     op.data.recv_status_on_client.trailing_metadata =
386         &pc->p2s_trailing_metadata;
387     op.data.recv_status_on_client.status = &pc->p2s_status;
388     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
389     refpc(pc, "on_p2s_status");
390     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
391                                 nullptr);
392     GPR_ASSERT(err == GRPC_CALL_OK);
393 
394     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
395     op.flags = 0;
396     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
397     refpc(pc, "on_c2p_closed");
398     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
399                                 nullptr);
400     GPR_ASSERT(err == GRPC_CALL_OK);
401 
402     request_call(proxy);
403 
404     grpc_call_details_destroy(&proxy->new_call_details);
405     grpc_call_details_init(&proxy->new_call_details);
406 
407     unrefpc(pc, "init");
408   } else {
409     GPR_ASSERT(proxy->new_call == nullptr);
410   }
411 }
412 
request_call(grpc_end2end_proxy * proxy)413 static void request_call(grpc_end2end_proxy* proxy) {
414   proxy->new_call = nullptr;
415   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
416                                  proxy->server, &proxy->new_call,
417                                  &proxy->new_call_details,
418                                  &proxy->new_call_metadata, proxy->cq,
419                                  proxy->cq, new_closure(on_new_call, proxy)));
420 }
421 
thread_main(void * arg)422 static void thread_main(void* arg) {
423   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
424   closure* cl;
425   for (;;) {
426     grpc_event ev = grpc_completion_queue_next(
427         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
428     switch (ev.type) {
429       case GRPC_QUEUE_TIMEOUT:
430         gpr_log(GPR_ERROR, "Should never reach here");
431         abort();
432       case GRPC_QUEUE_SHUTDOWN:
433         return;
434       case GRPC_OP_COMPLETE:
435         cl = static_cast<closure*>(ev.tag);
436         cl->func(cl->arg, ev.success);
437         gpr_free(cl);
438         break;
439     }
440   }
441 }
442 
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)443 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
444   return proxy->proxy_port;
445 }
446 
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)447 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
448   return proxy->server_port;
449 }
450