1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
26class _ConnectionRefused(IOError):
27    pass
28
29
30class GdbRemoteTestCaseBase(TestBase):
31
32    NO_DEBUG_INFO_TESTCASE = True
33
34    # Default time out in seconds. The timeout is increased tenfold under Asan.
35    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
36    # Default sleep time in seconds. The sleep time is doubled under Asan.
37    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)
38
39    _GDBREMOTE_KILL_PACKET = b"$k#6b"
40
41    # Start the inferior separately, attach to the inferior on the stub
42    # command line.
43    _STARTUP_ATTACH = "attach"
44    # Start the inferior separately, start the stub without attaching, allow
45    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
46    _STARTUP_ATTACH_MANUALLY = "attach_manually"
47    # Start the stub, and launch the inferior with an $A packet via the
48    # initial packet stream.
49    _STARTUP_LAUNCH = "launch"
50
51    # GDB Signal numbers that are not target-specific used for common
52    # exceptions
53    TARGET_EXC_BAD_ACCESS = 0x91
54    TARGET_EXC_BAD_INSTRUCTION = 0x92
55    TARGET_EXC_ARITHMETIC = 0x93
56    TARGET_EXC_EMULATION = 0x94
57    TARGET_EXC_SOFTWARE = 0x95
58    TARGET_EXC_BREAKPOINT = 0x96
59
60    _verbose_log_handler = None
61    _log_formatter = logging.Formatter(
62        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
63
64    def setUpBaseLogging(self):
65        self.logger = logging.getLogger(__name__)
66
67        if len(self.logger.handlers) > 0:
68            return  # We have set up this handler already
69
70        self.logger.propagate = False
71        self.logger.setLevel(logging.DEBUG)
72
73        # log all warnings to stderr
74        handler = logging.StreamHandler()
75        handler.setLevel(logging.WARNING)
76        handler.setFormatter(self._log_formatter)
77        self.logger.addHandler(handler)
78
79    def isVerboseLoggingRequested(self):
80        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
81        # logged.
82        return any(("gdb-remote" in channel)
83                   for channel in lldbtest_config.channels)
84
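    # For example, a channels list like ["gdb-remote packets"] makes the check
    # above return True, while something like ["lldb break"] does not; the
    # check is a plain substring match, and the channel strings shown here are
    # illustrative only.
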
    def setUp(self):
        TestBase.setUp(self)

        self.setUpBaseLogging()
        self.debug_monitor_extra_args = []

        if self.isVerboseLoggingRequested():
            # If requested, full logs go to a log file.
            self._verbose_log_handler = logging.FileHandler(
                self.log_basename + "-host.log")
            self._verbose_log_handler.setFormatter(self._log_formatter)
            self._verbose_log_handler.setLevel(logging.DEBUG)
            self.logger.addHandler(self._verbose_log_handler)

        self.test_sequence = GdbRemoteTestSequence(self.logger)
        self.set_inferior_startup_launch()
        self.port = self.get_next_port()
        self.stub_sends_two_stop_notifications_on_kill = False
        if configuration.lldb_platform_url:
            if configuration.lldb_platform_url.startswith('unix-'):
                url_pattern = r'(.+)://\[?(.+?)\]?/.*'
            else:
                url_pattern = r'(.+)://(.+):\d+'
            scheme, host = re.match(
                url_pattern, configuration.lldb_platform_url).groups()
            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
                self.stub_device = host
                self.stub_hostname = 'localhost'
            else:
                self.stub_device = None
                self.stub_hostname = host
        else:
            self.stub_hostname = "localhost"

    def tearDown(self):
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        TestBase.tearDown(self)

    def getLocalServerLogFile(self):
        return self.log_basename + "-server.log"

    def setUpServerLogging(self, is_llgs):
        if len(lldbtest_config.channels) == 0:
            return  # No logging requested.

        if lldb.remote_platform:
            log_file = lldbutil.join_remote_paths(
                lldb.remote_platform.GetWorkingDirectory(), "server.log")
        else:
            log_file = self.getLocalServerLogFile()

        if is_llgs:
            self.debug_monitor_extra_args.append("--log-file=" + log_file)
            self.debug_monitor_extra_args.append(
                "--log-channels={}".format(":".join(lldbtest_config.channels)))
        else:
            self.debug_monitor_extra_args = [
                "--log-file=" + log_file, "--log-flags=0x800000"]

    def get_next_port(self):
        return 12000 + random.randint(0, 3999)

    def reset_test_sequence(self):
        self.test_sequence = GdbRemoteTestSequence(self.logger)

    def init_llgs_test(self):
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            triple = self.dbg.GetSelectedPlatform().GetTriple()
            if re.match(".*-.*-windows", triple):
                self.skipTest("Remote testing is not supported on Windows yet.")

            # Grab the ppid from /proc/[shell pid]/stat.
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove it if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            # TODO: enable this.
            if platform.system() == 'Windows':
                reverse_connect = False

            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect

    def init_debugserver_test(self):
        self.debug_monitor_exe = get_debugserver_exe()
        if not self.debug_monitor_exe:
            self.skipTest("debugserver exe not found")
        self.setUpServerLogging(is_llgs=False)
        self.reverse_connect = True

        # The debugserver stub has a race when handling the 'k' command: it sends
        # an X09 right away, then sends the real X notification when the process
        # truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True

    def forward_adb_port(self, source, target, direction, device):
        adb = ['adb'] + (['-s', device] if device else []) + [direction]

        def remove_port_forward():
            subprocess.call(adb + ["--remove", "tcp:%d" % source])

        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
        self.addTearDownHook(remove_port_forward)

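    # For reference, the forwarding above shells out to adb roughly like this
    # (the port and device serial are illustrative):
    #
    #     adb -s <device-serial> forward tcp:12000 tcp:12000
    #
    # and the teardown hook later runs "adb ... forward --remove tcp:12000".
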
    def _verify_socket(self, sock):
        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
        # connect() attempt. However, due to the way ADB forwarding works, on android targets
        # the connect() will always be successful, but the connection will be immediately dropped
        # if ADB could not connect on the remote side. This function tries to detect this
        # situation, and report it as "connection refused" so that the upper layers attempt the
        # connection again.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if not re.match(".*-.*-.*-android", triple):
            return  # Not android.
        can_read, _, _ = select.select([sock], [], [], 0.1)
        if sock not in can_read:
            return  # Data is not available, but the connection is alive.
        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
            raise _ConnectionRefused()  # Got EOF, connection dropped.

    def create_socket(self):
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            if sock:
                try:
                    # Send the kill packet so lldb-server shuts down gracefully.
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        self._verify_socket(sock)

        return sock

    def set_inferior_startup_launch(self):
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY

    def get_debug_monitor_command_line_args(self, attach_pid=None):
        commandline_args = self.debug_monitor_extra_args
        if attach_pid:
            commandline_args += ["--attach=%d" % attach_pid]
        if self.reverse_connect:
            commandline_args += ["--reverse-connect", self.connect_address]
        else:
            if lldb.remote_platform:
                commandline_args += ["*:{}".format(self.port)]
            else:
                commandline_args += ["localhost:{}".format(self.port)]

        return commandline_args

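    # For reference, with reverse connect enabled these arguments typically
    # result in a local lldb-server invocation along these lines (the address,
    # port and log options are illustrative only):
    #
    #     lldb-server gdbserver --log-file=<...>-server.log \
    #         --log-channels=gdb-remote --reverse-connect [127.0.0.1]:54321
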
    def get_target_byte_order(self):
        inferior_exe_path = self.getBuildArtifact("a.out")
        target = self.dbg.CreateTarget(inferior_exe_path)
        return target.GetByteOrder()

    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        if self.reverse_connect:
            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            sock.bind(addr)
            sock.listen(1)
            addr = sock.getsockname()
            self.connect_address = "[{}]:{}".format(*addr)

        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.assertIsNotNone(server)

        if self.reverse_connect:
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server

    def connect_to_debug_monitor(self, attach_pid=None):
        if self.reverse_connect:
            # Create the stub.
            server = self.launch_debug_monitor(attach_pid=attach_pid)
            self.assertIsNotNone(server)

            # Schedule debug monitor to be shut down during teardown.
            logger = self.logger

            self._server = Server(self.sock, server)
            return server

        # We use a random port to try to avoid collisions with other tests,
        # and retry a maximum number of times.
        attempts = 0
        MAX_ATTEMPTS = 20

        while attempts < MAX_ATTEMPTS:
            server = self.launch_debug_monitor(attach_pid=attach_pid)

            # Schedule debug monitor to be shut down during teardown.
            logger = self.logger

            connect_attempts = 0
            MAX_CONNECT_ATTEMPTS = 10

            while connect_attempts < MAX_CONNECT_ATTEMPTS:
                # Create a socket to talk to the server.
                try:
                    logger.info("Connect attempt %d", connect_attempts + 1)
                    self.sock = self.create_socket()
                    self._server = Server(self.sock, server)
                    return server
                except _ConnectionRefused as serr:
                    # Ignore, and try again.
                    pass
                time.sleep(0.5)
                connect_attempts += 1

            # We should close the server here to be safe.
            server.terminate()

            # Increment attempts.
            print(
                "connect to debug monitor on port %d failed, attempt #%d of %d" %
                (self.port, attempts + 1, MAX_ATTEMPTS))
            attempts += 1

            # And wait a random length of time before the next attempt, to avoid
            # collisions.
            time.sleep(random.randint(1, 5))

            # Now grab a new port number.
            self.port = self.get_next_port()

        raise Exception(
            "failed to create a socket to the launched debug monitor after %d tries" %
            attempts)

    def launch_process_for_attach(
            self,
            inferior_args=None,
            sleep_seconds=3,
            exe_path=None):
        # We're going to start a child process that the debug monitor stub can later attach to.
        # This process needs to be started so that it just hangs around for a while.  We'll
        # have it sleep.
        if not exe_path:
            exe_path = self.getBuildArtifact("a.out")

        args = []
        if inferior_args:
            args.extend(inferior_args)
        if sleep_seconds:
            args.append("sleep:%d" % sleep_seconds)

        return self.spawnSubprocess(exe_path, args)

    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach-to-pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed; it is ready to have more
        expected packet entries added to it.

        For launch-inferior mode, the stub is first started, then no-ack mode is
        set up on the expected packet stream, then the verified launch packets are
        added to the expected socket stream.  The packet stream is not yet executed;
        it is ready to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args.
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream.
        self.add_no_ack_remote_stream()
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}

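    # Typical usage from a test, as a sketch (only methods defined in this
    # class are referenced; the exact packets a test adds afterwards will vary):
    #
    #     procs = self.prep_debug_monitor_and_inferior()
    #     self.add_process_info_collection_packets()
    #     context = self.expect_gdbremote_sequence()
    #     process_info = self.parse_process_info_response(context)
    #     self.assertIsNotNone(process_info)
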
    def expect_socket_recv(
            self,
            sock,
            expected_content_regex
            ):
        response = ""
        timeout_time = time.time() + self.DEFAULT_TIMEOUT

        while not expected_content_regex.match(
                response) and time.time() < timeout_time:
            can_read, _, _ = select.select([sock], [], [], self.DEFAULT_TIMEOUT)
            if can_read and sock in can_read:
                recv_bytes = sock.recv(4096)
                if recv_bytes:
                    response += seven.bitcast_to_string(recv_bytes)

        self.assertTrue(expected_content_regex.match(response))

    def expect_socket_send(self, sock, content):
        request_bytes_remaining = content
        timeout_time = time.time() + self.DEFAULT_TIMEOUT

        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
            _, can_write, _ = select.select([], [sock], [], self.DEFAULT_TIMEOUT)
            if can_write and sock in can_write:
                written_byte_count = sock.send(request_bytes_remaining.encode())
                request_bytes_remaining = request_bytes_remaining[
                    written_byte_count:]
        self.assertEqual(len(request_bytes_remaining), 0)

    def do_handshake(self, stub_socket):
        # Write the ack.
        self.expect_socket_send(stub_socket, "+")

        # Send the start no ack mode packet.
        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode())
        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))

        # Receive the ack and "OK"
        self.expect_socket_recv(stub_socket, re.compile(
            r"^\+\$OK#[0-9a-fA-F]{2}$"))

        # Send the final ack.
        self.expect_socket_send(stub_socket, "+")

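    # Illustrative sketch (an addition for clarity, not used by the tests):
    # the two hex digits after '#' in a gdb-remote packet are the modulo-256
    # sum of the payload bytes, which is why "$QStartNoAckMode#b0" ends in
    # "b0".  A minimal checksum helper would look like this:
    @staticmethod
    def _example_gdbremote_checksum(payload):
        """Return the two-hex-digit checksum for a gdb-remote packet payload."""
        return "{:02x}".format(sum(ord(c) for c in payload) % 256)
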
    def add_no_ack_remote_stream(self):
        self.test_sequence.add_log_lines(
            ["read packet: +",
             "read packet: $QStartNoAckMode#b0",
             "send packet: +",
             "send packet: $OK#9a",
             "read packet: +"],
            True)

    def add_verified_launch_packets(self, launch_args):
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)

    def add_thread_suffix_request_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)

    def add_process_info_collection_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)

    def add_set_environment_packets(self, name, value):
        self.test_sequence.add_log_lines(
            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
             "send packet: $OK#00",
             ], True)

    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]

    def parse_process_info_response(self, context):
        # Ensure we have a process info response.
        self.assertIsNotNone(context)
        process_info_raw = context.get("process_info_raw")
        self.assertIsNotNone(process_info_raw)

        # Pull out key:value; pairs.
        process_info_dict = {
            match.group(1): match.group(2) for match in re.finditer(
                r"([^:]+):([^;]+);", process_info_raw)}

        # Validate keys are known.
        for (key, val) in list(process_info_dict.items()):
            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
            self.assertIsNotNone(val)

        return process_info_dict

    def add_register_info_collection_packets(self):
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
                "save_key": "reg_info_responses"}],
            True)

    def parse_register_info_packets(self, context):
        """Return an array of register info dictionaries, one per register info."""
        reg_info_responses = context.get("reg_info_responses")
        self.assertIsNotNone(reg_info_responses)

        # Parse register infos.
        return [parse_reg_info_response(reg_info_response)
                for reg_info_response in reg_info_responses]

    def expect_gdbremote_sequence(self):
        return expect_lldb_gdbserver_replay(
            self,
            self._server,
            self.test_sequence,
            self.DEFAULT_TIMEOUT * len(self.test_sequence),
            self.logger)

    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]

    def assert_valid_reg_info(self, reg_info):
        # Assert we know about all the reginfo keys parsed.
        for key in reg_info:
            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)

        # Check the bare-minimum expected set of register info keys.
        self.assertTrue("name" in reg_info)
        self.assertTrue("bitsize" in reg_info)

        if not self.getArchitecture() == 'aarch64':
            self.assertTrue("offset" in reg_info)

        self.assertTrue("encoding" in reg_info)
        self.assertTrue("format" in reg_info)

    def find_pc_reg_info(self, reg_infos):
        lldb_reg_index = 0
        for reg_info in reg_infos:
            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
                return (lldb_reg_index, reg_info)
            lldb_reg_index += 1

        return (None, None)

    def add_lldb_register_index(self, reg_infos):
        """Add an "lldb_register_index" key containing the 0-based index of each reg_infos entry.

        We'll use this when we want to call packets like P/p with a register index but do so
        on only a subset of the full register info set.
        """
        self.assertIsNotNone(reg_infos)

        reg_index = 0
        for reg_info in reg_infos:
            reg_info["lldb_register_index"] = reg_index
            reg_index += 1

    def add_query_memory_region_packets(self, address):
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)

    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
        self.assertIsNotNone(key_val_text)
        kv_dict = {}
        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
            key = match.group(1)
            val = match.group(2)
            if key in kv_dict:
                if allow_dupes:
                    if isinstance(kv_dict[key], list):
                        kv_dict[key].append(val)
                    else:
                        # Promote to list.
                        kv_dict[key] = [kv_dict[key], val]
                else:
                    self.fail(
                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                            key, val, key_val_text, kv_dict))
            else:
                kv_dict[key] = val
        return kv_dict

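    # Example of the parsing above (hand-built input, shown for clarity):
    #
    #     parse_key_val_dict("start:3fff0000;size:1000;permissions:rx;")
    #         => {"start": "3fff0000", "size": "1000", "permissions": "rx"}
    #
    # With allow_dupes=True a repeated key is promoted to a list, e.g.
    # "flags:a;flags:b;" => {"flags": ["a", "b"]}.
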
    def parse_memory_region_packet(self, context):
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertIn(key,
                ["start",
                 "size",
                 "permissions",
                 "flags",
                 "name",
                 "error"])
            self.assertIsNotNone(val)

        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict

    def assert_address_within_memory_region(
            self, test_address, mem_region_dict):
        self.assertIsNotNone(mem_region_dict)
        self.assertTrue("start" in mem_region_dict)
        self.assertTrue("size" in mem_region_dict)

        range_start = int(mem_region_dict["start"], 16)
        range_size = int(mem_region_dict["size"], 16)
        range_end = range_start + range_size

        if test_address < range_start:
            self.fail(
                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))
        elif test_address >= range_end:
            self.fail(
                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))

    def add_threadinfo_collection_packets(self):
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
                "save_key": "threadinfo_responses"}],
            True)

    def parse_threadinfo_packets(self, context):
        """Return an array of thread ids (decimal ints), one per thread."""
        threadinfo_responses = context.get("threadinfo_responses")
        self.assertIsNotNone(threadinfo_responses)

        thread_ids = []
        for threadinfo_response in threadinfo_responses:
            new_thread_infos = parse_threadinfo_response(threadinfo_response)
            thread_ids.extend(new_thread_infos)
        return thread_ids

    def wait_for_thread_count(self, thread_count):
        start_time = time.time()
        timeout_time = start_time + self.DEFAULT_TIMEOUT

        actual_thread_count = 0
        while actual_thread_count < thread_count:
            self.reset_test_sequence()
            self.add_threadinfo_collection_packets()

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            threads = self.parse_threadinfo_packets(context)
            self.assertIsNotNone(threads)

            actual_thread_count = len(threads)

            if time.time() > timeout_time:
                raise Exception(
                    'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format(
                        self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))

        return threads

    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could set it.
                "send packet: $OK#00",
            ], True)

        if do_continue:
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)

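    # For example, add_set_breakpoint_packets(0x400734) expects the test to
    # send a software breakpoint packet of the form
    #
    #     $Z0,400734,1#00
    #
    # (type 0, address in hex, kind 1), and the matching removal below sends
    # $z0,400734,1#00.  The address value here is illustrative only.
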
    def add_remove_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            breakpoint_kind=1):
        self.test_sequence.add_log_lines(
            [  # Remove the breakpoint.
                "read packet: $z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could unset it.
                "send packet: $OK#00",
            ], True)

    def add_qSupported_packets(self):
        self.test_sequence.add_log_lines(
            ["read packet: $qSupported#00",
             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
             ], True)

    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals"
    ]

    def parse_qSupported_response(self, context):
        self.assertIsNotNone(context)

        raw_response = context.get("qSupported_response")
        self.assertIsNotNone(raw_response)

        # For entries of the form key=val, the dict key and value are set as
        # expected.  For feature+, feature- and feature?, the +, - or ? is
        # stripped from the key and stored as the value.
        supported_dict = {}
        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
            key = match.group(1)
            val = match.group(3)

            # key=val: store as is.
            if val and len(val) > 0:
                supported_dict[key] = val
            else:
                if len(key) < 2:
                    raise Exception(
                        "singular stub feature is too short: must be stub_feature{+,-,?}")
                supported_type = key[-1]
                key = key[:-1]
                if supported_type not in ["+", "-", "?"]:
                    raise Exception(
                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
                supported_dict[key] = supported_type
            # Ensure we know the supported element.
            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
                raise Exception(
                    "unknown qSupported stub feature reported: %s" %
                    key)

        return supported_dict

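    # Example of the parsing above (hand-built response text, shown for
    # clarity):
    #
    #     parse_qSupported_response(
    #         {"qSupported_response": "PacketSize=20000;QStartNoAckMode+;qEcho+"})
    #         => {"PacketSize": "20000", "QStartNoAckMode": "+", "qEcho": "+"}
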
    def run_process_then_stop(self, run_seconds=1):
        # Tell the stub to continue.
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8"],
            True)
        context = self.expect_gdbremote_sequence()

        # Wait for run_seconds.
        time.sleep(run_seconds)

        # Send an interrupt, capture a T response.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: {}".format(chr(3)),
             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_result"))

        return context

    def continue_process_and_wait_for_stop(self):
        self.test_sequence.add_log_lines(
            [
                "read packet: $vCont;c#a8",
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
                },
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        return self.parse_interrupt_packets(context)

    def select_modifiable_register(self, reg_infos):
        """Find a register that can be read/written freely."""
        PREFERRED_REGISTER_NAMES = set(["rax", ])

        # First check for the first register from the preferred register name
        # set.
        alternative_register_index = None

        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if ("name" in reg_info) and (
                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
                # We found a preferred register.  Use it.
                return reg_info["lldb_register_index"]
            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
                    reg_info["generic"] == "arg1"):
                # A frame pointer or first arg register will do as a
                # register to modify temporarily.
                alternative_register_index = reg_info["lldb_register_index"]

        # We didn't find a preferred register.  Return whatever alternative register
        # we found, if any.
        return alternative_register_index

    def extract_registers_from_stop_notification(self, stop_key_vals_text):
        self.assertIsNotNone(stop_key_vals_text)
        kv_dict = self.parse_key_val_dict(stop_key_vals_text)

        registers = {}
        for (key, val) in list(kv_dict.items()):
            if re.match(r"^[0-9a-fA-F]+$", key):
                registers[int(key, 16)] = val
        return registers

    def gather_register_infos(self):
        self.reset_test_sequence()
        self.add_register_info_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.add_lldb_register_index(reg_infos)

        return reg_infos

    def find_generic_register_with_name(self, reg_infos, generic_name):
        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if ("generic" in reg_info) and (
                    reg_info["generic"] == generic_name):
                return reg_info
        return None

    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
                return reg_info
        return None

    def decode_gdbremote_binary(self, encoded_bytes):
        decoded_bytes = ""
        i = 0
        while i < len(encoded_bytes):
            if encoded_bytes[i] == "}":
                # Handle escaped char.
                self.assertTrue(i + 1 < len(encoded_bytes))
                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
                i += 2
            elif encoded_bytes[i] == "*":
                # Handle run length encoding.
                self.assertTrue(len(decoded_bytes) > 0)
                self.assertTrue(i + 1 < len(encoded_bytes))
                repeat_count = ord(encoded_bytes[i + 1]) - 29
                decoded_bytes += decoded_bytes[-1] * repeat_count
                i += 2
            else:
                decoded_bytes += encoded_bytes[i]
                i += 1
        return decoded_bytes

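    # Illustrative examples of the decoder above (hand-built inputs):
    #
    #     decode_gdbremote_binary("}]")   => "}"     (0x5d ^ 0x20 == 0x7d)
    #     decode_gdbremote_binary("0* ")  => "0000"  (' ' is 32, so repeat 32 - 29 = 3 more times)
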
    def build_auxv_dict(self, endian, word_size, auxv_data):
        self.assertIsNotNone(endian)
        self.assertIsNotNone(word_size)
        self.assertIsNotNone(auxv_data)

        auxv_dict = {}

        # PowerPC64le's auxvec has a special key that must be ignored.
        # This special key may be used multiple times, resulting in
        # multiple key/value pairs with the same key, which would otherwise
        # break this test's check for repeated keys.
        #
        # AT_IGNOREPPC = 22
        ignored_keys_for_arch = {'powerpc64le': [22]}
        arch = self.getArchitecture()
        ignore_keys = None
        if arch in ignored_keys_for_arch:
            ignore_keys = ignored_keys_for_arch[arch]

        while len(auxv_data) > 0:
            # Chop off key.
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            if ignore_keys and key in ignore_keys:
                continue

            # Handle the terminating entry.
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        self.fail(
            "should not reach here - implies the required double-zero terminator entry was not found")
        return auxv_dict

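    # For context: the auxv blob is a flat sequence of (key, value) pairs, each
    # field word_size bytes wide and in target byte order, terminated by a
    # (0, 0) pair.  As an illustration (assuming little-endian, word_size == 8),
    # an AT_PAGESZ entry of key 6 -> value 0x1000 would appear on the wire as
    #     06 00 00 00 00 00 00 00  00 10 00 00 00 00 00 00
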
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until an 'l' response (with or without data) is returned."""
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        "direction": "send",
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is 'l'.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data

    def add_interrupt_packets(self):
        self.test_sequence.add_log_lines([
            # Send the interrupt.
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {"direction": "send",
             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
             "capture": {1: "stop_signo",
                         2: "stop_key_val_text"}},
        ], True)

    def parse_interrupt_packets(self, context):
        self.assertIsNotNone(context.get("stop_signo"))
        self.assertIsNotNone(context.get("stop_key_val_text"))
        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
            context["stop_key_val_text"]))

    def add_QSaveRegisterState_packets(self, thread_id):
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
                thread_id)
        else:
            request = "read packet: $QSaveRegisterState#00"

        self.test_sequence.add_log_lines([request,
                                          {"direction": "send",
                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
                                           "capture": {1: "save_response"}},
                                          ],
                                         True)

    def parse_QSaveRegisterState_response(self, context):
        self.assertIsNotNone(context)

        save_response = context.get("save_response")
        self.assertIsNotNone(save_response)

        if len(save_response) < 1 or save_response[0] == "E":
            # error received
            return (False, None)
        else:
            return (True, int(save_response))

    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
                save_id, thread_id)
        else:
            request = "read packet: $QRestoreRegisterState:{}#00".format(
                save_id)

        self.test_sequence.add_log_lines([
            request,
            "send packet: $OK#00"
        ], True)

    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by XORing with all 1s.
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)

    def is_bit_flippable_register(self, reg_info):
        if not reg_info:
            return False
        if "set" not in reg_info:
            return False
        if reg_info["set"] != "General Purpose Registers":
            return False
        if ("container-regs" in reg_info) and (
                len(reg_info["container-regs"]) > 0):
            # Don't try to bit flip registers contained in another register.
            return False
        if re.match("^.s$", reg_info["name"]):
            # This is a 2-letter register name that ends in "s", like a segment register.
            # Don't try to bit flip these.
            return False
        if re.match("^(c|)psr$", reg_info["name"]):
            # This is an ARM program status register; don't flip it.
            return False
        # Okay, this looks fine enough.
        return True

    def read_register_values(self, reg_infos, endian, thread_id=None):
        self.assertIsNotNone(reg_infos)
        values = {}

        for reg_info in reg_infos:
            # We append a register index when we load reg infos so we can work
            # with subsets.
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(
                endian, p_response)

        return values

    def add_vCont_query_packets(self):
        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
                                          {"direction": "send",
                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
                                           "capture": {2: "vCont_query_response"}},
                                          ],
                                         True)

    def parse_vCont_query_response(self, context):
        self.assertIsNotNone(context)
        vCont_query_response = context.get("vCont_query_response")

        # Handle the case of no vCont support at all, in which case the capture
        # group will be None or zero length.
        if not vCont_query_response or len(vCont_query_response) == 0:
            return {}

        return {key: 1 for key in vCont_query_response.split(
            ";") if key and len(key) > 0}

    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Used by the single step test that appears in a few different contexts."""
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)

    def g_c1_c2_contents_are(self, args):
        """Used by the single-step tests that appear in a few different contexts."""
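        # Reads one byte each from args["g_c1_address"] and args["g_c2_address"]
        # with $m packets and returns True only if the (un-hexlified) bytes
        # match args["expected_g_c1"] and args["expected_g_c2"], which are
        # single ASCII characters such as "0" or "1".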
        g_c1_address = args["g_c1_address"]
        g_c2_address = args["g_c2_address"]
        expected_g_c1 = args["expected_g_c1"]
        expected_g_c2 = args["expected_g_c2"]

        # Read g_c1 and g_c2 contents.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Check if what we read from inferior memory is what we are expecting.
        self.assertIsNotNone(context.get("g_c1_contents"))
        self.assertIsNotNone(context.get("g_c2_contents"))

        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)

    def single_step_only_steps_one_instruction(
            self, use_Hc_packet=True, step_instruction="s"):
        """Used by the single-step tests that appear in a few different contexts."""
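        # Outline of the test, as implemented below:
        #   1. Launch the inferior with arguments that make it print the
        #      address of swap_chars and of the globals g_c1/g_c2, then call
        #      swap_chars.
        #   2. Continue, capture those addresses from the inferior output,
        #      and interrupt.
        #   3. Set (and then remove) a breakpoint at swap_chars so the thread
        #      is stopped at the start of the function.
        #   4. Verify the initial g_c1/g_c2 state, then single-step through
        #      each state transition, checking the step counts against
        #      per-architecture expectations.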
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "get-code-address-hex:swap_chars",
                "get-data-address-hex:g_c1",
                "get-data-address-hex:g_c2",
                "sleep:1",
                "call-function:swap_chars",
                "sleep:5"])

        # Run the process.
        self.test_sequence.add_log_lines(
            [  # Start running after initial stop.
                "read packet: $c#63",
                # Match the output lines that print the function entry point
                # and the two data addresses.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the main thread id.
        self.assertIsNotNone(context.get("stop_thread_id"))
        main_thread_id = int(context.get("stop_thread_id"), 16)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Grab the data addresses.
        self.assertIsNotNone(context.get("g_c1_address"))
        g_c1_address = int(context.get("g_c1_address"), 16)

        self.assertIsNotNone(context.get("g_c2_address"))
        g_c2_address = int(context.get("g_c2_address"), 16)

        # Set a breakpoint at the given address.
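        # In the gdb-remote protocol the breakpoint "kind" field of the Z0/z0
        # packets is target-specific; this test uses the size of the
        # breakpoint instruction in bytes (4 for ARM, 1 otherwise).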
        if self.getArchitecture().startswith("arm"):
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1
        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remove the breakpoint.
        self.reset_test_sequence()
        self.add_remove_breakpoint_packets(
            function_address, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify g_c1 and g_c2 match expected initial state.
        args = {}
        args["g_c1_address"] = g_c1_address
        args["g_c2_address"] = g_c2_address
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"

        self.assertTrue(self.g_c1_c2_contents_are(args))

        # Verify we take only a small number of steps to hit the first state.
        # Might need to work through function entry prologue code.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=25,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)

        # Verify we hit the next state.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        expected_step_count = 1
        arch = self.getArchitecture()

        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the
        # variable value.
        if re.match("mips", arch):
            expected_step_count = 3
        # S390X requires 2 machine instructions (LARL, MVI) to update the
        # variable value.
        if re.match("s390x", arch):
            expected_step_count = 2
        # ARM64 requires 4 instructions: two to compute the address (adrp,
        # add), one to materialize the constant (mov), and one for the store.
        if re.match("arm64", arch):
            expected_step_count = 4

        self.assertEqual(step_count, expected_step_count)

        # ARM64: Once addresses and constants are materialized, only one
        # instruction is needed.
        if re.match("arm64", arch):
            expected_step_count = 1

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

    def maybe_strict_output_regex(self, regex):
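        """Relax or anchor an output regex depending on how chatty stderr is.

        On platforms with chatty stderr the regex is left unanchored
        (".*<regex>.*"); otherwise it is anchored as "^<regex>$".
        """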
        return ('.*' + regex + '.*'
                if lldbplatformutil.hasChattyStderr(self)
                else '^' + regex + '$')

    def install_and_create_launch_args(self):
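        """Return the argument list used to launch the test inferior.

        For local runs this is just the path to the built a.out.  For remote
        runs the binary is first installed into the remote platform's process
        working directory and the remote path is returned instead.
        """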
        exe_path = self.getBuildArtifact("a.out")
        if not lldb.remote_platform:
            return [exe_path]
        remote_path = lldbutil.append_to_process_working_directory(self,
            os.path.basename(exe_path))
        remote_file_spec = lldb.SBFileSpec(remote_path, False)
        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
                                           remote_file_spec)
        if err.Fail():
            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
                            (exe_path, remote_path, err))
        return [remote_path]
