1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_grpc_imports.generated.h"
22 #include "rb_server.h"
23
24 #include <grpc/grpc.h>
25 #include <grpc/grpc_security.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 #include "rb_byte_buffer.h"
29 #include "rb_call.h"
30 #include "rb_channel_args.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33 #include "rb_server_credentials.h"
34
35 /* grpc_rb_cServer is the ruby class that proxies grpc_server. */
36 static VALUE grpc_rb_cServer = Qnil;
37
38 /* id_at is the constructor method of the ruby standard Time class. */
39 static ID id_at;
40
41 /* id_insecure_server is used to indicate that a server is insecure */
42 static VALUE id_insecure_server;
43
44 /* grpc_rb_server wraps a grpc_server. */
45 typedef struct grpc_rb_server {
46 /* The actual server */
47 grpc_server* wrapped;
48 grpc_completion_queue* queue;
49 int shutdown_and_notify_done;
50 int destroy_done;
51 } grpc_rb_server;
52
grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server * server,gpr_timespec deadline)53 static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
54 gpr_timespec deadline) {
55 grpc_event ev;
56 void* tag = &ev;
57 if (!server->shutdown_and_notify_done) {
58 server->shutdown_and_notify_done = 1;
59 if (server->wrapped != NULL) {
60 grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
61 ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL);
62 if (ev.type == GRPC_QUEUE_TIMEOUT) {
63 grpc_server_cancel_all_calls(server->wrapped);
64 ev = rb_completion_queue_pluck(
65 server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
66 }
67 if (ev.type != GRPC_OP_COMPLETE) {
68 gpr_log(GPR_INFO,
69 "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
70 ev.type);
71 }
72 }
73 }
74 }
75
grpc_rb_server_maybe_destroy(grpc_rb_server * server)76 static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
77 // This can be started by app or implicitly by GC. Avoid a race between these.
78 if (!server->destroy_done) {
79 server->destroy_done = 1;
80 if (server->wrapped != NULL) {
81 grpc_server_destroy(server->wrapped);
82 grpc_rb_completion_queue_destroy(server->queue);
83 server->wrapped = NULL;
84 server->queue = NULL;
85 }
86 }
87 }
88
89 /* Destroys server instances. */
grpc_rb_server_free(void * p)90 static void grpc_rb_server_free(void* p) {
91 grpc_rb_server* svr = NULL;
92 gpr_timespec deadline;
93 if (p == NULL) {
94 return;
95 };
96 svr = (grpc_rb_server*)p;
97
98 deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
99 gpr_time_from_seconds(2, GPR_TIMESPAN));
100
101 grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
102 grpc_rb_server_maybe_destroy(svr);
103
104 xfree(p);
105 }
106
107 static const rb_data_type_t grpc_rb_server_data_type = {
108 "grpc_server",
109 {GRPC_RB_GC_NOT_MARKED,
110 grpc_rb_server_free,
111 GRPC_RB_MEMSIZE_UNAVAILABLE,
112 {NULL, NULL}},
113 NULL,
114 NULL,
115 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
116 /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
117 * function would block and we might want to unlock GVL
118 * TODO(yugui) Unlock GVL?
119 */
120 0,
121 #endif
122 };
123
124 /* Allocates grpc_rb_server instances. */
grpc_rb_server_alloc(VALUE cls)125 static VALUE grpc_rb_server_alloc(VALUE cls) {
126 grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
127 wrapper->wrapped = NULL;
128 wrapper->destroy_done = 0;
129 wrapper->shutdown_and_notify_done = 0;
130 return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
131 }
132
133 /*
134 call-seq:
135 server = Server.new({'arg1': 'value1'})
136
137 Initializes server instances. */
grpc_rb_server_init(VALUE self,VALUE channel_args)138 static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
139 grpc_completion_queue* cq = NULL;
140 grpc_rb_server* wrapper = NULL;
141 grpc_server* srv = NULL;
142 grpc_channel_args args;
143 MEMZERO(&args, grpc_channel_args, 1);
144
145 grpc_ruby_once_init();
146
147 cq = grpc_completion_queue_create_for_pluck(NULL);
148 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
149 wrapper);
150 grpc_rb_hash_convert_to_channel_args(channel_args, &args);
151 srv = grpc_server_create(&args, NULL);
152
153 if (args.args != NULL) {
154 xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
155 }
156 if (srv == NULL) {
157 rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
158 }
159 grpc_server_register_completion_queue(srv, cq, NULL);
160 wrapper->wrapped = srv;
161 wrapper->queue = cq;
162
163 return self;
164 }
165
166 /* request_call_stack holds various values used by the
167 * grpc_rb_server_request_call function */
168 typedef struct request_call_stack {
169 grpc_call_details details;
170 grpc_metadata_array md_ary;
171 } request_call_stack;
172
173 /* grpc_request_call_stack_init ensures the request_call_stack is properly
174 * initialized */
grpc_request_call_stack_init(request_call_stack * st)175 static void grpc_request_call_stack_init(request_call_stack* st) {
176 MEMZERO(st, request_call_stack, 1);
177 grpc_metadata_array_init(&st->md_ary);
178 grpc_call_details_init(&st->details);
179 }
180
181 /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
182 * cleaned up */
grpc_request_call_stack_cleanup(request_call_stack * st)183 static void grpc_request_call_stack_cleanup(request_call_stack* st) {
184 grpc_metadata_array_destroy(&st->md_ary);
185 grpc_call_details_destroy(&st->details);
186 }
187
188 /* call-seq:
189 server.request_call
190
191 Requests notification of a new call on a server. */
grpc_rb_server_request_call(VALUE self)192 static VALUE grpc_rb_server_request_call(VALUE self) {
193 grpc_rb_server* s = NULL;
194 grpc_call* call = NULL;
195 grpc_event ev;
196 grpc_call_error err;
197 request_call_stack st;
198 VALUE result;
199 void* tag = (void*)&st;
200 grpc_completion_queue* call_queue =
201 grpc_completion_queue_create_for_pluck(NULL);
202 gpr_timespec deadline;
203
204 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
205 if (s->wrapped == NULL) {
206 rb_raise(rb_eRuntimeError, "destroyed!");
207 return Qnil;
208 }
209 grpc_request_call_stack_init(&st);
210 /* call grpc_server_request_call, then wait for it to complete using
211 * pluck_event */
212 err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
213 call_queue, s->queue, tag);
214 if (err != GRPC_CALL_OK) {
215 grpc_request_call_stack_cleanup(&st);
216 rb_raise(grpc_rb_eCallError,
217 "grpc_server_request_call failed: %s (code=%d)",
218 grpc_call_error_detail_of(err), err);
219 return Qnil;
220 }
221
222 ev = rb_completion_queue_pluck(s->queue, tag,
223 gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
224 if (!ev.success) {
225 grpc_request_call_stack_cleanup(&st);
226 rb_raise(grpc_rb_eCallError, "request_call completion failed");
227 return Qnil;
228 }
229
230 /* build the NewServerRpc struct result */
231 deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
232 result = rb_struct_new(
233 grpc_rb_sNewServerRpc, grpc_rb_slice_to_ruby_string(st.details.method),
234 grpc_rb_slice_to_ruby_string(st.details.host),
235 rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
236 INT2NUM(deadline.tv_nsec / 1000)),
237 grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call, call_queue),
238 NULL);
239 grpc_request_call_stack_cleanup(&st);
240 return result;
241 }
242
grpc_rb_server_start(VALUE self)243 static VALUE grpc_rb_server_start(VALUE self) {
244 grpc_rb_server* s = NULL;
245 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
246
247 grpc_ruby_fork_guard();
248 if (s->wrapped == NULL) {
249 rb_raise(rb_eRuntimeError, "destroyed!");
250 } else {
251 grpc_server_start(s->wrapped);
252 }
253 return Qnil;
254 }
255
grpc_rb_server_shutdown_and_notify(VALUE self,VALUE timeout)256 static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
257 gpr_timespec deadline;
258 grpc_rb_server* s = NULL;
259
260 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
261 if (TYPE(timeout) == T_NIL) {
262 deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
263 } else {
264 deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
265 }
266
267 grpc_rb_server_maybe_shutdown_and_notify(s, deadline);
268
269 return Qnil;
270 }
271
272 /*
273 call-seq:
274 server = Server.new({'arg1': 'value1'})
275 ... // do stuff with server
276 ...
277 ... // initiate server shutdown
278 server.shutdown_and_notify(timeout)
279 ... // to shutdown the server
280 server.destroy()
281
282 Destroys server instances. */
grpc_rb_server_destroy(VALUE self)283 static VALUE grpc_rb_server_destroy(VALUE self) {
284 grpc_rb_server* s = NULL;
285 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
286 grpc_rb_server_maybe_destroy(s);
287 return Qnil;
288 }
289
290 /*
291 call-seq:
292 // insecure port
293 insecure_server = Server.new(cq, {'arg1': 'value1'})
294 insecure_server.add_http2_port('mydomain:50051', :this_port_is_insecure)
295
296 // secure port
297 server_creds = ...
298 secure_server = Server.new(cq, {'arg1': 'value1'})
299 secure_server.add_http_port('mydomain:50051', server_creds)
300
301 Adds a http2 port to server */
grpc_rb_server_add_http2_port(VALUE self,VALUE port,VALUE rb_creds)302 static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
303 VALUE rb_creds) {
304 grpc_rb_server* s = NULL;
305 grpc_server_credentials* creds = NULL;
306 int recvd_port = 0;
307
308 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
309 if (s->wrapped == NULL) {
310 rb_raise(rb_eRuntimeError, "destroyed!");
311 return Qnil;
312 } else if (TYPE(rb_creds) == T_SYMBOL) {
313 if (id_insecure_server != SYM2ID(rb_creds)) {
314 rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
315 return Qnil;
316 }
317 recvd_port =
318 grpc_server_add_insecure_http2_port(s->wrapped, StringValueCStr(port));
319 if (recvd_port == 0) {
320 rb_raise(rb_eRuntimeError,
321 "could not add port %s to server, not sure why",
322 StringValueCStr(port));
323 }
324 } else {
325 creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
326 recvd_port = grpc_server_add_secure_http2_port(
327 s->wrapped, StringValueCStr(port), creds);
328 if (recvd_port == 0) {
329 rb_raise(rb_eRuntimeError,
330 "could not add secure port %s to server, not sure why",
331 StringValueCStr(port));
332 }
333 }
334 return INT2NUM(recvd_port);
335 }
336
Init_grpc_server()337 void Init_grpc_server() {
338 grpc_rb_cServer =
339 rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject);
340
341 /* Allocates an object managed by the ruby runtime */
342 rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
343
344 /* Provides a ruby constructor and support for dup/clone. */
345 rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
346 rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
347 1);
348
349 /* Add the server methods. */
350 rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
351 0);
352 rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
353 rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
354 grpc_rb_server_shutdown_and_notify, 1);
355 rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
356 rb_define_alias(grpc_rb_cServer, "close", "destroy");
357 rb_define_method(grpc_rb_cServer, "add_http2_port",
358 grpc_rb_server_add_http2_port, 2);
359 id_at = rb_intern("at");
360 id_insecure_server = rb_intern("this_port_is_insecure");
361 }
362
363 /* Gets the wrapped server from the ruby wrapper */
grpc_rb_get_wrapped_server(VALUE v)364 grpc_server* grpc_rb_get_wrapped_server(VALUE v) {
365 grpc_rb_server* wrapper = NULL;
366 TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper);
367 return wrapper->wrapped;
368 }
369