• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_server.h"
22 
23 #include <grpc/credentials.h>
24 #include <grpc/grpc.h>
25 #include <grpc/grpc_security.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 
29 #include "rb_byte_buffer.h"
30 #include "rb_call.h"
31 #include "rb_channel_args.h"
32 #include "rb_completion_queue.h"
33 #include "rb_grpc.h"
34 #include "rb_grpc_imports.generated.h"
35 #include "rb_server_credentials.h"
36 #include "rb_xds_server_credentials.h"
37 
38 /* grpc_rb_cServer is the ruby class that proxies grpc_server. */
39 static VALUE grpc_rb_cServer = Qnil;
40 
41 /* id_at is the constructor method of the ruby standard Time class. */
42 static ID id_at;
43 
44 /* id_insecure_server is used to indicate that a server is insecure */
45 static VALUE id_insecure_server;
46 
47 /* grpc_rb_server wraps a grpc_server. */
48 typedef struct grpc_rb_server {
49   /* The actual server */
50   grpc_server* wrapped;
51   grpc_completion_queue* queue;
52   int destroy_done;
53 } grpc_rb_server;
54 
/* Requests shutdown of the wrapped server and waits for the shutdown
 * notification on the server's queue. If the notification has not arrived by
 * `deadline`, all server calls are cancelled and the wait is repeated without
 * a time limit. Does nothing when the server has no wrapped grpc_server. */
static void grpc_rb_server_shutdown_and_notify_internal(grpc_rb_server* server,
                                                        gpr_timespec deadline) {
  grpc_event result;
  /* The address of the local event doubles as a unique tag. */
  void* shutdown_tag = &result;

  if (server->wrapped == NULL) {
    return;
  }

  grpc_server_shutdown_and_notify(server->wrapped, server->queue,
                                  shutdown_tag);
  // The pluck calls below release the GIL and block without being
  // interruptible. They are expected to return quickly because all server
  // calls get cancelled once the deadline has passed.
  result = rb_completion_queue_pluck(server->queue, shutdown_tag, deadline,
                                     NULL, NULL);
  if (result.type == GRPC_QUEUE_TIMEOUT) {
    grpc_server_cancel_all_calls(server->wrapped);
    result = rb_completion_queue_pluck(server->queue, shutdown_tag,
                                       gpr_inf_future(GPR_CLOCK_REALTIME),
                                       NULL, NULL);
  }
  if (result.type == GRPC_OP_COMPLETE) {
    return;
  }
  grpc_absl_log_int(
      GPR_DEBUG,
      "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:", result.type);
}
77 
/* Destroys the wrapped server and its completion queue at most once.
 * Destruction can be requested by the application or implicitly by GC; the
 * destroy_done flag keeps the second request from doing anything. */
static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
  if (server->destroy_done) {
    return;
  }
  server->destroy_done = 1;

  if (server->wrapped == NULL) {
    /* Never initialized: nothing to release. */
    return;
  }
  grpc_server_destroy(server->wrapped);
  grpc_rb_completion_queue_destroy(server->queue);
  server->wrapped = NULL;
  server->queue = NULL;
}
90 
grpc_rb_server_free_internal(void * p)91 static void grpc_rb_server_free_internal(void* p) {
92   grpc_rb_server* svr = NULL;
93   gpr_timespec deadline;
94   if (p == NULL) {
95     return;
96   };
97   svr = (grpc_rb_server*)p;
98 
99   deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
100                           gpr_time_from_seconds(2, GPR_TIMESPAN));
101 
102   grpc_rb_server_shutdown_and_notify_internal(svr, deadline);
103   grpc_rb_server_maybe_destroy(svr);
104 
105   xfree(p);
106 }
107 
/* Free function registered in grpc_rb_server_data_type; destroys server
 * instances by delegating to grpc_rb_server_free_internal. */
static void grpc_rb_server_free(void* p) {
  grpc_rb_server_free_internal(p);
}
110 
111 static const rb_data_type_t grpc_rb_server_data_type = {
112     "grpc_server",
113     {GRPC_RB_GC_NOT_MARKED,
114      grpc_rb_server_free,
115      GRPC_RB_MEMSIZE_UNAVAILABLE,
116      {NULL, NULL}},
117     NULL,
118     NULL,
119 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
120     /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
121      * function would block and we might want to unlock GVL
122      * TODO(yugui) Unlock GVL?
123      */
124     0,
125 #endif
126 };
127 
128 /* Allocates grpc_rb_server instances. */
grpc_rb_server_alloc(VALUE cls)129 static VALUE grpc_rb_server_alloc(VALUE cls) {
130   grpc_ruby_init();
131   grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
132   wrapper->wrapped = NULL;
133   wrapper->destroy_done = 0;
134   return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
135 }
136 
137 /*
138   call-seq:
139     server = Server.new({'arg1': 'value1'})
140 
141   Initializes server instances. */
grpc_rb_server_init(VALUE self,VALUE channel_args)142 static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
143   grpc_completion_queue* cq = NULL;
144   grpc_rb_server* wrapper = NULL;
145   grpc_server* srv = NULL;
146   grpc_channel_args args;
147   MEMZERO(&args, grpc_channel_args, 1);
148 
149   cq = grpc_completion_queue_create_for_pluck(NULL);
150   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
151                        wrapper);
152   grpc_rb_hash_convert_to_channel_args(channel_args, &args);
153   srv = grpc_server_create(&args, NULL);
154   grpc_rb_channel_args_destroy(&args);
155   if (srv == NULL) {
156     rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
157   }
158   grpc_server_register_completion_queue(srv, cq, NULL);
159   wrapper->wrapped = srv;
160   wrapper->queue = cq;
161 
162   return self;
163 }
164 
165 /* request_call_stack holds various values used by the
166  * grpc_rb_server_request_call function */
167 typedef struct request_call_stack {
168   grpc_call_details details;
169   grpc_metadata_array md_ary;
170 } request_call_stack;
171 
172 /* grpc_request_call_stack_init ensures the request_call_stack is properly
173  * initialized */
grpc_request_call_stack_init(request_call_stack * st)174 static void grpc_request_call_stack_init(request_call_stack* st) {
175   MEMZERO(st, request_call_stack, 1);
176   grpc_metadata_array_init(&st->md_ary);
177   grpc_call_details_init(&st->details);
178 }
179 
180 /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
181  * cleaned up */
grpc_request_call_stack_cleanup(request_call_stack * st)182 static void grpc_request_call_stack_cleanup(request_call_stack* st) {
183   grpc_metadata_array_destroy(&st->md_ary);
184   grpc_call_details_destroy(&st->details);
185 }
186 
187 struct server_request_call_args {
188   grpc_rb_server* server;
189   grpc_completion_queue* call_queue;
190   request_call_stack st;
191 };
192 
shutdown_server_unblock_func(void * arg)193 static void shutdown_server_unblock_func(void* arg) {
194   grpc_rb_server* server = (grpc_rb_server*)arg;
195   grpc_absl_log(GPR_DEBUG, "GRPC_RUBY: shutdown_server_unblock_func");
196   GRPC_RUBY_ASSERT(server->wrapped != NULL);
197   grpc_event event;
198   void* tag = &event;
199   grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
200   grpc_server_cancel_all_calls(server->wrapped);
201   // Following call is blocking, but should finish quickly since we've
202   // cancelled all calls.
203   event = grpc_completion_queue_pluck(server->queue, tag,
204                                       gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
205   grpc_absl_log_int(
206       GPR_DEBUG,
207       "GRPC_RUBY: shutdown_server_unblock_func pluck event.type: ", event.type);
208   grpc_absl_log_int(
209       GPR_DEBUG,
210       "GRPC_RUBY: shutdown_server_unblock_func event.success: ", event.success);
211 }
212 
/* Body half of grpc_rb_server_request_call, run under rb_ensure.
 * `value_args` is a struct server_request_call_args* cast to VALUE.
 * Requests a new call on the server, waits for it on the server queue and
 * returns a NewServerRpc struct (method, host, deadline, metadata, call).
 * Raises CallError if the request cannot be made or completes
 * unsuccessfully; cleanup is left to grpc_rb_server_request_call_ensure. */
static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
  grpc_rb_fork_unsafe_begin();
  struct server_request_call_args* req =
      (struct server_request_call_args*)value_args;
  request_call_stack* st = &req->st;

  grpc_call* new_call = NULL;
  /* The address of the request stack doubles as a unique tag. */
  void* tag = (void*)st;

  req->call_queue = grpc_completion_queue_create_for_pluck(NULL);
  grpc_request_call_stack_init(st);

  /* Ask the server for the next call, then wait for the request to complete
   * by plucking its tag from the server queue. */
  grpc_call_error err = grpc_server_request_call(
      req->server->wrapped, &new_call, &st->details, &st->md_ary,
      req->call_queue, req->server->queue, tag);
  if (err != GRPC_CALL_OK) {
    rb_raise(grpc_rb_eCallError,
             "grpc_server_request_call failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  grpc_event ev = rb_completion_queue_pluck(
      req->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
      shutdown_server_unblock_func, req->server);
  if (!ev.success) {
    rb_raise(grpc_rb_eCallError, "request_call completion failed");
  }

  /* Convert the pieces of the NewServerRpc struct one at a time. The call is
   * wrapped last, so the queue is handed over only after the other
   * conversions are done. */
  gpr_timespec deadline =
      gpr_convert_clock_type(st->details.deadline, GPR_CLOCK_REALTIME);
  VALUE rb_method = grpc_rb_slice_to_ruby_string(st->details.method);
  VALUE rb_host = grpc_rb_slice_to_ruby_string(st->details.host);
  /* Time.at(seconds, microseconds).
   * NOTE(review): INT2NUM takes an int; if tv_sec is wider than int the
   * value may be truncated - confirm whether that matters here. */
  VALUE rb_deadline =
      rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
                 INT2NUM(deadline.tv_nsec / 1000));
  VALUE rb_metadata = grpc_rb_md_ary_to_h(&st->md_ary);
  VALUE rb_call = grpc_rb_wrap_call(new_call, req->call_queue);

  VALUE result = rb_struct_new(grpc_rb_sNewServerRpc, rb_method, rb_host,
                               rb_deadline, rb_metadata, rb_call, NULL);
  /* The queue was passed to grpc_rb_wrap_call; clearing it here keeps the
   * ensure step from destroying it. */
  req->call_queue = NULL;
  return result;
}
256 
/* Ensure half of grpc_rb_server_request_call; runs whether or not the try
 * half raised. `value_args` is a struct server_request_call_args* cast to
 * VALUE. Destroys the call queue if the try half still left one behind and
 * cleans up the request stack. Always returns nil. */
static VALUE grpc_rb_server_request_call_ensure(VALUE value_args) {
  grpc_rb_fork_unsafe_end();
  struct server_request_call_args* req =
      (struct server_request_call_args*)value_args;

  if (req->call_queue != NULL) {
    grpc_rb_completion_queue_destroy(req->call_queue);
  }
  grpc_request_call_stack_cleanup(&req->st);

  return Qnil;
}
270 
271 /* call-seq:
272    server.request_call
273 
274    Requests notification of a new call on a server. */
grpc_rb_server_request_call(VALUE self)275 static VALUE grpc_rb_server_request_call(VALUE self) {
276   grpc_rb_server* s;
277   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
278   grpc_ruby_fork_guard();
279   if (s->wrapped == NULL) {
280     rb_raise(rb_eRuntimeError, "destroyed!");
281   }
282   struct server_request_call_args args = {.server = s, .call_queue = NULL};
283   return rb_ensure(grpc_rb_server_request_call_try, (VALUE)&args,
284                    grpc_rb_server_request_call_ensure, (VALUE)&args);
285 }
286 
/* Starts the wrapped server. Raises RuntimeError if the server has already
 * been destroyed. Always returns nil. */
static VALUE grpc_rb_server_start(VALUE self) {
  grpc_rb_server* server = NULL;
  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);
  grpc_ruby_fork_guard();
  if (server->wrapped == NULL) {
    rb_raise(rb_eRuntimeError, "destroyed!");
  }
  grpc_server_start(server->wrapped);
  return Qnil;
}
298 
/* Shuts the server down, waiting for the shutdown notification until the
 * given timeout. A nil timeout means an infinite deadline; any other value
 * is converted with grpc_rb_time_timeval. Always returns nil. */
static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
  grpc_rb_server* server = NULL;
  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);

  gpr_timespec deadline =
      (TYPE(timeout) == T_NIL)
          ? gpr_inf_future(GPR_CLOCK_REALTIME)
          : grpc_rb_time_timeval(timeout, /* absolute time*/ 0);

  grpc_rb_server_shutdown_and_notify_internal(server, deadline);

  return Qnil;
}
314 
315 /*
316   call-seq:
317     server = Server.new({'arg1': 'value1'})
318     ... // do stuff with server
319     ...
320     ... // initiate server shutdown
321     server.shutdown_and_notify(timeout)
322     ... // to shutdown the server
323     server.destroy()
324 
325   Destroys server instances. */
grpc_rb_server_destroy(VALUE self)326 static VALUE grpc_rb_server_destroy(VALUE self) {
327   grpc_rb_server* s = NULL;
328   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
329   grpc_rb_server_maybe_destroy(s);
330   return Qnil;
331 }
332 
333 /*
334   call-seq:
335     // insecure port
336     insecure_server = Server.new(cq, {'arg1': 'value1'})
337     insecure_server.add_http2_port('mydomain:50051', :this_port_is_insecure)
338 
339     // secure port
340     server_creds = ...
341     secure_server = Server.new(cq, {'arg1': 'value1'})
342     secure_server.add_http_port('mydomain:50051', server_creds)
343 
344     Adds a http2 port to server */
grpc_rb_server_add_http2_port(VALUE self,VALUE port,VALUE rb_creds)345 static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
346                                            VALUE rb_creds) {
347   grpc_rb_server* s = NULL;
348   grpc_server_credentials* creds = NULL;
349   int recvd_port = 0;
350 
351   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
352   if (s->wrapped == NULL) {
353     rb_raise(rb_eRuntimeError, "destroyed!");
354     return Qnil;
355   } else if (TYPE(rb_creds) == T_SYMBOL) {
356     if (id_insecure_server != SYM2ID(rb_creds)) {
357       rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
358       return Qnil;
359     }
360     grpc_server_credentials* insecure_creds =
361         grpc_insecure_server_credentials_create();
362     recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port),
363                                             insecure_creds);
364     grpc_server_credentials_release(insecure_creds);
365     if (recvd_port == 0) {
366       rb_raise(rb_eRuntimeError,
367                "could not add port %s to server, not sure why",
368                StringValueCStr(port));
369     }
370   } else {
371     // TODO: create a common parent class for all server-side credentials,
372     // then we can have a single method to retrieve the underlying
373     // grpc_server_credentials object, and avoid the need for this reflection
374     if (grpc_rb_is_server_credentials(rb_creds)) {
375       creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
376     } else if (grpc_rb_is_xds_server_credentials(rb_creds)) {
377       creds = grpc_rb_get_wrapped_xds_server_credentials(rb_creds);
378     } else {
379       rb_raise(rb_eTypeError,
380                "failed to create server because credentials parameter has an "
381                "invalid type, want ServerCredentials or XdsServerCredentials");
382     }
383     recvd_port =
384         grpc_server_add_http2_port(s->wrapped, StringValueCStr(port), creds);
385     if (recvd_port == 0) {
386       rb_raise(rb_eRuntimeError,
387                "could not add secure port %s to server, not sure why",
388                StringValueCStr(port));
389     }
390   }
391   return INT2NUM(recvd_port);
392 }
393 
Init_grpc_server()394 void Init_grpc_server() {
395   grpc_rb_cServer =
396       rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject);
397 
398   /* Allocates an object managed by the ruby runtime */
399   rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
400 
401   /* Provides a ruby constructor and support for dup/clone. */
402   rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
403   rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
404                    1);
405 
406   /* Add the server methods. */
407   rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
408                    0);
409   rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
410   rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
411                    grpc_rb_server_shutdown_and_notify, 1);
412   rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
413   rb_define_alias(grpc_rb_cServer, "close", "destroy");
414   rb_define_method(grpc_rb_cServer, "add_http2_port",
415                    grpc_rb_server_add_http2_port, 2);
416   id_at = rb_intern("at");
417   id_insecure_server = rb_intern("this_port_is_insecure");
418 }
419 
420 /* Gets the wrapped server from the ruby wrapper */
grpc_rb_get_wrapped_server(VALUE v)421 grpc_server* grpc_rb_get_wrapped_server(VALUE v) {
422   grpc_rb_server* wrapper = NULL;
423   TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper);
424   return wrapper->wrapped;
425 }
426