• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_grpc_imports.generated.h"
22 #include "rb_server.h"
23 
24 #include <grpc/grpc.h>
25 #include <grpc/grpc_security.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 #include "rb_byte_buffer.h"
29 #include "rb_call.h"
30 #include "rb_channel_args.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33 #include "rb_server_credentials.h"
34 
35 /* grpc_rb_cServer is the ruby class that proxies grpc_server. */
36 static VALUE grpc_rb_cServer = Qnil;
37 
38 /* id_at is the constructor method of the ruby standard Time class. */
39 static ID id_at;
40 
41 /* id_insecure_server is used to indicate that a server is insecure */
42 static VALUE id_insecure_server;
43 
44 /* grpc_rb_server wraps a grpc_server. */
45 typedef struct grpc_rb_server {
46   /* The actual server */
47   grpc_server* wrapped;
48   grpc_completion_queue* queue;
49   int shutdown_and_notify_done;
50   int destroy_done;
51 } grpc_rb_server;
52 
/* Shuts the wrapped server down, at most once per wrapper.
 *
 * Waits until `deadline` for the shutdown notification. If that wait times
 * out, all calls are cancelled and the notification is awaited without a time
 * limit. Does nothing when a shutdown was already attempted or when there is
 * no wrapped server. */
static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
                                                     gpr_timespec deadline) {
  grpc_event shutdown_ev;
  /* The address of the local event doubles as a unique tag for the pluck. */
  void* shutdown_tag = &shutdown_ev;

  if (server->shutdown_and_notify_done) {
    return;
  }
  /* Mark the attempt before doing any work so later calls become no-ops. */
  server->shutdown_and_notify_done = 1;
  if (server->wrapped == NULL) {
    return;
  }

  grpc_server_shutdown_and_notify(server->wrapped, server->queue,
                                  shutdown_tag);
  shutdown_ev =
      rb_completion_queue_pluck(server->queue, shutdown_tag, deadline, NULL);
  if (shutdown_ev.type == GRPC_QUEUE_TIMEOUT) {
    /* Graceful shutdown did not finish in time: cancel what is left and wait
     * for the notification with no deadline. */
    grpc_server_cancel_all_calls(server->wrapped);
    shutdown_ev = rb_completion_queue_pluck(
        server->queue, shutdown_tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
  }
  if (shutdown_ev.type != GRPC_OP_COMPLETE) {
    gpr_log(GPR_INFO,
            "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
            shutdown_ev.type);
  }
}
75 
/* Destroys the wrapped server and its completion queue, at most once per
 * wrapper, and clears both pointers afterwards. */
static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
  // Destruction can be requested by the app or implicitly by GC; the flag
  // keeps the second request from repeating the work.
  if (server->destroy_done) {
    return;
  }
  server->destroy_done = 1;

  if (server->wrapped == NULL) {
    return;
  }
  grpc_server_destroy(server->wrapped);
  grpc_rb_completion_queue_destroy(server->queue);
  server->wrapped = NULL;
  server->queue = NULL;
}
88 
grpc_rb_server_free_internal(void * p)89 static void grpc_rb_server_free_internal(void* p) {
90   grpc_rb_server* svr = NULL;
91   gpr_timespec deadline;
92   if (p == NULL) {
93     return;
94   };
95   svr = (grpc_rb_server*)p;
96 
97   deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
98                           gpr_time_from_seconds(2, GPR_TIMESPAN));
99 
100   grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
101   grpc_rb_server_maybe_destroy(svr);
102 
103   xfree(p);
104 }
105 
/* Free callback installed in grpc_rb_server_data_type: tears down the server
 * wrapper, then calls grpc_ruby_shutdown. */
static void grpc_rb_server_free(void* p) {
  grpc_rb_server_free_internal(p);
  grpc_ruby_shutdown();
}
111 
112 static const rb_data_type_t grpc_rb_server_data_type = {
113     "grpc_server",
114     {GRPC_RB_GC_NOT_MARKED,
115      grpc_rb_server_free,
116      GRPC_RB_MEMSIZE_UNAVAILABLE,
117      {NULL, NULL}},
118     NULL,
119     NULL,
120 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
121     /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
122      * function would block and we might want to unlock GVL
123      * TODO(yugui) Unlock GVL?
124      */
125     0,
126 #endif
127 };
128 
129 /* Allocates grpc_rb_server instances. */
grpc_rb_server_alloc(VALUE cls)130 static VALUE grpc_rb_server_alloc(VALUE cls) {
131   grpc_ruby_init();
132   grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
133   wrapper->wrapped = NULL;
134   wrapper->destroy_done = 0;
135   wrapper->shutdown_and_notify_done = 0;
136   return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
137 }
138 
139 /*
140   call-seq:
141     server = Server.new({'arg1': 'value1'})
142 
143   Initializes server instances. */
grpc_rb_server_init(VALUE self,VALUE channel_args)144 static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
145   grpc_completion_queue* cq = NULL;
146   grpc_rb_server* wrapper = NULL;
147   grpc_server* srv = NULL;
148   grpc_channel_args args;
149   MEMZERO(&args, grpc_channel_args, 1);
150 
151   cq = grpc_completion_queue_create_for_pluck(NULL);
152   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
153                        wrapper);
154   grpc_rb_hash_convert_to_channel_args(channel_args, &args);
155   srv = grpc_server_create(&args, NULL);
156 
157   if (args.args != NULL) {
158     xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
159   }
160   if (srv == NULL) {
161     rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
162   }
163   grpc_server_register_completion_queue(srv, cq, NULL);
164   wrapper->wrapped = srv;
165   wrapper->queue = cq;
166 
167   return self;
168 }
169 
170 /* request_call_stack holds various values used by the
171  * grpc_rb_server_request_call function */
172 typedef struct request_call_stack {
173   grpc_call_details details;
174   grpc_metadata_array md_ary;
175 } request_call_stack;
176 
177 /* grpc_request_call_stack_init ensures the request_call_stack is properly
178  * initialized */
grpc_request_call_stack_init(request_call_stack * st)179 static void grpc_request_call_stack_init(request_call_stack* st) {
180   MEMZERO(st, request_call_stack, 1);
181   grpc_metadata_array_init(&st->md_ary);
182   grpc_call_details_init(&st->details);
183 }
184 
185 /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
186  * cleaned up */
grpc_request_call_stack_cleanup(request_call_stack * st)187 static void grpc_request_call_stack_cleanup(request_call_stack* st) {
188   grpc_metadata_array_destroy(&st->md_ary);
189   grpc_call_details_destroy(&st->details);
190 }
191 
192 /* call-seq:
193    server.request_call
194 
195    Requests notification of a new call on a server. */
grpc_rb_server_request_call(VALUE self)196 static VALUE grpc_rb_server_request_call(VALUE self) {
197   grpc_rb_server* s = NULL;
198   grpc_call* call = NULL;
199   grpc_event ev;
200   grpc_call_error err;
201   request_call_stack st;
202   VALUE result;
203   void* tag = (void*)&st;
204   grpc_completion_queue* call_queue =
205       grpc_completion_queue_create_for_pluck(NULL);
206   gpr_timespec deadline;
207 
208   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
209   if (s->wrapped == NULL) {
210     rb_raise(rb_eRuntimeError, "destroyed!");
211     return Qnil;
212   }
213   grpc_request_call_stack_init(&st);
214   /* call grpc_server_request_call, then wait for it to complete using
215    * pluck_event */
216   err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
217                                  call_queue, s->queue, tag);
218   if (err != GRPC_CALL_OK) {
219     grpc_request_call_stack_cleanup(&st);
220     rb_raise(grpc_rb_eCallError,
221              "grpc_server_request_call failed: %s (code=%d)",
222              grpc_call_error_detail_of(err), err);
223     return Qnil;
224   }
225 
226   ev = rb_completion_queue_pluck(s->queue, tag,
227                                  gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
228   if (!ev.success) {
229     grpc_request_call_stack_cleanup(&st);
230     rb_raise(grpc_rb_eCallError, "request_call completion failed");
231     return Qnil;
232   }
233 
234   /* build the NewServerRpc struct result */
235   deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
236   result = rb_struct_new(
237       grpc_rb_sNewServerRpc, grpc_rb_slice_to_ruby_string(st.details.method),
238       grpc_rb_slice_to_ruby_string(st.details.host),
239       rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
240                  INT2NUM(deadline.tv_nsec / 1000)),
241       grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call, call_queue),
242       NULL);
243   grpc_request_call_stack_cleanup(&st);
244   return result;
245 }
246 
/* call-seq:
   server.start

   Starts the wrapped server. Raises RuntimeError if the server has been
   destroyed. Always returns nil. */
static VALUE grpc_rb_server_start(VALUE self) {
  grpc_rb_server* server = NULL;
  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);

  grpc_ruby_fork_guard();
  if (server->wrapped == NULL) {
    rb_raise(rb_eRuntimeError, "destroyed!");
  }
  grpc_server_start(server->wrapped);
  return Qnil;
}
259 
/* call-seq:
   server.shutdown_and_notify(timeout)

   Initiates server shutdown. A nil timeout means there is no deadline;
   otherwise the timeout is converted with grpc_rb_time_timeval. Always
   returns nil. */
static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
  grpc_rb_server* server = NULL;
  gpr_timespec deadline;

  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);

  deadline = NIL_P(timeout)
                 ? gpr_inf_future(GPR_CLOCK_REALTIME)
                 : grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
  grpc_rb_server_maybe_shutdown_and_notify(server, deadline);

  return Qnil;
}
275 
276 /*
277   call-seq:
278     server = Server.new({'arg1': 'value1'})
279     ... // do stuff with server
280     ...
281     ... // initiate server shutdown
282     server.shutdown_and_notify(timeout)
283     ... // to shutdown the server
284     server.destroy()
285 
286   Destroys server instances. */
grpc_rb_server_destroy(VALUE self)287 static VALUE grpc_rb_server_destroy(VALUE self) {
288   grpc_rb_server* s = NULL;
289   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
290   grpc_rb_server_maybe_destroy(s);
291   return Qnil;
292 }
293 
294 /*
295   call-seq:
296     // insecure port
297     insecure_server = Server.new(cq, {'arg1': 'value1'})
298     insecure_server.add_http2_port('mydomain:50051', :this_port_is_insecure)
299 
300     // secure port
301     server_creds = ...
302     secure_server = Server.new(cq, {'arg1': 'value1'})
303     secure_server.add_http_port('mydomain:50051', server_creds)
304 
305     Adds a http2 port to server */
grpc_rb_server_add_http2_port(VALUE self,VALUE port,VALUE rb_creds)306 static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
307                                            VALUE rb_creds) {
308   grpc_rb_server* s = NULL;
309   grpc_server_credentials* creds = NULL;
310   int recvd_port = 0;
311 
312   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
313   if (s->wrapped == NULL) {
314     rb_raise(rb_eRuntimeError, "destroyed!");
315     return Qnil;
316   } else if (TYPE(rb_creds) == T_SYMBOL) {
317     if (id_insecure_server != SYM2ID(rb_creds)) {
318       rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
319       return Qnil;
320     }
321     recvd_port =
322         grpc_server_add_insecure_http2_port(s->wrapped, StringValueCStr(port));
323     if (recvd_port == 0) {
324       rb_raise(rb_eRuntimeError,
325                "could not add port %s to server, not sure why",
326                StringValueCStr(port));
327     }
328   } else {
329     creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
330     recvd_port = grpc_server_add_secure_http2_port(
331         s->wrapped, StringValueCStr(port), creds);
332     if (recvd_port == 0) {
333       rb_raise(rb_eRuntimeError,
334                "could not add secure port %s to server, not sure why",
335                StringValueCStr(port));
336     }
337   }
338   return INT2NUM(recvd_port);
339 }
340 
Init_grpc_server()341 void Init_grpc_server() {
342   grpc_rb_cServer =
343       rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject);
344 
345   /* Allocates an object managed by the ruby runtime */
346   rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
347 
348   /* Provides a ruby constructor and support for dup/clone. */
349   rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
350   rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
351                    1);
352 
353   /* Add the server methods. */
354   rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
355                    0);
356   rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
357   rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
358                    grpc_rb_server_shutdown_and_notify, 1);
359   rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
360   rb_define_alias(grpc_rb_cServer, "close", "destroy");
361   rb_define_method(grpc_rb_cServer, "add_http2_port",
362                    grpc_rb_server_add_http2_port, 2);
363   id_at = rb_intern("at");
364   id_insecure_server = rb_intern("this_port_is_insecure");
365 }
366 
367 /* Gets the wrapped server from the ruby wrapper */
grpc_rb_get_wrapped_server(VALUE v)368 grpc_server* grpc_rb_get_wrapped_server(VALUE v) {
369   grpc_rb_server* wrapper = NULL;
370   TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper);
371   return wrapper->wrapped;
372 }
373