• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include <grpc/support/port_platform.h>
24 
25 #include <new>
26 
27 #include "src/core/lib/iomgr/sockaddr.h"
28 
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/memory.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/security/transport/secure_endpoint.h"
39 #include "src/core/lib/security/transport/tsi_error.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/tsi/transport_security_grpc.h"
43 
44 #define STAGING_BUFFER_SIZE 8192
45 
46 static void on_read(void* user_data, grpc_error* error);
47 
48 namespace {
49 struct secure_endpoint {
secure_endpoint__anona6fcfb300111::secure_endpoint50   secure_endpoint(const grpc_endpoint_vtable* vtable,
51                   tsi_frame_protector* protector,
52                   tsi_zero_copy_grpc_protector* zero_copy_protector,
53                   grpc_endpoint* transport, grpc_slice* leftover_slices,
54                   size_t leftover_nslices)
55       : wrapped_ep(transport),
56         protector(protector),
57         zero_copy_protector(zero_copy_protector) {
58     base.vtable = vtable;
59     gpr_mu_init(&protector_mu);
60     GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
61     grpc_slice_buffer_init(&source_buffer);
62     grpc_slice_buffer_init(&leftover_bytes);
63     for (size_t i = 0; i < leftover_nslices; i++) {
64       grpc_slice_buffer_add(&leftover_bytes,
65                             grpc_slice_ref_internal(leftover_slices[i]));
66     }
67     grpc_slice_buffer_init(&output_buffer);
68     gpr_ref_init(&ref, 1);
69   }
70 
~secure_endpoint__anona6fcfb300111::secure_endpoint71   ~secure_endpoint() {
72     grpc_endpoint_destroy(wrapped_ep);
73     tsi_frame_protector_destroy(protector);
74     tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
75     grpc_slice_buffer_destroy_internal(&source_buffer);
76     grpc_slice_buffer_destroy_internal(&leftover_bytes);
77     grpc_slice_unref_internal(read_staging_buffer);
78     grpc_slice_unref_internal(write_staging_buffer);
79     grpc_slice_buffer_destroy_internal(&output_buffer);
80     gpr_mu_destroy(&protector_mu);
81   }
82 
83   grpc_endpoint base;
84   grpc_endpoint* wrapped_ep;
85   struct tsi_frame_protector* protector;
86   struct tsi_zero_copy_grpc_protector* zero_copy_protector;
87   gpr_mu protector_mu;
88   /* saved upper level callbacks and user_data. */
89   grpc_closure* read_cb = nullptr;
90   grpc_closure* write_cb = nullptr;
91   grpc_closure on_read;
92   grpc_slice_buffer* read_buffer = nullptr;
93   grpc_slice_buffer source_buffer;
94   /* saved handshaker leftover data to unprotect. */
95   grpc_slice_buffer leftover_bytes;
96   /* buffers for read and write */
97   grpc_slice read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
98   grpc_slice write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
99   grpc_slice_buffer output_buffer;
100 
101   gpr_refcount ref;
102 };
103 }  // namespace
104 
105 grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
106 
destroy(secure_endpoint * ep)107 static void destroy(secure_endpoint* ep) { delete ep; }
108 
109 #ifndef NDEBUG
110 #define SECURE_ENDPOINT_UNREF(ep, reason) \
111   secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
112 #define SECURE_ENDPOINT_REF(ep, reason) \
113   secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
secure_endpoint_unref(secure_endpoint * ep,const char * reason,const char * file,int line)114 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
115                                   const char* file, int line) {
116   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
117     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
118     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
119             "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
120             val - 1);
121   }
122   if (gpr_unref(&ep->ref)) {
123     destroy(ep);
124   }
125 }
126 
secure_endpoint_ref(secure_endpoint * ep,const char * reason,const char * file,int line)127 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
128                                 const char* file, int line) {
129   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
130     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
131     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
132             "SECENDP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
133             val + 1);
134   }
135   gpr_ref(&ep->ref);
136 }
137 #else
138 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
139 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
secure_endpoint_unref(secure_endpoint * ep)140 static void secure_endpoint_unref(secure_endpoint* ep) {
141   if (gpr_unref(&ep->ref)) {
142     destroy(ep);
143   }
144 }
145 
secure_endpoint_ref(secure_endpoint * ep)146 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
147 #endif
148 
flush_read_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)149 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
150                                       uint8_t** end) {
151   grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
152   ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
153   *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
154   *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
155 }
156 
call_read_cb(secure_endpoint * ep,grpc_error * error)157 static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
158   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
159     size_t i;
160     for (i = 0; i < ep->read_buffer->count; i++) {
161       char* data = grpc_dump_slice(ep->read_buffer->slices[i],
162                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
163       gpr_log(GPR_INFO, "READ %p: %s", ep, data);
164       gpr_free(data);
165     }
166   }
167   ep->read_buffer = nullptr;
168   grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
169   SECURE_ENDPOINT_UNREF(ep, "read");
170 }
171 
on_read(void * user_data,grpc_error * error)172 static void on_read(void* user_data, grpc_error* error) {
173   unsigned i;
174   uint8_t keep_looping = 0;
175   tsi_result result = TSI_OK;
176   secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
177   uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
178   uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
179 
180   if (error != GRPC_ERROR_NONE) {
181     grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
182     call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
183                          "Secure read failed", &error, 1));
184     return;
185   }
186 
187   if (ep->zero_copy_protector != nullptr) {
188     // Use zero-copy grpc protector to unprotect.
189     result = tsi_zero_copy_grpc_protector_unprotect(
190         ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
191   } else {
192     // Use frame protector to unprotect.
193     /* TODO(yangg) check error, maybe bail out early */
194     for (i = 0; i < ep->source_buffer.count; i++) {
195       grpc_slice encrypted = ep->source_buffer.slices[i];
196       uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
197       size_t message_size = GRPC_SLICE_LENGTH(encrypted);
198 
199       while (message_size > 0 || keep_looping) {
200         size_t unprotected_buffer_size_written = static_cast<size_t>(end - cur);
201         size_t processed_message_size = message_size;
202         gpr_mu_lock(&ep->protector_mu);
203         result = tsi_frame_protector_unprotect(
204             ep->protector, message_bytes, &processed_message_size, cur,
205             &unprotected_buffer_size_written);
206         gpr_mu_unlock(&ep->protector_mu);
207         if (result != TSI_OK) {
208           gpr_log(GPR_ERROR, "Decryption error: %s",
209                   tsi_result_to_string(result));
210           break;
211         }
212         message_bytes += processed_message_size;
213         message_size -= processed_message_size;
214         cur += unprotected_buffer_size_written;
215 
216         if (cur == end) {
217           flush_read_staging_buffer(ep, &cur, &end);
218           /* Force to enter the loop again to extract buffered bytes in
219              protector. The bytes could be buffered because of running out of
220              staging_buffer. If this happens at the end of all slices, doing
221              another unprotect avoids leaving data in the protector. */
222           keep_looping = 1;
223         } else if (unprotected_buffer_size_written > 0) {
224           keep_looping = 1;
225         } else {
226           keep_looping = 0;
227         }
228       }
229       if (result != TSI_OK) break;
230     }
231 
232     if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
233       grpc_slice_buffer_add(
234           ep->read_buffer,
235           grpc_slice_split_head(
236               &ep->read_staging_buffer,
237               static_cast<size_t>(
238                   cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
239     }
240   }
241 
242   /* TODO(yangg) experiment with moving this block after read_cb to see if it
243      helps latency */
244   grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
245 
246   if (result != TSI_OK) {
247     grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
248     call_read_cb(
249         ep, grpc_set_tsi_error_result(
250                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
251     return;
252   }
253 
254   call_read_cb(ep, GRPC_ERROR_NONE);
255 }
256 
endpoint_read(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,bool urgent)257 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
258                           grpc_closure* cb, bool urgent) {
259   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
260   ep->read_cb = cb;
261   ep->read_buffer = slices;
262   grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
263 
264   SECURE_ENDPOINT_REF(ep, "read");
265   if (ep->leftover_bytes.count) {
266     grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
267     GPR_ASSERT(ep->leftover_bytes.count == 0);
268     on_read(ep, GRPC_ERROR_NONE);
269     return;
270   }
271 
272   grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
273 }
274 
flush_write_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)275 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
276                                        uint8_t** end) {
277   grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
278   ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
279   *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
280   *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
281 }
282 
endpoint_write(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)283 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
284                            grpc_closure* cb, void* arg) {
285   GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
286 
287   unsigned i;
288   tsi_result result = TSI_OK;
289   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
290   uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
291   uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
292 
293   grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
294 
295   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
296     for (i = 0; i < slices->count; i++) {
297       char* data =
298           grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
299       gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
300       gpr_free(data);
301     }
302   }
303 
304   if (ep->zero_copy_protector != nullptr) {
305     // Use zero-copy grpc protector to protect.
306     result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
307                                                   slices, &ep->output_buffer);
308   } else {
309     // Use frame protector to protect.
310     for (i = 0; i < slices->count; i++) {
311       grpc_slice plain = slices->slices[i];
312       uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
313       size_t message_size = GRPC_SLICE_LENGTH(plain);
314       while (message_size > 0) {
315         size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
316         size_t processed_message_size = message_size;
317         gpr_mu_lock(&ep->protector_mu);
318         result = tsi_frame_protector_protect(ep->protector, message_bytes,
319                                              &processed_message_size, cur,
320                                              &protected_buffer_size_to_send);
321         gpr_mu_unlock(&ep->protector_mu);
322         if (result != TSI_OK) {
323           gpr_log(GPR_ERROR, "Encryption error: %s",
324                   tsi_result_to_string(result));
325           break;
326         }
327         message_bytes += processed_message_size;
328         message_size -= processed_message_size;
329         cur += protected_buffer_size_to_send;
330 
331         if (cur == end) {
332           flush_write_staging_buffer(ep, &cur, &end);
333         }
334       }
335       if (result != TSI_OK) break;
336     }
337     if (result == TSI_OK) {
338       size_t still_pending_size;
339       do {
340         size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
341         gpr_mu_lock(&ep->protector_mu);
342         result = tsi_frame_protector_protect_flush(
343             ep->protector, cur, &protected_buffer_size_to_send,
344             &still_pending_size);
345         gpr_mu_unlock(&ep->protector_mu);
346         if (result != TSI_OK) break;
347         cur += protected_buffer_size_to_send;
348         if (cur == end) {
349           flush_write_staging_buffer(ep, &cur, &end);
350         }
351       } while (still_pending_size > 0);
352       if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
353         grpc_slice_buffer_add(
354             &ep->output_buffer,
355             grpc_slice_split_head(
356                 &ep->write_staging_buffer,
357                 static_cast<size_t>(
358                     cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
359       }
360     }
361   }
362 
363   if (result != TSI_OK) {
364     /* TODO(yangg) do different things according to the error type? */
365     grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
366     grpc_core::ExecCtx::Run(
367         DEBUG_LOCATION, cb,
368         grpc_set_tsi_error_result(
369             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
370     return;
371   }
372 
373   grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
374 }
375 
endpoint_shutdown(grpc_endpoint * secure_ep,grpc_error * why)376 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
377   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
378   grpc_endpoint_shutdown(ep->wrapped_ep, why);
379 }
380 
endpoint_destroy(grpc_endpoint * secure_ep)381 static void endpoint_destroy(grpc_endpoint* secure_ep) {
382   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
383   SECURE_ENDPOINT_UNREF(ep, "destroy");
384 }
385 
endpoint_add_to_pollset(grpc_endpoint * secure_ep,grpc_pollset * pollset)386 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
387                                     grpc_pollset* pollset) {
388   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
389   grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
390 }
391 
endpoint_add_to_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)392 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
393                                         grpc_pollset_set* pollset_set) {
394   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
395   grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
396 }
397 
endpoint_delete_from_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)398 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
399                                              grpc_pollset_set* pollset_set) {
400   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
401   grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
402 }
403 
endpoint_get_peer(grpc_endpoint * secure_ep)404 static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
405   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
406   return grpc_endpoint_get_peer(ep->wrapped_ep);
407 }
408 
endpoint_get_local_address(grpc_endpoint * secure_ep)409 static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
410   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
411   return grpc_endpoint_get_local_address(ep->wrapped_ep);
412 }
413 
endpoint_get_fd(grpc_endpoint * secure_ep)414 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
415   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
416   return grpc_endpoint_get_fd(ep->wrapped_ep);
417 }
418 
endpoint_get_resource_user(grpc_endpoint * secure_ep)419 static grpc_resource_user* endpoint_get_resource_user(
420     grpc_endpoint* secure_ep) {
421   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
422   return grpc_endpoint_get_resource_user(ep->wrapped_ep);
423 }
424 
endpoint_can_track_err(grpc_endpoint * secure_ep)425 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
426   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
427   return grpc_endpoint_can_track_err(ep->wrapped_ep);
428 }
429 
430 static const grpc_endpoint_vtable vtable = {endpoint_read,
431                                             endpoint_write,
432                                             endpoint_add_to_pollset,
433                                             endpoint_add_to_pollset_set,
434                                             endpoint_delete_from_pollset_set,
435                                             endpoint_shutdown,
436                                             endpoint_destroy,
437                                             endpoint_get_resource_user,
438                                             endpoint_get_peer,
439                                             endpoint_get_local_address,
440                                             endpoint_get_fd,
441                                             endpoint_can_track_err};
442 
grpc_secure_endpoint_create(struct tsi_frame_protector * protector,struct tsi_zero_copy_grpc_protector * zero_copy_protector,grpc_endpoint * to_wrap,grpc_slice * leftover_slices,size_t leftover_nslices)443 grpc_endpoint* grpc_secure_endpoint_create(
444     struct tsi_frame_protector* protector,
445     struct tsi_zero_copy_grpc_protector* zero_copy_protector,
446     grpc_endpoint* to_wrap, grpc_slice* leftover_slices,
447     size_t leftover_nslices) {
448   secure_endpoint* ep =
449       new secure_endpoint(&vtable, protector, zero_copy_protector, to_wrap,
450                           leftover_slices, leftover_nslices);
451   return &ep->base;
452 }
453