1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_event_thread.h"
22
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 #include <ruby/thread.h>
28 #include <stdbool.h>
29
30 #include "rb_grpc.h"
31 #include "rb_grpc_imports.generated.h"
32
33 typedef struct grpc_rb_event {
34 // callback will be called with argument while holding the GVL
35 void (*callback)(void*);
36 void* argument;
37
38 struct grpc_rb_event* next;
39 } grpc_rb_event;
40
41 typedef struct grpc_rb_event_queue {
42 grpc_rb_event* head;
43 grpc_rb_event* tail;
44
45 gpr_mu mu;
46 gpr_cv cv;
47
48 // Indicates that the thread should stop waiting
49 bool abort;
50 } grpc_rb_event_queue;
51
52 static grpc_rb_event_queue event_queue;
53 static VALUE g_event_thread = Qnil;
54 static bool g_one_time_init_done = false;
55
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)56 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
57 grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
58 event->callback = callback;
59 event->argument = argument;
60 event->next = NULL;
61 gpr_mu_lock(&event_queue.mu);
62 if (event_queue.tail == NULL) {
63 event_queue.head = event_queue.tail = event;
64 } else {
65 event_queue.tail->next = event;
66 event_queue.tail = event;
67 }
68 gpr_cv_signal(&event_queue.cv);
69 gpr_mu_unlock(&event_queue.mu);
70 }
71
grpc_rb_event_queue_dequeue()72 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
73 grpc_rb_event* event;
74 if (event_queue.head == NULL) {
75 event = NULL;
76 } else {
77 event = event_queue.head;
78 if (event_queue.head->next == NULL) {
79 event_queue.head = event_queue.tail = NULL;
80 } else {
81 event_queue.head = event_queue.head->next;
82 }
83 }
84 return event;
85 }
86
grpc_rb_event_queue_destroy()87 static void grpc_rb_event_queue_destroy() {
88 gpr_mu_destroy(&event_queue.mu);
89 gpr_cv_destroy(&event_queue.cv);
90 }
91
grpc_rb_wait_for_event_no_gil(void * param)92 static void* grpc_rb_wait_for_event_no_gil(void* param) {
93 grpc_rb_event* event = NULL;
94 (void)param;
95 gpr_mu_lock(&event_queue.mu);
96 while (!event_queue.abort) {
97 if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
98 gpr_mu_unlock(&event_queue.mu);
99 return event;
100 }
101 gpr_cv_wait(&event_queue.cv, &event_queue.mu,
102 gpr_inf_future(GPR_CLOCK_REALTIME));
103 }
104 gpr_mu_unlock(&event_queue.mu);
105 return NULL;
106 }
107
grpc_rb_event_unblocking_func_wrapper(void * arg)108 static void* grpc_rb_event_unblocking_func_wrapper(void* arg) {
109 (void)arg;
110 gpr_mu_lock(&event_queue.mu);
111 event_queue.abort = true;
112 gpr_cv_signal(&event_queue.cv);
113 gpr_mu_unlock(&event_queue.mu);
114 return NULL;
115 }
116
grpc_rb_event_unblocking_func(void * arg)117 static void grpc_rb_event_unblocking_func(void* arg) {
118 grpc_rb_event_unblocking_func_wrapper(arg);
119 }
120
121 /* This is the implementation of the thread that handles auth metadata plugin
122 * events */
grpc_rb_event_thread(void * arg)123 static VALUE grpc_rb_event_thread(void* arg) {
124 grpc_rb_event* event;
125 (void)arg;
126 while (true) {
127 event = (grpc_rb_event*)rb_thread_call_without_gvl(
128 grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
129 NULL);
130 if (event == NULL) {
131 // Indicates that the thread needs to shut down
132 break;
133 } else {
134 event->callback(event->argument);
135 gpr_free(event);
136 }
137 }
138 grpc_rb_event_queue_destroy();
139 return Qnil;
140 }
141
grpc_rb_event_queue_thread_start()142 void grpc_rb_event_queue_thread_start() {
143 if (!g_one_time_init_done) {
144 g_one_time_init_done = true;
145 gpr_mu_init(&event_queue.mu);
146 gpr_cv_init(&event_queue.cv);
147 rb_global_variable(&g_event_thread);
148 event_queue.head = event_queue.tail = NULL;
149 }
150 event_queue.abort = false;
151 GRPC_RUBY_ASSERT(!RTEST(g_event_thread));
152 g_event_thread = rb_thread_create(grpc_rb_event_thread, NULL);
153 }
154
grpc_rb_event_queue_thread_stop()155 void grpc_rb_event_queue_thread_stop() {
156 GRPC_RUBY_ASSERT(g_one_time_init_done);
157 if (!RTEST(g_event_thread)) {
158 grpc_absl_log(
159 GPR_ERROR,
160 "GRPC_RUBY: call credentials thread stop: thread not running");
161 return;
162 }
163 rb_thread_call_without_gvl(grpc_rb_event_unblocking_func_wrapper, NULL, NULL,
164 NULL);
165 rb_funcall(g_event_thread, rb_intern("join"), 0);
166 g_event_thread = Qnil;
167 }
168