1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_event_thread.h"
22 #include "rb_grpc_imports.generated.h"
23
24 #include <stdbool.h>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29 #include <grpc/support/time.h>
30 #include <ruby/thread.h>
31
typedef struct grpc_rb_event grpc_rb_event;

// One queued unit of work: a singly linked list node that pairs a callback
// with the opaque argument it is invoked with.
struct grpc_rb_event {
  // Invoked as callback(argument) while the GVL is held.
  void (*callback)(void*);
  void* argument;

  // Node that follows this one in the queue; NULL for the last node.
  grpc_rb_event* next;
};
39
40 typedef struct grpc_rb_event_queue {
41 grpc_rb_event* head;
42 grpc_rb_event* tail;
43
44 gpr_mu mu;
45 gpr_cv cv;
46
47 // Indicates that the thread should stop waiting
48 bool abort;
49 } grpc_rb_event_queue;
50
// The single, file-local event queue shared by the enqueue function and the
// event thread. Its fields are (re)initialized in
// grpc_rb_event_queue_thread_start() before the event thread is created.
static grpc_rb_event_queue event_queue;
52
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)53 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
54 grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
55 event->callback = callback;
56 event->argument = argument;
57 event->next = NULL;
58 gpr_mu_lock(&event_queue.mu);
59 if (event_queue.tail == NULL) {
60 event_queue.head = event_queue.tail = event;
61 } else {
62 event_queue.tail->next = event;
63 event_queue.tail = event;
64 }
65 gpr_cv_signal(&event_queue.cv);
66 gpr_mu_unlock(&event_queue.mu);
67 }
68
grpc_rb_event_queue_dequeue()69 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
70 grpc_rb_event* event;
71 if (event_queue.head == NULL) {
72 event = NULL;
73 } else {
74 event = event_queue.head;
75 if (event_queue.head->next == NULL) {
76 event_queue.head = event_queue.tail = NULL;
77 } else {
78 event_queue.head = event_queue.head->next;
79 }
80 }
81 return event;
82 }
83
grpc_rb_event_queue_destroy()84 static void grpc_rb_event_queue_destroy() {
85 gpr_mu_destroy(&event_queue.mu);
86 gpr_cv_destroy(&event_queue.cv);
87 }
88
grpc_rb_wait_for_event_no_gil(void * param)89 static void* grpc_rb_wait_for_event_no_gil(void* param) {
90 grpc_rb_event* event = NULL;
91 (void)param;
92 gpr_mu_lock(&event_queue.mu);
93 while (!event_queue.abort) {
94 if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
95 gpr_mu_unlock(&event_queue.mu);
96 return event;
97 }
98 gpr_cv_wait(&event_queue.cv, &event_queue.mu,
99 gpr_inf_future(GPR_CLOCK_REALTIME));
100 }
101 gpr_mu_unlock(&event_queue.mu);
102 return NULL;
103 }
104
grpc_rb_event_unblocking_func(void * arg)105 static void grpc_rb_event_unblocking_func(void* arg) {
106 (void)arg;
107 gpr_mu_lock(&event_queue.mu);
108 event_queue.abort = true;
109 gpr_cv_signal(&event_queue.cv);
110 gpr_mu_unlock(&event_queue.mu);
111 }
112
113 /* This is the implementation of the thread that handles auth metadata plugin
114 * events */
grpc_rb_event_thread(VALUE arg)115 static VALUE grpc_rb_event_thread(VALUE arg) {
116 grpc_rb_event* event;
117 (void)arg;
118 while (true) {
119 event = (grpc_rb_event*)rb_thread_call_without_gvl(
120 grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
121 NULL);
122 if (event == NULL) {
123 // Indicates that the thread needs to shut down
124 break;
125 } else {
126 event->callback(event->argument);
127 gpr_free(event);
128 }
129 }
130 grpc_rb_event_queue_destroy();
131 return Qnil;
132 }
133
grpc_rb_event_queue_thread_start()134 void grpc_rb_event_queue_thread_start() {
135 event_queue.head = event_queue.tail = NULL;
136 event_queue.abort = false;
137 gpr_mu_init(&event_queue.mu);
138 gpr_cv_init(&event_queue.cv);
139
140 rb_thread_create(grpc_rb_event_thread, NULL);
141 }
142