• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_event_thread.h"
22 
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 #include <ruby/thread.h>
28 #include <stdbool.h>
29 
30 #include "rb_grpc.h"
31 #include "rb_grpc_imports.generated.h"
32 
// One unit of deferred work; a node in the singly linked event queue.
typedef struct grpc_rb_event {
  // Function to run; it is invoked as callback(argument) while holding the
  // GVL.
  void (*callback)(void*);
  // Opaque pointer handed to callback unchanged.
  void* argument;

  // Node that follows this one in the queue, or NULL for the last node.
  struct grpc_rb_event* next;
} grpc_rb_event;
40 
41 typedef struct grpc_rb_event_queue {
42   grpc_rb_event* head;
43   grpc_rb_event* tail;
44 
45   gpr_mu mu;
46   gpr_cv cv;
47 
48   // Indicates that the thread should stop waiting
49   bool abort;
50 } grpc_rb_event_queue;
51 
52 static grpc_rb_event_queue event_queue;
53 static VALUE g_event_thread = Qnil;
54 static bool g_one_time_init_done = false;
55 
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)56 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
57   grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
58   event->callback = callback;
59   event->argument = argument;
60   event->next = NULL;
61   gpr_mu_lock(&event_queue.mu);
62   if (event_queue.tail == NULL) {
63     event_queue.head = event_queue.tail = event;
64   } else {
65     event_queue.tail->next = event;
66     event_queue.tail = event;
67   }
68   gpr_cv_signal(&event_queue.cv);
69   gpr_mu_unlock(&event_queue.mu);
70 }
71 
grpc_rb_event_queue_dequeue()72 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
73   grpc_rb_event* event;
74   if (event_queue.head == NULL) {
75     event = NULL;
76   } else {
77     event = event_queue.head;
78     if (event_queue.head->next == NULL) {
79       event_queue.head = event_queue.tail = NULL;
80     } else {
81       event_queue.head = event_queue.head->next;
82     }
83   }
84   return event;
85 }
86 
grpc_rb_event_queue_destroy()87 static void grpc_rb_event_queue_destroy() {
88   gpr_mu_destroy(&event_queue.mu);
89   gpr_cv_destroy(&event_queue.cv);
90 }
91 
grpc_rb_wait_for_event_no_gil(void * param)92 static void* grpc_rb_wait_for_event_no_gil(void* param) {
93   grpc_rb_event* event = NULL;
94   (void)param;
95   gpr_mu_lock(&event_queue.mu);
96   while (!event_queue.abort) {
97     if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
98       gpr_mu_unlock(&event_queue.mu);
99       return event;
100     }
101     gpr_cv_wait(&event_queue.cv, &event_queue.mu,
102                 gpr_inf_future(GPR_CLOCK_REALTIME));
103   }
104   gpr_mu_unlock(&event_queue.mu);
105   return NULL;
106 }
107 
grpc_rb_event_unblocking_func_wrapper(void * arg)108 static void* grpc_rb_event_unblocking_func_wrapper(void* arg) {
109   (void)arg;
110   gpr_mu_lock(&event_queue.mu);
111   event_queue.abort = true;
112   gpr_cv_signal(&event_queue.cv);
113   gpr_mu_unlock(&event_queue.mu);
114   return NULL;
115 }
116 
// Adapter with the void(void*) signature expected for the unblocking-function
// argument of rb_thread_call_without_gvl; it forwards to
// grpc_rb_event_unblocking_func_wrapper and discards the (always NULL) result.
static void grpc_rb_event_unblocking_func(void* arg) {
  (void)grpc_rb_event_unblocking_func_wrapper(arg);
}
120 
121 /* This is the implementation of the thread that handles auth metadata plugin
122  * events */
grpc_rb_event_thread(void * arg)123 static VALUE grpc_rb_event_thread(void* arg) {
124   grpc_rb_event* event;
125   (void)arg;
126   while (true) {
127     event = (grpc_rb_event*)rb_thread_call_without_gvl(
128         grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
129         NULL);
130     if (event == NULL) {
131       // Indicates that the thread needs to shut down
132       break;
133     } else {
134       event->callback(event->argument);
135       gpr_free(event);
136     }
137   }
138   grpc_rb_event_queue_destroy();
139   return Qnil;
140 }
141 
grpc_rb_event_queue_thread_start()142 void grpc_rb_event_queue_thread_start() {
143   if (!g_one_time_init_done) {
144     g_one_time_init_done = true;
145     gpr_mu_init(&event_queue.mu);
146     gpr_cv_init(&event_queue.cv);
147     rb_global_variable(&g_event_thread);
148     event_queue.head = event_queue.tail = NULL;
149   }
150   event_queue.abort = false;
151   GRPC_RUBY_ASSERT(!RTEST(g_event_thread));
152   g_event_thread = rb_thread_create(grpc_rb_event_thread, NULL);
153 }
154 
grpc_rb_event_queue_thread_stop()155 void grpc_rb_event_queue_thread_stop() {
156   GRPC_RUBY_ASSERT(g_one_time_init_done);
157   if (!RTEST(g_event_thread)) {
158     grpc_absl_log(
159         GPR_ERROR,
160         "GRPC_RUBY: call credentials thread stop: thread not running");
161     return;
162   }
163   rb_thread_call_without_gvl(grpc_rb_event_unblocking_func_wrapper, NULL, NULL,
164                              NULL);
165   rb_funcall(g_event_thread, rb_intern("join"), 0);
166   g_event_thread = Qnil;
167 }
168