1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example gRPC Python-using server-side application.""" 15 16import grpc 17 18import threading 19 20# requests_pb2 is a semantic dependency of this module. 21from tests.testing import _application_common 22from tests.testing.proto import requests_pb2 # pylint: disable=unused-import 23from tests.testing.proto import services_pb2 24from tests.testing.proto import services_pb2_grpc 25 26 27class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): 28 """Services RPCs.""" 29 30 def __init__(self): 31 self._abort_lock = threading.RLock() 32 self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE 33 34 def UnUn(self, request, context): 35 if request == _application_common.UNARY_UNARY_REQUEST: 36 return _application_common.UNARY_UNARY_RESPONSE 37 elif request == _application_common.ABORT_REQUEST: 38 with self._abort_lock: 39 try: 40 context.abort(grpc.StatusCode.PERMISSION_DENIED, 41 "Denying permission to test abort.") 42 except Exception as e: # pylint: disable=broad-except 43 self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE 44 else: 45 self._abort_status = _application_common.ABORT_FAILURE_RESPONSE 46 return None # NOTE: For the linter. 47 elif request == _application_common.ABORT_SUCCESS_QUERY: 48 with self._abort_lock: 49 return self._abort_response 50 else: 51 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 52 context.set_details('Something is wrong with your request!') 53 return services_pb2.Down() 54 55 def UnStre(self, request, context): 56 if _application_common.UNARY_STREAM_REQUEST != request: 57 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 58 context.set_details('Something is wrong with your request!') 59 return 60 yield services_pb2.Strange() # pylint: disable=unreachable 61 62 def StreUn(self, request_iterator, context): 63 context.send_initial_metadata((( 64 'server_application_metadata_key', 65 'Hi there!', 66 ),)) 67 for request in request_iterator: 68 if request != _application_common.STREAM_UNARY_REQUEST: 69 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 70 context.set_details('Something is wrong with your request!') 71 return services_pb2.Strange() 72 elif not context.is_active(): 73 return services_pb2.Strange() 74 else: 75 return _application_common.STREAM_UNARY_RESPONSE 76 77 def StreStre(self, request_iterator, context): 78 valid_requests = (_application_common.STREAM_STREAM_REQUEST, 79 _application_common.STREAM_STREAM_MUTATING_REQUEST) 80 for request in request_iterator: 81 if request not in valid_requests: 82 context.set_code(grpc.StatusCode.INVALID_ARGUMENT) 83 context.set_details('Something is wrong with your request!') 84 return 85 elif not context.is_active(): 86 return 87 elif request == _application_common.STREAM_STREAM_REQUEST: 88 yield _application_common.STREAM_STREAM_RESPONSE 89 yield _application_common.STREAM_STREAM_RESPONSE 90 elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST: 91 response = services_pb2.Bottom() 92 for i in range( 93 _application_common.STREAM_STREAM_MUTATING_COUNT): 94 response.first_bottom_field = i 95 yield response 96