1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20 #include "test/core/util/test_config.h"
21
22 #ifdef GRPC_TEST_PICK_PORT
23 #include "test/core/util/port_server_client.h"
24
25 #include <math.h>
26 #include <string.h>
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/lib/http/httpcli.h"
36
37 typedef struct freereq {
38 gpr_mu* mu = nullptr;
39 grpc_polling_entity pops = {};
40 int done = 0;
41 } freereq;
42
destroy_pops_and_shutdown(void * p,grpc_error *)43 static void destroy_pops_and_shutdown(void* p, grpc_error* /*error*/) {
44 grpc_pollset* pollset =
45 grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
46 grpc_pollset_destroy(pollset);
47 gpr_free(pollset);
48 }
49
freed_port_from_server(void * arg,grpc_error *)50 static void freed_port_from_server(void* arg, grpc_error* /*error*/) {
51 freereq* pr = static_cast<freereq*>(arg);
52 gpr_mu_lock(pr->mu);
53 pr->done = 1;
54 GRPC_LOG_IF_ERROR(
55 "pollset_kick",
56 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
57 gpr_mu_unlock(pr->mu);
58 }
59
grpc_free_port_using_server(int port)60 void grpc_free_port_using_server(int port) {
61 grpc_httpcli_context context;
62 grpc_httpcli_request req;
63 grpc_httpcli_response rsp;
64 freereq pr;
65 char* path;
66 grpc_closure* shutdown_closure;
67
68 grpc_init();
69 {
70 grpc_core::ExecCtx exec_ctx;
71
72 pr = {};
73 memset(&req, 0, sizeof(req));
74 rsp = {};
75
76 grpc_pollset* pollset =
77 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
78 grpc_pollset_init(pollset, &pr.mu);
79 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
80 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
81 grpc_schedule_on_exec_ctx);
82
83 req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
84 gpr_asprintf(&path, "/drop/%d", port);
85 req.http.path = path;
86
87 grpc_httpcli_context_init(&context);
88 grpc_resource_quota* resource_quota =
89 grpc_resource_quota_create("port_server_client/free");
90 grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
91 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
92 GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
93 grpc_schedule_on_exec_ctx),
94 &rsp);
95 grpc_resource_quota_unref_internal(resource_quota);
96 grpc_core::ExecCtx::Get()->Flush();
97 gpr_mu_lock(pr.mu);
98 while (!pr.done) {
99 grpc_pollset_worker* worker = nullptr;
100 if (!GRPC_LOG_IF_ERROR(
101 "pollset_work",
102 grpc_pollset_work(
103 grpc_polling_entity_pollset(&pr.pops), &worker,
104 grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
105 pr.done = 1;
106 }
107 }
108 gpr_mu_unlock(pr.mu);
109
110 grpc_httpcli_context_destroy(&context);
111 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
112 shutdown_closure);
113
114 gpr_free(path);
115 grpc_http_response_destroy(&rsp);
116 }
117 grpc_shutdown();
118 }
119
120 typedef struct portreq {
121 gpr_mu* mu = nullptr;
122 grpc_polling_entity pops = {};
123 int port = 0;
124 int retries = 0;
125 char* server = nullptr;
126 grpc_httpcli_context* ctx = nullptr;
127 grpc_httpcli_response response = {};
128 } portreq;
129
got_port_from_server(void * arg,grpc_error * error)130 static void got_port_from_server(void* arg, grpc_error* error) {
131 size_t i;
132 int port = 0;
133 portreq* pr = static_cast<portreq*>(arg);
134 int failed = 0;
135 grpc_httpcli_response* response = &pr->response;
136
137 if (error != GRPC_ERROR_NONE) {
138 failed = 1;
139 const char* msg = grpc_error_string(error);
140 gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]", msg);
141
142 } else if (response->status != 200) {
143 failed = 1;
144 gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
145 response->status);
146 }
147
148 if (failed) {
149 grpc_httpcli_request req;
150 memset(&req, 0, sizeof(req));
151 if (pr->retries >= 5) {
152 gpr_mu_lock(pr->mu);
153 pr->port = 0;
154 GRPC_LOG_IF_ERROR(
155 "pollset_kick",
156 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
157 gpr_mu_unlock(pr->mu);
158 return;
159 }
160 GPR_ASSERT(pr->retries < 10);
161 gpr_sleep_until(gpr_time_add(
162 gpr_now(GPR_CLOCK_REALTIME),
163 gpr_time_from_millis(
164 static_cast<int64_t>(
165 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
166 GPR_TIMESPAN)));
167 pr->retries++;
168 req.host = pr->server;
169 req.http.path = const_cast<char*>("/get");
170 grpc_http_response_destroy(&pr->response);
171 pr->response = {};
172 grpc_resource_quota* resource_quota =
173 grpc_resource_quota_create("port_server_client/pick_retry");
174 grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
175 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
176 GRPC_CLOSURE_CREATE(got_port_from_server, pr,
177 grpc_schedule_on_exec_ctx),
178 &pr->response);
179 grpc_resource_quota_unref_internal(resource_quota);
180 return;
181 }
182 GPR_ASSERT(response);
183 GPR_ASSERT(response->status == 200);
184 for (i = 0; i < response->body_length; i++) {
185 GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
186 port = port * 10 + response->body[i] - '0';
187 }
188 GPR_ASSERT(port > 1024);
189 gpr_mu_lock(pr->mu);
190 pr->port = port;
191 GRPC_LOG_IF_ERROR(
192 "pollset_kick",
193 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
194 gpr_mu_unlock(pr->mu);
195 }
196
grpc_pick_port_using_server(void)197 int grpc_pick_port_using_server(void) {
198 grpc_httpcli_context context;
199 grpc_httpcli_request req;
200 portreq pr;
201 grpc_closure* shutdown_closure;
202
203 grpc_init();
204 {
205 grpc_core::ExecCtx exec_ctx;
206 pr = {};
207 memset(&req, 0, sizeof(req));
208 grpc_pollset* pollset =
209 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
210 grpc_pollset_init(pollset, &pr.mu);
211 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
212 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
213 grpc_schedule_on_exec_ctx);
214 pr.port = -1;
215 pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
216 pr.ctx = &context;
217
218 req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
219 req.http.path = const_cast<char*>("/get");
220
221 grpc_httpcli_context_init(&context);
222 grpc_resource_quota* resource_quota =
223 grpc_resource_quota_create("port_server_client/pick");
224 grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
225 grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
226 GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
227 grpc_schedule_on_exec_ctx),
228 &pr.response);
229 grpc_resource_quota_unref_internal(resource_quota);
230 grpc_core::ExecCtx::Get()->Flush();
231 gpr_mu_lock(pr.mu);
232 while (pr.port == -1) {
233 grpc_pollset_worker* worker = nullptr;
234 if (!GRPC_LOG_IF_ERROR(
235 "pollset_work",
236 grpc_pollset_work(
237 grpc_polling_entity_pollset(&pr.pops), &worker,
238 grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
239 pr.port = 0;
240 }
241 }
242 gpr_mu_unlock(pr.mu);
243
244 grpc_http_response_destroy(&pr.response);
245 grpc_httpcli_context_destroy(&context);
246 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
247 shutdown_closure);
248
249 grpc_core::ExecCtx::Get()->Flush();
250 }
251 grpc_shutdown();
252
253 return pr.port;
254 }
255
256 #endif // GRPC_TEST_PICK_PORT
257