1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20 #include "test/core/util/test_config.h"
21
22 #ifdef GRPC_TEST_PICK_PORT
23 #include "test/core/util/port_server_client.h"
24
25 #include <math.h>
26 #include <string.h>
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/lib/http/httpcli.h"
36
37 typedef struct freereq {
38 gpr_mu* mu;
39 grpc_polling_entity pops;
40 int done;
41 } freereq;
42
destroy_pops_and_shutdown(void * p,grpc_error * error)43 static void destroy_pops_and_shutdown(void* p, grpc_error* error) {
44 grpc_pollset* pollset =
45 grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
46 grpc_pollset_destroy(pollset);
47 gpr_free(pollset);
48 }
49
freed_port_from_server(void * arg,grpc_error * error)50 static void freed_port_from_server(void* arg, grpc_error* error) {
51 freereq* pr = static_cast<freereq*>(arg);
52 gpr_mu_lock(pr->mu);
53 pr->done = 1;
54 GRPC_LOG_IF_ERROR(
55 "pollset_kick",
56 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
57 gpr_mu_unlock(pr->mu);
58 }
59
grpc_free_port_using_server(int port)60 void grpc_free_port_using_server(int port) {
61 grpc_httpcli_context context;
62 grpc_httpcli_request req;
63 grpc_httpcli_response rsp;
64 freereq pr;
65 char* path;
66 grpc_core::ExecCtx exec_ctx;
67 grpc_closure* shutdown_closure;
68
69 grpc_init();
70
71 memset(&pr, 0, sizeof(pr));
72 memset(&req, 0, sizeof(req));
73 memset(&rsp, 0, sizeof(rsp));
74
75 grpc_pollset* pollset =
76 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
77 grpc_pollset_init(pollset, &pr.mu);
78 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
79 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
80 grpc_schedule_on_exec_ctx);
81
82 req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
83 gpr_asprintf(&path, "/drop/%d", port);
84 req.http.path = path;
85
86 grpc_httpcli_context_init(&context);
87 grpc_resource_quota* resource_quota =
88 grpc_resource_quota_create("port_server_client/free");
89 grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
90 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
91 GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
92 grpc_schedule_on_exec_ctx),
93 &rsp);
94 grpc_resource_quota_unref_internal(resource_quota);
95 grpc_core::ExecCtx::Get()->Flush();
96 gpr_mu_lock(pr.mu);
97 while (!pr.done) {
98 grpc_pollset_worker* worker = nullptr;
99 if (!GRPC_LOG_IF_ERROR(
100 "pollset_work",
101 grpc_pollset_work(
102 grpc_polling_entity_pollset(&pr.pops), &worker,
103 grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
104 pr.done = 1;
105 }
106 }
107 gpr_mu_unlock(pr.mu);
108
109 grpc_httpcli_context_destroy(&context);
110 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
111 shutdown_closure);
112
113 gpr_free(path);
114 grpc_http_response_destroy(&rsp);
115
116 grpc_shutdown();
117 }
118
119 typedef struct portreq {
120 gpr_mu* mu;
121 grpc_polling_entity pops;
122 int port;
123 int retries;
124 char* server;
125 grpc_httpcli_context* ctx;
126 grpc_httpcli_response response;
127 } portreq;
128
got_port_from_server(void * arg,grpc_error * error)129 static void got_port_from_server(void* arg, grpc_error* error) {
130 size_t i;
131 int port = 0;
132 portreq* pr = static_cast<portreq*>(arg);
133 int failed = 0;
134 grpc_httpcli_response* response = &pr->response;
135
136 if (error != GRPC_ERROR_NONE) {
137 failed = 1;
138 const char* msg = grpc_error_string(error);
139 gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]", msg);
140
141 } else if (response->status != 200) {
142 failed = 1;
143 gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
144 response->status);
145 }
146
147 if (failed) {
148 grpc_httpcli_request req;
149 memset(&req, 0, sizeof(req));
150 if (pr->retries >= 5) {
151 gpr_mu_lock(pr->mu);
152 pr->port = 0;
153 GRPC_LOG_IF_ERROR(
154 "pollset_kick",
155 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
156 gpr_mu_unlock(pr->mu);
157 return;
158 }
159 GPR_ASSERT(pr->retries < 10);
160 gpr_sleep_until(gpr_time_add(
161 gpr_now(GPR_CLOCK_REALTIME),
162 gpr_time_from_millis(
163 static_cast<int64_t>(
164 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
165 GPR_TIMESPAN)));
166 pr->retries++;
167 req.host = pr->server;
168 req.http.path = const_cast<char*>("/get");
169 grpc_http_response_destroy(&pr->response);
170 memset(&pr->response, 0, sizeof(pr->response));
171 grpc_resource_quota* resource_quota =
172 grpc_resource_quota_create("port_server_client/pick_retry");
173 grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
174 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
175 GRPC_CLOSURE_CREATE(got_port_from_server, pr,
176 grpc_schedule_on_exec_ctx),
177 &pr->response);
178 grpc_resource_quota_unref_internal(resource_quota);
179 return;
180 }
181 GPR_ASSERT(response);
182 GPR_ASSERT(response->status == 200);
183 for (i = 0; i < response->body_length; i++) {
184 GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
185 port = port * 10 + response->body[i] - '0';
186 }
187 GPR_ASSERT(port > 1024);
188 gpr_mu_lock(pr->mu);
189 pr->port = port;
190 GRPC_LOG_IF_ERROR(
191 "pollset_kick",
192 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
193 gpr_mu_unlock(pr->mu);
194 }
195
grpc_pick_port_using_server(void)196 int grpc_pick_port_using_server(void) {
197 grpc_httpcli_context context;
198 grpc_httpcli_request req;
199 portreq pr;
200 grpc_closure* shutdown_closure;
201
202 grpc_init();
203 {
204 grpc_core::ExecCtx exec_ctx;
205 memset(&pr, 0, sizeof(pr));
206 memset(&req, 0, sizeof(req));
207 grpc_pollset* pollset =
208 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
209 grpc_pollset_init(pollset, &pr.mu);
210 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
211 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
212 grpc_schedule_on_exec_ctx);
213 pr.port = -1;
214 pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
215 pr.ctx = &context;
216
217 req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
218 req.http.path = const_cast<char*>("/get");
219
220 grpc_httpcli_context_init(&context);
221 grpc_resource_quota* resource_quota =
222 grpc_resource_quota_create("port_server_client/pick");
223 grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
224 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
225 GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
226 grpc_schedule_on_exec_ctx),
227 &pr.response);
228 grpc_resource_quota_unref_internal(resource_quota);
229 grpc_core::ExecCtx::Get()->Flush();
230 gpr_mu_lock(pr.mu);
231 while (pr.port == -1) {
232 grpc_pollset_worker* worker = nullptr;
233 if (!GRPC_LOG_IF_ERROR(
234 "pollset_work",
235 grpc_pollset_work(
236 grpc_polling_entity_pollset(&pr.pops), &worker,
237 grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
238 pr.port = 0;
239 }
240 }
241 gpr_mu_unlock(pr.mu);
242
243 grpc_http_response_destroy(&pr.response);
244 grpc_httpcli_context_destroy(&context);
245 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
246 shutdown_closure);
247
248 grpc_core::ExecCtx::Get()->Flush();
249 }
250 grpc_shutdown();
251
252 return pr.port;
253 }
254
255 #endif // GRPC_TEST_PICK_PORT
256