• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_event_thread.h"
22 #include "rb_grpc_imports.generated.h"
23 
24 #include <stdbool.h>
25 
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29 #include <grpc/support/time.h>
30 #include <ruby/thread.h>
31 
typedef struct grpc_rb_event grpc_rb_event;

// One pending unit of work for the event thread; nodes form a singly linked
// list inside the event queue.
struct grpc_rb_event {
  // Function the event thread invokes, passing `argument`. It is invoked from
  // the Ruby thread itself, i.e. while the GVL is held.
  void (*callback)(void*);
  // Opaque value handed to `callback` unchanged.
  void* argument;

  // Following node in the queue, or NULL when this is the last one.
  grpc_rb_event* next;
};
39 
40 typedef struct grpc_rb_event_queue {
41   grpc_rb_event* head;
42   grpc_rb_event* tail;
43 
44   gpr_mu mu;
45   gpr_cv cv;
46 
47   // Indicates that the thread should stop waiting
48   bool abort;
49 } grpc_rb_event_queue;
50 
51 static grpc_rb_event_queue event_queue;
52 
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)53 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
54   grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
55   event->callback = callback;
56   event->argument = argument;
57   event->next = NULL;
58   gpr_mu_lock(&event_queue.mu);
59   if (event_queue.tail == NULL) {
60     event_queue.head = event_queue.tail = event;
61   } else {
62     event_queue.tail->next = event;
63     event_queue.tail = event;
64   }
65   gpr_cv_signal(&event_queue.cv);
66   gpr_mu_unlock(&event_queue.mu);
67 }
68 
grpc_rb_event_queue_dequeue()69 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
70   grpc_rb_event* event;
71   if (event_queue.head == NULL) {
72     event = NULL;
73   } else {
74     event = event_queue.head;
75     if (event_queue.head->next == NULL) {
76       event_queue.head = event_queue.tail = NULL;
77     } else {
78       event_queue.head = event_queue.head->next;
79     }
80   }
81   return event;
82 }
83 
grpc_rb_event_queue_destroy()84 static void grpc_rb_event_queue_destroy() {
85   gpr_mu_destroy(&event_queue.mu);
86   gpr_cv_destroy(&event_queue.cv);
87 }
88 
grpc_rb_wait_for_event_no_gil(void * param)89 static void* grpc_rb_wait_for_event_no_gil(void* param) {
90   grpc_rb_event* event = NULL;
91   (void)param;
92   gpr_mu_lock(&event_queue.mu);
93   while (!event_queue.abort) {
94     if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
95       gpr_mu_unlock(&event_queue.mu);
96       return event;
97     }
98     gpr_cv_wait(&event_queue.cv, &event_queue.mu,
99                 gpr_inf_future(GPR_CLOCK_REALTIME));
100   }
101   gpr_mu_unlock(&event_queue.mu);
102   return NULL;
103 }
104 
grpc_rb_event_unblocking_func(void * arg)105 static void grpc_rb_event_unblocking_func(void* arg) {
106   (void)arg;
107   gpr_mu_lock(&event_queue.mu);
108   event_queue.abort = true;
109   gpr_cv_signal(&event_queue.cv);
110   gpr_mu_unlock(&event_queue.mu);
111 }
112 
113 /* This is the implementation of the thread that handles auth metadata plugin
114  * events */
grpc_rb_event_thread(VALUE arg)115 static VALUE grpc_rb_event_thread(VALUE arg) {
116   grpc_rb_event* event;
117   (void)arg;
118   while (true) {
119     event = (grpc_rb_event*)rb_thread_call_without_gvl(
120         grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
121         NULL);
122     if (event == NULL) {
123       // Indicates that the thread needs to shut down
124       break;
125     } else {
126       event->callback(event->argument);
127       gpr_free(event);
128     }
129   }
130   grpc_rb_event_queue_destroy();
131   return Qnil;
132 }
133 
grpc_rb_event_queue_thread_start()134 void grpc_rb_event_queue_thread_start() {
135   event_queue.head = event_queue.tail = NULL;
136   event_queue.abort = false;
137   gpr_mu_init(&event_queue.mu);
138   gpr_cv_init(&event_queue.cv);
139 
140   rb_thread_create(grpc_rb_event_thread, NULL);
141 }
142