1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include "absl/strings/str_format.h"
24
25 #include <grpc/impl/codegen/slice.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/string_util.h>
29
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/channel/handshaker.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/iomgr/timer.h"
34 #include "src/core/lib/slice/slice_internal.h"
35
36 namespace grpc_core {
37
38 TraceFlag grpc_handshaker_trace(false, "handshaker");
39
40 namespace {
41
HandshakerArgsString(HandshakerArgs * args)42 std::string HandshakerArgsString(HandshakerArgs* args) {
43 size_t num_args = args->args != nullptr ? args->args->num_args : 0;
44 size_t read_buffer_length =
45 args->read_buffer != nullptr ? args->read_buffer->length : 0;
46 return absl::StrFormat(
47 "{endpoint=%p, args=%p {size=%" PRIuPTR
48 ": %s}, read_buffer=%p (length=%" PRIuPTR "), exit_early=%d}",
49 args->endpoint, args->args, num_args,
50 grpc_channel_args_string(args->args), args->read_buffer,
51 read_buffer_length, args->exit_early);
52 }
53
54 } // namespace
55
HandshakeManager()56 HandshakeManager::HandshakeManager() {}
57
Add(RefCountedPtr<Handshaker> handshaker)58 void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
59 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
60 gpr_log(
61 GPR_INFO,
62 "handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR,
63 this, handshaker->name(), handshaker.get(), handshakers_.size());
64 }
65 MutexLock lock(&mu_);
66 handshakers_.push_back(std::move(handshaker));
67 }
68
~HandshakeManager()69 HandshakeManager::~HandshakeManager() { handshakers_.clear(); }
70
Shutdown(grpc_error_handle why)71 void HandshakeManager::Shutdown(grpc_error_handle why) {
72 {
73 MutexLock lock(&mu_);
74 // Shutdown the handshaker that's currently in progress, if any.
75 if (!is_shutdown_ && index_ > 0) {
76 is_shutdown_ = true;
77 handshakers_[index_ - 1]->Shutdown(GRPC_ERROR_REF(why));
78 }
79 }
80 GRPC_ERROR_UNREF(why);
81 }
82
83 // Helper function to call either the next handshaker or the
84 // on_handshake_done callback.
85 // Returns true if we've scheduled the on_handshake_done callback.
CallNextHandshakerLocked(grpc_error_handle error)86 bool HandshakeManager::CallNextHandshakerLocked(grpc_error_handle error) {
87 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
88 gpr_log(GPR_INFO,
89 "handshake_manager %p: error=%s shutdown=%d index=%" PRIuPTR
90 ", args=%s",
91 this, grpc_error_std_string(error).c_str(), is_shutdown_, index_,
92 HandshakerArgsString(&args_).c_str());
93 }
94 GPR_ASSERT(index_ <= handshakers_.size());
95 // If we got an error or we've been shut down or we're exiting early or
96 // we've finished the last handshaker, invoke the on_handshake_done
97 // callback. Otherwise, call the next handshaker.
98 if (error != GRPC_ERROR_NONE || is_shutdown_ || args_.exit_early ||
99 index_ == handshakers_.size()) {
100 if (error == GRPC_ERROR_NONE && is_shutdown_) {
101 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown");
102 // It is possible that the endpoint has already been destroyed by
103 // a shutdown call while this callback was sitting on the ExecCtx
104 // with no error.
105 if (args_.endpoint != nullptr) {
106 // TODO(roth): It is currently necessary to shutdown endpoints
107 // before destroying then, even when we know that there are no
108 // pending read/write callbacks. This should be fixed, at which
109 // point this can be removed.
110 grpc_endpoint_shutdown(args_.endpoint, GRPC_ERROR_REF(error));
111 grpc_endpoint_destroy(args_.endpoint);
112 args_.endpoint = nullptr;
113 grpc_channel_args_destroy(args_.args);
114 args_.args = nullptr;
115 grpc_slice_buffer_destroy_internal(args_.read_buffer);
116 gpr_free(args_.read_buffer);
117 args_.read_buffer = nullptr;
118 }
119 }
120 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
121 gpr_log(GPR_INFO,
122 "handshake_manager %p: handshaking complete -- scheduling "
123 "on_handshake_done with error=%s",
124 this, grpc_error_std_string(error).c_str());
125 }
126 // Cancel deadline timer, since we're invoking the on_handshake_done
127 // callback now.
128 grpc_timer_cancel(&deadline_timer_);
129 ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error);
130 is_shutdown_ = true;
131 } else {
132 auto handshaker = handshakers_[index_];
133 if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
134 gpr_log(
135 GPR_INFO,
136 "handshake_manager %p: calling handshaker %s [%p] at index %" PRIuPTR,
137 this, handshaker->name(), handshaker.get(), index_);
138 }
139 handshaker->DoHandshake(acceptor_, &call_next_handshaker_, &args_);
140 }
141 ++index_;
142 return is_shutdown_;
143 }
144
CallNextHandshakerFn(void * arg,grpc_error_handle error)145 void HandshakeManager::CallNextHandshakerFn(void* arg,
146 grpc_error_handle error) {
147 auto* mgr = static_cast<HandshakeManager*>(arg);
148 bool done;
149 {
150 MutexLock lock(&mgr->mu_);
151 done = mgr->CallNextHandshakerLocked(GRPC_ERROR_REF(error));
152 }
153 // If we're invoked the final callback, we won't be coming back
154 // to this function, so we can release our reference to the
155 // handshake manager.
156 if (done) {
157 mgr->Unref();
158 }
159 }
160
OnTimeoutFn(void * arg,grpc_error_handle error)161 void HandshakeManager::OnTimeoutFn(void* arg, grpc_error_handle error) {
162 auto* mgr = static_cast<HandshakeManager*>(arg);
163 if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled
164 mgr->Shutdown(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out"));
165 }
166 mgr->Unref();
167 }
168
DoHandshake(grpc_endpoint * endpoint,const grpc_channel_args * channel_args,grpc_millis deadline,grpc_tcp_server_acceptor * acceptor,grpc_iomgr_cb_func on_handshake_done,void * user_data)169 void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
170 const grpc_channel_args* channel_args,
171 grpc_millis deadline,
172 grpc_tcp_server_acceptor* acceptor,
173 grpc_iomgr_cb_func on_handshake_done,
174 void* user_data) {
175 bool done;
176 {
177 MutexLock lock(&mu_);
178 GPR_ASSERT(index_ == 0);
179 // Construct handshaker args. These will be passed through all
180 // handshakers and eventually be freed by the on_handshake_done callback.
181 args_.endpoint = endpoint;
182 args_.args = grpc_channel_args_copy(channel_args);
183 args_.user_data = user_data;
184 args_.read_buffer =
185 static_cast<grpc_slice_buffer*>(gpr_malloc(sizeof(*args_.read_buffer)));
186 grpc_slice_buffer_init(args_.read_buffer);
187 if (acceptor != nullptr && acceptor->external_connection &&
188 acceptor->pending_data != nullptr) {
189 grpc_slice_buffer_swap(args_.read_buffer,
190 &(acceptor->pending_data->data.raw.slice_buffer));
191 }
192 // Initialize state needed for calling handshakers.
193 acceptor_ = acceptor;
194 GRPC_CLOSURE_INIT(&call_next_handshaker_,
195 &HandshakeManager::CallNextHandshakerFn, this,
196 grpc_schedule_on_exec_ctx);
197 GRPC_CLOSURE_INIT(&on_handshake_done_, on_handshake_done, &args_,
198 grpc_schedule_on_exec_ctx);
199 // Start deadline timer, which owns a ref.
200 Ref().release();
201 GRPC_CLOSURE_INIT(&on_timeout_, &HandshakeManager::OnTimeoutFn, this,
202 grpc_schedule_on_exec_ctx);
203 grpc_timer_init(&deadline_timer_, deadline, &on_timeout_);
204 // Start first handshaker, which also owns a ref.
205 Ref().release();
206 done = CallNextHandshakerLocked(GRPC_ERROR_NONE);
207 }
208 if (done) {
209 Unref();
210 }
211 }
212
213 } // namespace grpc_core
214
grpc_handshake_manager_add(grpc_handshake_manager * mgr,grpc_handshaker * handshaker)215 void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
216 grpc_handshaker* handshaker) {
217 // This is a transition method to aid the API change for handshakers.
218 grpc_core::RefCountedPtr<grpc_core::Handshaker> refd_hs(
219 static_cast<grpc_core::Handshaker*>(handshaker));
220 mgr->Add(refd_hs);
221 }
222