1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_completion_queue.h"
22 #include "rb_grpc_imports.generated.h"
23
24 #include <ruby/thread.h>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include "rb_grpc.h"
30
31 /* Used to allow grpc_completion_queue_next call to release the GIL */
32 typedef struct next_call_stack {
33 grpc_completion_queue* cq;
34 grpc_event event;
35 gpr_timespec timeout;
36 void* tag;
37 volatile int interrupted;
38 } next_call_stack;
39
40 /* Calls grpc_completion_queue_pluck without holding the ruby GIL */
grpc_rb_completion_queue_pluck_no_gil(void * param)41 static void* grpc_rb_completion_queue_pluck_no_gil(void* param) {
42 next_call_stack* const next_call = (next_call_stack*)param;
43 gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
44 gpr_timespec deadline;
45 do {
46 deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
47 next_call->event = grpc_completion_queue_pluck(
48 next_call->cq, next_call->tag, deadline, NULL);
49 if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
50 gpr_time_cmp(deadline, next_call->timeout) > 0) {
51 break;
52 }
53 } while (!next_call->interrupted);
54 return NULL;
55 }
56
57 /* Helper function to free a completion queue. */
grpc_rb_completion_queue_destroy(grpc_completion_queue * cq)58 void grpc_rb_completion_queue_destroy(grpc_completion_queue* cq) {
59 /* Every function that adds an event to a queue also synchronously plucks
60 that event from the queue, and holds a reference to the Ruby object that
61 holds the queue, so we only get to this point if all of those functions
62 have completed, and the queue is empty */
63 grpc_completion_queue_shutdown(cq);
64 grpc_completion_queue_destroy(cq);
65 }
66
unblock_func(void * param)67 static void unblock_func(void* param) {
68 next_call_stack* const next_call = (next_call_stack*)param;
69 next_call->interrupted = 1;
70 }
71
72 /* Does the same thing as grpc_completion_queue_pluck, while properly releasing
73 the GVL and handling interrupts */
rb_completion_queue_pluck(grpc_completion_queue * queue,void * tag,gpr_timespec deadline,void * reserved)74 grpc_event rb_completion_queue_pluck(grpc_completion_queue* queue, void* tag,
75 gpr_timespec deadline, void* reserved) {
76 next_call_stack next_call;
77 MEMZERO(&next_call, next_call_stack, 1);
78 next_call.cq = queue;
79 next_call.timeout = deadline;
80 next_call.tag = tag;
81 next_call.event.type = GRPC_QUEUE_TIMEOUT;
82 (void)reserved;
83 /* Loop until we finish a pluck without an interruption. The internal
84 pluck function runs either until it is interrupted or it gets an
85 event, or time runs out.
86
87 The basic reason we need this relatively complicated construction is that
88 we need to re-acquire the GVL when an interrupt comes in, so that the ruby
89 interpreter can do what it needs to do with the interrupt. But we also need
90 to get back to plucking when the interrupt has been handled. */
91 do {
92 next_call.interrupted = 0;
93 rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
94 (void*)&next_call, unblock_func,
95 (void*)&next_call);
96 /* If an interrupt prevented pluck from returning useful information, then
97 any plucks that did complete must have timed out */
98 } while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT);
99 return next_call.event;
100 }
101