//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
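
// A secure_endpoint wraps an existing grpc_endpoint together with a TSI
// frame protector (or zero-copy protector): writes are protected (framed
// and encrypted) before being passed to the wrapped endpoint, and bytes
// read from the wrapped endpoint are unprotected before being surfaced to
// the caller.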

#include "src/core/handshaker/security/secure_endpoint.h"

#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/memory_request.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/tsi/transport_security_grpc.h"
#include "src/core/tsi/transport_security_interface.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/string.h"
#include "src/core/util/sync.h"

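// Size of the staging buffers used on the non-zero-copy protector paths:
// protected (write) and unprotected (read) bytes are accumulated into these
// fixed-size slices before being appended to the output/read slice buffers.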
#define STAGING_BUFFER_SIZE 8192

static void on_read(void* user_data, grpc_error_handle error);
static void on_write(void* user_data, grpc_error_handle error);

namespace {
struct secure_endpoint : public grpc_endpoint {
  secure_endpoint(const grpc_endpoint_vtable* vtbl,
                  tsi_frame_protector* protector,
                  tsi_zero_copy_grpc_protector* zero_copy_protector,
                  grpc_core::OrphanablePtr<grpc_endpoint> endpoint,
                  grpc_slice* leftover_slices,
                  const grpc_channel_args* channel_args,
                  size_t leftover_nslices)
      : wrapped_ep(std::move(endpoint)),
        protector(protector),
        zero_copy_protector(zero_copy_protector) {
    this->vtable = vtbl;
    gpr_mu_init(&protector_mu);
    GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_INIT(&on_write, ::on_write, this, grpc_schedule_on_exec_ctx);
    grpc_slice_buffer_init(&source_buffer);
    grpc_slice_buffer_init(&leftover_bytes);
    for (size_t i = 0; i < leftover_nslices; i++) {
      grpc_slice_buffer_add(&leftover_bytes,
                            grpc_core::CSliceRef(leftover_slices[i]));
    }
    grpc_slice_buffer_init(&output_buffer);
    memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args)
                       ->memory_quota()
                       ->CreateMemoryOwner();
    self_reservation = memory_owner.MakeReservation(sizeof(*this));
    if (zero_copy_protector) {
      read_staging_buffer = grpc_empty_slice();
      write_staging_buffer = grpc_empty_slice();
    } else {
      read_staging_buffer =
          memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
      write_staging_buffer =
          memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
    }
    has_posted_reclaimer.store(false, std::memory_order_relaxed);
    min_progress_size = 1;
    grpc_slice_buffer_init(&protector_staging_buffer);
    gpr_ref_init(&ref, 1);
  }

  ~secure_endpoint() {
    tsi_frame_protector_destroy(protector);
    tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
    grpc_slice_buffer_destroy(&source_buffer);
    grpc_slice_buffer_destroy(&leftover_bytes);
    grpc_core::CSliceUnref(read_staging_buffer);
    grpc_core::CSliceUnref(write_staging_buffer);
    grpc_slice_buffer_destroy(&output_buffer);
    grpc_slice_buffer_destroy(&protector_staging_buffer);
    gpr_mu_destroy(&protector_mu);
  }

  grpc_core::OrphanablePtr<grpc_endpoint> wrapped_ep;
  struct tsi_frame_protector* protector;
  struct tsi_zero_copy_grpc_protector* zero_copy_protector;
  gpr_mu protector_mu;
  grpc_core::Mutex read_mu;
  grpc_core::Mutex write_mu;
  // Saved upper-level callbacks and user_data.
  grpc_closure* read_cb = nullptr;
  grpc_closure* write_cb = nullptr;
  grpc_closure on_read;
  grpc_closure on_write;
  grpc_slice_buffer* read_buffer = nullptr;
  grpc_slice_buffer source_buffer;
  // Saved handshaker leftover data to unprotect.
  grpc_slice_buffer leftover_bytes;
  // Staging buffers for the read and write paths.
  grpc_slice read_staging_buffer ABSL_GUARDED_BY(read_mu);
  grpc_slice write_staging_buffer ABSL_GUARDED_BY(write_mu);
  grpc_slice_buffer output_buffer;
  grpc_core::MemoryOwner memory_owner;
  grpc_core::MemoryAllocator::Reservation self_reservation;
  std::atomic<bool> has_posted_reclaimer;
  int min_progress_size;
  grpc_slice_buffer protector_staging_buffer;
  gpr_refcount ref;
};
}  // namespace

static void destroy(secure_endpoint* ep) { delete ep; }

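// Refcounting helpers. In debug builds (!NDEBUG) the SECURE_ENDPOINT_REF /
// SECURE_ENDPOINT_UNREF macros additionally log each count transition with
// the call site when the secure_endpoint trace flag is enabled.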
#ifndef NDEBUG
#define SECURE_ENDPOINT_UNREF(ep, reason) \
  secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
  secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
                                  const char* file, int line) {
  if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint)) {
    gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
    VLOG(2).AtLocation(file, line) << "SECENDP unref " << ep << " : " << reason
                                   << " " << val << " -> " << val - 1;
  }
  if (gpr_unref(&ep->ref)) {
    destroy(ep);
  }
}

static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
                                const char* file, int line) {
  if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint)) {
    gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
    VLOG(2).AtLocation(file, line) << "SECENDP ref " << ep << " : " << reason
                                   << " " << val << " -> " << val + 1;
  }
  gpr_ref(&ep->ref);
}
#else
#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint* ep) {
  if (gpr_unref(&ep->ref)) {
    destroy(ep);
  }
}

static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
#endif

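// Posts a benign reclaimer to the endpoint's memory quota (at most one
// outstanding at a time). Under memory pressure the reclaimer drops both
// staging buffers; they are reallocated lazily by the flush helpers below.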
static void maybe_post_reclaimer(secure_endpoint* ep) {
  if (!ep->has_posted_reclaimer) {
    SECURE_ENDPOINT_REF(ep, "benign_reclaimer");
    ep->has_posted_reclaimer.exchange(true, std::memory_order_relaxed);
    ep->memory_owner.PostReclaimer(
        grpc_core::ReclamationPass::kBenign,
        [ep](absl::optional<grpc_core::ReclamationSweep> sweep) {
          if (sweep.has_value()) {
            GRPC_TRACE_LOG(resource_quota, INFO)
                << "secure endpoint: benign reclamation to free memory";
            grpc_slice temp_read_slice;
            grpc_slice temp_write_slice;

            ep->read_mu.Lock();
            temp_read_slice = ep->read_staging_buffer;
            ep->read_staging_buffer = grpc_empty_slice();
            ep->read_mu.Unlock();

            ep->write_mu.Lock();
            temp_write_slice = ep->write_staging_buffer;
            ep->write_staging_buffer = grpc_empty_slice();
            ep->write_mu.Unlock();

            grpc_core::CSliceUnref(temp_read_slice);
            grpc_core::CSliceUnref(temp_write_slice);
            ep->has_posted_reclaimer.exchange(false, std::memory_order_relaxed);
          }
          SECURE_ENDPOINT_UNREF(ep, "benign_reclaimer");
        });
  }
}

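// Moves the (full) read staging buffer into the caller's read_buffer,
// allocates a fresh STAGING_BUFFER_SIZE slice, and updates *cur/*end to the
// new slice's bounds.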
static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
                                      uint8_t** end)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->read_mu) {
  grpc_slice_buffer_add_indexed(ep->read_buffer, ep->read_staging_buffer);
  ep->read_staging_buffer =
      ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
  *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
  *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
}

static void call_read_cb(secure_endpoint* ep, grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
    size_t i;
    for (i = 0; i < ep->read_buffer->count; i++) {
      char* data = grpc_dump_slice(ep->read_buffer->slices[i],
                                   GPR_DUMP_HEX | GPR_DUMP_ASCII);
      VLOG(2) << "READ " << ep << ": " << data;
      gpr_free(data);
    }
  }
  ep->read_buffer = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
  SECURE_ENDPOINT_UNREF(ep, "read");
}

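// Completion callback for reads on the wrapped endpoint: unprotects the
// ciphertext accumulated in ep->source_buffer into ep->read_buffer (via the
// zero-copy protector when available, otherwise the frame protector plus the
// read staging buffer), then invokes the saved upper-level read callback.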
static void on_read(void* user_data, grpc_error_handle error) {
  unsigned i;
  uint8_t keep_looping = 0;
  tsi_result result = TSI_OK;
  secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);

  {
    grpc_core::MutexLock l(&ep->read_mu);

    // If we were shut down after this callback was scheduled with OK
    // status but before it was invoked, we need to treat that as an error.
    if (ep->wrapped_ep == nullptr && error.ok()) {
      error = absl::CancelledError("secure endpoint shutdown");
    }

    uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
    uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);

    if (!error.ok()) {
      grpc_slice_buffer_reset_and_unref(ep->read_buffer);
    } else if (ep->zero_copy_protector != nullptr) {
      // Use the zero-copy grpc protector to unprotect.
      int min_progress_size = 1;
      // Get the size of the last frame, which is not yet fully decrypted.
      // This estimated frame size is stored in ep->min_progress_size, which
      // is passed to the TCP layer to indicate the minimum number of bytes
      // that need to be read to make meaningful progress. This avoids
      // reading small slices from the network.
      // TODO(vigneshbabu): Set min_progress_size in the regular
      // (non-zero-copy) frame protector code path as well.
      result = tsi_zero_copy_grpc_protector_unprotect(
          ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
          &min_progress_size);
      min_progress_size = std::max(1, min_progress_size);
      ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size;
    } else {
      // Use the frame protector to unprotect.
      // TODO(yangg) check error, maybe bail out early
      for (i = 0; i < ep->source_buffer.count; i++) {
        grpc_slice encrypted = ep->source_buffer.slices[i];
        uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
        size_t message_size = GRPC_SLICE_LENGTH(encrypted);

        while (message_size > 0 || keep_looping) {
          size_t unprotected_buffer_size_written =
              static_cast<size_t>(end - cur);
          size_t processed_message_size = message_size;
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_unprotect(
              ep->protector, message_bytes, &processed_message_size, cur,
              &unprotected_buffer_size_written);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) {
            LOG(ERROR) << "Decryption error: " << tsi_result_to_string(result);
            break;
          }
          message_bytes += processed_message_size;
          message_size -= processed_message_size;
          cur += unprotected_buffer_size_written;

          if (cur == end) {
            flush_read_staging_buffer(ep, &cur, &end);
            // Force the loop to run again to extract any bytes still
            // buffered in the protector. Bytes can be buffered when the
            // staging buffer runs out of space; if that happens at the end
            // of all slices, doing another unprotect avoids leaving data in
            // the protector.
            keep_looping = 1;
          } else if (unprotected_buffer_size_written > 0) {
            keep_looping = 1;
          } else {
            keep_looping = 0;
          }
        }
        if (result != TSI_OK) break;
      }

      if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
        grpc_slice_buffer_add(
            ep->read_buffer,
            grpc_slice_split_head(
                &ep->read_staging_buffer,
                static_cast<size_t>(
                    cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
      }
    }
  }

  if (!error.ok()) {
    call_read_cb(
        ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
    return;
  }

  // TODO(yangg) experiment with moving this block after read_cb to see if it
  // helps latency
  grpc_slice_buffer_reset_and_unref(&ep->source_buffer);

  if (result != TSI_OK) {
    grpc_slice_buffer_reset_and_unref(ep->read_buffer);
    call_read_cb(
        ep, GRPC_ERROR_CREATE(absl::StrCat("Unwrap failed (",
                                           tsi_result_to_string(result), ")")));
    return;
  }

  call_read_cb(ep, absl::OkStatus());
}

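// Reads from the secure endpoint. Any leftover bytes handed over by the
// handshaker are unprotected first (synchronously, via on_read); otherwise a
// read is issued on the wrapped endpoint with the current min_progress_size
// hint, and on_read runs when ciphertext arrives.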
static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
                          grpc_closure* cb, bool urgent,
                          int /*min_progress_size*/) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  ep->read_cb = cb;
  ep->read_buffer = slices;
  grpc_slice_buffer_reset_and_unref(ep->read_buffer);

  SECURE_ENDPOINT_REF(ep, "read");
  if (ep->leftover_bytes.count) {
    grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
    CHECK_EQ(ep->leftover_bytes.count, 0u);
    on_read(ep, absl::OkStatus());
    return;
  }

  grpc_endpoint_read(ep->wrapped_ep.get(), &ep->source_buffer, &ep->on_read,
                     urgent, /*min_progress_size=*/ep->min_progress_size);
}

static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
                                       uint8_t** end)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->write_mu) {
  grpc_slice_buffer_add_indexed(&ep->output_buffer, ep->write_staging_buffer);
  ep->write_staging_buffer =
      ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
  *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
  *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
  maybe_post_reclaimer(ep);
}

static void on_write(void* user_data, grpc_error_handle error) {
  secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
  grpc_closure* cb = ep->write_cb;
  ep->write_cb = nullptr;
  SECURE_ENDPOINT_UNREF(ep, "write");
  grpc_core::EnsureRunInExecCtx([cb, error = std::move(error)]() {
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
  });
}

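// Writes to the secure endpoint: protects `slices` into ep->output_buffer,
// then hands the protected bytes to the wrapped endpoint. On the zero-copy
// path the input is protected in chunks of at most max_frame_size bytes so
// the protector cannot emit oversized frames; e.g. (illustrative numbers
// only) a 100000-byte payload with max_frame_size = 16384 is protected as
// six full 16384-byte chunks followed by a final 1696-byte call.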
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
                           grpc_closure* cb, void* arg, int max_frame_size) {
  GRPC_LATENT_SEE_INNER_SCOPE("secure_endpoint write");
  unsigned i;
  tsi_result result = TSI_OK;
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);

  {
    grpc_core::MutexLock l(&ep->write_mu);
    uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
    uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);

    grpc_slice_buffer_reset_and_unref(&ep->output_buffer);

    if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
      for (i = 0; i < slices->count; i++) {
        char* data =
            grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
        VLOG(2) << "WRITE " << ep << ": " << data;
        gpr_free(data);
      }
    }

    if (ep->zero_copy_protector != nullptr) {
      // Use zero-copy grpc protector to protect.
      result = TSI_OK;
      // Break the input slices into chunks of size = max_frame_size and call
      // tsi_zero_copy_grpc_protector_protect on each chunk. This ensures that
      // the protector cannot create frames larger than the specified
      // max_frame_size.
      while (slices->length > static_cast<size_t>(max_frame_size) &&
             result == TSI_OK) {
        grpc_slice_buffer_move_first(slices,
                                     static_cast<size_t>(max_frame_size),
                                     &ep->protector_staging_buffer);
        result = tsi_zero_copy_grpc_protector_protect(
            ep->zero_copy_protector, &ep->protector_staging_buffer,
            &ep->output_buffer);
      }
      if (result == TSI_OK && slices->length > 0) {
        result = tsi_zero_copy_grpc_protector_protect(
            ep->zero_copy_protector, slices, &ep->output_buffer);
      }
      grpc_slice_buffer_reset_and_unref(&ep->protector_staging_buffer);
    } else {
      // Use frame protector to protect.
      for (i = 0; i < slices->count; i++) {
        grpc_slice plain = slices->slices[i];
        uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
        size_t message_size = GRPC_SLICE_LENGTH(plain);
        while (message_size > 0) {
          size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
          size_t processed_message_size = message_size;
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_protect(ep->protector, message_bytes,
                                               &processed_message_size, cur,
                                               &protected_buffer_size_to_send);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) {
            LOG(ERROR) << "Encryption error: " << tsi_result_to_string(result);
            break;
          }
          message_bytes += processed_message_size;
          message_size -= processed_message_size;
          cur += protected_buffer_size_to_send;

          if (cur == end) {
            flush_write_staging_buffer(ep, &cur, &end);
          }
        }
        if (result != TSI_OK) break;
      }
      if (result == TSI_OK) {
        size_t still_pending_size;
        do {
          size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_protect_flush(
              ep->protector, cur, &protected_buffer_size_to_send,
              &still_pending_size);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) break;
          cur += protected_buffer_size_to_send;
          if (cur == end) {
            flush_write_staging_buffer(ep, &cur, &end);
          }
        } while (still_pending_size > 0);
        if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
          grpc_slice_buffer_add(
              &ep->output_buffer,
              grpc_slice_split_head(
                  &ep->write_staging_buffer,
                  static_cast<size_t>(
                      cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
        }
      }
    }
  }

  if (result != TSI_OK) {
    // TODO(yangg) do different things according to the error type?
    grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE(
            absl::StrCat("Wrap failed (", tsi_result_to_string(result), ")")));
    return;
  }

  // Need to hold a ref here, because the wrapped endpoint may access
  // output_buffer at any time until the write completes.
  SECURE_ENDPOINT_REF(ep, "write");
  ep->write_cb = cb;
  grpc_endpoint_write(ep->wrapped_ep.get(), &ep->output_buffer, &ep->on_write,
                      arg, max_frame_size);
}

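// Shuts the endpoint down: resets the wrapped endpoint and memory owner
// under read_mu so that a concurrently scheduled on_read observes
// wrapped_ep == nullptr and converts its OK status into a cancellation,
// then drops the constructor's reference.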
static void endpoint_destroy(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  ep->read_mu.Lock();
  ep->wrapped_ep.reset();
  ep->memory_owner.Reset();
  ep->read_mu.Unlock();
  SECURE_ENDPOINT_UNREF(ep, "destroy");
}

static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
                                    grpc_pollset* pollset) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_add_to_pollset(ep->wrapped_ep.get(), pollset);
}

static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
                                        grpc_pollset_set* pollset_set) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_add_to_pollset_set(ep->wrapped_ep.get(), pollset_set);
}

static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
                                             grpc_pollset_set* pollset_set) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep.get(), pollset_set);
}

static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_peer(ep->wrapped_ep.get());
}

static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_local_address(ep->wrapped_ep.get());
}

static int endpoint_get_fd(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_fd(ep->wrapped_ep.get());
}

static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_can_track_err(ep->wrapped_ep.get());
}

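// grpc_endpoint vtable: read and write go through the protector paths above;
// everything else delegates directly to the wrapped endpoint.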
static const grpc_endpoint_vtable vtable = {endpoint_read,
                                            endpoint_write,
                                            endpoint_add_to_pollset,
                                            endpoint_add_to_pollset_set,
                                            endpoint_delete_from_pollset_set,
                                            endpoint_destroy,
                                            endpoint_get_peer,
                                            endpoint_get_local_address,
                                            endpoint_get_fd,
                                            endpoint_can_track_err};

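// Factory: wraps `to_wrap` in a secure endpoint. A hedged usage sketch
// (names like `tcp_ep`, `protector`, and `args` are illustrative, not from
// this file):
//
//   grpc_core::OrphanablePtr<grpc_endpoint> secure_ep =
//       grpc_secure_endpoint_create(protector,
//                                   /*zero_copy_protector=*/nullptr,
//                                   std::move(tcp_ep),
//                                   /*leftover_slices=*/nullptr, args,
//                                   /*leftover_nslices=*/0);
//
// Typically exactly one of `protector` / `zero_copy_protector` is non-null;
// the endpoint takes ownership of whichever is provided and destroys it in
// its destructor.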
grpc_core::OrphanablePtr<grpc_endpoint> grpc_secure_endpoint_create(
    struct tsi_frame_protector* protector,
    struct tsi_zero_copy_grpc_protector* zero_copy_protector,
    grpc_core::OrphanablePtr<grpc_endpoint> to_wrap,
    grpc_slice* leftover_slices, const grpc_channel_args* channel_args,
    size_t leftover_nslices) {
  return grpc_core::MakeOrphanable<secure_endpoint>(
      &vtable, protector, zero_copy_protector, std::move(to_wrap),
      leftover_slices, channel_args, leftover_nslices);
}