1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_call.h"
22 #include "rb_grpc_imports.generated.h"
23
24 #include <grpc/grpc.h>
25 #include <grpc/impl/codegen/compression_types.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28
29 #include "rb_byte_buffer.h"
30 #include "rb_call_credentials.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33
/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
static VALUE grpc_rb_cCall;

/* grpc_rb_eCallError is the ruby class of the exception thrown during call
   operations. */
VALUE grpc_rb_eCallError = Qnil;

/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
   a timeout. */
static VALUE grpc_rb_eOutOfTime = Qnil;

/* grpc_rb_sBatchResult is the struct class used to hold the results of a
 * batch call. */
static VALUE grpc_rb_sBatchResult;

/* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
 * grpc_metadata_array. */
static VALUE grpc_rb_cMdAry;

/* id_credentials is the name of the hidden ivar that preserves the value
 * of the credentials added to the call. */
static ID id_credentials;

/* id_metadata is the name of the attribute used to access the metadata hash
 * received by the call and subsequently saved on it. */
static ID id_metadata;

/* id_trailing_metadata is the name of the attribute used to access the
 * trailing metadata hash received by the call and subsequently saved on it. */
static ID id_trailing_metadata;

/* id_status is the name of the attribute used to access the status object
 * received by the call and subsequently saved on it. */
static ID id_status;

/* id_write_flag is the name of the attribute used to access the write_flag
 * saved on the call. */
static ID id_write_flag;

/* sym_* are the symbols for the attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
static VALUE sym_send_close;
static VALUE sym_send_status;
static VALUE sym_message;
static VALUE sym_status;
static VALUE sym_cancelled;
81
/* grpc_rb_call is the native payload stored in each ruby Call object. */
typedef struct grpc_rb_call {
  /* The underlying core call; set to NULL once destroy_call has released it. */
  grpc_call* wrapped;
  /* Completion queue used to wait for this call's batches; destroyed together
     with the call in destroy_call. */
  grpc_completion_queue* queue;
} grpc_rb_call;
86
/* Releases the wrapped grpc_call and the completion queue that belongs to it.
   The wrapped pointer doubles as the "already destroyed" marker, so calling
   this more than once on the same struct is harmless. */
static void destroy_call(grpc_rb_call* call) {
  if (call->wrapped == NULL) {
    return; /* already torn down */
  }

  grpc_call_unref(call->wrapped);
  call->wrapped = NULL;

  grpc_rb_completion_queue_destroy(call->queue);
  call->queue = NULL;
}
96
97 /* Destroys a Call. */
grpc_rb_call_destroy(void * p)98 static void grpc_rb_call_destroy(void* p) {
99 if (p == NULL) {
100 return;
101 }
102 destroy_call((grpc_rb_call*)p);
103 xfree(p);
104 }
105
/* Describes grpc_metadata_array for RTypedData. The wrapped array is neither
   marked nor freed through this type (GRPC_RB_GC_NOT_MARKED /
   GRPC_RB_GC_DONT_FREE), so whoever wraps an array keeps ownership of it. */
static const rb_data_type_t grpc_rb_md_ary_data_type = {
    "grpc_metadata_array",
    {GRPC_RB_GC_NOT_MARKED,
     GRPC_RB_GC_DONT_FREE,
     GRPC_RB_MEMSIZE_UNAVAILABLE,
     {NULL, NULL}},
    NULL,
    NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
    /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
     * grpc_rb_call_destroy
     * touches a hash object.
     * TODO(yugui) Directly use st_table and call the free function earlier?
     *
     * NOTE(review): this type's free slot is GRPC_RB_GC_DONT_FREE, not
     * grpc_rb_call_destroy, so the rationale above may be stale - confirm.
     */
    0,
#endif
};
123
/* Describes grpc_call struct for RTypedData.
   Instances are not marked; grpc_rb_call_destroy is the free function, and
   RUBY_TYPED_FREE_IMMEDIATELY is requested when the ruby headers define it. */
static const rb_data_type_t grpc_call_data_type = {"grpc_call",
                                                   {GRPC_RB_GC_NOT_MARKED,
                                                    grpc_rb_call_destroy,
                                                    GRPC_RB_MEMSIZE_UNAVAILABLE,
                                                    {NULL, NULL}},
                                                   NULL,
                                                   NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
                                                   RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
136
137 /* Error code details is a hash containing text strings describing errors */
138 VALUE rb_error_code_details;
139
140 /* Obtains the error detail string for given error code */
grpc_call_error_detail_of(grpc_call_error err)141 const char* grpc_call_error_detail_of(grpc_call_error err) {
142 VALUE detail_ref = rb_hash_aref(rb_error_code_details, UINT2NUM(err));
143 const char* detail = "unknown error code!";
144 if (detail_ref != Qnil) {
145 detail = StringValueCStr(detail_ref);
146 }
147 return detail;
148 }
149
150 /* Called by clients to cancel an RPC on the server.
151 Can be called multiple times, from any thread. */
grpc_rb_call_cancel(VALUE self)152 static VALUE grpc_rb_call_cancel(VALUE self) {
153 grpc_rb_call* call = NULL;
154 grpc_call_error err;
155 if (RTYPEDDATA_DATA(self) == NULL) {
156 // This call has been closed
157 return Qnil;
158 }
159
160 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
161 err = grpc_call_cancel(call->wrapped, NULL);
162 if (err != GRPC_CALL_OK) {
163 rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
164 grpc_call_error_detail_of(err), err);
165 }
166
167 return Qnil;
168 }
169
170 /* TODO: expose this as part of the surface API if needed.
171 * This is meant for internal usage by the "write thread" of grpc-ruby
172 * client-side bidi calls. It provides a way for the background write-thread
173 * to propogate failures to the main read-thread and give the user an error
174 * message. */
grpc_rb_call_cancel_with_status(VALUE self,VALUE status_code,VALUE details)175 static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
176 VALUE details) {
177 grpc_rb_call* call = NULL;
178 grpc_call_error err;
179 if (RTYPEDDATA_DATA(self) == NULL) {
180 // This call has been closed
181 return Qnil;
182 }
183
184 if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
185 rb_raise(rb_eTypeError,
186 "Bad parameter type error for cancel with status. Want Fixnum, "
187 "String.");
188 return Qnil;
189 }
190
191 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
192 err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
193 StringValueCStr(details), NULL);
194 if (err != GRPC_CALL_OK) {
195 rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
196 grpc_call_error_detail_of(err), err);
197 }
198
199 return Qnil;
200 }
201
202 /* Releases the c-level resources associated with a call
203 Once a call has been closed, no further requests can be
204 processed.
205 */
grpc_rb_call_close(VALUE self)206 static VALUE grpc_rb_call_close(VALUE self) {
207 grpc_rb_call* call = NULL;
208 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
209 if (call != NULL) {
210 destroy_call(call);
211 xfree(RTYPEDDATA_DATA(self));
212 RTYPEDDATA_DATA(self) = NULL;
213 }
214 return Qnil;
215 }
216
217 /* Called to obtain the peer that this call is connected to. */
grpc_rb_call_get_peer(VALUE self)218 static VALUE grpc_rb_call_get_peer(VALUE self) {
219 VALUE res = Qnil;
220 grpc_rb_call* call = NULL;
221 char* peer = NULL;
222 if (RTYPEDDATA_DATA(self) == NULL) {
223 rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
224 return Qnil;
225 }
226 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
227 peer = grpc_call_get_peer(call->wrapped);
228 res = rb_str_new2(peer);
229 gpr_free(peer);
230
231 return res;
232 }
233
234 /* Called to obtain the x509 cert of an authenticated peer. */
grpc_rb_call_get_peer_cert(VALUE self)235 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
236 grpc_rb_call* call = NULL;
237 VALUE res = Qnil;
238 grpc_auth_context* ctx = NULL;
239 if (RTYPEDDATA_DATA(self) == NULL) {
240 rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
241 return Qnil;
242 }
243 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
244
245 ctx = grpc_call_auth_context(call->wrapped);
246
247 if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
248 return Qnil;
249 }
250
251 {
252 grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
253 ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
254 const grpc_auth_property* prop = grpc_auth_property_iterator_next(&it);
255 if (prop == NULL) {
256 return Qnil;
257 }
258
259 res = rb_str_new2(prop->value);
260 }
261
262 grpc_auth_context_release(ctx);
263
264 return res;
265 }
266
267 /*
268 call-seq:
269 status = call.status
270
271 Gets the status object saved the call. */
grpc_rb_call_get_status(VALUE self)272 static VALUE grpc_rb_call_get_status(VALUE self) {
273 return rb_ivar_get(self, id_status);
274 }
275
276 /*
277 call-seq:
278 call.status = status
279
280 Saves a status object on the call. */
grpc_rb_call_set_status(VALUE self,VALUE status)281 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
282 if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
283 rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
284 rb_obj_classname(status));
285 return Qnil;
286 }
287
288 return rb_ivar_set(self, id_status, status);
289 }
290
291 /*
292 call-seq:
293 metadata = call.metadata
294
295 Gets the metadata object saved the call. */
grpc_rb_call_get_metadata(VALUE self)296 static VALUE grpc_rb_call_get_metadata(VALUE self) {
297 return rb_ivar_get(self, id_metadata);
298 }
299
300 /*
301 call-seq:
302 call.metadata = metadata
303
304 Saves the metadata hash on the call. */
grpc_rb_call_set_metadata(VALUE self,VALUE metadata)305 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
306 if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
307 rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
308 rb_obj_classname(metadata));
309 return Qnil;
310 }
311
312 return rb_ivar_set(self, id_metadata, metadata);
313 }
314
315 /*
316 call-seq:
317 trailing_metadata = call.trailing_metadata
318
319 Gets the trailing metadata object saved on the call */
grpc_rb_call_get_trailing_metadata(VALUE self)320 static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
321 return rb_ivar_get(self, id_trailing_metadata);
322 }
323
324 /*
325 call-seq:
326 call.trailing_metadata = trailing_metadata
327
328 Saves the trailing metadata hash on the call. */
grpc_rb_call_set_trailing_metadata(VALUE self,VALUE metadata)329 static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
330 if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
331 rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
332 rb_obj_classname(metadata));
333 return Qnil;
334 }
335
336 return rb_ivar_set(self, id_trailing_metadata, metadata);
337 }
338
339 /*
340 call-seq:
341 write_flag = call.write_flag
342
343 Gets the write_flag value saved the call. */
grpc_rb_call_get_write_flag(VALUE self)344 static VALUE grpc_rb_call_get_write_flag(VALUE self) {
345 return rb_ivar_get(self, id_write_flag);
346 }
347
348 /*
349 call-seq:
350 call.write_flag = write_flag
351
352 Saves the write_flag on the call. */
grpc_rb_call_set_write_flag(VALUE self,VALUE write_flag)353 static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
354 if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
355 rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
356 rb_obj_classname(write_flag));
357 return Qnil;
358 }
359
360 return rb_ivar_set(self, id_write_flag, write_flag);
361 }
362
363 /*
364 call-seq:
365 call.set_credentials call_credentials
366
367 Sets credentials on a call */
grpc_rb_call_set_credentials(VALUE self,VALUE credentials)368 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
369 grpc_rb_call* call = NULL;
370 grpc_call_credentials* creds;
371 grpc_call_error err;
372 if (RTYPEDDATA_DATA(self) == NULL) {
373 rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
374 return Qnil;
375 }
376 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
377 creds = grpc_rb_get_wrapped_call_credentials(credentials);
378 err = grpc_call_set_credentials(call->wrapped, creds);
379 if (err != GRPC_CALL_OK) {
380 rb_raise(grpc_rb_eCallError,
381 "grpc_call_set_credentials failed with %s (code=%d)",
382 grpc_call_error_detail_of(err), err);
383 }
384 /* We need the credentials to be alive for as long as the call is alive,
385 but we don't care about destruction order. */
386 rb_ivar_set(self, id_credentials, credentials);
387 return Qnil;
388 }
389
390 /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
391 to fill grpc_metadata_array.
392
393 it's capacity should have been computed via a prior call to
394 grpc_rb_md_ary_capacity_hash_cb
395 */
grpc_rb_md_ary_fill_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)396 static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
397 grpc_metadata_array* md_ary = NULL;
398 long array_length;
399 long i;
400 grpc_slice key_slice;
401 grpc_slice value_slice;
402 char* tmp_str = NULL;
403
404 if (TYPE(key) == T_SYMBOL) {
405 key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
406 } else if (TYPE(key) == T_STRING) {
407 key_slice =
408 grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
409 } else {
410 rb_raise(rb_eTypeError,
411 "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
412 return ST_STOP;
413 }
414
415 if (!grpc_header_key_is_legal(key_slice)) {
416 tmp_str = grpc_slice_to_c_string(key_slice);
417 rb_raise(rb_eArgError,
418 "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
419 return ST_STOP;
420 }
421
422 /* Construct a metadata object from key and value and add it */
423 TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
424 &grpc_rb_md_ary_data_type, md_ary);
425
426 if (TYPE(val) == T_ARRAY) {
427 array_length = RARRAY_LEN(val);
428 /* If the value is an array, add capacity for each value in the array */
429 for (i = 0; i < array_length; i++) {
430 value_slice = grpc_slice_from_copied_buffer(
431 RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
432 if (!grpc_is_binary_header(key_slice) &&
433 !grpc_header_nonbin_value_is_legal(value_slice)) {
434 // The value has invalid characters
435 tmp_str = grpc_slice_to_c_string(value_slice);
436 rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
437 tmp_str);
438 return ST_STOP;
439 }
440 GPR_ASSERT(md_ary->count < md_ary->capacity);
441 md_ary->metadata[md_ary->count].key = key_slice;
442 md_ary->metadata[md_ary->count].value = value_slice;
443 md_ary->count += 1;
444 }
445 } else if (TYPE(val) == T_STRING) {
446 value_slice =
447 grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
448 if (!grpc_is_binary_header(key_slice) &&
449 !grpc_header_nonbin_value_is_legal(value_slice)) {
450 // The value has invalid characters
451 tmp_str = grpc_slice_to_c_string(value_slice);
452 rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
453 tmp_str);
454 return ST_STOP;
455 }
456 GPR_ASSERT(md_ary->count < md_ary->capacity);
457 md_ary->metadata[md_ary->count].key = key_slice;
458 md_ary->metadata[md_ary->count].value = value_slice;
459 md_ary->count += 1;
460 } else {
461 rb_raise(rb_eArgError, "Header values must be of type string or array");
462 return ST_STOP;
463 }
464 return ST_CONTINUE;
465 }
466
467 /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
468 to pre-compute the capacity a grpc_metadata_array.
469 */
grpc_rb_md_ary_capacity_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)470 static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
471 VALUE md_ary_obj) {
472 grpc_metadata_array* md_ary = NULL;
473
474 (void)key;
475
476 /* Construct a metadata object from key and value and add it */
477 TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
478 &grpc_rb_md_ary_data_type, md_ary);
479
480 if (TYPE(val) == T_ARRAY) {
481 /* If the value is an array, add capacity for each value in the array */
482 md_ary->capacity += RARRAY_LEN(val);
483 } else {
484 md_ary->capacity += 1;
485 }
486
487 return ST_CONTINUE;
488 }
489
490 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
491 a grpc_metadata_array.
492 */
grpc_rb_md_ary_convert(VALUE md_ary_hash,grpc_metadata_array * md_ary)493 void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array* md_ary) {
494 VALUE md_ary_obj = Qnil;
495 if (md_ary_hash == Qnil) {
496 return; /* Do nothing if the expected has value is nil */
497 }
498 if (TYPE(md_ary_hash) != T_HASH) {
499 rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
500 rb_obj_classname(md_ary_hash));
501 return;
502 }
503
504 /* Initialize the array, compute it's capacity, then fill it. */
505 grpc_metadata_array_init(md_ary);
506 md_ary_obj =
507 TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
508 rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
509 md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
510 rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
511 }
512
513 /* Converts a metadata array to a hash. */
grpc_rb_md_ary_to_h(grpc_metadata_array * md_ary)514 VALUE grpc_rb_md_ary_to_h(grpc_metadata_array* md_ary) {
515 VALUE key = Qnil;
516 VALUE new_ary = Qnil;
517 VALUE value = Qnil;
518 VALUE result = rb_hash_new();
519 size_t i;
520
521 for (i = 0; i < md_ary->count; i++) {
522 key = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].key);
523 value = rb_hash_aref(result, key);
524 if (value == Qnil) {
525 value = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value);
526 rb_hash_aset(result, key, value);
527 } else if (TYPE(value) == T_ARRAY) {
528 /* Add the string to the returned array */
529 rb_ary_push(value,
530 grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
531 } else {
532 /* Add the current value with this key and the new one to an array */
533 new_ary = rb_ary_new();
534 rb_ary_push(new_ary, value);
535 rb_ary_push(new_ary,
536 grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
537 rb_hash_aset(result, key, new_ary);
538 }
539 }
540 return result;
541 }
542
543 /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
544 each key of an ops hash is valid.
545 */
grpc_rb_call_check_op_keys_hash_cb(VALUE key,VALUE val,VALUE ops_ary)546 static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
547 VALUE ops_ary) {
548 (void)val;
549 /* Update the capacity; the value is an array, add capacity for each value in
550 * the array */
551 if (TYPE(key) != T_FIXNUM) {
552 rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
553 rb_obj_classname(key));
554 return ST_STOP;
555 }
556 switch (NUM2INT(key)) {
557 case GRPC_OP_SEND_INITIAL_METADATA:
558 case GRPC_OP_SEND_MESSAGE:
559 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
560 case GRPC_OP_SEND_STATUS_FROM_SERVER:
561 case GRPC_OP_RECV_INITIAL_METADATA:
562 case GRPC_OP_RECV_MESSAGE:
563 case GRPC_OP_RECV_STATUS_ON_CLIENT:
564 case GRPC_OP_RECV_CLOSE_ON_SERVER:
565 rb_ary_push(ops_ary, key);
566 return ST_CONTINUE;
567 default:
568 rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
569 };
570 return ST_STOP;
571 }
572
573 /* grpc_rb_op_update_status_from_server adds the values in a ruby status
574 struct to the 'send_status_from_server' portion of an op.
575 */
grpc_rb_op_update_status_from_server(grpc_op * op,grpc_metadata_array * md_ary,grpc_slice * send_status_details,VALUE status)576 static void grpc_rb_op_update_status_from_server(
577 grpc_op* op, grpc_metadata_array* md_ary, grpc_slice* send_status_details,
578 VALUE status) {
579 VALUE code = rb_struct_aref(status, sym_code);
580 VALUE details = rb_struct_aref(status, sym_details);
581 VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
582
583 /* TODO: add check to ensure status is the correct struct type */
584 if (TYPE(code) != T_FIXNUM) {
585 rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
586 rb_obj_classname(code));
587 return;
588 }
589 if (TYPE(details) != T_STRING) {
590 rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
591 rb_obj_classname(code));
592 return;
593 }
594
595 *send_status_details =
596 grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
597
598 op->data.send_status_from_server.status = NUM2INT(code);
599 op->data.send_status_from_server.status_details = send_status_details;
600 grpc_rb_md_ary_convert(metadata_hash, md_ary);
601 op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
602 op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
603 }
604
/* run_batch_stack holds various values used by the
 * grpc_rb_call_run_batch function: the ops handed to the core plus the
 * storage those ops point at for data being sent and received. */
typedef struct run_batch_stack {
  /* The batch ops */
  grpc_op ops[8]; /* 8 is the maximum number of operations */
  size_t op_num;  /* tracks the last added operation */

  /* Data being sent */
  grpc_metadata_array send_metadata;
  grpc_metadata_array send_trailing_metadata;

  /* Data being received */
  grpc_byte_buffer* recv_message;
  grpc_metadata_array recv_metadata;
  grpc_metadata_array recv_trailing_metadata;
  int recv_cancelled;
  grpc_status_code recv_status;
  grpc_slice recv_status_details;
  /* Flags copied onto the SEND_MESSAGE op of the batch */
  unsigned write_flag;
  /* Storage for the status details referenced by SEND_STATUS_FROM_SERVER */
  grpc_slice send_status_details;
} run_batch_stack;
626
627 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
628 * initialized */
grpc_run_batch_stack_init(run_batch_stack * st,unsigned write_flag)629 static void grpc_run_batch_stack_init(run_batch_stack* st,
630 unsigned write_flag) {
631 MEMZERO(st, run_batch_stack, 1);
632 grpc_metadata_array_init(&st->send_metadata);
633 grpc_metadata_array_init(&st->send_trailing_metadata);
634 grpc_metadata_array_init(&st->recv_metadata);
635 grpc_metadata_array_init(&st->recv_trailing_metadata);
636 st->op_num = 0;
637 st->write_flag = write_flag;
638 }
639
/* Destroys a metadata array after dropping one reference on the key and
   value slice of each of its entries. */
void grpc_rb_metadata_array_destroy_including_entries(
    grpc_metadata_array* array) {
  if (array->metadata != NULL) {
    grpc_metadata* entry = array->metadata;
    grpc_metadata* const end = entry + array->count;

    for (; entry != end; entry++) {
      grpc_slice_unref(entry->key);
      grpc_slice_unref(entry->value);
    }
  }
  grpc_metadata_array_destroy(array);
}
651
652 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
653 * cleaned up */
grpc_run_batch_stack_cleanup(run_batch_stack * st)654 static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
655 size_t i = 0;
656
657 grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
658 grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
659 grpc_metadata_array_destroy(&st->recv_metadata);
660 grpc_metadata_array_destroy(&st->recv_trailing_metadata);
661
662 if (GRPC_SLICE_START_PTR(st->send_status_details) != NULL) {
663 grpc_slice_unref(st->send_status_details);
664 }
665
666 if (GRPC_SLICE_START_PTR(st->recv_status_details) != NULL) {
667 grpc_slice_unref(st->recv_status_details);
668 }
669
670 if (st->recv_message != NULL) {
671 grpc_byte_buffer_destroy(st->recv_message);
672 }
673
674 for (i = 0; i < st->op_num; i++) {
675 if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
676 grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
677 }
678 }
679 }
680
681 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
682 * ops_hash */
grpc_run_batch_stack_fill_ops(run_batch_stack * st,VALUE ops_hash)683 static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
684 VALUE this_op = Qnil;
685 VALUE this_value = Qnil;
686 VALUE ops_ary = rb_ary_new();
687 size_t i = 0;
688
689 /* Create a ruby array with just the operation keys */
690 rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
691
692 /* Fill the ops array */
693 for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
694 this_op = rb_ary_entry(ops_ary, i);
695 this_value = rb_hash_aref(ops_hash, this_op);
696 st->ops[st->op_num].flags = 0;
697 switch (NUM2INT(this_op)) {
698 case GRPC_OP_SEND_INITIAL_METADATA:
699 grpc_rb_md_ary_convert(this_value, &st->send_metadata);
700 st->ops[st->op_num].data.send_initial_metadata.count =
701 st->send_metadata.count;
702 st->ops[st->op_num].data.send_initial_metadata.metadata =
703 st->send_metadata.metadata;
704 break;
705 case GRPC_OP_SEND_MESSAGE:
706 st->ops[st->op_num].data.send_message.send_message =
707 grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
708 RSTRING_LEN(this_value));
709 st->ops[st->op_num].flags = st->write_flag;
710 break;
711 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
712 break;
713 case GRPC_OP_SEND_STATUS_FROM_SERVER:
714 grpc_rb_op_update_status_from_server(
715 &st->ops[st->op_num], &st->send_trailing_metadata,
716 &st->send_status_details, this_value);
717 break;
718 case GRPC_OP_RECV_INITIAL_METADATA:
719 st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
720 &st->recv_metadata;
721 break;
722 case GRPC_OP_RECV_MESSAGE:
723 st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
724 break;
725 case GRPC_OP_RECV_STATUS_ON_CLIENT:
726 st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
727 &st->recv_trailing_metadata;
728 st->ops[st->op_num].data.recv_status_on_client.status =
729 &st->recv_status;
730 st->ops[st->op_num].data.recv_status_on_client.status_details =
731 &st->recv_status_details;
732 break;
733 case GRPC_OP_RECV_CLOSE_ON_SERVER:
734 st->ops[st->op_num].data.recv_close_on_server.cancelled =
735 &st->recv_cancelled;
736 break;
737 default:
738 grpc_run_batch_stack_cleanup(st);
739 rb_raise(rb_eTypeError, "invalid operation : bad value %d",
740 NUM2INT(this_op));
741 };
742 st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
743 st->ops[st->op_num].reserved = NULL;
744 st->op_num++;
745 }
746 }
747
748 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
749 after the results have run */
grpc_run_batch_stack_build_result(run_batch_stack * st)750 static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
751 size_t i = 0;
752 VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
753 Qnil, Qnil, Qnil, Qnil, NULL);
754 for (i = 0; i < st->op_num; i++) {
755 switch (st->ops[i].op) {
756 case GRPC_OP_SEND_INITIAL_METADATA:
757 rb_struct_aset(result, sym_send_metadata, Qtrue);
758 break;
759 case GRPC_OP_SEND_MESSAGE:
760 rb_struct_aset(result, sym_send_message, Qtrue);
761 break;
762 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
763 rb_struct_aset(result, sym_send_close, Qtrue);
764 break;
765 case GRPC_OP_SEND_STATUS_FROM_SERVER:
766 rb_struct_aset(result, sym_send_status, Qtrue);
767 break;
768 case GRPC_OP_RECV_INITIAL_METADATA:
769 rb_struct_aset(result, sym_metadata,
770 grpc_rb_md_ary_to_h(&st->recv_metadata));
771 case GRPC_OP_RECV_MESSAGE:
772 rb_struct_aset(result, sym_message,
773 grpc_rb_byte_buffer_to_s(st->recv_message));
774 break;
775 case GRPC_OP_RECV_STATUS_ON_CLIENT:
776 rb_struct_aset(
777 result, sym_status,
778 rb_struct_new(
779 grpc_rb_sStatus, UINT2NUM(st->recv_status),
780 (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
781 ? Qnil
782 : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
783 grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), NULL));
784 break;
785 case GRPC_OP_RECV_CLOSE_ON_SERVER:
786 rb_struct_aset(result, sym_send_close, Qtrue);
787 break;
788 default:
789 break;
790 }
791 }
792 return result;
793 }
794
795 /* call-seq:
796 ops = {
797 GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
798 GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
799 ...
800 }
801 tag = Object.new
802 timeout = 10
803 call.start_batch(tag, timeout, ops)
804
805 Start a batch of operations defined in the array ops; when complete, post a
806 completion of type 'tag' to the completion queue bound to the call.
807
808 Also waits for the batch to complete, until timeout is reached.
809 The order of ops specified in the batch has no significance.
810 Only one operation of each type can be active at once in any given
811 batch */
grpc_rb_call_run_batch(VALUE self,VALUE ops_hash)812 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
813 run_batch_stack* st = NULL;
814 grpc_rb_call* call = NULL;
815 grpc_event ev;
816 grpc_call_error err;
817 VALUE result = Qnil;
818 VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
819 unsigned write_flag = 0;
820 void* tag = (void*)&st;
821
822 grpc_ruby_fork_guard();
823 if (RTYPEDDATA_DATA(self) == NULL) {
824 rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
825 return Qnil;
826 }
827 TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
828
829 /* Validate the ops args, adding them to a ruby array */
830 if (TYPE(ops_hash) != T_HASH) {
831 rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
832 return Qnil;
833 }
834 if (rb_write_flag != Qnil) {
835 write_flag = NUM2UINT(rb_write_flag);
836 }
837 st = gpr_malloc(sizeof(run_batch_stack));
838 grpc_run_batch_stack_init(st, write_flag);
839 grpc_run_batch_stack_fill_ops(st, ops_hash);
840
841 /* call grpc_call_start_batch, then wait for it to complete using
842 * pluck_event */
843 err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
844 if (err != GRPC_CALL_OK) {
845 grpc_run_batch_stack_cleanup(st);
846 gpr_free(st);
847 rb_raise(grpc_rb_eCallError,
848 "grpc_call_start_batch failed with %s (code=%d)",
849 grpc_call_error_detail_of(err), err);
850 return Qnil;
851 }
852 ev = rb_completion_queue_pluck(call->queue, tag,
853 gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
854 if (!ev.success) {
855 rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
856 }
857 /* Build and return the BatchResult struct result,
858 if there is an error, it's reflected in the status */
859 result = grpc_run_batch_stack_build_result(st);
860 grpc_run_batch_stack_cleanup(st);
861 gpr_free(st);
862 return result;
863 }
864
Init_grpc_write_flags()865 static void Init_grpc_write_flags() {
866 /* Constants representing the write flags in grpc.h */
867 VALUE grpc_rb_mWriteFlags =
868 rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
869 rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
870 UINT2NUM(GRPC_WRITE_BUFFER_HINT));
871 rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
872 UINT2NUM(GRPC_WRITE_NO_COMPRESS));
873 }
874
Init_grpc_error_codes()875 static void Init_grpc_error_codes() {
876 /* Constants representing the error codes of grpc_call_error in grpc.h */
877 VALUE grpc_rb_mRpcErrors =
878 rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
879 rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
880 rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
881 rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
882 UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
883 rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
884 UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
885 rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
886 UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
887 rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
888 UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
889 rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
890 UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
891 rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
892 UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
893 rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
894 UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
895 rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
896 UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
897
898 /* Hint the GC that this is a global and shouldn't be sweeped. */
899 rb_global_variable(&rb_error_code_details);
900
901 /* Add the detail strings to a Hash */
902 rb_error_code_details = rb_hash_new();
903 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),
904 rb_str_new2("ok"));
905 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR),
906 rb_str_new2("unknown error"));
907 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER),
908 rb_str_new2("not available on a server"));
909 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
910 rb_str_new2("not available on a client"));
911 rb_hash_aset(rb_error_code_details,
912 UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
913 rb_str_new2("call is already accepted"));
914 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
915 rb_str_new2("call is already invoked"));
916 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
917 rb_str_new2("call is not yet invoked"));
918 rb_hash_aset(rb_error_code_details,
919 UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED),
920 rb_str_new2("call is already finished"));
921 rb_hash_aset(rb_error_code_details,
922 UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS),
923 rb_str_new2("outstanding read or write present"));
924 rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
925 rb_str_new2("a bad flag was given"));
926 rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
927 rb_obj_freeze(rb_error_code_details);
928 }
929
Init_grpc_op_codes()930 static void Init_grpc_op_codes() {
931 /* Constants representing operation type codes in grpc.h */
932 VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
933 rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
934 UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
935 rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
936 UINT2NUM(GRPC_OP_SEND_MESSAGE));
937 rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
938 UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
939 rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
940 UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
941 rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
942 UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
943 rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
944 UINT2NUM(GRPC_OP_RECV_MESSAGE));
945 rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
946 UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
947 rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
948 UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
949 }
950
Init_grpc_metadata_keys()951 static void Init_grpc_metadata_keys() {
952 VALUE grpc_rb_mMetadataKeys =
953 rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
954 rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
955 rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
956 }
957
Init_grpc_call()958 void Init_grpc_call() {
959 /* CallError inherits from Exception to signal that it is non-recoverable */
960 grpc_rb_eCallError =
961 rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
962 grpc_rb_eOutOfTime =
963 rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
964 grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
965 grpc_rb_cMdAry =
966 rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
967
968 /* Prevent allocation or inialization of the Call class */
969 rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
970 rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
971 rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
972 1);
973
974 /* Add ruby analogues of the Call methods. */
975 rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
976 rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
977 rb_define_method(grpc_rb_cCall, "cancel_with_status",
978 grpc_rb_call_cancel_with_status, 2);
979 rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
980 rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
981 rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
982 rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
983 rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
984 rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
985 rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
986 rb_define_method(grpc_rb_cCall, "trailing_metadata",
987 grpc_rb_call_get_trailing_metadata, 0);
988 rb_define_method(grpc_rb_cCall,
989 "trailing_metadata=", grpc_rb_call_set_trailing_metadata, 1);
990 rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
991 rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
992 1);
993 rb_define_method(grpc_rb_cCall, "set_credentials!",
994 grpc_rb_call_set_credentials, 1);
995
996 /* Ids used to support call attributes */
997 id_metadata = rb_intern("metadata");
998 id_trailing_metadata = rb_intern("trailing_metadata");
999 id_status = rb_intern("status");
1000 id_write_flag = rb_intern("write_flag");
1001
1002 /* Ids used by the c wrapping internals. */
1003 id_credentials = rb_intern("__credentials");
1004
1005 /* Ids used in constructing the batch result. */
1006 sym_send_message = ID2SYM(rb_intern("send_message"));
1007 sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
1008 sym_send_close = ID2SYM(rb_intern("send_close"));
1009 sym_send_status = ID2SYM(rb_intern("send_status"));
1010 sym_message = ID2SYM(rb_intern("message"));
1011 sym_status = ID2SYM(rb_intern("status"));
1012 sym_cancelled = ID2SYM(rb_intern("cancelled"));
1013
1014 /* The Struct used to return the run_batch result. */
1015 grpc_rb_sBatchResult = rb_struct_define(
1016 "BatchResult", "send_message", "send_metadata", "send_close",
1017 "send_status", "message", "metadata", "status", "cancelled", NULL);
1018
1019 Init_grpc_error_codes();
1020 Init_grpc_op_codes();
1021 Init_grpc_write_flags();
1022 Init_grpc_metadata_keys();
1023 }
1024
1025 /* Gets the call from the ruby object */
grpc_rb_get_wrapped_call(VALUE v)1026 grpc_call* grpc_rb_get_wrapped_call(VALUE v) {
1027 grpc_rb_call* call = NULL;
1028 TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
1029 return call->wrapped;
1030 }
1031
1032 /* Obtains the wrapped object for a given call */
grpc_rb_wrap_call(grpc_call * c,grpc_completion_queue * q)1033 VALUE grpc_rb_wrap_call(grpc_call* c, grpc_completion_queue* q) {
1034 grpc_rb_call* wrapper;
1035 if (c == NULL || q == NULL) {
1036 return Qnil;
1037 }
1038 wrapper = ALLOC(grpc_rb_call);
1039 wrapper->wrapped = c;
1040 wrapper->queue = q;
1041 return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
1042 }
1043