1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <string>
24
25 #include "absl/strings/str_cat.h"
26
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30
31 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
32 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
33 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
34 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
35 #include "src/core/lib/debug/trace.h"
36 #include "src/core/lib/gpr/string.h"
37 #include "src/core/lib/gprpp/manual_constructor.h"
38 #include "src/core/lib/iomgr/endpoint.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/lib/surface/channel.h"
43 #include "src/core/lib/surface/validate_metadata.h"
44 #include "src/core/lib/transport/metadata_batch.h"
45 #include "src/core/lib/transport/static_metadata.h"
46 #include "src/core/lib/transport/timeout_encoding.h"
47 #include "src/core/lib/transport/transport_impl.h"
48
49 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
50
51 #define GRPC_HEADER_SIZE_IN_BYTES 5
52 #define GRPC_FLUSH_READ_SIZE 4096
53
54 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
55 #define CRONET_LOG(...) \
56 do { \
57 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
58 } while (0)
59
60 enum e_op_result {
61 ACTION_TAKEN_WITH_CALLBACK,
62 ACTION_TAKEN_NO_CALLBACK,
63 NO_ACTION_POSSIBLE
64 };
65
66 enum e_op_id {
67 OP_SEND_INITIAL_METADATA = 0,
68 OP_SEND_MESSAGE,
69 OP_SEND_TRAILING_METADATA,
70 OP_RECV_MESSAGE,
71 OP_RECV_INITIAL_METADATA,
72 OP_RECV_TRAILING_METADATA,
73 OP_CANCEL_ERROR,
74 OP_ON_COMPLETE,
75 OP_FAILED,
76 OP_SUCCEEDED,
77 OP_CANCELED,
78 OP_RECV_MESSAGE_AND_ON_COMPLETE,
79 OP_READ_REQ_MADE,
80 OP_NUM_OPS
81 };
82
83 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
84
85 static void on_stream_ready(bidirectional_stream*);
86 static void on_response_headers_received(
87 bidirectional_stream*, const bidirectional_stream_header_array*,
88 const char*);
89 static void on_write_completed(bidirectional_stream*, const char*);
90 static void on_read_completed(bidirectional_stream*, char*, int);
91 static void on_response_trailers_received(
92 bidirectional_stream*, const bidirectional_stream_header_array*);
93 static void on_succeeded(bidirectional_stream*);
94 static void on_failed(bidirectional_stream*, int);
95 static void on_canceled(bidirectional_stream*);
96 static bidirectional_stream_callback cronet_callbacks = {
97 on_stream_ready,
98 on_response_headers_received,
99 on_read_completed,
100 on_write_completed,
101 on_response_trailers_received,
102 on_succeeded,
103 on_failed,
104 on_canceled};
105
106 /* Cronet transport object */
107 struct grpc_cronet_transport {
108 grpc_transport base; /* must be first element in this structure */
109 stream_engine* engine;
110 char* host;
111 bool use_packet_coalescing;
112 };
113 typedef struct grpc_cronet_transport grpc_cronet_transport;
114
115 /* TODO (makdharma): reorder structure for memory efficiency per
116 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
117 struct read_state {
read_stateread_state118 read_state(grpc_core::Arena* arena)
119 : trailing_metadata(arena), initial_metadata(arena) {
120 grpc_slice_buffer_init(&read_slice_buffer);
121 }
122
123 /* vars to store data coming from server */
124 char* read_buffer = nullptr;
125 bool length_field_received = false;
126 int received_bytes = 0;
127 int remaining_bytes = 0;
128 int length_field = 0;
129 bool compressed = 0;
130 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
131 char* payload_field = nullptr;
132 bool read_stream_closed = 0;
133
134 /* vars for holding data destined for the application */
135 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
136 grpc_slice_buffer read_slice_buffer;
137
138 /* vars for trailing metadata */
139 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
140 bool trailing_metadata_valid = false;
141
142 /* vars for initial metadata */
143 grpc_chttp2_incoming_metadata_buffer initial_metadata;
144 };
145
146 struct write_state {
147 char* write_buffer = nullptr;
148 };
149
150 /* track state of one stream op */
151 struct op_state {
op_stateop_state152 op_state(grpc_core::Arena* arena) : rs(arena) {}
153
154 bool state_op_done[OP_NUM_OPS] = {};
155 bool state_callback_received[OP_NUM_OPS] = {};
156 /* A non-zero gRPC status code has been seen */
157 bool fail_state = false;
158 /* Transport is discarding all buffered messages */
159 bool flush_read = false;
160 bool flush_cronet_when_ready = false;
161 bool pending_write_for_trailer = false;
162 bool pending_send_message = false;
163 /* User requested RECV_TRAILING_METADATA */
164 bool pending_recv_trailing_metadata = false;
165 /* Cronet has not issued a callback of a bidirectional read */
166 bool pending_read_from_cronet = false;
167 grpc_error* cancel_error = GRPC_ERROR_NONE;
168 /* data structure for storing data coming from server */
169 struct read_state rs;
170 /* data structure for storing data going to the server */
171 struct write_state ws;
172 };
173
174 struct stream_obj;
175
176 struct op_and_state {
177 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
178
179 grpc_transport_stream_op_batch op;
180 struct op_state state;
181 bool done = false;
182 struct stream_obj* s; /* Pointer back to the stream object */
183 /* next op_and_state in the linked list */
184 struct op_and_state* next = nullptr;
185 };
186
187 struct op_storage {
188 int num_pending_ops = 0;
189 struct op_and_state* head = nullptr;
190 };
191
192 struct stream_obj {
193 stream_obj(grpc_transport* gt, grpc_stream* gs,
194 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
195 ~stream_obj();
196
197 grpc_core::Arena* arena;
198 struct op_and_state* oas = nullptr;
199 grpc_transport_stream_op_batch* curr_op = nullptr;
200 grpc_cronet_transport* curr_ct;
201 grpc_stream* curr_gs;
202 bidirectional_stream* cbs = nullptr;
203 bidirectional_stream_header_array header_array =
204 bidirectional_stream_header_array(); // Zero-initialize the structure.
205
206 /* Stream level state. Some state will be tracked both at stream and stream_op
207 * level */
208 struct op_state state;
209
210 /* OP storage */
211 struct op_storage storage;
212
213 /* Mutex to protect storage */
214 gpr_mu mu;
215
216 /* Refcount object of the stream */
217 grpc_stream_refcount* refcount;
218 };
219
220 #ifndef NDEBUG
221 #define GRPC_CRONET_STREAM_REF(stream, reason) \
222 grpc_cronet_stream_ref((stream), (reason))
223 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
224 grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)225 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
226 grpc_stream_ref(s->refcount, reason);
227 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)228 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
229 grpc_stream_unref(s->refcount, reason);
230 }
231 #else
232 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
233 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
234 grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)235 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)236 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
237 #endif
238
239 static enum e_op_result execute_stream_op(struct op_and_state* oas);
240
241 /*
242 Utility function to translate enum into string for printing
243 */
op_result_string(enum e_op_result i)244 static const char* op_result_string(enum e_op_result i) {
245 switch (i) {
246 case ACTION_TAKEN_WITH_CALLBACK:
247 return "ACTION_TAKEN_WITH_CALLBACK";
248 case ACTION_TAKEN_NO_CALLBACK:
249 return "ACTION_TAKEN_NO_CALLBACK";
250 case NO_ACTION_POSSIBLE:
251 return "NO_ACTION_POSSIBLE";
252 }
253 GPR_UNREACHABLE_CODE(return "UNKNOWN");
254 }
255
op_id_string(enum e_op_id i)256 static const char* op_id_string(enum e_op_id i) {
257 switch (i) {
258 case OP_SEND_INITIAL_METADATA:
259 return "OP_SEND_INITIAL_METADATA";
260 case OP_SEND_MESSAGE:
261 return "OP_SEND_MESSAGE";
262 case OP_SEND_TRAILING_METADATA:
263 return "OP_SEND_TRAILING_METADATA";
264 case OP_RECV_MESSAGE:
265 return "OP_RECV_MESSAGE";
266 case OP_RECV_INITIAL_METADATA:
267 return "OP_RECV_INITIAL_METADATA";
268 case OP_RECV_TRAILING_METADATA:
269 return "OP_RECV_TRAILING_METADATA";
270 case OP_CANCEL_ERROR:
271 return "OP_CANCEL_ERROR";
272 case OP_ON_COMPLETE:
273 return "OP_ON_COMPLETE";
274 case OP_FAILED:
275 return "OP_FAILED";
276 case OP_SUCCEEDED:
277 return "OP_SUCCEEDED";
278 case OP_CANCELED:
279 return "OP_CANCELED";
280 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
281 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
282 case OP_READ_REQ_MADE:
283 return "OP_READ_REQ_MADE";
284 case OP_NUM_OPS:
285 return "OP_NUM_OPS";
286 }
287 return "UNKNOWN";
288 }
289
null_and_maybe_free_read_buffer(stream_obj * s)290 static void null_and_maybe_free_read_buffer(stream_obj* s) {
291 if (s->state.rs.read_buffer &&
292 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
293 gpr_free(s->state.rs.read_buffer);
294 }
295 s->state.rs.read_buffer = nullptr;
296 }
297
maybe_flush_read(stream_obj * s)298 static void maybe_flush_read(stream_obj* s) {
299 /* To enter flush read state (discarding all the buffered messages in
300 * transport layer), two conditions must be satisfied: 1) non-zero grpc status
301 * has been received, and 2) an op requesting the status code
302 * (RECV_TRAILING_METADATA) is issued by the user. (See
303 * doc/status_ordering.md) */
304 /* Whenever the evaluation of any of the two condition is changed, we check
305 * whether we should enter the flush read state. */
306 if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
307 if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
308 CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
309 s->state.flush_read = true;
310 null_and_maybe_free_read_buffer(s);
311 s->state.rs.read_buffer =
312 static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
313 if (!s->state.pending_read_from_cronet) {
314 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
315 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
316 GRPC_FLUSH_READ_SIZE);
317 s->state.pending_read_from_cronet = true;
318 }
319 }
320 }
321 }
322
read_grpc_header(stream_obj * s)323 static void read_grpc_header(stream_obj* s) {
324 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
325 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
326 s->state.rs.received_bytes = 0;
327 s->state.rs.compressed = false;
328 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
329 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
330 s->state.rs.remaining_bytes);
331 s->state.pending_read_from_cronet = true;
332 }
333
make_error_with_desc(int error_code,const char * desc)334 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
335 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
336 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
337 return error;
338 }
339
op_and_state(stream_obj * s,const grpc_transport_stream_op_batch & op)340 inline op_and_state::op_and_state(stream_obj* s,
341 const grpc_transport_stream_op_batch& op)
342 : op(op), state(s->arena), s(s) {}
343
344 /*
345 Add a new stream op to op storage.
346 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)347 static void add_to_storage(struct stream_obj* s,
348 grpc_transport_stream_op_batch* op) {
349 struct op_storage* storage = &s->storage;
350 /* add new op at the beginning of the linked list. The memory is freed
351 in remove_from_storage */
352 op_and_state* new_op = new op_and_state(s, *op);
353 gpr_mu_lock(&s->mu);
354 new_op->next = storage->head;
355 storage->head = new_op;
356 storage->num_pending_ops++;
357 if (op->send_message) {
358 s->state.pending_send_message = true;
359 }
360 if (op->recv_trailing_metadata) {
361 s->state.pending_recv_trailing_metadata = true;
362 maybe_flush_read(s);
363 }
364 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
365 storage->num_pending_ops);
366 gpr_mu_unlock(&s->mu);
367 }
368
369 /*
370 Traverse the linked list and delete op and free memory
371 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)372 static void remove_from_storage(struct stream_obj* s,
373 struct op_and_state* oas) {
374 struct op_and_state* curr;
375 if (s->storage.head == nullptr || oas == nullptr) {
376 return;
377 }
378 if (s->storage.head == oas) {
379 s->storage.head = oas->next;
380 delete oas;
381 s->storage.num_pending_ops--;
382 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
383 s->storage.num_pending_ops);
384 } else {
385 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
386 if (curr->next == oas) {
387 curr->next = oas->next;
388 s->storage.num_pending_ops--;
389 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
390 s->storage.num_pending_ops);
391 delete oas;
392 break;
393 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
394 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
395 }
396 }
397 }
398 }
399
400 /*
401 Cycle through ops and try to take next action. Break when either
402 an action with callback is taken, or no action is possible.
403 This can get executed from the Cronet network thread via cronet callback
404 or on the application supplied thread via the perform_stream_op function.
405 */
execute_from_storage(stream_obj * s)406 static void execute_from_storage(stream_obj* s) {
407 gpr_mu_lock(&s->mu);
408 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
409 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
410 GPR_ASSERT(!curr->done);
411 enum e_op_result result = execute_stream_op(curr);
412 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
413 op_result_string(result));
414 /* if this op is done, then remove it and free memory */
415 if (curr->done) {
416 struct op_and_state* next = curr->next;
417 remove_from_storage(s, curr);
418 curr = next;
419 } else if (result == NO_ACTION_POSSIBLE) {
420 curr = curr->next;
421 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
422 /* wait for the callback */
423 break;
424 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
425 }
426 gpr_mu_unlock(&s->mu);
427 }
428
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)429 static void convert_cronet_array_to_metadata(
430 const bidirectional_stream_header_array* header_array,
431 grpc_chttp2_incoming_metadata_buffer* mds) {
432 for (size_t i = 0; i < header_array->count; i++) {
433 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
434 header_array->headers[i].key, header_array->headers[i].value);
435 grpc_slice key = grpc_slice_intern(
436 grpc_slice_from_static_string(header_array->headers[i].key));
437 grpc_slice value;
438 if (grpc_is_refcounted_slice_binary_header(key)) {
439 value = grpc_slice_from_static_string(header_array->headers[i].value);
440 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
441 value, grpc_chttp2_base64_infer_length_after_decode(value)));
442 } else {
443 value = grpc_slice_intern(
444 grpc_slice_from_static_string(header_array->headers[i].value));
445 }
446 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
447 grpc_chttp2_incoming_metadata_buffer_add(
448 mds, grpc_mdelem_from_slices(key, value)));
449 }
450 }
451
452 /*
453 Cronet callback
454 */
on_failed(bidirectional_stream * stream,int net_error)455 static void on_failed(bidirectional_stream* stream, int net_error) {
456 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
457 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
458 grpc_core::ExecCtx exec_ctx;
459
460 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
461 gpr_mu_lock(&s->mu);
462 bidirectional_stream_destroy(s->cbs);
463 s->state.state_callback_received[OP_FAILED] = true;
464 s->cbs = nullptr;
465 if (s->header_array.headers) {
466 gpr_free(s->header_array.headers);
467 s->header_array.headers = nullptr;
468 }
469 if (s->state.ws.write_buffer) {
470 gpr_free(s->state.ws.write_buffer);
471 s->state.ws.write_buffer = nullptr;
472 }
473 null_and_maybe_free_read_buffer(s);
474 gpr_mu_unlock(&s->mu);
475 execute_from_storage(s);
476 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
477 }
478
479 /*
480 Cronet callback
481 */
on_canceled(bidirectional_stream * stream)482 static void on_canceled(bidirectional_stream* stream) {
483 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
484 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
485 grpc_core::ExecCtx exec_ctx;
486
487 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
488 gpr_mu_lock(&s->mu);
489 bidirectional_stream_destroy(s->cbs);
490 s->state.state_callback_received[OP_CANCELED] = true;
491 s->cbs = nullptr;
492 if (s->header_array.headers) {
493 gpr_free(s->header_array.headers);
494 s->header_array.headers = nullptr;
495 }
496 if (s->state.ws.write_buffer) {
497 gpr_free(s->state.ws.write_buffer);
498 s->state.ws.write_buffer = nullptr;
499 }
500 null_and_maybe_free_read_buffer(s);
501 gpr_mu_unlock(&s->mu);
502 execute_from_storage(s);
503 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
504 }
505
506 /*
507 Cronet callback
508 */
on_succeeded(bidirectional_stream * stream)509 static void on_succeeded(bidirectional_stream* stream) {
510 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
511 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
512 grpc_core::ExecCtx exec_ctx;
513
514 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
515 gpr_mu_lock(&s->mu);
516 bidirectional_stream_destroy(s->cbs);
517 s->state.state_callback_received[OP_SUCCEEDED] = true;
518 s->cbs = nullptr;
519 null_and_maybe_free_read_buffer(s);
520 gpr_mu_unlock(&s->mu);
521 execute_from_storage(s);
522 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
523 }
524
525 /*
526 Cronet callback
527 */
on_stream_ready(bidirectional_stream * stream)528 static void on_stream_ready(bidirectional_stream* stream) {
529 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
530 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
531 grpc_core::ExecCtx exec_ctx;
532 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
533 grpc_cronet_transport* t = s->curr_ct;
534 gpr_mu_lock(&s->mu);
535 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
536 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
537 /* Free the memory allocated for headers */
538 if (s->header_array.headers) {
539 gpr_free(s->header_array.headers);
540 s->header_array.headers = nullptr;
541 }
542 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
543 * SEND_TRAILING_METADATA ops pending */
544 if (t->use_packet_coalescing) {
545 if (s->state.flush_cronet_when_ready) {
546 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
547 bidirectional_stream_flush(stream);
548 }
549 }
550 gpr_mu_unlock(&s->mu);
551 execute_from_storage(s);
552 }
553
554 /*
555 Cronet callback
556 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)557 static void on_response_headers_received(
558 bidirectional_stream* stream,
559 const bidirectional_stream_header_array* headers,
560 const char* negotiated_protocol) {
561 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
562 grpc_core::ExecCtx exec_ctx;
563 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
564 headers, negotiated_protocol);
565 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
566
567 /* Identify if this is a header or a trailer (in a trailer-only response case)
568 */
569 for (size_t i = 0; i < headers->count; i++) {
570 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
571 on_response_trailers_received(stream, headers);
572 /* Do an extra read for a trailer-only stream with grpc_status = 0
573 to trigger on_succeeded() callback */
574 if (0 == strcmp(headers->headers[i].value, "0")) {
575 read_grpc_header(s);
576 }
577 return;
578 }
579 }
580
581 gpr_mu_lock(&s->mu);
582 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
583 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
584 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
585 s->state.state_callback_received[OP_FAILED])) {
586 /* Do an extra read to trigger on_succeeded() callback in case connection
587 is closed */
588 GPR_ASSERT(s->state.rs.length_field_received == false);
589 read_grpc_header(s);
590 }
591 gpr_mu_unlock(&s->mu);
592 execute_from_storage(s);
593 }
594
595 /*
596 Cronet callback
597 */
on_write_completed(bidirectional_stream * stream,const char * data)598 static void on_write_completed(bidirectional_stream* stream, const char* data) {
599 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
600 grpc_core::ExecCtx exec_ctx;
601 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
602 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
603 gpr_mu_lock(&s->mu);
604 if (s->state.ws.write_buffer) {
605 gpr_free(s->state.ws.write_buffer);
606 s->state.ws.write_buffer = nullptr;
607 }
608 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
609 gpr_mu_unlock(&s->mu);
610 execute_from_storage(s);
611 }
612
613 /*
614 Cronet callback
615 */
on_read_completed(bidirectional_stream * stream,char * data,int count)616 static void on_read_completed(bidirectional_stream* stream, char* data,
617 int count) {
618 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
619 grpc_core::ExecCtx exec_ctx;
620 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
621 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
622 count);
623 gpr_mu_lock(&s->mu);
624 s->state.pending_read_from_cronet = false;
625 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
626 if (count > 0 && s->state.flush_read) {
627 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
628 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
629 GRPC_FLUSH_READ_SIZE);
630 s->state.pending_read_from_cronet = true;
631 gpr_mu_unlock(&s->mu);
632 } else if (count > 0) {
633 s->state.rs.received_bytes += count;
634 s->state.rs.remaining_bytes -= count;
635 if (s->state.rs.remaining_bytes > 0) {
636 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
637 s->state.state_op_done[OP_READ_REQ_MADE] = true;
638 bidirectional_stream_read(
639 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
640 s->state.rs.remaining_bytes);
641 s->state.pending_read_from_cronet = true;
642 gpr_mu_unlock(&s->mu);
643 } else {
644 gpr_mu_unlock(&s->mu);
645 execute_from_storage(s);
646 }
647 } else {
648 null_and_maybe_free_read_buffer(s);
649 s->state.rs.read_stream_closed = true;
650 gpr_mu_unlock(&s->mu);
651 execute_from_storage(s);
652 }
653 }
654
655 /*
656 Cronet callback
657 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)658 static void on_response_trailers_received(
659 bidirectional_stream* stream,
660 const bidirectional_stream_header_array* trailers) {
661 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
662 grpc_core::ExecCtx exec_ctx;
663 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
664 trailers);
665 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
666 grpc_cronet_transport* t = s->curr_ct;
667 gpr_mu_lock(&s->mu);
668 s->state.rs.trailing_metadata_valid = false;
669 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
670 if (trailers->count > 0) {
671 s->state.rs.trailing_metadata_valid = true;
672 }
673 for (size_t i = 0; i < trailers->count; i++) {
674 if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
675 0 != strcmp(trailers->headers[i].value, "0")) {
676 s->state.fail_state = true;
677 maybe_flush_read(s);
678 }
679 }
680 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
681 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
682 * trigger on_succeeded */
683 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
684 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
685 s->state.state_callback_received[OP_FAILED])) {
686 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
687 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
688 bidirectional_stream_write(s->cbs, "", 0, true);
689 if (t->use_packet_coalescing) {
690 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
691 bidirectional_stream_flush(s->cbs);
692 }
693 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
694
695 gpr_mu_unlock(&s->mu);
696 } else {
697 gpr_mu_unlock(&s->mu);
698 execute_from_storage(s);
699 }
700 }
701
702 /*
703 Utility function that takes the data from s->write_slice_buffer and assembles
704 into a contiguous byte stream with 5 byte gRPC header prepended.
705 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)706 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
707 char** pp_write_buffer,
708 size_t* p_write_buffer_size, uint32_t flags) {
709 grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
710 size_t length = GRPC_SLICE_LENGTH(slice);
711 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
712 /* This is freed in the on_write_completed callback */
713 char* write_buffer =
714 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
715 *pp_write_buffer = write_buffer;
716 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
717 /* Append 5 byte header */
718 /* Compressed flag */
719 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
720 /* Message length */
721 *p++ = static_cast<uint8_t>(length >> 24);
722 *p++ = static_cast<uint8_t>(length >> 16);
723 *p++ = static_cast<uint8_t>(length >> 8);
724 *p++ = static_cast<uint8_t>(length);
725 /* append actual data */
726 memcpy(p, GRPC_SLICE_START_PTR(slice), length);
727 grpc_slice_unref_internal(slice);
728 }
729
730 /*
731 Convert metadata in a format that Cronet can consume
732 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)733 static void convert_metadata_to_cronet_headers(
734 grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
735 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
736 const char** method) {
737 grpc_linked_mdelem* curr = metadata->list.head;
738 /* Walk the linked list and get number of header fields */
739 size_t num_headers_available = 0;
740 while (curr != nullptr) {
741 curr = curr->next;
742 num_headers_available++;
743 }
744 grpc_millis deadline = metadata->deadline;
745 if (deadline != GRPC_MILLIS_INF_FUTURE) {
746 num_headers_available++;
747 }
748 /* Allocate enough memory. It is freed in the on_stream_ready callback
749 */
750 bidirectional_stream_header* headers =
751 static_cast<bidirectional_stream_header*>(gpr_malloc(
752 sizeof(bidirectional_stream_header) * num_headers_available));
753 *pp_headers = headers;
754
755 /* Walk the linked list again, this time copying the header fields.
756 s->num_headers can be less than num_headers_available, as some headers
757 are not used for cronet.
758 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
759 */
760 curr = metadata->list.head;
761 size_t num_headers = 0;
762 while (num_headers < num_headers_available) {
763 grpc_mdelem mdelem = curr->md;
764 curr = curr->next;
765 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
766 char* value;
767 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
768 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
769 value = grpc_slice_to_c_string(wire_value);
770 grpc_slice_unref_internal(wire_value);
771 } else {
772 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
773 }
774 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
775 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
776 GRPC_MDSTR_AUTHORITY)) {
777 /* Cronet populates these fields on its own */
778 gpr_free(key);
779 gpr_free(value);
780 continue;
781 }
782 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
783 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
784 *method = "PUT";
785 } else {
786 /* POST method in default*/
787 *method = "POST";
788 }
789 gpr_free(key);
790 gpr_free(value);
791 continue;
792 }
793 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
794 /* Create URL by appending :path value to the hostname */
795 *pp_url = absl::StrCat("https://", host, value);
796 gpr_free(key);
797 gpr_free(value);
798 continue;
799 }
800 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
801 headers[num_headers].key = key;
802 headers[num_headers].value = value;
803 num_headers++;
804 if (curr == nullptr) {
805 break;
806 }
807 }
808 if (deadline != GRPC_MILLIS_INF_FUTURE) {
809 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
810 char* value =
811 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
812 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
813 value);
814 headers[num_headers].key = key;
815 headers[num_headers].value = value;
816
817 num_headers++;
818 }
819
820 *p_num_headers = num_headers;
821 }
822
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)823 static void parse_grpc_header(const uint8_t* data, int* length,
824 bool* compressed) {
825 const uint8_t c = *data;
826 const uint8_t* p = data + 1;
827 *compressed = ((c & 0x01) == 0x01);
828 *length = 0;
829 *length |= (*p++) << 24;
830 *length |= (*p++) << 16;
831 *length |= (*p++) << 8;
832 *length |= (*p++);
833 }
834
header_has_authority(grpc_linked_mdelem * head)835 static bool header_has_authority(grpc_linked_mdelem* head) {
836 while (head != nullptr) {
837 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
838 GRPC_MDSTR_AUTHORITY)) {
839 return true;
840 }
841 head = head->next;
842 }
843 return false;
844 }
845
846 /*
847 Op Execution: Decide if one of the actions contained in the stream op can be
848 executed. This is the heart of the state machine.
849 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)850 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
851 struct stream_obj* s, struct op_state* op_state,
852 enum e_op_id op_id) {
853 struct op_state* stream_state = &s->state;
854 grpc_cronet_transport* t = s->curr_ct;
855 bool result = true;
856 /* When call is canceled, every op can be run, except under following
857 conditions
858 */
859 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
860 stream_state->state_callback_received[OP_FAILED];
861 if (is_canceled_or_failed) {
862 if (op_id == OP_SEND_INITIAL_METADATA) {
863 CRONET_LOG(GPR_DEBUG, "Because");
864 result = false;
865 }
866 if (op_id == OP_SEND_MESSAGE) {
867 CRONET_LOG(GPR_DEBUG, "Because");
868 result = false;
869 }
870 if (op_id == OP_SEND_TRAILING_METADATA) {
871 CRONET_LOG(GPR_DEBUG, "Because");
872 result = false;
873 }
874 if (op_id == OP_CANCEL_ERROR) {
875 CRONET_LOG(GPR_DEBUG, "Because");
876 result = false;
877 }
878 /* already executed */
879 if (op_id == OP_RECV_INITIAL_METADATA &&
880 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
881 CRONET_LOG(GPR_DEBUG, "Because");
882 result = false;
883 }
884 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
885 CRONET_LOG(GPR_DEBUG, "Because");
886 result = false;
887 }
888 if (op_id == OP_RECV_TRAILING_METADATA &&
889 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
890 CRONET_LOG(GPR_DEBUG, "Because");
891 result = false;
892 }
893 /* ON_COMPLETE can be processed if one of the following conditions is met:
894 * 1. the stream failed
895 * 2. the stream is cancelled, and the callback is received
896 * 3. the stream succeeded before cancel is effective
897 * 4. the stream is cancelled, and the stream is never started */
898 if (op_id == OP_ON_COMPLETE &&
899 !(stream_state->state_callback_received[OP_FAILED] ||
900 stream_state->state_callback_received[OP_CANCELED] ||
901 stream_state->state_callback_received[OP_SUCCEEDED] ||
902 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
903 CRONET_LOG(GPR_DEBUG, "Because");
904 result = false;
905 }
906 } else if (op_id == OP_SEND_INITIAL_METADATA) {
907 /* already executed */
908 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
909 } else if (op_id == OP_RECV_INITIAL_METADATA) {
910 /* already executed */
911 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
912 /* we haven't sent headers yet. */
913 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
914 result = false;
915 /* we haven't received headers yet. */
916 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
917 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
918 result = false;
919 } else if (op_id == OP_SEND_MESSAGE) {
920 /* already executed (note we're checking op specific state, not stream
921 state) */
922 if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
923 /* we haven't sent headers yet. */
924 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
925 result = false;
926 } else if (op_id == OP_RECV_MESSAGE) {
927 /* already executed */
928 if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
929 /* we haven't received headers yet. */
930 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
931 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
932 result = false;
933 } else if (op_id == OP_RECV_TRAILING_METADATA) {
934 /* already executed */
935 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
936 /* we have asked for but haven't received message yet. */
937 else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
938 !stream_state->state_op_done[OP_RECV_MESSAGE])
939 result = false;
940 /* we haven't received trailers yet. */
941 else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
942 result = false;
943 /* we haven't received on_succeeded yet. */
944 else if (!stream_state->state_callback_received[OP_SUCCEEDED])
945 result = false;
946 } else if (op_id == OP_SEND_TRAILING_METADATA) {
947 /* already executed */
948 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
949 /* we haven't sent initial metadata yet */
950 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
951 result = false;
952 /* we haven't sent message yet */
953 else if (stream_state->pending_send_message &&
954 !stream_state->state_op_done[OP_SEND_MESSAGE])
955 result = false;
956 /* we haven't got on_write_completed for the send yet */
957 else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
958 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
959 !(t->use_packet_coalescing &&
960 stream_state->pending_write_for_trailer))
961 result = false;
962 } else if (op_id == OP_CANCEL_ERROR) {
963 /* already executed */
964 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
965 } else if (op_id == OP_ON_COMPLETE) {
966 /* already executed (note we're checking op specific state, not stream
967 state) */
968 if (op_state->state_op_done[OP_ON_COMPLETE]) {
969 CRONET_LOG(GPR_DEBUG, "Because");
970 result = false;
971 }
972 /* Check if every op that was asked for is done. */
973 /* TODO(muxi): We should not consider the recv ops here, since they
974 * have their own callbacks. We should invoke a batch's on_complete
975 * as soon as all of the batch's send ops are complete, even if
976 * there are still recv ops pending. */
977 else if (curr_op->send_initial_metadata &&
978 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
979 CRONET_LOG(GPR_DEBUG, "Because");
980 result = false;
981 } else if (curr_op->send_message &&
982 !op_state->state_op_done[OP_SEND_MESSAGE]) {
983 CRONET_LOG(GPR_DEBUG, "Because");
984 result = false;
985 } else if (curr_op->send_message &&
986 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
987 CRONET_LOG(GPR_DEBUG, "Because");
988 result = false;
989 } else if (curr_op->send_trailing_metadata &&
990 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
991 CRONET_LOG(GPR_DEBUG, "Because");
992 result = false;
993 } else if (curr_op->recv_initial_metadata &&
994 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
995 CRONET_LOG(GPR_DEBUG, "Because");
996 result = false;
997 } else if (curr_op->recv_message &&
998 !op_state->state_op_done[OP_RECV_MESSAGE]) {
999 CRONET_LOG(GPR_DEBUG, "Because");
1000 result = false;
1001 } else if (curr_op->cancel_stream &&
1002 !stream_state->state_callback_received[OP_CANCELED]) {
1003 CRONET_LOG(GPR_DEBUG, "Because");
1004 result = false;
1005 } else if (curr_op->recv_trailing_metadata) {
1006 /* We aren't done with trailing metadata yet */
1007 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1008 CRONET_LOG(GPR_DEBUG, "Because");
1009 result = false;
1010 }
1011 /* We've asked for actual message in an earlier op, and it hasn't been
1012 delivered yet. */
1013 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1014 /* If this op is not the one asking for read, (which means some earlier
1015 op has asked), and the read hasn't been delivered. */
1016 if (!curr_op->recv_message &&
1017 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1018 CRONET_LOG(GPR_DEBUG, "Because");
1019 result = false;
1020 }
1021 }
1022 }
1023 /* We should see at least one on_write_completed for the trailers that we
1024 sent */
1025 else if (curr_op->send_trailing_metadata &&
1026 !stream_state->state_callback_received[OP_SEND_MESSAGE])
1027 result = false;
1028 }
1029 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1030 result ? "YES" : "NO");
1031 return result;
1032 }
1033
1034 /*
1035 TODO (makdharma): Break down this function in smaller chunks for readability.
1036 */
execute_stream_op(struct op_and_state * oas)1037 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1038 grpc_transport_stream_op_batch* stream_op = &oas->op;
1039 struct stream_obj* s = oas->s;
1040 grpc_cronet_transport* t = s->curr_ct;
1041 struct op_state* stream_state = &s->state;
1042 enum e_op_result result = NO_ACTION_POSSIBLE;
1043 if (stream_op->send_initial_metadata &&
1044 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1045 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1046 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1047 * on_failed */
1048 GPR_ASSERT(s->cbs == nullptr);
1049 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1050 s->cbs =
1051 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1052 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1053 if (t->use_packet_coalescing) {
1054 bidirectional_stream_disable_auto_flush(s->cbs, true);
1055 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1056 }
1057 std::string url;
1058 const char* method = "POST";
1059 s->header_array.headers = nullptr;
1060 convert_metadata_to_cronet_headers(
1061 stream_op->payload->send_initial_metadata.send_initial_metadata,
1062 t->host, &url, &s->header_array.headers, &s->header_array.count,
1063 &method);
1064 s->header_array.capacity = s->header_array.count;
1065 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1066 url.c_str());
1067 bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1068 false);
1069 unsigned int header_index;
1070 for (header_index = 0; header_index < s->header_array.count;
1071 header_index++) {
1072 gpr_free((void*)s->header_array.headers[header_index].key);
1073 gpr_free((void*)s->header_array.headers[header_index].value);
1074 }
1075 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1076 if (t->use_packet_coalescing) {
1077 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1078 s->state.flush_cronet_when_ready = true;
1079 }
1080 }
1081 result = ACTION_TAKEN_WITH_CALLBACK;
1082 } else if (stream_op->send_message &&
1083 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1084 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1085 stream_state->pending_send_message = false;
1086 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1087 stream_state->state_callback_received[OP_FAILED] ||
1088 stream_state->state_callback_received[OP_SUCCEEDED]) {
1089 result = NO_ACTION_POSSIBLE;
1090 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1091 } else {
1092 grpc_slice_buffer write_slice_buffer;
1093 grpc_slice slice;
1094 grpc_slice_buffer_init(&write_slice_buffer);
1095 if (1 != stream_op->payload->send_message.send_message->Next(
1096 stream_op->payload->send_message.send_message->length(),
1097 nullptr)) {
1098 /* Should never reach here */
1099 GPR_ASSERT(false);
1100 }
1101 if (GRPC_ERROR_NONE !=
1102 stream_op->payload->send_message.send_message->Pull(&slice)) {
1103 /* Should never reach here */
1104 GPR_ASSERT(false);
1105 }
1106 grpc_slice_buffer_add(&write_slice_buffer, slice);
1107 if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1108 /* Empty request not handled yet */
1109 gpr_log(GPR_ERROR, "Empty request is not supported");
1110 GPR_ASSERT(write_slice_buffer.count == 1);
1111 }
1112 if (write_slice_buffer.count > 0) {
1113 size_t write_buffer_size;
1114 create_grpc_frame(
1115 &write_slice_buffer, &stream_state->ws.write_buffer,
1116 &write_buffer_size,
1117 stream_op->payload->send_message.send_message->flags());
1118 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1119 stream_state->ws.write_buffer);
1120 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1121 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1122 static_cast<int>(write_buffer_size), false);
1123 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1124 if (t->use_packet_coalescing) {
1125 if (!stream_op->send_trailing_metadata) {
1126 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1127 bidirectional_stream_flush(s->cbs);
1128 result = ACTION_TAKEN_WITH_CALLBACK;
1129 } else {
1130 stream_state->pending_write_for_trailer = true;
1131 result = ACTION_TAKEN_NO_CALLBACK;
1132 }
1133 } else {
1134 result = ACTION_TAKEN_WITH_CALLBACK;
1135 }
1136 } else {
1137 result = NO_ACTION_POSSIBLE;
1138 }
1139 }
1140 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1141 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1142 stream_op->payload->send_message.send_message.reset();
1143 } else if (stream_op->send_trailing_metadata &&
1144 op_can_be_run(stream_op, s, &oas->state,
1145 OP_SEND_TRAILING_METADATA)) {
1146 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1147 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1148 stream_state->state_callback_received[OP_FAILED] ||
1149 stream_state->state_callback_received[OP_SUCCEEDED]) {
1150 result = NO_ACTION_POSSIBLE;
1151 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1152 } else {
1153 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1154 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1155 bidirectional_stream_write(s->cbs, "", 0, true);
1156 if (t->use_packet_coalescing) {
1157 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1158 bidirectional_stream_flush(s->cbs);
1159 }
1160 result = ACTION_TAKEN_WITH_CALLBACK;
1161 }
1162 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1163 } else if (stream_op->recv_initial_metadata &&
1164 op_can_be_run(stream_op, s, &oas->state,
1165 OP_RECV_INITIAL_METADATA)) {
1166 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1167 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1168 grpc_core::ExecCtx::Run(
1169 DEBUG_LOCATION,
1170 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1171 GRPC_ERROR_NONE);
1172 } else if (stream_state->state_callback_received[OP_FAILED]) {
1173 grpc_core::ExecCtx::Run(
1174 DEBUG_LOCATION,
1175 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1176 GRPC_ERROR_NONE);
1177 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1178 grpc_core::ExecCtx::Run(
1179 DEBUG_LOCATION,
1180 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1181 GRPC_ERROR_NONE);
1182 } else {
1183 grpc_chttp2_incoming_metadata_buffer_publish(
1184 &oas->s->state.rs.initial_metadata,
1185 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1186 grpc_core::ExecCtx::Run(
1187 DEBUG_LOCATION,
1188 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1189 GRPC_ERROR_NONE);
1190 }
1191 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1192 result = ACTION_TAKEN_NO_CALLBACK;
1193 } else if (stream_op->recv_message &&
1194 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1195 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1196 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1197 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1198 grpc_core::ExecCtx::Run(
1199 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1200 GRPC_ERROR_NONE);
1201 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1202 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1203 result = ACTION_TAKEN_NO_CALLBACK;
1204 } else if (stream_state->state_callback_received[OP_FAILED]) {
1205 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1206 grpc_core::ExecCtx::Run(
1207 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1208 GRPC_ERROR_NONE);
1209 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1210 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1211 result = ACTION_TAKEN_NO_CALLBACK;
1212 } else if (stream_state->rs.read_stream_closed == true) {
1213 /* No more data will be received */
1214 CRONET_LOG(GPR_DEBUG, "read stream closed");
1215 grpc_core::ExecCtx::Run(
1216 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1217 GRPC_ERROR_NONE);
1218 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1219 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1220 result = ACTION_TAKEN_NO_CALLBACK;
1221 } else if (stream_state->flush_read) {
1222 CRONET_LOG(GPR_DEBUG, "flush read");
1223 grpc_core::ExecCtx::Run(
1224 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1225 GRPC_ERROR_NONE);
1226 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1227 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1228 result = ACTION_TAKEN_NO_CALLBACK;
1229 } else if (stream_state->rs.length_field_received == false) {
1230 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1231 stream_state->rs.remaining_bytes == 0) {
1232 /* Start a read operation for data */
1233 stream_state->rs.length_field_received = true;
1234 parse_grpc_header(
1235 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1236 &stream_state->rs.length_field, &stream_state->rs.compressed);
1237 CRONET_LOG(GPR_DEBUG, "length field = %d",
1238 stream_state->rs.length_field);
1239 if (stream_state->rs.length_field > 0) {
1240 stream_state->rs.read_buffer = static_cast<char*>(
1241 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1242 GPR_ASSERT(stream_state->rs.read_buffer);
1243 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1244 stream_state->rs.received_bytes = 0;
1245 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1246 stream_state->state_op_done[OP_READ_REQ_MADE] =
1247 true; /* Indicates that at least one read request has been made */
1248 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1249 stream_state->rs.remaining_bytes);
1250 stream_state->pending_read_from_cronet = true;
1251 result = ACTION_TAKEN_WITH_CALLBACK;
1252 } else {
1253 stream_state->rs.remaining_bytes = 0;
1254 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1255 /* Clean up read_slice_buffer in case there is unread data. */
1256 grpc_slice_buffer_destroy_internal(
1257 &stream_state->rs.read_slice_buffer);
1258 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1259 uint32_t flags = 0;
1260 if (stream_state->rs.compressed) {
1261 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1262 }
1263 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1264 stream_op->payload->recv_message.recv_message->reset(
1265 stream_state->rs.sbs.get());
1266 grpc_core::ExecCtx::Run(
1267 DEBUG_LOCATION,
1268 stream_op->payload->recv_message.recv_message_ready,
1269 GRPC_ERROR_NONE);
1270 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1271 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1272
1273 /* Extra read to trigger on_succeed */
1274 stream_state->rs.length_field_received = false;
1275 stream_state->state_op_done[OP_READ_REQ_MADE] =
1276 true; /* Indicates that at least one read request has been made */
1277 read_grpc_header(s);
1278 result = ACTION_TAKEN_NO_CALLBACK;
1279 }
1280 } else if (stream_state->rs.remaining_bytes == 0) {
1281 /* Start a read operation for first 5 bytes (GRPC header) */
1282 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1283 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1284 stream_state->rs.received_bytes = 0;
1285 stream_state->rs.compressed = false;
1286 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1287 stream_state->state_op_done[OP_READ_REQ_MADE] =
1288 true; /* Indicates that at least one read request has been made */
1289 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1290 stream_state->rs.remaining_bytes);
1291 stream_state->pending_read_from_cronet = true;
1292 result = ACTION_TAKEN_WITH_CALLBACK;
1293 } else {
1294 result = NO_ACTION_POSSIBLE;
1295 }
1296 } else if (stream_state->rs.remaining_bytes == 0) {
1297 CRONET_LOG(GPR_DEBUG, "read operation complete");
1298 grpc_slice read_data_slice =
1299 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1300 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1301 memcpy(dst_p, stream_state->rs.read_buffer,
1302 static_cast<size_t>(stream_state->rs.length_field));
1303 null_and_maybe_free_read_buffer(s);
1304 /* Clean up read_slice_buffer in case there is unread data. */
1305 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1306 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1307 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1308 read_data_slice);
1309 uint32_t flags = 0;
1310 if (stream_state->rs.compressed) {
1311 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1312 }
1313 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1314 stream_op->payload->recv_message.recv_message->reset(
1315 stream_state->rs.sbs.get());
1316 grpc_core::ExecCtx::Run(
1317 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1318 GRPC_ERROR_NONE);
1319 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1320 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1321 /* Do an extra read to trigger on_succeeded() callback in case connection
1322 is closed */
1323 stream_state->rs.length_field_received = false;
1324 read_grpc_header(s);
1325 result = ACTION_TAKEN_NO_CALLBACK;
1326 }
1327 } else if (stream_op->recv_trailing_metadata &&
1328 op_can_be_run(stream_op, s, &oas->state,
1329 OP_RECV_TRAILING_METADATA)) {
1330 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1331 grpc_error* error = GRPC_ERROR_NONE;
1332 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1333 error = GRPC_ERROR_REF(stream_state->cancel_error);
1334 } else if (stream_state->state_callback_received[OP_FAILED]) {
1335 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1336 } else if (oas->s->state.rs.trailing_metadata_valid) {
1337 grpc_chttp2_incoming_metadata_buffer_publish(
1338 &oas->s->state.rs.trailing_metadata,
1339 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1340 stream_state->rs.trailing_metadata_valid = false;
1341 }
1342 grpc_core::ExecCtx::Run(
1343 DEBUG_LOCATION,
1344 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1345 error);
1346 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1347 result = ACTION_TAKEN_NO_CALLBACK;
1348 } else if (stream_op->cancel_stream &&
1349 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1350 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1351 if (s->cbs) {
1352 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1353 bidirectional_stream_cancel(s->cbs);
1354 result = ACTION_TAKEN_WITH_CALLBACK;
1355 } else {
1356 result = ACTION_TAKEN_NO_CALLBACK;
1357 }
1358 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1359 if (!stream_state->cancel_error) {
1360 stream_state->cancel_error =
1361 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1362 }
1363 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1364 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1365 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1366 if (stream_op->on_complete) {
1367 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1368 GRPC_ERROR_REF(stream_state->cancel_error));
1369 }
1370 } else if (stream_state->state_callback_received[OP_FAILED]) {
1371 if (stream_op->on_complete) {
1372 grpc_core::ExecCtx::Run(
1373 DEBUG_LOCATION, stream_op->on_complete,
1374 make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1375 }
1376 } else {
1377 /* All actions in this stream_op are complete. Call the on_complete
1378 * callback
1379 */
1380 if (stream_op->on_complete) {
1381 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1382 GRPC_ERROR_NONE);
1383 }
1384 }
1385 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1386 oas->done = true;
1387 /* reset any send message state, only if this ON_COMPLETE is about a send.
1388 */
1389 if (stream_op->send_message) {
1390 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1391 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1392 }
1393 result = ACTION_TAKEN_NO_CALLBACK;
1394 /* If this is the on_complete callback being called for a received message -
1395 make a note */
1396 if (stream_op->recv_message)
1397 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1398 } else {
1399 result = NO_ACTION_POSSIBLE;
1400 }
1401 return result;
1402 }
1403
1404 /*
1405 Functions used by upper layers to access transport functionality.
1406 */
1407
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1408 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1409 grpc_stream_refcount* refcount,
1410 grpc_core::Arena* arena)
1411 : arena(arena),
1412 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1413 curr_gs(gs),
1414 state(arena),
1415 refcount(refcount) {
1416 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1417 gpr_mu_init(&mu);
1418 }
1419
~stream_obj()1420 inline stream_obj::~stream_obj() {
1421 null_and_maybe_free_read_buffer(this);
1422 /* Clean up read_slice_buffer in case there is unread data. */
1423 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1424 GRPC_ERROR_UNREF(state.cancel_error);
1425 }
1426
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1427 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1428 grpc_stream_refcount* refcount, const void* server_data,
1429 grpc_core::Arena* arena) {
1430 new (gs) stream_obj(gt, gs, refcount, arena);
1431 return 0;
1432 }
1433
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1434 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1435 grpc_pollset* pollset) {}
1436
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1437 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1438 grpc_pollset_set* pollset_set) {}
1439
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1440 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1441 grpc_transport_stream_op_batch* op) {
1442 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1443 if (op->send_initial_metadata &&
1444 header_has_authority(op->payload->send_initial_metadata
1445 .send_initial_metadata->list.head)) {
1446 /* Cronet does not support :authority header field. We cancel the call when
1447 this field is present in metadata */
1448 if (op->recv_initial_metadata) {
1449 grpc_core::ExecCtx::Run(
1450 DEBUG_LOCATION,
1451 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1452 GRPC_ERROR_CANCELLED);
1453 }
1454 if (op->recv_message) {
1455 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1456 op->payload->recv_message.recv_message_ready,
1457 GRPC_ERROR_CANCELLED);
1458 }
1459 if (op->recv_trailing_metadata) {
1460 grpc_core::ExecCtx::Run(
1461 DEBUG_LOCATION,
1462 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1463 GRPC_ERROR_CANCELLED);
1464 }
1465 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1466 GRPC_ERROR_CANCELLED);
1467 return;
1468 }
1469 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1470 add_to_storage(s, op);
1471 execute_from_storage(s);
1472 }
1473
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1474 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1475 grpc_closure* then_schedule_closure) {
1476 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1477 s->~stream_obj();
1478 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1479 GRPC_ERROR_NONE);
1480 }
1481
destroy_transport(grpc_transport * gt)1482 static void destroy_transport(grpc_transport* gt) {}
1483
get_endpoint(grpc_transport * gt)1484 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1485
perform_op(grpc_transport * gt,grpc_transport_op * op)1486 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1487
1488 static const grpc_transport_vtable grpc_cronet_vtable = {
1489 sizeof(stream_obj),
1490 "cronet_http",
1491 init_stream,
1492 set_pollset_do_nothing,
1493 set_pollset_set_do_nothing,
1494 perform_stream_op,
1495 perform_op,
1496 destroy_stream,
1497 destroy_transport,
1498 get_endpoint};
1499
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1500 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1501 const grpc_channel_args* args,
1502 void* reserved) {
1503 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1504 gpr_malloc(sizeof(grpc_cronet_transport)));
1505 if (!ct) {
1506 goto error;
1507 }
1508 ct->base.vtable = &grpc_cronet_vtable;
1509 ct->engine = static_cast<stream_engine*>(engine);
1510 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1511 if (!ct->host) {
1512 goto error;
1513 }
1514 strcpy(ct->host, target);
1515
1516 ct->use_packet_coalescing = true;
1517 if (args) {
1518 for (size_t i = 0; i < args->num_args; i++) {
1519 if (0 ==
1520 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1521 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1522 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1523 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1524 } else {
1525 ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1526 }
1527 }
1528 }
1529 }
1530
1531 return &ct->base;
1532
1533 error:
1534 if (ct) {
1535 if (ct->host) {
1536 gpr_free(ct->host);
1537 }
1538 gpr_free(ct);
1539 }
1540
1541 return nullptr;
1542 }
1543