# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for an actual DNS resolution."""

import unittest
import logging
import six

import grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants

_METHOD = '/ANY/METHOD'
_REQUEST = b'\x00\x00\x00'
_RESPONSE = _REQUEST


class GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that answers every method by echoing the request."""

    def service(self, unused_handler_details):
        """Return a unary-unary method handler whose behavior is an echo.

        The handler call details are ignored, so the same echo handler is
        returned regardless of which method was requested.
        """

        def echo(request, unused_context):
            # The response is the request itself, unchanged.
            return request

        return grpc.unary_unary_rpc_method_handler(echo)


class DNSResolverTest(unittest.TestCase):
    """Sends one RPC to a local server addressed through a DNS hostname."""

    def setUp(self):
        """Start a test server with the echo handler on an OS-chosen port."""
        self._server = test_common.test_server()
        handlers = (GenericHandler(),)
        self._server.add_generic_rpc_handlers(handlers)
        # Port 0 asks for any free port; the bound port number is returned.
        self._port = self._server.add_insecure_port('[::]:0')
        self._server.start()

    def tearDown(self):
        """Stop the server without a grace period."""
        self._server.stop(None)

    def test_connect_loopback(self):
        """An RPC sent via the loopback hostname is echoed back unchanged."""
        # NOTE(https://github.com/grpc/grpc/issues/18422)
        # In short, Gevent + C-Ares = Segfault. The C-Ares driver is not
        # supported by custom io managers like "gevent" or "libuv".
        #
        # The hostname presumably resolves to the IPv4 loopback address, so
        # the call should reach the server started in setUp -- TODO confirm.
        target = 'loopback4.unittest.grpc.io:%d' % self._port
        with grpc.insecure_channel(target) as channel:
            multi_callable = channel.unary_unary(_METHOD)
            response = multi_callable(
                _REQUEST,
                timeout=test_constants.SHORT_TIMEOUT,
            )
            self.assertEqual(response, _RESPONSE)


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)