1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include <grpc/support/port_platform.h>
24
25 #include <new>
26
27 #include "src/core/lib/iomgr/sockaddr.h"
28
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/memory.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/security/transport/secure_endpoint.h"
39 #include "src/core/lib/security/transport/tsi_error.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/tsi/transport_security_grpc.h"
43
/* Length passed to GRPC_SLICE_MALLOC for every read/write staging slice
   allocated in this file. */
#define STAGING_BUFFER_SIZE 8192

/* Forward declaration: the secure_endpoint constructor binds this function
   into its on_read closure before the definition appears further down. */
static void on_read(void* user_data, grpc_error* error);
47
48 namespace {
49 struct secure_endpoint {
secure_endpoint__anon8dd21e320111::secure_endpoint50 secure_endpoint(const grpc_endpoint_vtable* vtable,
51 tsi_frame_protector* protector,
52 tsi_zero_copy_grpc_protector* zero_copy_protector,
53 grpc_endpoint* transport, grpc_slice* leftover_slices,
54 size_t leftover_nslices)
55 : wrapped_ep(transport),
56 protector(protector),
57 zero_copy_protector(zero_copy_protector) {
58 base.vtable = vtable;
59 gpr_mu_init(&protector_mu);
60 GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
61 grpc_slice_buffer_init(&source_buffer);
62 grpc_slice_buffer_init(&leftover_bytes);
63 for (size_t i = 0; i < leftover_nslices; i++) {
64 grpc_slice_buffer_add(&leftover_bytes,
65 grpc_slice_ref_internal(leftover_slices[i]));
66 }
67 grpc_slice_buffer_init(&output_buffer);
68 gpr_ref_init(&ref, 1);
69 }
70
~secure_endpoint__anon8dd21e320111::secure_endpoint71 ~secure_endpoint() {
72 grpc_endpoint_destroy(wrapped_ep);
73 tsi_frame_protector_destroy(protector);
74 tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
75 grpc_slice_buffer_destroy_internal(&source_buffer);
76 grpc_slice_buffer_destroy_internal(&leftover_bytes);
77 grpc_slice_unref_internal(read_staging_buffer);
78 grpc_slice_unref_internal(write_staging_buffer);
79 grpc_slice_buffer_destroy_internal(&output_buffer);
80 gpr_mu_destroy(&protector_mu);
81 }
82
83 grpc_endpoint base;
84 grpc_endpoint* wrapped_ep;
85 struct tsi_frame_protector* protector;
86 struct tsi_zero_copy_grpc_protector* zero_copy_protector;
87 gpr_mu protector_mu;
88 /* saved upper level callbacks and user_data. */
89 grpc_closure* read_cb = nullptr;
90 grpc_closure* write_cb = nullptr;
91 grpc_closure on_read;
92 grpc_slice_buffer* read_buffer = nullptr;
93 grpc_slice_buffer source_buffer;
94 /* saved handshaker leftover data to unprotect. */
95 grpc_slice_buffer leftover_bytes;
96 /* buffers for read and write */
97 grpc_slice read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
98 grpc_slice write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
99 grpc_slice_buffer output_buffer;
100
101 gpr_refcount ref;
102 };
103 } // namespace
104
/* Trace flag registered under the name "secure_endpoint". When it is enabled,
   the debug ref/unref helpers log reference-count transitions and the
   read/write paths dump slice contents. The `false` argument is presumably
   the default-enabled state -- confirm against grpc_core::TraceFlag. */
grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
106
destroy(secure_endpoint * ep)107 static void destroy(secure_endpoint* ep) { delete ep; }
108
109 #ifndef NDEBUG
/* Debug builds (NDEBUG not defined): the ref/unref macros forward the call
   site's __FILE__ and __LINE__ so the trace log entry is attributed to the
   caller rather than to the helper. */
#define SECURE_ENDPOINT_UNREF(ep, reason) \
  secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
  secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
secure_endpoint_unref(secure_endpoint * ep,const char * reason,const char * file,int line)114 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
115 const char* file, int line) {
116 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
117 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
118 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
119 "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
120 val - 1);
121 }
122 if (gpr_unref(&ep->ref)) {
123 destroy(ep);
124 }
125 }
126
secure_endpoint_ref(secure_endpoint * ep,const char * reason,const char * file,int line)127 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
128 const char* file, int line) {
129 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
130 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
131 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
132 "SECENDP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
133 val + 1);
134 }
135 gpr_ref(&ep->ref);
136 }
137 #else
/* NDEBUG builds: `reason` is accepted so call sites look the same as in debug
   builds, but it is discarded by the macro expansion. */
#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
secure_endpoint_unref(secure_endpoint * ep)140 static void secure_endpoint_unref(secure_endpoint* ep) {
141 if (gpr_unref(&ep->ref)) {
142 destroy(ep);
143 }
144 }
145
secure_endpoint_ref(secure_endpoint * ep)146 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
147 #endif
148
flush_read_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)149 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
150 uint8_t** end) {
151 grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
152 ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
153 *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
154 *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
155 }
156
call_read_cb(secure_endpoint * ep,grpc_error * error)157 static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
158 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
159 size_t i;
160 for (i = 0; i < ep->read_buffer->count; i++) {
161 char* data = grpc_dump_slice(ep->read_buffer->slices[i],
162 GPR_DUMP_HEX | GPR_DUMP_ASCII);
163 gpr_log(GPR_INFO, "READ %p: %s", ep, data);
164 gpr_free(data);
165 }
166 }
167 ep->read_buffer = nullptr;
168 grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
169 SECURE_ENDPOINT_UNREF(ep, "read");
170 }
171
on_read(void * user_data,grpc_error * error)172 static void on_read(void* user_data, grpc_error* error) {
173 unsigned i;
174 uint8_t keep_looping = 0;
175 tsi_result result = TSI_OK;
176 secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
177 uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
178 uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
179
180 if (error != GRPC_ERROR_NONE) {
181 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
182 call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
183 "Secure read failed", &error, 1));
184 return;
185 }
186
187 if (ep->zero_copy_protector != nullptr) {
188 // Use zero-copy grpc protector to unprotect.
189 result = tsi_zero_copy_grpc_protector_unprotect(
190 ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
191 } else {
192 // Use frame protector to unprotect.
193 /* TODO(yangg) check error, maybe bail out early */
194 for (i = 0; i < ep->source_buffer.count; i++) {
195 grpc_slice encrypted = ep->source_buffer.slices[i];
196 uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
197 size_t message_size = GRPC_SLICE_LENGTH(encrypted);
198
199 while (message_size > 0 || keep_looping) {
200 size_t unprotected_buffer_size_written = static_cast<size_t>(end - cur);
201 size_t processed_message_size = message_size;
202 gpr_mu_lock(&ep->protector_mu);
203 result = tsi_frame_protector_unprotect(
204 ep->protector, message_bytes, &processed_message_size, cur,
205 &unprotected_buffer_size_written);
206 gpr_mu_unlock(&ep->protector_mu);
207 if (result != TSI_OK) {
208 gpr_log(GPR_ERROR, "Decryption error: %s",
209 tsi_result_to_string(result));
210 break;
211 }
212 message_bytes += processed_message_size;
213 message_size -= processed_message_size;
214 cur += unprotected_buffer_size_written;
215
216 if (cur == end) {
217 flush_read_staging_buffer(ep, &cur, &end);
218 /* Force to enter the loop again to extract buffered bytes in
219 protector. The bytes could be buffered because of running out of
220 staging_buffer. If this happens at the end of all slices, doing
221 another unprotect avoids leaving data in the protector. */
222 keep_looping = 1;
223 } else if (unprotected_buffer_size_written > 0) {
224 keep_looping = 1;
225 } else {
226 keep_looping = 0;
227 }
228 }
229 if (result != TSI_OK) break;
230 }
231
232 if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
233 grpc_slice_buffer_add(
234 ep->read_buffer,
235 grpc_slice_split_head(
236 &ep->read_staging_buffer,
237 static_cast<size_t>(
238 cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
239 }
240 }
241
242 /* TODO(yangg) experiment with moving this block after read_cb to see if it
243 helps latency */
244 grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
245
246 if (result != TSI_OK) {
247 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
248 call_read_cb(
249 ep, grpc_set_tsi_error_result(
250 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
251 return;
252 }
253
254 call_read_cb(ep, GRPC_ERROR_NONE);
255 }
256
endpoint_read(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,bool urgent)257 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
258 grpc_closure* cb, bool urgent) {
259 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
260 ep->read_cb = cb;
261 ep->read_buffer = slices;
262 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
263
264 SECURE_ENDPOINT_REF(ep, "read");
265 if (ep->leftover_bytes.count) {
266 grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
267 GPR_ASSERT(ep->leftover_bytes.count == 0);
268 on_read(ep, GRPC_ERROR_NONE);
269 return;
270 }
271
272 grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
273 }
274
flush_write_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)275 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
276 uint8_t** end) {
277 grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
278 ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
279 *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
280 *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
281 }
282
endpoint_write(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)283 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
284 grpc_closure* cb, void* arg) {
285 GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
286
287 unsigned i;
288 tsi_result result = TSI_OK;
289 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
290 uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
291 uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
292
293 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
294
295 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
296 for (i = 0; i < slices->count; i++) {
297 char* data =
298 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
299 gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
300 gpr_free(data);
301 }
302 }
303
304 if (ep->zero_copy_protector != nullptr) {
305 // Use zero-copy grpc protector to protect.
306 result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
307 slices, &ep->output_buffer);
308 } else {
309 // Use frame protector to protect.
310 for (i = 0; i < slices->count; i++) {
311 grpc_slice plain = slices->slices[i];
312 uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
313 size_t message_size = GRPC_SLICE_LENGTH(plain);
314 while (message_size > 0) {
315 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
316 size_t processed_message_size = message_size;
317 gpr_mu_lock(&ep->protector_mu);
318 result = tsi_frame_protector_protect(ep->protector, message_bytes,
319 &processed_message_size, cur,
320 &protected_buffer_size_to_send);
321 gpr_mu_unlock(&ep->protector_mu);
322 if (result != TSI_OK) {
323 gpr_log(GPR_ERROR, "Encryption error: %s",
324 tsi_result_to_string(result));
325 break;
326 }
327 message_bytes += processed_message_size;
328 message_size -= processed_message_size;
329 cur += protected_buffer_size_to_send;
330
331 if (cur == end) {
332 flush_write_staging_buffer(ep, &cur, &end);
333 }
334 }
335 if (result != TSI_OK) break;
336 }
337 if (result == TSI_OK) {
338 size_t still_pending_size;
339 do {
340 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
341 gpr_mu_lock(&ep->protector_mu);
342 result = tsi_frame_protector_protect_flush(
343 ep->protector, cur, &protected_buffer_size_to_send,
344 &still_pending_size);
345 gpr_mu_unlock(&ep->protector_mu);
346 if (result != TSI_OK) break;
347 cur += protected_buffer_size_to_send;
348 if (cur == end) {
349 flush_write_staging_buffer(ep, &cur, &end);
350 }
351 } while (still_pending_size > 0);
352 if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
353 grpc_slice_buffer_add(
354 &ep->output_buffer,
355 grpc_slice_split_head(
356 &ep->write_staging_buffer,
357 static_cast<size_t>(
358 cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
359 }
360 }
361 }
362
363 if (result != TSI_OK) {
364 /* TODO(yangg) do different things according to the error type? */
365 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
366 grpc_core::ExecCtx::Run(
367 DEBUG_LOCATION, cb,
368 grpc_set_tsi_error_result(
369 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
370 return;
371 }
372
373 grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
374 }
375
endpoint_shutdown(grpc_endpoint * secure_ep,grpc_error * why)376 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
377 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
378 grpc_endpoint_shutdown(ep->wrapped_ep, why);
379 }
380
endpoint_destroy(grpc_endpoint * secure_ep)381 static void endpoint_destroy(grpc_endpoint* secure_ep) {
382 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
383 SECURE_ENDPOINT_UNREF(ep, "destroy");
384 }
385
endpoint_add_to_pollset(grpc_endpoint * secure_ep,grpc_pollset * pollset)386 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
387 grpc_pollset* pollset) {
388 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
389 grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
390 }
391
endpoint_add_to_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)392 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
393 grpc_pollset_set* pollset_set) {
394 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
395 grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
396 }
397
endpoint_delete_from_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)398 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
399 grpc_pollset_set* pollset_set) {
400 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
401 grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
402 }
403
endpoint_get_peer(grpc_endpoint * secure_ep)404 static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
405 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
406 return grpc_endpoint_get_peer(ep->wrapped_ep);
407 }
408
endpoint_get_local_address(grpc_endpoint * secure_ep)409 static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
410 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
411 return grpc_endpoint_get_local_address(ep->wrapped_ep);
412 }
413
endpoint_get_fd(grpc_endpoint * secure_ep)414 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
415 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
416 return grpc_endpoint_get_fd(ep->wrapped_ep);
417 }
418
endpoint_get_resource_user(grpc_endpoint * secure_ep)419 static grpc_resource_user* endpoint_get_resource_user(
420 grpc_endpoint* secure_ep) {
421 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
422 return grpc_endpoint_get_resource_user(ep->wrapped_ep);
423 }
424
endpoint_can_track_err(grpc_endpoint * secure_ep)425 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
426 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
427 return grpc_endpoint_can_track_err(ep->wrapped_ep);
428 }
429
/* Dispatch table installed into secure_endpoint::base by
   grpc_secure_endpoint_create(). The initializer is positional, so the entries
   must stay in the order in which grpc_endpoint_vtable declares its fields. */
static const grpc_endpoint_vtable vtable = {endpoint_read,
                                            endpoint_write,
                                            endpoint_add_to_pollset,
                                            endpoint_add_to_pollset_set,
                                            endpoint_delete_from_pollset_set,
                                            endpoint_shutdown,
                                            endpoint_destroy,
                                            endpoint_get_resource_user,
                                            endpoint_get_peer,
                                            endpoint_get_local_address,
                                            endpoint_get_fd,
                                            endpoint_can_track_err};
442
grpc_secure_endpoint_create(struct tsi_frame_protector * protector,struct tsi_zero_copy_grpc_protector * zero_copy_protector,grpc_endpoint * to_wrap,grpc_slice * leftover_slices,size_t leftover_nslices)443 grpc_endpoint* grpc_secure_endpoint_create(
444 struct tsi_frame_protector* protector,
445 struct tsi_zero_copy_grpc_protector* zero_copy_protector,
446 grpc_endpoint* to_wrap, grpc_slice* leftover_slices,
447 size_t leftover_nslices) {
448 secure_endpoint* ep =
449 new secure_endpoint(&vtable, protector, zero_copy_protector, to_wrap,
450 leftover_slices, leftover_nslices);
451 return &ep->base;
452 }
453