# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests server context abort mechanism"""

import collections
import gc
import logging
import unittest
import weakref

import grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

# Service and method names under which the handlers below are registered.
_SERVICE_NAME = "test"
_ABORT = "abort"
_ABORT_WITH_STATUS = "AbortWithStatus"
_INVALID_CODE = "InvalidCode"

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

# Details and trailing metadata the servicers attach when aborting; the tests
# assert that the client observes exactly these values.
_ABORT_DETAILS = "Abandon ship!"
_ABORT_METADATA = (("a-trailing-metadata", "42"),)


class _Status(
    collections.namedtuple("_Status", ("code", "details", "trailing_metadata")),
    grpc.Status,
):
    """Immutable grpc.Status carrying code, details and trailing_metadata."""

    pass


class _Object(object):
    """Plain object type; instances can be the target of a weakref.ref."""

    pass


# Module-level instance that abort_unary_unary binds to a local variable. The
# leak test drops this reference and checks the object becomes collectable.
do_not_leak_me = _Object()


def abort_unary_unary(request, servicer_context):
    """Abort the RPC with INTERNAL while a local references do_not_leak_me.

    Args:
        request: The incoming request; not inspected.
        servicer_context: Context on which abort() is called.
    """
    # Deliberately unused: it only exists so this frame holds a reference that
    # a stored exception/traceback would keep alive.
    this_should_not_be_leaked = do_not_leak_me
    servicer_context.abort(
        grpc.StatusCode.INTERNAL,
        _ABORT_DETAILS,
    )
    # Tripwire: abort() is expected to raise before control gets here.
    raise Exception("This line should not be executed!")


def abort_with_status_unary_unary(request, servicer_context):
    """Abort the RPC via abort_with_status() using a _Status value.

    Args:
        request: The incoming request; not inspected.
        servicer_context: Context on which abort_with_status() is called.
    """
    servicer_context.abort_with_status(
        _Status(
            code=grpc.StatusCode.INTERNAL,
            details=_ABORT_DETAILS,
            trailing_metadata=_ABORT_METADATA,
        )
    )
    # Tripwire: abort_with_status() is expected to raise before control gets
    # here.
    raise Exception("This line should not be executed!")


def invalid_code_unary_unary(request, servicer_context):
    """Abort the RPC with the integer 42 in place of a grpc.StatusCode.

    Args:
        request: The incoming request; not inspected.
        servicer_context: Context on which abort() is called.
    """
    servicer_context.abort(
        42,
        _ABORT_DETAILS,
    )


RPC_METHOD_HANDLERS = {
    _ABORT: grpc.unary_unary_rpc_method_handler(abort_unary_unary),
    _ABORT_WITH_STATUS: grpc.unary_unary_rpc_method_handler(
        abort_with_status_unary_unary
    ),
    # NOTE(review): this behavior takes a single request and the client calls
    # it through unary_unary, yet it is registered as a stream-stream handler.
    # Left unchanged; confirm whether the mismatch is intentional.
    _INVALID_CODE: grpc.stream_stream_rpc_method_handler(
        invalid_code_unary_unary
    ),
}


class AbortTest(unittest.TestCase):
    """Client-side checks of what a caller observes when the server aborts."""

    def setUp(self):
        self._server = test_common.test_server()
        # The port returned here is used to build the client target below.
        port = self._server.add_insecure_port("[::]:0")
        self._server.add_registered_method_handlers(
            _SERVICE_NAME, RPC_METHOD_HANDLERS
        )
        self._server.start()

        self._channel = grpc.insecure_channel("localhost:%d" % port)

    def tearDown(self):
        self._channel.close()
        self._server.stop(0)

    def test_abort(self):
        # abort(INTERNAL, details) must surface to the client as an RpcError
        # carrying the same code and details.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                grpc._common.fully_qualified_method(_SERVICE_NAME, _ABORT),
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)

    # This test ensures that abort() does not store the raised exception, which
    # on Python 3 (via the `__traceback__` attribute) holds a reference to
    # all local vars. Storing the raised exception can prevent GC and stop the
    # grpc_call from being unref'ed, even after server shutdown.
    @unittest.skip("https://github.com/grpc/grpc/issues/17927")
    def test_abort_does_not_leak_local_vars(self):
        global do_not_leak_me  # pylint: disable=global-statement
        weak_ref = weakref.ref(do_not_leak_me)

        # Servicer will abort() after creating a local ref to do_not_leak_me.
        with self.assertRaises(grpc.RpcError):
            self._channel.unary_unary(
                grpc._common.fully_qualified_method(_SERVICE_NAME, _ABORT),
                _registered_method=True,
            )(_REQUEST)

        # Server may still have a stack frame reference to the exception even
        # after client sees error, so ensure server has shutdown.
        self._server.stop(None)
        # Drop the module-level reference; if nothing else holds the object
        # the weak reference must now be dead.
        do_not_leak_me = None
        self.assertIsNone(weak_ref())

    def test_abort_with_status(self):
        # abort_with_status() must deliver code, details and trailing metadata
        # from the supplied status to the client.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                grpc._common.fully_qualified_method(
                    _SERVICE_NAME, _ABORT_WITH_STATUS
                ),
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
        self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)

    def test_invalid_code(self):
        # The servicer aborts with 42 rather than a grpc.StatusCode; the client
        # is expected to see UNKNOWN while the details are still delivered.
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                grpc._common.fully_qualified_method(
                    _SERVICE_NAME, _INVALID_CODE
                ),
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)