• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_call.h"
22 #include "rb_grpc_imports.generated.h"
23 
24 #include <grpc/grpc.h>
25 #include <grpc/impl/codegen/compression_types.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
29 #include "rb_byte_buffer.h"
30 #include "rb_call_credentials.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33 
34 /* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
35 static VALUE grpc_rb_cCall;
36 
37 /* grpc_rb_eCallError is the ruby class of the exception thrown during call
38    operations. */
39 VALUE grpc_rb_eCallError = Qnil;
40 
41 /* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
42    a timeout. */
43 static VALUE grpc_rb_eOutOfTime = Qnil;
44 
45 /* grpc_rb_sBatchResult is the struct class used to hold the results of a
46  * batch call. */
47 static VALUE grpc_rb_sBatchResult;
48 
49 /* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
50  * grpc_metadata_array. */
51 static VALUE grpc_rb_cMdAry;
52 
53 /* id_credentials is the name of the hidden ivar that preserves the value
54  * of the credentials added to the call */
55 static ID id_credentials;
56 
57 /* id_metadata is the name of the attribute used to access the metadata hash
58  * received by the call and subsequently saved on it. */
59 static ID id_metadata;
60 
61 /* id_trailing_metadata is the name of the attribute used to access the trailing
62  * metadata hash received by the call and subsequently saved on it. */
63 static ID id_trailing_metadata;
64 
65 /* id_status is the name of the attribute used to access the status object
66  * received by the call and subsequently saved on it. */
67 static ID id_status;
68 
69 /* id_write_flag is the name of the attribute used to access the write_flag
70  * saved on the call. */
71 static ID id_write_flag;
72 
73 /* sym_* are the symbols for the attributes of grpc_rb_sBatchResult. */
74 static VALUE sym_send_message;
75 static VALUE sym_send_metadata;
76 static VALUE sym_send_close;
77 static VALUE sym_send_status;
78 static VALUE sym_message;
79 static VALUE sym_status;
80 static VALUE sym_cancelled;
81 
82 typedef struct grpc_rb_call {
83   grpc_call* wrapped;
84   grpc_completion_queue* queue;
85 } grpc_rb_call;
86 
/* Releases the core resources held by `call`.
 *
 * A NULL `wrapped` pointer marks a call that was already torn down, which
 * makes repeated invocations harmless. */
static void destroy_call(grpc_rb_call* call) {
  if (call->wrapped == NULL) {
    return; /* nothing left to release */
  }

  grpc_call_unref(call->wrapped);
  call->wrapped = NULL;

  grpc_rb_completion_queue_destroy(call->queue);
  call->queue = NULL;
}
96 
97 /* Destroys a Call. */
grpc_rb_call_destroy(void * p)98 static void grpc_rb_call_destroy(void* p) {
99   if (p == NULL) {
100     return;
101   }
102   destroy_call((grpc_rb_call*)p);
103   xfree(p);
104 }
105 
106 static const rb_data_type_t grpc_rb_md_ary_data_type = {
107     "grpc_metadata_array",
108     {GRPC_RB_GC_NOT_MARKED,
109      GRPC_RB_GC_DONT_FREE,
110      GRPC_RB_MEMSIZE_UNAVAILABLE,
111      {NULL, NULL}},
112     NULL,
113     NULL,
114 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
115     /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
116      * grpc_rb_call_destroy
117      * touches a hash object.
118      * TODO(yugui) Directly use st_table and call the free function earlier?
119      */
120     0,
121 #endif
122 };
123 
124 /* Describes grpc_call struct for RTypedData */
125 static const rb_data_type_t grpc_call_data_type = {"grpc_call",
126                                                    {GRPC_RB_GC_NOT_MARKED,
127                                                     grpc_rb_call_destroy,
128                                                     GRPC_RB_MEMSIZE_UNAVAILABLE,
129                                                     {NULL, NULL}},
130                                                    NULL,
131                                                    NULL,
132 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
133                                                    RUBY_TYPED_FREE_IMMEDIATELY
134 #endif
135 };
136 
137 /* Error code details is a hash containing text strings describing errors */
138 VALUE rb_error_code_details;
139 
140 /* Obtains the error detail string for given error code */
grpc_call_error_detail_of(grpc_call_error err)141 const char* grpc_call_error_detail_of(grpc_call_error err) {
142   VALUE detail_ref = rb_hash_aref(rb_error_code_details, UINT2NUM(err));
143   const char* detail = "unknown error code!";
144   if (detail_ref != Qnil) {
145     detail = StringValueCStr(detail_ref);
146   }
147   return detail;
148 }
149 
150 /* Called by clients to cancel an RPC on the server.
151    Can be called multiple times, from any thread. */
grpc_rb_call_cancel(VALUE self)152 static VALUE grpc_rb_call_cancel(VALUE self) {
153   grpc_rb_call* call = NULL;
154   grpc_call_error err;
155   if (RTYPEDDATA_DATA(self) == NULL) {
156     // This call has been closed
157     return Qnil;
158   }
159 
160   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
161   err = grpc_call_cancel(call->wrapped, NULL);
162   if (err != GRPC_CALL_OK) {
163     rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
164              grpc_call_error_detail_of(err), err);
165   }
166 
167   return Qnil;
168 }
169 
170 /* TODO: expose this as part of the surface API if needed.
171  * This is meant for internal usage by the "write thread" of grpc-ruby
172  * client-side bidi calls. It provides a way for the background write-thread
173  * to propogate failures to the main read-thread and give the user an error
174  * message. */
grpc_rb_call_cancel_with_status(VALUE self,VALUE status_code,VALUE details)175 static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
176                                              VALUE details) {
177   grpc_rb_call* call = NULL;
178   grpc_call_error err;
179   if (RTYPEDDATA_DATA(self) == NULL) {
180     // This call has been closed
181     return Qnil;
182   }
183 
184   if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
185     rb_raise(rb_eTypeError,
186              "Bad parameter type error for cancel with status. Want Fixnum, "
187              "String.");
188     return Qnil;
189   }
190 
191   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
192   err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
193                                      StringValueCStr(details), NULL);
194   if (err != GRPC_CALL_OK) {
195     rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
196              grpc_call_error_detail_of(err), err);
197   }
198 
199   return Qnil;
200 }
201 
202 /* Releases the c-level resources associated with a call
203    Once a call has been closed, no further requests can be
204    processed.
205 */
grpc_rb_call_close(VALUE self)206 static VALUE grpc_rb_call_close(VALUE self) {
207   grpc_rb_call* call = NULL;
208   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
209   if (call != NULL) {
210     destroy_call(call);
211     xfree(RTYPEDDATA_DATA(self));
212     RTYPEDDATA_DATA(self) = NULL;
213   }
214   return Qnil;
215 }
216 
217 /* Called to obtain the peer that this call is connected to. */
grpc_rb_call_get_peer(VALUE self)218 static VALUE grpc_rb_call_get_peer(VALUE self) {
219   VALUE res = Qnil;
220   grpc_rb_call* call = NULL;
221   char* peer = NULL;
222   if (RTYPEDDATA_DATA(self) == NULL) {
223     rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
224     return Qnil;
225   }
226   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
227   peer = grpc_call_get_peer(call->wrapped);
228   res = rb_str_new2(peer);
229   gpr_free(peer);
230 
231   return res;
232 }
233 
234 /* Called to obtain the x509 cert of an authenticated peer. */
grpc_rb_call_get_peer_cert(VALUE self)235 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
236   grpc_rb_call* call = NULL;
237   VALUE res = Qnil;
238   grpc_auth_context* ctx = NULL;
239   if (RTYPEDDATA_DATA(self) == NULL) {
240     rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
241     return Qnil;
242   }
243   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
244 
245   ctx = grpc_call_auth_context(call->wrapped);
246 
247   if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
248     return Qnil;
249   }
250 
251   {
252     grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
253         ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
254     const grpc_auth_property* prop = grpc_auth_property_iterator_next(&it);
255     if (prop == NULL) {
256       return Qnil;
257     }
258 
259     res = rb_str_new2(prop->value);
260   }
261 
262   grpc_auth_context_release(ctx);
263 
264   return res;
265 }
266 
267 /*
268   call-seq:
269   status = call.status
270 
271   Gets the status object saved the call.  */
grpc_rb_call_get_status(VALUE self)272 static VALUE grpc_rb_call_get_status(VALUE self) {
273   return rb_ivar_get(self, id_status);
274 }
275 
276 /*
277   call-seq:
278   call.status = status
279 
280   Saves a status object on the call.  */
grpc_rb_call_set_status(VALUE self,VALUE status)281 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
282   if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
283     rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
284              rb_obj_classname(status));
285     return Qnil;
286   }
287 
288   return rb_ivar_set(self, id_status, status);
289 }
290 
291 /*
292   call-seq:
293   metadata = call.metadata
294 
295   Gets the metadata object saved the call.  */
grpc_rb_call_get_metadata(VALUE self)296 static VALUE grpc_rb_call_get_metadata(VALUE self) {
297   return rb_ivar_get(self, id_metadata);
298 }
299 
300 /*
301   call-seq:
302   call.metadata = metadata
303 
304   Saves the metadata hash on the call.  */
grpc_rb_call_set_metadata(VALUE self,VALUE metadata)305 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
306   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
307     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
308              rb_obj_classname(metadata));
309     return Qnil;
310   }
311 
312   return rb_ivar_set(self, id_metadata, metadata);
313 }
314 
315 /*
316   call-seq:
317   trailing_metadata = call.trailing_metadata
318 
319   Gets the trailing metadata object saved on the call */
grpc_rb_call_get_trailing_metadata(VALUE self)320 static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
321   return rb_ivar_get(self, id_trailing_metadata);
322 }
323 
324 /*
325   call-seq:
326   call.trailing_metadata = trailing_metadata
327 
328   Saves the trailing metadata hash on the call. */
grpc_rb_call_set_trailing_metadata(VALUE self,VALUE metadata)329 static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
330   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
331     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
332              rb_obj_classname(metadata));
333     return Qnil;
334   }
335 
336   return rb_ivar_set(self, id_trailing_metadata, metadata);
337 }
338 
339 /*
340   call-seq:
341   write_flag = call.write_flag
342 
343   Gets the write_flag value saved the call.  */
grpc_rb_call_get_write_flag(VALUE self)344 static VALUE grpc_rb_call_get_write_flag(VALUE self) {
345   return rb_ivar_get(self, id_write_flag);
346 }
347 
348 /*
349   call-seq:
350   call.write_flag = write_flag
351 
352   Saves the write_flag on the call.  */
grpc_rb_call_set_write_flag(VALUE self,VALUE write_flag)353 static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
354   if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
355     rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
356              rb_obj_classname(write_flag));
357     return Qnil;
358   }
359 
360   return rb_ivar_set(self, id_write_flag, write_flag);
361 }
362 
363 /*
364   call-seq:
365   call.set_credentials call_credentials
366 
367   Sets credentials on a call */
grpc_rb_call_set_credentials(VALUE self,VALUE credentials)368 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
369   grpc_rb_call* call = NULL;
370   grpc_call_credentials* creds;
371   grpc_call_error err;
372   if (RTYPEDDATA_DATA(self) == NULL) {
373     rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
374     return Qnil;
375   }
376   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
377   creds = grpc_rb_get_wrapped_call_credentials(credentials);
378   err = grpc_call_set_credentials(call->wrapped, creds);
379   if (err != GRPC_CALL_OK) {
380     rb_raise(grpc_rb_eCallError,
381              "grpc_call_set_credentials failed with %s (code=%d)",
382              grpc_call_error_detail_of(err), err);
383   }
384   /* We need the credentials to be alive for as long as the call is alive,
385      but we don't care about destruction order. */
386   rb_ivar_set(self, id_credentials, credentials);
387   return Qnil;
388 }
389 
390 /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
391    to fill grpc_metadata_array.
392 
393    it's capacity should have been computed via a prior call to
394    grpc_rb_md_ary_capacity_hash_cb
395 */
grpc_rb_md_ary_fill_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)396 static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
397   grpc_metadata_array* md_ary = NULL;
398   long array_length;
399   long i;
400   grpc_slice key_slice;
401   grpc_slice value_slice;
402   char* tmp_str = NULL;
403 
404   if (TYPE(key) == T_SYMBOL) {
405     key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
406   } else if (TYPE(key) == T_STRING) {
407     key_slice =
408         grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
409   } else {
410     rb_raise(rb_eTypeError,
411              "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
412     return ST_STOP;
413   }
414 
415   if (!grpc_header_key_is_legal(key_slice)) {
416     tmp_str = grpc_slice_to_c_string(key_slice);
417     rb_raise(rb_eArgError,
418              "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
419     return ST_STOP;
420   }
421 
422   /* Construct a metadata object from key and value and add it */
423   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
424                        &grpc_rb_md_ary_data_type, md_ary);
425 
426   if (TYPE(val) == T_ARRAY) {
427     array_length = RARRAY_LEN(val);
428     /* If the value is an array, add capacity for each value in the array */
429     for (i = 0; i < array_length; i++) {
430       value_slice = grpc_slice_from_copied_buffer(
431           RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
432       if (!grpc_is_binary_header(key_slice) &&
433           !grpc_header_nonbin_value_is_legal(value_slice)) {
434         // The value has invalid characters
435         tmp_str = grpc_slice_to_c_string(value_slice);
436         rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
437                  tmp_str);
438         return ST_STOP;
439       }
440       GPR_ASSERT(md_ary->count < md_ary->capacity);
441       md_ary->metadata[md_ary->count].key = key_slice;
442       md_ary->metadata[md_ary->count].value = value_slice;
443       md_ary->count += 1;
444     }
445   } else if (TYPE(val) == T_STRING) {
446     value_slice =
447         grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
448     if (!grpc_is_binary_header(key_slice) &&
449         !grpc_header_nonbin_value_is_legal(value_slice)) {
450       // The value has invalid characters
451       tmp_str = grpc_slice_to_c_string(value_slice);
452       rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
453                tmp_str);
454       return ST_STOP;
455     }
456     GPR_ASSERT(md_ary->count < md_ary->capacity);
457     md_ary->metadata[md_ary->count].key = key_slice;
458     md_ary->metadata[md_ary->count].value = value_slice;
459     md_ary->count += 1;
460   } else {
461     rb_raise(rb_eArgError, "Header values must be of type string or array");
462     return ST_STOP;
463   }
464   return ST_CONTINUE;
465 }
466 
467 /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
468    to pre-compute the capacity a grpc_metadata_array.
469 */
grpc_rb_md_ary_capacity_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)470 static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
471                                            VALUE md_ary_obj) {
472   grpc_metadata_array* md_ary = NULL;
473 
474   (void)key;
475 
476   /* Construct a metadata object from key and value and add it */
477   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
478                        &grpc_rb_md_ary_data_type, md_ary);
479 
480   if (TYPE(val) == T_ARRAY) {
481     /* If the value is an array, add capacity for each value in the array */
482     md_ary->capacity += RARRAY_LEN(val);
483   } else {
484     md_ary->capacity += 1;
485   }
486 
487   return ST_CONTINUE;
488 }
489 
490 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
491    a grpc_metadata_array.
492 */
grpc_rb_md_ary_convert(VALUE md_ary_hash,grpc_metadata_array * md_ary)493 void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array* md_ary) {
494   VALUE md_ary_obj = Qnil;
495   if (md_ary_hash == Qnil) {
496     return; /* Do nothing if the expected has value is nil */
497   }
498   if (TYPE(md_ary_hash) != T_HASH) {
499     rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
500              rb_obj_classname(md_ary_hash));
501     return;
502   }
503 
504   /* Initialize the array, compute it's capacity, then fill it. */
505   grpc_metadata_array_init(md_ary);
506   md_ary_obj =
507       TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
508   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
509   md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
510   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
511 }
512 
513 /* Converts a metadata array to a hash. */
grpc_rb_md_ary_to_h(grpc_metadata_array * md_ary)514 VALUE grpc_rb_md_ary_to_h(grpc_metadata_array* md_ary) {
515   VALUE key = Qnil;
516   VALUE new_ary = Qnil;
517   VALUE value = Qnil;
518   VALUE result = rb_hash_new();
519   size_t i;
520 
521   for (i = 0; i < md_ary->count; i++) {
522     key = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].key);
523     value = rb_hash_aref(result, key);
524     if (value == Qnil) {
525       value = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value);
526       rb_hash_aset(result, key, value);
527     } else if (TYPE(value) == T_ARRAY) {
528       /* Add the string to the returned array */
529       rb_ary_push(value,
530                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
531     } else {
532       /* Add the current value with this key and the new one to an array */
533       new_ary = rb_ary_new();
534       rb_ary_push(new_ary, value);
535       rb_ary_push(new_ary,
536                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
537       rb_hash_aset(result, key, new_ary);
538     }
539   }
540   return result;
541 }
542 
543 /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
544    each key of an ops hash is valid.
545 */
grpc_rb_call_check_op_keys_hash_cb(VALUE key,VALUE val,VALUE ops_ary)546 static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
547                                               VALUE ops_ary) {
548   (void)val;
549   /* Update the capacity; the value is an array, add capacity for each value in
550    * the array */
551   if (TYPE(key) != T_FIXNUM) {
552     rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
553              rb_obj_classname(key));
554     return ST_STOP;
555   }
556   switch (NUM2INT(key)) {
557     case GRPC_OP_SEND_INITIAL_METADATA:
558     case GRPC_OP_SEND_MESSAGE:
559     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
560     case GRPC_OP_SEND_STATUS_FROM_SERVER:
561     case GRPC_OP_RECV_INITIAL_METADATA:
562     case GRPC_OP_RECV_MESSAGE:
563     case GRPC_OP_RECV_STATUS_ON_CLIENT:
564     case GRPC_OP_RECV_CLOSE_ON_SERVER:
565       rb_ary_push(ops_ary, key);
566       return ST_CONTINUE;
567     default:
568       rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
569   };
570   return ST_STOP;
571 }
572 
573 /* grpc_rb_op_update_status_from_server adds the values in a ruby status
574    struct to the 'send_status_from_server' portion of an op.
575 */
grpc_rb_op_update_status_from_server(grpc_op * op,grpc_metadata_array * md_ary,grpc_slice * send_status_details,VALUE status)576 static void grpc_rb_op_update_status_from_server(
577     grpc_op* op, grpc_metadata_array* md_ary, grpc_slice* send_status_details,
578     VALUE status) {
579   VALUE code = rb_struct_aref(status, sym_code);
580   VALUE details = rb_struct_aref(status, sym_details);
581   VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
582 
583   /* TODO: add check to ensure status is the correct struct type */
584   if (TYPE(code) != T_FIXNUM) {
585     rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
586              rb_obj_classname(code));
587     return;
588   }
589   if (TYPE(details) != T_STRING) {
590     rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
591              rb_obj_classname(code));
592     return;
593   }
594 
595   *send_status_details =
596       grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
597 
598   op->data.send_status_from_server.status = NUM2INT(code);
599   op->data.send_status_from_server.status_details = send_status_details;
600   grpc_rb_md_ary_convert(metadata_hash, md_ary);
601   op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
602   op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
603 }
604 
605 /* run_batch_stack holds various values used by the
606  * grpc_rb_call_run_batch function */
607 typedef struct run_batch_stack {
608   /* The batch ops */
609   grpc_op ops[8]; /* 8 is the maximum number of operations */
610   size_t op_num;  /* tracks the last added operation */
611 
612   /* Data being sent */
613   grpc_metadata_array send_metadata;
614   grpc_metadata_array send_trailing_metadata;
615 
616   /* Data being received */
617   grpc_byte_buffer* recv_message;
618   grpc_metadata_array recv_metadata;
619   grpc_metadata_array recv_trailing_metadata;
620   int recv_cancelled;
621   grpc_status_code recv_status;
622   grpc_slice recv_status_details;
623   unsigned write_flag;
624   grpc_slice send_status_details;
625 } run_batch_stack;
626 
627 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
628  * initialized */
grpc_run_batch_stack_init(run_batch_stack * st,unsigned write_flag)629 static void grpc_run_batch_stack_init(run_batch_stack* st,
630                                       unsigned write_flag) {
631   MEMZERO(st, run_batch_stack, 1);
632   grpc_metadata_array_init(&st->send_metadata);
633   grpc_metadata_array_init(&st->send_trailing_metadata);
634   grpc_metadata_array_init(&st->recv_metadata);
635   grpc_metadata_array_init(&st->recv_trailing_metadata);
636   st->op_num = 0;
637   st->write_flag = write_flag;
638 }
639 
/* Drops one reference on the key and value slice of every entry in `array`,
 * then destroys the array itself. */
void grpc_rb_metadata_array_destroy_including_entries(
    grpc_metadata_array* array) {
  if (array->metadata != NULL) {
    grpc_metadata* entry = array->metadata;
    grpc_metadata* const end = entry + array->count;
    for (; entry != end; entry++) {
      grpc_slice_unref(entry->key);
      grpc_slice_unref(entry->value);
    }
  }
  grpc_metadata_array_destroy(array);
}
651 
652 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
653  * cleaned up */
grpc_run_batch_stack_cleanup(run_batch_stack * st)654 static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
655   size_t i = 0;
656 
657   grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
658   grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
659   grpc_metadata_array_destroy(&st->recv_metadata);
660   grpc_metadata_array_destroy(&st->recv_trailing_metadata);
661 
662   if (GRPC_SLICE_START_PTR(st->send_status_details) != NULL) {
663     grpc_slice_unref(st->send_status_details);
664   }
665 
666   if (GRPC_SLICE_START_PTR(st->recv_status_details) != NULL) {
667     grpc_slice_unref(st->recv_status_details);
668   }
669 
670   if (st->recv_message != NULL) {
671     grpc_byte_buffer_destroy(st->recv_message);
672   }
673 
674   for (i = 0; i < st->op_num; i++) {
675     if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
676       grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
677     }
678   }
679 }
680 
681 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
682  * ops_hash */
grpc_run_batch_stack_fill_ops(run_batch_stack * st,VALUE ops_hash)683 static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
684   VALUE this_op = Qnil;
685   VALUE this_value = Qnil;
686   VALUE ops_ary = rb_ary_new();
687   size_t i = 0;
688 
689   /* Create a ruby array with just the operation keys */
690   rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
691 
692   /* Fill the ops array */
693   for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
694     this_op = rb_ary_entry(ops_ary, i);
695     this_value = rb_hash_aref(ops_hash, this_op);
696     st->ops[st->op_num].flags = 0;
697     switch (NUM2INT(this_op)) {
698       case GRPC_OP_SEND_INITIAL_METADATA:
699         grpc_rb_md_ary_convert(this_value, &st->send_metadata);
700         st->ops[st->op_num].data.send_initial_metadata.count =
701             st->send_metadata.count;
702         st->ops[st->op_num].data.send_initial_metadata.metadata =
703             st->send_metadata.metadata;
704         break;
705       case GRPC_OP_SEND_MESSAGE:
706         st->ops[st->op_num].data.send_message.send_message =
707             grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
708                                      RSTRING_LEN(this_value));
709         st->ops[st->op_num].flags = st->write_flag;
710         break;
711       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
712         break;
713       case GRPC_OP_SEND_STATUS_FROM_SERVER:
714         grpc_rb_op_update_status_from_server(
715             &st->ops[st->op_num], &st->send_trailing_metadata,
716             &st->send_status_details, this_value);
717         break;
718       case GRPC_OP_RECV_INITIAL_METADATA:
719         st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
720             &st->recv_metadata;
721         break;
722       case GRPC_OP_RECV_MESSAGE:
723         st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
724         break;
725       case GRPC_OP_RECV_STATUS_ON_CLIENT:
726         st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
727             &st->recv_trailing_metadata;
728         st->ops[st->op_num].data.recv_status_on_client.status =
729             &st->recv_status;
730         st->ops[st->op_num].data.recv_status_on_client.status_details =
731             &st->recv_status_details;
732         break;
733       case GRPC_OP_RECV_CLOSE_ON_SERVER:
734         st->ops[st->op_num].data.recv_close_on_server.cancelled =
735             &st->recv_cancelled;
736         break;
737       default:
738         grpc_run_batch_stack_cleanup(st);
739         rb_raise(rb_eTypeError, "invalid operation : bad value %d",
740                  NUM2INT(this_op));
741     };
742     st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
743     st->ops[st->op_num].reserved = NULL;
744     st->op_num++;
745   }
746 }
747 
748 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
749    after the results have run */
grpc_run_batch_stack_build_result(run_batch_stack * st)750 static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
751   size_t i = 0;
752   VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
753                                Qnil, Qnil, Qnil, Qnil, NULL);
754   for (i = 0; i < st->op_num; i++) {
755     switch (st->ops[i].op) {
756       case GRPC_OP_SEND_INITIAL_METADATA:
757         rb_struct_aset(result, sym_send_metadata, Qtrue);
758         break;
759       case GRPC_OP_SEND_MESSAGE:
760         rb_struct_aset(result, sym_send_message, Qtrue);
761         break;
762       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
763         rb_struct_aset(result, sym_send_close, Qtrue);
764         break;
765       case GRPC_OP_SEND_STATUS_FROM_SERVER:
766         rb_struct_aset(result, sym_send_status, Qtrue);
767         break;
768       case GRPC_OP_RECV_INITIAL_METADATA:
769         rb_struct_aset(result, sym_metadata,
770                        grpc_rb_md_ary_to_h(&st->recv_metadata));
771       case GRPC_OP_RECV_MESSAGE:
772         rb_struct_aset(result, sym_message,
773                        grpc_rb_byte_buffer_to_s(st->recv_message));
774         break;
775       case GRPC_OP_RECV_STATUS_ON_CLIENT:
776         rb_struct_aset(
777             result, sym_status,
778             rb_struct_new(
779                 grpc_rb_sStatus, UINT2NUM(st->recv_status),
780                 (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
781                      ? Qnil
782                      : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
783                 grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), NULL));
784         break;
785       case GRPC_OP_RECV_CLOSE_ON_SERVER:
786         rb_struct_aset(result, sym_send_close, Qtrue);
787         break;
788       default:
789         break;
790     }
791   }
792   return result;
793 }
794 
795 /* call-seq:
796    ops = {
797      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
798      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
799      ...
800    }
801    tag = Object.new
802    timeout = 10
803    call.start_batch(tag, timeout, ops)
804 
805    Start a batch of operations defined in the array ops; when complete, post a
806    completion of type 'tag' to the completion queue bound to the call.
807 
808    Also waits for the batch to complete, until timeout is reached.
809    The order of ops specified in the batch has no significance.
810    Only one operation of each type can be active at once in any given
811    batch */
grpc_rb_call_run_batch(VALUE self,VALUE ops_hash)812 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
813   run_batch_stack* st = NULL;
814   grpc_rb_call* call = NULL;
815   grpc_event ev;
816   grpc_call_error err;
817   VALUE result = Qnil;
818   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
819   unsigned write_flag = 0;
820   void* tag = (void*)&st;
821 
822   grpc_ruby_fork_guard();
823   if (RTYPEDDATA_DATA(self) == NULL) {
824     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
825     return Qnil;
826   }
827   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
828 
829   /* Validate the ops args, adding them to a ruby array */
830   if (TYPE(ops_hash) != T_HASH) {
831     rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
832     return Qnil;
833   }
834   if (rb_write_flag != Qnil) {
835     write_flag = NUM2UINT(rb_write_flag);
836   }
837   st = gpr_malloc(sizeof(run_batch_stack));
838   grpc_run_batch_stack_init(st, write_flag);
839   grpc_run_batch_stack_fill_ops(st, ops_hash);
840 
841   /* call grpc_call_start_batch, then wait for it to complete using
842    * pluck_event */
843   err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
844   if (err != GRPC_CALL_OK) {
845     grpc_run_batch_stack_cleanup(st);
846     gpr_free(st);
847     rb_raise(grpc_rb_eCallError,
848              "grpc_call_start_batch failed with %s (code=%d)",
849              grpc_call_error_detail_of(err), err);
850     return Qnil;
851   }
852   ev = rb_completion_queue_pluck(call->queue, tag,
853                                  gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
854   if (!ev.success) {
855     rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
856   }
857   /* Build and return the BatchResult struct result,
858      if there is an error, it's reflected in the status */
859   result = grpc_run_batch_stack_build_result(st);
860   grpc_run_batch_stack_cleanup(st);
861   gpr_free(st);
862   return result;
863 }
864 
Init_grpc_write_flags()865 static void Init_grpc_write_flags() {
866   /* Constants representing the write flags in grpc.h */
867   VALUE grpc_rb_mWriteFlags =
868       rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
869   rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
870                   UINT2NUM(GRPC_WRITE_BUFFER_HINT));
871   rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
872                   UINT2NUM(GRPC_WRITE_NO_COMPRESS));
873 }
874 
Init_grpc_error_codes()875 static void Init_grpc_error_codes() {
876   /* Constants representing the error codes of grpc_call_error in grpc.h */
877   VALUE grpc_rb_mRpcErrors =
878       rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
879   rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
880   rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
881   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
882                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
883   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
884                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
885   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
886                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
887   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
888                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
889   rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
890                   UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
891   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
892                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
893   rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
894                   UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
895   rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
896                   UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
897 
898   /* Hint the GC that this is a global and shouldn't be sweeped. */
899   rb_global_variable(&rb_error_code_details);
900 
901   /* Add the detail strings to a Hash */
902   rb_error_code_details = rb_hash_new();
903   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),
904                rb_str_new2("ok"));
905   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR),
906                rb_str_new2("unknown error"));
907   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER),
908                rb_str_new2("not available on a server"));
909   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
910                rb_str_new2("not available on a client"));
911   rb_hash_aset(rb_error_code_details,
912                UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
913                rb_str_new2("call is already accepted"));
914   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
915                rb_str_new2("call is already invoked"));
916   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
917                rb_str_new2("call is not yet invoked"));
918   rb_hash_aset(rb_error_code_details,
919                UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED),
920                rb_str_new2("call is already finished"));
921   rb_hash_aset(rb_error_code_details,
922                UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS),
923                rb_str_new2("outstanding read or write present"));
924   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
925                rb_str_new2("a bad flag was given"));
926   rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
927   rb_obj_freeze(rb_error_code_details);
928 }
929 
Init_grpc_op_codes()930 static void Init_grpc_op_codes() {
931   /* Constants representing operation type codes in grpc.h */
932   VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
933   rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
934                   UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
935   rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
936                   UINT2NUM(GRPC_OP_SEND_MESSAGE));
937   rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
938                   UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
939   rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
940                   UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
941   rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
942                   UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
943   rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
944                   UINT2NUM(GRPC_OP_RECV_MESSAGE));
945   rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
946                   UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
947   rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
948                   UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
949 }
950 
Init_grpc_metadata_keys()951 static void Init_grpc_metadata_keys() {
952   VALUE grpc_rb_mMetadataKeys =
953       rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
954   rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
955                   rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
956 }
957 
Init_grpc_call()958 void Init_grpc_call() {
959   /* CallError inherits from Exception to signal that it is non-recoverable */
960   grpc_rb_eCallError =
961       rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
962   grpc_rb_eOutOfTime =
963       rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
964   grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
965   grpc_rb_cMdAry =
966       rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
967 
968   /* Prevent allocation or inialization of the Call class */
969   rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
970   rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
971   rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
972                    1);
973 
974   /* Add ruby analogues of the Call methods. */
975   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
976   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
977   rb_define_method(grpc_rb_cCall, "cancel_with_status",
978                    grpc_rb_call_cancel_with_status, 2);
979   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
980   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
981   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
982   rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
983   rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
984   rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
985   rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
986   rb_define_method(grpc_rb_cCall, "trailing_metadata",
987                    grpc_rb_call_get_trailing_metadata, 0);
988   rb_define_method(grpc_rb_cCall,
989                    "trailing_metadata=", grpc_rb_call_set_trailing_metadata, 1);
990   rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
991   rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
992                    1);
993   rb_define_method(grpc_rb_cCall, "set_credentials!",
994                    grpc_rb_call_set_credentials, 1);
995 
996   /* Ids used to support call attributes */
997   id_metadata = rb_intern("metadata");
998   id_trailing_metadata = rb_intern("trailing_metadata");
999   id_status = rb_intern("status");
1000   id_write_flag = rb_intern("write_flag");
1001 
1002   /* Ids used by the c wrapping internals. */
1003   id_credentials = rb_intern("__credentials");
1004 
1005   /* Ids used in constructing the batch result. */
1006   sym_send_message = ID2SYM(rb_intern("send_message"));
1007   sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
1008   sym_send_close = ID2SYM(rb_intern("send_close"));
1009   sym_send_status = ID2SYM(rb_intern("send_status"));
1010   sym_message = ID2SYM(rb_intern("message"));
1011   sym_status = ID2SYM(rb_intern("status"));
1012   sym_cancelled = ID2SYM(rb_intern("cancelled"));
1013 
1014   /* The Struct used to return the run_batch result. */
1015   grpc_rb_sBatchResult = rb_struct_define(
1016       "BatchResult", "send_message", "send_metadata", "send_close",
1017       "send_status", "message", "metadata", "status", "cancelled", NULL);
1018 
1019   Init_grpc_error_codes();
1020   Init_grpc_op_codes();
1021   Init_grpc_write_flags();
1022   Init_grpc_metadata_keys();
1023 }
1024 
1025 /* Gets the call from the ruby object */
grpc_rb_get_wrapped_call(VALUE v)1026 grpc_call* grpc_rb_get_wrapped_call(VALUE v) {
1027   grpc_rb_call* call = NULL;
1028   TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
1029   return call->wrapped;
1030 }
1031 
1032 /* Obtains the wrapped object for a given call */
grpc_rb_wrap_call(grpc_call * c,grpc_completion_queue * q)1033 VALUE grpc_rb_wrap_call(grpc_call* c, grpc_completion_queue* q) {
1034   grpc_rb_call* wrapper;
1035   if (c == NULL || q == NULL) {
1036     return Qnil;
1037   }
1038   wrapper = ALLOC(grpc_rb_call);
1039   wrapper->wrapped = c;
1040   wrapper->queue = q;
1041   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
1042 }
1043