1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/gpr/host_port.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/transport/metadata_batch.h"
41 #include "src/core/lib/transport/static_metadata.h"
42 #include "src/core/lib/transport/transport_impl.h"
43 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
44
45 #define GRPC_HEADER_SIZE_IN_BYTES 5
46 #define GRPC_FLUSH_READ_SIZE 4096
47
48 #define CRONET_LOG(...) \
49 do { \
50 if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
51 } while (0)
52
53 /* TODO (makdharma): Hook up into the wider tracing mechanism */
54 int grpc_cronet_trace = 0;
55
56 enum e_op_result {
57 ACTION_TAKEN_WITH_CALLBACK,
58 ACTION_TAKEN_NO_CALLBACK,
59 NO_ACTION_POSSIBLE
60 };
61
62 enum e_op_id {
63 OP_SEND_INITIAL_METADATA = 0,
64 OP_SEND_MESSAGE,
65 OP_SEND_TRAILING_METADATA,
66 OP_RECV_MESSAGE,
67 OP_RECV_INITIAL_METADATA,
68 OP_RECV_TRAILING_METADATA,
69 OP_CANCEL_ERROR,
70 OP_ON_COMPLETE,
71 OP_FAILED,
72 OP_SUCCEEDED,
73 OP_CANCELED,
74 OP_RECV_MESSAGE_AND_ON_COMPLETE,
75 OP_READ_REQ_MADE,
76 OP_NUM_OPS
77 };
78
79 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
80
81 static void on_stream_ready(bidirectional_stream*);
82 static void on_response_headers_received(
83 bidirectional_stream*, const bidirectional_stream_header_array*,
84 const char*);
85 static void on_write_completed(bidirectional_stream*, const char*);
86 static void on_read_completed(bidirectional_stream*, char*, int);
87 static void on_response_trailers_received(
88 bidirectional_stream*, const bidirectional_stream_header_array*);
89 static void on_succeeded(bidirectional_stream*);
90 static void on_failed(bidirectional_stream*, int);
91 static void on_canceled(bidirectional_stream*);
92 static bidirectional_stream_callback cronet_callbacks = {
93 on_stream_ready,
94 on_response_headers_received,
95 on_read_completed,
96 on_write_completed,
97 on_response_trailers_received,
98 on_succeeded,
99 on_failed,
100 on_canceled};
101
102 /* Cronet transport object */
103 struct grpc_cronet_transport {
104 grpc_transport base; /* must be first element in this structure */
105 stream_engine* engine;
106 char* host;
107 bool use_packet_coalescing;
108 };
109 typedef struct grpc_cronet_transport grpc_cronet_transport;
110
111 /* TODO (makdharma): reorder structure for memory efficiency per
112 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
113 struct read_state {
114 /* vars to store data coming from server */
115 char* read_buffer;
116 bool length_field_received;
117 int received_bytes;
118 int remaining_bytes;
119 int length_field;
120 bool compressed;
121 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
122 char* payload_field;
123 bool read_stream_closed;
124
125 /* vars for holding data destined for the application */
126 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
127 grpc_slice_buffer read_slice_buffer;
128
129 /* vars for trailing metadata */
130 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
131 bool trailing_metadata_valid;
132
133 /* vars for initial metadata */
134 grpc_chttp2_incoming_metadata_buffer initial_metadata;
135 };
136
137 struct write_state {
138 char* write_buffer;
139 };
140
141 /* track state of one stream op */
142 struct op_state {
143 bool state_op_done[OP_NUM_OPS];
144 bool state_callback_received[OP_NUM_OPS];
145 /* A non-zero gRPC status code has been seen */
146 bool fail_state;
147 /* Transport is discarding all buffered messages */
148 bool flush_read;
149 bool flush_cronet_when_ready;
150 bool pending_write_for_trailer;
151 bool pending_send_message;
152 /* User requested RECV_TRAILING_METADATA */
153 bool pending_recv_trailing_metadata;
154 /* Cronet has not issued a callback of a bidirectional read */
155 bool pending_read_from_cronet;
156 grpc_error* cancel_error;
157 /* data structure for storing data coming from server */
158 struct read_state rs;
159 /* data structure for storing data going to the server */
160 struct write_state ws;
161 };
162
163 struct op_and_state {
164 grpc_transport_stream_op_batch op;
165 struct op_state state;
166 bool done;
167 struct stream_obj* s; /* Pointer back to the stream object */
168 struct op_and_state* next; /* next op_and_state in the linked list */
169 };
170
171 struct op_storage {
172 int num_pending_ops;
173 struct op_and_state* head;
174 };
175
176 struct stream_obj {
177 gpr_arena* arena;
178 struct op_and_state* oas;
179 grpc_transport_stream_op_batch* curr_op;
180 grpc_cronet_transport* curr_ct;
181 grpc_stream* curr_gs;
182 bidirectional_stream* cbs;
183 bidirectional_stream_header_array header_array;
184
185 /* Stream level state. Some state will be tracked both at stream and stream_op
186 * level */
187 struct op_state state;
188
189 /* OP storage */
190 struct op_storage storage;
191
192 /* Mutex to protect storage */
193 gpr_mu mu;
194
195 /* Refcount object of the stream */
196 grpc_stream_refcount* refcount;
197 };
198 typedef struct stream_obj stream_obj;
199
200 #ifndef NDEBUG
201 #define GRPC_CRONET_STREAM_REF(stream, reason) \
202 grpc_cronet_stream_ref((stream), (reason))
203 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
204 grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)205 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
206 grpc_stream_ref(s->refcount, reason);
207 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)208 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
209 grpc_stream_unref(s->refcount, reason);
210 }
211 #else
212 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
213 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
214 grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)215 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)216 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
217 #endif
218
219 static enum e_op_result execute_stream_op(struct op_and_state* oas);
220
221 /*
222 Utility function to translate enum into string for printing
223 */
op_result_string(enum e_op_result i)224 static const char* op_result_string(enum e_op_result i) {
225 switch (i) {
226 case ACTION_TAKEN_WITH_CALLBACK:
227 return "ACTION_TAKEN_WITH_CALLBACK";
228 case ACTION_TAKEN_NO_CALLBACK:
229 return "ACTION_TAKEN_NO_CALLBACK";
230 case NO_ACTION_POSSIBLE:
231 return "NO_ACTION_POSSIBLE";
232 }
233 GPR_UNREACHABLE_CODE(return "UNKNOWN");
234 }
235
op_id_string(enum e_op_id i)236 static const char* op_id_string(enum e_op_id i) {
237 switch (i) {
238 case OP_SEND_INITIAL_METADATA:
239 return "OP_SEND_INITIAL_METADATA";
240 case OP_SEND_MESSAGE:
241 return "OP_SEND_MESSAGE";
242 case OP_SEND_TRAILING_METADATA:
243 return "OP_SEND_TRAILING_METADATA";
244 case OP_RECV_MESSAGE:
245 return "OP_RECV_MESSAGE";
246 case OP_RECV_INITIAL_METADATA:
247 return "OP_RECV_INITIAL_METADATA";
248 case OP_RECV_TRAILING_METADATA:
249 return "OP_RECV_TRAILING_METADATA";
250 case OP_CANCEL_ERROR:
251 return "OP_CANCEL_ERROR";
252 case OP_ON_COMPLETE:
253 return "OP_ON_COMPLETE";
254 case OP_FAILED:
255 return "OP_FAILED";
256 case OP_SUCCEEDED:
257 return "OP_SUCCEEDED";
258 case OP_CANCELED:
259 return "OP_CANCELED";
260 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
261 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
262 case OP_READ_REQ_MADE:
263 return "OP_READ_REQ_MADE";
264 case OP_NUM_OPS:
265 return "OP_NUM_OPS";
266 }
267 return "UNKNOWN";
268 }
269
null_and_maybe_free_read_buffer(stream_obj * s)270 static void null_and_maybe_free_read_buffer(stream_obj* s) {
271 if (s->state.rs.read_buffer &&
272 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
273 gpr_free(s->state.rs.read_buffer);
274 }
275 s->state.rs.read_buffer = nullptr;
276 }
277
maybe_flush_read(stream_obj * s)278 static void maybe_flush_read(stream_obj* s) {
279 /* To enter flush read state (discarding all the buffered messages in
280 * transport layer), two conditions must be satisfied: 1) non-zero grpc status
281 * has been received, and 2) an op requesting the status code
282 * (RECV_TRAILING_METADATA) is issued by the user. (See
283 * doc/status_ordering.md) */
284 /* Whenever the evaluation of any of the two condition is changed, we check
285 * whether we should enter the flush read state. */
286 if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
287 if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
288 CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
289 s->state.flush_read = true;
290 null_and_maybe_free_read_buffer(s);
291 s->state.rs.read_buffer =
292 static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
293 if (!s->state.pending_read_from_cronet) {
294 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
295 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
296 GRPC_FLUSH_READ_SIZE);
297 s->state.pending_read_from_cronet = true;
298 }
299 }
300 }
301 }
302
make_error_with_desc(int error_code,const char * desc)303 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
304 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
305 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
306 return error;
307 }
308
309 /*
310 Add a new stream op to op storage.
311 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)312 static void add_to_storage(struct stream_obj* s,
313 grpc_transport_stream_op_batch* op) {
314 struct op_storage* storage = &s->storage;
315 /* add new op at the beginning of the linked list. The memory is freed
316 in remove_from_storage */
317 struct op_and_state* new_op = static_cast<struct op_and_state*>(
318 gpr_malloc(sizeof(struct op_and_state)));
319 memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
320 memset(&new_op->state, 0, sizeof(new_op->state));
321 new_op->s = s;
322 new_op->done = false;
323 gpr_mu_lock(&s->mu);
324 new_op->next = storage->head;
325 storage->head = new_op;
326 storage->num_pending_ops++;
327 if (op->send_message) {
328 s->state.pending_send_message = true;
329 }
330 if (op->recv_trailing_metadata) {
331 s->state.pending_recv_trailing_metadata = true;
332 maybe_flush_read(s);
333 }
334 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
335 storage->num_pending_ops);
336 gpr_mu_unlock(&s->mu);
337 }
338
339 /*
340 Traverse the linked list and delete op and free memory
341 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)342 static void remove_from_storage(struct stream_obj* s,
343 struct op_and_state* oas) {
344 struct op_and_state* curr;
345 if (s->storage.head == nullptr || oas == nullptr) {
346 return;
347 }
348 if (s->storage.head == oas) {
349 s->storage.head = oas->next;
350 gpr_free(oas);
351 s->storage.num_pending_ops--;
352 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
353 s->storage.num_pending_ops);
354 } else {
355 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
356 if (curr->next == oas) {
357 curr->next = oas->next;
358 s->storage.num_pending_ops--;
359 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
360 s->storage.num_pending_ops);
361 gpr_free(oas);
362 break;
363 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
364 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
365 }
366 }
367 }
368 }
369
370 /*
371 Cycle through ops and try to take next action. Break when either
372 an action with callback is taken, or no action is possible.
373 This can get executed from the Cronet network thread via cronet callback
374 or on the application supplied thread via the perform_stream_op function.
375 */
execute_from_storage(stream_obj * s)376 static void execute_from_storage(stream_obj* s) {
377 gpr_mu_lock(&s->mu);
378 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
379 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
380 GPR_ASSERT(curr->done == 0);
381 enum e_op_result result = execute_stream_op(curr);
382 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
383 op_result_string(result));
384 /* if this op is done, then remove it and free memory */
385 if (curr->done) {
386 struct op_and_state* next = curr->next;
387 remove_from_storage(s, curr);
388 curr = next;
389 }
390 /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
391 if (result == NO_ACTION_POSSIBLE) {
392 curr = curr->next;
393 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
394 break;
395 }
396 }
397 gpr_mu_unlock(&s->mu);
398 }
399
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)400 static void convert_cronet_array_to_metadata(
401 const bidirectional_stream_header_array* header_array,
402 grpc_chttp2_incoming_metadata_buffer* mds) {
403 for (size_t i = 0; i < header_array->count; i++) {
404 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
405 header_array->headers[i].key, header_array->headers[i].value);
406 grpc_slice key = grpc_slice_intern(
407 grpc_slice_from_static_string(header_array->headers[i].key));
408 grpc_slice value;
409 if (grpc_is_binary_header(key)) {
410 value = grpc_slice_from_static_string(header_array->headers[i].value);
411 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
412 value, grpc_chttp2_base64_infer_length_after_decode(value)));
413 } else {
414 value = grpc_slice_intern(
415 grpc_slice_from_static_string(header_array->headers[i].value));
416 }
417 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
418 grpc_chttp2_incoming_metadata_buffer_add(
419 mds, grpc_mdelem_from_slices(key, value)));
420 }
421 }
422
423 /*
424 Cronet callback
425 */
on_failed(bidirectional_stream * stream,int net_error)426 static void on_failed(bidirectional_stream* stream, int net_error) {
427 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
428 grpc_core::ExecCtx exec_ctx;
429
430 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
431 gpr_mu_lock(&s->mu);
432 bidirectional_stream_destroy(s->cbs);
433 s->state.state_callback_received[OP_FAILED] = true;
434 s->cbs = nullptr;
435 if (s->header_array.headers) {
436 gpr_free(s->header_array.headers);
437 s->header_array.headers = nullptr;
438 }
439 if (s->state.ws.write_buffer) {
440 gpr_free(s->state.ws.write_buffer);
441 s->state.ws.write_buffer = nullptr;
442 }
443 null_and_maybe_free_read_buffer(s);
444 gpr_mu_unlock(&s->mu);
445 execute_from_storage(s);
446 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
447 }
448
449 /*
450 Cronet callback
451 */
on_canceled(bidirectional_stream * stream)452 static void on_canceled(bidirectional_stream* stream) {
453 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
454 grpc_core::ExecCtx exec_ctx;
455
456 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
457 gpr_mu_lock(&s->mu);
458 bidirectional_stream_destroy(s->cbs);
459 s->state.state_callback_received[OP_CANCELED] = true;
460 s->cbs = nullptr;
461 if (s->header_array.headers) {
462 gpr_free(s->header_array.headers);
463 s->header_array.headers = nullptr;
464 }
465 if (s->state.ws.write_buffer) {
466 gpr_free(s->state.ws.write_buffer);
467 s->state.ws.write_buffer = nullptr;
468 }
469 null_and_maybe_free_read_buffer(s);
470 gpr_mu_unlock(&s->mu);
471 execute_from_storage(s);
472 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
473 }
474
475 /*
476 Cronet callback
477 */
on_succeeded(bidirectional_stream * stream)478 static void on_succeeded(bidirectional_stream* stream) {
479 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
480 grpc_core::ExecCtx exec_ctx;
481
482 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
483 gpr_mu_lock(&s->mu);
484 bidirectional_stream_destroy(s->cbs);
485 s->state.state_callback_received[OP_SUCCEEDED] = true;
486 s->cbs = nullptr;
487 null_and_maybe_free_read_buffer(s);
488 gpr_mu_unlock(&s->mu);
489 execute_from_storage(s);
490 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
491 }
492
493 /*
494 Cronet callback
495 */
on_stream_ready(bidirectional_stream * stream)496 static void on_stream_ready(bidirectional_stream* stream) {
497 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
498 grpc_core::ExecCtx exec_ctx;
499 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
500 grpc_cronet_transport* t = s->curr_ct;
501 gpr_mu_lock(&s->mu);
502 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
503 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
504 /* Free the memory allocated for headers */
505 if (s->header_array.headers) {
506 gpr_free(s->header_array.headers);
507 s->header_array.headers = nullptr;
508 }
509 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
510 * SEND_TRAILING_METADATA ops pending */
511 if (t->use_packet_coalescing) {
512 if (s->state.flush_cronet_when_ready) {
513 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
514 bidirectional_stream_flush(stream);
515 }
516 }
517 gpr_mu_unlock(&s->mu);
518 execute_from_storage(s);
519 }
520
521 /*
522 Cronet callback
523 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)524 static void on_response_headers_received(
525 bidirectional_stream* stream,
526 const bidirectional_stream_header_array* headers,
527 const char* negotiated_protocol) {
528 grpc_core::ExecCtx exec_ctx;
529 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
530 headers, negotiated_protocol);
531 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
532
533 /* Identify if this is a header or a trailer (in a trailer-only response case)
534 */
535 for (size_t i = 0; i < headers->count; i++) {
536 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
537 on_response_trailers_received(stream, headers);
538 return;
539 }
540 }
541
542 gpr_mu_lock(&s->mu);
543 memset(&s->state.rs.initial_metadata, 0,
544 sizeof(s->state.rs.initial_metadata));
545 grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
546 s->arena);
547 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
548 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
549 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
550 s->state.state_callback_received[OP_FAILED])) {
551 /* Do an extra read to trigger on_succeeded() callback in case connection
552 is closed */
553 GPR_ASSERT(s->state.rs.length_field_received == false);
554 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
555 s->state.rs.compressed = false;
556 s->state.rs.received_bytes = 0;
557 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
558 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
559 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
560 s->state.rs.remaining_bytes);
561 s->state.pending_read_from_cronet = true;
562 }
563 gpr_mu_unlock(&s->mu);
564 execute_from_storage(s);
565 }
566
567 /*
568 Cronet callback
569 */
on_write_completed(bidirectional_stream * stream,const char * data)570 static void on_write_completed(bidirectional_stream* stream, const char* data) {
571 grpc_core::ExecCtx exec_ctx;
572 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
573 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
574 gpr_mu_lock(&s->mu);
575 if (s->state.ws.write_buffer) {
576 gpr_free(s->state.ws.write_buffer);
577 s->state.ws.write_buffer = nullptr;
578 }
579 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
580 gpr_mu_unlock(&s->mu);
581 execute_from_storage(s);
582 }
583
584 /*
585 Cronet callback
586 */
on_read_completed(bidirectional_stream * stream,char * data,int count)587 static void on_read_completed(bidirectional_stream* stream, char* data,
588 int count) {
589 grpc_core::ExecCtx exec_ctx;
590 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
591 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
592 count);
593 gpr_mu_lock(&s->mu);
594 s->state.pending_read_from_cronet = false;
595 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
596 if (count > 0 && s->state.flush_read) {
597 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
598 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
599 GRPC_FLUSH_READ_SIZE);
600 s->state.pending_read_from_cronet = true;
601 gpr_mu_unlock(&s->mu);
602 } else if (count > 0) {
603 s->state.rs.received_bytes += count;
604 s->state.rs.remaining_bytes -= count;
605 if (s->state.rs.remaining_bytes > 0) {
606 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
607 s->state.state_op_done[OP_READ_REQ_MADE] = true;
608 bidirectional_stream_read(
609 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
610 s->state.rs.remaining_bytes);
611 s->state.pending_read_from_cronet = true;
612 gpr_mu_unlock(&s->mu);
613 } else {
614 gpr_mu_unlock(&s->mu);
615 execute_from_storage(s);
616 }
617 } else {
618 null_and_maybe_free_read_buffer(s);
619 s->state.rs.read_stream_closed = true;
620 gpr_mu_unlock(&s->mu);
621 execute_from_storage(s);
622 }
623 }
624
625 /*
626 Cronet callback
627 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)628 static void on_response_trailers_received(
629 bidirectional_stream* stream,
630 const bidirectional_stream_header_array* trailers) {
631 grpc_core::ExecCtx exec_ctx;
632 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
633 trailers);
634 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
635 grpc_cronet_transport* t = s->curr_ct;
636 gpr_mu_lock(&s->mu);
637 memset(&s->state.rs.trailing_metadata, 0,
638 sizeof(s->state.rs.trailing_metadata));
639 s->state.rs.trailing_metadata_valid = false;
640 grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
641 s->arena);
642 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
643 if (trailers->count > 0) {
644 s->state.rs.trailing_metadata_valid = true;
645 }
646 for (size_t i = 0; i < trailers->count; i++) {
647 if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
648 0 != strcmp(trailers->headers[i].value, "0")) {
649 s->state.fail_state = true;
650 maybe_flush_read(s);
651 }
652 }
653 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
654 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
655 * trigger on_succeeded */
656 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
657 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
658 s->state.state_callback_received[OP_FAILED])) {
659 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
660 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
661 bidirectional_stream_write(s->cbs, "", 0, true);
662 if (t->use_packet_coalescing) {
663 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
664 bidirectional_stream_flush(s->cbs);
665 }
666 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
667
668 gpr_mu_unlock(&s->mu);
669 } else {
670 gpr_mu_unlock(&s->mu);
671 execute_from_storage(s);
672 }
673 }
674
675 /*
676 Utility function that takes the data from s->write_slice_buffer and assembles
677 into a contiguous byte stream with 5 byte gRPC header prepended.
678 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)679 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
680 char** pp_write_buffer,
681 size_t* p_write_buffer_size, uint32_t flags) {
682 grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
683 size_t length = GRPC_SLICE_LENGTH(slice);
684 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
685 /* This is freed in the on_write_completed callback */
686 char* write_buffer =
687 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
688 *pp_write_buffer = write_buffer;
689 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
690 /* Append 5 byte header */
691 /* Compressed flag */
692 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
693 /* Message length */
694 *p++ = static_cast<uint8_t>(length >> 24);
695 *p++ = static_cast<uint8_t>(length >> 16);
696 *p++ = static_cast<uint8_t>(length >> 8);
697 *p++ = static_cast<uint8_t>(length);
698 /* append actual data */
699 memcpy(p, GRPC_SLICE_START_PTR(slice), length);
700 grpc_slice_unref_internal(slice);
701 }
702
703 /*
704 Convert metadata in a format that Cronet can consume
705 */
convert_metadata_to_cronet_headers(grpc_linked_mdelem * head,const char * host,char ** pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)706 static void convert_metadata_to_cronet_headers(
707 grpc_linked_mdelem* head, const char* host, char** pp_url,
708 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
709 const char** method) {
710 grpc_linked_mdelem* curr = head;
711 /* Walk the linked list and get number of header fields */
712 size_t num_headers_available = 0;
713 while (curr != nullptr) {
714 curr = curr->next;
715 num_headers_available++;
716 }
717 /* Allocate enough memory. It is freed in the on_stream_ready callback
718 */
719 bidirectional_stream_header* headers =
720 static_cast<bidirectional_stream_header*>(gpr_malloc(
721 sizeof(bidirectional_stream_header) * num_headers_available));
722 *pp_headers = headers;
723
724 /* Walk the linked list again, this time copying the header fields.
725 s->num_headers can be less than num_headers_available, as some headers
726 are not used for cronet.
727 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
728 */
729 curr = head;
730 size_t num_headers = 0;
731 while (num_headers < num_headers_available) {
732 grpc_mdelem mdelem = curr->md;
733 curr = curr->next;
734 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
735 char* value;
736 if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) {
737 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
738 value = grpc_slice_to_c_string(wire_value);
739 grpc_slice_unref_internal(wire_value);
740 } else {
741 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
742 }
743 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
744 grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) {
745 /* Cronet populates these fields on its own */
746 gpr_free(key);
747 gpr_free(value);
748 continue;
749 }
750 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
751 if (grpc_slice_eq(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
752 *method = "PUT";
753 } else {
754 /* POST method in default*/
755 *method = "POST";
756 }
757 gpr_free(key);
758 gpr_free(value);
759 continue;
760 }
761 if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
762 /* Create URL by appending :path value to the hostname */
763 gpr_asprintf(pp_url, "https://%s%s", host, value);
764 gpr_free(key);
765 gpr_free(value);
766 continue;
767 }
768 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
769 headers[num_headers].key = key;
770 headers[num_headers].value = value;
771 num_headers++;
772 if (curr == nullptr) {
773 break;
774 }
775 }
776 *p_num_headers = num_headers;
777 }
778
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)779 static void parse_grpc_header(const uint8_t* data, int* length,
780 bool* compressed) {
781 const uint8_t c = *data;
782 const uint8_t* p = data + 1;
783 *compressed = ((c & 0x01) == 0x01);
784 *length = 0;
785 *length |= (*p++) << 24;
786 *length |= (*p++) << 16;
787 *length |= (*p++) << 8;
788 *length |= (*p++);
789 }
790
header_has_authority(grpc_linked_mdelem * head)791 static bool header_has_authority(grpc_linked_mdelem* head) {
792 while (head != nullptr) {
793 if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) {
794 return true;
795 }
796 head = head->next;
797 }
798 return false;
799 }
800
801 /*
802 Op Execution: Decide if one of the actions contained in the stream op can be
803 executed. This is the heart of the state machine.
804 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)805 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
806 struct stream_obj* s, struct op_state* op_state,
807 enum e_op_id op_id) {
808 struct op_state* stream_state = &s->state;
809 grpc_cronet_transport* t = s->curr_ct;
810 bool result = true;
811 /* When call is canceled, every op can be run, except under following
812 conditions
813 */
814 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
815 stream_state->state_callback_received[OP_FAILED];
816 if (is_canceled_or_failed) {
817 if (op_id == OP_SEND_INITIAL_METADATA) {
818 CRONET_LOG(GPR_DEBUG, "Because");
819 result = false;
820 }
821 if (op_id == OP_SEND_MESSAGE) {
822 CRONET_LOG(GPR_DEBUG, "Because");
823 result = false;
824 }
825 if (op_id == OP_SEND_TRAILING_METADATA) {
826 CRONET_LOG(GPR_DEBUG, "Because");
827 result = false;
828 }
829 if (op_id == OP_CANCEL_ERROR) {
830 CRONET_LOG(GPR_DEBUG, "Because");
831 result = false;
832 }
833 /* already executed */
834 if (op_id == OP_RECV_INITIAL_METADATA &&
835 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
836 CRONET_LOG(GPR_DEBUG, "Because");
837 result = false;
838 }
839 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
840 CRONET_LOG(GPR_DEBUG, "Because");
841 result = false;
842 }
843 if (op_id == OP_RECV_TRAILING_METADATA &&
844 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
845 CRONET_LOG(GPR_DEBUG, "Because");
846 result = false;
847 }
848 /* ON_COMPLETE can be processed if one of the following conditions is met:
849 * 1. the stream failed
850 * 2. the stream is cancelled, and the callback is received
851 * 3. the stream succeeded before cancel is effective
852 * 4. the stream is cancelled, and the stream is never started */
853 if (op_id == OP_ON_COMPLETE &&
854 !(stream_state->state_callback_received[OP_FAILED] ||
855 stream_state->state_callback_received[OP_CANCELED] ||
856 stream_state->state_callback_received[OP_SUCCEEDED] ||
857 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
858 CRONET_LOG(GPR_DEBUG, "Because");
859 result = false;
860 }
861 } else if (op_id == OP_SEND_INITIAL_METADATA) {
862 /* already executed */
863 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
864 } else if (op_id == OP_RECV_INITIAL_METADATA) {
865 /* already executed */
866 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
867 /* we haven't sent headers yet. */
868 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
869 result = false;
870 /* we haven't received headers yet. */
871 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
872 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
873 result = false;
874 } else if (op_id == OP_SEND_MESSAGE) {
875 /* already executed (note we're checking op specific state, not stream
876 state) */
877 if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
878 /* we haven't sent headers yet. */
879 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
880 result = false;
881 } else if (op_id == OP_RECV_MESSAGE) {
882 /* already executed */
883 if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
884 /* we haven't received headers yet. */
885 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
886 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
887 result = false;
888 } else if (op_id == OP_RECV_TRAILING_METADATA) {
889 /* already executed */
890 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
891 /* we have asked for but haven't received message yet. */
892 else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
893 !stream_state->state_op_done[OP_RECV_MESSAGE])
894 result = false;
895 /* we haven't received trailers yet. */
896 else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
897 result = false;
898 /* we haven't received on_succeeded yet. */
899 else if (!stream_state->state_callback_received[OP_SUCCEEDED])
900 result = false;
901 } else if (op_id == OP_SEND_TRAILING_METADATA) {
902 /* already executed */
903 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
904 /* we haven't sent initial metadata yet */
905 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
906 result = false;
907 /* we haven't sent message yet */
908 else if (stream_state->pending_send_message &&
909 !stream_state->state_op_done[OP_SEND_MESSAGE])
910 result = false;
911 /* we haven't got on_write_completed for the send yet */
912 else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
913 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
914 !(t->use_packet_coalescing &&
915 stream_state->pending_write_for_trailer))
916 result = false;
917 } else if (op_id == OP_CANCEL_ERROR) {
918 /* already executed */
919 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
920 } else if (op_id == OP_ON_COMPLETE) {
921 /* already executed (note we're checking op specific state, not stream
922 state) */
923 if (op_state->state_op_done[OP_ON_COMPLETE]) {
924 CRONET_LOG(GPR_DEBUG, "Because");
925 result = false;
926 }
927 /* Check if every op that was asked for is done. */
928 /* TODO(muxi): We should not consider the recv ops here, since they
929 * have their own callbacks. We should invoke a batch's on_complete
930 * as soon as all of the batch's send ops are complete, even if
931 * there are still recv ops pending. */
932 else if (curr_op->send_initial_metadata &&
933 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
934 CRONET_LOG(GPR_DEBUG, "Because");
935 result = false;
936 } else if (curr_op->send_message &&
937 !op_state->state_op_done[OP_SEND_MESSAGE]) {
938 CRONET_LOG(GPR_DEBUG, "Because");
939 result = false;
940 } else if (curr_op->send_message &&
941 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
942 CRONET_LOG(GPR_DEBUG, "Because");
943 result = false;
944 } else if (curr_op->send_trailing_metadata &&
945 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
946 CRONET_LOG(GPR_DEBUG, "Because");
947 result = false;
948 } else if (curr_op->recv_initial_metadata &&
949 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
950 CRONET_LOG(GPR_DEBUG, "Because");
951 result = false;
952 } else if (curr_op->recv_message &&
953 !op_state->state_op_done[OP_RECV_MESSAGE]) {
954 CRONET_LOG(GPR_DEBUG, "Because");
955 result = false;
956 } else if (curr_op->cancel_stream &&
957 !stream_state->state_callback_received[OP_CANCELED]) {
958 CRONET_LOG(GPR_DEBUG, "Because");
959 result = false;
960 } else if (curr_op->recv_trailing_metadata) {
961 /* We aren't done with trailing metadata yet */
962 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
963 CRONET_LOG(GPR_DEBUG, "Because");
964 result = false;
965 }
966 /* We've asked for actual message in an earlier op, and it hasn't been
967 delivered yet. */
968 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
969 /* If this op is not the one asking for read, (which means some earlier
970 op has asked), and the read hasn't been delivered. */
971 if (!curr_op->recv_message &&
972 !stream_state->state_callback_received[OP_SUCCEEDED]) {
973 CRONET_LOG(GPR_DEBUG, "Because");
974 result = false;
975 }
976 }
977 }
978 /* We should see at least one on_write_completed for the trailers that we
979 sent */
980 else if (curr_op->send_trailing_metadata &&
981 !stream_state->state_callback_received[OP_SEND_MESSAGE])
982 result = false;
983 }
984 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
985 result ? "YES" : "NO");
986 return result;
987 }
988
989 /*
990 TODO (makdharma): Break down this function in smaller chunks for readability.
991 */
execute_stream_op(struct op_and_state * oas)992 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
993 grpc_transport_stream_op_batch* stream_op = &oas->op;
994 struct stream_obj* s = oas->s;
995 grpc_cronet_transport* t = s->curr_ct;
996 struct op_state* stream_state = &s->state;
997 enum e_op_result result = NO_ACTION_POSSIBLE;
998 if (stream_op->send_initial_metadata &&
999 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1000 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1001 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1002 * on_failed */
1003 GPR_ASSERT(s->cbs == nullptr);
1004 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1005 s->cbs =
1006 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1007 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1008 if (t->use_packet_coalescing) {
1009 bidirectional_stream_disable_auto_flush(s->cbs, true);
1010 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1011 }
1012 char* url = nullptr;
1013 const char* method = "POST";
1014 s->header_array.headers = nullptr;
1015 convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
1016 .send_initial_metadata->list.head,
1017 t->host, &url, &s->header_array.headers,
1018 &s->header_array.count, &method);
1019 s->header_array.capacity = s->header_array.count;
1020 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
1021 bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
1022 if (url) {
1023 gpr_free(url);
1024 }
1025 unsigned int header_index;
1026 for (header_index = 0; header_index < s->header_array.count;
1027 header_index++) {
1028 gpr_free((void*)s->header_array.headers[header_index].key);
1029 gpr_free((void*)s->header_array.headers[header_index].value);
1030 }
1031 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1032 if (t->use_packet_coalescing) {
1033 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1034 s->state.flush_cronet_when_ready = true;
1035 }
1036 }
1037 result = ACTION_TAKEN_WITH_CALLBACK;
1038 } else if (stream_op->send_message &&
1039 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1040 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1041 stream_state->pending_send_message = false;
1042 if (stream_state->state_callback_received[OP_FAILED]) {
1043 result = NO_ACTION_POSSIBLE;
1044 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1045 } else {
1046 grpc_slice_buffer write_slice_buffer;
1047 grpc_slice slice;
1048 grpc_slice_buffer_init(&write_slice_buffer);
1049 if (1 != stream_op->payload->send_message.send_message->Next(
1050 stream_op->payload->send_message.send_message->length(),
1051 nullptr)) {
1052 /* Should never reach here */
1053 GPR_ASSERT(false);
1054 }
1055 if (GRPC_ERROR_NONE !=
1056 stream_op->payload->send_message.send_message->Pull(&slice)) {
1057 /* Should never reach here */
1058 GPR_ASSERT(false);
1059 }
1060 grpc_slice_buffer_add(&write_slice_buffer, slice);
1061 if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1062 /* Empty request not handled yet */
1063 gpr_log(GPR_ERROR, "Empty request is not supported");
1064 GPR_ASSERT(write_slice_buffer.count == 1);
1065 }
1066 if (write_slice_buffer.count > 0) {
1067 size_t write_buffer_size;
1068 create_grpc_frame(
1069 &write_slice_buffer, &stream_state->ws.write_buffer,
1070 &write_buffer_size,
1071 stream_op->payload->send_message.send_message->flags());
1072 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1073 stream_state->ws.write_buffer);
1074 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1075 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1076 static_cast<int>(write_buffer_size), false);
1077 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1078 if (t->use_packet_coalescing) {
1079 if (!stream_op->send_trailing_metadata) {
1080 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1081 bidirectional_stream_flush(s->cbs);
1082 result = ACTION_TAKEN_WITH_CALLBACK;
1083 } else {
1084 stream_state->pending_write_for_trailer = true;
1085 result = ACTION_TAKEN_NO_CALLBACK;
1086 }
1087 } else {
1088 result = ACTION_TAKEN_WITH_CALLBACK;
1089 }
1090 } else {
1091 result = NO_ACTION_POSSIBLE;
1092 }
1093 }
1094 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1095 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1096 stream_op->payload->send_message.send_message.reset();
1097 } else if (stream_op->send_trailing_metadata &&
1098 op_can_be_run(stream_op, s, &oas->state,
1099 OP_SEND_TRAILING_METADATA)) {
1100 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1101 if (stream_state->state_callback_received[OP_FAILED]) {
1102 result = NO_ACTION_POSSIBLE;
1103 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1104 } else {
1105 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1106 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1107 bidirectional_stream_write(s->cbs, "", 0, true);
1108 if (t->use_packet_coalescing) {
1109 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1110 bidirectional_stream_flush(s->cbs);
1111 }
1112 result = ACTION_TAKEN_WITH_CALLBACK;
1113 }
1114 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1115 } else if (stream_op->recv_initial_metadata &&
1116 op_can_be_run(stream_op, s, &oas->state,
1117 OP_RECV_INITIAL_METADATA)) {
1118 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1119 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1120 GRPC_CLOSURE_SCHED(
1121 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1122 GRPC_ERROR_NONE);
1123 } else if (stream_state->state_callback_received[OP_FAILED]) {
1124 GRPC_CLOSURE_SCHED(
1125 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1126 GRPC_ERROR_NONE);
1127 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1128 GRPC_CLOSURE_SCHED(
1129 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1130 GRPC_ERROR_NONE);
1131 } else {
1132 grpc_chttp2_incoming_metadata_buffer_publish(
1133 &oas->s->state.rs.initial_metadata,
1134 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1135 GRPC_CLOSURE_SCHED(
1136 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1137 GRPC_ERROR_NONE);
1138 }
1139 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1140 result = ACTION_TAKEN_NO_CALLBACK;
1141 } else if (stream_op->recv_message &&
1142 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1143 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1144 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1145 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1146 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1147 GRPC_ERROR_NONE);
1148 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1149 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1150 result = ACTION_TAKEN_NO_CALLBACK;
1151 } else if (stream_state->state_callback_received[OP_FAILED]) {
1152 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1153 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1154 GRPC_ERROR_NONE);
1155 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1156 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1157 result = ACTION_TAKEN_NO_CALLBACK;
1158 } else if (stream_state->rs.read_stream_closed == true) {
1159 /* No more data will be received */
1160 CRONET_LOG(GPR_DEBUG, "read stream closed");
1161 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1162 GRPC_ERROR_NONE);
1163 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1164 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1165 result = ACTION_TAKEN_NO_CALLBACK;
1166 } else if (stream_state->flush_read) {
1167 CRONET_LOG(GPR_DEBUG, "flush read");
1168 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1169 GRPC_ERROR_NONE);
1170 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1171 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1172 result = ACTION_TAKEN_NO_CALLBACK;
1173 } else if (stream_state->rs.length_field_received == false) {
1174 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1175 stream_state->rs.remaining_bytes == 0) {
1176 /* Start a read operation for data */
1177 stream_state->rs.length_field_received = true;
1178 parse_grpc_header(
1179 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1180 &stream_state->rs.length_field, &stream_state->rs.compressed);
1181 CRONET_LOG(GPR_DEBUG, "length field = %d",
1182 stream_state->rs.length_field);
1183 if (stream_state->rs.length_field > 0) {
1184 stream_state->rs.read_buffer = static_cast<char*>(
1185 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1186 GPR_ASSERT(stream_state->rs.read_buffer);
1187 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1188 stream_state->rs.received_bytes = 0;
1189 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1190 stream_state->state_op_done[OP_READ_REQ_MADE] =
1191 true; /* Indicates that at least one read request has been made */
1192 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1193 stream_state->rs.remaining_bytes);
1194 stream_state->pending_read_from_cronet = true;
1195 result = ACTION_TAKEN_WITH_CALLBACK;
1196 } else {
1197 stream_state->rs.remaining_bytes = 0;
1198 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1199 /* Clean up read_slice_buffer in case there is unread data. */
1200 grpc_slice_buffer_destroy_internal(
1201 &stream_state->rs.read_slice_buffer);
1202 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1203 uint32_t flags = 0;
1204 if (stream_state->rs.compressed) {
1205 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1206 }
1207 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1208 stream_op->payload->recv_message.recv_message->reset(
1209 stream_state->rs.sbs.get());
1210 GRPC_CLOSURE_SCHED(
1211 stream_op->payload->recv_message.recv_message_ready,
1212 GRPC_ERROR_NONE);
1213 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1214 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1215
1216 /* Extra read to trigger on_succeed */
1217 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1218 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1219 stream_state->rs.received_bytes = 0;
1220 stream_state->rs.compressed = false;
1221 stream_state->rs.length_field_received = false;
1222 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1223 stream_state->state_op_done[OP_READ_REQ_MADE] =
1224 true; /* Indicates that at least one read request has been made */
1225 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1226 stream_state->rs.remaining_bytes);
1227 stream_state->pending_read_from_cronet = true;
1228 result = ACTION_TAKEN_NO_CALLBACK;
1229 }
1230 } else if (stream_state->rs.remaining_bytes == 0) {
1231 /* Start a read operation for first 5 bytes (GRPC header) */
1232 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1233 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1234 stream_state->rs.received_bytes = 0;
1235 stream_state->rs.compressed = false;
1236 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1237 stream_state->state_op_done[OP_READ_REQ_MADE] =
1238 true; /* Indicates that at least one read request has been made */
1239 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1240 stream_state->rs.remaining_bytes);
1241 stream_state->pending_read_from_cronet = true;
1242 result = ACTION_TAKEN_WITH_CALLBACK;
1243 } else {
1244 result = NO_ACTION_POSSIBLE;
1245 }
1246 } else if (stream_state->rs.remaining_bytes == 0) {
1247 CRONET_LOG(GPR_DEBUG, "read operation complete");
1248 grpc_slice read_data_slice =
1249 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1250 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1251 memcpy(dst_p, stream_state->rs.read_buffer,
1252 static_cast<size_t>(stream_state->rs.length_field));
1253 null_and_maybe_free_read_buffer(s);
1254 /* Clean up read_slice_buffer in case there is unread data. */
1255 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1256 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1257 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1258 read_data_slice);
1259 uint32_t flags = 0;
1260 if (stream_state->rs.compressed) {
1261 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1262 }
1263 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1264 stream_op->payload->recv_message.recv_message->reset(
1265 stream_state->rs.sbs.get());
1266 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1267 GRPC_ERROR_NONE);
1268 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1269 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1270 /* Do an extra read to trigger on_succeeded() callback in case connection
1271 is closed */
1272 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1273 stream_state->rs.compressed = false;
1274 stream_state->rs.received_bytes = 0;
1275 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1276 stream_state->rs.length_field_received = false;
1277 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1278 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1279 stream_state->rs.remaining_bytes);
1280 stream_state->pending_read_from_cronet = true;
1281 result = ACTION_TAKEN_NO_CALLBACK;
1282 }
1283 } else if (stream_op->recv_trailing_metadata &&
1284 op_can_be_run(stream_op, s, &oas->state,
1285 OP_RECV_TRAILING_METADATA)) {
1286 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1287 grpc_error* error = GRPC_ERROR_NONE;
1288 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1289 error = GRPC_ERROR_REF(stream_state->cancel_error);
1290 } else if (stream_state->state_callback_received[OP_FAILED]) {
1291 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1292 } else if (oas->s->state.rs.trailing_metadata_valid) {
1293 grpc_chttp2_incoming_metadata_buffer_publish(
1294 &oas->s->state.rs.trailing_metadata,
1295 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1296 stream_state->rs.trailing_metadata_valid = false;
1297 }
1298 GRPC_CLOSURE_SCHED(
1299 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1300 error);
1301 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1302 result = ACTION_TAKEN_NO_CALLBACK;
1303 } else if (stream_op->cancel_stream &&
1304 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1305 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1306 if (s->cbs) {
1307 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1308 bidirectional_stream_cancel(s->cbs);
1309 result = ACTION_TAKEN_WITH_CALLBACK;
1310 } else {
1311 result = ACTION_TAKEN_NO_CALLBACK;
1312 }
1313 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1314 if (!stream_state->cancel_error) {
1315 stream_state->cancel_error =
1316 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1317 }
1318 } else if (stream_op->on_complete &&
1319 op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1320 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1321 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1322 GRPC_CLOSURE_SCHED(stream_op->on_complete,
1323 GRPC_ERROR_REF(stream_state->cancel_error));
1324 } else if (stream_state->state_callback_received[OP_FAILED]) {
1325 GRPC_CLOSURE_SCHED(
1326 stream_op->on_complete,
1327 make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1328 } else {
1329 /* All actions in this stream_op are complete. Call the on_complete
1330 * callback
1331 */
1332 GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
1333 }
1334 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1335 oas->done = true;
1336 /* reset any send message state, only if this ON_COMPLETE is about a send.
1337 */
1338 if (stream_op->send_message) {
1339 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1340 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1341 }
1342 result = ACTION_TAKEN_NO_CALLBACK;
1343 /* If this is the on_complete callback being called for a received message -
1344 make a note */
1345 if (stream_op->recv_message)
1346 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1347 } else {
1348 result = NO_ACTION_POSSIBLE;
1349 }
1350 return result;
1351 }
1352
1353 /*
1354 Functions used by upper layers to access transport functionality.
1355 */
1356
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,gpr_arena * arena)1357 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1358 grpc_stream_refcount* refcount, const void* server_data,
1359 gpr_arena* arena) {
1360 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1361
1362 s->refcount = refcount;
1363 GRPC_CRONET_STREAM_REF(s, "cronet transport");
1364 memset(&s->storage, 0, sizeof(s->storage));
1365 s->storage.head = nullptr;
1366 memset(&s->state, 0, sizeof(s->state));
1367 s->curr_op = nullptr;
1368 s->cbs = nullptr;
1369 memset(&s->header_array, 0, sizeof(s->header_array));
1370 memset(&s->state.rs, 0, sizeof(s->state.rs));
1371 memset(&s->state.ws, 0, sizeof(s->state.ws));
1372 memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
1373 memset(s->state.state_callback_received, 0,
1374 sizeof(s->state.state_callback_received));
1375 s->state.fail_state = s->state.flush_read = false;
1376 s->state.cancel_error = nullptr;
1377 s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
1378 s->state.pending_send_message = false;
1379 s->state.pending_recv_trailing_metadata = false;
1380 s->state.pending_read_from_cronet = false;
1381
1382 s->curr_gs = gs;
1383 s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt);
1384 s->arena = arena;
1385
1386 gpr_mu_init(&s->mu);
1387 return 0;
1388 }
1389
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1390 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1391 grpc_pollset* pollset) {}
1392
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1393 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1394 grpc_pollset_set* pollset_set) {}
1395
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1396 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1397 grpc_transport_stream_op_batch* op) {
1398 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1399 if (op->send_initial_metadata &&
1400 header_has_authority(op->payload->send_initial_metadata
1401 .send_initial_metadata->list.head)) {
1402 /* Cronet does not support :authority header field. We cancel the call when
1403 this field is present in metadata */
1404 if (op->recv_initial_metadata) {
1405 GRPC_CLOSURE_SCHED(
1406 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1407 GRPC_ERROR_CANCELLED);
1408 }
1409 if (op->recv_message) {
1410 GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1411 GRPC_ERROR_CANCELLED);
1412 }
1413 if (op->recv_trailing_metadata) {
1414 GRPC_CLOSURE_SCHED(
1415 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1416 GRPC_ERROR_CANCELLED);
1417 }
1418 GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1419 return;
1420 }
1421 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1422 add_to_storage(s, op);
1423 execute_from_storage(s);
1424 }
1425
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1426 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1427 grpc_closure* then_schedule_closure) {
1428 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1429 null_and_maybe_free_read_buffer(s);
1430 /* Clean up read_slice_buffer in case there is unread data. */
1431 grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
1432 GRPC_ERROR_UNREF(s->state.cancel_error);
1433 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1434 }
1435
destroy_transport(grpc_transport * gt)1436 static void destroy_transport(grpc_transport* gt) {}
1437
get_endpoint(grpc_transport * gt)1438 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1439
perform_op(grpc_transport * gt,grpc_transport_op * op)1440 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1441
1442 static const grpc_transport_vtable grpc_cronet_vtable = {
1443 sizeof(stream_obj),
1444 "cronet_http",
1445 init_stream,
1446 set_pollset_do_nothing,
1447 set_pollset_set_do_nothing,
1448 perform_stream_op,
1449 perform_op,
1450 destroy_stream,
1451 destroy_transport,
1452 get_endpoint};
1453
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1454 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1455 const grpc_channel_args* args,
1456 void* reserved) {
1457 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1458 gpr_malloc(sizeof(grpc_cronet_transport)));
1459 if (!ct) {
1460 goto error;
1461 }
1462 ct->base.vtable = &grpc_cronet_vtable;
1463 ct->engine = static_cast<stream_engine*>(engine);
1464 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1465 if (!ct->host) {
1466 goto error;
1467 }
1468 strcpy(ct->host, target);
1469
1470 ct->use_packet_coalescing = true;
1471 if (args) {
1472 for (size_t i = 0; i < args->num_args; i++) {
1473 if (0 ==
1474 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1475 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1476 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1477 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1478 } else {
1479 ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1480 }
1481 }
1482 }
1483 }
1484
1485 return &ct->base;
1486
1487 error:
1488 if (ct) {
1489 if (ct->host) {
1490 gpr_free(ct->host);
1491 }
1492 gpr_free(ct);
1493 }
1494
1495 return nullptr;
1496 }
1497