• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import binascii
31import ipaddress
32import logging
33import os
34import re
35import socket
36import subprocess
37import sys
38import time
39import traceback
40import typing
41import unittest
42from ipaddress import IPv6Address, IPv6Network
43from typing import Union, Dict, Optional, List, Any
44
45import pexpect
46import pexpect.popen_spawn
47
48import config
49import simulator
50import thread_cert
51
52PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
53
54
55class OtbrDocker:
56    RESET_DELAY = 3
57
58    _socat_proc = None
59    _ot_rcp_proc = None
60    _docker_proc = None
61
62    def __init__(self, nodeid: int, **kwargs):
63        self.verbose = int(float(os.getenv('VERBOSE', 0)))
64        try:
65            self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
66            self._prepare_ot_rcp_sim(nodeid)
67            self._launch_docker()
68        except Exception:
69            traceback.print_exc()
70            self.destroy()
71            raise
72
73    def _prepare_ot_rcp_sim(self, nodeid: int):
74        self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
75                                            stderr=subprocess.PIPE,
76                                            stdin=subprocess.DEVNULL,
77                                            stdout=subprocess.DEVNULL)
78
79        line = self._socat_proc.stderr.readline().decode('ascii').strip()
80        self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
81        line = self._socat_proc.stderr.readline().decode('ascii').strip()
82        self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
83        logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
84
85        ot_rcp_path = self._get_ot_rcp_path()
86        self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
87                                             shell=True,
88                                             stdin=subprocess.DEVNULL,
89                                             stdout=subprocess.DEVNULL,
90                                             stderr=subprocess.DEVNULL)
91
92        try:
93            self._ot_rcp_proc.wait(1)
94        except subprocess.TimeoutExpired:
95            # We expect ot-rcp not to quit in 1 second.
96            pass
97        else:
98            raise Exception(f"ot-rcp {nodeid} exited unexpectedly!")
99
100    def _get_ot_rcp_path(self) -> str:
101        srcdir = os.environ['top_builddir']
102        path = '%s/examples/apps/ncp/ot-rcp' % srcdir
103        logging.info("ot-rcp path: %s", path)
104        return path
105
106    def _launch_docker(self):
107        logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}')
108        subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
109        CI_ENV = os.getenv('CI_ENV', '').split()
110        os.makedirs('/tmp/coverage/', exist_ok=True)
111        self._docker_proc = subprocess.Popen(['docker', 'run'] + CI_ENV + [
112            '--rm',
113            '--name',
114            self._docker_name,
115            '--network',
116            config.BACKBONE_DOCKER_NETWORK_NAME,
117            '-i',
118            '--sysctl',
119            'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
120            '--privileged',
121            '--cap-add=NET_ADMIN',
122            '--volume',
123            f'{self._rcp_device}:/dev/ttyUSB0',
124            '-v',
125            '/tmp/coverage/:/tmp/coverage/',
126            config.OTBR_DOCKER_IMAGE,
127            '-B',
128            config.BACKBONE_IFNAME,
129            '--trel-url',
130            f'trel://{config.BACKBONE_IFNAME}',
131        ],
132                                             stdin=subprocess.DEVNULL,
133                                             stdout=sys.stdout,
134                                             stderr=sys.stderr)
135
136        launch_docker_deadline = time.time() + 300
137        launch_ok = False
138
139        while time.time() < launch_docker_deadline:
140            try:
141                subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
142                launch_ok = True
143                logging.info("OTBR Docker %s Is Ready!", self._docker_name)
144                break
145            except subprocess.CalledProcessError:
146                time.sleep(5)
147                continue
148
149        assert launch_ok
150
151        self.start_ot_ctl()
152
153    def __repr__(self):
154        return f'OtbrDocker<{self.nodeid}>'
155
156    def start_otbr_service(self):
157        self.bash('service otbr-agent start')
158        self.simulator.go(3)
159        self.start_ot_ctl()
160
161    def stop_otbr_service(self):
162        self.stop_ot_ctl()
163        self.bash('service otbr-agent stop')
164
165    def stop_mdns_service(self):
166        self.bash('service avahi-daemon stop')
167        self.bash('service mdns stop')
168
169    def start_mdns_service(self):
170        self.bash('service avahi-daemon start')
171        self.bash('service mdns start')
172
173    def start_ot_ctl(self):
174        cmd = f'docker exec -i {self._docker_name} ot-ctl'
175        self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30)
176        if self.verbose:
177            self.pexpect.logfile_read = sys.stdout.buffer
178
179        # Add delay to ensure that the process is ready to receive commands.
180        timeout = 0.4
181        while timeout > 0:
182            self.pexpect.send('\r\n')
183            try:
184                self.pexpect.expect('> ', timeout=0.1)
185                break
186            except pexpect.TIMEOUT:
187                timeout -= 0.1
188
189    def stop_ot_ctl(self):
190        self.pexpect.sendeof()
191        self.pexpect.wait()
192        self.pexpect.proc.kill()
193
194    def destroy(self):
195        logging.info("Destroying %s", self)
196        self._shutdown_docker()
197        self._shutdown_ot_rcp()
198        self._shutdown_socat()
199
200    def _shutdown_docker(self):
201        if self._docker_proc is None:
202            return
203
204        try:
205            COVERAGE = int(os.getenv('COVERAGE', '0'))
206            OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
207            test_name = os.getenv('TEST_NAME')
208            unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}'
209
210            if COVERAGE or OTBR_COVERAGE:
211                self.bash('service otbr-agent stop')
212
213                cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info'
214                # Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
215                if OTBR_COVERAGE:
216                    codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}'
217                else:
218                    codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture '
219                                   f'--output-file {cov_file_path}')
220
221                self.bash(codecov_cmd)
222
223            copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True)
224            if copyCore.returncode == 0:
225                subprocess.check_call(
226                    f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True)
227
228        finally:
229            subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
230            self._docker_proc.wait()
231            del self._docker_proc
232
233    def _shutdown_ot_rcp(self):
234        if self._ot_rcp_proc is not None:
235            self._ot_rcp_proc.kill()
236            self._ot_rcp_proc.wait()
237            del self._ot_rcp_proc
238
239    def _shutdown_socat(self):
240        if self._socat_proc is not None:
241            self._socat_proc.stderr.close()
242            self._socat_proc.kill()
243            self._socat_proc.wait()
244            del self._socat_proc
245
246    def bash(self, cmd: str, encoding='ascii') -> List[str]:
247        logging.info("%s $ %s", self, cmd)
248        proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
249                                stdin=subprocess.DEVNULL,
250                                stdout=subprocess.PIPE,
251                                stderr=sys.stderr,
252                                encoding=encoding)
253
254        with proc:
255
256            lines = []
257
258            while True:
259                line = proc.stdout.readline()
260
261                if not line:
262                    break
263
264                lines.append(line)
265                logging.info("%s $ %r", self, line.rstrip('\r\n'))
266
267            proc.wait()
268
269            if proc.returncode != 0:
270                raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
271            else:
272                return lines
273
274    def dns_dig(self, server: str, name: str, qtype: str):
275        """
276        Run dig command to query a DNS server.
277
278        Args:
279            server: the server address.
280            name: the name to query.
281            qtype: the query type (e.g. AAAA, PTR, TXT, SRV).
282
283        Returns:
284            The dig result similar as below:
285            {
286                "opcode": "QUERY",
287                "status": "NOERROR",
288                "id": "64144",
289                "QUESTION": [
290                    ('google.com.', 'IN', 'AAAA')
291                ],
292                "ANSWER": [
293                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::71'),
294                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8a'),
295                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::66'),
296                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8b'),
297                ],
298                "ADDITIONAL": [
299                ],
300            }
301        """
302        output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape')
303
304        section = None
305        dig_result = {
306            'QUESTION': [],
307            'ANSWER': [],
308            'ADDITIONAL': [],
309        }
310
311        for line in output:
312            line = line.strip()
313
314            if line.startswith(';; ->>HEADER<<- '):
315                headers = line[len(';; ->>HEADER<<- '):].split(', ')
316                for header in headers:
317                    key, val = header.split(': ')
318                    dig_result[key] = val
319
320                continue
321
322            if line == ';; QUESTION SECTION:':
323                section = 'QUESTION'
324                continue
325            elif line == ';; ANSWER SECTION:':
326                section = 'ANSWER'
327                continue
328            elif line == ';; ADDITIONAL SECTION:':
329                section = 'ADDITIONAL'
330                continue
331            elif section and not line:
332                section = None
333                continue
334
335            if section:
336                assert line
337
338                if section == 'QUESTION':
339                    assert line.startswith(';')
340                    line = line[1:]
341                record = list(line.split())
342
343                if section == 'QUESTION':
344                    if record[2] in ('SRV', 'TXT'):
345                        record[0] = self.__unescape_dns_instance_name(record[0])
346                else:
347                    record[1] = int(record[1])
348                    if record[3] == 'SRV':
349                        record[0] = self.__unescape_dns_instance_name(record[0])
350                        record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]])
351                    elif record[3] == 'TXT':
352                        record[0] = self.__unescape_dns_instance_name(record[0])
353                        record[4:] = [self.__parse_dns_dig_txt(line)]
354                    elif record[3] == 'PTR':
355                        record[4] = self.__unescape_dns_instance_name(record[4])
356
357                dig_result[section].append(tuple(record))
358
359        return dig_result
360
361    @staticmethod
362    def __unescape_dns_instance_name(name: str) -> str:
363        new_name = []
364        i = 0
365        while i < len(name):
366            c = name[i]
367
368            if c == '\\':
369                assert i + 1 < len(name), name
370                if name[i + 1].isdigit():
371                    assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name
372                    new_name.append(chr(int(name[i + 1:i + 4])))
373                    i += 3
374                else:
375                    new_name.append(name[i + 1])
376                    i += 1
377            else:
378                new_name.append(c)
379
380            i += 1
381
382        return ''.join(new_name)
383
384    def __parse_dns_dig_txt(self, line: str):
385        # Example TXT entry:
386        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
387        txt = {}
388        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
389            if entry == "":
390                continue
391
392            k, v = entry.split('=', 1)
393            txt[k] = v
394
395        return txt
396
397    def _setup_sysctl(self):
398        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2')
399        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64')
400
401
402class OtCli:
403    RESET_DELAY = 0.1
404
405    def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
406        self.verbose = int(float(os.getenv('VERBOSE', 0)))
407        self.node_type = os.getenv('NODE_TYPE', 'sim')
408        self.env_version = os.getenv('THREAD_VERSION', '1.1')
409        self.is_bbr = is_bbr
410        self._initialized = False
411        if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc':
412            self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv(
413                'top_srcdir', '.'), sys.argv[0], nodeid)
414        else:
415            self._cmd_prefix = ''
416
417        if version is not None:
418            self.version = version
419        else:
420            self.version = self.env_version
421
422        mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
423
424        if self.node_type == 'soc':
425            self.__init_soc(nodeid)
426        elif self.node_type == 'ncp-sim':
427            # TODO use mode after ncp-mtd is available.
428            self.__init_ncp_sim(nodeid, 'ftd')
429        else:
430            self.__init_sim(nodeid, mode)
431
432        if self.verbose:
433            self.pexpect.logfile_read = sys.stdout.buffer
434
435        self._initialized = True
436
437    def __init_sim(self, nodeid, mode):
438        """ Initialize a simulation node. """
439
440        # Default command if no match below, will be overridden if below conditions are met.
441        cmd = './ot-cli-%s' % (mode)
442
443        # For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH
444        if self.version != '1.1' and mode == 'mtd' and 'top_builddir' in os.environ:
445            srcdir = os.environ['top_builddir']
446            cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid)
447
448        # If Thread version of node matches the testing environment version.
449        elif self.version == self.env_version:
450            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
451            # which requires device with Backbone functionality.
452            if self.version != '1.1' and self.is_bbr:
453                if 'OT_CLI_PATH_BBR' in os.environ:
454                    cmd = os.environ['OT_CLI_PATH_BBR']
455                elif 'top_builddir_1_3_bbr' in os.environ:
456                    srcdir = os.environ['top_builddir_1_3_bbr']
457                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
458
459            # Load Thread device of the testing environment version (may be 1.1 or 1.2)
460            else:
461                if 'OT_CLI_PATH' in os.environ:
462                    cmd = os.environ['OT_CLI_PATH']
463                elif 'top_builddir' in os.environ:
464                    srcdir = os.environ['top_builddir']
465                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
466
467            if 'RADIO_DEVICE' in os.environ:
468                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
469                                                                                           nodeid)
470                self.is_posix = True
471            else:
472                cmd += ' %d' % nodeid
473
474        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability
475        elif self.version == '1.1':
476            # Posix app
477            if 'OT_CLI_PATH_1_1' in os.environ:
478                cmd = os.environ['OT_CLI_PATH_1_1']
479            elif 'top_builddir_1_1' in os.environ:
480                srcdir = os.environ['top_builddir_1_1']
481                cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
482
483            if 'RADIO_DEVICE_1_1' in os.environ:
484                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (
485                    os.environ['RADIO_DEVICE_1_1'], nodeid)
486                self.is_posix = True
487            else:
488                cmd += ' %d' % nodeid
489
490        print("%s" % cmd)
491
492        self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10)
493
494        # Add delay to ensure that the process is ready to receive commands.
495        timeout = 0.4
496        while timeout > 0:
497            self.pexpect.send('\r\n')
498            try:
499                self.pexpect.expect('> ', timeout=0.1)
500                break
501            except pexpect.TIMEOUT:
502                timeout -= 0.1
503
504    def __init_ncp_sim(self, nodeid, mode):
505        """ Initialize an NCP simulation node. """
506
507        # Default command if no match below, will be overridden if below conditions are met.
508        cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode
509
510        # If Thread version of node matches the testing environment version.
511        if self.version == self.env_version:
512            if 'RADIO_DEVICE' in os.environ:
513                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
514                                                                                        nodeid)
515                self.is_posix = True
516            else:
517                args = ''
518
519            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
520            # which requires device with Backbone functionality.
521            if self.version != '1.1' and self.is_bbr:
522                if 'OT_NCP_PATH_1_3_BBR' in os.environ:
523                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
524                        os.environ['OT_NCP_PATH_1_3_BBR'],
525                        args,
526                    )
527                elif 'top_builddir_1_3_bbr' in os.environ:
528                    srcdir = os.environ['top_builddir_1_3_bbr']
529                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
530                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
531                        cmd,
532                        args,
533                    )
534
535            # Load Thread device of the testing environment version (may be 1.1 or 1.2).
536            else:
537                if 'OT_NCP_PATH' in os.environ:
538                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
539                        os.environ['OT_NCP_PATH'],
540                        args,
541                    )
542                elif 'top_builddir' in os.environ:
543                    srcdir = os.environ['top_builddir']
544                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
545                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
546                        cmd,
547                        args,
548                    )
549
550        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability.
551        elif self.version == '1.1':
552            if 'RADIO_DEVICE_1_1' in os.environ:
553                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'],
554                                                                                        nodeid)
555                self.is_posix = True
556            else:
557                args = ''
558
559            if 'OT_NCP_PATH_1_1' in os.environ:
560                cmd = 'spinel-cli.py -p "%s%s" -n' % (
561                    os.environ['OT_NCP_PATH_1_1'],
562                    args,
563                )
564            elif 'top_builddir_1_1' in os.environ:
565                srcdir = os.environ['top_builddir_1_1']
566                cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
567                cmd = 'spinel-cli.py -p "%s%s" -n' % (
568                    cmd,
569                    args,
570                )
571
572        cmd += ' %d' % nodeid
573        print("%s" % cmd)
574
575        self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10)
576
577        # Add delay to ensure that the process is ready to receive commands.
578        time.sleep(0.2)
579        self._expect('spinel-cli >')
580        self.debug(int(os.getenv('DEBUG', '0')))
581
582    def __init_soc(self, nodeid):
583        """ Initialize a System-on-a-chip node connected via UART. """
584        import fdpexpect
585
586        serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
587        self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
588
589    def destroy(self):
590        if not self._initialized:
591            return
592
593        if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
594                not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
595            print("%d: exit" % self.nodeid)
596            self.pexpect.send('exit\n')
597            self.pexpect.expect(pexpect.EOF)
598            self.pexpect.wait()
599            self._initialized = False
600
601
602class NodeImpl:
603    is_host = False
604    is_otbr = False
605
606    def __init__(self, nodeid, name=None, simulator=None, **kwargs):
607        self.nodeid = nodeid
608        self.name = name or ('Node%d' % nodeid)
609        self.is_posix = False
610
611        self.simulator = simulator
612        if self.simulator:
613            self.simulator.add_node(self)
614
615        super().__init__(nodeid, **kwargs)
616
617        self.set_mesh_local_prefix(config.MESH_LOCAL_PREFIX)
618        self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid))
619
620    def _expect(self, pattern, timeout=-1, *args, **kwargs):
621        """ Process simulator events until expected the pattern. """
622        if timeout == -1:
623            timeout = self.pexpect.timeout
624
625        assert timeout > 0
626
627        while timeout > 0:
628            try:
629                return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
630            except pexpect.TIMEOUT:
631                timeout -= 0.1
632                self.simulator.go(0)
633                if timeout <= 0:
634                    raise
635
636    def _expect_done(self, timeout=-1):
637        self._expect('Done', timeout)
638
639    def _expect_result(self, pattern, *args, **kwargs):
640        """Expect a single matching result.
641
642        The arguments are identical to pexpect.expect().
643
644        Returns:
645            The matched line.
646        """
647        results = self._expect_results(pattern, *args, **kwargs)
648        assert len(results) == 1, results
649        return results[0]
650
651    def _expect_results(self, pattern, *args, **kwargs):
652        """Expect multiple matching results.
653
654        The arguments are identical to pexpect.expect().
655
656        Returns:
657            The matched lines.
658        """
659        output = self._expect_command_output()
660        results = [line for line in output if self._match_pattern(line, pattern)]
661        return results
662
663    @staticmethod
664    def _match_pattern(line, pattern):
665        if isinstance(pattern, str):
666            pattern = re.compile(pattern)
667
668        if isinstance(pattern, typing.Pattern):
669            return pattern.match(line)
670        else:
671            return any(NodeImpl._match_pattern(line, p) for p in pattern)
672
673    def _expect_command_output(self, ignore_logs=True):
674        lines = []
675
676        while True:
677            line = self.__readline(ignore_logs=ignore_logs)
678
679            if line == 'Done':
680                break
681            elif line.startswith('Error '):
682                raise Exception(line)
683            else:
684                lines.append(line)
685
686        print(f'_expect_command_output() returns {lines!r}')
687        return lines
688
689    def __is_logging_line(self, line: str) -> bool:
690        return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'}
691
692    def read_cert_messages_in_commissioning_log(self, timeout=-1):
693        """Get the log of the traffic after DTLS handshake.
694        """
695        format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
696        join_fin_req = format_str % br"JOIN_FIN\.req"
697        join_fin_rsp = format_str % br"JOIN_FIN\.rsp"
698        dummy_format_str = br"\[THCI\].*?type=%s.*?"
699        join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"
700        join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"
701        pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")")
702
703        messages = []
704        # There are at most 4 cert messages both for joiner and commissioner
705        for _ in range(0, 4):
706            try:
707                self._expect(pattern, timeout=timeout)
708                log = self.pexpect.match.group(0)
709                messages.append(self._extract_cert_message(log))
710            except BaseException:
711                break
712        return messages
713
714    def _extract_cert_message(self, log):
715        res = re.search(br"direction=\w+", log)
716        assert res
717        direction = res.group(0).split(b'=')[1].strip()
718
719        res = re.search(br"type=\S+", log)
720        assert res
721        type = res.group(0).split(b'=')[1].strip()
722
723        payload = bytearray([])
724        payload_len = 0
725        if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
726            res = re.search(br"len=\d+", log)
727            assert res
728            payload_len = int(res.group(0).split(b'=')[1].strip())
729
730            hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
731            while True:
732                res = re.search(hex_pattern, log)
733                if not res:
734                    break
735                data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..']
736                payload += bytearray(data)
737                log = log[res.end() - 1:]
738        assert len(payload) == payload_len
739        return (direction, type, payload)
740
741    def send_command(self, cmd, go=True, expect_command_echo=True):
742        print("%d: %s" % (self.nodeid, cmd))
743        self.pexpect.send(cmd + '\n')
744        if go:
745            self.simulator.go(0, nodeid=self.nodeid)
746        sys.stdout.flush()
747
748        if expect_command_echo:
749            self._expect_command_echo(cmd)
750
751    def _expect_command_echo(self, cmd):
752        cmd = cmd.strip()
753        while True:
754            line = self.__readline()
755            if line == cmd:
756                break
757
758            logging.warning("expecting echo %r, but read %r", cmd, line)
759
760    def __readline(self, ignore_logs=True):
761        PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> '
762        while True:
763            self._expect(r"[^\n]+\n")
764            line = self.pexpect.match.group(0).decode('utf8').strip()
765            while line.startswith(PROMPT):
766                line = line[len(PROMPT):]
767
768            if line == '':
769                continue
770
771            if ignore_logs and self.__is_logging_line(line):
772                continue
773
774            return line
775
776    def get_commands(self):
777        self.send_command('?')
778        self._expect('Commands:')
779        return self._expect_results(r'\S+')
780
781    def set_mode(self, mode):
782        cmd = 'mode %s' % mode
783        self.send_command(cmd)
784        self._expect_done()
785
786    def debug(self, level):
787        # `debug` command will not trigger interaction with simulator
788        self.send_command('debug %d' % level, go=False)
789
790    def start(self):
791        self.interface_up()
792        self.thread_start()
793
794    def stop(self):
795        self.thread_stop()
796        self.interface_down()
797
798    def set_log_level(self, level: int):
799        self.send_command(f'log level {level}')
800        self._expect_done()
801
802    def interface_up(self):
803        self.send_command('ifconfig up')
804        self._expect_done()
805
806    def interface_down(self):
807        self.send_command('ifconfig down')
808        self._expect_done()
809
810    def thread_start(self):
811        self.send_command('thread start')
812        self._expect_done()
813
814    def thread_stop(self):
815        self.send_command('thread stop')
816        self._expect_done()
817
818    def detach(self, is_async=False):
819        cmd = 'detach'
820        if is_async:
821            cmd += ' async'
822
823        self.send_command(cmd)
824
825        if is_async:
826            self._expect_done()
827            return
828
829        end = self.simulator.now() + 4
830        while True:
831            self.simulator.go(1)
832            try:
833                self._expect_done(timeout=0.1)
834                return
835            except (pexpect.TIMEOUT, socket.timeout):
836                if self.simulator.now() > end:
837                    raise
838
839    def expect_finished_detaching(self):
840        self._expect('Finished detaching')
841
842    def commissioner_start(self):
843        cmd = 'commissioner start'
844        self.send_command(cmd)
845        self._expect_done()
846
847    def commissioner_stop(self):
848        cmd = 'commissioner stop'
849        self.send_command(cmd)
850        self._expect_done()
851
852    def commissioner_state(self):
853        states = [r'disabled', r'petitioning', r'active']
854        self.send_command('commissioner state')
855        return self._expect_result(states)
856
857    def commissioner_add_joiner(self, addr, psk):
858        cmd = 'commissioner joiner add %s %s' % (addr, psk)
859        self.send_command(cmd)
860        self._expect_done()
861
862    def commissioner_set_provisioning_url(self, provisioning_url=''):
863        cmd = 'commissioner provisioningurl %s' % provisioning_url
864        self.send_command(cmd)
865        self._expect_done()
866
867    def joiner_start(self, pskd='', provisioning_url=''):
868        cmd = 'joiner start %s %s' % (pskd, provisioning_url)
869        self.send_command(cmd)
870        self._expect_done()
871
872    def clear_allowlist(self):
873        cmd = 'macfilter addr clear'
874        self.send_command(cmd)
875        self._expect_done()
876
877    def enable_allowlist(self):
878        cmd = 'macfilter addr allowlist'
879        self.send_command(cmd)
880        self._expect_done()
881
882    def disable_allowlist(self):
883        cmd = 'macfilter addr disable'
884        self.send_command(cmd)
885        self._expect_done()
886
887    def add_allowlist(self, addr, rssi=None):
888        cmd = 'macfilter addr add %s' % addr
889
890        if rssi is not None:
891            cmd += ' %s' % rssi
892
893        self.send_command(cmd)
894        self._expect_done()
895
896    def radiofilter_is_enabled(self) -> bool:
897        states = [r'Disabled', r'Enabled']
898        self.send_command('radiofilter')
899        return self._expect_result(states) == 'Enabled'
900
901    def radiofilter_enable(self):
902        cmd = 'radiofilter enable'
903        self.send_command(cmd)
904        self._expect_done()
905
906    def radiofilter_disable(self):
907        cmd = 'radiofilter disable'
908        self.send_command(cmd)
909        self._expect_done()
910
911    def get_bbr_registration_jitter(self):
912        self.send_command('bbr jitter')
913        return int(self._expect_result(r'\d+'))
914
915    def set_bbr_registration_jitter(self, jitter):
916        cmd = 'bbr jitter %d' % jitter
917        self.send_command(cmd)
918        self._expect_done()
919
920    def get_rcp_version(self) -> str:
921        self.send_command('rcp version')
922        rcp_version = self._expect_command_output()[0].strip()
923        return rcp_version
924
925    def srp_server_get_state(self):
926        states = ['disabled', 'running', 'stopped']
927        self.send_command('srp server state')
928        return self._expect_result(states)
929
930    def srp_server_get_addr_mode(self):
931        modes = [r'unicast', r'anycast']
932        self.send_command(f'srp server addrmode')
933        return self._expect_result(modes)
934
935    def srp_server_set_addr_mode(self, mode):
936        self.send_command(f'srp server addrmode {mode}')
937        self._expect_done()
938
939    def srp_server_get_anycast_seq_num(self):
940        self.send_command(f'srp server seqnum')
941        return int(self._expect_result(r'\d+'))
942
943    def srp_server_set_anycast_seq_num(self, seqnum):
944        self.send_command(f'srp server seqnum {seqnum}')
945        self._expect_done()
946
947    def srp_server_set_enabled(self, enable):
948        cmd = f'srp server {"enable" if enable else "disable"}'
949        self.send_command(cmd)
950        self._expect_done()
951
952    def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease):
953        self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
954        self._expect_done()
955
956    def srp_server_set_ttl_range(self, min_ttl, max_ttl):
957        self.send_command(f'srp server ttl {min_ttl} {max_ttl}')
958        self._expect_done()
959
960    def srp_server_get_hosts(self):
961        """Returns the host list on the SRP server as a list of property
962           dictionary.
963
964           Example output:
965           [{
966               'fullname': 'my-host.default.service.arpa.',
967               'name': 'my-host',
968               'deleted': 'false',
969               'addresses': ['2001::1', '2001::2']
970           }]
971        """
972
973        cmd = 'srp server host'
974        self.send_command(cmd)
975        lines = self._expect_command_output()
976        host_list = []
977        while lines:
978            host = {}
979
980            host['fullname'] = lines.pop(0).strip()
981            host['name'] = host['fullname'].split('.')[0]
982
983            host['deleted'] = lines.pop(0).strip().split(':')[1].strip()
984            if host['deleted'] == 'true':
985                host_list.append(host)
986                continue
987
988            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
989            map(str.strip, addresses)
990            host['addresses'] = [addr for addr in addresses if addr]
991
992            host_list.append(host)
993
994        return host_list
995
996    def srp_server_get_host(self, host_name):
997        """Returns host on the SRP server that matches given host name.
998
999           Example usage:
1000           self.srp_server_get_host("my-host")
1001        """
1002
1003        for host in self.srp_server_get_hosts():
1004            if host_name == host['name']:
1005                return host
1006
1007    def srp_server_get_services(self):
1008        """Returns the service list on the SRP server as a list of property
1009           dictionary.
1010
1011           Example output:
1012           [{
1013               'fullname': 'my-service._ipps._tcp.default.service.arpa.',
1014               'instance': 'my-service',
1015               'name': '_ipps._tcp',
1016               'deleted': 'false',
1017               'port': '12345',
1018               'priority': '0',
1019               'weight': '0',
1020               'ttl': '7200',
1021               'TXT': ['abc=010203'],
1022               'host_fullname': 'my-host.default.service.arpa.',
1023               'host': 'my-host',
1024               'addresses': ['2001::1', '2001::2']
1025           }]
1026
1027           Note that the TXT data is output as a HEX string.
1028        """
1029
1030        cmd = 'srp server service'
1031        self.send_command(cmd)
1032        lines = self._expect_command_output()
1033        service_list = []
1034        while lines:
1035            service = {}
1036
1037            service['fullname'] = lines.pop(0).strip()
1038            name_labels = service['fullname'].split('.')
1039            service['instance'] = name_labels[0]
1040            service['name'] = '.'.join(name_labels[1:3])
1041
1042            service['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1043            if service['deleted'] == 'true':
1044                service_list.append(service)
1045                continue
1046
1047            # 'subtypes', port', 'priority', 'weight', 'ttl'
1048            for i in range(0, 5):
1049                key_value = lines.pop(0).strip().split(':')
1050                service[key_value[0].strip()] = key_value[1].strip()
1051
1052            txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1053            txt_entries = map(str.strip, txt_entries)
1054            service['TXT'] = [txt for txt in txt_entries if txt]
1055
1056            service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip()
1057            service['host'] = service['host_fullname'].split('.')[0]
1058
1059            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1060            addresses = map(str.strip, addresses)
1061            service['addresses'] = [addr for addr in addresses if addr]
1062
1063            service_list.append(service)
1064
1065        return service_list
1066
1067    def srp_server_get_service(self, instance_name, service_name):
1068        """Returns service on the SRP server that matches given instance
1069           name and service name.
1070
1071           Example usage:
1072           self.srp_server_get_service("my-service", "_ipps._tcp")
1073        """
1074
1075        for service in self.srp_server_get_services():
1076            if (instance_name == service['instance'] and service_name == service['name']):
1077                return service
1078
1079    def get_srp_server_port(self):
1080        """Returns the SRP server UDP port by parsing
1081           the SRP Server Data in Network Data.
1082        """
1083
1084        for service in self.get_services():
1085            # TODO: for now, we are using 0xfd as the SRP service data.
1086            #       May use a dedicated bit flag for SRP server.
1087            if int(service[1], 16) == 0x5d:
1088                # The SRP server data contains IPv6 address (16 bytes)
1089                # followed by UDP port number.
1090                return int(service[2][2 * 16:], 16)
1091
1092    def srp_client_start(self, server_address, server_port):
1093        self.send_command(f'srp client start {server_address} {server_port}')
1094        self._expect_done()
1095
1096    def srp_client_stop(self):
1097        self.send_command(f'srp client stop')
1098        self._expect_done()
1099
1100    def srp_client_get_state(self):
1101        cmd = 'srp client state'
1102        self.send_command(cmd)
1103        return self._expect_command_output()[0]
1104
1105    def srp_client_get_auto_start_mode(self):
1106        cmd = 'srp client autostart'
1107        self.send_command(cmd)
1108        return self._expect_command_output()[0]
1109
1110    def srp_client_enable_auto_start_mode(self):
1111        self.send_command(f'srp client autostart enable')
1112        self._expect_done()
1113
1114    def srp_client_disable_auto_start_mode(self):
1115        self.send_command(f'srp client autostart able')
1116        self._expect_done()
1117
1118    def srp_client_get_server_address(self):
1119        cmd = 'srp client server address'
1120        self.send_command(cmd)
1121        return self._expect_command_output()[0]
1122
1123    def srp_client_get_server_port(self):
1124        cmd = 'srp client server port'
1125        self.send_command(cmd)
1126        return int(self._expect_command_output()[0])
1127
1128    def srp_client_get_host_state(self):
1129        cmd = 'srp client host state'
1130        self.send_command(cmd)
1131        return self._expect_command_output()[0]
1132
1133    def srp_client_set_host_name(self, name):
1134        self.send_command(f'srp client host name {name}')
1135        self._expect_done()
1136
1137    def srp_client_get_host_name(self):
1138        self.send_command(f'srp client host name')
1139        self._expect_done()
1140
1141    def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False):
1142        self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}')
1143        self._expect_done()
1144
1145    def srp_client_clear_host(self):
1146        self.send_command(f'srp client host clear')
1147        self._expect_done()
1148
1149    def srp_client_enable_auto_host_address(self):
1150        self.send_command(f'srp client host address auto')
1151        self._expect_done()
1152
1153    def srp_client_set_host_address(self, *addrs: str):
1154        self.send_command(f'srp client host address {" ".join(addrs)}')
1155        self._expect_done()
1156
1157    def srp_client_get_host_address(self):
1158        self.send_command(f'srp client host address')
1159        self._expect_done()
1160
1161    def srp_client_add_service(self, instance_name, service_name, port, priority=0, weight=0, txt_entries=[]):
1162        txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries)
1163        instance_name = self._escape_escapable(instance_name)
1164        self.send_command(
1165            f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record}')
1166        self._expect_done()
1167
1168    def srp_client_remove_service(self, instance_name, service_name):
1169        self.send_command(f'srp client service remove {instance_name} {service_name}')
1170        self._expect_done()
1171
1172    def srp_client_clear_service(self, instance_name, service_name):
1173        self.send_command(f'srp client service clear {instance_name} {service_name}')
1174        self._expect_done()
1175
1176    def srp_client_get_services(self):
1177        cmd = 'srp client service'
1178        self.send_command(cmd)
1179        service_lines = self._expect_command_output()
1180        return [self._parse_srp_client_service(line) for line in service_lines]
1181
1182    def srp_client_set_lease_interval(self, leaseinterval: int):
1183        cmd = f'srp client leaseinterval {leaseinterval}'
1184        self.send_command(cmd)
1185        self._expect_done()
1186
1187    def srp_client_get_lease_interval(self) -> int:
1188        cmd = 'srp client leaseinterval'
1189        self.send_command(cmd)
1190        return int(self._expect_result('\d+'))
1191
1192    def srp_client_set_ttl(self, ttl: int):
1193        cmd = f'srp client ttl {ttl}'
1194        self.send_command(cmd)
1195        self._expect_done()
1196
1197    def srp_client_get_ttl(self) -> int:
1198        cmd = 'srp client ttl'
1199        self.send_command(cmd)
1200        return int(self._expect_result('\d+'))
1201
1202    #
1203    # TREL utilities
1204    #
1205
1206    def get_trel_state(self) -> Union[None, bool]:
1207        states = [r'Disabled', r'Enabled']
1208        self.send_command('trel')
1209        try:
1210            return self._expect_result(states) == 'Enabled'
1211        except Exception as ex:
1212            if 'InvalidCommand' in str(ex):
1213                return None
1214
1215            raise
1216
1217    def _encode_txt_entry(self, entry):
1218        """Encodes the TXT entry to the DNS-SD TXT record format as a HEX string.
1219
1220           Example usage:
1221           self._encode_txt_entries(['abc'])     -> '03616263'
1222           self._encode_txt_entries(['def='])    -> '046465663d'
1223           self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a'
1224        """
1225        return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry)
1226
1227    def _parse_srp_client_service(self, line: str):
1228        """Parse one line of srp service list into a dictionary which
1229           maps string keys to string values.
1230
1231           Example output for input
1232           'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"'
1233           {
1234               'instance': 'my-service',
1235               'name': '_ipps._udp',
1236               'state': 'ToAdd',
1237               'port': '12345',
1238               'priority': '0',
1239               'weight': '0'
1240           }
1241
1242           Note that value of 'port', 'priority' and 'weight' are represented
1243           as strings but not integers.
1244        """
1245        key_values = [word.strip().split(':') for word in line.split(', ')]
1246        keys = [key_value[0] for key_value in key_values]
1247        values = [key_value[1].strip('"') for key_value in key_values]
1248        return dict(zip(keys, values))
1249
1250    def locate(self, anycast_addr):
1251        cmd = 'locate ' + anycast_addr
1252        self.send_command(cmd)
1253        self.simulator.go(5)
1254        return self._parse_locate_result(self._expect_command_output()[0])
1255
1256    def _parse_locate_result(self, line: str):
1257        """Parse anycast locate result as list of ml-eid and rloc16.
1258
1259           Example output for input
1260           'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800'
1261
1262           [ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ]
1263        """
1264        return line.split(' ')
1265
1266    def enable_backbone_router(self):
1267        cmd = 'bbr enable'
1268        self.send_command(cmd)
1269        self._expect_done()
1270
1271    def disable_backbone_router(self):
1272        cmd = 'bbr disable'
1273        self.send_command(cmd)
1274        self._expect_done()
1275
1276    def register_backbone_router(self):
1277        cmd = 'bbr register'
1278        self.send_command(cmd)
1279        self._expect_done()
1280
1281    def get_backbone_router_state(self):
1282        states = [r'Disabled', r'Primary', r'Secondary']
1283        self.send_command('bbr state')
1284        return self._expect_result(states)
1285
1286    @property
1287    def is_primary_backbone_router(self) -> bool:
1288        return self.get_backbone_router_state() == 'Primary'
1289
1290    def get_backbone_router(self):
1291        cmd = 'bbr config'
1292        self.send_command(cmd)
1293        self._expect(r'(.*)Done')
1294        g = self.pexpect.match.groups()
1295        output = g[0].decode("utf-8")
1296        lines = output.strip().split('\n')
1297        lines = [l.strip() for l in lines]
1298        ret = {}
1299        for l in lines:
1300            z = re.search(r'seqno:\s+([0-9]+)', l)
1301            if z:
1302                ret['seqno'] = int(z.groups()[0])
1303
1304            z = re.search(r'delay:\s+([0-9]+)', l)
1305            if z:
1306                ret['delay'] = int(z.groups()[0])
1307
1308            z = re.search(r'timeout:\s+([0-9]+)', l)
1309            if z:
1310                ret['timeout'] = int(z.groups()[0])
1311
1312        return ret
1313
1314    def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None):
1315        cmd = 'bbr config'
1316
1317        if seqno is not None:
1318            cmd += ' seqno %d' % seqno
1319
1320        if reg_delay is not None:
1321            cmd += ' delay %d' % reg_delay
1322
1323        if mlr_timeout is not None:
1324            cmd += ' timeout %d' % mlr_timeout
1325
1326        self.send_command(cmd)
1327        self._expect_done()
1328
1329    def set_domain_prefix(self, prefix, flags='prosD'):
1330        self.add_prefix(prefix, flags)
1331        self.register_netdata()
1332
1333    def remove_domain_prefix(self, prefix):
1334        self.remove_prefix(prefix)
1335        self.register_netdata()
1336
1337    def set_next_dua_response(self, status: Union[str, int], iid=None):
1338        # Convert 5.00 to COAP CODE 160
1339        if isinstance(status, str):
1340            assert '.' in status
1341            status = status.split('.')
1342            status = (int(status[0]) << 5) + int(status[1])
1343
1344        cmd = 'bbr mgmt dua {}'.format(status)
1345        if iid is not None:
1346            cmd += ' ' + str(iid)
1347        self.send_command(cmd)
1348        self._expect_done()
1349
1350    def set_dua_iid(self, iid: str):
1351        assert len(iid) == 16
1352        int(iid, 16)
1353
1354        cmd = 'dua iid {}'.format(iid)
1355        self.send_command(cmd)
1356        self._expect_done()
1357
1358    def clear_dua_iid(self):
1359        cmd = 'dua iid clear'
1360        self.send_command(cmd)
1361        self._expect_done()
1362
1363    def multicast_listener_list(self) -> Dict[IPv6Address, int]:
1364        cmd = 'bbr mgmt mlr listener'
1365        self.send_command(cmd)
1366
1367        table = {}
1368        for line in self._expect_results("\S+ \d+"):
1369            line = line.split()
1370            assert len(line) == 2, line
1371            ip = IPv6Address(line[0])
1372            timeout = int(line[1])
1373            assert ip not in table
1374
1375            table[ip] = timeout
1376
1377        return table
1378
1379    def multicast_listener_clear(self):
1380        cmd = f'bbr mgmt mlr listener clear'
1381        self.send_command(cmd)
1382        self._expect_done()
1383
1384    def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0):
1385        if not isinstance(ip, IPv6Address):
1386            ip = IPv6Address(ip)
1387
1388        cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}'
1389        self.send_command(cmd)
1390        self._expect(r"(Done|Error .*)")
1391
1392    def set_next_mlr_response(self, status: int):
1393        cmd = 'bbr mgmt mlr response {}'.format(status)
1394        self.send_command(cmd)
1395        self._expect_done()
1396
1397    def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None):
1398        assert len(ipaddrs) > 0, ipaddrs
1399
1400        ipaddrs = map(str, ipaddrs)
1401        cmd = f'mlr reg {" ".join(ipaddrs)}'
1402        if timeout is not None:
1403            cmd += f' {int(timeout)}'
1404        self.send_command(cmd)
1405        self.simulator.go(3)
1406        lines = self._expect_command_output()
1407        m = re.match(r'status (\d+), (\d+) failed', lines[0])
1408        assert m is not None, lines
1409        status = int(m.group(1))
1410        failed_num = int(m.group(2))
1411        assert failed_num == len(lines) - 1
1412        failed_ips = list(map(IPv6Address, lines[1:]))
1413        print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}")
1414        return status, failed_ips
1415
1416    def set_link_quality(self, addr, lqi):
1417        cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi)
1418        self.send_command(cmd)
1419        self._expect_done()
1420
1421    def set_outbound_link_quality(self, lqi):
1422        cmd = 'macfilter rss add-lqi * %s' % (lqi)
1423        self.send_command(cmd)
1424        self._expect_done()
1425
1426    def remove_allowlist(self, addr):
1427        cmd = 'macfilter addr remove %s' % addr
1428        self.send_command(cmd)
1429        self._expect_done()
1430
1431    def get_addr16(self):
1432        self.send_command('rloc16')
1433        rloc16 = self._expect_result(r'[0-9a-fA-F]{4}')
1434        return int(rloc16, 16)
1435
1436    def get_router_id(self):
1437        rloc16 = self.get_addr16()
1438        return rloc16 >> 10
1439
1440    def get_addr64(self):
1441        self.send_command('extaddr')
1442        return self._expect_result('[0-9a-fA-F]{16}')
1443
1444    def set_addr64(self, addr64: str):
1445        # Make sure `addr64` is a hex string of length 16
1446        assert len(addr64) == 16
1447        int(addr64, 16)
1448        self.send_command('extaddr %s' % addr64)
1449        self._expect_done()
1450
1451    def get_eui64(self):
1452        self.send_command('eui64')
1453        return self._expect_result('[0-9a-fA-F]{16}')
1454
1455    def set_extpanid(self, extpanid):
1456        self.send_command('extpanid %s' % extpanid)
1457        self._expect_done()
1458
1459    def get_extpanid(self):
1460        self.send_command('extpanid')
1461        return self._expect_result('[0-9a-fA-F]{16}')
1462
1463    def get_mesh_local_prefix(self):
1464        self.send_command('prefix meshlocal')
1465        return self._expect_command_output()[0]
1466
1467    def set_mesh_local_prefix(self, mesh_local_prefix):
1468        self.send_command('prefix meshlocal %s' % mesh_local_prefix)
1469        self._expect_done()
1470
1471    def get_joiner_id(self):
1472        self.send_command('joiner id')
1473        return self._expect_result('[0-9a-fA-F]{16}')
1474
1475    def get_channel(self):
1476        self.send_command('channel')
1477        return int(self._expect_result(r'\d+'))
1478
1479    def set_channel(self, channel):
1480        cmd = 'channel %d' % channel
1481        self.send_command(cmd)
1482        self._expect_done()
1483
1484    def get_networkkey(self):
1485        self.send_command('networkkey')
1486        return self._expect_result('[0-9a-fA-F]{32}')
1487
1488    def set_networkkey(self, networkkey):
1489        cmd = 'networkkey %s' % networkkey
1490        self.send_command(cmd)
1491        self._expect_done()
1492
1493    def get_key_sequence_counter(self):
1494        self.send_command('keysequence counter')
1495        result = self._expect_result(r'\d+')
1496        return int(result)
1497
1498    def set_key_sequence_counter(self, key_sequence_counter):
1499        cmd = 'keysequence counter %d' % key_sequence_counter
1500        self.send_command(cmd)
1501        self._expect_done()
1502
1503    def set_key_switch_guardtime(self, key_switch_guardtime):
1504        cmd = 'keysequence guardtime %d' % key_switch_guardtime
1505        self.send_command(cmd)
1506        self._expect_done()
1507
1508    def set_network_id_timeout(self, network_id_timeout):
1509        cmd = 'networkidtimeout %d' % network_id_timeout
1510        self.send_command(cmd)
1511        self._expect_done()
1512
1513    def _escape_escapable(self, string):
1514        """Escape CLI escapable characters in the given string.
1515
1516        Args:
1517            string (str): UTF-8 input string.
1518
1519        Returns:
1520            [str]: The modified string with escaped characters.
1521        """
1522        escapable_chars = '\\ \t\r\n'
1523        for char in escapable_chars:
1524            string = string.replace(char, '\\%s' % char)
1525        return string
1526
1527    def get_network_name(self):
1528        self.send_command('networkname')
1529        return self._expect_result([r'\S+'])
1530
1531    def set_network_name(self, network_name):
1532        cmd = 'networkname %s' % self._escape_escapable(network_name)
1533        self.send_command(cmd)
1534        self._expect_done()
1535
1536    def get_panid(self):
1537        self.send_command('panid')
1538        result = self._expect_result('0x[0-9a-fA-F]{4}')
1539        return int(result, 16)
1540
1541    def set_panid(self, panid=config.PANID):
1542        cmd = 'panid %d' % panid
1543        self.send_command(cmd)
1544        self._expect_done()
1545
1546    def set_parent_priority(self, priority):
1547        cmd = 'parentpriority %d' % priority
1548        self.send_command(cmd)
1549        self._expect_done()
1550
1551    def get_partition_id(self):
1552        self.send_command('partitionid')
1553        return self._expect_result(r'\d+')
1554
1555    def get_preferred_partition_id(self):
1556        self.send_command('partitionid preferred')
1557        return self._expect_result(r'\d+')
1558
1559    def set_preferred_partition_id(self, partition_id):
1560        cmd = 'partitionid preferred %d' % partition_id
1561        self.send_command(cmd)
1562        self._expect_done()
1563
1564    def get_pollperiod(self):
1565        self.send_command('pollperiod')
1566        return self._expect_result(r'\d+')
1567
1568    def set_pollperiod(self, pollperiod):
1569        self.send_command('pollperiod %d' % pollperiod)
1570        self._expect_done()
1571
1572    def get_csl_info(self):
1573        self.send_command('csl')
1574        self._expect_done()
1575
1576    def set_csl_channel(self, csl_channel):
1577        self.send_command('csl channel %d' % csl_channel)
1578        self._expect_done()
1579
1580    def set_csl_period(self, csl_period):
1581        self.send_command('csl period %d' % csl_period)
1582        self._expect_done()
1583
1584    def set_csl_timeout(self, csl_timeout):
1585        self.send_command('csl timeout %d' % csl_timeout)
1586        self._expect_done()
1587
1588    def send_mac_emptydata(self):
1589        self.send_command('mac send emptydata')
1590        self._expect_done()
1591
1592    def send_mac_datarequest(self):
1593        self.send_command('mac send datarequest')
1594        self._expect_done()
1595
1596    def set_router_upgrade_threshold(self, threshold):
1597        cmd = 'routerupgradethreshold %d' % threshold
1598        self.send_command(cmd)
1599        self._expect_done()
1600
1601    def set_router_downgrade_threshold(self, threshold):
1602        cmd = 'routerdowngradethreshold %d' % threshold
1603        self.send_command(cmd)
1604        self._expect_done()
1605
1606    def get_router_downgrade_threshold(self) -> int:
1607        self.send_command('routerdowngradethreshold')
1608        return int(self._expect_result(r'\d+'))
1609
1610    def set_router_eligible(self, enable: bool):
1611        cmd = f'routereligible {"enable" if enable else "disable"}'
1612        self.send_command(cmd)
1613        self._expect_done()
1614
1615    def get_router_eligible(self) -> bool:
1616        states = [r'Disabled', r'Enabled']
1617        self.send_command('routereligible')
1618        return self._expect_result(states) == 'Enabled'
1619
1620    def prefer_router_id(self, router_id):
1621        cmd = 'preferrouterid %d' % router_id
1622        self.send_command(cmd)
1623        self._expect_done()
1624
1625    def release_router_id(self, router_id):
1626        cmd = 'releaserouterid %d' % router_id
1627        self.send_command(cmd)
1628        self._expect_done()
1629
1630    def get_state(self):
1631        states = [r'detached', r'child', r'router', r'leader', r'disabled']
1632        self.send_command('state')
1633        return self._expect_result(states)
1634
1635    def set_state(self, state):
1636        cmd = 'state %s' % state
1637        self.send_command(cmd)
1638        self._expect_done()
1639
1640    def get_timeout(self):
1641        self.send_command('childtimeout')
1642        return self._expect_result(r'\d+')
1643
1644    def set_timeout(self, timeout):
1645        cmd = 'childtimeout %d' % timeout
1646        self.send_command(cmd)
1647        self._expect_done()
1648
1649    def set_max_children(self, number):
1650        cmd = 'childmax %d' % number
1651        self.send_command(cmd)
1652        self._expect_done()
1653
1654    def get_weight(self):
1655        self.send_command('leaderweight')
1656        return self._expect_result(r'\d+')
1657
1658    def set_weight(self, weight):
1659        cmd = 'leaderweight %d' % weight
1660        self.send_command(cmd)
1661        self._expect_done()
1662
1663    def add_ipaddr(self, ipaddr):
1664        cmd = 'ipaddr add %s' % ipaddr
1665        self.send_command(cmd)
1666        self._expect_done()
1667
1668    def del_ipaddr(self, ipaddr):
1669        cmd = 'ipaddr del %s' % ipaddr
1670        self.send_command(cmd)
1671        self._expect_done()
1672
1673    def add_ipmaddr(self, ipmaddr):
1674        cmd = 'ipmaddr add %s' % ipmaddr
1675        self.send_command(cmd)
1676        self._expect_done()
1677
1678    def del_ipmaddr(self, ipmaddr):
1679        cmd = 'ipmaddr del %s' % ipmaddr
1680        self.send_command(cmd)
1681        self._expect_done()
1682
1683    def get_addrs(self):
1684        self.send_command('ipaddr')
1685
1686        return self._expect_results(r'\S+(:\S*)+')
1687
1688    def get_mleid(self):
1689        self.send_command('ipaddr mleid')
1690        return self._expect_result(r'\S+(:\S*)+')
1691
1692    def get_linklocal(self):
1693        self.send_command('ipaddr linklocal')
1694        return self._expect_result(r'\S+(:\S*)+')
1695
1696    def get_rloc(self):
1697        self.send_command('ipaddr rloc')
1698        return self._expect_result(r'\S+(:\S*)+')
1699
1700    def get_addr(self, prefix):
1701        network = ipaddress.ip_network(u'%s' % str(prefix))
1702        addrs = self.get_addrs()
1703
1704        for addr in addrs:
1705            if isinstance(addr, bytearray):
1706                addr = bytes(addr)
1707            ipv6_address = ipaddress.ip_address(addr)
1708            if ipv6_address in network:
1709                return ipv6_address.exploded
1710
1711        return None
1712
1713    def has_ipaddr(self, address):
1714        ipaddr = ipaddress.ip_address(address)
1715        ipaddrs = self.get_addrs()
1716        for addr in ipaddrs:
1717            if isinstance(addr, bytearray):
1718                addr = bytes(addr)
1719            if ipaddress.ip_address(addr) == ipaddr:
1720                return True
1721        return False
1722
1723    def get_ipmaddrs(self):
1724        self.send_command('ipmaddr')
1725        return self._expect_results(r'\S+(:\S*)+')
1726
1727    def has_ipmaddr(self, address):
1728        ipmaddr = ipaddress.ip_address(address)
1729        ipmaddrs = self.get_ipmaddrs()
1730        for addr in ipmaddrs:
1731            if isinstance(addr, bytearray):
1732                addr = bytes(addr)
1733            if ipaddress.ip_address(addr) == ipmaddr:
1734                return True
1735        return False
1736
1737    def get_addr_leader_aloc(self):
1738        addrs = self.get_addrs()
1739        for addr in addrs:
1740            segs = addr.split(':')
1741            if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'):
1742                return addr
1743        return None
1744
1745    def get_mleid_iid(self):
1746        ml_eid = IPv6Address(self.get_mleid())
1747        return ml_eid.packed[8:].hex()
1748
1749    def get_eidcaches(self):
1750        eidcaches = []
1751        self.send_command('eidcache')
1752        for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'):
1753            eidcaches.append(line.split())
1754
1755        return eidcaches
1756
1757    def add_service(self, enterpriseNumber, serviceData, serverData):
1758        cmd = 'service add %s %s %s' % (
1759            enterpriseNumber,
1760            serviceData,
1761            serverData,
1762        )
1763        self.send_command(cmd)
1764        self._expect_done()
1765
1766    def remove_service(self, enterpriseNumber, serviceData):
1767        cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)
1768        self.send_command(cmd)
1769        self._expect_done()
1770
1771    def get_child_table(self) -> Dict[int, Dict[str, Any]]:
1772        """Get the table of attached children."""
1773        cmd = 'child table'
1774        self.send_command(cmd)
1775        output = self._expect_command_output()
1776
1777        #
1778        # Example output:
1779        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC     |
1780        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+
1781        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 | 4ecede68435358ac |
1782        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 | a672a601d2ce37d8 |
1783        # Done
1784        #
1785
1786        headers = self.__split_table_row(output[0])
1787
1788        table = {}
1789        for line in output[2:]:
1790            line = line.strip()
1791            if not line:
1792                continue
1793
1794            fields = self.__split_table_row(line)
1795            col = lambda colname: self.__get_table_col(colname, headers, fields)
1796
1797            id = int(col("ID"))
1798            r, d, n = int(col("R")), int(col("D")), int(col("N"))
1799            mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}'
1800
1801            table[int(id)] = {
1802                'id': int(id),
1803                'rloc16': int(col('RLOC16'), 16),
1804                'timeout': int(col('Timeout')),
1805                'age': int(col('Age')),
1806                'lq_in': int(col('LQ In')),
1807                'c_vn': int(col('C_VN')),
1808                'mode': mode,
1809                'extaddr': col('Extended MAC'),
1810                'ver': int(col('Ver')),
1811                'csl': bool(int(col('CSL'))),
1812                'qmsgcnt': int(col('QMsgCnt')),
1813            }
1814
1815        return table
1816
1817    def __split_table_row(self, row: str) -> List[str]:
1818        if not (row.startswith('|') and row.endswith('|')):
1819            raise ValueError(row)
1820
1821        fields = row.split('|')
1822        fields = [x.strip() for x in fields[1:-1]]
1823        return fields
1824
1825    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
1826        return fields[headers.index(colname)]
1827
1828    def __getOmrAddress(self):
1829        prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()]
1830        omr_addrs = []
1831        for addr in self.get_addrs():
1832            for prefix in prefixes:
1833                if (addr.startswith(prefix)) and (addr != self.__getDua()):
1834                    omr_addrs.append(addr)
1835                    break
1836
1837        return omr_addrs
1838
1839    def __getLinkLocalAddress(self):
1840        for ip6Addr in self.get_addrs():
1841            if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
1842                return ip6Addr
1843
1844        return None
1845
1846    def __getGlobalAddress(self):
1847        global_address = []
1848        for ip6Addr in self.get_addrs():
1849            if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and
1850                (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and
1851                (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))):
1852                global_address.append(ip6Addr)
1853
1854        return global_address
1855
1856    def __getRloc(self):
1857        for ip6Addr in self.get_addrs():
1858            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
1859                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
1860                    not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))):
1861                return ip6Addr
1862        return None
1863
1864    def __getAloc(self):
1865        aloc = []
1866        for ip6Addr in self.get_addrs():
1867            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
1868                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
1869                    re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)):
1870                aloc.append(ip6Addr)
1871
1872        return aloc
1873
1874    def __getMleid(self):
1875        for ip6Addr in self.get_addrs():
1876            if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr,
1877                        re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
1878                return ip6Addr
1879
1880        return None
1881
1882    def __getDua(self) -> Optional[str]:
1883        for ip6Addr in self.get_addrs():
1884            if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
1885                return ip6Addr
1886
1887        return None
1888
1889    def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]:
1890        """Get addresses matched with given prefix.
1891
1892        Args:
1893            prefix: the prefix to match against.
1894                    Can be either a string or ipaddress.IPv6Network.
1895
1896        Returns:
1897            The IPv6 address list.
1898        """
1899        if isinstance(prefix, str):
1900            prefix = IPv6Network(prefix)
1901        addrs = map(IPv6Address, self.get_addrs())
1902
1903        return [addr for addr in addrs if addr in prefix]
1904
1905    def get_ip6_address(self, address_type):
1906        """Get specific type of IPv6 address configured on thread device.
1907
1908        Args:
1909            address_type: the config.ADDRESS_TYPE type of IPv6 address.
1910
1911        Returns:
1912            IPv6 address string.
1913        """
1914        if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
1915            return self.__getLinkLocalAddress()
1916        elif address_type == config.ADDRESS_TYPE.GLOBAL:
1917            return self.__getGlobalAddress()
1918        elif address_type == config.ADDRESS_TYPE.RLOC:
1919            return self.__getRloc()
1920        elif address_type == config.ADDRESS_TYPE.ALOC:
1921            return self.__getAloc()
1922        elif address_type == config.ADDRESS_TYPE.ML_EID:
1923            return self.__getMleid()
1924        elif address_type == config.ADDRESS_TYPE.DUA:
1925            return self.__getDua()
1926        elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
1927            return self._getBackboneGua()
1928        elif address_type == config.ADDRESS_TYPE.OMR:
1929            return self.__getOmrAddress()
1930        else:
1931            return None
1932
1933    def get_context_reuse_delay(self):
1934        self.send_command('contextreusedelay')
1935        return self._expect_result(r'\d+')
1936
1937    def set_context_reuse_delay(self, delay):
1938        cmd = 'contextreusedelay %d' % delay
1939        self.send_command(cmd)
1940        self._expect_done()
1941
1942    def add_prefix(self, prefix, flags='paosr', prf='med'):
1943        cmd = 'prefix add %s %s %s' % (prefix, flags, prf)
1944        self.send_command(cmd)
1945        self._expect_done()
1946
1947    def remove_prefix(self, prefix):
1948        cmd = 'prefix remove %s' % prefix
1949        self.send_command(cmd)
1950        self._expect_done()
1951
1952    def enable_br(self):
1953        self.send_command('br enable')
1954        self._expect_done()
1955
1956    def disable_br(self):
1957        self.send_command('br disable')
1958        self._expect_done()
1959
1960    def get_br_omr_prefix(self):
1961        cmd = 'br omrprefix'
1962        self.send_command(cmd)
1963        return self._expect_command_output()[0]
1964
1965    def get_netdata_omr_prefixes(self):
1966        omr_prefixes = []
1967        for prefix in self.get_prefixes():
1968            prefix, flags = prefix.split()[:2]
1969            if 'a' in flags and 'o' in flags and 's' in flags and 'D' not in flags:
1970                omr_prefixes.append(prefix)
1971
1972        return omr_prefixes
1973
1974    def get_br_on_link_prefix(self):
1975        cmd = 'br onlinkprefix'
1976        self.send_command(cmd)
1977        return self._expect_command_output()[0]
1978
1979    def get_netdata_non_nat64_prefixes(self):
1980        prefixes = []
1981        routes = self.get_routes()
1982        for route in routes:
1983            if 'n' not in route.split(' ')[1]:
1984                prefixes.append(route.split(' ')[0])
1985        return prefixes
1986
1987    def get_br_nat64_prefix(self):
1988        cmd = 'br nat64prefix'
1989        self.send_command(cmd)
1990        return self._expect_command_output()[0]
1991
1992    def get_netdata_nat64_prefix(self):
1993        prefixes = []
1994        routes = self.get_routes()
1995        for route in routes:
1996            if 'n' in route.split(' ')[1]:
1997                prefixes.append(route.split(' ')[0])
1998        return prefixes
1999
2000    def get_prefixes(self):
2001        return self.get_netdata()['Prefixes']
2002
2003    def get_routes(self):
2004        return self.get_netdata()['Routes']
2005
2006    def get_services(self):
2007        netdata = self.netdata_show()
2008        services = []
2009        services_section = False
2010
2011        for line in netdata:
2012            if line.startswith('Services:'):
2013                services_section = True
2014            elif services_section:
2015                services.append(line.strip().split(' '))
2016        return services
2017
2018    def netdata_show(self):
2019        self.send_command('netdata show')
2020        return self._expect_command_output()
2021
2022    def get_netdata(self):
2023        raw_netdata = self.netdata_show()
2024        netdata = {'Prefixes': [], 'Routes': [], 'Services': []}
2025        key_list = ['Prefixes', 'Routes', 'Services']
2026        key = None
2027
2028        for i in range(0, len(raw_netdata)):
2029            keys = list(filter(raw_netdata[i].startswith, key_list))
2030            if keys != []:
2031                key = keys[0]
2032            elif key is not None:
2033                netdata[key].append(raw_netdata[i])
2034
2035        return netdata
2036
2037    def add_route(self, prefix, stable=False, nat64=False, prf='med'):
2038        cmd = 'route add %s ' % prefix
2039        if stable:
2040            cmd += 's'
2041        if nat64:
2042            cmd += 'n'
2043        cmd += ' %s' % prf
2044        self.send_command(cmd)
2045        self._expect_done()
2046
2047    def remove_route(self, prefix):
2048        cmd = 'route remove %s' % prefix
2049        self.send_command(cmd)
2050        self._expect_done()
2051
2052    def register_netdata(self):
2053        self.send_command('netdata register')
2054        self._expect_done()
2055
2056    def netdata_publish_dnssrp_anycast(self, seqnum):
2057        self.send_command(f'netdata publish dnssrp anycast {seqnum}')
2058        self._expect_done()
2059
2060    def netdata_publish_dnssrp_unicast(self, address, port):
2061        self.send_command(f'netdata publish dnssrp unicast {address} {port}')
2062        self._expect_done()
2063
2064    def netdata_publish_dnssrp_unicast_mleid(self, port):
2065        self.send_command(f'netdata publish dnssrp unicast {port}')
2066        self._expect_done()
2067
2068    def netdata_unpublish_dnssrp(self):
2069        self.send_command('netdata unpublish dnssrp')
2070        self._expect_done()
2071
2072    def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'):
2073        self.send_command(f'netdata publish prefix {prefix} {flags} {prf}')
2074        self._expect_done()
2075
2076    def netdata_publish_route(self, prefix, flags='s', prf='med'):
2077        self.send_command(f'netdata publish route {prefix} {flags} {prf}')
2078        self._expect_done()
2079
2080    def netdata_unpublish_prefix(self, prefix):
2081        self.send_command(f'netdata unpublish {prefix}')
2082        self._expect_done()
2083
2084    def send_network_diag_get(self, addr, tlv_types):
2085        self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2086
2087        if isinstance(self.simulator, simulator.VirtualTime):
2088            self.simulator.go(8)
2089            timeout = 1
2090        else:
2091            timeout = 8
2092
2093        self._expect_done(timeout=timeout)
2094
2095    def send_network_diag_reset(self, addr, tlv_types):
2096        self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2097
2098        if isinstance(self.simulator, simulator.VirtualTime):
2099            self.simulator.go(8)
2100            timeout = 1
2101        else:
2102            timeout = 8
2103
2104        self._expect_done(timeout=timeout)
2105
2106    def energy_scan(self, mask, count, period, scan_duration, ipaddr):
2107        cmd = 'commissioner energy %d %d %d %d %s' % (
2108            mask,
2109            count,
2110            period,
2111            scan_duration,
2112            ipaddr,
2113        )
2114        self.send_command(cmd)
2115
2116        if isinstance(self.simulator, simulator.VirtualTime):
2117            self.simulator.go(8)
2118            timeout = 1
2119        else:
2120            timeout = 8
2121
2122        self._expect('Energy:', timeout=timeout)
2123
2124    def panid_query(self, panid, mask, ipaddr):
2125        cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)
2126        self.send_command(cmd)
2127
2128        if isinstance(self.simulator, simulator.VirtualTime):
2129            self.simulator.go(8)
2130            timeout = 1
2131        else:
2132            timeout = 8
2133
2134        self._expect('Conflict:', timeout=timeout)
2135
2136    def scan(self, result=1, timeout=10):
2137        self.send_command('scan')
2138
2139        self.simulator.go(timeout)
2140
2141        if result == 1:
2142            networks = []
2143            for line in self._expect_command_output()[2:]:
2144                _, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|'))
2145                panid = int(panid, 16)
2146                channel, dbm, lqi = map(int, (channel, dbm, lqi))
2147
2148                networks.append({
2149                    'panid': panid,
2150                    'extaddr': extaddr,
2151                    'channel': channel,
2152                    'dbm': dbm,
2153                    'lqi': lqi,
2154                })
2155            return networks
2156
2157    def scan_energy(self, timeout=10):
2158        self.send_command('scan energy')
2159        self.simulator.go(timeout)
2160        rssi_list = []
2161        for line in self._expect_command_output()[2:]:
2162            _, channel, rssi, _ = line.split('|')
2163            rssi_list.append({
2164                'channel': int(channel.strip()),
2165                'rssi': int(rssi.strip()),
2166            })
2167        return rssi_list
2168
2169    def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None):
2170        args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}'
2171        if interface is not None:
2172            args = f'-I {interface} {args}'
2173        cmd = f'ping {args}'
2174
2175        self.send_command(cmd)
2176
2177        wait_allowance = 3
2178        end = self.simulator.now() + timeout + wait_allowance
2179
2180        responders = {}
2181
2182        result = True
2183        # ncp-sim doesn't print Done
2184        done = (self.node_type == 'ncp-sim')
2185        while len(responders) < num_responses or not done:
2186            self.simulator.go(1)
2187            try:
2188                i = self._expect([r'from (\S+):', r'Done'], timeout=0.1)
2189            except (pexpect.TIMEOUT, socket.timeout):
2190                if self.simulator.now() < end:
2191                    continue
2192                result = False
2193                if isinstance(self.simulator, simulator.VirtualTime):
2194                    self.simulator.sync_devices()
2195                break
2196            else:
2197                if i == 0:
2198                    responders[self.pexpect.match.groups()[0]] = 1
2199                elif i == 1:
2200                    done = True
2201        return result
2202
2203    def reset(self):
2204        self._reset('reset')
2205
2206    def factory_reset(self):
2207        self._reset('factoryreset')
2208
2209    def _reset(self, cmd):
2210        self.send_command(cmd, expect_command_echo=False)
2211        time.sleep(self.RESET_DELAY)
2212        # Send a "version" command and drain the CLI output after reset
2213        self.send_command('version', expect_command_echo=False)
2214        while True:
2215            try:
2216                self._expect(r"[^\n]+\n", timeout=0.1)
2217                continue
2218            except pexpect.TIMEOUT:
2219                break
2220
2221        if self.is_otbr:
2222            self.set_log_level(5)
2223
2224    def set_router_selection_jitter(self, jitter):
2225        cmd = 'routerselectionjitter %d' % jitter
2226        self.send_command(cmd)
2227        self._expect_done()
2228
2229    def set_active_dataset(
2230        self,
2231        timestamp,
2232        panid=None,
2233        channel=None,
2234        channel_mask=None,
2235        network_key=None,
2236        security_policy=[],
2237    ):
2238        self.send_command('dataset clear')
2239        self._expect_done()
2240
2241        cmd = 'dataset activetimestamp %d' % timestamp
2242        self.send_command(cmd)
2243        self._expect_done()
2244
2245        if panid is not None:
2246            cmd = 'dataset panid %d' % panid
2247            self.send_command(cmd)
2248            self._expect_done()
2249
2250        if channel is not None:
2251            cmd = 'dataset channel %d' % channel
2252            self.send_command(cmd)
2253            self._expect_done()
2254
2255        if channel_mask is not None:
2256            cmd = 'dataset channelmask %d' % channel_mask
2257            self.send_command(cmd)
2258            self._expect_done()
2259
2260        if network_key is not None:
2261            cmd = 'dataset networkkey %s' % network_key
2262            self.send_command(cmd)
2263            self._expect_done()
2264
2265        if security_policy and len(security_policy) == 2:
2266            cmd = 'dataset securitypolicy %s %s' % (
2267                str(security_policy[0]),
2268                security_policy[1],
2269            )
2270            self.send_command(cmd)
2271            self._expect_done()
2272
2273        # Set the meshlocal prefix in config.py
2274        self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
2275        self._expect_done()
2276
2277        self.send_command('dataset commit active')
2278        self._expect_done()
2279
2280    def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None):
2281        self.send_command('dataset clear')
2282        self._expect_done()
2283
2284        cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
2285        self.send_command(cmd)
2286        self._expect_done()
2287
2288        cmd = 'dataset activetimestamp %d' % activetimestamp
2289        self.send_command(cmd)
2290        self._expect_done()
2291
2292        if panid is not None:
2293            cmd = 'dataset panid %d' % panid
2294            self.send_command(cmd)
2295            self._expect_done()
2296
2297        if channel is not None:
2298            cmd = 'dataset channel %d' % channel
2299            self.send_command(cmd)
2300            self._expect_done()
2301
2302        if delay is not None:
2303            cmd = 'dataset delay %d' % delay
2304            self.send_command(cmd)
2305            self._expect_done()
2306
2307        # Set the meshlocal prefix in config.py
2308        self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
2309        self._expect_done()
2310
2311        self.send_command('dataset commit pending')
2312        self._expect_done()
2313
2314    def start_dataset_updater(self, panid=None, channel=None):
2315        self.send_command('dataset clear')
2316        self._expect_done()
2317
2318        if panid is not None:
2319            cmd = 'dataset panid %d' % panid
2320            self.send_command(cmd)
2321            self._expect_done()
2322
2323        if channel is not None:
2324            cmd = 'dataset channel %d' % channel
2325            self.send_command(cmd)
2326            self._expect_done()
2327
2328        self.send_command('dataset updater start')
2329        self._expect_done()
2330
2331    def announce_begin(self, mask, count, period, ipaddr):
2332        cmd = 'commissioner announce %d %d %d %s' % (
2333            mask,
2334            count,
2335            period,
2336            ipaddr,
2337        )
2338        self.send_command(cmd)
2339        self._expect_done()
2340
2341    def send_mgmt_active_set(
2342        self,
2343        active_timestamp=None,
2344        channel=None,
2345        channel_mask=None,
2346        extended_panid=None,
2347        panid=None,
2348        network_key=None,
2349        mesh_local=None,
2350        network_name=None,
2351        security_policy=None,
2352        binary=None,
2353    ):
2354        cmd = 'dataset mgmtsetcommand active '
2355
2356        if active_timestamp is not None:
2357            cmd += 'activetimestamp %d ' % active_timestamp
2358
2359        if channel is not None:
2360            cmd += 'channel %d ' % channel
2361
2362        if channel_mask is not None:
2363            cmd += 'channelmask %d ' % channel_mask
2364
2365        if extended_panid is not None:
2366            cmd += 'extpanid %s ' % extended_panid
2367
2368        if panid is not None:
2369            cmd += 'panid %d ' % panid
2370
2371        if network_key is not None:
2372            cmd += 'networkkey %s ' % network_key
2373
2374        if mesh_local is not None:
2375            cmd += 'localprefix %s ' % mesh_local
2376
2377        if network_name is not None:
2378            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2379
2380        if security_policy is not None:
2381            rotation, flags = security_policy
2382            cmd += 'securitypolicy %d %s ' % (rotation, flags)
2383
2384        if binary is not None:
2385            cmd += '-x %s ' % binary
2386
2387        self.send_command(cmd)
2388        self._expect_done()
2389
2390    def send_mgmt_active_get(self, addr='', tlvs=[]):
2391        cmd = 'dataset mgmtgetcommand active'
2392
2393        if addr != '':
2394            cmd += ' address '
2395            cmd += addr
2396
2397        if len(tlvs) != 0:
2398            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2399            cmd += ' -x '
2400            cmd += tlv_str
2401
2402        self.send_command(cmd)
2403        self._expect_done()
2404
2405    def send_mgmt_pending_get(self, addr='', tlvs=[]):
2406        cmd = 'dataset mgmtgetcommand pending'
2407
2408        if addr != '':
2409            cmd += ' address '
2410            cmd += addr
2411
2412        if len(tlvs) != 0:
2413            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2414            cmd += ' -x '
2415            cmd += tlv_str
2416
2417        self.send_command(cmd)
2418        self._expect_done()
2419
2420    def send_mgmt_pending_set(
2421        self,
2422        pending_timestamp=None,
2423        active_timestamp=None,
2424        delay_timer=None,
2425        channel=None,
2426        panid=None,
2427        network_key=None,
2428        mesh_local=None,
2429        network_name=None,
2430    ):
2431        cmd = 'dataset mgmtsetcommand pending '
2432        if pending_timestamp is not None:
2433            cmd += 'pendingtimestamp %d ' % pending_timestamp
2434
2435        if active_timestamp is not None:
2436            cmd += 'activetimestamp %d ' % active_timestamp
2437
2438        if delay_timer is not None:
2439            cmd += 'delaytimer %d ' % delay_timer
2440
2441        if channel is not None:
2442            cmd += 'channel %d ' % channel
2443
2444        if panid is not None:
2445            cmd += 'panid %d ' % panid
2446
2447        if network_key is not None:
2448            cmd += 'networkkey %s ' % network_key
2449
2450        if mesh_local is not None:
2451            cmd += 'localprefix %s ' % mesh_local
2452
2453        if network_name is not None:
2454            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2455
2456        self.send_command(cmd)
2457        self._expect_done()
2458
2459    def coap_cancel(self):
2460        """
2461        Cancel a CoAP subscription.
2462        """
2463        cmd = 'coap cancel'
2464        self.send_command(cmd)
2465        self._expect_done()
2466
2467    def coap_delete(self, ipaddr, uri, con=False, payload=None):
2468        """
2469        Send a DELETE request via CoAP.
2470        """
2471        return self._coap_rq('delete', ipaddr, uri, con, payload)
2472
2473    def coap_get(self, ipaddr, uri, con=False, payload=None):
2474        """
2475        Send a GET request via CoAP.
2476        """
2477        return self._coap_rq('get', ipaddr, uri, con, payload)
2478
2479    def coap_get_block(self, ipaddr, uri, size=16, count=0):
2480        """
2481        Send a GET request via CoAP.
2482        """
2483        return self._coap_rq_block('get', ipaddr, uri, size, count)
2484
2485    def coap_observe(self, ipaddr, uri, con=False, payload=None):
2486        """
2487        Send a GET request via CoAP with Observe set.
2488        """
2489        return self._coap_rq('observe', ipaddr, uri, con, payload)
2490
2491    def coap_post(self, ipaddr, uri, con=False, payload=None):
2492        """
2493        Send a POST request via CoAP.
2494        """
2495        return self._coap_rq('post', ipaddr, uri, con, payload)
2496
2497    def coap_post_block(self, ipaddr, uri, size=16, count=0):
2498        """
2499        Send a POST request via CoAP.
2500        """
2501        return self._coap_rq_block('post', ipaddr, uri, size, count)
2502
2503    def coap_put(self, ipaddr, uri, con=False, payload=None):
2504        """
2505        Send a PUT request via CoAP.
2506        """
2507        return self._coap_rq('put', ipaddr, uri, con, payload)
2508
2509    def coap_put_block(self, ipaddr, uri, size=16, count=0):
2510        """
2511        Send a PUT request via CoAP.
2512        """
2513        return self._coap_rq_block('put', ipaddr, uri, size, count)
2514
2515    def _coap_rq(self, method, ipaddr, uri, con=False, payload=None):
2516        """
2517        Issue a GET/POST/PUT/DELETE/GET OBSERVE request.
2518        """
2519        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
2520        if con:
2521            cmd += ' con'
2522        else:
2523            cmd += ' non'
2524
2525        if payload is not None:
2526            cmd += ' %s' % payload
2527
2528        self.send_command(cmd)
2529        return self.coap_wait_response()
2530
2531    def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0):
2532        """
2533        Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request.
2534        """
2535        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
2536
2537        cmd += ' block-%d' % size
2538
2539        if count != 0:
2540            cmd += ' %d' % count
2541
2542        self.send_command(cmd)
2543        return self.coap_wait_response()
2544
2545    def coap_wait_response(self):
2546        """
2547        Wait for a CoAP response, and return it.
2548        """
2549        if isinstance(self.simulator, simulator.VirtualTime):
2550            self.simulator.go(5)
2551            timeout = 1
2552        else:
2553            timeout = 5
2554
2555        self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?'
2556                     r'(?: with payload: ([\da-f]+))?\b',
2557                     timeout=timeout)
2558        (source, observe, payload) = self.pexpect.match.groups()
2559        source = source.decode('UTF-8')
2560
2561        if observe is not None:
2562            observe = int(observe, base=10)
2563
2564        if payload is not None:
2565            try:
2566                payload = binascii.a2b_hex(payload).decode('UTF-8')
2567            except UnicodeDecodeError:
2568                pass
2569
2570        # Return the values received
2571        return dict(source=source, observe=observe, payload=payload)
2572
2573    def coap_wait_request(self):
2574        """
2575        Wait for a CoAP request to be made.
2576        """
2577        if isinstance(self.simulator, simulator.VirtualTime):
2578            self.simulator.go(5)
2579            timeout = 1
2580        else:
2581            timeout = 5
2582
2583        self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?'
2584                     r'(?: with payload: ([\da-f]+))?\b',
2585                     timeout=timeout)
2586        (source, observe, payload) = self.pexpect.match.groups()
2587        source = source.decode('UTF-8')
2588
2589        if observe is not None:
2590            observe = int(observe, base=10)
2591
2592        if payload is not None:
2593            payload = binascii.a2b_hex(payload).decode('UTF-8')
2594
2595        # Return the values received
2596        return dict(source=source, observe=observe, payload=payload)
2597
2598    def coap_wait_subscribe(self):
2599        """
2600        Wait for a CoAP client to be subscribed.
2601        """
2602        if isinstance(self.simulator, simulator.VirtualTime):
2603            self.simulator.go(5)
2604            timeout = 1
2605        else:
2606            timeout = 5
2607
2608        self._expect(r'Subscribing client\b', timeout=timeout)
2609
2610    def coap_wait_ack(self):
2611        """
2612        Wait for a CoAP notification ACK.
2613        """
2614        if isinstance(self.simulator, simulator.VirtualTime):
2615            self.simulator.go(5)
2616            timeout = 1
2617        else:
2618            timeout = 5
2619
2620        self._expect(r'Received ACK in reply to notification ' r'from ([\da-f:]+)\b', timeout=timeout)
2621        (source,) = self.pexpect.match.groups()
2622        source = source.decode('UTF-8')
2623
2624        return source
2625
2626    def coap_set_resource_path(self, path):
2627        """
2628        Set the path for the CoAP resource.
2629        """
2630        cmd = 'coap resource %s' % path
2631        self.send_command(cmd)
2632        self._expect_done()
2633
2634    def coap_set_resource_path_block(self, path, count=0):
2635        """
2636        Set the path for the CoAP resource and how many blocks can be received from this resource.
2637        """
2638        cmd = 'coap resource %s %d' % (path, count)
2639        self.send_command(cmd)
2640        self._expect('Done')
2641
2642    def coap_set_content(self, content):
2643        """
2644        Set the content of the CoAP resource.
2645        """
2646        cmd = 'coap set %s' % content
2647        self.send_command(cmd)
2648        self._expect_done()
2649
2650    def coap_start(self):
2651        """
2652        Start the CoAP service.
2653        """
2654        cmd = 'coap start'
2655        self.send_command(cmd)
2656        self._expect_done()
2657
2658    def coap_stop(self):
2659        """
2660        Stop the CoAP service.
2661        """
2662        cmd = 'coap stop'
2663        self.send_command(cmd)
2664
2665        if isinstance(self.simulator, simulator.VirtualTime):
2666            self.simulator.go(5)
2667            timeout = 1
2668        else:
2669            timeout = 5
2670
2671        self._expect_done(timeout=timeout)
2672
2673    def coaps_start_psk(self, psk, pskIdentity):
2674        cmd = 'coaps psk %s %s' % (psk, pskIdentity)
2675        self.send_command(cmd)
2676        self._expect_done()
2677
2678        cmd = 'coaps start'
2679        self.send_command(cmd)
2680        self._expect_done()
2681
2682    def coaps_start_x509(self):
2683        cmd = 'coaps x509'
2684        self.send_command(cmd)
2685        self._expect_done()
2686
2687        cmd = 'coaps start'
2688        self.send_command(cmd)
2689        self._expect_done()
2690
2691    def coaps_set_resource_path(self, path):
2692        cmd = 'coaps resource %s' % path
2693        self.send_command(cmd)
2694        self._expect_done()
2695
2696    def coaps_stop(self):
2697        cmd = 'coaps stop'
2698        self.send_command(cmd)
2699
2700        if isinstance(self.simulator, simulator.VirtualTime):
2701            self.simulator.go(5)
2702            timeout = 1
2703        else:
2704            timeout = 5
2705
2706        self._expect_done(timeout=timeout)
2707
2708    def coaps_connect(self, ipaddr):
2709        cmd = 'coaps connect %s' % ipaddr
2710        self.send_command(cmd)
2711
2712        if isinstance(self.simulator, simulator.VirtualTime):
2713            self.simulator.go(5)
2714            timeout = 1
2715        else:
2716            timeout = 5
2717
2718        self._expect('coaps connected', timeout=timeout)
2719
2720    def coaps_disconnect(self):
2721        cmd = 'coaps disconnect'
2722        self.send_command(cmd)
2723        self._expect_done()
2724        self.simulator.go(5)
2725
2726    def coaps_get(self):
2727        cmd = 'coaps get test'
2728        self.send_command(cmd)
2729
2730        if isinstance(self.simulator, simulator.VirtualTime):
2731            self.simulator.go(5)
2732            timeout = 1
2733        else:
2734            timeout = 5
2735
2736        self._expect('coaps response', timeout=timeout)
2737
2738    def commissioner_mgmtget(self, tlvs_binary=None):
2739        cmd = 'commissioner mgmtget'
2740        if tlvs_binary is not None:
2741            cmd += ' -x %s' % tlvs_binary
2742        self.send_command(cmd)
2743        self._expect_done()
2744
2745    def commissioner_mgmtset(self, tlvs_binary):
2746        cmd = 'commissioner mgmtset -x %s' % tlvs_binary
2747        self.send_command(cmd)
2748        self._expect_done()
2749
2750    def bytes_to_hex_str(self, src):
2751        return ''.join(format(x, '02x') for x in src)
2752
2753    def commissioner_mgmtset_with_tlvs(self, tlvs):
2754        payload = bytearray()
2755        for tlv in tlvs:
2756            payload += tlv.to_hex()
2757        self.commissioner_mgmtset(self.bytes_to_hex_str(payload))
2758
2759    def udp_start(self, local_ipaddr, local_port, bind_unspecified=False):
2760        cmd = 'udp open'
2761        self.send_command(cmd)
2762        self._expect_done()
2763
2764        cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port)
2765        self.send_command(cmd)
2766        self._expect_done()
2767
2768    def udp_stop(self):
2769        cmd = 'udp close'
2770        self.send_command(cmd)
2771        self._expect_done()
2772
2773    def udp_send(self, bytes, ipaddr, port, success=True):
2774        cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)
2775        self.send_command(cmd)
2776        if success:
2777            self._expect_done()
2778        else:
2779            self._expect('Error')
2780
2781    def udp_check_rx(self, bytes_should_rx):
2782        self._expect('%d bytes' % bytes_should_rx)
2783
2784    def set_routereligible(self, enable: bool):
2785        cmd = f'routereligible {"enable" if enable else "disable"}'
2786        self.send_command(cmd)
2787        self._expect_done()
2788
2789    def router_list(self):
2790        cmd = 'router list'
2791        self.send_command(cmd)
2792        self._expect([r'(\d+)((\s\d+)*)'])
2793
2794        g = self.pexpect.match.groups()
2795        router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8')
2796        router_list = [int(x) for x in router_list.split()]
2797        self._expect_done()
2798        return router_list
2799
2800    def router_table(self):
2801        cmd = 'router table'
2802        self.send_command(cmd)
2803
2804        self._expect(r'(.*)Done')
2805        g = self.pexpect.match.groups()
2806        output = g[0].decode('utf8')
2807        lines = output.strip().split('\n')
2808        lines = [l.strip() for l in lines]
2809        router_table = {}
2810        for i, line in enumerate(lines):
2811            if not line.startswith('|') or not line.endswith('|'):
2812                if i not in (0, 2):
2813                    # should not happen
2814                    print("unexpected line %d: %s" % (i, line))
2815
2816                continue
2817
2818            line = line[1:][:-1]
2819            line = [x.strip() for x in line.split('|')]
2820            if len(line) < 9:
2821                print("unexpected line %d: %s" % (i, line))
2822                continue
2823
2824            try:
2825                int(line[0])
2826            except ValueError:
2827                if i != 1:
2828                    print("unexpected line %d: %s" % (i, line))
2829                continue
2830
2831            id = int(line[0])
2832            rloc16 = int(line[1], 16)
2833            nexthop = int(line[2])
2834            pathcost = int(line[3])
2835            lqin = int(line[4])
2836            lqout = int(line[5])
2837            age = int(line[6])
2838            emac = str(line[7])
2839            link = int(line[8])
2840
2841            router_table[id] = {
2842                'rloc16': rloc16,
2843                'nexthop': nexthop,
2844                'pathcost': pathcost,
2845                'lqin': lqin,
2846                'lqout': lqout,
2847                'age': age,
2848                'emac': emac,
2849                'link': link,
2850            }
2851
2852        return router_table
2853
2854    def link_metrics_query_single_probe(self, dst_addr: str, linkmetrics_flags: str):
2855        cmd = 'linkmetrics query %s single %s' % (dst_addr, linkmetrics_flags)
2856        self.send_command(cmd)
2857        self._expect_done()
2858
2859    def link_metrics_query_forward_tracking_series(self, dst_addr: str, series_id: int):
2860        cmd = 'linkmetrics query %s forward %d' % (dst_addr, series_id)
2861        self.send_command(cmd)
2862        self._expect_done()
2863
2864    def link_metrics_mgmt_req_enhanced_ack_based_probing(self,
2865                                                         dst_addr: str,
2866                                                         enable: bool,
2867                                                         metrics_flags: str,
2868                                                         ext_flags=''):
2869        cmd = "linkmetrics mgmt %s enhanced-ack" % (dst_addr)
2870        if enable:
2871            cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags))
2872        else:
2873            cmd = cmd + " clear"
2874        self.send_command(cmd)
2875        self._expect_done()
2876
2877    def link_metrics_mgmt_req_forward_tracking_series(self, dst_addr: str, series_id: int, series_flags: str,
2878                                                      metrics_flags: str):
2879        cmd = "linkmetrics mgmt %s forward %d %s %s" % (dst_addr, series_id, series_flags, metrics_flags)
2880        self.send_command(cmd)
2881        self._expect_done()
2882
2883    def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int):
2884        cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length)
2885        self.send_command(cmd)
2886        self._expect_done()
2887
2888    def send_address_notification(self, dst: str, target: str, mliid: str):
2889        cmd = f'fake /a/an {dst} {target} {mliid}'
2890        self.send_command(cmd)
2891        self._expect_done()
2892
2893    def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int):
2894        cmd = f'fake /b/ba {target} {mliid} {ltt}'
2895        self.send_command(cmd)
2896        self._expect_done()
2897
2898    def dns_get_config(self):
2899        """
2900        Returns the DNS config as a list of property dictionary (string key and string value).
2901
2902        Example output:
2903        {
2904            'Server': '[fd00:0:0:0:0:0:0:1]:1234'
2905            'ResponseTimeout': '5000 ms'
2906            'MaxTxAttempts': '2'
2907            'RecursionDesired': 'no'
2908        }
2909        """
2910        cmd = f'dns config'
2911        self.send_command(cmd)
2912        output = self._expect_command_output()
2913        config = {}
2914        for line in output:
2915            k, v = line.split(': ')
2916            config[k] = v
2917        return config
2918
2919    def dns_set_config(self, config):
2920        cmd = f'dns config {config}'
2921        self.send_command(cmd)
2922        self._expect_done()
2923
2924    def dns_resolve(self, hostname, server=None, port=53):
2925        cmd = f'dns resolve {hostname}'
2926        if server is not None:
2927            cmd += f' {server} {port}'
2928
2929        self.send_command(cmd)
2930        self.simulator.go(10)
2931        output = self._expect_command_output()
2932        dns_resp = output[0]
2933        # example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 "
2934        #                 " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190"
2935        addrs = dns_resp.strip().split(' - ')[1].split(' ')
2936        ip = [item.strip() for item in addrs[::2]]
2937        ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
2938
2939        return list(zip(ip, ttl))
2940
2941    def dns_resolve_service(self, instance, service, server=None, port=53):
2942        """
2943        Resolves the service instance and returns the instance information as a dict.
2944
2945        Example return value:
2946            {
2947                'port': 12345,
2948                'priority': 0,
2949                'weight': 0,
2950                'host': 'ins1._ipps._tcp.default.service.arpa.',
2951                'address': '2001::1',
2952                'txt_data': 'a=00, b=02bb',
2953                'srv_ttl': 7100,
2954                'txt_ttl': 7100,
2955                'aaaa_ttl': 7100,
2956            }
2957        """
2958        instance = self._escape_escapable(instance)
2959        cmd = f'dns service {instance} {service}'
2960        if server is not None:
2961            cmd += f' {server} {port}'
2962
2963        self.send_command(cmd)
2964        self.simulator.go(10)
2965        output = self._expect_command_output()
2966
2967        # Example output:
2968        # DNS service resolution response for ins2 for service _ipps._tcp.default.service.arpa.
2969        # Port:22222, Priority:2, Weight:2, TTL:7155
2970        # Host:host2.default.service.arpa.
2971        # HostAddress:0:0:0:0:0:0:0:0 TTL:0
2972        # TXT:[a=00, b=02bb] TTL:7155
2973        # Done
2974
2975        m = re.match(
2976            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
2977            '\r'.join(output))
2978        if m:
2979            port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
2980            return {
2981                'port': int(port),
2982                'priority': int(priority),
2983                'weight': int(weight),
2984                'host': hostname,
2985                'address': address,
2986                'txt_data': txt_data,
2987                'srv_ttl': int(srv_ttl),
2988                'txt_ttl': int(txt_ttl),
2989                'aaaa_ttl': int(aaaa_ttl),
2990            }
2991        else:
2992            raise Exception('dns resolve service failed: %s.%s' % (instance, service))
2993
2994    @staticmethod
2995    def __parse_hex_string(hexstr: str) -> bytes:
2996        assert (len(hexstr) % 2 == 0)
2997        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
2998
2999    def dns_browse(self, service_name, server=None, port=53):
3000        """
3001        Browse the service and returns the instances.
3002
3003        Example return value:
3004            {
3005                'ins1': {
3006                    'port': 12345,
3007                    'priority': 1,
3008                    'weight': 1,
3009                    'host': 'ins1._ipps._tcp.default.service.arpa.',
3010                    'address': '2001::1',
3011                    'txt_data': 'a=00, b=11cf',
3012                    'srv_ttl': 7100,
3013                    'txt_ttl': 7100,
3014                    'aaaa_ttl': 7100,
3015                },
3016                'ins2': {
3017                    'port': 12345,
3018                    'priority': 2,
3019                    'weight': 2,
3020                    'host': 'ins2._ipps._tcp.default.service.arpa.',
3021                    'address': '2001::2',
3022                    'txt_data': 'a=01, b=23dd',
3023                    'srv_ttl': 7100,
3024                    'txt_ttl': 7100,
3025                    'aaaa_ttl': 7100,
3026                }
3027            }
3028        """
3029        cmd = f'dns browse {service_name}'
3030        if server is not None:
3031            cmd += f' {server} {port}'
3032
3033        self.send_command(cmd)
3034        self.simulator.go(10)
3035        output = '\n'.join(self._expect_command_output())
3036
3037        # Example output:
3038        # ins2
3039        #     Port:22222, Priority:2, Weight:2, TTL:7175
3040        #     Host:host2.default.service.arpa.
3041        #     HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175
3042        #     TXT:[a=00, b=11cf] TTL:7175
3043        # ins1
3044        #     Port:11111, Priority:1, Weight:1, TTL:7170
3045        #     Host:host1.default.service.arpa.
3046        #     HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170
3047        #     TXT:[a=01, b=23dd] TTL:7170
3048        # Done
3049
3050        result = {}
3051        for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall(
3052                r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
3053                output):
3054            result[ins] = {
3055                'port': int(port),
3056                'priority': int(priority),
3057                'weight': int(weight),
3058                'host': hostname,
3059                'address': address,
3060                'txt_data': txt_data,
3061                'srv_ttl': int(srv_ttl),
3062                'txt_ttl': int(txt_ttl),
3063                'aaaa_ttl': int(aaaa_ttl),
3064            }
3065
3066        return result
3067
3068    def set_mliid(self, mliid: str):
3069        cmd = f'mliid {mliid}'
3070        self.send_command(cmd)
3071        self._expect_command_output()
3072
3073    def history_netinfo(self, num_entries=0):
3074        """
3075        Get the `netinfo` history list, parse each entry and return
3076        a list of dictionary (string key and string value) entries.
3077
3078        Example of return value:
3079        [
3080            {
3081                'age': '00:00:00.000 ago',
3082                'role': 'disabled',
3083                'mode': 'rdn',
3084                'rloc16': '0x7400',
3085                'partition-id': '1318093703'
3086            },
3087            {
3088                'age': '00:00:02.588 ago',
3089                'role': 'leader',
3090                'mode': 'rdn',
3091                'rloc16': '0x7400',
3092                'partition-id': '1318093703'
3093            }
3094        ]
3095        """
3096        cmd = f'history netinfo list {num_entries}'
3097        self.send_command(cmd)
3098        output = self._expect_command_output()
3099        netinfos = []
3100        for entry in output:
3101            netinfo = {}
3102            age, info = entry.split(' -> ')
3103            netinfo['age'] = age
3104            for item in info.split(' '):
3105                k, v = item.split(':')
3106                netinfo[k] = v
3107            netinfos.append(netinfo)
3108        return netinfos
3109
3110    def history_rx(self, num_entries=0):
3111        """
3112        Get the IPv6 RX history list, parse each entry and return
3113        a list of dictionary (string key and string value) entries.
3114
3115        Example of return value:
3116        [
3117            {
3118                'age': '00:00:01.999',
3119                'type': 'ICMP6(EchoReqst)',
3120                'len': '16',
3121                'sec': 'yes',
3122                'prio': 'norm',
3123                'rss': '-20',
3124                'from': '0xac00',
3125                'radio': '15.4',
3126                'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3127                'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3128            }
3129        ]
3130        """
3131        cmd = f'history rx list {num_entries}'
3132        self.send_command(cmd)
3133        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3134
3135    def history_tx(self, num_entries=0):
3136        """
3137        Get the IPv6 TX history list, parse each entry and return
3138        a list of dictionary (string key and string value) entries.
3139
3140        Example of return value:
3141        [
3142            {
3143                'age': '00:00:01.999',
3144                'type': 'ICMP6(EchoReply)',
3145                'len': '16',
3146                'sec': 'yes',
3147                'prio': 'norm',
3148                'to': '0xac00',
3149                'tx-success': 'yes',
3150                'radio': '15.4',
3151                'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3152                'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3153
3154            }
3155        ]
3156        """
3157        cmd = f'history tx list {num_entries}'
3158        self.send_command(cmd)
3159        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3160
3161    def _parse_history_rx_tx_ouput(self, lines):
3162        rxtx_list = []
3163        for line in lines:
3164            if line.strip().startswith('type:'):
3165                for item in line.strip().split(' '):
3166                    k, v = item.split(':')
3167                    entry[k] = v
3168            elif line.strip().startswith('src:'):
3169                entry['src'] = line[4:]
3170            elif line.strip().startswith('dst:'):
3171                entry['dst'] = line[4:]
3172                rxtx_list.append(entry)
3173            else:
3174                entry = {}
3175                entry['age'] = line
3176
3177        return rxtx_list
3178
3179    def set_router_id_range(self, min_router_id: int, max_router_id: int):
3180        cmd = f'routeridrange {min_router_id} {max_router_id}'
3181        self.send_command(cmd)
3182        self._expect_command_output()
3183
3184    def get_router_id_range(self):
3185        cmd = 'routeridrange'
3186        self.send_command(cmd)
3187        line = self._expect_command_output()[0]
3188        return [int(item) for item in line.split()]
3189
3190
3191class Node(NodeImpl, OtCli):
3192    pass
3193
3194
3195class LinuxHost():
3196    PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
3197    ETH_DEV = config.BACKBONE_IFNAME
3198
3199    def enable_ether(self):
3200        """Enable the ethernet interface.
3201        """
3202
3203        self.bash(f'ip link set {self.ETH_DEV} up')
3204
3205    def disable_ether(self):
3206        """Disable the ethernet interface.
3207        """
3208
3209        self.bash(f'ip link set {self.ETH_DEV} down')
3210
3211    def get_ether_addrs(self):
3212        output = self.bash(f'ip -6 addr list dev {self.ETH_DEV}')
3213
3214        addrs = []
3215        for line in output:
3216            # line example: "inet6 fe80::42:c0ff:fea8:903/64 scope link"
3217            line = line.strip().split()
3218
3219            if line and line[0] == 'inet6':
3220                addr = line[1]
3221                if '/' in addr:
3222                    addr = addr.split('/')[0]
3223                addrs.append(addr)
3224
3225        logging.debug('%s: get_ether_addrs: %r', self, addrs)
3226        return addrs
3227
3228    def get_ether_mac(self):
3229        output = self.bash(f'ip addr list dev {self.ETH_DEV}')
3230        for line in output:
3231            # link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
3232            line = line.strip().split()
3233            if line and line[0] == 'link/ether':
3234                return line[1]
3235
3236        assert False, output
3237
3238    def add_ipmaddr_ether(self, ip: str):
3239        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &'
3240        self.bash(cmd)
3241
3242    def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int:
3243
3244        cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}'
3245        if size is not None:
3246            cmd += f' -s {size}'
3247
3248        if ttl is not None:
3249            cmd += f' -t {ttl}'
3250
3251        resp_count = 0
3252
3253        try:
3254            for line in self.bash(cmd):
3255                if self.PING_RESPONSE_PATTERN.match(line):
3256                    resp_count += 1
3257        except subprocess.CalledProcessError:
3258            pass
3259
3260        return resp_count
3261
3262    def _getBackboneGua(self) -> Optional[str]:
3263        for addr in self.get_ether_addrs():
3264            if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I):
3265                return addr
3266
3267        return None
3268
3269    def _getInfraUla(self) -> Optional[str]:
3270        """ Returns the ULA addresses autoconfigured on the infra link.
3271        """
3272        addrs = []
3273        for addr in self.get_ether_addrs():
3274            if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I):
3275                addrs.append(addr)
3276
3277        return addrs
3278
3279    def _getInfraGua(self) -> Optional[str]:
3280        """ Returns the GUA addresses autoconfigured on the infra link.
3281        """
3282
3283        gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0]
3284        return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)]
3285
3286    def ping(self, *args, **kwargs):
3287        backbone = kwargs.pop('backbone', False)
3288        if backbone:
3289            return self.ping_ether(*args, **kwargs)
3290        else:
3291            return super().ping(*args, **kwargs)
3292
3293    def udp_send_host(self, ipaddr, port, data, hop_limit=None):
3294        if hop_limit is None:
3295            if ipaddress.ip_address(ipaddr).is_multicast:
3296                hop_limit = 10
3297            else:
3298                hop_limit = 64
3299        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}'
3300        self.bash(cmd)
3301
3302    def add_ipmaddr(self, *args, **kwargs):
3303        backbone = kwargs.pop('backbone', False)
3304        if backbone:
3305            return self.add_ipmaddr_ether(*args, **kwargs)
3306        else:
3307            return super().add_ipmaddr(*args, **kwargs)
3308
3309    def ip_neighbors_flush(self):
3310        # clear neigh cache on linux
3311        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
3312        self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}')
3313        self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' %
3314                  (self.ETH_DEV, self.ETH_DEV))
3315        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
3316
3317    def browse_mdns_services(self, name, timeout=2):
3318        """ Browse mDNS services on the ethernet.
3319
3320        :param name: the service type name in format of '<service-name>.<protocol>'.
3321        :param timeout: timeout value in seconds before returning.
3322        :return: A list of service instance names.
3323        """
3324
3325        self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &')
3326        time.sleep(timeout)
3327        self.bash('pkill dns-sd')
3328
3329        instances = []
3330        for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'):
3331            elements = line.split()
3332            if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR':
3333                instances.append(elements[2][:-len('.' + name)])
3334        return instances
3335
3336    def discover_mdns_service(self, instance, name, host_name, timeout=2):
3337        """ Discover/resolve the mDNS service on ethernet.
3338
3339        :param instance: the service instance name.
3340        :param name: the service name in format of '<service-name>.<protocol>'.
3341        :param host_name: the host name this service points to. The domain
3342                          should not be included.
3343        :param timeout: timeout value in seconds before returning.
3344        :return: a dict of service properties or None.
3345
3346        The return value is a dict with the same key/values of srp_server_get_service
3347        except that we don't have a `deleted` field here.
3348        """
3349        host_name_file = self.bash('mktemp')[0].strip()
3350        service_data_file = self.bash('mktemp')[0].strip()
3351
3352        self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &')
3353        time.sleep(timeout)
3354
3355        full_service_name = f'{instance}.{name}'
3356        # When hostname is unspecified, extract hostname from browse result
3357        if host_name is None:
3358            for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
3359                elements = line.split()
3360                if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
3361                    host_name = elements[5].split('.')[0]
3362                    break
3363
3364        assert (host_name is not None)
3365        self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &')
3366        time.sleep(timeout)
3367
3368        self.bash('pkill dns-sd')
3369        addresses = []
3370        service = {}
3371
3372        logging.debug(self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'))
3373        logging.debug(self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'))
3374
3375        # example output in the host file:
3376        # Timestamp     A/R Flags if Hostname                               Address                                     TTL
3377        # 9:38:09.274  Add     23 48 my-host.local.                         2001:0000:0000:0000:0000:0000:0000:0002%<0>  120
3378        #
3379        for line in self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'):
3380            elements = line.split()
3381            fullname = f'{host_name}.local.'
3382            if fullname not in elements:
3383                continue
3384            addresses.append(elements[elements.index(fullname) + 1].split('%')[0])
3385
3386        logging.debug(f'addresses of {host_name}: {addresses}')
3387
3388        # example output of in the service file:
3389        # _ipps._tcp                                      PTR     my-service._ipps._tcp
3390        # my-service._ipps._tcp                           SRV     0 0 12345 my-host.local. ; Replace with unicast FQDN of target host
3391        # my-service._ipps._tcp                           TXT     ""
3392        #
3393        is_txt = False
3394        txt = ''
3395        for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
3396            elements = line.split()
3397            if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT':
3398                is_txt = True
3399            if is_txt:
3400                txt += line.strip()
3401                if line.strip().endswith('"'):
3402                    is_txt = False
3403                    txt_dict = self.__parse_dns_sd_txt(txt)
3404                    logging.info(f'txt = {txt_dict}')
3405                    service['txt'] = txt_dict
3406
3407            if not elements or elements[0] != full_service_name:
3408                continue
3409            if elements[1] == 'SRV':
3410                service['fullname'] = elements[0]
3411                service['instance'] = instance
3412                service['name'] = name
3413                service['priority'] = int(elements[2])
3414                service['weight'] = int(elements[3])
3415                service['port'] = int(elements[4])
3416                service['host_fullname'] = elements[5]
3417                assert (service['host_fullname'] == f'{host_name}.local.')
3418                service['host'] = host_name
3419                service['addresses'] = addresses
3420        return service if 'addresses' in service and service['addresses'] else None
3421
3422    def start_radvd_service(self, prefix, slaac):
3423        self.bash("""cat >/etc/radvd.conf <<EOF
3424interface eth0
3425{
3426    AdvSendAdvert on;
3427
3428    AdvReachableTime 200;
3429    AdvRetransTimer 200;
3430    AdvDefaultLifetime 1800;
3431    MinRtrAdvInterval 1200;
3432    MaxRtrAdvInterval 1800;
3433    AdvDefaultPreference low;
3434
3435    prefix %s
3436    {
3437        AdvOnLink on;
3438        AdvAutonomous %s;
3439        AdvRouterAddr off;
3440        AdvPreferredLifetime 40;
3441        AdvValidLifetime 60;
3442    };
3443};
3444EOF
3445""" % (prefix, 'on' if slaac else 'off'))
3446        self.bash('service radvd start')
3447        self.bash('service radvd status')  # Make sure radvd service is running
3448
3449    def stop_radvd_service(self):
3450        self.bash('service radvd stop')
3451
3452    def kill_radvd_service(self):
3453        self.bash('pkill radvd')
3454
3455    def __parse_dns_sd_txt(self, line: str):
3456        # Example TXT entry:
3457        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
3458        txt = {}
3459        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
3460            if '=' not in entry:
3461                continue
3462
3463            k, v = entry.split('=', 1)
3464            txt[k] = v
3465
3466        return txt
3467
3468
3469class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
3470    TUN_DEV = config.THREAD_IFNAME
3471    is_otbr = True
3472    is_bbr = True  # OTBR is also BBR
3473    node_type = 'otbr-docker'
3474
3475    def __repr__(self):
3476        return f'Otbr<{self.nodeid}>'
3477
3478    def start(self):
3479        self._setup_sysctl()
3480        self.set_log_level(5)
3481        super().start()
3482
3483    def add_ipaddr(self, addr):
3484        cmd = f'ip -6 addr add {addr}/64 dev {self.TUN_DEV}'
3485        self.bash(cmd)
3486
3487    def add_ipmaddr_tun(self, ip: str):
3488        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.TUN_DEV} {ip} &'
3489        self.bash(cmd)
3490
3491
3492class HostNode(LinuxHost, OtbrDocker):
3493    is_host = True
3494
3495    def __init__(self, nodeid, name=None, **kwargs):
3496        self.nodeid = nodeid
3497        self.name = name or ('Host%d' % nodeid)
3498        super().__init__(nodeid, **kwargs)
3499        self.bash('service otbr-agent stop')
3500
3501    def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False):
3502        self._setup_sysctl()
3503        if start_radvd:
3504            self.start_radvd_service(prefix, slaac)
3505        else:
3506            self.stop_radvd_service()
3507
3508    def stop(self):
3509        self.stop_radvd_service()
3510
3511    def get_addrs(self) -> List[str]:
3512        return self.get_ether_addrs()
3513
3514    def __repr__(self):
3515        return f'Host<{self.nodeid}>'
3516
3517    def get_matched_ula_addresses(self, prefix):
3518        """Get the IPv6 addresses that matches given prefix.
3519        """
3520
3521        addrs = []
3522        for addr in self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA):
3523            if IPv6Address(addr) in IPv6Network(prefix):
3524                addrs.append(addr)
3525
3526        return addrs
3527
3528    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
3529        """Get specific type of IPv6 address configured on thread device.
3530
3531        Args:
3532            address_type: the config.ADDRESS_TYPE type of IPv6 address.
3533
3534        Returns:
3535            IPv6 address string.
3536        """
3537
3538        if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
3539            return self._getBackboneGua()
3540        elif address_type == config.ADDRESS_TYPE.ONLINK_ULA:
3541            return self._getInfraUla()
3542        elif address_type == config.ADDRESS_TYPE.ONLINK_GUA:
3543            return self._getInfraGua()
3544        else:
3545            raise ValueError(f'unsupported address type: {address_type}')
3546
3547
3548if __name__ == '__main__':
3549    unittest.main()
3550