1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import copy 16 17import grpc 18 19 20class _RequestIterator(object): 21 def __init__(self, rpc, handler): 22 self._rpc = rpc 23 self._handler = handler 24 25 def _next(self): 26 read = self._handler.take_request() 27 if read.requests_closed: 28 raise StopIteration() 29 elif read.terminated: 30 rpc_error = grpc.RpcError() 31 self._rpc.add_rpc_error(rpc_error) 32 raise rpc_error 33 else: 34 return read.request 35 36 def __iter__(self): 37 return self 38 39 def __next__(self): 40 return self._next() 41 42 def next(self): 43 return self._next() 44 45 46def _unary_response(argument, implementation, rpc, servicer_context): 47 try: 48 response = implementation(argument, servicer_context) 49 except Exception as exception: # pylint: disable=broad-except 50 rpc.application_exception_abort(exception) 51 else: 52 rpc.unary_response_complete(response) 53 54 55def _stream_response(argument, implementation, rpc, servicer_context): 56 try: 57 response_iterator = implementation(argument, servicer_context) 58 except Exception as exception: # pylint: disable=broad-except 59 rpc.application_exception_abort(exception) 60 else: 61 while True: 62 try: 63 response = copy.deepcopy(next(response_iterator)) 64 except StopIteration: 65 rpc.stream_response_complete() 66 break 67 except Exception as exception: # pylint: disable=broad-except 68 rpc.application_exception_abort(exception) 69 break 70 else: 71 rpc.stream_response(response) 72 73 74def unary_unary(implementation, rpc, request, servicer_context): 75 _unary_response(request, implementation, rpc, servicer_context) 76 77 78def unary_stream(implementation, rpc, request, servicer_context): 79 _stream_response(request, implementation, rpc, servicer_context) 80 81 82def stream_unary(implementation, rpc, handler, servicer_context): 83 _unary_response( 84 _RequestIterator(rpc, handler), implementation, rpc, servicer_context 85 ) 86 87 88def stream_stream(implementation, rpc, handler, servicer_context): 89 _stream_response( 90 _RequestIterator(rpc, handler), implementation, rpc, servicer_context 91 ) 92