• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import copy
16
17import grpc
18
19
class _RequestIterator(object):
    """Iterator that pulls requests one at a time from a handler.

    Each step calls ``handler.take_request()`` and inspects the result:

    * ``requests_closed`` truthy -> iteration ends (``StopIteration``);
    * otherwise ``terminated`` truthy -> a fresh ``grpc.RpcError`` is
      registered on the RPC via ``rpc.add_rpc_error`` and then raised;
    * otherwise the result's ``request`` attribute is returned.
    """

    def __init__(self, rpc, handler):
        self._rpc = rpc
        self._handler = handler

    def _next(self):
        """Take one request from the handler, or raise as described above."""
        taken = self._handler.take_request()
        # The closed check comes first, so it wins if both flags are set.
        if taken.requests_closed:
            raise StopIteration()
        if not taken.terminated:
            return taken.request
        error = grpc.RpcError()
        # Record the error on the RPC before surfacing it to the caller.
        self._rpc.add_rpc_error(error)
        raise error

    def __iter__(self):
        return self

    def __next__(self):
        return self._next()

    def next(self):
        # Python 2 spelling of the iterator protocol.
        return self._next()
44
45
def _unary_response(argument, implementation, rpc, servicer_context):
    """Invoke ``implementation`` once and report its single result to ``rpc``.

    Args:
      argument: First positional value passed to ``implementation``.
      implementation: Callable invoked as
        ``implementation(argument, servicer_context)``.
      rpc: Object receiving the outcome, through either
        ``application_exception_abort(exception)`` or
        ``unary_response_complete(response)``.
      servicer_context: Second positional value passed to ``implementation``.
    """
    try:
        result = implementation(argument, servicer_context)
    except Exception as error:  # pylint: disable=broad-except
        # Any exception from the implementation is reported as an abort
        # rather than propagated.
        rpc.application_exception_abort(error)
        return
    rpc.unary_response_complete(result)
53
54
def _stream_response(argument, implementation, rpc, servicer_context):
    """Invoke ``implementation`` and forward each response it yields to ``rpc``.

    Args:
      argument: First positional value passed to ``implementation``.
      implementation: Callable invoked as
        ``implementation(argument, servicer_context)``; its return value is
        advanced with ``next()``.
      rpc: Object receiving ``stream_response(response)`` for every item,
        then either ``stream_response_complete()`` on exhaustion or
        ``application_exception_abort(exception)`` on failure.
      servicer_context: Second positional value passed to ``implementation``.
    """
    try:
        responses = implementation(argument, servicer_context)
    except Exception as error:  # pylint: disable=broad-except
        rpc.application_exception_abort(error)
        return

    while True:
        try:
            # A deep copy of each item is what gets handed to the RPC; the
            # copy is made inside the try, so a failure while copying is
            # reported the same way as a failure while producing the item.
            message = copy.deepcopy(next(responses))
        except StopIteration:
            rpc.stream_response_complete()
            return
        except Exception as error:  # pylint: disable=broad-except
            rpc.application_exception_abort(error)
            return
        rpc.stream_response(message)
72
73
def unary_unary(implementation, rpc, request, servicer_context):
    """Service a single request with a single response.

    Delegates to ``_unary_response`` with ``request`` as the argument handed
    to ``implementation``.
    """
    _unary_response(
        argument=request,
        implementation=implementation,
        rpc=rpc,
        servicer_context=servicer_context,
    )
76
77
def unary_stream(implementation, rpc, request, servicer_context):
    """Service a single request with a stream of responses.

    Delegates to ``_stream_response`` with ``request`` as the argument handed
    to ``implementation``.
    """
    _stream_response(
        argument=request,
        implementation=implementation,
        rpc=rpc,
        servicer_context=servicer_context,
    )
80
81
def stream_unary(implementation, rpc, handler, servicer_context):
    """Service a stream of requests with a single response.

    Wraps ``rpc`` and ``handler`` in a ``_RequestIterator`` and delegates to
    ``_unary_response`` with that iterator as the argument handed to
    ``implementation``.
    """
    requests = _RequestIterator(rpc, handler)
    _unary_response(
        argument=requests,
        implementation=implementation,
        rpc=rpc,
        servicer_context=servicer_context,
    )
86
87
def stream_stream(implementation, rpc, handler, servicer_context):
    """Service a stream of requests with a stream of responses.

    Wraps ``rpc`` and ``handler`` in a ``_RequestIterator`` and delegates to
    ``_stream_response`` with that iterator as the argument handed to
    ``implementation``.
    """
    requests = _RequestIterator(rpc, handler)
    _stream_response(
        argument=requests,
        implementation=implementation,
        rpc=rpc,
        servicer_context=servicer_context,
    )
92