• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
20 
21 #include <grpc/slice.h>
22 #include <grpc/slice_buffer.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 
26 #include <memory>
27 #include <utility>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/types/optional.h"
35 #include "src/core/config/core_configuration.h"
36 #include "src/core/handshaker/handshaker.h"
37 #include "src/core/handshaker/handshaker_factory.h"
38 #include "src/core/handshaker/handshaker_registry.h"
39 #include "src/core/lib/address_utils/parse_address.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/iomgr_fwd.h"
47 #include "src/core/lib/iomgr/polling_entity.h"
48 #include "src/core/lib/iomgr/pollset_set.h"
49 #include "src/core/lib/iomgr/resolved_address.h"
50 #include "src/core/lib/iomgr/tcp_client.h"
51 #include "src/core/lib/iomgr/tcp_server.h"
52 #include "src/core/util/debug_location.h"
53 #include "src/core/util/ref_counted_ptr.h"
54 #include "src/core/util/sync.h"
55 #include "src/core/util/uri.h"
56 
57 namespace grpc_core {
58 
59 namespace {
60 
61 class TCPConnectHandshaker : public Handshaker {
62  public:
63   explicit TCPConnectHandshaker(grpc_pollset_set* pollset_set);
name() const64   absl::string_view name() const override { return "tcp_connect"; }
65   void DoHandshake(
66       HandshakerArgs* args,
67       absl::AnyInvocable<void(absl::Status)> on_handshake_done) override;
68   void Shutdown(absl::Status error) override;
69 
70  private:
71   ~TCPConnectHandshaker() override;
72   void FinishLocked(absl::Status error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
73   static void Connected(void* arg, grpc_error_handle error);
74 
75   Mutex mu_;
76   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
77   // Endpoint to destroy after a shutdown.
78   grpc_endpoint* endpoint_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
79   absl::AnyInvocable<void(absl::Status)> on_handshake_done_
80       ABSL_GUARDED_BY(mu_);
81   grpc_pollset_set* interested_parties_ = nullptr;
82   grpc_polling_entity pollent_;
83   HandshakerArgs* args_ = nullptr;
84   bool bind_endpoint_to_pollset_ = false;
85   grpc_resolved_address addr_;
86   grpc_closure connected_;
87 };
88 
TCPConnectHandshaker(grpc_pollset_set * pollset_set)89 TCPConnectHandshaker::TCPConnectHandshaker(grpc_pollset_set* pollset_set)
90     : interested_parties_(grpc_pollset_set_create()),
91       pollent_(grpc_polling_entity_create_from_pollset_set(pollset_set)) {
92   // Interested parties might be null for platforms like Apple.
93   // Explicitly check before adding/deleting from pollset_set to handle this
94   // use case.
95   if (interested_parties_ != nullptr) {
96     grpc_polling_entity_add_to_pollset_set(&pollent_, interested_parties_);
97   }
98   GRPC_CLOSURE_INIT(&connected_, Connected, this, grpc_schedule_on_exec_ctx);
99 }
100 
Shutdown(absl::Status)101 void TCPConnectHandshaker::Shutdown(absl::Status /*error*/) {
102   // TODO(anramach): After migration to EventEngine, cancel the in-progress
103   // TCP connection attempt.
104   MutexLock lock(&mu_);
105   if (!shutdown_) {
106     shutdown_ = true;
107     // If we are shutting down while connecting, respond back with
108     // handshake done.
109     // The callback from grpc_tcp_client_connect will perform
110     // the necessary clean up.
111     if (on_handshake_done_ != nullptr) {
112       // TODO(roth): When we remove the legacy grpc_error APIs, propagate the
113       // status passed to shutdown as part of the message here.
114       FinishLocked(GRPC_ERROR_CREATE("tcp handshaker shutdown"));
115     }
116   }
117 }
118 
DoHandshake(HandshakerArgs * args,absl::AnyInvocable<void (absl::Status)> on_handshake_done)119 void TCPConnectHandshaker::DoHandshake(
120     HandshakerArgs* args,
121     absl::AnyInvocable<void(absl::Status)> on_handshake_done) {
122   {
123     MutexLock lock(&mu_);
124     on_handshake_done_ = std::move(on_handshake_done);
125   }
126   CHECK_EQ(args->endpoint.get(), nullptr);
127   args_ = args;
128   absl::StatusOr<URI> uri = URI::Parse(
129       args->args.GetString(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS).value());
130   if (!uri.ok() || !grpc_parse_uri(*uri, &addr_)) {
131     MutexLock lock(&mu_);
132     FinishLocked(GRPC_ERROR_CREATE("Resolved address in invalid format"));
133     return;
134   }
135   bind_endpoint_to_pollset_ =
136       args->args.GetBool(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET)
137           .value_or(false);
138   // Update args to not contain the args relevant to TCP connect handshaker.
139   args->args = args->args.Remove(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS)
140                    .Remove(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET);
141   // In some implementations, the closure can be flushed before
142   // grpc_tcp_client_connect() returns, and since the closure requires access
143   // to mu_, this can result in a deadlock (see
144   // https://github.com/grpc/grpc/issues/16427 for details).
145   // grpc_tcp_client_connect() will fill endpoint_ with proper contents, and we
146   // make sure that we still exist at that point by taking a ref.
147   Ref().release();  // Ref held by callback.
148   // As we fake the TCP client connection failure when shutdown is called
149   // we don't want to pass args->endpoint directly.
150   // Instead pass endpoint_to_destroy_ and swap this endpoint to
151   // args endpoint on success.
152   grpc_tcp_client_connect(
153       &connected_, &endpoint_to_destroy_, interested_parties_,
154       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args->args),
155       &addr_, args->deadline);
156 }
157 
Connected(void * arg,grpc_error_handle error)158 void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
159   RefCountedPtr<TCPConnectHandshaker> self(
160       static_cast<TCPConnectHandshaker*>(arg));
161   {
162     MutexLock lock(&self->mu_);
163     if (!error.ok() || self->shutdown_) {
164       if (error.ok()) {
165         error = GRPC_ERROR_CREATE("tcp handshaker shutdown");
166       }
167       if (self->endpoint_to_destroy_ != nullptr) {
168         grpc_endpoint_destroy(self->endpoint_to_destroy_);
169         self->endpoint_to_destroy_ = nullptr;
170       }
171       if (!self->shutdown_) {
172         self->shutdown_ = true;
173         self->FinishLocked(std::move(error));
174       } else {
175         // The on_handshake_done_ callback was already invoked as part of
176         // shutdown when connecting, so nothing to be done here.
177       }
178       return;
179     }
180     CHECK_NE(self->endpoint_to_destroy_, nullptr);
181     self->args_->endpoint.reset(self->endpoint_to_destroy_);
182     self->endpoint_to_destroy_ = nullptr;
183     if (self->bind_endpoint_to_pollset_) {
184       grpc_endpoint_add_to_pollset_set(self->args_->endpoint.get(),
185                                        self->interested_parties_);
186     }
187     self->FinishLocked(absl::OkStatus());
188   }
189 }
190 
~TCPConnectHandshaker()191 TCPConnectHandshaker::~TCPConnectHandshaker() {
192   if (endpoint_to_destroy_ != nullptr) {
193     grpc_endpoint_destroy(endpoint_to_destroy_);
194   }
195   grpc_pollset_set_destroy(interested_parties_);
196 }
197 
FinishLocked(absl::Status error)198 void TCPConnectHandshaker::FinishLocked(absl::Status error) {
199   if (interested_parties_ != nullptr) {
200     grpc_polling_entity_del_from_pollset_set(&pollent_, interested_parties_);
201   }
202   InvokeOnHandshakeDone(args_, std::move(on_handshake_done_), std::move(error));
203 }
204 
205 //
206 // TCPConnectHandshakerFactory
207 //
208 
209 class TCPConnectHandshakerFactory : public HandshakerFactory {
210  public:
AddHandshakers(const ChannelArgs &,grpc_pollset_set * interested_parties,HandshakeManager * handshake_mgr)211   void AddHandshakers(const ChannelArgs& /*args*/,
212                       grpc_pollset_set* interested_parties,
213                       HandshakeManager* handshake_mgr) override {
214     handshake_mgr->Add(
215         MakeRefCounted<TCPConnectHandshaker>(interested_parties));
216   }
Priority()217   HandshakerPriority Priority() override {
218     return HandshakerPriority::kTCPConnectHandshakers;
219   }
220   ~TCPConnectHandshakerFactory() override = default;
221 };
222 
223 }  // namespace
224 
RegisterTCPConnectHandshaker(CoreConfiguration::Builder * builder)225 void RegisterTCPConnectHandshaker(CoreConfiguration::Builder* builder) {
226   builder->handshaker_registry()->RegisterHandshakerFactory(
227       HANDSHAKER_CLIENT, std::make_unique<TCPConnectHandshakerFactory>());
228 }
229 
230 }  // namespace grpc_core
231