1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <string.h>
26 #include <vector>
27
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/impl/codegen/grpc_types.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37
38 #include "src/core/ext/filters/http/server/http_server_filter.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/internal.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/handshaker.h"
44 #include "src/core/lib/channel/handshaker_registry.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/iomgr/endpoint.h"
48 #include "src/core/lib/iomgr/resolve_address.h"
49 #include "src/core/lib/iomgr/resource_quota.h"
50 #include "src/core/lib/iomgr/tcp_server.h"
51 #include "src/core/lib/iomgr/unix_sockets_posix.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/server.h"
55
56 namespace grpc_core {
57 namespace {
58
// URI scheme prefixes recognised by Chttp2ServerAddPort() for addresses that
// must be resolved as Unix domain sockets rather than through the regular
// resolver. The arrays keep their terminating NUL, so callers that skip the
// prefix use `sizeof(...) - 1`.
constexpr char kUnixUriPrefix[] = "unix:";
constexpr char kUnixAbstractUriPrefix[] = "unix-abstract:";
61
62 class Chttp2ServerListener : public Server::ListenerInterface {
63 public:
64 static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
65 grpc_channel_args* args,
66 Chttp2ServerArgsModifier args_modifier,
67 int* port_num);
68
69 static grpc_error_handle CreateWithAcceptor(
70 Server* server, const char* name, grpc_channel_args* args,
71 Chttp2ServerArgsModifier args_modifier);
72
73 // Do not instantiate directly. Use one of the factory methods above.
74 Chttp2ServerListener(Server* server, grpc_channel_args* args,
75 Chttp2ServerArgsModifier args_modifier);
76 ~Chttp2ServerListener() override;
77
78 void Start(Server* server,
79 const std::vector<grpc_pollset*>* pollsets) override;
80
channelz_listen_socket_node() const81 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
82 return channelz_listen_socket_.get();
83 }
84
85 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
86
87 void Orphan() override;
88
89 private:
90 class ConfigFetcherWatcher
91 : public grpc_server_config_fetcher::WatcherInterface {
92 public:
ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)93 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
94 : listener_(std::move(listener)) {}
95
96 void UpdateConnectionManager(
97 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
98 connection_manager) override;
99
100 void StopServing() override;
101
102 private:
103 RefCountedPtr<Chttp2ServerListener> listener_;
104 };
105
106 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
107 public:
108 class HandshakingState : public InternallyRefCounted<HandshakingState> {
109 public:
110 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
111 grpc_pollset* accepting_pollset,
112 grpc_tcp_server_acceptor* acceptor,
113 grpc_channel_args* args);
114
115 ~HandshakingState() override;
116
117 void Orphan() override;
118
119 void Start(grpc_endpoint* endpoint, grpc_channel_args* args);
120
121 // Needed to be able to grab an external ref in ActiveConnection::Start()
122 using InternallyRefCounted<HandshakingState>::Ref;
123
124 private:
125 static void OnTimeout(void* arg, grpc_error_handle error);
126 static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
127 static void OnHandshakeDone(void* arg, grpc_error_handle error);
128 RefCountedPtr<ActiveConnection> const connection_;
129 grpc_pollset* const accepting_pollset_;
130 grpc_tcp_server_acceptor* const acceptor_;
131 RefCountedPtr<HandshakeManager> handshake_mgr_
132 ABSL_GUARDED_BY(&connection_->mu_);
133 // State for enforcing handshake timeout on receiving HTTP/2 settings.
134 grpc_millis const deadline_;
135 grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
136 grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
137 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
138 grpc_pollset_set* const interested_parties_;
139 };
140
141 ActiveConnection(grpc_pollset* accepting_pollset,
142 grpc_tcp_server_acceptor* acceptor,
143 grpc_channel_args* args);
144 ~ActiveConnection() override;
145
146 void Orphan() override;
147
148 void SendGoAway();
149
150 void Start(RefCountedPtr<Chttp2ServerListener> listener,
151 grpc_endpoint* endpoint, grpc_channel_args* args);
152
153 // Needed to be able to grab an external ref in
154 // Chttp2ServerListener::OnAccept()
155 using InternallyRefCounted<ActiveConnection>::Ref;
156
157 private:
158 static void OnClose(void* arg, grpc_error_handle error);
159
160 RefCountedPtr<Chttp2ServerListener> listener_;
161 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
162 // Set by HandshakingState before the handshaking begins and reset when
163 // handshaking is done.
164 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
165 // Set by HandshakingState when handshaking is done and a valid transport is
166 // created.
167 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
168 grpc_closure on_close_;
169 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
170 };
171
172 // To allow access to RefCounted<> like interface.
173 friend class RefCountedPtr<Chttp2ServerListener>;
174
175 // Should only be called once so as to start the TCP server.
176 void StartListening();
177
178 static void OnAccept(void* arg, grpc_endpoint* tcp,
179 grpc_pollset* accepting_pollset,
180 grpc_tcp_server_acceptor* acceptor);
181
182 static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
183
184 static void DestroyListener(Server* /*server*/, void* arg,
185 grpc_closure* destroy_done);
186
187 // The interface required by RefCountedPtr<> has been manually implemented
188 // here to take a ref on tcp_server_ instead. Note that, the handshaker needs
189 // tcp_server_ to exist for the lifetime of the handshake since it's needed by
190 // acceptor. Sharing refs between the listener and tcp_server_ is just an
191 // optimization to avoid taking additional refs on the listener, since
192 // TcpServerShutdownComplete already holds a ref to the listener.
IncrementRefCount()193 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
IncrementRefCount(const DebugLocation &,const char *)194 void IncrementRefCount(const DebugLocation& /* location */,
195 const char* /* reason */) {
196 IncrementRefCount();
197 }
198
Ref()199 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
200 IncrementRefCount();
201 return RefCountedPtr<Chttp2ServerListener>(this);
202 }
Ref(const DebugLocation &,const char *)203 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
204 const char* /* reason */)
205 GRPC_MUST_USE_RESULT {
206 return Ref();
207 }
208
Unref()209 void Unref() { grpc_tcp_server_unref(tcp_server_); }
Unref(const DebugLocation &,const char *)210 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
211 Unref();
212 }
213
214 Server* const server_;
215 grpc_tcp_server* tcp_server_;
216 grpc_resolved_address resolved_address_;
217 Chttp2ServerArgsModifier const args_modifier_;
218 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
219 Mutex channel_args_mu_;
220 grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_);
221 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
222 connection_manager_ ABSL_GUARDED_BY(channel_args_mu_);
223 Mutex mu_;
224 // Signals whether grpc_tcp_server_start() has been called.
225 bool started_ ABSL_GUARDED_BY(mu_) = false;
226 // Signals whether grpc_tcp_server_start() has completed.
227 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
228 // Signals whether new requests/connections are to be accepted.
229 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
230 // Signals whether the application has triggered shutdown.
231 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
232 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
233 ABSL_GUARDED_BY(mu_);
234 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
235 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
236 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
237 };
238
239 //
240 // Chttp2ServerListener::ConfigFetcherWatcher
241 //
242
UpdateConnectionManager(RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> connection_manager)243 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
244 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
245 connection_manager) {
246 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
247 connection_manager_to_destroy;
248 {
249 MutexLock lock(&listener_->channel_args_mu_);
250 connection_manager_to_destroy = listener_->connection_manager_;
251 listener_->connection_manager_ = std::move(connection_manager);
252 }
253 {
254 MutexLock lock(&listener_->mu_);
255 if (listener_->shutdown_) {
256 return;
257 }
258 listener_->is_serving_ = true;
259 if (listener_->started_) return;
260 }
261 int port_temp;
262 grpc_error_handle error = grpc_tcp_server_add_port(
263 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
264 if (error != GRPC_ERROR_NONE) {
265 GRPC_ERROR_UNREF(error);
266 gpr_log(GPR_ERROR, "Error adding port to server: %s",
267 grpc_error_std_string(error).c_str());
268 // TODO(yashykt): We wouldn't need to assert here if we bound to the
269 // port earlier during AddPort.
270 GPR_ASSERT(0);
271 }
272 listener_->StartListening();
273 {
274 MutexLock lock(&listener_->mu_);
275 listener_->started_ = true;
276 listener_->started_cv_.SignalAll();
277 }
278 }
279
StopServing()280 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
281 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
282 {
283 MutexLock lock(&listener_->mu_);
284 listener_->is_serving_ = false;
285 connections = std::move(listener_->connections_);
286 }
287 // Send GOAWAYs on the transports so that they disconnected when existing RPCs
288 // finish.
289 for (auto& connection : connections) {
290 connection.first->SendGoAway();
291 }
292 }
293
294 //
295 // Chttp2ServerListener::ActiveConnection::HandshakingState
296 //
297
GetConnectionDeadline(const grpc_channel_args * args)298 grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
299 int timeout_ms =
300 grpc_channel_args_find_integer(args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS,
301 {120 * GPR_MS_PER_SEC, 1, INT_MAX});
302 return ExecCtx::Get()->Now() + timeout_ms;
303 }
304
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,grpc_channel_args * args)305 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
306 RefCountedPtr<ActiveConnection> connection_ref,
307 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
308 grpc_channel_args* args)
309 : connection_(std::move(connection_ref)),
310 accepting_pollset_(accepting_pollset),
311 acceptor_(acceptor),
312 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
313 deadline_(GetConnectionDeadline(args)),
314 interested_parties_(grpc_pollset_set_create()) {
315 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
316 HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args,
317 interested_parties_, handshake_mgr_.get());
318 }
319
~HandshakingState()320 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
321 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
322 grpc_pollset_set_destroy(interested_parties_);
323 }
324
Orphan()325 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
326 {
327 MutexLock lock(&connection_->mu_);
328 if (handshake_mgr_ != nullptr) {
329 handshake_mgr_->Shutdown(
330 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
331 }
332 }
333 Unref();
334 }
335
Start(grpc_endpoint * endpoint,grpc_channel_args * args)336 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
337 grpc_endpoint* endpoint, grpc_channel_args* args) {
338 Ref().release(); // Held by OnHandshakeDone
339 RefCountedPtr<HandshakeManager> handshake_mgr;
340 {
341 MutexLock lock(&connection_->mu_);
342 if (handshake_mgr_ == nullptr) return;
343 handshake_mgr = handshake_mgr_;
344 }
345 handshake_mgr->DoHandshake(endpoint, args, deadline_, acceptor_,
346 OnHandshakeDone, this);
347 }
348
OnTimeout(void * arg,grpc_error_handle error)349 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
350 void* arg, grpc_error_handle error) {
351 HandshakingState* self = static_cast<HandshakingState*>(arg);
352 // Note that we may be called with GRPC_ERROR_NONE when the timer fires
353 // or with an error indicating that the timer system is being shut down.
354 if (error != GRPC_ERROR_CANCELLED) {
355 grpc_transport_op* op = grpc_make_transport_op(nullptr);
356 op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
357 "Did not receive HTTP/2 settings before handshake timeout");
358 grpc_chttp2_transport* transport = nullptr;
359 {
360 MutexLock lock(&self->connection_->mu_);
361 transport = self->connection_->transport_;
362 }
363 grpc_transport_perform_op(&transport->base, op);
364 }
365 self->Unref();
366 }
367
368 void Chttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void * arg,grpc_error_handle)369 OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
370 HandshakingState* self = static_cast<HandshakingState*>(arg);
371 grpc_timer_cancel(&self->timer_);
372 self->Unref();
373 }
374
// Completion callback for HandshakeManager::DoHandshake().
//
// `arg` is the HandshakerArgs of the finished handshake; its user_data is the
// HandshakingState that started it. On success with a live endpoint this
// creates the chttp2 transport, registers it with the server and arms the
// settings-deadline timer. On every other path it releases what the handshake
// produced. In all cases the acceptor is freed, the handshake manager and the
// connection's handshaking state are released outside of the connection lock,
// and the ref taken in Start() is dropped.
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
    void* arg, grpc_error_handle error) {
  auto* args = static_cast<HandshakerArgs*>(arg);
  HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
  // Both of these are filled in under the lock and destroyed when this
  // function returns, i.e. after the lock has been released.
  OrphanablePtr<HandshakingState> handshaking_state_ref;
  RefCountedPtr<HandshakeManager> handshake_mgr;
  // Whether the connection must be removed from the listener's map below.
  bool cleanup_connection = false;
  // Whether the channel-size quota must be returned to the resource user
  // below.
  bool free_resource_quota = false;
  grpc_resource_user* resource_user =
      self->connection_->listener_->server_->default_resource_user();
  {
    MutexLock connection_lock(&self->connection_->mu_);
    if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) {
      std::string error_str = grpc_error_std_string(error);
      gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
      cleanup_connection = true;
      free_resource_quota = true;
      if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
        // We were shut down or stopped serving after handshaking completed
        // successfully, so destroy the endpoint here.
        // TODO(ctiller): It is currently necessary to shutdown endpoints
        // before destroying them, even if we know that there are no
        // pending read/write callbacks.  This should be fixed, at which
        // point this can be removed.
        grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
        grpc_endpoint_destroy(args->endpoint);
        grpc_channel_args_destroy(args->args);
        grpc_slice_buffer_destroy_internal(args->read_buffer);
        gpr_free(args->read_buffer);
      }
    } else {
      // If the handshaking succeeded but there is no endpoint, then the
      // handshaker may have handed off the connection to some external
      // code, so we can just clean up here without creating a transport.
      if (args->endpoint != nullptr) {
        grpc_transport* transport = grpc_create_chttp2_transport(
            args->args, args->endpoint, false, resource_user);
        grpc_error_handle channel_init_err =
            self->connection_->listener_->server_->SetupTransport(
                transport, self->accepting_pollset_, args->args,
                grpc_chttp2_transport_get_socket_node(transport),
                resource_user);
        if (channel_init_err == GRPC_ERROR_NONE) {
          // Use notify_on_receive_settings callback to enforce the
          // handshake deadline.
          // Note: The reinterpret_cast<>s here are safe, because
          // grpc_chttp2_transport is a C-style extension of
          // grpc_transport, so this is morally equivalent of a
          // static_cast<> to a derived class.
          // TODO(roth): Change to static_cast<> when we C++-ify the
          // transport API.
          self->connection_->transport_ =
              reinterpret_cast<grpc_chttp2_transport*>(transport);
          GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
                                    "ActiveConnection");  // Held by connection_
          self->Ref().release();  // Held by OnReceiveSettings().
          GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
                            self, grpc_schedule_on_exec_ctx);
          // If the listener has been configured with a config fetcher, we need
          // to watch for the transport being closed so that we can maintain
          // an up-to-date list of active connections.
          grpc_closure* on_close = nullptr;
          if (self->connection_->listener_->config_fetcher_watcher_ !=
              nullptr) {
            // Ref held by OnClose()
            self->connection_->Ref().release();
            on_close = &self->connection_->on_close_;
          } else {
            // Remove the connection from the connections_ map since OnClose()
            // will not be invoked when a config fetcher is not set.
            cleanup_connection = true;
          }
          grpc_chttp2_transport_start_reading(transport, args->read_buffer,
                                              &self->on_receive_settings_,
                                              on_close);
          grpc_channel_args_destroy(args->args);
          self->Ref().release();  // Held by OnTimeout().
          GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
                            grpc_schedule_on_exec_ctx);
          grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
        } else {
          // Failed to create channel from transport. Clean up.
          gpr_log(GPR_ERROR, "Failed to create channel: %s",
                  grpc_error_std_string(channel_init_err).c_str());
          GRPC_ERROR_UNREF(channel_init_err);
          grpc_transport_destroy(transport);
          grpc_slice_buffer_destroy_internal(args->read_buffer);
          gpr_free(args->read_buffer);
          cleanup_connection = true;
          free_resource_quota = true;
          grpc_channel_args_destroy(args->args);
        }
      } else {
        cleanup_connection = true;
        free_resource_quota = true;
      }
    }
    // Since the handshake manager is done, the connection no longer needs to
    // shutdown the handshake when the listener needs to stop serving.
    // Avoid calling the destructor of HandshakeManager and HandshakingState
    // from within the critical region.
    handshake_mgr = std::move(self->handshake_mgr_);
    handshaking_state_ref = std::move(self->connection_->handshaking_state_);
  }
  gpr_free(self->acceptor_);
  // Receives the map entry (if any) so that the connection is orphaned after
  // the listener lock below has been released.
  OrphanablePtr<ActiveConnection> connection;
  if (free_resource_quota && resource_user != nullptr) {
    grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
  }
  if (cleanup_connection) {
    MutexLock listener_lock(&self->connection_->listener_->mu_);
    auto it = self->connection_->listener_->connections_.find(
        self->connection_.get());
    if (it != self->connection_->listener_->connections_.end()) {
      connection = std::move(it->second);
      self->connection_->listener_->connections_.erase(it);
    }
  }
  // Releases the ref taken in Start() for this callback.
  self->Unref();
}
495
496 //
497 // Chttp2ServerListener::ActiveConnection
498 //
499
ActiveConnection(grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,grpc_channel_args * args)500 Chttp2ServerListener::ActiveConnection::ActiveConnection(
501 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
502 grpc_channel_args* args)
503 : handshaking_state_(MakeOrphanable<HandshakingState>(
504 Ref(), accepting_pollset, acceptor, args)) {
505 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
506 grpc_schedule_on_exec_ctx);
507 }
508
~ActiveConnection()509 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
510 if (transport_ != nullptr) {
511 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
512 }
513 }
514
Orphan()515 void Chttp2ServerListener::ActiveConnection::Orphan() {
516 OrphanablePtr<HandshakingState> handshaking_state;
517 {
518 MutexLock lock(&mu_);
519 shutdown_ = true;
520 // Reset handshaking_state_ since we have been orphaned by the listener
521 // signaling that the listener has stopped serving.
522 handshaking_state = std::move(handshaking_state_);
523 }
524 Unref();
525 }
526
SendGoAway()527 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
528 grpc_chttp2_transport* transport = nullptr;
529 {
530 MutexLock lock(&mu_);
531 transport = transport_;
532 }
533 if (transport != nullptr) {
534 grpc_transport_op* op = grpc_make_transport_op(nullptr);
535 op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
536 "Server is stopping to serve requests.");
537 grpc_transport_perform_op(&transport->base, op);
538 }
539 }
540
Start(RefCountedPtr<Chttp2ServerListener> listener,grpc_endpoint * endpoint,grpc_channel_args * args)541 void Chttp2ServerListener::ActiveConnection::Start(
542 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
543 grpc_channel_args* args) {
544 RefCountedPtr<HandshakingState> handshaking_state_ref;
545 listener_ = std::move(listener);
546 {
547 MutexLock lock(&mu_);
548 if (shutdown_) return;
549 // Hold a ref to HandshakingState to allow starting the handshake outside
550 // the critical region.
551 handshaking_state_ref = handshaking_state_->Ref();
552 }
553 handshaking_state_ref->Start(endpoint, args);
554 }
555
OnClose(void * arg,grpc_error_handle)556 void Chttp2ServerListener::ActiveConnection::OnClose(
557 void* arg, grpc_error_handle /* error */) {
558 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
559 OrphanablePtr<ActiveConnection> connection;
560 {
561 MutexLock listener_lock(&self->listener_->mu_);
562 MutexLock connection_lock(&self->mu_);
563 // The node was already deleted from the connections_ list if the connection
564 // is shutdown.
565 if (!self->shutdown_) {
566 auto it = self->listener_->connections_.find(self);
567 if (it != self->listener_->connections_.end()) {
568 connection = std::move(it->second);
569 self->listener_->connections_.erase(it);
570 }
571 }
572 }
573 self->Unref();
574 }
575
576 //
577 // Chttp2ServerListener
578 //
579
Create(Server * server,grpc_resolved_address * addr,grpc_channel_args * args,Chttp2ServerArgsModifier args_modifier,int * port_num)580 grpc_error_handle Chttp2ServerListener::Create(
581 Server* server, grpc_resolved_address* addr, grpc_channel_args* args,
582 Chttp2ServerArgsModifier args_modifier, int* port_num) {
583 Chttp2ServerListener* listener = nullptr;
584 // The bulk of this method is inside of a lambda to make cleanup
585 // easier without using goto.
586 grpc_error_handle error = [&]() {
587 // Create Chttp2ServerListener.
588 listener = new Chttp2ServerListener(server, args, args_modifier);
589 error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
590 args, &listener->tcp_server_);
591 if (error != GRPC_ERROR_NONE) return error;
592 if (server->config_fetcher() != nullptr) {
593 listener->resolved_address_ = *addr;
594 // TODO(yashykt): Consider binding so as to be able to return the port
595 // number.
596 } else {
597 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
598 if (error != GRPC_ERROR_NONE) return error;
599 }
600 // Create channelz node.
601 if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
602 GRPC_ENABLE_CHANNELZ_DEFAULT)) {
603 std::string string_address = grpc_sockaddr_to_uri(addr);
604 listener->channelz_listen_socket_ =
605 MakeRefCounted<channelz::ListenSocketNode>(
606 string_address.c_str(),
607 absl::StrFormat("chttp2 listener %s", string_address.c_str()));
608 }
609 // Register with the server only upon success
610 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
611 return GRPC_ERROR_NONE;
612 }();
613 if (error != GRPC_ERROR_NONE) {
614 if (listener != nullptr) {
615 if (listener->tcp_server_ != nullptr) {
616 // listener is deleted when tcp_server_ is shutdown.
617 grpc_tcp_server_unref(listener->tcp_server_);
618 } else {
619 delete listener;
620 }
621 } else {
622 grpc_channel_args_destroy(args);
623 }
624 }
625 return error;
626 }
627
CreateWithAcceptor(Server * server,const char * name,grpc_channel_args * args,Chttp2ServerArgsModifier args_modifier)628 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
629 Server* server, const char* name, grpc_channel_args* args,
630 Chttp2ServerArgsModifier args_modifier) {
631 Chttp2ServerListener* listener =
632 new Chttp2ServerListener(server, args, args_modifier);
633 grpc_error_handle error = grpc_tcp_server_create(
634 &listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_);
635 if (error != GRPC_ERROR_NONE) {
636 delete listener;
637 return error;
638 }
639 // TODO(yangg) channelz
640 TcpServerFdHandler** arg_val =
641 grpc_channel_args_find_pointer<TcpServerFdHandler*>(args, name);
642 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
643 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
644 return GRPC_ERROR_NONE;
645 }
646
Chttp2ServerListener(Server * server,grpc_channel_args * args,Chttp2ServerArgsModifier args_modifier)647 Chttp2ServerListener::Chttp2ServerListener(
648 Server* server, grpc_channel_args* args,
649 Chttp2ServerArgsModifier args_modifier)
650 : server_(server), args_modifier_(args_modifier), args_(args) {
651 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
652 this, grpc_schedule_on_exec_ctx);
653 }
654
~Chttp2ServerListener()655 Chttp2ServerListener::~Chttp2ServerListener() {
656 // Flush queued work before destroying handshaker factory, since that
657 // may do a synchronous unref.
658 ExecCtx::Get()->Flush();
659 if (on_destroy_done_ != nullptr) {
660 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
661 ExecCtx::Get()->Flush();
662 }
663 grpc_channel_args_destroy(args_);
664 }
665
666 /* Server callback: start listening on our ports */
Start(Server *,const std::vector<grpc_pollset * > *)667 void Chttp2ServerListener::Start(
668 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
669 if (server_->config_fetcher() != nullptr) {
670 grpc_channel_args* args = nullptr;
671 auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
672 config_fetcher_watcher_ = watcher.get();
673 {
674 MutexLock lock(&channel_args_mu_);
675 args = grpc_channel_args_copy(args_);
676 }
677 server_->config_fetcher()->StartWatch(
678 grpc_sockaddr_to_string(&resolved_address_, false), args,
679 std::move(watcher));
680 } else {
681 {
682 MutexLock lock(&mu_);
683 started_ = true;
684 is_serving_ = true;
685 }
686 StartListening();
687 }
688 }
689
StartListening()690 void Chttp2ServerListener::StartListening() {
691 grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
692 }
693
SetOnDestroyDone(grpc_closure * on_destroy_done)694 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
695 MutexLock lock(&mu_);
696 on_destroy_done_ = on_destroy_done;
697 }
698
// Accept callback registered with grpc_tcp_server_start(); `arg` is the
// listener.
//
// Copies the listener's channel args and, when the server has a config
// fetcher, lets the current connection manager and the args modifier adjust
// them. Then creates an ActiveConnection and, if the listener is still serving
// and resource quota is available, records it in connections_ and starts the
// handshake. On every rejection path the endpoint is shut down and destroyed
// and the acceptor is freed.
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
                                    grpc_pollset* accepting_pollset,
                                    grpc_tcp_server_acceptor* acceptor) {
  Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
  grpc_channel_args* args = nullptr;
  RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
      connection_manager;
  {
    // Snapshot the args and the connection manager under their lock; both are
    // used below without holding it.
    MutexLock lock(&self->channel_args_mu_);
    args = grpc_channel_args_copy(self->args_);
    connection_manager = self->connection_manager_;
  }
  // Releases everything this callback was handed for the connection.
  auto endpoint_cleanup = [&](grpc_error_handle error) {
    grpc_endpoint_shutdown(tcp, error);
    grpc_endpoint_destroy(tcp);
    gpr_free(acceptor);
  };
  if (self->server_->config_fetcher() != nullptr) {
    if (connection_manager == nullptr) {
      grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "No ConnectionManager configured. Closing connection.");
      endpoint_cleanup(error);
      grpc_channel_args_destroy(args);
      return;
    }
    // TODO(yashykt): Maybe combine the following two arg modifiers into a
    // single one.
    absl::StatusOr<grpc_channel_args*> args_result =
        connection_manager->UpdateChannelArgsForConnection(args, tcp);
    if (!args_result.ok()) {
      gpr_log(GPR_DEBUG, "Closing connection: %s",
              args_result.status().ToString().c_str());
      endpoint_cleanup(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
          args_result.status().ToString().c_str()));
      // NOTE(review): `args` is not destroyed on this path - presumably
      // UpdateChannelArgsForConnection() took ownership of it; confirm.
      return;
    }
    grpc_error_handle error = GRPC_ERROR_NONE;
    args = self->args_modifier_(*args_result, &error);
    if (error != GRPC_ERROR_NONE) {
      gpr_log(GPR_DEBUG, "Closing connection: %s",
              grpc_error_std_string(error).c_str());
      endpoint_cleanup(error);
      grpc_channel_args_destroy(args);
      return;
    }
  }
  auto connection =
      MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args);
  // Hold a ref to connection to allow starting handshake outside the
  // critical region
  RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
  RefCountedPtr<Chttp2ServerListener> listener_ref;
  {
    MutexLock lock(&self->mu_);
    // Only track the connection if the listener is still serving; otherwise
    // `connection` stays non-null and the connection is rejected below.
    if (!self->shutdown_ && self->is_serving_) {
      grpc_resource_user* resource_user =
          self->server_->default_resource_user();
      if (resource_user != nullptr &&
          !grpc_resource_user_safe_alloc(resource_user,
                                         GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
        gpr_log(
            GPR_ERROR,
            "Memory quota exhausted, rejecting connection, no handshaking.");
      } else {
        // This ref needs to be taken in the critical region after having made
        // sure that the listener has not been Orphaned, so as to avoid
        // heap-use-after-free issues where `Ref()` is invoked when the ref of
        // tcp_server_ has already reached 0. (Ref() implementation of
        // Chttp2ServerListener is grpc_tcp_server_ref().)
        listener_ref = self->Ref();
        self->connections_.emplace(connection.get(), std::move(connection));
      }
    }
  }
  // `connection` is still set only if it was not moved into connections_
  // above, i.e. the connection was rejected.
  if (connection != nullptr) {
    endpoint_cleanup(GRPC_ERROR_NONE);
  } else {
    connection_ref->Start(std::move(listener_ref), tcp, args);
  }
  grpc_channel_args_destroy(args);
}
781
TcpServerShutdownComplete(void * arg,grpc_error_handle error)782 void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
783 grpc_error_handle error) {
784 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
785 self->channelz_listen_socket_.reset();
786 GRPC_ERROR_UNREF(error);
787 delete self;
788 }
789
790 /* Server callback: destroy the tcp listener (so we don't generate further
791 callbacks) */
Orphan()792 void Chttp2ServerListener::Orphan() {
793 // Cancel the watch before shutting down so as to avoid holding a ref to the
794 // listener in the watcher.
795 if (config_fetcher_watcher_ != nullptr) {
796 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
797 }
798 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
799 grpc_tcp_server* tcp_server;
800 {
801 MutexLock lock(&mu_);
802 shutdown_ = true;
803 is_serving_ = false;
804 // Orphan the connections so that they can start cleaning up.
805 connections = std::move(connections_);
806 // If the listener is currently set to be serving but has not been started
807 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
808 // operation to finish to avoid causing races.
809 while (is_serving_ && !started_) {
810 started_cv_.Wait(&mu_);
811 }
812 tcp_server = tcp_server_;
813 }
814 grpc_tcp_server_shutdown_listeners(tcp_server);
815 grpc_tcp_server_unref(tcp_server);
816 }
817
818 } // namespace
819
820 //
821 // Chttp2ServerAddPort()
822 //
823
Chttp2ServerAddPort(Server * server,const char * addr,grpc_channel_args * args,Chttp2ServerArgsModifier args_modifier,int * port_num)824 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
825 grpc_channel_args* args,
826 Chttp2ServerArgsModifier args_modifier,
827 int* port_num) {
828 if (strncmp(addr, "external:", 9) == 0) {
829 return grpc_core::Chttp2ServerListener::CreateWithAcceptor(
830 server, addr, args, args_modifier);
831 }
832 *port_num = -1;
833 grpc_resolved_addresses* resolved = nullptr;
834 std::vector<grpc_error_handle> error_list;
835 // Using lambda to avoid use of goto.
836 grpc_error_handle error = [&]() {
837 if (absl::StartsWith(addr, kUnixUriPrefix)) {
838 error = grpc_resolve_unix_domain_address(
839 addr + sizeof(kUnixUriPrefix) - 1, &resolved);
840 } else if (absl::StartsWith(addr, kUnixAbstractUriPrefix)) {
841 error = grpc_resolve_unix_abstract_domain_address(
842 addr + sizeof(kUnixAbstractUriPrefix) - 1, &resolved);
843 } else {
844 error = grpc_blocking_resolve_address(addr, "https", &resolved);
845 }
846 if (error != GRPC_ERROR_NONE) return error;
847 // Create a listener for each resolved address.
848 for (size_t i = 0; i < resolved->naddrs; i++) {
849 // If address has a wildcard port (0), use the same port as a previous
850 // listener.
851 if (*port_num != -1 && grpc_sockaddr_get_port(&resolved->addrs[i]) == 0) {
852 grpc_sockaddr_set_port(&resolved->addrs[i], *port_num);
853 }
854 int port_temp = -1;
855 error = grpc_core::Chttp2ServerListener::Create(
856 server, &resolved->addrs[i], grpc_channel_args_copy(args),
857 args_modifier, &port_temp);
858 if (error != GRPC_ERROR_NONE) {
859 error_list.push_back(error);
860 } else {
861 if (*port_num == -1) {
862 *port_num = port_temp;
863 } else {
864 GPR_ASSERT(*port_num == port_temp);
865 }
866 }
867 }
868 if (error_list.size() == resolved->naddrs) {
869 std::string msg =
870 absl::StrFormat("No address added out of total %" PRIuPTR " resolved",
871 resolved->naddrs);
872 return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
873 msg.c_str(), error_list.data(), error_list.size());
874 } else if (!error_list.empty()) {
875 std::string msg = absl::StrFormat(
876 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
877 " resolved",
878 resolved->naddrs - error_list.size(), resolved->naddrs);
879 error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
880 msg.c_str(), error_list.data(), error_list.size());
881 gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
882 GRPC_ERROR_UNREF(error);
883 // we managed to bind some addresses: continue without error
884 }
885 return GRPC_ERROR_NONE;
886 }(); // lambda end
887 for (grpc_error_handle error : error_list) {
888 GRPC_ERROR_UNREF(error);
889 }
890 grpc_channel_args_destroy(args);
891 if (resolved != nullptr) {
892 grpc_resolved_addresses_destroy(resolved);
893 }
894 if (error != GRPC_ERROR_NONE) *port_num = 0;
895 return error;
896 }
897
898 } // namespace grpc_core
899