1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseBase(TestBase): 31 32 NO_DEBUG_INFO_TESTCASE = True 33 34 # Default time out in seconds. The timeout is increased tenfold under Asan. 35 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 36 # Default sleep time in seconds. The sleep time is doubled under Asan. 37 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 38 39 _GDBREMOTE_KILL_PACKET = b"$k#6b" 40 41 # Start the inferior separately, attach to the inferior on the stub 42 # command line. 43 _STARTUP_ATTACH = "attach" 44 # Start the inferior separately, start the stub without attaching, allow 45 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 46 _STARTUP_ATTACH_MANUALLY = "attach_manually" 47 # Start the stub, and launch the inferior with an $A packet via the 48 # initial packet stream. 49 _STARTUP_LAUNCH = "launch" 50 51 # GDB Signal numbers that are not target-specific used for common 52 # exceptions 53 TARGET_EXC_BAD_ACCESS = 0x91 54 TARGET_EXC_BAD_INSTRUCTION = 0x92 55 TARGET_EXC_ARITHMETIC = 0x93 56 TARGET_EXC_EMULATION = 0x94 57 TARGET_EXC_SOFTWARE = 0x95 58 TARGET_EXC_BREAKPOINT = 0x96 59 60 _verbose_log_handler = None 61 _log_formatter = logging.Formatter( 62 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 63 64 def setUpBaseLogging(self): 65 self.logger = logging.getLogger(__name__) 66 67 if len(self.logger.handlers) > 0: 68 return # We have set up this handler already 69 70 self.logger.propagate = False 71 self.logger.setLevel(logging.DEBUG) 72 73 # log all warnings to stderr 74 handler = logging.StreamHandler() 75 handler.setLevel(logging.WARNING) 76 handler.setFormatter(self._log_formatter) 77 self.logger.addHandler(handler) 78 79 def isVerboseLoggingRequested(self): 80 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 81 # logged. 82 return any(("gdb-remote" in channel) 83 for channel in lldbtest_config.channels) 84 85 def setUp(self): 86 TestBase.setUp(self) 87 88 self.setUpBaseLogging() 89 self.debug_monitor_extra_args = [] 90 91 if self.isVerboseLoggingRequested(): 92 # If requested, full logs go to a log file 93 self._verbose_log_handler = logging.FileHandler( 94 self.log_basename + "-host.log") 95 self._verbose_log_handler.setFormatter(self._log_formatter) 96 self._verbose_log_handler.setLevel(logging.DEBUG) 97 self.logger.addHandler(self._verbose_log_handler) 98 99 self.test_sequence = GdbRemoteTestSequence(self.logger) 100 self.set_inferior_startup_launch() 101 self.port = self.get_next_port() 102 self.stub_sends_two_stop_notifications_on_kill = False 103 if configuration.lldb_platform_url: 104 if configuration.lldb_platform_url.startswith('unix-'): 105 url_pattern = '(.+)://\[?(.+?)\]?/.*' 106 else: 107 url_pattern = '(.+)://(.+):\d+' 108 scheme, host = re.match( 109 url_pattern, configuration.lldb_platform_url).groups() 110 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 111 self.stub_device = host 112 self.stub_hostname = 'localhost' 113 else: 114 self.stub_device = None 115 self.stub_hostname = host 116 else: 117 self.stub_hostname = "localhost" 118 119 def tearDown(self): 120 self.logger.removeHandler(self._verbose_log_handler) 121 self._verbose_log_handler = None 122 TestBase.tearDown(self) 123 124 def getLocalServerLogFile(self): 125 return self.log_basename + "-server.log" 126 127 def setUpServerLogging(self, is_llgs): 128 if len(lldbtest_config.channels) == 0: 129 return # No logging requested 130 131 if lldb.remote_platform: 132 log_file = lldbutil.join_remote_paths( 133 lldb.remote_platform.GetWorkingDirectory(), "server.log") 134 else: 135 log_file = self.getLocalServerLogFile() 136 137 if is_llgs: 138 self.debug_monitor_extra_args.append("--log-file=" + log_file) 139 self.debug_monitor_extra_args.append( 140 "--log-channels={}".format(":".join(lldbtest_config.channels))) 141 else: 142 self.debug_monitor_extra_args = [ 143 "--log-file=" + log_file, "--log-flags=0x800000"] 144 145 def get_next_port(self): 146 return 12000 + random.randint(0, 3999) 147 148 def reset_test_sequence(self): 149 self.test_sequence = GdbRemoteTestSequence(self.logger) 150 151 152 def init_llgs_test(self): 153 reverse_connect = True 154 if lldb.remote_platform: 155 # Reverse connections may be tricky due to firewalls/NATs. 156 reverse_connect = False 157 158 triple = self.dbg.GetSelectedPlatform().GetTriple() 159 if re.match(".*-.*-windows", triple): 160 self.skipTest("Remotely testing is not supported on Windows yet.") 161 162 # Grab the ppid from /proc/[shell pid]/stat 163 err, retcode, shell_stat = self.run_platform_command( 164 "cat /proc/$$/stat") 165 self.assertTrue( 166 err.Success() and retcode == 0, 167 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 168 (err.GetCString(), 169 retcode)) 170 171 # [pid] ([executable]) [state] [*ppid*] 172 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 173 err, retcode, ls_output = self.run_platform_command( 174 "ls -l /proc/%s/exe" % pid) 175 self.assertTrue( 176 err.Success() and retcode == 0, 177 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 178 (pid, 179 err.GetCString(), 180 retcode)) 181 exe = ls_output.split()[-1] 182 183 # If the binary has been deleted, the link name has " (deleted)" appended. 184 # Remove if it's there. 185 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 186 else: 187 # TODO: enable this 188 if platform.system() == 'Windows': 189 reverse_connect = False 190 191 self.debug_monitor_exe = get_lldb_server_exe() 192 if not self.debug_monitor_exe: 193 self.skipTest("lldb-server exe not found") 194 195 self.debug_monitor_extra_args = ["gdbserver"] 196 self.setUpServerLogging(is_llgs=True) 197 198 self.reverse_connect = reverse_connect 199 200 def init_debugserver_test(self): 201 self.debug_monitor_exe = get_debugserver_exe() 202 if not self.debug_monitor_exe: 203 self.skipTest("debugserver exe not found") 204 self.setUpServerLogging(is_llgs=False) 205 self.reverse_connect = True 206 207 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 208 # when the process truly dies. 209 self.stub_sends_two_stop_notifications_on_kill = True 210 211 def forward_adb_port(self, source, target, direction, device): 212 adb = ['adb'] + (['-s', device] if device else []) + [direction] 213 214 def remove_port_forward(): 215 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 216 217 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 218 self.addTearDownHook(remove_port_forward) 219 220 def _verify_socket(self, sock): 221 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 222 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 223 # the connect() will always be successful, but the connection will be immediately dropped 224 # if ADB could not connect on the remote side. This function tries to detect this 225 # situation, and report it as "connection refused" so that the upper layers attempt the 226 # connection again. 227 triple = self.dbg.GetSelectedPlatform().GetTriple() 228 if not re.match(".*-.*-.*-android", triple): 229 return # Not android. 230 can_read, _, _ = select.select([sock], [], [], 0.1) 231 if sock not in can_read: 232 return # Data is not available, but the connection is alive. 233 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 234 raise _ConnectionRefused() # Got EOF, connection dropped. 235 236 def create_socket(self): 237 try: 238 sock = socket.socket(family=socket.AF_INET) 239 except OSError as e: 240 if e.errno != errno.EAFNOSUPPORT: 241 raise 242 sock = socket.socket(family=socket.AF_INET6) 243 244 logger = self.logger 245 246 triple = self.dbg.GetSelectedPlatform().GetTriple() 247 if re.match(".*-.*-.*-android", triple): 248 self.forward_adb_port( 249 self.port, 250 self.port, 251 "forward", 252 self.stub_device) 253 254 logger.info( 255 "Connecting to debug monitor on %s:%d", 256 self.stub_hostname, 257 self.port) 258 connect_info = (self.stub_hostname, self.port) 259 try: 260 sock.connect(connect_info) 261 except socket.error as serr: 262 if serr.errno == errno.ECONNREFUSED: 263 raise _ConnectionRefused() 264 raise serr 265 266 def shutdown_socket(): 267 if sock: 268 try: 269 # send the kill packet so lldb-server shuts down gracefully 270 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 271 except: 272 logger.warning( 273 "failed to send kill packet to debug monitor: {}; ignoring".format( 274 sys.exc_info()[0])) 275 276 try: 277 sock.close() 278 except: 279 logger.warning( 280 "failed to close socket to debug monitor: {}; ignoring".format( 281 sys.exc_info()[0])) 282 283 self.addTearDownHook(shutdown_socket) 284 285 self._verify_socket(sock) 286 287 return sock 288 289 def set_inferior_startup_launch(self): 290 self._inferior_startup = self._STARTUP_LAUNCH 291 292 def set_inferior_startup_attach(self): 293 self._inferior_startup = self._STARTUP_ATTACH 294 295 def set_inferior_startup_attach_manually(self): 296 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 297 298 def get_debug_monitor_command_line_args(self, attach_pid=None): 299 commandline_args = self.debug_monitor_extra_args 300 if attach_pid: 301 commandline_args += ["--attach=%d" % attach_pid] 302 if self.reverse_connect: 303 commandline_args += ["--reverse-connect", self.connect_address] 304 else: 305 if lldb.remote_platform: 306 commandline_args += ["*:{}".format(self.port)] 307 else: 308 commandline_args += ["localhost:{}".format(self.port)] 309 310 return commandline_args 311 312 def get_target_byte_order(self): 313 inferior_exe_path = self.getBuildArtifact("a.out") 314 target = self.dbg.CreateTarget(inferior_exe_path) 315 return target.GetByteOrder() 316 317 def launch_debug_monitor(self, attach_pid=None, logfile=None): 318 if self.reverse_connect: 319 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 320 sock = socket.socket(family, type, proto) 321 sock.settimeout(self.DEFAULT_TIMEOUT) 322 323 sock.bind(addr) 324 sock.listen(1) 325 addr = sock.getsockname() 326 self.connect_address = "[{}]:{}".format(*addr) 327 328 329 # Create the command line. 330 commandline_args = self.get_debug_monitor_command_line_args( 331 attach_pid=attach_pid) 332 333 # Start the server. 334 server = self.spawnSubprocess( 335 self.debug_monitor_exe, 336 commandline_args, 337 install_remote=False) 338 self.assertIsNotNone(server) 339 340 if self.reverse_connect: 341 self.sock = sock.accept()[0] 342 self.sock.settimeout(self.DEFAULT_TIMEOUT) 343 344 return server 345 346 def connect_to_debug_monitor(self, attach_pid=None): 347 if self.reverse_connect: 348 # Create the stub. 349 server = self.launch_debug_monitor(attach_pid=attach_pid) 350 self.assertIsNotNone(server) 351 352 # Schedule debug monitor to be shut down during teardown. 353 logger = self.logger 354 355 self._server = Server(self.sock, server) 356 return server 357 358 # We're using a random port algorithm to try not to collide with other ports, 359 # and retry a max # times. 360 attempts = 0 361 MAX_ATTEMPTS = 20 362 363 while attempts < MAX_ATTEMPTS: 364 server = self.launch_debug_monitor(attach_pid=attach_pid) 365 366 # Schedule debug monitor to be shut down during teardown. 367 logger = self.logger 368 369 connect_attemps = 0 370 MAX_CONNECT_ATTEMPTS = 10 371 372 while connect_attemps < MAX_CONNECT_ATTEMPTS: 373 # Create a socket to talk to the server 374 try: 375 logger.info("Connect attempt %d", connect_attemps + 1) 376 self.sock = self.create_socket() 377 self._server = Server(self.sock, server) 378 return server 379 except _ConnectionRefused as serr: 380 # Ignore, and try again. 381 pass 382 time.sleep(0.5) 383 connect_attemps += 1 384 385 # We should close the server here to be safe. 386 server.terminate() 387 388 # Increment attempts. 389 print( 390 "connect to debug monitor on port %d failed, attempt #%d of %d" % 391 (self.port, attempts + 1, MAX_ATTEMPTS)) 392 attempts += 1 393 394 # And wait a random length of time before next attempt, to avoid 395 # collisions. 396 time.sleep(random.randint(1, 5)) 397 398 # Now grab a new port number. 399 self.port = self.get_next_port() 400 401 raise Exception( 402 "failed to create a socket to the launched debug monitor after %d tries" % 403 attempts) 404 405 def launch_process_for_attach( 406 self, 407 inferior_args=None, 408 sleep_seconds=3, 409 exe_path=None): 410 # We're going to start a child process that the debug monitor stub can later attach to. 411 # This process needs to be started so that it just hangs around for a while. We'll 412 # have it sleep. 413 if not exe_path: 414 exe_path = self.getBuildArtifact("a.out") 415 416 args = [] 417 if inferior_args: 418 args.extend(inferior_args) 419 if sleep_seconds: 420 args.append("sleep:%d" % sleep_seconds) 421 422 return self.spawnSubprocess(exe_path, args) 423 424 def prep_debug_monitor_and_inferior( 425 self, 426 inferior_args=None, 427 inferior_sleep_seconds=3, 428 inferior_exe_path=None, 429 inferior_env=None): 430 """Prep the debug monitor, the inferior, and the expected packet stream. 431 432 Handle the separate cases of using the debug monitor in attach-to-inferior mode 433 and in launch-inferior mode. 434 435 For attach-to-inferior mode, the inferior process is first started, then 436 the debug monitor is started in attach to pid mode (using --attach on the 437 stub command line), and the no-ack-mode setup is appended to the packet 438 stream. The packet stream is not yet executed, ready to have more expected 439 packet entries added to it. 440 441 For launch-inferior mode, the stub is first started, then no ack mode is 442 setup on the expected packet stream, then the verified launch packets are added 443 to the expected socket stream. The packet stream is not yet executed, ready 444 to have more expected packet entries added to it. 445 446 The return value is: 447 {inferior:<inferior>, server:<server>} 448 """ 449 inferior = None 450 attach_pid = None 451 452 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 453 # Launch the process that we'll use as the inferior. 454 inferior = self.launch_process_for_attach( 455 inferior_args=inferior_args, 456 sleep_seconds=inferior_sleep_seconds, 457 exe_path=inferior_exe_path) 458 self.assertIsNotNone(inferior) 459 self.assertTrue(inferior.pid > 0) 460 if self._inferior_startup == self._STARTUP_ATTACH: 461 # In this case, we want the stub to attach via the command 462 # line, so set the command line attach pid here. 463 attach_pid = inferior.pid 464 465 if self._inferior_startup == self._STARTUP_LAUNCH: 466 # Build launch args 467 if not inferior_exe_path: 468 inferior_exe_path = self.getBuildArtifact("a.out") 469 470 if lldb.remote_platform: 471 remote_path = lldbutil.append_to_process_working_directory(self, 472 os.path.basename(inferior_exe_path)) 473 remote_file_spec = lldb.SBFileSpec(remote_path, False) 474 err = lldb.remote_platform.Install(lldb.SBFileSpec( 475 inferior_exe_path, True), remote_file_spec) 476 if err.Fail(): 477 raise Exception( 478 "remote_platform.Install('%s', '%s') failed: %s" % 479 (inferior_exe_path, remote_path, err)) 480 inferior_exe_path = remote_path 481 482 launch_args = [inferior_exe_path] 483 if inferior_args: 484 launch_args.extend(inferior_args) 485 486 # Launch the debug monitor stub, attaching to the inferior. 487 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 488 self.assertIsNotNone(server) 489 490 # Build the expected protocol stream 491 self.add_no_ack_remote_stream() 492 if inferior_env: 493 for name, value in inferior_env.items(): 494 self.add_set_environment_packets(name, value) 495 if self._inferior_startup == self._STARTUP_LAUNCH: 496 self.add_verified_launch_packets(launch_args) 497 498 return {"inferior": inferior, "server": server} 499 500 def expect_socket_recv( 501 self, 502 sock, 503 expected_content_regex 504 ): 505 response = "" 506 timeout_time = time.time() + self.DEFAULT_TIMEOUT 507 508 while not expected_content_regex.match( 509 response) and time.time() < timeout_time: 510 can_read, _, _ = select.select([sock], [], [], self.DEFAULT_TIMEOUT) 511 if can_read and sock in can_read: 512 recv_bytes = sock.recv(4096) 513 if recv_bytes: 514 response += seven.bitcast_to_string(recv_bytes) 515 516 self.assertTrue(expected_content_regex.match(response)) 517 518 def expect_socket_send(self, sock, content): 519 request_bytes_remaining = content 520 timeout_time = time.time() + self.DEFAULT_TIMEOUT 521 522 while len(request_bytes_remaining) > 0 and time.time() < timeout_time: 523 _, can_write, _ = select.select([], [sock], [], self.DEFAULT_TIMEOUT) 524 if can_write and sock in can_write: 525 written_byte_count = sock.send(request_bytes_remaining.encode()) 526 request_bytes_remaining = request_bytes_remaining[ 527 written_byte_count:] 528 self.assertEqual(len(request_bytes_remaining), 0) 529 530 def do_handshake(self, stub_socket): 531 # Write the ack. 532 self.expect_socket_send(stub_socket, "+") 533 534 # Send the start no ack mode packet. 535 NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0" 536 bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode()) 537 self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST)) 538 539 # Receive the ack and "OK" 540 self.expect_socket_recv(stub_socket, re.compile( 541 r"^\+\$OK#[0-9a-fA-F]{2}$")) 542 543 # Send the final ack. 544 self.expect_socket_send(stub_socket, "+") 545 546 def add_no_ack_remote_stream(self): 547 self.test_sequence.add_log_lines( 548 ["read packet: +", 549 "read packet: $QStartNoAckMode#b0", 550 "send packet: +", 551 "send packet: $OK#9a", 552 "read packet: +"], 553 True) 554 555 def add_verified_launch_packets(self, launch_args): 556 self.test_sequence.add_log_lines( 557 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 558 "send packet: $OK#00", 559 "read packet: $qLaunchSuccess#a5", 560 "send packet: $OK#00"], 561 True) 562 563 def add_thread_suffix_request_packets(self): 564 self.test_sequence.add_log_lines( 565 ["read packet: $QThreadSuffixSupported#e4", 566 "send packet: $OK#00", 567 ], True) 568 569 def add_process_info_collection_packets(self): 570 self.test_sequence.add_log_lines( 571 ["read packet: $qProcessInfo#dc", 572 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 573 True) 574 575 def add_set_environment_packets(self, name, value): 576 self.test_sequence.add_log_lines( 577 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 578 "send packet: $OK#00", 579 ], True) 580 581 _KNOWN_PROCESS_INFO_KEYS = [ 582 "pid", 583 "parent-pid", 584 "real-uid", 585 "real-gid", 586 "effective-uid", 587 "effective-gid", 588 "cputype", 589 "cpusubtype", 590 "ostype", 591 "triple", 592 "vendor", 593 "endian", 594 "elf_abi", 595 "ptrsize" 596 ] 597 598 def parse_process_info_response(self, context): 599 # Ensure we have a process info response. 600 self.assertIsNotNone(context) 601 process_info_raw = context.get("process_info_raw") 602 self.assertIsNotNone(process_info_raw) 603 604 # Pull out key:value; pairs. 605 process_info_dict = { 606 match.group(1): match.group(2) for match in re.finditer( 607 r"([^:]+):([^;]+);", process_info_raw)} 608 609 # Validate keys are known. 610 for (key, val) in list(process_info_dict.items()): 611 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 612 self.assertIsNotNone(val) 613 614 return process_info_dict 615 616 def add_register_info_collection_packets(self): 617 self.test_sequence.add_log_lines( 618 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 619 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 620 "save_key": "reg_info_responses"}], 621 True) 622 623 def parse_register_info_packets(self, context): 624 """Return an array of register info dictionaries, one per register info.""" 625 reg_info_responses = context.get("reg_info_responses") 626 self.assertIsNotNone(reg_info_responses) 627 628 # Parse register infos. 629 return [parse_reg_info_response(reg_info_response) 630 for reg_info_response in reg_info_responses] 631 632 def expect_gdbremote_sequence(self): 633 return expect_lldb_gdbserver_replay( 634 self, 635 self._server, 636 self.test_sequence, 637 self.DEFAULT_TIMEOUT * len(self.test_sequence), 638 self.logger) 639 640 _KNOWN_REGINFO_KEYS = [ 641 "name", 642 "alt-name", 643 "bitsize", 644 "offset", 645 "encoding", 646 "format", 647 "set", 648 "gcc", 649 "ehframe", 650 "dwarf", 651 "generic", 652 "container-regs", 653 "invalidate-regs", 654 "dynamic_size_dwarf_expr_bytes", 655 "dynamic_size_dwarf_len" 656 ] 657 658 def assert_valid_reg_info(self, reg_info): 659 # Assert we know about all the reginfo keys parsed. 660 for key in reg_info: 661 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 662 663 # Check the bare-minimum expected set of register info keys. 664 self.assertTrue("name" in reg_info) 665 self.assertTrue("bitsize" in reg_info) 666 667 if not self.getArchitecture() == 'aarch64': 668 self.assertTrue("offset" in reg_info) 669 670 self.assertTrue("encoding" in reg_info) 671 self.assertTrue("format" in reg_info) 672 673 def find_pc_reg_info(self, reg_infos): 674 lldb_reg_index = 0 675 for reg_info in reg_infos: 676 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 677 return (lldb_reg_index, reg_info) 678 lldb_reg_index += 1 679 680 return (None, None) 681 682 def add_lldb_register_index(self, reg_infos): 683 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 684 685 We'll use this when we want to call packets like P/p with a register index but do so 686 on only a subset of the full register info set. 687 """ 688 self.assertIsNotNone(reg_infos) 689 690 reg_index = 0 691 for reg_info in reg_infos: 692 reg_info["lldb_register_index"] = reg_index 693 reg_index += 1 694 695 def add_query_memory_region_packets(self, address): 696 self.test_sequence.add_log_lines( 697 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 698 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 699 True) 700 701 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 702 self.assertIsNotNone(key_val_text) 703 kv_dict = {} 704 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 705 key = match.group(1) 706 val = match.group(2) 707 if key in kv_dict: 708 if allow_dupes: 709 if isinstance(kv_dict[key], list): 710 kv_dict[key].append(val) 711 else: 712 # Promote to list 713 kv_dict[key] = [kv_dict[key], val] 714 else: 715 self.fail( 716 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 717 key, val, key_val_text, kv_dict)) 718 else: 719 kv_dict[key] = val 720 return kv_dict 721 722 def parse_memory_region_packet(self, context): 723 # Ensure we have a context. 724 self.assertIsNotNone(context.get("memory_region_response")) 725 726 # Pull out key:value; pairs. 727 mem_region_dict = self.parse_key_val_dict( 728 context.get("memory_region_response")) 729 730 # Validate keys are known. 731 for (key, val) in list(mem_region_dict.items()): 732 self.assertIn(key, 733 ["start", 734 "size", 735 "permissions", 736 "flags", 737 "name", 738 "error"]) 739 self.assertIsNotNone(val) 740 741 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 742 # Return the dictionary of key-value pairs for the memory region. 743 return mem_region_dict 744 745 def assert_address_within_memory_region( 746 self, test_address, mem_region_dict): 747 self.assertIsNotNone(mem_region_dict) 748 self.assertTrue("start" in mem_region_dict) 749 self.assertTrue("size" in mem_region_dict) 750 751 range_start = int(mem_region_dict["start"], 16) 752 range_size = int(mem_region_dict["size"], 16) 753 range_end = range_start + range_size 754 755 if test_address < range_start: 756 self.fail( 757 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 758 test_address, 759 range_start, 760 range_end, 761 range_size)) 762 elif test_address >= range_end: 763 self.fail( 764 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 765 test_address, 766 range_start, 767 range_end, 768 range_size)) 769 770 def add_threadinfo_collection_packets(self): 771 self.test_sequence.add_log_lines( 772 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 773 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 774 "save_key": "threadinfo_responses"}], 775 True) 776 777 def parse_threadinfo_packets(self, context): 778 """Return an array of thread ids (decimal ints), one per thread.""" 779 threadinfo_responses = context.get("threadinfo_responses") 780 self.assertIsNotNone(threadinfo_responses) 781 782 thread_ids = [] 783 for threadinfo_response in threadinfo_responses: 784 new_thread_infos = parse_threadinfo_response(threadinfo_response) 785 thread_ids.extend(new_thread_infos) 786 return thread_ids 787 788 def wait_for_thread_count(self, thread_count): 789 start_time = time.time() 790 timeout_time = start_time + self.DEFAULT_TIMEOUT 791 792 actual_thread_count = 0 793 while actual_thread_count < thread_count: 794 self.reset_test_sequence() 795 self.add_threadinfo_collection_packets() 796 797 context = self.expect_gdbremote_sequence() 798 self.assertIsNotNone(context) 799 800 threads = self.parse_threadinfo_packets(context) 801 self.assertIsNotNone(threads) 802 803 actual_thread_count = len(threads) 804 805 if time.time() > timeout_time: 806 raise Exception( 807 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 808 self.DEFAULT_TIMEOUT, thread_count, actual_thread_count)) 809 810 return threads 811 812 def add_set_breakpoint_packets( 813 self, 814 address, 815 z_packet_type=0, 816 do_continue=True, 817 breakpoint_kind=1): 818 self.test_sequence.add_log_lines( 819 [ # Set the breakpoint. 820 "read packet: $Z{2},{0:x},{1}#00".format( 821 address, breakpoint_kind, z_packet_type), 822 # Verify the stub could set it. 823 "send packet: $OK#00", 824 ], True) 825 826 if (do_continue): 827 self.test_sequence.add_log_lines( 828 [ # Continue the inferior. 829 "read packet: $c#63", 830 # Expect a breakpoint stop report. 831 {"direction": "send", 832 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 833 "capture": {1: "stop_signo", 834 2: "stop_thread_id"}}, 835 ], True) 836 837 def add_remove_breakpoint_packets( 838 self, 839 address, 840 z_packet_type=0, 841 breakpoint_kind=1): 842 self.test_sequence.add_log_lines( 843 [ # Remove the breakpoint. 844 "read packet: $z{2},{0:x},{1}#00".format( 845 address, breakpoint_kind, z_packet_type), 846 # Verify the stub could unset it. 847 "send packet: $OK#00", 848 ], True) 849 850 def add_qSupported_packets(self): 851 self.test_sequence.add_log_lines( 852 ["read packet: $qSupported#00", 853 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 854 ], True) 855 856 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 857 "augmented-libraries-svr4-read", 858 "PacketSize", 859 "QStartNoAckMode", 860 "QThreadSuffixSupported", 861 "QListThreadsInStopReply", 862 "qXfer:auxv:read", 863 "qXfer:libraries:read", 864 "qXfer:libraries-svr4:read", 865 "qXfer:features:read", 866 "qEcho", 867 "QPassSignals" 868 ] 869 870 def parse_qSupported_response(self, context): 871 self.assertIsNotNone(context) 872 873 raw_response = context.get("qSupported_response") 874 self.assertIsNotNone(raw_response) 875 876 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 877 # +,-,? is stripped from the key and set as the value. 878 supported_dict = {} 879 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 880 key = match.group(1) 881 val = match.group(3) 882 883 # key=val: store as is 884 if val and len(val) > 0: 885 supported_dict[key] = val 886 else: 887 if len(key) < 2: 888 raise Exception( 889 "singular stub feature is too short: must be stub_feature{+,-,?}") 890 supported_type = key[-1] 891 key = key[:-1] 892 if not supported_type in ["+", "-", "?"]: 893 raise Exception( 894 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 895 supported_dict[key] = supported_type 896 # Ensure we know the supported element 897 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 898 raise Exception( 899 "unknown qSupported stub feature reported: %s" % 900 key) 901 902 return supported_dict 903 904 def run_process_then_stop(self, run_seconds=1): 905 # Tell the stub to continue. 906 self.test_sequence.add_log_lines( 907 ["read packet: $vCont;c#a8"], 908 True) 909 context = self.expect_gdbremote_sequence() 910 911 # Wait for run_seconds. 912 time.sleep(run_seconds) 913 914 # Send an interrupt, capture a T response. 915 self.reset_test_sequence() 916 self.test_sequence.add_log_lines( 917 ["read packet: {}".format(chr(3)), 918 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 919 True) 920 context = self.expect_gdbremote_sequence() 921 self.assertIsNotNone(context) 922 self.assertIsNotNone(context.get("stop_result")) 923 924 return context 925 926 def continue_process_and_wait_for_stop(self): 927 self.test_sequence.add_log_lines( 928 [ 929 "read packet: $vCont;c#a8", 930 { 931 "direction": "send", 932 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 933 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 934 }, 935 ], 936 True, 937 ) 938 context = self.expect_gdbremote_sequence() 939 self.assertIsNotNone(context) 940 return self.parse_interrupt_packets(context) 941 942 def select_modifiable_register(self, reg_infos): 943 """Find a register that can be read/written freely.""" 944 PREFERRED_REGISTER_NAMES = set(["rax", ]) 945 946 # First check for the first register from the preferred register name 947 # set. 948 alternative_register_index = None 949 950 self.assertIsNotNone(reg_infos) 951 for reg_info in reg_infos: 952 if ("name" in reg_info) and ( 953 reg_info["name"] in PREFERRED_REGISTER_NAMES): 954 # We found a preferred register. Use it. 955 return reg_info["lldb_register_index"] 956 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 957 reg_info["generic"] == "arg1"): 958 # A frame pointer or first arg register will do as a 959 # register to modify temporarily. 960 alternative_register_index = reg_info["lldb_register_index"] 961 962 # We didn't find a preferred register. Return whatever alternative register 963 # we found, if any. 964 return alternative_register_index 965 966 def extract_registers_from_stop_notification(self, stop_key_vals_text): 967 self.assertIsNotNone(stop_key_vals_text) 968 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 969 970 registers = {} 971 for (key, val) in list(kv_dict.items()): 972 if re.match(r"^[0-9a-fA-F]+$", key): 973 registers[int(key, 16)] = val 974 return registers 975 976 def gather_register_infos(self): 977 self.reset_test_sequence() 978 self.add_register_info_collection_packets() 979 980 context = self.expect_gdbremote_sequence() 981 self.assertIsNotNone(context) 982 983 reg_infos = self.parse_register_info_packets(context) 984 self.assertIsNotNone(reg_infos) 985 self.add_lldb_register_index(reg_infos) 986 987 return reg_infos 988 989 def find_generic_register_with_name(self, reg_infos, generic_name): 990 self.assertIsNotNone(reg_infos) 991 for reg_info in reg_infos: 992 if ("generic" in reg_info) and ( 993 reg_info["generic"] == generic_name): 994 return reg_info 995 return None 996 997 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 998 self.assertIsNotNone(reg_infos) 999 for reg_info in reg_infos: 1000 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 1001 return reg_info 1002 return None 1003 1004 def decode_gdbremote_binary(self, encoded_bytes): 1005 decoded_bytes = "" 1006 i = 0 1007 while i < len(encoded_bytes): 1008 if encoded_bytes[i] == "}": 1009 # Handle escaped char. 1010 self.assertTrue(i + 1 < len(encoded_bytes)) 1011 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1012 i += 2 1013 elif encoded_bytes[i] == "*": 1014 # Handle run length encoding. 1015 self.assertTrue(len(decoded_bytes) > 0) 1016 self.assertTrue(i + 1 < len(encoded_bytes)) 1017 repeat_count = ord(encoded_bytes[i + 1]) - 29 1018 decoded_bytes += decoded_bytes[-1] * repeat_count 1019 i += 2 1020 else: 1021 decoded_bytes += encoded_bytes[i] 1022 i += 1 1023 return decoded_bytes 1024 1025 def build_auxv_dict(self, endian, word_size, auxv_data): 1026 self.assertIsNotNone(endian) 1027 self.assertIsNotNone(word_size) 1028 self.assertIsNotNone(auxv_data) 1029 1030 auxv_dict = {} 1031 1032 # PowerPC64le's auxvec has a special key that must be ignored. 1033 # This special key may be used multiple times, resulting in 1034 # multiple key/value pairs with the same key, which would otherwise 1035 # break this test check for repeated keys. 1036 # 1037 # AT_IGNOREPPC = 22 1038 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1039 arch = self.getArchitecture() 1040 ignore_keys = None 1041 if arch in ignored_keys_for_arch: 1042 ignore_keys = ignored_keys_for_arch[arch] 1043 1044 while len(auxv_data) > 0: 1045 # Chop off key. 1046 raw_key = auxv_data[:word_size] 1047 auxv_data = auxv_data[word_size:] 1048 1049 # Chop of value. 1050 raw_value = auxv_data[:word_size] 1051 auxv_data = auxv_data[word_size:] 1052 1053 # Convert raw text from target endian. 1054 key = unpack_endian_binary_string(endian, raw_key) 1055 value = unpack_endian_binary_string(endian, raw_value) 1056 1057 if ignore_keys and key in ignore_keys: 1058 continue 1059 1060 # Handle ending entry. 1061 if key == 0: 1062 self.assertEqual(value, 0) 1063 return auxv_dict 1064 1065 # The key should not already be present. 1066 self.assertFalse(key in auxv_dict) 1067 auxv_dict[key] = value 1068 1069 self.fail( 1070 "should not reach here - implies required double zero entry not found") 1071 return auxv_dict 1072 1073 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1074 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1075 offset = 0 1076 done = False 1077 decoded_data = "" 1078 1079 while not done: 1080 # Grab the next iteration of data. 1081 self.reset_test_sequence() 1082 self.test_sequence.add_log_lines( 1083 [ 1084 "read packet: ${}{:x},{:x}:#00".format( 1085 command_prefix, 1086 offset, 1087 chunk_length), 1088 { 1089 "direction": "send", 1090 "regex": re.compile( 1091 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1092 re.MULTILINE | re.DOTALL), 1093 "capture": { 1094 1: "response_type", 1095 2: "content_raw"}}], 1096 True) 1097 1098 context = self.expect_gdbremote_sequence() 1099 self.assertIsNotNone(context) 1100 1101 response_type = context.get("response_type") 1102 self.assertIsNotNone(response_type) 1103 self.assertTrue(response_type in ["l", "m"]) 1104 1105 # Move offset along. 1106 offset += chunk_length 1107 1108 # Figure out if we're done. We're done if the response type is l. 1109 done = response_type == "l" 1110 1111 # Decode binary data. 1112 content_raw = context.get("content_raw") 1113 if content_raw and len(content_raw) > 0: 1114 self.assertIsNotNone(content_raw) 1115 decoded_data += self.decode_gdbremote_binary(content_raw) 1116 return decoded_data 1117 1118 def add_interrupt_packets(self): 1119 self.test_sequence.add_log_lines([ 1120 # Send the intterupt. 1121 "read packet: {}".format(chr(3)), 1122 # And wait for the stop notification. 1123 {"direction": "send", 1124 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1125 "capture": {1: "stop_signo", 1126 2: "stop_key_val_text"}}, 1127 ], True) 1128 1129 def parse_interrupt_packets(self, context): 1130 self.assertIsNotNone(context.get("stop_signo")) 1131 self.assertIsNotNone(context.get("stop_key_val_text")) 1132 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1133 context["stop_key_val_text"])) 1134 1135 def add_QSaveRegisterState_packets(self, thread_id): 1136 if thread_id: 1137 # Use the thread suffix form. 1138 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1139 thread_id) 1140 else: 1141 request = "read packet: $QSaveRegisterState#00" 1142 1143 self.test_sequence.add_log_lines([request, 1144 {"direction": "send", 1145 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1146 "capture": {1: "save_response"}}, 1147 ], 1148 True) 1149 1150 def parse_QSaveRegisterState_response(self, context): 1151 self.assertIsNotNone(context) 1152 1153 save_response = context.get("save_response") 1154 self.assertIsNotNone(save_response) 1155 1156 if len(save_response) < 1 or save_response[0] == "E": 1157 # error received 1158 return (False, None) 1159 else: 1160 return (True, int(save_response)) 1161 1162 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1163 if thread_id: 1164 # Use the thread suffix form. 1165 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1166 save_id, thread_id) 1167 else: 1168 request = "read packet: $QRestoreRegisterState:{}#00".format( 1169 save_id) 1170 1171 self.test_sequence.add_log_lines([ 1172 request, 1173 "send packet: $OK#00" 1174 ], True) 1175 1176 def flip_all_bits_in_each_register_value( 1177 self, reg_infos, endian, thread_id=None): 1178 self.assertIsNotNone(reg_infos) 1179 1180 successful_writes = 0 1181 failed_writes = 0 1182 1183 for reg_info in reg_infos: 1184 # Use the lldb register index added to the reg info. We're not necessarily 1185 # working off a full set of register infos, so an inferred register 1186 # index could be wrong. 1187 reg_index = reg_info["lldb_register_index"] 1188 self.assertIsNotNone(reg_index) 1189 1190 reg_byte_size = int(reg_info["bitsize"]) // 8 1191 self.assertTrue(reg_byte_size > 0) 1192 1193 # Handle thread suffix. 1194 if thread_id: 1195 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1196 reg_index, thread_id) 1197 else: 1198 p_request = "read packet: $p{:x}#00".format(reg_index) 1199 1200 # Read the existing value. 1201 self.reset_test_sequence() 1202 self.test_sequence.add_log_lines([ 1203 p_request, 1204 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1205 ], True) 1206 context = self.expect_gdbremote_sequence() 1207 self.assertIsNotNone(context) 1208 1209 # Verify the response length. 1210 p_response = context.get("p_response") 1211 self.assertIsNotNone(p_response) 1212 initial_reg_value = unpack_register_hex_unsigned( 1213 endian, p_response) 1214 1215 # Flip the value by xoring with all 1s 1216 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1217 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1218 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1219 1220 # Handle thread suffix for P. 1221 if thread_id: 1222 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1223 reg_index, pack_register_hex( 1224 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1225 else: 1226 P_request = "read packet: $P{:x}={}#00".format( 1227 reg_index, pack_register_hex( 1228 endian, flipped_bits_int, byte_size=reg_byte_size)) 1229 1230 # Write the flipped value to the register. 1231 self.reset_test_sequence() 1232 self.test_sequence.add_log_lines([P_request, 1233 {"direction": "send", 1234 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1235 "capture": {1: "P_response"}}, 1236 ], 1237 True) 1238 context = self.expect_gdbremote_sequence() 1239 self.assertIsNotNone(context) 1240 1241 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1242 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1243 # all flipping perfectly. 1244 P_response = context.get("P_response") 1245 self.assertIsNotNone(P_response) 1246 if P_response == "OK": 1247 successful_writes += 1 1248 else: 1249 failed_writes += 1 1250 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1251 1252 # Read back the register value, ensure it matches the flipped 1253 # value. 1254 if P_response == "OK": 1255 self.reset_test_sequence() 1256 self.test_sequence.add_log_lines([ 1257 p_request, 1258 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1259 ], True) 1260 context = self.expect_gdbremote_sequence() 1261 self.assertIsNotNone(context) 1262 1263 verify_p_response_raw = context.get("p_response") 1264 self.assertIsNotNone(verify_p_response_raw) 1265 verify_bits = unpack_register_hex_unsigned( 1266 endian, verify_p_response_raw) 1267 1268 if verify_bits != flipped_bits_int: 1269 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1270 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1271 successful_writes -= 1 1272 failed_writes += 1 1273 1274 return (successful_writes, failed_writes) 1275 1276 def is_bit_flippable_register(self, reg_info): 1277 if not reg_info: 1278 return False 1279 if not "set" in reg_info: 1280 return False 1281 if reg_info["set"] != "General Purpose Registers": 1282 return False 1283 if ("container-regs" in reg_info) and ( 1284 len(reg_info["container-regs"]) > 0): 1285 # Don't try to bit flip registers contained in another register. 1286 return False 1287 if re.match("^.s$", reg_info["name"]): 1288 # This is a 2-letter register name that ends in "s", like a segment register. 1289 # Don't try to bit flip these. 1290 return False 1291 if re.match("^(c|)psr$", reg_info["name"]): 1292 # This is an ARM program status register; don't flip it. 1293 return False 1294 # Okay, this looks fine-enough. 1295 return True 1296 1297 def read_register_values(self, reg_infos, endian, thread_id=None): 1298 self.assertIsNotNone(reg_infos) 1299 values = {} 1300 1301 for reg_info in reg_infos: 1302 # We append a register index when load reg infos so we can work 1303 # with subsets. 1304 reg_index = reg_info.get("lldb_register_index") 1305 self.assertIsNotNone(reg_index) 1306 1307 # Handle thread suffix. 1308 if thread_id: 1309 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1310 reg_index, thread_id) 1311 else: 1312 p_request = "read packet: $p{:x}#00".format(reg_index) 1313 1314 # Read it with p. 1315 self.reset_test_sequence() 1316 self.test_sequence.add_log_lines([ 1317 p_request, 1318 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1319 ], True) 1320 context = self.expect_gdbremote_sequence() 1321 self.assertIsNotNone(context) 1322 1323 # Convert value from target endian to integral. 1324 p_response = context.get("p_response") 1325 self.assertIsNotNone(p_response) 1326 self.assertTrue(len(p_response) > 0) 1327 self.assertFalse(p_response[0] == "E") 1328 1329 values[reg_index] = unpack_register_hex_unsigned( 1330 endian, p_response) 1331 1332 return values 1333 1334 def add_vCont_query_packets(self): 1335 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1336 {"direction": "send", 1337 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1338 "capture": {2: "vCont_query_response"}}, 1339 ], 1340 True) 1341 1342 def parse_vCont_query_response(self, context): 1343 self.assertIsNotNone(context) 1344 vCont_query_response = context.get("vCont_query_response") 1345 1346 # Handle case of no vCont support at all - in which case the capture 1347 # group will be none or zero length. 1348 if not vCont_query_response or len(vCont_query_response) == 0: 1349 return {} 1350 1351 return {key: 1 for key in vCont_query_response.split( 1352 ";") if key and len(key) > 0} 1353 1354 def count_single_steps_until_true( 1355 self, 1356 thread_id, 1357 predicate, 1358 args, 1359 max_step_count=100, 1360 use_Hc_packet=True, 1361 step_instruction="s"): 1362 """Used by single step test that appears in a few different contexts.""" 1363 single_step_count = 0 1364 1365 while single_step_count < max_step_count: 1366 self.assertIsNotNone(thread_id) 1367 1368 # Build the packet for the single step instruction. We replace 1369 # {thread}, if present, with the thread_id. 1370 step_packet = "read packet: ${}#00".format( 1371 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1372 # print("\nstep_packet created: {}\n".format(step_packet)) 1373 1374 # Single step. 1375 self.reset_test_sequence() 1376 if use_Hc_packet: 1377 self.test_sequence.add_log_lines( 1378 [ # Set the continue thread. 1379 "read packet: $Hc{0:x}#00".format(thread_id), 1380 "send packet: $OK#00", 1381 ], True) 1382 self.test_sequence.add_log_lines([ 1383 # Single step. 1384 step_packet, 1385 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1386 # Expect a breakpoint stop report. 1387 {"direction": "send", 1388 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1389 "capture": {1: "stop_signo", 1390 2: "stop_thread_id"}}, 1391 ], True) 1392 context = self.expect_gdbremote_sequence() 1393 self.assertIsNotNone(context) 1394 self.assertIsNotNone(context.get("stop_signo")) 1395 self.assertEqual(int(context.get("stop_signo"), 16), 1396 lldbutil.get_signal_number('SIGTRAP')) 1397 1398 single_step_count += 1 1399 1400 # See if the predicate is true. If so, we're done. 1401 if predicate(args): 1402 return (True, single_step_count) 1403 1404 # The predicate didn't return true within the runaway step count. 1405 return (False, single_step_count) 1406 1407 def g_c1_c2_contents_are(self, args): 1408 """Used by single step test that appears in a few different contexts.""" 1409 g_c1_address = args["g_c1_address"] 1410 g_c2_address = args["g_c2_address"] 1411 expected_g_c1 = args["expected_g_c1"] 1412 expected_g_c2 = args["expected_g_c2"] 1413 1414 # Read g_c1 and g_c2 contents. 1415 self.reset_test_sequence() 1416 self.test_sequence.add_log_lines( 1417 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1418 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1419 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1420 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1421 True) 1422 1423 # Run the packet stream. 1424 context = self.expect_gdbremote_sequence() 1425 self.assertIsNotNone(context) 1426 1427 # Check if what we read from inferior memory is what we are expecting. 1428 self.assertIsNotNone(context.get("g_c1_contents")) 1429 self.assertIsNotNone(context.get("g_c2_contents")) 1430 1431 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1432 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1433 1434 def single_step_only_steps_one_instruction( 1435 self, use_Hc_packet=True, step_instruction="s"): 1436 """Used by single step test that appears in a few different contexts.""" 1437 # Start up the inferior. 1438 procs = self.prep_debug_monitor_and_inferior( 1439 inferior_args=[ 1440 "get-code-address-hex:swap_chars", 1441 "get-data-address-hex:g_c1", 1442 "get-data-address-hex:g_c2", 1443 "sleep:1", 1444 "call-function:swap_chars", 1445 "sleep:5"]) 1446 1447 # Run the process 1448 self.test_sequence.add_log_lines( 1449 [ # Start running after initial stop. 1450 "read packet: $c#63", 1451 # Match output line that prints the memory address of the function call entry point. 1452 # Note we require launch-only testing so we can get inferior otuput. 1453 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1454 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1455 # Now stop the inferior. 1456 "read packet: {}".format(chr(3)), 1457 # And wait for the stop notification. 1458 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1459 True) 1460 1461 # Run the packet stream. 1462 context = self.expect_gdbremote_sequence() 1463 self.assertIsNotNone(context) 1464 1465 # Grab the main thread id. 1466 self.assertIsNotNone(context.get("stop_thread_id")) 1467 main_thread_id = int(context.get("stop_thread_id"), 16) 1468 1469 # Grab the function address. 1470 self.assertIsNotNone(context.get("function_address")) 1471 function_address = int(context.get("function_address"), 16) 1472 1473 # Grab the data addresses. 1474 self.assertIsNotNone(context.get("g_c1_address")) 1475 g_c1_address = int(context.get("g_c1_address"), 16) 1476 1477 self.assertIsNotNone(context.get("g_c2_address")) 1478 g_c2_address = int(context.get("g_c2_address"), 16) 1479 1480 # Set a breakpoint at the given address. 1481 if self.getArchitecture().startswith("arm"): 1482 # TODO: Handle case when setting breakpoint in thumb code 1483 BREAKPOINT_KIND = 4 1484 else: 1485 BREAKPOINT_KIND = 1 1486 self.reset_test_sequence() 1487 self.add_set_breakpoint_packets( 1488 function_address, 1489 do_continue=True, 1490 breakpoint_kind=BREAKPOINT_KIND) 1491 context = self.expect_gdbremote_sequence() 1492 self.assertIsNotNone(context) 1493 1494 # Remove the breakpoint. 1495 self.reset_test_sequence() 1496 self.add_remove_breakpoint_packets( 1497 function_address, breakpoint_kind=BREAKPOINT_KIND) 1498 context = self.expect_gdbremote_sequence() 1499 self.assertIsNotNone(context) 1500 1501 # Verify g_c1 and g_c2 match expected initial state. 1502 args = {} 1503 args["g_c1_address"] = g_c1_address 1504 args["g_c2_address"] = g_c2_address 1505 args["expected_g_c1"] = "0" 1506 args["expected_g_c2"] = "1" 1507 1508 self.assertTrue(self.g_c1_c2_contents_are(args)) 1509 1510 # Verify we take only a small number of steps to hit the first state. 1511 # Might need to work through function entry prologue code. 1512 args["expected_g_c1"] = "1" 1513 args["expected_g_c2"] = "1" 1514 (state_reached, 1515 step_count) = self.count_single_steps_until_true(main_thread_id, 1516 self.g_c1_c2_contents_are, 1517 args, 1518 max_step_count=25, 1519 use_Hc_packet=use_Hc_packet, 1520 step_instruction=step_instruction) 1521 self.assertTrue(state_reached) 1522 1523 # Verify we hit the next state. 1524 args["expected_g_c1"] = "1" 1525 args["expected_g_c2"] = "0" 1526 (state_reached, 1527 step_count) = self.count_single_steps_until_true(main_thread_id, 1528 self.g_c1_c2_contents_are, 1529 args, 1530 max_step_count=5, 1531 use_Hc_packet=use_Hc_packet, 1532 step_instruction=step_instruction) 1533 self.assertTrue(state_reached) 1534 expected_step_count = 1 1535 arch = self.getArchitecture() 1536 1537 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1538 # of variable value 1539 if re.match("mips", arch): 1540 expected_step_count = 3 1541 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1542 # variable value 1543 if re.match("s390x", arch): 1544 expected_step_count = 2 1545 # ARM64 requires "4" instructions: 2 to compute the address (adrp, add), 1546 # one to materialize the constant (mov) and the store 1547 if re.match("arm64", arch): 1548 expected_step_count = 4 1549 1550 self.assertEqual(step_count, expected_step_count) 1551 1552 # ARM64: Once addresses and constants are materialized, only one 1553 # instruction is needed. 1554 if re.match("arm64", arch): 1555 expected_step_count = 1 1556 1557 # Verify we hit the next state. 1558 args["expected_g_c1"] = "0" 1559 args["expected_g_c2"] = "0" 1560 (state_reached, 1561 step_count) = self.count_single_steps_until_true(main_thread_id, 1562 self.g_c1_c2_contents_are, 1563 args, 1564 max_step_count=5, 1565 use_Hc_packet=use_Hc_packet, 1566 step_instruction=step_instruction) 1567 self.assertTrue(state_reached) 1568 self.assertEqual(step_count, expected_step_count) 1569 1570 # Verify we hit the next state. 1571 args["expected_g_c1"] = "0" 1572 args["expected_g_c2"] = "1" 1573 (state_reached, 1574 step_count) = self.count_single_steps_until_true(main_thread_id, 1575 self.g_c1_c2_contents_are, 1576 args, 1577 max_step_count=5, 1578 use_Hc_packet=use_Hc_packet, 1579 step_instruction=step_instruction) 1580 self.assertTrue(state_reached) 1581 self.assertEqual(step_count, expected_step_count) 1582 1583 def maybe_strict_output_regex(self, regex): 1584 return '.*' + regex + \ 1585 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1586 1587 def install_and_create_launch_args(self): 1588 exe_path = self.getBuildArtifact("a.out") 1589 if not lldb.remote_platform: 1590 return [exe_path] 1591 remote_path = lldbutil.append_to_process_working_directory(self, 1592 os.path.basename(exe_path)) 1593 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1594 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1595 remote_file_spec) 1596 if err.Fail(): 1597 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1598 (exe_path, remote_path, err)) 1599 return [remote_path] 1600