1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/handshaker.h"
22
23 #include <inttypes.h>
24
25 #include <string>
26 #include <utility>
27
28 #include "absl/status/status.h"
29 #include "absl/strings/str_format.h"
30
31 #include <grpc/byte_buffer.h>
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/slice_buffer.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/debug_location.h"
40 #include "src/core/lib/gprpp/status_helper.h"
41 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43
44 namespace grpc_core {
45
// Trace flag registered under the name "handshaker". The handshake manager
// checks it via GRPC_TRACE_FLAG_ENABLED before emitting its log lines.
// NOTE(review): the leading `false` is presumably the default-enabled state;
// confirm against the TraceFlag constructor.
TraceFlag grpc_handshaker_trace(false, "handshaker");
47
48 namespace {
49
50 using ::grpc_event_engine::experimental::EventEngine;
51
HandshakerArgsString(HandshakerArgs * args)52 std::string HandshakerArgsString(HandshakerArgs* args) {
53 size_t read_buffer_length =
54 args->read_buffer != nullptr ? args->read_buffer->length : 0;
55 return absl::StrFormat(
56 "{endpoint=%p, args=%s, read_buffer=%p (length=%" PRIuPTR
57 "), exit_early=%d}",
58 args->endpoint, args->args.ToString(), args->read_buffer,
59 read_buffer_length, args->exit_early);
60 }
61
62 } // namespace
63
HandshakeManager()64 HandshakeManager::HandshakeManager()
65 : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)
66 ? "HandshakeManager"
67 : nullptr) {}
68
Add(RefCountedPtr<Handshaker> handshaker)69 void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
70 MutexLock lock(&mu_);
71 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
72 gpr_log(
73 GPR_INFO,
74 "handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR,
75 this, handshaker->name(), handshaker.get(), handshakers_.size());
76 }
77 handshakers_.push_back(std::move(handshaker));
78 }
79
~HandshakeManager()80 HandshakeManager::~HandshakeManager() { handshakers_.clear(); }
81
Shutdown(grpc_error_handle why)82 void HandshakeManager::Shutdown(grpc_error_handle why) {
83 {
84 MutexLock lock(&mu_);
85 // Shutdown the handshaker that's currently in progress, if any.
86 if (!is_shutdown_ && index_ > 0) {
87 is_shutdown_ = true;
88 handshakers_[index_ - 1]->Shutdown(why);
89 }
90 }
91 }
92
93 // Helper function to call either the next handshaker or the
94 // on_handshake_done callback.
95 // Returns true if we've scheduled the on_handshake_done callback.
CallNextHandshakerLocked(grpc_error_handle error)96 bool HandshakeManager::CallNextHandshakerLocked(grpc_error_handle error) {
97 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
98 gpr_log(GPR_INFO,
99 "handshake_manager %p: error=%s shutdown=%d index=%" PRIuPTR
100 ", args=%s",
101 this, StatusToString(error).c_str(), is_shutdown_, index_,
102 HandshakerArgsString(&args_).c_str());
103 }
104 GPR_ASSERT(index_ <= handshakers_.size());
105 // If we got an error or we've been shut down or we're exiting early or
106 // we've finished the last handshaker, invoke the on_handshake_done
107 // callback. Otherwise, call the next handshaker.
108 if (!error.ok() || is_shutdown_ || args_.exit_early ||
109 index_ == handshakers_.size()) {
110 if (error.ok() && is_shutdown_) {
111 error = GRPC_ERROR_CREATE("handshaker shutdown");
112 // It is possible that the endpoint has already been destroyed by
113 // a shutdown call while this callback was sitting on the ExecCtx
114 // with no error.
115 if (args_.endpoint != nullptr) {
116 // TODO(roth): It is currently necessary to shutdown endpoints
117 // before destroying then, even when we know that there are no
118 // pending read/write callbacks. This should be fixed, at which
119 // point this can be removed.
120 grpc_endpoint_shutdown(args_.endpoint, error);
121 grpc_endpoint_destroy(args_.endpoint);
122 args_.endpoint = nullptr;
123 args_.args = ChannelArgs();
124 grpc_slice_buffer_destroy(args_.read_buffer);
125 gpr_free(args_.read_buffer);
126 args_.read_buffer = nullptr;
127 }
128 }
129 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
130 gpr_log(GPR_INFO,
131 "handshake_manager %p: handshaking complete -- scheduling "
132 "on_handshake_done with error=%s",
133 this, StatusToString(error).c_str());
134 }
135 // Cancel deadline timer, since we're invoking the on_handshake_done
136 // callback now.
137 event_engine_->Cancel(deadline_timer_handle_);
138 ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error);
139 is_shutdown_ = true;
140 } else {
141 auto handshaker = handshakers_[index_];
142 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
143 gpr_log(
144 GPR_INFO,
145 "handshake_manager %p: calling handshaker %s [%p] at index %" PRIuPTR,
146 this, handshaker->name(), handshaker.get(), index_);
147 }
148 handshaker->DoHandshake(acceptor_, &call_next_handshaker_, &args_);
149 }
150 ++index_;
151 return is_shutdown_;
152 }
153
CallNextHandshakerFn(void * arg,grpc_error_handle error)154 void HandshakeManager::CallNextHandshakerFn(void* arg,
155 grpc_error_handle error) {
156 auto* mgr = static_cast<HandshakeManager*>(arg);
157 bool done;
158 {
159 MutexLock lock(&mgr->mu_);
160 done = mgr->CallNextHandshakerLocked(error);
161 }
162 // If we're invoked the final callback, we won't be coming back
163 // to this function, so we can release our reference to the
164 // handshake manager.
165 if (done) {
166 mgr->Unref();
167 }
168 }
169
DoHandshake(grpc_endpoint * endpoint,const ChannelArgs & channel_args,Timestamp deadline,grpc_tcp_server_acceptor * acceptor,grpc_iomgr_cb_func on_handshake_done,void * user_data)170 void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
171 const ChannelArgs& channel_args,
172 Timestamp deadline,
173 grpc_tcp_server_acceptor* acceptor,
174 grpc_iomgr_cb_func on_handshake_done,
175 void* user_data) {
176 bool done;
177 {
178 MutexLock lock(&mu_);
179 GPR_ASSERT(index_ == 0);
180 // Construct handshaker args. These will be passed through all
181 // handshakers and eventually be freed by the on_handshake_done callback.
182 args_.endpoint = endpoint;
183 args_.deadline = deadline;
184 args_.args = channel_args;
185 args_.user_data = user_data;
186 args_.read_buffer =
187 static_cast<grpc_slice_buffer*>(gpr_malloc(sizeof(*args_.read_buffer)));
188 grpc_slice_buffer_init(args_.read_buffer);
189 if (acceptor != nullptr && acceptor->external_connection &&
190 acceptor->pending_data != nullptr) {
191 grpc_slice_buffer_swap(args_.read_buffer,
192 &(acceptor->pending_data->data.raw.slice_buffer));
193 // TODO(vigneshbabu): For connections accepted through event engine
194 // listeners, the ownership of the byte buffer received is transferred to
195 // this callback and it is thus this callback's duty to delete it.
196 // Make this hack default once event engine is rolled out.
197 if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
198 endpoint)) {
199 grpc_byte_buffer_destroy(acceptor->pending_data);
200 }
201 }
202 // Initialize state needed for calling handshakers.
203 acceptor_ = acceptor;
204 GRPC_CLOSURE_INIT(&call_next_handshaker_,
205 &HandshakeManager::CallNextHandshakerFn, this,
206 grpc_schedule_on_exec_ctx);
207 GRPC_CLOSURE_INIT(&on_handshake_done_, on_handshake_done, &args_,
208 grpc_schedule_on_exec_ctx);
209 // Start deadline timer, which owns a ref.
210 const Duration time_to_deadline = deadline - Timestamp::Now();
211 event_engine_ = args_.args.GetObjectRef<EventEngine>();
212 deadline_timer_handle_ =
213 event_engine_->RunAfter(time_to_deadline, [self = Ref()]() mutable {
214 ApplicationCallbackExecCtx callback_exec_ctx;
215 ExecCtx exec_ctx;
216 self->Shutdown(GRPC_ERROR_CREATE("Handshake timed out"));
217 // HandshakeManager deletion might require an active ExecCtx.
218 self.reset();
219 });
220 // Start first handshaker, which also owns a ref.
221 Ref().release();
222 done = CallNextHandshakerLocked(absl::OkStatus());
223 }
224 if (done) {
225 Unref();
226 }
227 }
228
229 } // namespace grpc_core
230