1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_event_thread.h"
22 #include "rb_grpc_imports.generated.h"
23
24 #include <stdbool.h>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29 #include <grpc/support/time.h>
30 #include <ruby/thread.h>
31
32 #include "rb_grpc.h"
33
34 typedef struct grpc_rb_event {
35 // callback will be called with argument while holding the GVL
36 void (*callback)(void*);
37 void* argument;
38
39 struct grpc_rb_event* next;
40 } grpc_rb_event;
41
42 typedef struct grpc_rb_event_queue {
43 grpc_rb_event* head;
44 grpc_rb_event* tail;
45
46 gpr_mu mu;
47 gpr_cv cv;
48
49 // Indicates that the thread should stop waiting
50 bool abort;
51 } grpc_rb_event_queue;
52
53 static grpc_rb_event_queue event_queue;
54
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)55 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
56 grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
57 event->callback = callback;
58 event->argument = argument;
59 event->next = NULL;
60 gpr_mu_lock(&event_queue.mu);
61 if (event_queue.tail == NULL) {
62 event_queue.head = event_queue.tail = event;
63 } else {
64 event_queue.tail->next = event;
65 event_queue.tail = event;
66 }
67 gpr_cv_signal(&event_queue.cv);
68 gpr_mu_unlock(&event_queue.mu);
69 }
70
grpc_rb_event_queue_dequeue()71 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
72 grpc_rb_event* event;
73 if (event_queue.head == NULL) {
74 event = NULL;
75 } else {
76 event = event_queue.head;
77 if (event_queue.head->next == NULL) {
78 event_queue.head = event_queue.tail = NULL;
79 } else {
80 event_queue.head = event_queue.head->next;
81 }
82 }
83 return event;
84 }
85
grpc_rb_event_queue_destroy()86 static void grpc_rb_event_queue_destroy() {
87 gpr_mu_destroy(&event_queue.mu);
88 gpr_cv_destroy(&event_queue.cv);
89 }
90
grpc_rb_wait_for_event_no_gil(void * param)91 static void* grpc_rb_wait_for_event_no_gil(void* param) {
92 grpc_rb_event* event = NULL;
93 (void)param;
94 gpr_mu_lock(&event_queue.mu);
95 while (!event_queue.abort) {
96 if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
97 gpr_mu_unlock(&event_queue.mu);
98 return event;
99 }
100 gpr_cv_wait(&event_queue.cv, &event_queue.mu,
101 gpr_inf_future(GPR_CLOCK_REALTIME));
102 }
103 gpr_mu_unlock(&event_queue.mu);
104 return NULL;
105 }
106
grpc_rb_event_unblocking_func(void * arg)107 static void grpc_rb_event_unblocking_func(void* arg) {
108 (void)arg;
109 gpr_mu_lock(&event_queue.mu);
110 event_queue.abort = true;
111 gpr_cv_signal(&event_queue.cv);
112 gpr_mu_unlock(&event_queue.mu);
113 }
114
115 /* This is the implementation of the thread that handles auth metadata plugin
116 * events */
grpc_rb_event_thread(VALUE arg)117 static VALUE grpc_rb_event_thread(VALUE arg) {
118 grpc_rb_event* event;
119 (void)arg;
120 grpc_ruby_init();
121 while (true) {
122 event = (grpc_rb_event*)rb_thread_call_without_gvl(
123 grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
124 NULL);
125 if (event == NULL) {
126 // Indicates that the thread needs to shut down
127 break;
128 } else {
129 event->callback(event->argument);
130 gpr_free(event);
131 }
132 }
133 grpc_rb_event_queue_destroy();
134 grpc_ruby_shutdown();
135 return Qnil;
136 }
137
grpc_rb_event_queue_thread_start()138 void grpc_rb_event_queue_thread_start() {
139 event_queue.head = event_queue.tail = NULL;
140 event_queue.abort = false;
141 gpr_mu_init(&event_queue.mu);
142 gpr_cv_init(&event_queue.cv);
143
144 rb_thread_create(grpc_rb_event_thread, NULL);
145 }
146