• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests server context abort mechanism"""
15
16import collections
17import gc
18import logging
19import unittest
20import weakref
21
22import grpc
23
24from tests.unit import test_common
25from tests.unit.framework.common import test_constants
26
27_SERVICE_NAME = "test"
28_ABORT = "abort"
29_ABORT_WITH_STATUS = "AbortWithStatus"
30_INVALID_CODE = "InvalidCode"
31
32_REQUEST = b"\x00\x00\x00"
33_RESPONSE = b"\x00\x00\x00"
34
35_ABORT_DETAILS = "Abandon ship!"
36_ABORT_METADATA = (("a-trailing-metadata", "42"),)
37
38
39class _Status(
40    collections.namedtuple("_Status", ("code", "details", "trailing_metadata")),
41    grpc.Status,
42):
43    pass
44
45
46class _Object(object):
47    pass
48
49
50do_not_leak_me = _Object()
51
52
53def abort_unary_unary(request, servicer_context):
54    this_should_not_be_leaked = do_not_leak_me
55    servicer_context.abort(
56        grpc.StatusCode.INTERNAL,
57        _ABORT_DETAILS,
58    )
59    raise Exception("This line should not be executed!")
60
61
62def abort_with_status_unary_unary(request, servicer_context):
63    servicer_context.abort_with_status(
64        _Status(
65            code=grpc.StatusCode.INTERNAL,
66            details=_ABORT_DETAILS,
67            trailing_metadata=_ABORT_METADATA,
68        )
69    )
70    raise Exception("This line should not be executed!")
71
72
73def invalid_code_unary_unary(request, servicer_context):
74    servicer_context.abort(
75        42,
76        _ABORT_DETAILS,
77    )
78
79
80RPC_METHOD_HANDLERS = {
81    _ABORT: grpc.unary_unary_rpc_method_handler(abort_unary_unary),
82    _ABORT_WITH_STATUS: grpc.unary_unary_rpc_method_handler(
83        abort_with_status_unary_unary
84    ),
85    _INVALID_CODE: grpc.stream_stream_rpc_method_handler(
86        invalid_code_unary_unary
87    ),
88}
89
90
91class AbortTest(unittest.TestCase):
92    def setUp(self):
93        self._server = test_common.test_server()
94        port = self._server.add_insecure_port("[::]:0")
95        self._server.add_registered_method_handlers(
96            _SERVICE_NAME, RPC_METHOD_HANDLERS
97        )
98        self._server.start()
99
100        self._channel = grpc.insecure_channel("localhost:%d" % port)
101
102    def tearDown(self):
103        self._channel.close()
104        self._server.stop(0)
105
106    def test_abort(self):
107        with self.assertRaises(grpc.RpcError) as exception_context:
108            self._channel.unary_unary(
109                grpc._common.fully_qualified_method(_SERVICE_NAME, _ABORT),
110                _registered_method=True,
111            )(_REQUEST)
112        rpc_error = exception_context.exception
113
114        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
115        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
116
117    # This test ensures that abort() does not store the raised exception, which
118    # on Python 3 (via the `__traceback__` attribute) holds a reference to
119    # all local vars. Storing the raised exception can prevent GC and stop the
120    # grpc_call from being unref'ed, even after server shutdown.
121    @unittest.skip("https://github.com/grpc/grpc/issues/17927")
122    def test_abort_does_not_leak_local_vars(self):
123        global do_not_leak_me  # pylint: disable=global-statement
124        weak_ref = weakref.ref(do_not_leak_me)
125
126        # Servicer will abort() after creating a local ref to do_not_leak_me.
127        with self.assertRaises(grpc.RpcError):
128            self._channel.unary_unary(
129                grpc._common.fully_qualified_method(_SERVICE_NAME, _ABORT),
130                _registered_method=True,
131            )(_REQUEST)
132
133        # Server may still have a stack frame reference to the exception even
134        # after client sees error, so ensure server has shutdown.
135        self._server.stop(None)
136        do_not_leak_me = None
137        self.assertIsNone(weak_ref())
138
139    def test_abort_with_status(self):
140        with self.assertRaises(grpc.RpcError) as exception_context:
141            self._channel.unary_unary(
142                grpc._common.fully_qualified_method(
143                    _SERVICE_NAME, _ABORT_WITH_STATUS
144                ),
145                _registered_method=True,
146            )(_REQUEST)
147        rpc_error = exception_context.exception
148
149        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
150        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
151        self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)
152
153    def test_invalid_code(self):
154        with self.assertRaises(grpc.RpcError) as exception_context:
155            self._channel.unary_unary(
156                grpc._common.fully_qualified_method(
157                    _SERVICE_NAME, _INVALID_CODE
158                ),
159                _registered_method=True,
160            )(_REQUEST)
161        rpc_error = exception_context.exception
162
163        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
164        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
165
166
167if __name__ == "__main__":
168    logging.basicConfig()
169    unittest.main(verbosity=2)
170