• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16cdef class Server:
17
18  def __cinit__(self, object arguments, bint xds):
19    fork_handlers_and_grpc_init()
20    self.references = []
21    self.registered_completion_queues = []
22    self.is_started = False
23    self.is_shutting_down = False
24    self.is_shutdown = False
25    self.c_server = NULL
26    cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
27    self.c_server = grpc_server_create(channel_args.c_args(), NULL)
28    cdef grpc_server_xds_status_notifier notifier
29    notifier.on_serving_status_update = NULL
30    notifier.user_data = NULL
31    if xds:
32      grpc_server_set_config_fetcher(self.c_server,
33        grpc_server_config_fetcher_xds_create(notifier, channel_args.c_args()))
34    self.references.append(arguments)
35
36  def request_call(
37      self, CompletionQueue call_queue not None,
38      CompletionQueue server_queue not None, tag):
39    if not self.is_started or self.is_shutting_down:
40      raise ValueError("server must be started and not shutting down")
41    if server_queue not in self.registered_completion_queues:
42      raise ValueError("server_queue must be a registered completion queue")
43    cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
44    request_call_tag.prepare()
45    cpython.Py_INCREF(request_call_tag)
46    return grpc_server_request_call(
47        self.c_server, &request_call_tag.call.c_call,
48        &request_call_tag.call_details.c_details,
49        &request_call_tag.c_invocation_metadata,
50        call_queue.c_completion_queue, server_queue.c_completion_queue,
51        <cpython.PyObject *>request_call_tag)
52
53  def register_completion_queue(
54      self, CompletionQueue queue not None):
55    if self.is_started:
56      raise ValueError("cannot register completion queues after start")
57    with nogil:
58      grpc_server_register_completion_queue(
59          self.c_server, queue.c_completion_queue, NULL)
60    self.registered_completion_queues.append(queue)
61
62  def start(self, backup_queue=True):
63    """Start the Cython gRPC Server.
64
65    Args:
66      backup_queue: a bool indicates whether to spawn a backup completion
67        queue. In the case that no CQ is bound to the server, and the shutdown
68        of server becomes un-observable.
69    """
70    if self.is_started:
71      raise ValueError("the server has already started")
72    if backup_queue:
73      self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
74      self.register_completion_queue(self.backup_shutdown_queue)
75    self.is_started = True
76    with nogil:
77      grpc_server_start(self.c_server)
78    if backup_queue:
79      # Ensure the core has gotten a chance to do the start-up work
80      self.backup_shutdown_queue.poll(deadline=time.time())
81
82  def add_http2_port(self, bytes address,
83                     ServerCredentials server_credentials=None):
84    address = str_to_bytes(address)
85    self.references.append(address)
86    cdef int result
87    cdef char *address_c_string = address
88    if server_credentials is not None:
89      self.references.append(server_credentials)
90      with nogil:
91        result = grpc_server_add_http2_port(
92            self.c_server, address_c_string, server_credentials.c_credentials)
93    else:
94      with nogil:
95        creds = grpc_insecure_server_credentials_create()
96        result = grpc_server_add_http2_port(self.c_server,
97                                            address_c_string, creds)
98        grpc_server_credentials_release(creds)
99    return result
100
101  cdef _c_shutdown(self, CompletionQueue queue, tag):
102    self.is_shutting_down = True
103    cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
104    cpython.Py_INCREF(server_shutdown_tag)
105    with nogil:
106      grpc_server_shutdown_and_notify(
107          self.c_server, queue.c_completion_queue,
108          <cpython.PyObject *>server_shutdown_tag)
109
110  def shutdown(self, CompletionQueue queue not None, tag):
111    if queue.is_shutting_down:
112      raise ValueError("queue must be live")
113    elif not self.is_started:
114      raise ValueError("the server hasn't started yet")
115    elif self.is_shutting_down:
116      return
117    elif queue not in self.registered_completion_queues:
118      raise ValueError("expected registered completion queue")
119    else:
120      self._c_shutdown(queue, tag)
121
122  cdef notify_shutdown_complete(self):
123    # called only after our server shutdown tag has emerged from a completion
124    # queue.
125    self.is_shutdown = True
126
127  def cancel_all_calls(self):
128    if not self.is_shutting_down:
129      raise UsageError("the server must be shutting down to cancel all calls")
130    elif self.is_shutdown:
131      return
132    else:
133      with nogil:
134        grpc_server_cancel_all_calls(self.c_server)
135
136  # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
137  # portion of this is safe to call from __dealloc__, and potentially remove
138  # backup_shutdown_queue.
139  def destroy(self):
140    if self.c_server != NULL:
141      if not self.is_started:
142        pass
143      elif self.is_shutdown:
144        pass
145      elif not self.is_shutting_down:
146        if self.backup_shutdown_queue is None:
147          raise InternalError('Server shutdown failed: no completion queue.')
148        else:
149          # the user didn't call shutdown - use our backup queue
150          self._c_shutdown(self.backup_shutdown_queue, None)
151          # and now we wait
152          while not self.is_shutdown:
153            self.backup_shutdown_queue.poll()
154      else:
155        # We're in the process of shutting down, but have not shutdown; can't do
156        # much but repeatedly release the GIL and wait
157        while not self.is_shutdown:
158          time.sleep(0)
159      with nogil:
160        grpc_server_destroy(self.c_server)
161        self.c_server = NULL
162
163  def __dealloc__(self):
164    if self.c_server == NULL:
165      grpc_shutdown()
166