• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/handshaker/security/secure_endpoint.h"
20 
21 #include <grpc/event_engine/memory_allocator.h>
22 #include <grpc/event_engine/memory_request.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/port_platform.h>
28 #include <grpc/support/sync.h>
29 #include <inttypes.h>
30 
31 #include <algorithm>
32 #include <atomic>
33 #include <memory>
34 #include <utility>
35 
36 #include "absl/base/thread_annotations.h"
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/status/status.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/iomgr/closure.h"
44 #include "src/core/lib/iomgr/endpoint.h"
45 #include "src/core/lib/iomgr/error.h"
46 #include "src/core/lib/iomgr/exec_ctx.h"
47 #include "src/core/lib/iomgr/iomgr_fwd.h"
48 #include "src/core/lib/resource_quota/api.h"
49 #include "src/core/lib/resource_quota/memory_quota.h"
50 #include "src/core/lib/resource_quota/resource_quota.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/slice/slice_string_helpers.h"
53 #include "src/core/tsi/transport_security_grpc.h"
54 #include "src/core/tsi/transport_security_interface.h"
55 #include "src/core/util/debug_location.h"
56 #include "src/core/util/orphanable.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/string.h"
59 #include "src/core/util/sync.h"
60 
61 #define STAGING_BUFFER_SIZE 8192
62 
63 static void on_read(void* user_data, grpc_error_handle error);
64 static void on_write(void* user_data, grpc_error_handle error);
65 
66 namespace {
67 struct secure_endpoint : public grpc_endpoint {
secure_endpoint__anon51bdef380111::secure_endpoint68   secure_endpoint(const grpc_endpoint_vtable* vtbl,
69                   tsi_frame_protector* protector,
70                   tsi_zero_copy_grpc_protector* zero_copy_protector,
71                   grpc_core::OrphanablePtr<grpc_endpoint> endpoint,
72                   grpc_slice* leftover_slices,
73                   const grpc_channel_args* channel_args,
74                   size_t leftover_nslices)
75       : wrapped_ep(std::move(endpoint)),
76         protector(protector),
77         zero_copy_protector(zero_copy_protector) {
78     this->vtable = vtbl;
79     gpr_mu_init(&protector_mu);
80     GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
81     GRPC_CLOSURE_INIT(&on_write, ::on_write, this, grpc_schedule_on_exec_ctx);
82     grpc_slice_buffer_init(&source_buffer);
83     grpc_slice_buffer_init(&leftover_bytes);
84     for (size_t i = 0; i < leftover_nslices; i++) {
85       grpc_slice_buffer_add(&leftover_bytes,
86                             grpc_core::CSliceRef(leftover_slices[i]));
87     }
88     grpc_slice_buffer_init(&output_buffer);
89     memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args)
90                        ->memory_quota()
91                        ->CreateMemoryOwner();
92     self_reservation = memory_owner.MakeReservation(sizeof(*this));
93     if (zero_copy_protector) {
94       read_staging_buffer = grpc_empty_slice();
95       write_staging_buffer = grpc_empty_slice();
96     } else {
97       read_staging_buffer =
98           memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
99       write_staging_buffer =
100           memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
101     }
102     has_posted_reclaimer.store(false, std::memory_order_relaxed);
103     min_progress_size = 1;
104     grpc_slice_buffer_init(&protector_staging_buffer);
105     gpr_ref_init(&ref, 1);
106   }
107 
~secure_endpoint__anon51bdef380111::secure_endpoint108   ~secure_endpoint() {
109     tsi_frame_protector_destroy(protector);
110     tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
111     grpc_slice_buffer_destroy(&source_buffer);
112     grpc_slice_buffer_destroy(&leftover_bytes);
113     grpc_core::CSliceUnref(read_staging_buffer);
114     grpc_core::CSliceUnref(write_staging_buffer);
115     grpc_slice_buffer_destroy(&output_buffer);
116     grpc_slice_buffer_destroy(&protector_staging_buffer);
117     gpr_mu_destroy(&protector_mu);
118   }
119 
120   grpc_core::OrphanablePtr<grpc_endpoint> wrapped_ep;
121   struct tsi_frame_protector* protector;
122   struct tsi_zero_copy_grpc_protector* zero_copy_protector;
123   gpr_mu protector_mu;
124   grpc_core::Mutex read_mu;
125   grpc_core::Mutex write_mu;
126   // saved upper level callbacks and user_data.
127   grpc_closure* read_cb = nullptr;
128   grpc_closure* write_cb = nullptr;
129   grpc_closure on_read;
130   grpc_closure on_write;
131   grpc_slice_buffer* read_buffer = nullptr;
132   grpc_slice_buffer source_buffer;
133   // saved handshaker leftover data to unprotect.
134   grpc_slice_buffer leftover_bytes;
135   // buffers for read and write
136   grpc_slice read_staging_buffer ABSL_GUARDED_BY(read_mu);
137   grpc_slice write_staging_buffer ABSL_GUARDED_BY(write_mu);
138   grpc_slice_buffer output_buffer;
139   grpc_core::MemoryOwner memory_owner;
140   grpc_core::MemoryAllocator::Reservation self_reservation;
141   std::atomic<bool> has_posted_reclaimer;
142   int min_progress_size;
143   grpc_slice_buffer protector_staging_buffer;
144   gpr_refcount ref;
145 };
146 }  // namespace
147 
destroy(secure_endpoint * ep)148 static void destroy(secure_endpoint* ep) { delete ep; }
149 
150 #ifndef NDEBUG
151 #define SECURE_ENDPOINT_UNREF(ep, reason) \
152   secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
153 #define SECURE_ENDPOINT_REF(ep, reason) \
154   secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
secure_endpoint_unref(secure_endpoint * ep,const char * reason,const char * file,int line)155 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
156                                   const char* file, int line) {
157   if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint)) {
158     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
159     VLOG(2).AtLocation(file, line) << "SECENDP unref " << ep << " : " << reason
160                                    << " " << val << " -> " << val - 1;
161   }
162   if (gpr_unref(&ep->ref)) {
163     destroy(ep);
164   }
165 }
166 
secure_endpoint_ref(secure_endpoint * ep,const char * reason,const char * file,int line)167 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
168                                 const char* file, int line) {
169   if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint)) {
170     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
171     VLOG(2).AtLocation(file, line) << "SECENDP   ref " << ep << " : " << reason
172                                    << " " << val << " -> " << val + 1;
173   }
174   gpr_ref(&ep->ref);
175 }
176 #else
177 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
178 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
secure_endpoint_unref(secure_endpoint * ep)179 static void secure_endpoint_unref(secure_endpoint* ep) {
180   if (gpr_unref(&ep->ref)) {
181     destroy(ep);
182   }
183 }
184 
secure_endpoint_ref(secure_endpoint * ep)185 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
186 #endif
187 
maybe_post_reclaimer(secure_endpoint * ep)188 static void maybe_post_reclaimer(secure_endpoint* ep) {
189   if (!ep->has_posted_reclaimer) {
190     SECURE_ENDPOINT_REF(ep, "benign_reclaimer");
191     ep->has_posted_reclaimer.exchange(true, std::memory_order_relaxed);
192     ep->memory_owner.PostReclaimer(
193         grpc_core::ReclamationPass::kBenign,
194         [ep](absl::optional<grpc_core::ReclamationSweep> sweep) {
195           if (sweep.has_value()) {
196             GRPC_TRACE_LOG(resource_quota, INFO)
197                 << "secure endpoint: benign reclamation to free memory";
198             grpc_slice temp_read_slice;
199             grpc_slice temp_write_slice;
200 
201             ep->read_mu.Lock();
202             temp_read_slice = ep->read_staging_buffer;
203             ep->read_staging_buffer = grpc_empty_slice();
204             ep->read_mu.Unlock();
205 
206             ep->write_mu.Lock();
207             temp_write_slice = ep->write_staging_buffer;
208             ep->write_staging_buffer = grpc_empty_slice();
209             ep->write_mu.Unlock();
210 
211             grpc_core::CSliceUnref(temp_read_slice);
212             grpc_core::CSliceUnref(temp_write_slice);
213             ep->has_posted_reclaimer.exchange(false, std::memory_order_relaxed);
214           }
215           SECURE_ENDPOINT_UNREF(ep, "benign_reclaimer");
216         });
217   }
218 }
219 
flush_read_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)220 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
221                                       uint8_t** end)
222     ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->read_mu) {
223   grpc_slice_buffer_add_indexed(ep->read_buffer, ep->read_staging_buffer);
224   ep->read_staging_buffer =
225       ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
226   *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
227   *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
228 }
229 
call_read_cb(secure_endpoint * ep,grpc_error_handle error)230 static void call_read_cb(secure_endpoint* ep, grpc_error_handle error) {
231   if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
232     size_t i;
233     for (i = 0; i < ep->read_buffer->count; i++) {
234       char* data = grpc_dump_slice(ep->read_buffer->slices[i],
235                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
236       VLOG(2) << "READ " << ep << ": " << data;
237       gpr_free(data);
238     }
239   }
240   ep->read_buffer = nullptr;
241   grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
242   SECURE_ENDPOINT_UNREF(ep, "read");
243 }
244 
on_read(void * user_data,grpc_error_handle error)245 static void on_read(void* user_data, grpc_error_handle error) {
246   unsigned i;
247   uint8_t keep_looping = 0;
248   tsi_result result = TSI_OK;
249   secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
250 
251   {
252     grpc_core::MutexLock l(&ep->read_mu);
253 
254     // If we were shut down after this callback was scheduled with OK
255     // status but before it was invoked, we need to treat that as an error.
256     if (ep->wrapped_ep == nullptr && error.ok()) {
257       error = absl::CancelledError("secure endpoint shutdown");
258     }
259 
260     uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
261     uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
262 
263     if (!error.ok()) {
264       grpc_slice_buffer_reset_and_unref(ep->read_buffer);
265     } else if (ep->zero_copy_protector != nullptr) {
266       // Use zero-copy grpc protector to unprotect.
267       int min_progress_size = 1;
268       // Get the size of the last frame which is not yet fully decrypted.
269       // This estimated frame size is stored in ep->min_progress_size which is
270       // passed to the TCP layer to indicate the minimum number of
271       // bytes that need to be read to make meaningful progress. This would
272       // avoid reading of small slices from the network.
273       // TODO(vigneshbabu): Set min_progress_size in the regular (non-zero-copy)
274       // frame protector code path as well.
275       result = tsi_zero_copy_grpc_protector_unprotect(
276           ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
277           &min_progress_size);
278       min_progress_size = std::max(1, min_progress_size);
279       ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size;
280     } else {
281       // Use frame protector to unprotect.
282       // TODO(yangg) check error, maybe bail out early
283       for (i = 0; i < ep->source_buffer.count; i++) {
284         grpc_slice encrypted = ep->source_buffer.slices[i];
285         uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
286         size_t message_size = GRPC_SLICE_LENGTH(encrypted);
287 
288         while (message_size > 0 || keep_looping) {
289           size_t unprotected_buffer_size_written =
290               static_cast<size_t>(end - cur);
291           size_t processed_message_size = message_size;
292           gpr_mu_lock(&ep->protector_mu);
293           result = tsi_frame_protector_unprotect(
294               ep->protector, message_bytes, &processed_message_size, cur,
295               &unprotected_buffer_size_written);
296           gpr_mu_unlock(&ep->protector_mu);
297           if (result != TSI_OK) {
298             LOG(ERROR) << "Decryption error: " << tsi_result_to_string(result);
299             break;
300           }
301           message_bytes += processed_message_size;
302           message_size -= processed_message_size;
303           cur += unprotected_buffer_size_written;
304 
305           if (cur == end) {
306             flush_read_staging_buffer(ep, &cur, &end);
307             // Force to enter the loop again to extract buffered bytes in
308             // protector. The bytes could be buffered because of running out of
309             // staging_buffer. If this happens at the end of all slices, doing
310             // another unprotect avoids leaving data in the protector.
311             keep_looping = 1;
312           } else if (unprotected_buffer_size_written > 0) {
313             keep_looping = 1;
314           } else {
315             keep_looping = 0;
316           }
317         }
318         if (result != TSI_OK) break;
319       }
320 
321       if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
322         grpc_slice_buffer_add(
323             ep->read_buffer,
324             grpc_slice_split_head(
325                 &ep->read_staging_buffer,
326                 static_cast<size_t>(
327                     cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
328       }
329     }
330   }
331 
332   if (!error.ok()) {
333     call_read_cb(
334         ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
335     return;
336   }
337 
338   // TODO(yangg) experiment with moving this block after read_cb to see if it
339   // helps latency
340   grpc_slice_buffer_reset_and_unref(&ep->source_buffer);
341 
342   if (result != TSI_OK) {
343     grpc_slice_buffer_reset_and_unref(ep->read_buffer);
344     call_read_cb(
345         ep, GRPC_ERROR_CREATE(absl::StrCat("Unwrap failed (",
346                                            tsi_result_to_string(result), ")")));
347     return;
348   }
349 
350   call_read_cb(ep, absl::OkStatus());
351 }
352 
endpoint_read(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,bool urgent,int)353 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
354                           grpc_closure* cb, bool urgent,
355                           int /*min_progress_size*/) {
356   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
357   ep->read_cb = cb;
358   ep->read_buffer = slices;
359   grpc_slice_buffer_reset_and_unref(ep->read_buffer);
360 
361   SECURE_ENDPOINT_REF(ep, "read");
362   if (ep->leftover_bytes.count) {
363     grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
364     CHECK_EQ(ep->leftover_bytes.count, 0u);
365     on_read(ep, absl::OkStatus());
366     return;
367   }
368 
369   grpc_endpoint_read(ep->wrapped_ep.get(), &ep->source_buffer, &ep->on_read,
370                      urgent, /*min_progress_size=*/ep->min_progress_size);
371 }
372 
flush_write_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)373 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
374                                        uint8_t** end)
375     ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->write_mu) {
376   grpc_slice_buffer_add_indexed(&ep->output_buffer, ep->write_staging_buffer);
377   ep->write_staging_buffer =
378       ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
379   *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
380   *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
381   maybe_post_reclaimer(ep);
382 }
383 
on_write(void * user_data,grpc_error_handle error)384 static void on_write(void* user_data, grpc_error_handle error) {
385   secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
386   grpc_closure* cb = ep->write_cb;
387   ep->write_cb = nullptr;
388   SECURE_ENDPOINT_UNREF(ep, "write");
389   grpc_core::EnsureRunInExecCtx([cb, error = std::move(error)]() {
390     grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
391   });
392 }
393 
endpoint_write(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)394 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
395                            grpc_closure* cb, void* arg, int max_frame_size) {
396   GRPC_LATENT_SEE_INNER_SCOPE("secure_endpoint write");
397   unsigned i;
398   tsi_result result = TSI_OK;
399   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
400 
401   {
402     grpc_core::MutexLock l(&ep->write_mu);
403     uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
404     uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
405 
406     grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
407 
408     if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
409       for (i = 0; i < slices->count; i++) {
410         char* data =
411             grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
412         VLOG(2) << "WRITE " << ep << ": " << data;
413         gpr_free(data);
414       }
415     }
416 
417     if (ep->zero_copy_protector != nullptr) {
418       // Use zero-copy grpc protector to protect.
419       result = TSI_OK;
420       // Break the input slices into chunks of size = max_frame_size and call
421       // tsi_zero_copy_grpc_protector_protect on each chunk. This ensures that
422       // the protector cannot create frames larger than the specified
423       // max_frame_size.
424       while (slices->length > static_cast<size_t>(max_frame_size) &&
425              result == TSI_OK) {
426         grpc_slice_buffer_move_first(slices,
427                                      static_cast<size_t>(max_frame_size),
428                                      &ep->protector_staging_buffer);
429         result = tsi_zero_copy_grpc_protector_protect(
430             ep->zero_copy_protector, &ep->protector_staging_buffer,
431             &ep->output_buffer);
432       }
433       if (result == TSI_OK && slices->length > 0) {
434         result = tsi_zero_copy_grpc_protector_protect(
435             ep->zero_copy_protector, slices, &ep->output_buffer);
436       }
437       grpc_slice_buffer_reset_and_unref(&ep->protector_staging_buffer);
438     } else {
439       // Use frame protector to protect.
440       for (i = 0; i < slices->count; i++) {
441         grpc_slice plain = slices->slices[i];
442         uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
443         size_t message_size = GRPC_SLICE_LENGTH(plain);
444         while (message_size > 0) {
445           size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
446           size_t processed_message_size = message_size;
447           gpr_mu_lock(&ep->protector_mu);
448           result = tsi_frame_protector_protect(ep->protector, message_bytes,
449                                                &processed_message_size, cur,
450                                                &protected_buffer_size_to_send);
451           gpr_mu_unlock(&ep->protector_mu);
452           if (result != TSI_OK) {
453             LOG(ERROR) << "Encryption error: " << tsi_result_to_string(result);
454             break;
455           }
456           message_bytes += processed_message_size;
457           message_size -= processed_message_size;
458           cur += protected_buffer_size_to_send;
459 
460           if (cur == end) {
461             flush_write_staging_buffer(ep, &cur, &end);
462           }
463         }
464         if (result != TSI_OK) break;
465       }
466       if (result == TSI_OK) {
467         size_t still_pending_size;
468         do {
469           size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
470           gpr_mu_lock(&ep->protector_mu);
471           result = tsi_frame_protector_protect_flush(
472               ep->protector, cur, &protected_buffer_size_to_send,
473               &still_pending_size);
474           gpr_mu_unlock(&ep->protector_mu);
475           if (result != TSI_OK) break;
476           cur += protected_buffer_size_to_send;
477           if (cur == end) {
478             flush_write_staging_buffer(ep, &cur, &end);
479           }
480         } while (still_pending_size > 0);
481         if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
482           grpc_slice_buffer_add(
483               &ep->output_buffer,
484               grpc_slice_split_head(
485                   &ep->write_staging_buffer,
486                   static_cast<size_t>(
487                       cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
488         }
489       }
490     }
491   }
492 
493   if (result != TSI_OK) {
494     // TODO(yangg) do different things according to the error type?
495     grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
496     grpc_core::ExecCtx::Run(
497         DEBUG_LOCATION, cb,
498         GRPC_ERROR_CREATE(
499             absl::StrCat("Wrap failed (", tsi_result_to_string(result), ")")));
500     return;
501   }
502 
503   // Need to hold a ref here, because the wrapped endpoint may access
504   // output_buffer at any time until the write completes.
505   SECURE_ENDPOINT_REF(ep, "write");
506   ep->write_cb = cb;
507   grpc_endpoint_write(ep->wrapped_ep.get(), &ep->output_buffer, &ep->on_write,
508                       arg, max_frame_size);
509 }
510 
endpoint_destroy(grpc_endpoint * secure_ep)511 static void endpoint_destroy(grpc_endpoint* secure_ep) {
512   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
513   ep->read_mu.Lock();
514   ep->wrapped_ep.reset();
515   ep->memory_owner.Reset();
516   ep->read_mu.Unlock();
517   SECURE_ENDPOINT_UNREF(ep, "destroy");
518 }
519 
endpoint_add_to_pollset(grpc_endpoint * secure_ep,grpc_pollset * pollset)520 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
521                                     grpc_pollset* pollset) {
522   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
523   grpc_endpoint_add_to_pollset(ep->wrapped_ep.get(), pollset);
524 }
525 
endpoint_add_to_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)526 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
527                                         grpc_pollset_set* pollset_set) {
528   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
529   grpc_endpoint_add_to_pollset_set(ep->wrapped_ep.get(), pollset_set);
530 }
531 
endpoint_delete_from_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)532 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
533                                              grpc_pollset_set* pollset_set) {
534   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
535   grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep.get(), pollset_set);
536 }
537 
endpoint_get_peer(grpc_endpoint * secure_ep)538 static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
539   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
540   return grpc_endpoint_get_peer(ep->wrapped_ep.get());
541 }
542 
endpoint_get_local_address(grpc_endpoint * secure_ep)543 static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
544   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
545   return grpc_endpoint_get_local_address(ep->wrapped_ep.get());
546 }
547 
endpoint_get_fd(grpc_endpoint * secure_ep)548 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
549   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
550   return grpc_endpoint_get_fd(ep->wrapped_ep.get());
551 }
552 
endpoint_can_track_err(grpc_endpoint * secure_ep)553 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
554   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
555   return grpc_endpoint_can_track_err(ep->wrapped_ep.get());
556 }
557 
558 static const grpc_endpoint_vtable vtable = {endpoint_read,
559                                             endpoint_write,
560                                             endpoint_add_to_pollset,
561                                             endpoint_add_to_pollset_set,
562                                             endpoint_delete_from_pollset_set,
563                                             endpoint_destroy,
564                                             endpoint_get_peer,
565                                             endpoint_get_local_address,
566                                             endpoint_get_fd,
567                                             endpoint_can_track_err};
568 
grpc_secure_endpoint_create(struct tsi_frame_protector * protector,struct tsi_zero_copy_grpc_protector * zero_copy_protector,grpc_core::OrphanablePtr<grpc_endpoint> to_wrap,grpc_slice * leftover_slices,const grpc_channel_args * channel_args,size_t leftover_nslices)569 grpc_core::OrphanablePtr<grpc_endpoint> grpc_secure_endpoint_create(
570     struct tsi_frame_protector* protector,
571     struct tsi_zero_copy_grpc_protector* zero_copy_protector,
572     grpc_core::OrphanablePtr<grpc_endpoint> to_wrap,
573     grpc_slice* leftover_slices, const grpc_channel_args* channel_args,
574     size_t leftover_nslices) {
575   return grpc_core::MakeOrphanable<secure_endpoint>(
576       &vtable, protector, zero_copy_protector, std::move(to_wrap),
577       leftover_slices, channel_args, leftover_nslices);
578 }
579