1from __future__ import print_function 2 3import gdbremote_testcase 4import select 5import socket 6from lldbsuite.test.decorators import * 7from lldbsuite.test.lldbtest import * 8 9 10class TestGdbRemoteConnection(gdbremote_testcase.GdbRemoteTestCaseBase): 11 12 mydir = TestBase.compute_mydir(__file__) 13 14 @debugserver_test 15 # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet 16 @skipIfDarwinEmbedded 17 def test_reverse_connect_debugserver(self): 18 self.init_debugserver_test() 19 self._reverse_connect() 20 21 @llgs_test 22 @skipIfRemote # reverse connect is not a supported use case for now 23 def test_reverse_connect_llgs(self): 24 self.init_llgs_test() 25 self._reverse_connect() 26 27 def _reverse_connect(self): 28 # Reverse connect is the default connection method. 29 self.connect_to_debug_monitor() 30 # Verify we can do the handshake. If that works, we'll call it good. 31 self.do_handshake(self.sock) 32 33 @debugserver_test 34 @skipIfRemote 35 def test_named_pipe_debugserver(self): 36 self.init_debugserver_test() 37 self._named_pipe() 38 39 @llgs_test 40 @skipIfRemote 41 @skipIfWindows 42 def test_named_pipe_llgs(self): 43 self.init_llgs_test() 44 self._named_pipe() 45 46 def _named_pipe(self): 47 family, type, proto, _, addr = socket.getaddrinfo( 48 self.stub_hostname, 0, proto=socket.IPPROTO_TCP)[0] 49 self.sock = socket.socket(family, type, proto) 50 self.sock.settimeout(self.DEFAULT_TIMEOUT) 51 52 self.addTearDownHook(lambda: self.sock.close()) 53 54 named_pipe_path = self.getBuildArtifact("stub_port_number") 55 56 # Create the named pipe. 57 os.mkfifo(named_pipe_path) 58 59 # Open the read side of the pipe in non-blocking mode. This will 60 # return right away, ready or not. 61 named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK) 62 63 self.addTearDownHook(lambda: os.close(named_pipe_fd)) 64 65 args = self.debug_monitor_extra_args 66 if lldb.remote_platform: 67 args += ["*:0"] 68 else: 69 args += ["localhost:0"] 70 71 args += ["--named-pipe", named_pipe_path] 72 73 server = self.spawnSubprocess( 74 self.debug_monitor_exe, 75 args, 76 install_remote=False) 77 78 (ready_readers, _, _) = select.select( 79 [named_pipe_fd], [], [], self.DEFAULT_TIMEOUT) 80 self.assertIsNotNone( 81 ready_readers, 82 "write side of pipe has not written anything - stub isn't writing to pipe.") 83 port = os.read(named_pipe_fd, 10) 84 # Trim null byte, convert to int 85 addr = (addr[0], int(port[:-1])) 86 self.sock.connect(addr) 87 88 # Verify we can do the handshake. If that works, we'll call it good. 89 self.do_handshake(self.sock) 90