1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests server context abort mechanism""" 15 16import unittest 17import collections 18import gc 19import logging 20import weakref 21 22import grpc 23 24from tests.unit import test_common 25from tests.unit.framework.common import test_constants 26 27_ABORT = '/test/abort' 28_ABORT_WITH_STATUS = '/test/AbortWithStatus' 29_INVALID_CODE = '/test/InvalidCode' 30 31_REQUEST = b'\x00\x00\x00' 32_RESPONSE = b'\x00\x00\x00' 33 34_ABORT_DETAILS = 'Abandon ship!' 
# Trailing metadata attached by the abort_with_status() servicer; the test
# asserts the client receives exactly this tuple.
_ABORT_METADATA = (('a-trailing-metadata', '42'),)


class _Status(
        collections.namedtuple('_Status',
                               ('code', 'details', 'trailing_metadata')),
        grpc.Status):
    """Minimal grpc.Status: a (code, details, trailing_metadata) tuple."""


class _Object(object):
    """Plain instance used as the target of a weak reference."""


# Module-level sentinel referenced from inside abort_unary_unary(); the leak
# test drops this reference and checks that nothing else keeps it alive.
do_not_leak_me = _Object()


def abort_unary_unary(request, servicer_context):
    """Abort the RPC with INTERNAL and _ABORT_DETAILS.

    Args:
      request: The request message (unused).
      servicer_context: The grpc.ServicerContext of the RPC.

    Raises:
      Exception: Always. abort() is expected to raise before the explicit
        raise below is reached.
    """
    # Deliberately unused: binds the sentinel to this frame's locals so the
    # leak test can detect a retained exception/traceback.
    this_should_not_be_leaked = do_not_leak_me
    servicer_context.abort(
        grpc.StatusCode.INTERNAL,
        _ABORT_DETAILS,
    )
    raise Exception('This line should not be executed!')


def abort_with_status_unary_unary(request, servicer_context):
    """Abort the RPC via abort_with_status() using a _Status object.

    Args:
      request: The request message (unused).
      servicer_context: The grpc.ServicerContext of the RPC.

    Raises:
      Exception: Always. abort_with_status() is expected to raise before the
        explicit raise below is reached.
    """
    servicer_context.abort_with_status(
        _Status(
            code=grpc.StatusCode.INTERNAL,
            details=_ABORT_DETAILS,
            trailing_metadata=_ABORT_METADATA,
        ))
    raise Exception('This line should not be executed!')


def invalid_code_unary_unary(request, servicer_context):
    """Abort the RPC with a code (42) that is not a grpc.StatusCode.

    Args:
      request: The request message (unused).
      servicer_context: The grpc.ServicerContext of the RPC.
    """
    servicer_context.abort(
        42,
        _ABORT_DETAILS,
    )


class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the three test method paths to their servicer functions."""

    def service(self, handler_call_details):
        """Return the method handler for the called method, or None.

        Args:
          handler_call_details: Describes the incoming RPC; only its `method`
            attribute is inspected.

        Returns:
          A unary-unary RpcMethodHandler for a known method path, None
          otherwise.
        """
        if handler_call_details.method == _ABORT:
            return grpc.unary_unary_rpc_method_handler(abort_unary_unary)
        elif handler_call_details.method == _ABORT_WITH_STATUS:
            return grpc.unary_unary_rpc_method_handler(
                abort_with_status_unary_unary)
        elif handler_call_details.method == _INVALID_CODE:
            # The behaviour takes a single request and the client invokes it
            # through channel.unary_unary(), so register it as unary-unary
            # like the other routes (it was registered as stream-stream).
            return grpc.unary_unary_rpc_method_handler(
                invalid_code_unary_unary)
        else:
            return None


class AbortTest(unittest.TestCase):
    # NOTE: test methods carry comments rather than docstrings on purpose;
    # with verbosity=2 unittest would print a docstring instead of the test
    # name.

    def setUp(self):
        # Start a server on an OS-assigned port and connect a channel to it.
        self._server = test_common.test_server()
        port = self._server.add_insecure_port('[::]:0')
        self._server.add_generic_rpc_handlers((_GenericHandler(),))
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)

    def tearDown(self):
        self._channel.close()
        self._server.stop(0)

    def test_abort(self):
        # abort(code, details) must surface both values to the client.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(_ABORT)(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)

    # This test ensures that abort() does not store the raised exception, which
    # on Python 3 (via the `__traceback__` attribute) holds a reference to
    # all local vars. Storing the raised exception can prevent GC and stop the
    # grpc_call from being unref'ed, even after server shutdown.
    @unittest.skip("https://github.com/grpc/grpc/issues/17927")
    def test_abort_does_not_leak_local_vars(self):
        global do_not_leak_me  # pylint: disable=global-statement
        weak_ref = weakref.ref(do_not_leak_me)

        # Servicer will abort() after creating a local ref to do_not_leak_me.
        with self.assertRaises(grpc.RpcError):
            self._channel.unary_unary(_ABORT)(_REQUEST)

        # Server may still have a stack frame reference to the exception even
        # after client sees error, so ensure server has shutdown. stop() is
        # non-blocking and returns an event that is set once the server has
        # completely stopped, so wait on it before checking the weak ref.
        self._server.stop(None).wait()
        do_not_leak_me = None
        self.assertIsNone(weak_ref())

    def test_abort_with_status(self):
        # abort_with_status() must surface code, details and trailing
        # metadata to the client.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(_ABORT_WITH_STATUS)(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
        self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)

    def test_invalid_code(self):
        # An abort code that is not a grpc.StatusCode is reported to the
        # client as UNKNOWN, with the supplied details preserved.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(_INVALID_CODE)(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)