1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include <grpc/support/port_platform.h>
24
25 #include "src/core/lib/iomgr/sockaddr.h"
26
27 #include <grpc/slice.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/sync.h>
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/profiling/timers.h"
35 #include "src/core/lib/security/transport/secure_endpoint.h"
36 #include "src/core/lib/security/transport/tsi_error.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/tsi/transport_security_grpc.h"
40
41 #define STAGING_BUFFER_SIZE 8192
42
43 typedef struct {
44 grpc_endpoint base;
45 grpc_endpoint* wrapped_ep;
46 struct tsi_frame_protector* protector;
47 struct tsi_zero_copy_grpc_protector* zero_copy_protector;
48 gpr_mu protector_mu;
49 /* saved upper level callbacks and user_data. */
50 grpc_closure* read_cb;
51 grpc_closure* write_cb;
52 grpc_closure on_read;
53 grpc_slice_buffer* read_buffer;
54 grpc_slice_buffer source_buffer;
55 /* saved handshaker leftover data to unprotect. */
56 grpc_slice_buffer leftover_bytes;
57 /* buffers for read and write */
58 grpc_slice read_staging_buffer;
59
60 grpc_slice write_staging_buffer;
61 grpc_slice_buffer output_buffer;
62
63 gpr_refcount ref;
64 } secure_endpoint;
65
66 grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
67
destroy(secure_endpoint * secure_ep)68 static void destroy(secure_endpoint* secure_ep) {
69 secure_endpoint* ep = secure_ep;
70 grpc_endpoint_destroy(ep->wrapped_ep);
71 tsi_frame_protector_destroy(ep->protector);
72 tsi_zero_copy_grpc_protector_destroy(ep->zero_copy_protector);
73 grpc_slice_buffer_destroy_internal(&ep->leftover_bytes);
74 grpc_slice_unref_internal(ep->read_staging_buffer);
75 grpc_slice_unref_internal(ep->write_staging_buffer);
76 grpc_slice_buffer_destroy_internal(&ep->output_buffer);
77 grpc_slice_buffer_destroy_internal(&ep->source_buffer);
78 gpr_mu_destroy(&ep->protector_mu);
79 gpr_free(ep);
80 }
81
82 #ifndef NDEBUG
83 #define SECURE_ENDPOINT_UNREF(ep, reason) \
84 secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
85 #define SECURE_ENDPOINT_REF(ep, reason) \
86 secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
secure_endpoint_unref(secure_endpoint * ep,const char * reason,const char * file,int line)87 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
88 const char* file, int line) {
89 if (grpc_trace_secure_endpoint.enabled()) {
90 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
91 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
92 "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
93 val - 1);
94 }
95 if (gpr_unref(&ep->ref)) {
96 destroy(ep);
97 }
98 }
99
secure_endpoint_ref(secure_endpoint * ep,const char * reason,const char * file,int line)100 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
101 const char* file, int line) {
102 if (grpc_trace_secure_endpoint.enabled()) {
103 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
104 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
105 "SECENDP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
106 val + 1);
107 }
108 gpr_ref(&ep->ref);
109 }
110 #else
111 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
112 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
secure_endpoint_unref(secure_endpoint * ep)113 static void secure_endpoint_unref(secure_endpoint* ep) {
114 if (gpr_unref(&ep->ref)) {
115 destroy(ep);
116 }
117 }
118
secure_endpoint_ref(secure_endpoint * ep)119 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
120 #endif
121
flush_read_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)122 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
123 uint8_t** end) {
124 grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
125 ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
126 *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
127 *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
128 }
129
call_read_cb(secure_endpoint * ep,grpc_error * error)130 static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
131 if (grpc_trace_secure_endpoint.enabled()) {
132 size_t i;
133 for (i = 0; i < ep->read_buffer->count; i++) {
134 char* data = grpc_dump_slice(ep->read_buffer->slices[i],
135 GPR_DUMP_HEX | GPR_DUMP_ASCII);
136 gpr_log(GPR_INFO, "READ %p: %s", ep, data);
137 gpr_free(data);
138 }
139 }
140 ep->read_buffer = nullptr;
141 GRPC_CLOSURE_SCHED(ep->read_cb, error);
142 SECURE_ENDPOINT_UNREF(ep, "read");
143 }
144
on_read(void * user_data,grpc_error * error)145 static void on_read(void* user_data, grpc_error* error) {
146 unsigned i;
147 uint8_t keep_looping = 0;
148 tsi_result result = TSI_OK;
149 secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
150 uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
151 uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
152
153 if (error != GRPC_ERROR_NONE) {
154 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
155 call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
156 "Secure read failed", &error, 1));
157 return;
158 }
159
160 if (ep->zero_copy_protector != nullptr) {
161 // Use zero-copy grpc protector to unprotect.
162 result = tsi_zero_copy_grpc_protector_unprotect(
163 ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
164 } else {
165 // Use frame protector to unprotect.
166 /* TODO(yangg) check error, maybe bail out early */
167 for (i = 0; i < ep->source_buffer.count; i++) {
168 grpc_slice encrypted = ep->source_buffer.slices[i];
169 uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
170 size_t message_size = GRPC_SLICE_LENGTH(encrypted);
171
172 while (message_size > 0 || keep_looping) {
173 size_t unprotected_buffer_size_written = static_cast<size_t>(end - cur);
174 size_t processed_message_size = message_size;
175 gpr_mu_lock(&ep->protector_mu);
176 result = tsi_frame_protector_unprotect(
177 ep->protector, message_bytes, &processed_message_size, cur,
178 &unprotected_buffer_size_written);
179 gpr_mu_unlock(&ep->protector_mu);
180 if (result != TSI_OK) {
181 gpr_log(GPR_ERROR, "Decryption error: %s",
182 tsi_result_to_string(result));
183 break;
184 }
185 message_bytes += processed_message_size;
186 message_size -= processed_message_size;
187 cur += unprotected_buffer_size_written;
188
189 if (cur == end) {
190 flush_read_staging_buffer(ep, &cur, &end);
191 /* Force to enter the loop again to extract buffered bytes in
192 protector. The bytes could be buffered because of running out of
193 staging_buffer. If this happens at the end of all slices, doing
194 another unprotect avoids leaving data in the protector. */
195 keep_looping = 1;
196 } else if (unprotected_buffer_size_written > 0) {
197 keep_looping = 1;
198 } else {
199 keep_looping = 0;
200 }
201 }
202 if (result != TSI_OK) break;
203 }
204
205 if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
206 grpc_slice_buffer_add(
207 ep->read_buffer,
208 grpc_slice_split_head(
209 &ep->read_staging_buffer,
210 static_cast<size_t>(
211 cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
212 }
213 }
214
215 /* TODO(yangg) experiment with moving this block after read_cb to see if it
216 helps latency */
217 grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
218
219 if (result != TSI_OK) {
220 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
221 call_read_cb(
222 ep, grpc_set_tsi_error_result(
223 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
224 return;
225 }
226
227 call_read_cb(ep, GRPC_ERROR_NONE);
228 }
229
endpoint_read(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb)230 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
231 grpc_closure* cb) {
232 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
233 ep->read_cb = cb;
234 ep->read_buffer = slices;
235 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
236
237 SECURE_ENDPOINT_REF(ep, "read");
238 if (ep->leftover_bytes.count) {
239 grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
240 GPR_ASSERT(ep->leftover_bytes.count == 0);
241 on_read(ep, GRPC_ERROR_NONE);
242 return;
243 }
244
245 grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read);
246 }
247
flush_write_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)248 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
249 uint8_t** end) {
250 grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
251 ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
252 *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
253 *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
254 }
255
endpoint_write(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)256 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
257 grpc_closure* cb, void* arg) {
258 GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
259
260 unsigned i;
261 tsi_result result = TSI_OK;
262 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
263 uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
264 uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
265
266 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
267
268 if (grpc_trace_secure_endpoint.enabled()) {
269 for (i = 0; i < slices->count; i++) {
270 char* data =
271 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
272 gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
273 gpr_free(data);
274 }
275 }
276
277 if (ep->zero_copy_protector != nullptr) {
278 // Use zero-copy grpc protector to protect.
279 result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
280 slices, &ep->output_buffer);
281 } else {
282 // Use frame protector to protect.
283 for (i = 0; i < slices->count; i++) {
284 grpc_slice plain = slices->slices[i];
285 uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
286 size_t message_size = GRPC_SLICE_LENGTH(plain);
287 while (message_size > 0) {
288 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
289 size_t processed_message_size = message_size;
290 gpr_mu_lock(&ep->protector_mu);
291 result = tsi_frame_protector_protect(ep->protector, message_bytes,
292 &processed_message_size, cur,
293 &protected_buffer_size_to_send);
294 gpr_mu_unlock(&ep->protector_mu);
295 if (result != TSI_OK) {
296 gpr_log(GPR_ERROR, "Encryption error: %s",
297 tsi_result_to_string(result));
298 break;
299 }
300 message_bytes += processed_message_size;
301 message_size -= processed_message_size;
302 cur += protected_buffer_size_to_send;
303
304 if (cur == end) {
305 flush_write_staging_buffer(ep, &cur, &end);
306 }
307 }
308 if (result != TSI_OK) break;
309 }
310 if (result == TSI_OK) {
311 size_t still_pending_size;
312 do {
313 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
314 gpr_mu_lock(&ep->protector_mu);
315 result = tsi_frame_protector_protect_flush(
316 ep->protector, cur, &protected_buffer_size_to_send,
317 &still_pending_size);
318 gpr_mu_unlock(&ep->protector_mu);
319 if (result != TSI_OK) break;
320 cur += protected_buffer_size_to_send;
321 if (cur == end) {
322 flush_write_staging_buffer(ep, &cur, &end);
323 }
324 } while (still_pending_size > 0);
325 if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
326 grpc_slice_buffer_add(
327 &ep->output_buffer,
328 grpc_slice_split_head(
329 &ep->write_staging_buffer,
330 static_cast<size_t>(
331 cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
332 }
333 }
334 }
335
336 if (result != TSI_OK) {
337 /* TODO(yangg) do different things according to the error type? */
338 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
339 GRPC_CLOSURE_SCHED(
340 cb, grpc_set_tsi_error_result(
341 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
342 return;
343 }
344
345 grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
346 }
347
endpoint_shutdown(grpc_endpoint * secure_ep,grpc_error * why)348 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
349 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
350 grpc_endpoint_shutdown(ep->wrapped_ep, why);
351 }
352
endpoint_destroy(grpc_endpoint * secure_ep)353 static void endpoint_destroy(grpc_endpoint* secure_ep) {
354 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
355 SECURE_ENDPOINT_UNREF(ep, "destroy");
356 }
357
endpoint_add_to_pollset(grpc_endpoint * secure_ep,grpc_pollset * pollset)358 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
359 grpc_pollset* pollset) {
360 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
361 grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
362 }
363
endpoint_add_to_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)364 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
365 grpc_pollset_set* pollset_set) {
366 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
367 grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
368 }
369
endpoint_delete_from_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)370 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
371 grpc_pollset_set* pollset_set) {
372 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
373 grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
374 }
375
endpoint_get_peer(grpc_endpoint * secure_ep)376 static char* endpoint_get_peer(grpc_endpoint* secure_ep) {
377 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
378 return grpc_endpoint_get_peer(ep->wrapped_ep);
379 }
380
endpoint_get_fd(grpc_endpoint * secure_ep)381 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
382 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
383 return grpc_endpoint_get_fd(ep->wrapped_ep);
384 }
385
endpoint_get_resource_user(grpc_endpoint * secure_ep)386 static grpc_resource_user* endpoint_get_resource_user(
387 grpc_endpoint* secure_ep) {
388 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
389 return grpc_endpoint_get_resource_user(ep->wrapped_ep);
390 }
391
392 static const grpc_endpoint_vtable vtable = {endpoint_read,
393 endpoint_write,
394 endpoint_add_to_pollset,
395 endpoint_add_to_pollset_set,
396 endpoint_delete_from_pollset_set,
397 endpoint_shutdown,
398 endpoint_destroy,
399 endpoint_get_resource_user,
400 endpoint_get_peer,
401 endpoint_get_fd};
402
grpc_secure_endpoint_create(struct tsi_frame_protector * protector,struct tsi_zero_copy_grpc_protector * zero_copy_protector,grpc_endpoint * transport,grpc_slice * leftover_slices,size_t leftover_nslices)403 grpc_endpoint* grpc_secure_endpoint_create(
404 struct tsi_frame_protector* protector,
405 struct tsi_zero_copy_grpc_protector* zero_copy_protector,
406 grpc_endpoint* transport, grpc_slice* leftover_slices,
407 size_t leftover_nslices) {
408 size_t i;
409 secure_endpoint* ep =
410 static_cast<secure_endpoint*>(gpr_malloc(sizeof(secure_endpoint)));
411 ep->base.vtable = &vtable;
412 ep->wrapped_ep = transport;
413 ep->protector = protector;
414 ep->zero_copy_protector = zero_copy_protector;
415 grpc_slice_buffer_init(&ep->leftover_bytes);
416 for (i = 0; i < leftover_nslices; i++) {
417 grpc_slice_buffer_add(&ep->leftover_bytes,
418 grpc_slice_ref_internal(leftover_slices[i]));
419 }
420 ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
421 ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
422 grpc_slice_buffer_init(&ep->output_buffer);
423 grpc_slice_buffer_init(&ep->source_buffer);
424 ep->read_buffer = nullptr;
425 GRPC_CLOSURE_INIT(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx);
426 gpr_mu_init(&ep->protector_mu);
427 gpr_ref_init(&ep->ref, 1);
428 return &ep->base;
429 }
430