1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/handshaker/handshaker.h"
20
21 #include <grpc/byte_buffer.h>
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/port_platform.h>
26 #include <inttypes.h>
27
28 #include <string>
29 #include <utility>
30
31 #include "absl/functional/any_invocable.h"
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_format.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/iomgr/endpoint.h"
40 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/util/debug_location.h"
43 #include "src/core/util/status_helper.h"
44 #include "src/core/util/time.h"
45
46 using ::grpc_event_engine::experimental::EventEngine;
47
48 namespace grpc_core {
49
InvokeOnHandshakeDone(HandshakerArgs * args,absl::AnyInvocable<void (absl::Status)> on_handshake_done,absl::Status status)50 void Handshaker::InvokeOnHandshakeDone(
51 HandshakerArgs* args,
52 absl::AnyInvocable<void(absl::Status)> on_handshake_done,
53 absl::Status status) {
54 args->event_engine->Run([on_handshake_done = std::move(on_handshake_done),
55 status = std::move(status)]() mutable {
56 ApplicationCallbackExecCtx callback_exec_ctx;
57 ExecCtx exec_ctx;
58 on_handshake_done(std::move(status));
59 // Destroy callback while ExecCtx is still in scope.
60 on_handshake_done = nullptr;
61 });
62 }
63
64 namespace {
65
HandshakerArgsString(HandshakerArgs * args)66 std::string HandshakerArgsString(HandshakerArgs* args) {
67 return absl::StrFormat("{endpoint=%p, args=%s, read_buffer.Length()=%" PRIuPTR
68 ", exit_early=%d}",
69 args->endpoint.get(), args->args.ToString(),
70 args->read_buffer.Length(), args->exit_early);
71 }
72
73 } // namespace
74
HandshakeManager()75 HandshakeManager::HandshakeManager()
76 : RefCounted(GRPC_TRACE_FLAG_ENABLED(handshaker) ? "HandshakeManager"
77 : nullptr) {}
78
Add(RefCountedPtr<Handshaker> handshaker)79 void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
80 MutexLock lock(&mu_);
81 GRPC_TRACE_LOG(handshaker, INFO)
82 << "handshake_manager " << this << ": adding handshaker "
83 << std::string(handshaker->name()) << " [" << handshaker.get()
84 << "] at index " << handshakers_.size();
85 handshakers_.push_back(std::move(handshaker));
86 }
87
DoHandshake(OrphanablePtr<grpc_endpoint> endpoint,const ChannelArgs & channel_args,Timestamp deadline,grpc_tcp_server_acceptor * acceptor,absl::AnyInvocable<void (absl::StatusOr<HandshakerArgs * >)> on_handshake_done)88 void HandshakeManager::DoHandshake(
89 OrphanablePtr<grpc_endpoint> endpoint, const ChannelArgs& channel_args,
90 Timestamp deadline, grpc_tcp_server_acceptor* acceptor,
91 absl::AnyInvocable<void(absl::StatusOr<HandshakerArgs*>)>
92 on_handshake_done) {
93 // We hold a ref until after the mutex is released, because we might
94 // wind up invoking on_handshake_done in another thread before we
95 // return from this function, and on_handshake_done might release the
96 // last ref to this object.
97 auto self = Ref();
98 MutexLock lock(&mu_);
99 CHECK_EQ(index_, 0u);
100 on_handshake_done_ = std::move(on_handshake_done);
101 // Construct handshaker args. These will be passed through all
102 // handshakers and eventually be freed by the on_handshake_done callback.
103 args_.endpoint = std::move(endpoint);
104 args_.deadline = deadline;
105 args_.args = channel_args;
106 args_.event_engine = args_.args.GetObject<EventEngine>();
107 args_.acceptor = acceptor;
108 if (acceptor != nullptr && acceptor->external_connection &&
109 acceptor->pending_data != nullptr) {
110 grpc_slice_buffer_swap(args_.read_buffer.c_slice_buffer(),
111 &(acceptor->pending_data->data.raw.slice_buffer));
112 // TODO(vigneshbabu): For connections accepted through event engine
113 // listeners, the ownership of the byte buffer received is transferred to
114 // this callback and it is thus this callback's duty to delete it.
115 // Make this hack default once event engine is rolled out.
116 if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
117 args_.endpoint.get())) {
118 grpc_byte_buffer_destroy(acceptor->pending_data);
119 }
120 }
121 // Start deadline timer, which owns a ref.
122 const Duration time_to_deadline = deadline - Timestamp::Now();
123 deadline_timer_handle_ =
124 args_.event_engine->RunAfter(time_to_deadline, [self = Ref()]() mutable {
125 ApplicationCallbackExecCtx callback_exec_ctx;
126 ExecCtx exec_ctx;
127 self->Shutdown(GRPC_ERROR_CREATE("Handshake timed out"));
128 // HandshakeManager deletion might require an active ExecCtx.
129 self.reset();
130 });
131 // Start first handshaker.
132 CallNextHandshakerLocked(absl::OkStatus());
133 }
134
Shutdown(absl::Status error)135 void HandshakeManager::Shutdown(absl::Status error) {
136 MutexLock lock(&mu_);
137 if (!is_shutdown_) {
138 GRPC_TRACE_LOG(handshaker, INFO)
139 << "handshake_manager " << this << ": Shutdown() called: " << error;
140 is_shutdown_ = true;
141 // Shutdown the handshaker that's currently in progress, if any.
142 if (index_ > 0) {
143 GRPC_TRACE_LOG(handshaker, INFO)
144 << "handshake_manager " << this
145 << ": shutting down handshaker at index " << index_ - 1;
146 handshakers_[index_ - 1]->Shutdown(std::move(error));
147 }
148 }
149 }
150
CallNextHandshakerLocked(absl::Status error)151 void HandshakeManager::CallNextHandshakerLocked(absl::Status error) {
152 GRPC_TRACE_LOG(handshaker, INFO)
153 << "handshake_manager " << this << ": error=" << error
154 << " shutdown=" << is_shutdown_ << " index=" << index_
155 << ", args=" << HandshakerArgsString(&args_);
156 CHECK(index_ <= handshakers_.size());
157 // If we got an error or we've been shut down or we're exiting early or
158 // we've finished the last handshaker, invoke the on_handshake_done
159 // callback.
160 if (!error.ok() || is_shutdown_ || args_.exit_early ||
161 index_ == handshakers_.size()) {
162 if (error.ok() && is_shutdown_) {
163 error = GRPC_ERROR_CREATE("handshaker shutdown");
164 args_.endpoint.reset();
165 }
166 GRPC_TRACE_LOG(handshaker, INFO) << "handshake_manager " << this
167 << ": handshaking complete -- scheduling "
168 "on_handshake_done with error="
169 << error;
170 // Cancel deadline timer, since we're invoking the on_handshake_done
171 // callback now.
172 args_.event_engine->Cancel(deadline_timer_handle_);
173 is_shutdown_ = true;
174 absl::StatusOr<HandshakerArgs*> result(&args_);
175 if (!error.ok()) result = std::move(error);
176 args_.event_engine->Run([on_handshake_done = std::move(on_handshake_done_),
177 result = std::move(result)]() mutable {
178 ApplicationCallbackExecCtx callback_exec_ctx;
179 ExecCtx exec_ctx;
180 on_handshake_done(std::move(result));
181 // Destroy callback while ExecCtx is still in scope.
182 on_handshake_done = nullptr;
183 });
184 return;
185 }
186 // Call the next handshaker.
187 auto handshaker = handshakers_[index_];
188 GRPC_TRACE_LOG(handshaker, INFO)
189 << "handshake_manager " << this << ": calling handshaker "
190 << handshaker->name() << " [" << handshaker.get() << "] at index "
191 << index_;
192 ++index_;
193 handshaker->DoHandshake(&args_, [self = Ref()](absl::Status error) mutable {
194 MutexLock lock(&self->mu_);
195 self->CallNextHandshakerLocked(std::move(error));
196 });
197 }
198
199 } // namespace grpc_core
200