1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
20
21 #include <grpc/slice.h>
22 #include <grpc/slice_buffer.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25
26 #include <memory>
27 #include <utility>
28
29 #include "absl/base/thread_annotations.h"
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/types/optional.h"
35 #include "src/core/config/core_configuration.h"
36 #include "src/core/handshaker/handshaker.h"
37 #include "src/core/handshaker/handshaker_factory.h"
38 #include "src/core/handshaker/handshaker_registry.h"
39 #include "src/core/lib/address_utils/parse_address.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/iomgr_fwd.h"
47 #include "src/core/lib/iomgr/polling_entity.h"
48 #include "src/core/lib/iomgr/pollset_set.h"
49 #include "src/core/lib/iomgr/resolved_address.h"
50 #include "src/core/lib/iomgr/tcp_client.h"
51 #include "src/core/lib/iomgr/tcp_server.h"
52 #include "src/core/util/debug_location.h"
53 #include "src/core/util/ref_counted_ptr.h"
54 #include "src/core/util/sync.h"
55 #include "src/core/util/uri.h"
56
57 namespace grpc_core {
58
59 namespace {
60
61 class TCPConnectHandshaker : public Handshaker {
62 public:
63 explicit TCPConnectHandshaker(grpc_pollset_set* pollset_set);
name() const64 absl::string_view name() const override { return "tcp_connect"; }
65 void DoHandshake(
66 HandshakerArgs* args,
67 absl::AnyInvocable<void(absl::Status)> on_handshake_done) override;
68 void Shutdown(absl::Status error) override;
69
70 private:
71 ~TCPConnectHandshaker() override;
72 void FinishLocked(absl::Status error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
73 static void Connected(void* arg, grpc_error_handle error);
74
75 Mutex mu_;
76 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
77 // Endpoint to destroy after a shutdown.
78 grpc_endpoint* endpoint_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
79 absl::AnyInvocable<void(absl::Status)> on_handshake_done_
80 ABSL_GUARDED_BY(mu_);
81 grpc_pollset_set* interested_parties_ = nullptr;
82 grpc_polling_entity pollent_;
83 HandshakerArgs* args_ = nullptr;
84 bool bind_endpoint_to_pollset_ = false;
85 grpc_resolved_address addr_;
86 grpc_closure connected_;
87 };
88
TCPConnectHandshaker(grpc_pollset_set * pollset_set)89 TCPConnectHandshaker::TCPConnectHandshaker(grpc_pollset_set* pollset_set)
90 : interested_parties_(grpc_pollset_set_create()),
91 pollent_(grpc_polling_entity_create_from_pollset_set(pollset_set)) {
92 // Interested parties might be null for platforms like Apple.
93 // Explicitly check before adding/deleting from pollset_set to handle this
94 // use case.
95 if (interested_parties_ != nullptr) {
96 grpc_polling_entity_add_to_pollset_set(&pollent_, interested_parties_);
97 }
98 GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
99 }
100
Shutdown(absl::Status)101 void TCPConnectHandshaker::Shutdown(absl::Status /*error*/) {
102 // TODO(anramach): After migration to EventEngine, cancel the in-progress
103 // TCP connection attempt.
104 MutexLock lock(&mu_);
105 if (!shutdown_) {
106 shutdown_ = true;
107 // If we are shutting down while connecting, respond back with
108 // handshake done.
109 // The callback from grpc_tcp_client_connect will perform
110 // the necessary clean up.
111 if (on_handshake_done_ != nullptr) {
112 // TODO(roth): When we remove the legacy grpc_error APIs, propagate the
113 // status passed to shutdown as part of the message here.
114 FinishLocked(GRPC_ERROR_CREATE("tcp handshaker shutdown"));
115 }
116 }
117 }
118
DoHandshake(HandshakerArgs * args,absl::AnyInvocable<void (absl::Status)> on_handshake_done)119 void TCPConnectHandshaker::DoHandshake(
120 HandshakerArgs* args,
121 absl::AnyInvocable<void(absl::Status)> on_handshake_done) {
122 {
123 MutexLock lock(&mu_);
124 on_handshake_done_ = std::move(on_handshake_done);
125 }
126 CHECK_EQ(args->endpoint.get(), nullptr);
127 args_ = args;
128 absl::StatusOr<URI> uri = URI::Parse(
129 args->args.GetString(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS).value());
130 if (!uri.ok() || !grpc_parse_uri(*uri, &addr_)) {
131 MutexLock lock(&mu_);
132 FinishLocked(GRPC_ERROR_CREATE("Resolved address in invalid format"));
133 return;
134 }
135 bind_endpoint_to_pollset_ =
136 args->args.GetBool(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET)
137 .value_or(false);
138 // Update args to not contain the args relevant to TCP connect handshaker.
139 args->args = args->args.Remove(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS)
140 .Remove(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET);
141 // In some implementations, the closure can be flushed before
142 // grpc_tcp_client_connect() returns, and since the closure requires access
143 // to mu_, this can result in a deadlock (see
144 // https://github.com/grpc/grpc/issues/16427 for details).
145 // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
146 // make sure that we still exist at that point by taking a ref.
147 Ref().release(); // Ref held by callback.
148 // As we fake the TCP client connection failure when shutdown is called
149 // we don't want to pass args->endpoint directly.
150 // Instead pass endpoint_to_destroy_ and swap this endpoint to
151 // args endpoint on success.
152 grpc_tcp_client_connect(
153 &connected_, &endpoint_to_destroy_, interested_parties_,
154 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args->args),
155 &addr_, args->deadline);
156 }
157
Connected(void * arg,grpc_error_handle error)158 void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
159 RefCountedPtr<TCPConnectHandshaker> self(
160 static_cast<TCPConnectHandshaker*>(arg));
161 {
162 MutexLock lock(&self->mu_);
163 if (!error.ok() || self->shutdown_) {
164 if (error.ok()) {
165 error = GRPC_ERROR_CREATE("tcp handshaker shutdown");
166 }
167 if (self->endpoint_to_destroy_ != nullptr) {
168 grpc_endpoint_destroy(self->endpoint_to_destroy_);
169 self->endpoint_to_destroy_ = nullptr;
170 }
171 if (!self->shutdown_) {
172 self->shutdown_ = true;
173 self->FinishLocked(std::move(error));
174 } else {
175 // The on_handshake_done_ callback was already invoked as part of
176 // shutdown when connecting, so nothing to be done here.
177 }
178 return;
179 }
180 CHECK_NE(self->endpoint_to_destroy_, nullptr);
181 self->args_->endpoint.reset(self->endpoint_to_destroy_);
182 self->endpoint_to_destroy_ = nullptr;
183 if (self->bind_endpoint_to_pollset_) {
184 grpc_endpoint_add_to_pollset_set(self->args_->endpoint.get(),
185 self->interested_parties_);
186 }
187 self->FinishLocked(absl::OkStatus());
188 }
189 }
190
~TCPConnectHandshaker()191 TCPConnectHandshaker::~TCPConnectHandshaker() {
192 if (endpoint_to_destroy_ != nullptr) {
193 grpc_endpoint_destroy(endpoint_to_destroy_);
194 }
195 grpc_pollset_set_destroy(interested_parties_);
196 }
197
FinishLocked(absl::Status error)198 void TCPConnectHandshaker::FinishLocked(absl::Status error) {
199 if (interested_parties_ != nullptr) {
200 grpc_polling_entity_del_from_pollset_set(&pollent_, interested_parties_);
201 }
202 InvokeOnHandshakeDone(args_, std::move(on_handshake_done_), std::move(error));
203 }
204
205 //
206 // TCPConnectHandshakerFactory
207 //
208
209 class TCPConnectHandshakerFactory : public HandshakerFactory {
210 public:
AddHandshakers(const ChannelArgs &,grpc_pollset_set * interested_parties,HandshakeManager * handshake_mgr)211 void AddHandshakers(const ChannelArgs& /*args*/,
212 grpc_pollset_set* interested_parties,
213 HandshakeManager* handshake_mgr) override {
214 handshake_mgr->Add(
215 MakeRefCounted<TCPConnectHandshaker>(interested_parties));
216 }
Priority()217 HandshakerPriority Priority() override {
218 return HandshakerPriority::kTCPConnectHandshakers;
219 }
220 ~TCPConnectHandshakerFactory() override = default;
221 };
222
223 } // namespace
224
RegisterTCPConnectHandshaker(CoreConfiguration::Builder * builder)225 void RegisterTCPConnectHandshaker(CoreConfiguration::Builder* builder) {
226 builder->handshaker_registry()->RegisterHandshakerFactory(
227 HANDSHAKER_CLIENT, std::make_unique<TCPConnectHandshakerFactory>());
228 }
229
230 } // namespace grpc_core
231