• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/handshaker/handshaker.h"
20 
21 #include <grpc/byte_buffer.h>
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/port_platform.h>
26 #include <inttypes.h>
27 
28 #include <string>
29 #include <utility>
30 
31 #include "absl/functional/any_invocable.h"
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_format.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/iomgr/endpoint.h"
40 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/util/debug_location.h"
43 #include "src/core/util/status_helper.h"
44 #include "src/core/util/time.h"
45 
46 using ::grpc_event_engine::experimental::EventEngine;
47 
48 namespace grpc_core {
49 
InvokeOnHandshakeDone(HandshakerArgs * args,absl::AnyInvocable<void (absl::Status)> on_handshake_done,absl::Status status)50 void Handshaker::InvokeOnHandshakeDone(
51     HandshakerArgs* args,
52     absl::AnyInvocable<void(absl::Status)> on_handshake_done,
53     absl::Status status) {
54   args->event_engine->Run([on_handshake_done = std::move(on_handshake_done),
55                            status = std::move(status)]() mutable {
56     ApplicationCallbackExecCtx callback_exec_ctx;
57     ExecCtx exec_ctx;
58     on_handshake_done(std::move(status));
59     // Destroy callback while ExecCtx is still in scope.
60     on_handshake_done = nullptr;
61   });
62 }
63 
64 namespace {
65 
HandshakerArgsString(HandshakerArgs * args)66 std::string HandshakerArgsString(HandshakerArgs* args) {
67   return absl::StrFormat("{endpoint=%p, args=%s, read_buffer.Length()=%" PRIuPTR
68                          ", exit_early=%d}",
69                          args->endpoint.get(), args->args.ToString(),
70                          args->read_buffer.Length(), args->exit_early);
71 }
72 
73 }  // namespace
74 
HandshakeManager()75 HandshakeManager::HandshakeManager()
76     : RefCounted(GRPC_TRACE_FLAG_ENABLED(handshaker) ? "HandshakeManager"
77                                                      : nullptr) {}
78 
Add(RefCountedPtr<Handshaker> handshaker)79 void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
80   MutexLock lock(&mu_);
81   GRPC_TRACE_LOG(handshaker, INFO)
82       << "handshake_manager " << this << ": adding handshaker "
83       << std::string(handshaker->name()) << " [" << handshaker.get()
84       << "] at index " << handshakers_.size();
85   handshakers_.push_back(std::move(handshaker));
86 }
87 
DoHandshake(OrphanablePtr<grpc_endpoint> endpoint,const ChannelArgs & channel_args,Timestamp deadline,grpc_tcp_server_acceptor * acceptor,absl::AnyInvocable<void (absl::StatusOr<HandshakerArgs * >)> on_handshake_done)88 void HandshakeManager::DoHandshake(
89     OrphanablePtr<grpc_endpoint> endpoint, const ChannelArgs& channel_args,
90     Timestamp deadline, grpc_tcp_server_acceptor* acceptor,
91     absl::AnyInvocable<void(absl::StatusOr<HandshakerArgs*>)>
92         on_handshake_done) {
93   // We hold a ref until after the mutex is released, because we might
94   // wind up invoking on_handshake_done in another thread before we
95   // return from this function, and on_handshake_done might release the
96   // last ref to this object.
97   auto self = Ref();
98   MutexLock lock(&mu_);
99   CHECK_EQ(index_, 0u);
100   on_handshake_done_ = std::move(on_handshake_done);
101   // Construct handshaker args.  These will be passed through all
102   // handshakers and eventually be freed by the on_handshake_done callback.
103   args_.endpoint = std::move(endpoint);
104   args_.deadline = deadline;
105   args_.args = channel_args;
106   args_.event_engine = args_.args.GetObject<EventEngine>();
107   args_.acceptor = acceptor;
108   if (acceptor != nullptr && acceptor->external_connection &&
109       acceptor->pending_data != nullptr) {
110     grpc_slice_buffer_swap(args_.read_buffer.c_slice_buffer(),
111                            &(acceptor->pending_data->data.raw.slice_buffer));
112     // TODO(vigneshbabu): For connections accepted through event engine
113     // listeners, the ownership of the byte buffer received is transferred to
114     // this callback and it is thus this callback's duty to delete it.
115     // Make this hack default once event engine is rolled out.
116     if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
117             args_.endpoint.get())) {
118       grpc_byte_buffer_destroy(acceptor->pending_data);
119     }
120   }
121   // Start deadline timer, which owns a ref.
122   const Duration time_to_deadline = deadline - Timestamp::Now();
123   deadline_timer_handle_ =
124       args_.event_engine->RunAfter(time_to_deadline, [self = Ref()]() mutable {
125         ApplicationCallbackExecCtx callback_exec_ctx;
126         ExecCtx exec_ctx;
127         self->Shutdown(GRPC_ERROR_CREATE("Handshake timed out"));
128         // HandshakeManager deletion might require an active ExecCtx.
129         self.reset();
130       });
131   // Start first handshaker.
132   CallNextHandshakerLocked(absl::OkStatus());
133 }
134 
Shutdown(absl::Status error)135 void HandshakeManager::Shutdown(absl::Status error) {
136   MutexLock lock(&mu_);
137   if (!is_shutdown_) {
138     GRPC_TRACE_LOG(handshaker, INFO)
139         << "handshake_manager " << this << ": Shutdown() called: " << error;
140     is_shutdown_ = true;
141     // Shutdown the handshaker that's currently in progress, if any.
142     if (index_ > 0) {
143       GRPC_TRACE_LOG(handshaker, INFO)
144           << "handshake_manager " << this
145           << ": shutting down handshaker at index " << index_ - 1;
146       handshakers_[index_ - 1]->Shutdown(std::move(error));
147     }
148   }
149 }
150 
CallNextHandshakerLocked(absl::Status error)151 void HandshakeManager::CallNextHandshakerLocked(absl::Status error) {
152   GRPC_TRACE_LOG(handshaker, INFO)
153       << "handshake_manager " << this << ": error=" << error
154       << " shutdown=" << is_shutdown_ << " index=" << index_
155       << ", args=" << HandshakerArgsString(&args_);
156   CHECK(index_ <= handshakers_.size());
157   // If we got an error or we've been shut down or we're exiting early or
158   // we've finished the last handshaker, invoke the on_handshake_done
159   // callback.
160   if (!error.ok() || is_shutdown_ || args_.exit_early ||
161       index_ == handshakers_.size()) {
162     if (error.ok() && is_shutdown_) {
163       error = GRPC_ERROR_CREATE("handshaker shutdown");
164       args_.endpoint.reset();
165     }
166     GRPC_TRACE_LOG(handshaker, INFO) << "handshake_manager " << this
167                                      << ": handshaking complete -- scheduling "
168                                         "on_handshake_done with error="
169                                      << error;
170     // Cancel deadline timer, since we're invoking the on_handshake_done
171     // callback now.
172     args_.event_engine->Cancel(deadline_timer_handle_);
173     is_shutdown_ = true;
174     absl::StatusOr<HandshakerArgs*> result(&args_);
175     if (!error.ok()) result = std::move(error);
176     args_.event_engine->Run([on_handshake_done = std::move(on_handshake_done_),
177                              result = std::move(result)]() mutable {
178       ApplicationCallbackExecCtx callback_exec_ctx;
179       ExecCtx exec_ctx;
180       on_handshake_done(std::move(result));
181       // Destroy callback while ExecCtx is still in scope.
182       on_handshake_done = nullptr;
183     });
184     return;
185   }
186   // Call the next handshaker.
187   auto handshaker = handshakers_[index_];
188   GRPC_TRACE_LOG(handshaker, INFO)
189       << "handshake_manager " << this << ": calling handshaker "
190       << handshaker->name() << " [" << handshaker.get() << "] at index "
191       << index_;
192   ++index_;
193   handshaker->DoHandshake(&args_, [self = Ref()](absl::Status error) mutable {
194     MutexLock lock(&self->mu_);
195     self->CallNextHandshakerLocked(std::move(error));
196   });
197 }
198 
199 }  // namespace grpc_core
200