• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string.h>
22 
23 #include <string>
24 
25 #include "absl/strings/str_cat.h"
26 
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 
31 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
32 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
33 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
34 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
35 #include "src/core/lib/debug/trace.h"
36 #include "src/core/lib/gpr/string.h"
37 #include "src/core/lib/gprpp/manual_constructor.h"
38 #include "src/core/lib/iomgr/endpoint.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/lib/surface/channel.h"
43 #include "src/core/lib/surface/validate_metadata.h"
44 #include "src/core/lib/transport/metadata_batch.h"
45 #include "src/core/lib/transport/static_metadata.h"
46 #include "src/core/lib/transport/timeout_encoding.h"
47 #include "src/core/lib/transport/transport_impl.h"
48 
49 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
50 
/* Size of the gRPC message prefix handled by create_grpc_frame() and
   parse_grpc_header(): one flag byte followed by a four-byte big-endian
   message length. */
#define GRPC_HEADER_SIZE_IN_BYTES 5
/* Size of the scratch buffer allocated by maybe_flush_read() for reads issued
   while the stream is in the flush-read state. */
#define GRPC_FLUSH_READ_SIZE 4096

/* Trace flag registered under the name "cronet"; constructed disabled. */
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
/* Forwards its arguments to gpr_log() only when grpc_cronet_trace is enabled.
   The do/while(0) wrapper makes the macro usable as a single statement. */
#define CRONET_LOG(...)                                    \
  do {                                                     \
    if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)
59 
/* Result of one attempt to advance a stream op (see execute_from_storage()). */
enum e_op_result {
  /* Something was started; op processing stops until a callback arrives. */
  ACTION_TAKEN_WITH_CALLBACK = 0,
  /* Something was done and the same op can be tried again right away. */
  ACTION_TAKEN_NO_CALLBACK = 1,
  /* Nothing could be done for this op; the next queued op is tried. */
  NO_ACTION_POSSIBLE = 2
};
65 
/* Indices into op_state::state_op_done[] and
   op_state::state_callback_received[]. The values are contiguous and start at
   zero; OP_NUM_OPS is the number of real entries and sizes those arrays. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE = 1,
  OP_SEND_TRAILING_METADATA = 2,
  OP_RECV_MESSAGE = 3,
  OP_RECV_INITIAL_METADATA = 4,
  OP_RECV_TRAILING_METADATA = 5,
  OP_CANCEL_ERROR = 6,
  OP_ON_COMPLETE = 7,
  OP_FAILED = 8,
  OP_SUCCEEDED = 9,
  OP_CANCELED = 10,
  OP_RECV_MESSAGE_AND_ON_COMPLETE = 11,
  OP_READ_REQ_MADE = 12,
  OP_NUM_OPS = 13
};
82 
83 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
84 
85 static void on_stream_ready(bidirectional_stream*);
86 static void on_response_headers_received(
87     bidirectional_stream*, const bidirectional_stream_header_array*,
88     const char*);
89 static void on_write_completed(bidirectional_stream*, const char*);
90 static void on_read_completed(bidirectional_stream*, char*, int);
91 static void on_response_trailers_received(
92     bidirectional_stream*, const bidirectional_stream_header_array*);
93 static void on_succeeded(bidirectional_stream*);
94 static void on_failed(bidirectional_stream*, int);
95 static void on_canceled(bidirectional_stream*);
96 static bidirectional_stream_callback cronet_callbacks = {
97     on_stream_ready,
98     on_response_headers_received,
99     on_read_completed,
100     on_write_completed,
101     on_response_trailers_received,
102     on_succeeded,
103     on_failed,
104     on_canceled};
105 
/* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  /* NOTE(review): presumably the Cronet engine new streams are created on;
     not used in the part of the file reviewed here - confirm. */
  stream_engine* engine;
  /* NOTE(review): presumably the host passed to
     convert_metadata_to_cronet_headers() to build the request URL - confirm
     against the caller. */
  char* host;
  /* When true, the callbacks call bidirectional_stream_flush() explicitly
     (see on_stream_ready() and on_response_trailers_received()). */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
114 
115 /* TODO (makdharma): reorder structure for memory efficiency per
116    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
117 struct read_state {
read_stateread_state118   read_state(grpc_core::Arena* arena)
119       : trailing_metadata(arena), initial_metadata(arena) {
120     grpc_slice_buffer_init(&read_slice_buffer);
121   }
122 
123   /* vars to store data coming from server */
124   char* read_buffer = nullptr;
125   bool length_field_received = false;
126   int received_bytes = 0;
127   int remaining_bytes = 0;
128   int length_field = 0;
129   bool compressed = 0;
130   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
131   char* payload_field = nullptr;
132   bool read_stream_closed = 0;
133 
134   /* vars for holding data destined for the application */
135   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
136   grpc_slice_buffer read_slice_buffer;
137 
138   /* vars for trailing metadata */
139   grpc_chttp2_incoming_metadata_buffer trailing_metadata;
140   bool trailing_metadata_valid = false;
141 
142   /* vars for initial metadata */
143   grpc_chttp2_incoming_metadata_buffer initial_metadata;
144 };
145 
/* State for data going to the server. */
struct write_state {
  /* Buffer of the write in flight, if any. It is released with gpr_free() in
     on_write_completed(), on_failed() and on_canceled(). */
  char* write_buffer = nullptr;
};
149 
150 /* track state of one stream op */
151 struct op_state {
op_stateop_state152   op_state(grpc_core::Arena* arena) : rs(arena) {}
153 
154   bool state_op_done[OP_NUM_OPS] = {};
155   bool state_callback_received[OP_NUM_OPS] = {};
156   /* A non-zero gRPC status code has been seen */
157   bool fail_state = false;
158   /* Transport is discarding all buffered messages */
159   bool flush_read = false;
160   bool flush_cronet_when_ready = false;
161   bool pending_write_for_trailer = false;
162   bool pending_send_message = false;
163   /* User requested RECV_TRAILING_METADATA */
164   bool pending_recv_trailing_metadata = false;
165   /* Cronet has not issued a callback of a bidirectional read */
166   bool pending_read_from_cronet = false;
167   grpc_error* cancel_error = GRPC_ERROR_NONE;
168   /* data structure for storing data coming from server */
169   struct read_state rs;
170   /* data structure for storing data going to the server */
171   struct write_state ws;
172 };
173 
174 struct stream_obj;
175 
176 struct op_and_state {
177   op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
178 
179   grpc_transport_stream_op_batch op;
180   struct op_state state;
181   bool done = false;
182   struct stream_obj* s; /* Pointer back to the stream object */
183   /* next op_and_state in the linked list */
184   struct op_and_state* next = nullptr;
185 };
186 
187 struct op_storage {
188   int num_pending_ops = 0;
189   struct op_and_state* head = nullptr;
190 };
191 
192 struct stream_obj {
193   stream_obj(grpc_transport* gt, grpc_stream* gs,
194              grpc_stream_refcount* refcount, grpc_core::Arena* arena);
195   ~stream_obj();
196 
197   grpc_core::Arena* arena;
198   struct op_and_state* oas = nullptr;
199   grpc_transport_stream_op_batch* curr_op = nullptr;
200   grpc_cronet_transport* curr_ct;
201   grpc_stream* curr_gs;
202   bidirectional_stream* cbs = nullptr;
203   bidirectional_stream_header_array header_array =
204       bidirectional_stream_header_array();  // Zero-initialize the structure.
205 
206   /* Stream level state. Some state will be tracked both at stream and stream_op
207    * level */
208   struct op_state state;
209 
210   /* OP storage */
211   struct op_storage storage;
212 
213   /* Mutex to protect storage */
214   gpr_mu mu;
215 
216   /* Refcount object of the stream */
217   grpc_stream_refcount* refcount;
218 };
219 
/* Stream ref-counting helpers. GRPC_CRONET_STREAM_REF / _UNREF always take a
   (stream, reason) pair; the reason string is forwarded only when NDEBUG is
   not defined. */
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
  grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream), (reason))
/* Takes a ref on the stream's refcount, passing |reason| along. */
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
/* Drops a ref on the stream's refcount, passing |reason| along. */
void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
/* NDEBUG variants: the macros discard the reason argument. */
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream))
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
238 
/* Declared ahead of its definition because execute_from_storage() calls it. */
static enum e_op_result execute_stream_op(struct op_and_state* oas);
240 
241 /*
242   Utility function to translate enum into string for printing
243 */
op_result_string(enum e_op_result i)244 static const char* op_result_string(enum e_op_result i) {
245   switch (i) {
246     case ACTION_TAKEN_WITH_CALLBACK:
247       return "ACTION_TAKEN_WITH_CALLBACK";
248     case ACTION_TAKEN_NO_CALLBACK:
249       return "ACTION_TAKEN_NO_CALLBACK";
250     case NO_ACTION_POSSIBLE:
251       return "NO_ACTION_POSSIBLE";
252   }
253   GPR_UNREACHABLE_CODE(return "UNKNOWN");
254 }
255 
op_id_string(enum e_op_id i)256 static const char* op_id_string(enum e_op_id i) {
257   switch (i) {
258     case OP_SEND_INITIAL_METADATA:
259       return "OP_SEND_INITIAL_METADATA";
260     case OP_SEND_MESSAGE:
261       return "OP_SEND_MESSAGE";
262     case OP_SEND_TRAILING_METADATA:
263       return "OP_SEND_TRAILING_METADATA";
264     case OP_RECV_MESSAGE:
265       return "OP_RECV_MESSAGE";
266     case OP_RECV_INITIAL_METADATA:
267       return "OP_RECV_INITIAL_METADATA";
268     case OP_RECV_TRAILING_METADATA:
269       return "OP_RECV_TRAILING_METADATA";
270     case OP_CANCEL_ERROR:
271       return "OP_CANCEL_ERROR";
272     case OP_ON_COMPLETE:
273       return "OP_ON_COMPLETE";
274     case OP_FAILED:
275       return "OP_FAILED";
276     case OP_SUCCEEDED:
277       return "OP_SUCCEEDED";
278     case OP_CANCELED:
279       return "OP_CANCELED";
280     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
281       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
282     case OP_READ_REQ_MADE:
283       return "OP_READ_REQ_MADE";
284     case OP_NUM_OPS:
285       return "OP_NUM_OPS";
286   }
287   return "UNKNOWN";
288 }
289 
null_and_maybe_free_read_buffer(stream_obj * s)290 static void null_and_maybe_free_read_buffer(stream_obj* s) {
291   if (s->state.rs.read_buffer &&
292       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
293     gpr_free(s->state.rs.read_buffer);
294   }
295   s->state.rs.read_buffer = nullptr;
296 }
297 
maybe_flush_read(stream_obj * s)298 static void maybe_flush_read(stream_obj* s) {
299   /* To enter flush read state (discarding all the buffered messages in
300    * transport layer), two conditions must be satisfied: 1) non-zero grpc status
301    * has been received, and 2) an op requesting the status code
302    * (RECV_TRAILING_METADATA) is issued by the user. (See
303    * doc/status_ordering.md) */
304   /* Whenever the evaluation of any of the two condition is changed, we check
305    * whether we should enter the flush read state. */
306   if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
307     if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
308       CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
309       s->state.flush_read = true;
310       null_and_maybe_free_read_buffer(s);
311       s->state.rs.read_buffer =
312           static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
313       if (!s->state.pending_read_from_cronet) {
314         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
315         bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
316                                   GRPC_FLUSH_READ_SIZE);
317         s->state.pending_read_from_cronet = true;
318       }
319     }
320   }
321 }
322 
read_grpc_header(stream_obj * s)323 static void read_grpc_header(stream_obj* s) {
324   s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
325   s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
326   s->state.rs.received_bytes = 0;
327   s->state.rs.compressed = false;
328   CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
329   bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
330                             s->state.rs.remaining_bytes);
331   s->state.pending_read_from_cronet = true;
332 }
333 
make_error_with_desc(int error_code,const char * desc)334 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
335   grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
336   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
337   return error;
338 }
339 
/* Copies |op| into the node, builds the per-op state on the stream's arena
   and stores the back pointer to |s|. */
inline op_and_state::op_and_state(stream_obj* s,
                                  const grpc_transport_stream_op_batch& op)
    : op(op), state(s->arena), s(s) {}
343 
344 /*
345   Add a new stream op to op storage.
346 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)347 static void add_to_storage(struct stream_obj* s,
348                            grpc_transport_stream_op_batch* op) {
349   struct op_storage* storage = &s->storage;
350   /* add new op at the beginning of the linked list. The memory is freed
351   in remove_from_storage */
352   op_and_state* new_op = new op_and_state(s, *op);
353   gpr_mu_lock(&s->mu);
354   new_op->next = storage->head;
355   storage->head = new_op;
356   storage->num_pending_ops++;
357   if (op->send_message) {
358     s->state.pending_send_message = true;
359   }
360   if (op->recv_trailing_metadata) {
361     s->state.pending_recv_trailing_metadata = true;
362     maybe_flush_read(s);
363   }
364   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
365              storage->num_pending_ops);
366   gpr_mu_unlock(&s->mu);
367 }
368 
369 /*
370   Traverse the linked list and delete op and free memory
371 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)372 static void remove_from_storage(struct stream_obj* s,
373                                 struct op_and_state* oas) {
374   struct op_and_state* curr;
375   if (s->storage.head == nullptr || oas == nullptr) {
376     return;
377   }
378   if (s->storage.head == oas) {
379     s->storage.head = oas->next;
380     delete oas;
381     s->storage.num_pending_ops--;
382     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
383                s->storage.num_pending_ops);
384   } else {
385     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
386       if (curr->next == oas) {
387         curr->next = oas->next;
388         s->storage.num_pending_ops--;
389         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
390                    s->storage.num_pending_ops);
391         delete oas;
392         break;
393       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
394         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
395       }
396     }
397   }
398 }
399 
400 /*
401   Cycle through ops and try to take next action. Break when either
402   an action with callback is taken, or no action is possible.
403   This can get executed from the Cronet network thread via cronet callback
404   or on the application supplied thread via the perform_stream_op function.
405 */
execute_from_storage(stream_obj * s)406 static void execute_from_storage(stream_obj* s) {
407   gpr_mu_lock(&s->mu);
408   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
409     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
410     GPR_ASSERT(!curr->done);
411     enum e_op_result result = execute_stream_op(curr);
412     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
413                op_result_string(result));
414     /* if this op is done, then remove it and free memory */
415     if (curr->done) {
416       struct op_and_state* next = curr->next;
417       remove_from_storage(s, curr);
418       curr = next;
419     } else if (result == NO_ACTION_POSSIBLE) {
420       curr = curr->next;
421     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
422       /* wait for the callback */
423       break;
424     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
425   }
426   gpr_mu_unlock(&s->mu);
427 }
428 
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)429 static void convert_cronet_array_to_metadata(
430     const bidirectional_stream_header_array* header_array,
431     grpc_chttp2_incoming_metadata_buffer* mds) {
432   for (size_t i = 0; i < header_array->count; i++) {
433     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
434                header_array->headers[i].key, header_array->headers[i].value);
435     grpc_slice key = grpc_slice_intern(
436         grpc_slice_from_static_string(header_array->headers[i].key));
437     grpc_slice value;
438     if (grpc_is_refcounted_slice_binary_header(key)) {
439       value = grpc_slice_from_static_string(header_array->headers[i].value);
440       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
441           value, grpc_chttp2_base64_infer_length_after_decode(value)));
442     } else {
443       value = grpc_slice_intern(
444           grpc_slice_from_static_string(header_array->headers[i].value));
445     }
446     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
447                       grpc_chttp2_incoming_metadata_buffer_add(
448                           mds, grpc_mdelem_from_slices(key, value)));
449   }
450 }
451 
452 /*
453   Cronet callback
454 */
on_failed(bidirectional_stream * stream,int net_error)455 static void on_failed(bidirectional_stream* stream, int net_error) {
456   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
457   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
458   grpc_core::ExecCtx exec_ctx;
459 
460   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
461   gpr_mu_lock(&s->mu);
462   bidirectional_stream_destroy(s->cbs);
463   s->state.state_callback_received[OP_FAILED] = true;
464   s->cbs = nullptr;
465   if (s->header_array.headers) {
466     gpr_free(s->header_array.headers);
467     s->header_array.headers = nullptr;
468   }
469   if (s->state.ws.write_buffer) {
470     gpr_free(s->state.ws.write_buffer);
471     s->state.ws.write_buffer = nullptr;
472   }
473   null_and_maybe_free_read_buffer(s);
474   gpr_mu_unlock(&s->mu);
475   execute_from_storage(s);
476   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
477 }
478 
479 /*
480   Cronet callback
481 */
on_canceled(bidirectional_stream * stream)482 static void on_canceled(bidirectional_stream* stream) {
483   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
484   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
485   grpc_core::ExecCtx exec_ctx;
486 
487   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
488   gpr_mu_lock(&s->mu);
489   bidirectional_stream_destroy(s->cbs);
490   s->state.state_callback_received[OP_CANCELED] = true;
491   s->cbs = nullptr;
492   if (s->header_array.headers) {
493     gpr_free(s->header_array.headers);
494     s->header_array.headers = nullptr;
495   }
496   if (s->state.ws.write_buffer) {
497     gpr_free(s->state.ws.write_buffer);
498     s->state.ws.write_buffer = nullptr;
499   }
500   null_and_maybe_free_read_buffer(s);
501   gpr_mu_unlock(&s->mu);
502   execute_from_storage(s);
503   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
504 }
505 
506 /*
507   Cronet callback
508 */
on_succeeded(bidirectional_stream * stream)509 static void on_succeeded(bidirectional_stream* stream) {
510   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
511   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
512   grpc_core::ExecCtx exec_ctx;
513 
514   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
515   gpr_mu_lock(&s->mu);
516   bidirectional_stream_destroy(s->cbs);
517   s->state.state_callback_received[OP_SUCCEEDED] = true;
518   s->cbs = nullptr;
519   null_and_maybe_free_read_buffer(s);
520   gpr_mu_unlock(&s->mu);
521   execute_from_storage(s);
522   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
523 }
524 
525 /*
526   Cronet callback
527 */
on_stream_ready(bidirectional_stream * stream)528 static void on_stream_ready(bidirectional_stream* stream) {
529   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
530   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
531   grpc_core::ExecCtx exec_ctx;
532   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
533   grpc_cronet_transport* t = s->curr_ct;
534   gpr_mu_lock(&s->mu);
535   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
536   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
537   /* Free the memory allocated for headers */
538   if (s->header_array.headers) {
539     gpr_free(s->header_array.headers);
540     s->header_array.headers = nullptr;
541   }
542   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
543    * SEND_TRAILING_METADATA ops pending */
544   if (t->use_packet_coalescing) {
545     if (s->state.flush_cronet_when_ready) {
546       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
547       bidirectional_stream_flush(stream);
548     }
549   }
550   gpr_mu_unlock(&s->mu);
551   execute_from_storage(s);
552 }
553 
554 /*
555   Cronet callback
556 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)557 static void on_response_headers_received(
558     bidirectional_stream* stream,
559     const bidirectional_stream_header_array* headers,
560     const char* negotiated_protocol) {
561   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
562   grpc_core::ExecCtx exec_ctx;
563   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
564              headers, negotiated_protocol);
565   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
566 
567   /* Identify if this is a header or a trailer (in a trailer-only response case)
568    */
569   for (size_t i = 0; i < headers->count; i++) {
570     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
571       on_response_trailers_received(stream, headers);
572       /* Do an extra read for a trailer-only stream with grpc_status = 0
573        to trigger on_succeeded() callback */
574       if (0 == strcmp(headers->headers[i].value, "0")) {
575         read_grpc_header(s);
576       }
577       return;
578     }
579   }
580 
581   gpr_mu_lock(&s->mu);
582   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
583   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
584   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
585         s->state.state_callback_received[OP_FAILED])) {
586     /* Do an extra read to trigger on_succeeded() callback in case connection
587      is closed */
588     GPR_ASSERT(s->state.rs.length_field_received == false);
589     read_grpc_header(s);
590   }
591   gpr_mu_unlock(&s->mu);
592   execute_from_storage(s);
593 }
594 
595 /*
596   Cronet callback
597 */
on_write_completed(bidirectional_stream * stream,const char * data)598 static void on_write_completed(bidirectional_stream* stream, const char* data) {
599   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
600   grpc_core::ExecCtx exec_ctx;
601   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
602   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
603   gpr_mu_lock(&s->mu);
604   if (s->state.ws.write_buffer) {
605     gpr_free(s->state.ws.write_buffer);
606     s->state.ws.write_buffer = nullptr;
607   }
608   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
609   gpr_mu_unlock(&s->mu);
610   execute_from_storage(s);
611 }
612 
613 /*
614   Cronet callback
615 */
on_read_completed(bidirectional_stream * stream,char * data,int count)616 static void on_read_completed(bidirectional_stream* stream, char* data,
617                               int count) {
618   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
619   grpc_core::ExecCtx exec_ctx;
620   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
621   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
622              count);
623   gpr_mu_lock(&s->mu);
624   s->state.pending_read_from_cronet = false;
625   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
626   if (count > 0 && s->state.flush_read) {
627     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
628     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
629                               GRPC_FLUSH_READ_SIZE);
630     s->state.pending_read_from_cronet = true;
631     gpr_mu_unlock(&s->mu);
632   } else if (count > 0) {
633     s->state.rs.received_bytes += count;
634     s->state.rs.remaining_bytes -= count;
635     if (s->state.rs.remaining_bytes > 0) {
636       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
637       s->state.state_op_done[OP_READ_REQ_MADE] = true;
638       bidirectional_stream_read(
639           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
640           s->state.rs.remaining_bytes);
641       s->state.pending_read_from_cronet = true;
642       gpr_mu_unlock(&s->mu);
643     } else {
644       gpr_mu_unlock(&s->mu);
645       execute_from_storage(s);
646     }
647   } else {
648     null_and_maybe_free_read_buffer(s);
649     s->state.rs.read_stream_closed = true;
650     gpr_mu_unlock(&s->mu);
651     execute_from_storage(s);
652   }
653 }
654 
655 /*
656   Cronet callback
657 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)658 static void on_response_trailers_received(
659     bidirectional_stream* stream,
660     const bidirectional_stream_header_array* trailers) {
661   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
662   grpc_core::ExecCtx exec_ctx;
663   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
664              trailers);
665   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
666   grpc_cronet_transport* t = s->curr_ct;
667   gpr_mu_lock(&s->mu);
668   s->state.rs.trailing_metadata_valid = false;
669   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
670   if (trailers->count > 0) {
671     s->state.rs.trailing_metadata_valid = true;
672   }
673   for (size_t i = 0; i < trailers->count; i++) {
674     if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
675         0 != strcmp(trailers->headers[i].value, "0")) {
676       s->state.fail_state = true;
677       maybe_flush_read(s);
678     }
679   }
680   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
681   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
682    * trigger on_succeeded */
683   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
684       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
685         s->state.state_callback_received[OP_FAILED])) {
686     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
687     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
688     bidirectional_stream_write(s->cbs, "", 0, true);
689     if (t->use_packet_coalescing) {
690       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
691       bidirectional_stream_flush(s->cbs);
692     }
693     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
694 
695     gpr_mu_unlock(&s->mu);
696   } else {
697     gpr_mu_unlock(&s->mu);
698     execute_from_storage(s);
699   }
700 }
701 
702 /*
703  Utility function that takes the data from s->write_slice_buffer and assembles
704  into a contiguous byte stream with 5 byte gRPC header prepended.
705 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)706 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
707                               char** pp_write_buffer,
708                               size_t* p_write_buffer_size, uint32_t flags) {
709   grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
710   size_t length = GRPC_SLICE_LENGTH(slice);
711   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
712   /* This is freed in the on_write_completed callback */
713   char* write_buffer =
714       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
715   *pp_write_buffer = write_buffer;
716   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
717   /* Append 5 byte header */
718   /* Compressed flag */
719   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
720   /* Message length */
721   *p++ = static_cast<uint8_t>(length >> 24);
722   *p++ = static_cast<uint8_t>(length >> 16);
723   *p++ = static_cast<uint8_t>(length >> 8);
724   *p++ = static_cast<uint8_t>(length);
725   /* append actual data */
726   memcpy(p, GRPC_SLICE_START_PTR(slice), length);
727   grpc_slice_unref_internal(slice);
728 }
729 
730 /*
731  Convert metadata in a format that Cronet can consume
732 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)733 static void convert_metadata_to_cronet_headers(
734     grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
735     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
736     const char** method) {
737   grpc_linked_mdelem* curr = metadata->list.head;
738   /* Walk the linked list and get number of header fields */
739   size_t num_headers_available = 0;
740   while (curr != nullptr) {
741     curr = curr->next;
742     num_headers_available++;
743   }
744   grpc_millis deadline = metadata->deadline;
745   if (deadline != GRPC_MILLIS_INF_FUTURE) {
746     num_headers_available++;
747   }
748   /* Allocate enough memory. It is freed in the on_stream_ready callback
749    */
750   bidirectional_stream_header* headers =
751       static_cast<bidirectional_stream_header*>(gpr_malloc(
752           sizeof(bidirectional_stream_header) * num_headers_available));
753   *pp_headers = headers;
754 
755   /* Walk the linked list again, this time copying the header fields.
756     s->num_headers can be less than num_headers_available, as some headers
757     are not used for cronet.
758     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
759    */
760   curr = metadata->list.head;
761   size_t num_headers = 0;
762   while (num_headers < num_headers_available) {
763     grpc_mdelem mdelem = curr->md;
764     curr = curr->next;
765     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
766     char* value;
767     if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
768       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
769       value = grpc_slice_to_c_string(wire_value);
770       grpc_slice_unref_internal(wire_value);
771     } else {
772       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
773     }
774     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
775         grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
776                                       GRPC_MDSTR_AUTHORITY)) {
777       /* Cronet populates these fields on its own */
778       gpr_free(key);
779       gpr_free(value);
780       continue;
781     }
782     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
783       if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
784         *method = "PUT";
785       } else {
786         /* POST method in default*/
787         *method = "POST";
788       }
789       gpr_free(key);
790       gpr_free(value);
791       continue;
792     }
793     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
794       /* Create URL by appending :path value to the hostname */
795       *pp_url = absl::StrCat("https://", host, value);
796       gpr_free(key);
797       gpr_free(value);
798       continue;
799     }
800     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
801     headers[num_headers].key = key;
802     headers[num_headers].value = value;
803     num_headers++;
804     if (curr == nullptr) {
805       break;
806     }
807   }
808   if (deadline != GRPC_MILLIS_INF_FUTURE) {
809     char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
810     char* value =
811         static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
812     grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
813                               value);
814     headers[num_headers].key = key;
815     headers[num_headers].value = value;
816 
817     num_headers++;
818   }
819 
820   *p_num_headers = num_headers;
821 }
822 
/*
  Decodes the 5-byte gRPC message prefix that starts at `data`.

  data       - points at 5 readable bytes: one flag byte followed by four
               length bytes, most significant byte first.
  length     - out: the four length bytes combined into a single value.
  compressed - out: true when the lowest bit of the flag byte is set.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  /* Only bit 0 of the flag byte is inspected; the other bits are ignored. */
  *compressed = (data[0] & 0x01) != 0;
  /* Assemble the length in uint32_t so that a leading byte >= 0x80 is never
     shifted into the sign bit of a promoted int; convert to int only once the
     full value has been built. */
  const uint32_t wire_length = (static_cast<uint32_t>(data[1]) << 24) |
                               (static_cast<uint32_t>(data[2]) << 16) |
                               (static_cast<uint32_t>(data[3]) << 8) |
                               static_cast<uint32_t>(data[4]);
  *length = static_cast<int>(wire_length);
}
834 
header_has_authority(grpc_linked_mdelem * head)835 static bool header_has_authority(grpc_linked_mdelem* head) {
836   while (head != nullptr) {
837     if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
838                                       GRPC_MDSTR_AUTHORITY)) {
839       return true;
840     }
841     head = head->next;
842   }
843   return false;
844 }
845 
/*
  Op Execution: Decide if one of the actions contained in the stream op can be
  executed. This is the heart of the state machine.

  curr_op  - the batch being examined. It is only consulted for OP_ON_COMPLETE
             when the stream is neither cancelled nor failed.
  s        - the stream; its stream-wide state (s->state) and its transport
             (s->curr_ct) are read.
  op_state - state that belongs to this batch only, as opposed to the
             stream-wide state reachable through s.
  op_id    - the action being asked about.

  Returns true when the action identified by op_id may be run now. The function
  only reads state and logs its decision; it does not modify any state.
*/
static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
                          struct stream_obj* s, struct op_state* op_state,
                          enum e_op_id op_id) {
  struct op_state* stream_state = &s->state;
  grpc_cronet_transport* t = s->curr_ct;
  bool result = true;
  /* When call is canceled, every op can be run, except under following
  conditions
  */
  bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
                               stream_state->state_callback_received[OP_FAILED];
  if (is_canceled_or_failed) {
    /* The three send actions and a further cancel are always refused once the
       stream is cancelled or failed. */
    if (op_id == OP_SEND_INITIAL_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_MESSAGE) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_TRAILING_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_CANCEL_ERROR) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_INITIAL_METADATA &&
        stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed for this batch (note this checks the op specific state,
       not the stream state) */
    if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_TRAILING_METADATA &&
        stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* ON_COMPLETE can be processed if one of the following conditions is met:
     * 1. the stream failed
     * 2. the stream is cancelled, and the callback is received
     * 3. the stream succeeded before cancel is effective
     * 4. the stream is cancelled, and the stream is never started */
    if (op_id == OP_ON_COMPLETE &&
        !(stream_state->state_callback_received[OP_FAILED] ||
          stream_state->state_callback_received[OP_CANCELED] ||
          stream_state->state_callback_received[OP_SUCCEEDED] ||
          !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
  } else if (op_id == OP_SEND_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
  } else if (op_id == OP_RECV_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't received headers yet, and trailing metadata has not been
       processed either. */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_SEND_MESSAGE) {
    /* already executed (note we're checking op specific state, not stream
     state) */
    if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
  } else if (op_id == OP_RECV_MESSAGE) {
    /* already executed (op specific state, not stream state) */
    if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
    /* we haven't received headers yet, and trailing metadata has not been
       processed either. */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_RECV_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
    /* we have asked for but haven't received message yet. */
    else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
             !stream_state->state_op_done[OP_RECV_MESSAGE])
      result = false;
    /* we haven't received trailers  yet. */
    else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
      result = false;
    /* we haven't received on_succeeded  yet. */
    else if (!stream_state->state_callback_received[OP_SUCCEEDED])
      result = false;
  } else if (op_id == OP_SEND_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
    /* we haven't sent initial metadata yet */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't sent message yet */
    else if (stream_state->pending_send_message &&
             !stream_state->state_op_done[OP_SEND_MESSAGE])
      result = false;
    /* we haven't got on_write_completed for the send yet. This is waived when
       packet coalescing is on and the write was deferred for the trailer
       (pending_write_for_trailer). */
    else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
             !(t->use_packet_coalescing &&
               stream_state->pending_write_for_trailer))
      result = false;
  } else if (op_id == OP_CANCEL_ERROR) {
    /* already executed */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
  } else if (op_id == OP_ON_COMPLETE) {
    /* already executed (note we're checking op specific state, not stream
    state) */
    if (op_state->state_op_done[OP_ON_COMPLETE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* Check if every op that was asked for is done. */
    /* TODO(muxi): We should not consider the recv ops here, since they
     * have their own callbacks.  We should invoke a batch's on_complete
     * as soon as all of the batch's send ops are complete, even if
     * there are still recv ops pending. */
    else if (curr_op->send_initial_metadata &&
             !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !op_state->state_op_done[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_trailing_metadata &&
               !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_initial_metadata &&
               !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_message &&
               !op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->cancel_stream &&
               !stream_state->state_callback_received[OP_CANCELED]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_trailing_metadata) {
      /* We aren't done with trailing metadata yet */
      if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
        CRONET_LOG(GPR_DEBUG, "Because");
        result = false;
      }
      /* We've asked for actual message in an earlier op, and it hasn't been
        delivered yet. */
      else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
        /* If this op is not the one asking for read, (which means some earlier
          op has asked), and the read hasn't been delivered. */
        if (!curr_op->recv_message &&
            !stream_state->state_callback_received[OP_SUCCEEDED]) {
          CRONET_LOG(GPR_DEBUG, "Because");
          result = false;
        }
      }
    }
    /* We should see at least one on_write_completed for the trailers that we
      sent. NOTE: as part of this else-if chain, this check is only reached
      when the batch does not contain recv_trailing_metadata. */
    else if (curr_op->send_trailing_metadata &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE])
      result = false;
  }
  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
             result ? "YES" : "NO");
  return result;
}
1033 
/*
  Attempts to make progress on one batch (oas->op) of the stream oas->s.

  The actions are tried in a fixed order: send initial metadata, send message,
  send trailing metadata, recv initial metadata, recv message, recv trailing
  metadata, cancel, on_complete. Only the first action that is both requested
  by the batch and approved by op_can_be_run() is handled in a given call.

  Returns NO_ACTION_POSSIBLE when nothing could be done, otherwise
  ACTION_TAKEN_WITH_CALLBACK or ACTION_TAKEN_NO_CALLBACK. NOTE(review): the
  WITH_CALLBACK value is returned on paths that issued a bidirectional_stream_*
  call and presumably means a cronet callback will follow - confirm against the
  caller.

  TODO (makdharma): Break down this function in smaller chunks for readability.
*/
static enum e_op_result execute_stream_op(struct op_and_state* oas) {
  grpc_transport_stream_op_batch* stream_op = &oas->op;
  struct stream_obj* s = oas->s;
  grpc_cronet_transport* t = s->curr_ct;
  struct op_state* stream_state = &s->state;
  enum e_op_result result = NO_ACTION_POSSIBLE;
  if (stream_op->send_initial_metadata &&
      op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
     * on_failed */
    GPR_ASSERT(s->cbs == nullptr);
    GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
    s->cbs =
        bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
    CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
    if (t->use_packet_coalescing) {
      bidirectional_stream_disable_auto_flush(s->cbs, true);
      bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
    }
    std::string url;
    /* Default method; convert_metadata_to_cronet_headers may overwrite it. */
    const char* method = "POST";
    s->header_array.headers = nullptr;
    convert_metadata_to_cronet_headers(
        stream_op->payload->send_initial_metadata.send_initial_metadata,
        t->host, &url, &s->header_array.headers, &s->header_array.count,
        &method);
    s->header_array.capacity = s->header_array.count;
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
               url.c_str());
    bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
                               false);
    /* The key/value strings are released as soon as the start call returns;
       the headers array itself is not freed here. */
    unsigned int header_index;
    for (header_index = 0; header_index < s->header_array.count;
         header_index++) {
      gpr_free((void*)s->header_array.headers[header_index].key);
      gpr_free((void*)s->header_array.headers[header_index].value);
    }
    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
    if (t->use_packet_coalescing) {
      /* Nothing else in this batch will write, so request a flush via
         flush_cronet_when_ready. */
      if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
        s->state.flush_cronet_when_ready = true;
      }
    }
    result = ACTION_TAKEN_WITH_CALLBACK;
  } else if (stream_op->send_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
    stream_state->pending_send_message = false;
    if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
        stream_state->state_callback_received[OP_FAILED] ||
        stream_state->state_callback_received[OP_SUCCEEDED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
    } else {
      grpc_slice_buffer write_slice_buffer;
      grpc_slice slice;
      grpc_slice_buffer_init(&write_slice_buffer);
      /* Request the whole message in one go and pull it as a single slice. */
      if (1 != stream_op->payload->send_message.send_message->Next(
                   stream_op->payload->send_message.send_message->length(),
                   nullptr)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      if (GRPC_ERROR_NONE !=
          stream_op->payload->send_message.send_message->Pull(&slice)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      grpc_slice_buffer_add(&write_slice_buffer, slice);
      if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
        /* Empty request not handled yet */
        gpr_log(GPR_ERROR, "Empty request is not supported");
        GPR_ASSERT(write_slice_buffer.count == 1);
      }
      if (write_slice_buffer.count > 0) {
        size_t write_buffer_size;
        /* Build the framed message into ws.write_buffer. */
        create_grpc_frame(
            &write_slice_buffer, &stream_state->ws.write_buffer,
            &write_buffer_size,
            stream_op->payload->send_message.send_message->flags());
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
                   stream_state->ws.write_buffer);
        stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
        bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
                                   static_cast<int>(write_buffer_size), false);
        grpc_slice_buffer_destroy_internal(&write_slice_buffer);
        if (t->use_packet_coalescing) {
          if (!stream_op->send_trailing_metadata) {
            CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
            bidirectional_stream_flush(s->cbs);
            result = ACTION_TAKEN_WITH_CALLBACK;
          } else {
            /* The flush is left to the trailing-metadata step of this batch. */
            stream_state->pending_write_for_trailer = true;
            result = ACTION_TAKEN_NO_CALLBACK;
          }
        } else {
          result = ACTION_TAKEN_WITH_CALLBACK;
        }
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    }
    /* Marked done on both the stream and the batch, whichever path ran. */
    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
    stream_op->payload->send_message.send_message.reset();
  } else if (stream_op->send_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_SEND_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
        stream_state->state_callback_received[OP_FAILED] ||
        stream_state->state_callback_received[OP_SUCCEEDED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
    } else {
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      /* Zero-length write with the last argument set to true. */
      bidirectional_stream_write(s->cbs, "", 0, true);
      if (t->use_packet_coalescing) {
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
        bidirectional_stream_flush(s->cbs);
      }
      result = ACTION_TAKEN_WITH_CALLBACK;
    }
    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
  } else if (stream_op->recv_initial_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
    /* recv_initial_metadata_ready is scheduled with GRPC_ERROR_NONE in every
       branch; metadata is published only in the final one. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.initial_metadata,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    }
    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->recv_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
    /* The first four branches schedule recv_message_ready without setting
       *recv_message. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      CRONET_LOG(GPR_DEBUG, "Stream failed.");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.read_stream_closed == true) {
      /* No more data will be received */
      CRONET_LOG(GPR_DEBUG, "read stream closed");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->flush_read) {
      CRONET_LOG(GPR_DEBUG, "flush read");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.length_field_received == false) {
      /* The length prefix has not been parsed yet: either all header bytes
         are in and can be decoded now, or a header read must be started. */
      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
          stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for data */
        stream_state->rs.length_field_received = true;
        parse_grpc_header(
            reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
            &stream_state->rs.length_field, &stream_state->rs.compressed);
        CRONET_LOG(GPR_DEBUG, "length field = %d",
                   stream_state->rs.length_field);
        if (stream_state->rs.length_field > 0) {
          /* read_buffer now points at a freshly allocated payload buffer
             instead of the header bytes. */
          stream_state->rs.read_buffer = static_cast<char*>(
              gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
          GPR_ASSERT(stream_state->rs.read_buffer);
          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
          stream_state->rs.received_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_WITH_CALLBACK;
        } else {
          /* length_field <= 0: deliver an empty message without issuing a
             payload read. */
          stream_state->rs.remaining_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
          /* Clean up read_slice_buffer in case there is unread data. */
          grpc_slice_buffer_destroy_internal(
              &stream_state->rs.read_slice_buffer);
          grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
          uint32_t flags = 0;
          if (stream_state->rs.compressed) {
            flags |= GRPC_WRITE_INTERNAL_COMPRESS;
          }
          stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
          stream_op->payload->recv_message.recv_message->reset(
              stream_state->rs.sbs.get());
          grpc_core::ExecCtx::Run(
              DEBUG_LOCATION,
              stream_op->payload->recv_message.recv_message_ready,
              GRPC_ERROR_NONE);
          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
          oas->state.state_op_done[OP_RECV_MESSAGE] = true;

          /* Extra read to trigger on_succeed */
          stream_state->rs.length_field_received = false;
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          read_grpc_header(s);
          result = ACTION_TAKEN_NO_CALLBACK;
        }
      } else if (stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for first 5 bytes (GRPC header) */
        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
        stream_state->rs.received_bytes = 0;
        stream_state->rs.compressed = false;
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
        stream_state->state_op_done[OP_READ_REQ_MADE] =
            true; /* Indicates that at least one read request has been made */
        bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                  stream_state->rs.remaining_bytes);
        stream_state->pending_read_from_cronet = true;
        result = ACTION_TAKEN_WITH_CALLBACK;
      } else {
        /* Bytes are still outstanding (remaining_bytes != 0); nothing to do
           in this call. */
        result = NO_ACTION_POSSIBLE;
      }
    } else if (stream_state->rs.remaining_bytes == 0) {
      /* The length prefix was parsed earlier and no payload bytes remain:
         copy the payload into a slice and hand it to the caller. */
      CRONET_LOG(GPR_DEBUG, "read operation complete");
      grpc_slice read_data_slice =
          GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
      uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
      memcpy(dst_p, stream_state->rs.read_buffer,
             static_cast<size_t>(stream_state->rs.length_field));
      null_and_maybe_free_read_buffer(s);
      /* Clean up read_slice_buffer in case there is unread data. */
      grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
      uint32_t flags = 0;
      if (stream_state->rs.compressed) {
        flags = GRPC_WRITE_INTERNAL_COMPRESS;
      }
      stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
      stream_op->payload->recv_message.recv_message->reset(
          stream_state->rs.sbs.get());
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      /* Do an extra read to trigger on_succeeded() callback in case connection
         is closed */
      stream_state->rs.length_field_received = false;
      read_grpc_header(s);
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    /* If none of the branches above matched (length prefix parsed but payload
       bytes still outstanding), result stays NO_ACTION_POSSIBLE. */
  } else if (stream_op->recv_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
    /* The error passed to recv_trailing_metadata_ready is the stored cancel
       error, an UNAVAILABLE error on failure, or none. */
    grpc_error* error = GRPC_ERROR_NONE;
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      error = GRPC_ERROR_REF(stream_state->cancel_error);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
    } else if (oas->s->state.rs.trailing_metadata_valid) {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.trailing_metadata,
          stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
      stream_state->rs.trailing_metadata_valid = false;
    }
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        error);
    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->cancel_stream &&
             op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
    /* Only a stream that has been created (s->cbs set) is cancelled in
       cronet. */
    if (s->cbs) {
      CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
      bidirectional_stream_cancel(s->cbs);
      result = ACTION_TAKEN_WITH_CALLBACK;
    } else {
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
    /* Keep the first cancel error; an already stored one is not replaced. */
    if (!stream_state->cancel_error) {
      stream_state->cancel_error =
          GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
    }
  } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
    /* on_complete, when present, receives the stored cancel error, an
       UNAVAILABLE error on failure, or GRPC_ERROR_NONE. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
                                GRPC_ERROR_REF(stream_state->cancel_error));
      }
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION, stream_op->on_complete,
            make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
      }
    } else {
      /* All actions in this stream_op are complete. Call the on_complete
       * callback
       */
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
                                GRPC_ERROR_NONE);
      }
    }
    oas->state.state_op_done[OP_ON_COMPLETE] = true;
    oas->done = true;
    /* reset any send message state, only if this ON_COMPLETE is about a send.
     */
    if (stream_op->send_message) {
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
    }
    result = ACTION_TAKEN_NO_CALLBACK;
    /* If this is the on_complete callback being called for a received message -
      make a note */
    if (stream_op->recv_message)
      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
  } else {
    result = NO_ACTION_POSSIBLE;
  }
  return result;
}
1403 
1404 /*
1405   Functions used by upper layers to access transport functionality.
1406 */
1407 
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1408 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1409                               grpc_stream_refcount* refcount,
1410                               grpc_core::Arena* arena)
1411     : arena(arena),
1412       curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1413       curr_gs(gs),
1414       state(arena),
1415       refcount(refcount) {
1416   GRPC_CRONET_STREAM_REF(this, "cronet transport");
1417   gpr_mu_init(&mu);
1418 }
1419 
~stream_obj()1420 inline stream_obj::~stream_obj() {
1421   null_and_maybe_free_read_buffer(this);
1422   /* Clean up read_slice_buffer in case there is unread data. */
1423   grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1424   GRPC_ERROR_UNREF(state.cancel_error);
1425 }
1426 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1427 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1428                        grpc_stream_refcount* refcount, const void* server_data,
1429                        grpc_core::Arena* arena) {
1430   new (gs) stream_obj(gt, gs, refcount, arena);
1431   return 0;
1432 }
1433 
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1434 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1435                                    grpc_pollset* pollset) {}
1436 
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1437 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1438                                        grpc_pollset_set* pollset_set) {}
1439 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1440 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1441                               grpc_transport_stream_op_batch* op) {
1442   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1443   if (op->send_initial_metadata &&
1444       header_has_authority(op->payload->send_initial_metadata
1445                                .send_initial_metadata->list.head)) {
1446     /* Cronet does not support :authority header field. We cancel the call when
1447      this field is present in metadata */
1448     if (op->recv_initial_metadata) {
1449       grpc_core::ExecCtx::Run(
1450           DEBUG_LOCATION,
1451           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1452           GRPC_ERROR_CANCELLED);
1453     }
1454     if (op->recv_message) {
1455       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1456                               op->payload->recv_message.recv_message_ready,
1457                               GRPC_ERROR_CANCELLED);
1458     }
1459     if (op->recv_trailing_metadata) {
1460       grpc_core::ExecCtx::Run(
1461           DEBUG_LOCATION,
1462           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1463           GRPC_ERROR_CANCELLED);
1464     }
1465     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1466                             GRPC_ERROR_CANCELLED);
1467     return;
1468   }
1469   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1470   add_to_storage(s, op);
1471   execute_from_storage(s);
1472 }
1473 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1474 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1475                            grpc_closure* then_schedule_closure) {
1476   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1477   s->~stream_obj();
1478   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1479                           GRPC_ERROR_NONE);
1480 }
1481 
destroy_transport(grpc_transport * gt)1482 static void destroy_transport(grpc_transport* gt) {}
1483 
get_endpoint(grpc_transport * gt)1484 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1485 
perform_op(grpc_transport * gt,grpc_transport_op * op)1486 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1487 
1488 static const grpc_transport_vtable grpc_cronet_vtable = {
1489     sizeof(stream_obj),
1490     "cronet_http",
1491     init_stream,
1492     set_pollset_do_nothing,
1493     set_pollset_set_do_nothing,
1494     perform_stream_op,
1495     perform_op,
1496     destroy_stream,
1497     destroy_transport,
1498     get_endpoint};
1499 
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1500 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1501                                              const grpc_channel_args* args,
1502                                              void* reserved) {
1503   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1504       gpr_malloc(sizeof(grpc_cronet_transport)));
1505   if (!ct) {
1506     goto error;
1507   }
1508   ct->base.vtable = &grpc_cronet_vtable;
1509   ct->engine = static_cast<stream_engine*>(engine);
1510   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1511   if (!ct->host) {
1512     goto error;
1513   }
1514   strcpy(ct->host, target);
1515 
1516   ct->use_packet_coalescing = true;
1517   if (args) {
1518     for (size_t i = 0; i < args->num_args; i++) {
1519       if (0 ==
1520           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1521         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1522           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1523                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1524         } else {
1525           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1526         }
1527       }
1528     }
1529   }
1530 
1531   return &ct->base;
1532 
1533 error:
1534   if (ct) {
1535     if (ct->host) {
1536       gpr_free(ct->host);
1537     }
1538     gpr_free(ct);
1539   }
1540 
1541   return nullptr;
1542 }
1543