# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using server-side application."""

import threading

import grpc

# requests_pb2 is a semantic dependency of this module.
from tests.testing import _application_common
from tests.testing.proto import requests_pb2  # pylint: disable=unused-import
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc


class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
    """Services RPCs.

    Implements the four RPC shapes of FirstService (unary-unary,
    unary-stream, stream-unary and stream-stream). Requests are compared
    against the canned messages in ``_application_common``; anything
    unrecognised is answered with ``INVALID_ARGUMENT``.
    """

    def __init__(self):
        # Guards _abort_response, which is written by the abort branch of
        # UnUn and read by its abort-query branch.
        self._abort_lock = threading.RLock()
        self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE

    def UnUn(self, request, context):
        """Handles a unary-unary RPC.

        Args:
          request: The request message; compared against the canned requests
            in ``_application_common``.
          context: The servicer context for this RPC.

        Returns:
          ``UNARY_UNARY_RESPONSE`` for ``UNARY_UNARY_REQUEST``; ``None`` for
          ``ABORT_REQUEST``; the recorded abort outcome for
          ``ABORT_SUCCESS_QUERY``; otherwise an empty ``Down`` message after
          setting ``INVALID_ARGUMENT`` on the context.
        """
        if request == _application_common.UNARY_UNARY_REQUEST:
            return _application_common.UNARY_UNARY_RESPONSE
        elif request == _application_common.ABORT_REQUEST:
            with self._abort_lock:
                try:
                    context.abort(
                        grpc.StatusCode.PERMISSION_DENIED,
                        "Denying permission to test abort.",
                    )
                except Exception:  # pylint: disable=broad-except
                    # abort() raising is the outcome recorded as success.
                    self._abort_response = (
                        _application_common.ABORT_SUCCESS_RESPONSE
                    )
                else:
                    # abort() returned normally: record the failure in the
                    # same attribute that ABORT_SUCCESS_QUERY reads back.
                    self._abort_response = (
                        _application_common.ABORT_FAILURE_RESPONSE
                    )
            return None  # NOTE: For the linter.
        elif request == _application_common.ABORT_SUCCESS_QUERY:
            with self._abort_lock:
                return self._abort_response
        else:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Something is wrong with your request!")
            return services_pb2.Down()

    def UnStre(self, request, context):
        """Handles a unary-stream RPC; never yields a response.

        Args:
          request: The request message; anything other than
            ``UNARY_STREAM_REQUEST`` sets ``INVALID_ARGUMENT`` on the context.
          context: The servicer context for this RPC.

        Yields:
          Nothing. The unreachable ``yield`` below only exists to make this
          method a generator function.
        """
        if _application_common.UNARY_STREAM_REQUEST != request:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Something is wrong with your request!")
        return
        yield services_pb2.Strange()  # pylint: disable=unreachable

    def StreUn(self, request_iterator, context):
        """Handles a stream-unary RPC.

        Sends initial metadata, then decides the response from the first
        request only: every branch of the loop body returns.

        Args:
          request_iterator: Iterator over the incoming request messages.
          context: The servicer context for this RPC.

        Returns:
          ``STREAM_UNARY_RESPONSE`` if the first request equals
          ``STREAM_UNARY_REQUEST`` and the context is still active; an empty
          ``Strange`` message if the request is unexpected (after setting
          ``INVALID_ARGUMENT``) or the context is no longer active. If the
          iterator produces no requests, the loop body never runs and the
          method returns ``None`` implicitly.
        """
        context.send_initial_metadata(
            (
                (
                    "server_application_metadata_key",
                    "Hi there!",
                ),
            )
        )
        for request in request_iterator:
            if request != _application_common.STREAM_UNARY_REQUEST:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("Something is wrong with your request!")
                return services_pb2.Strange()
            elif not context.is_active():
                return services_pb2.Strange()
            else:
                return _application_common.STREAM_UNARY_RESPONSE

    def StreStre(self, request_iterator, context):
        """Handles a stream-stream RPC.

        Args:
          request_iterator: Iterator over the incoming request messages.
          context: The servicer context for this RPC.

        Yields:
          Two ``STREAM_STREAM_RESPONSE`` messages per
          ``STREAM_STREAM_REQUEST``. For each
          ``STREAM_STREAM_MUTATING_REQUEST``, one ``Bottom`` message is
          yielded ``STREAM_STREAM_MUTATING_COUNT`` times with
          ``first_bottom_field`` updated before each yield. The stream ends
          early on an unexpected request (after setting ``INVALID_ARGUMENT``)
          or once the context is no longer active.
        """
        valid_requests = (
            _application_common.STREAM_STREAM_REQUEST,
            _application_common.STREAM_STREAM_MUTATING_REQUEST,
        )
        for request in request_iterator:
            if request not in valid_requests:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("Something is wrong with your request!")
                return
            elif not context.is_active():
                return
            elif request == _application_common.STREAM_STREAM_REQUEST:
                yield _application_common.STREAM_STREAM_RESPONSE
                yield _application_common.STREAM_STREAM_RESPONSE
            elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
                # The same message object is mutated and re-yielded on every
                # iteration rather than building a fresh one each time.
                response = services_pb2.Bottom()
                for i in range(
                    _application_common.STREAM_STREAM_MUTATING_COUNT
                ):
                    response.first_bottom_field = i
                    yield response