• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 #include "test/core/util/test_config.h"
21 
22 #ifdef GRPC_TEST_PICK_PORT
23 #include "test/core/util/port_server_client.h"
24 
25 #include <math.h>
26 #include <string.h>
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/lib/http/httpcli.h"
36 
37 typedef struct freereq {
38   gpr_mu* mu = nullptr;
39   grpc_polling_entity pops = {};
40   int done = 0;
41 } freereq;
42 
destroy_pops_and_shutdown(void * p,grpc_error *)43 static void destroy_pops_and_shutdown(void* p, grpc_error* /*error*/) {
44   grpc_pollset* pollset =
45       grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
46   grpc_pollset_destroy(pollset);
47   gpr_free(pollset);
48 }
49 
freed_port_from_server(void * arg,grpc_error *)50 static void freed_port_from_server(void* arg, grpc_error* /*error*/) {
51   freereq* pr = static_cast<freereq*>(arg);
52   gpr_mu_lock(pr->mu);
53   pr->done = 1;
54   GRPC_LOG_IF_ERROR(
55       "pollset_kick",
56       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
57   gpr_mu_unlock(pr->mu);
58 }
59 
grpc_free_port_using_server(int port)60 void grpc_free_port_using_server(int port) {
61   grpc_httpcli_context context;
62   grpc_httpcli_request req;
63   grpc_httpcli_response rsp;
64   freereq pr;
65   char* path;
66   grpc_closure* shutdown_closure;
67 
68   grpc_init();
69   {
70     grpc_core::ExecCtx exec_ctx;
71 
72     pr = {};
73     memset(&req, 0, sizeof(req));
74     rsp = {};
75 
76     grpc_pollset* pollset =
77         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
78     grpc_pollset_init(pollset, &pr.mu);
79     pr.pops = grpc_polling_entity_create_from_pollset(pollset);
80     shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
81                                            grpc_schedule_on_exec_ctx);
82 
83     req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
84     gpr_asprintf(&path, "/drop/%d", port);
85     req.http.path = path;
86 
87     grpc_httpcli_context_init(&context);
88     grpc_resource_quota* resource_quota =
89         grpc_resource_quota_create("port_server_client/free");
90     grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
91                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
92                      GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
93                                          grpc_schedule_on_exec_ctx),
94                      &rsp);
95     grpc_resource_quota_unref_internal(resource_quota);
96     grpc_core::ExecCtx::Get()->Flush();
97     gpr_mu_lock(pr.mu);
98     while (!pr.done) {
99       grpc_pollset_worker* worker = nullptr;
100       if (!GRPC_LOG_IF_ERROR(
101               "pollset_work",
102               grpc_pollset_work(
103                   grpc_polling_entity_pollset(&pr.pops), &worker,
104                   grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
105         pr.done = 1;
106       }
107     }
108     gpr_mu_unlock(pr.mu);
109 
110     grpc_httpcli_context_destroy(&context);
111     grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
112                           shutdown_closure);
113 
114     gpr_free(path);
115     grpc_http_response_destroy(&rsp);
116   }
117   grpc_shutdown();
118 }
119 
120 typedef struct portreq {
121   gpr_mu* mu = nullptr;
122   grpc_polling_entity pops = {};
123   int port = 0;
124   int retries = 0;
125   char* server = nullptr;
126   grpc_httpcli_context* ctx = nullptr;
127   grpc_httpcli_response response = {};
128 } portreq;
129 
got_port_from_server(void * arg,grpc_error * error)130 static void got_port_from_server(void* arg, grpc_error* error) {
131   size_t i;
132   int port = 0;
133   portreq* pr = static_cast<portreq*>(arg);
134   int failed = 0;
135   grpc_httpcli_response* response = &pr->response;
136 
137   if (error != GRPC_ERROR_NONE) {
138     failed = 1;
139     const char* msg = grpc_error_string(error);
140     gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]", msg);
141 
142   } else if (response->status != 200) {
143     failed = 1;
144     gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
145             response->status);
146   }
147 
148   if (failed) {
149     grpc_httpcli_request req;
150     memset(&req, 0, sizeof(req));
151     if (pr->retries >= 5) {
152       gpr_mu_lock(pr->mu);
153       pr->port = 0;
154       GRPC_LOG_IF_ERROR(
155           "pollset_kick",
156           grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
157       gpr_mu_unlock(pr->mu);
158       return;
159     }
160     GPR_ASSERT(pr->retries < 10);
161     gpr_sleep_until(gpr_time_add(
162         gpr_now(GPR_CLOCK_REALTIME),
163         gpr_time_from_millis(
164             static_cast<int64_t>(
165                 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
166             GPR_TIMESPAN)));
167     pr->retries++;
168     req.host = pr->server;
169     req.http.path = const_cast<char*>("/get");
170     grpc_http_response_destroy(&pr->response);
171     pr->response = {};
172     grpc_resource_quota* resource_quota =
173         grpc_resource_quota_create("port_server_client/pick_retry");
174     grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
175                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
176                      GRPC_CLOSURE_CREATE(got_port_from_server, pr,
177                                          grpc_schedule_on_exec_ctx),
178                      &pr->response);
179     grpc_resource_quota_unref_internal(resource_quota);
180     return;
181   }
182   GPR_ASSERT(response);
183   GPR_ASSERT(response->status == 200);
184   for (i = 0; i < response->body_length; i++) {
185     GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
186     port = port * 10 + response->body[i] - '0';
187   }
188   GPR_ASSERT(port > 1024);
189   gpr_mu_lock(pr->mu);
190   pr->port = port;
191   GRPC_LOG_IF_ERROR(
192       "pollset_kick",
193       grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
194   gpr_mu_unlock(pr->mu);
195 }
196 
grpc_pick_port_using_server(void)197 int grpc_pick_port_using_server(void) {
198   grpc_httpcli_context context;
199   grpc_httpcli_request req;
200   portreq pr;
201   grpc_closure* shutdown_closure;
202 
203   grpc_init();
204   {
205     grpc_core::ExecCtx exec_ctx;
206     pr = {};
207     memset(&req, 0, sizeof(req));
208     grpc_pollset* pollset =
209         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
210     grpc_pollset_init(pollset, &pr.mu);
211     pr.pops = grpc_polling_entity_create_from_pollset(pollset);
212     shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
213                                            grpc_schedule_on_exec_ctx);
214     pr.port = -1;
215     pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
216     pr.ctx = &context;
217 
218     req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
219     req.http.path = const_cast<char*>("/get");
220 
221     grpc_httpcli_context_init(&context);
222     grpc_resource_quota* resource_quota =
223         grpc_resource_quota_create("port_server_client/pick");
224     grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
225                      grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
226                      GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
227                                          grpc_schedule_on_exec_ctx),
228                      &pr.response);
229     grpc_resource_quota_unref_internal(resource_quota);
230     grpc_core::ExecCtx::Get()->Flush();
231     gpr_mu_lock(pr.mu);
232     while (pr.port == -1) {
233       grpc_pollset_worker* worker = nullptr;
234       if (!GRPC_LOG_IF_ERROR(
235               "pollset_work",
236               grpc_pollset_work(
237                   grpc_polling_entity_pollset(&pr.pops), &worker,
238                   grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
239         pr.port = 0;
240       }
241     }
242     gpr_mu_unlock(pr.mu);
243 
244     grpc_http_response_destroy(&pr.response);
245     grpc_httpcli_context_destroy(&context);
246     grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
247                           shutdown_closure);
248 
249     grpc_core::ExecCtx::Get()->Flush();
250   }
251   grpc_shutdown();
252 
253   return pr.port;
254 }
255 
256 #endif  // GRPC_TEST_PICK_PORT
257