• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import ipaddress
30import logging
31import re
32from collections import Counter
33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any
34
35from . import connectors
36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner
37from .connectors import Simulator
38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError
39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \
40    RouterTableEntry, NetifIdentifier
41from .utils import match_line, constant_property
42
43
44class OTCI(object):
45    """
46    This class represents an OpenThread Controller Interface instance that provides versatile interfaces to
47    manipulate an OpenThread device.
48    """
49
50    DEFAULT_EXEC_COMMAND_RETRY = 4  # A command is retried 4 times if failed.
51
52    __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY
53
54    def __init__(self, otcmd: OTCommandHandler):
55        """
56        This method initializes an OTCI instance.
57
58        :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands.
59        """
60        self.__otcmd: OTCommandHandler = otcmd
61        self.__logger = logging.getLogger(name=str(self))
62
63    def __repr__(self):
64        """Gets the string representation of the OTCI instance."""
65        return repr(self.__otcmd)
66
67    def wait(self, duration: float, expect_line: Union[str, Pattern, Collection[Any]] = None):
68        """Wait for a given duration.
69
70        :param duration: The duration (in seconds) wait for.
71        :param expect_line: The line expected to output if given.
72                            Raise ExpectLineTimeoutError if expect_line is not found within the given duration.
73        """
74        self.log('info', "wait for %.3f seconds", duration)
75        if expect_line is None:
76            self.__otcmd.wait(duration)
77        else:
78            success = False
79
80            while duration > 0:
81                output = self.__otcmd.wait(1)
82                if any(match_line(line, expect_line) for line in output):
83                    success = True
84                    break
85
86                duration -= 1
87
88            if not success:
89                raise ExpectLineTimeoutError(expect_line)
90
91    def close(self):
92        """Close the OTCI instance."""
93        self.__otcmd.close()
94
95    def execute_command(self,
96                        cmd: str,
97                        timeout: float = 10,
98                        silent: bool = False,
99                        already_is_ok: bool = True) -> List[str]:
100        for i in range(self.__exec_command_retry + 1):
101            try:
102                return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok)
103            except Exception:
104                if i == self.__exec_command_retry:
105                    raise
106
107    def __execute_command(self,
108                          cmd: str,
109                          timeout: float = 10,
110                          silent: bool = False,
111                          already_is_ok: bool = True) -> List[str]:
112        """Execute the OpenThread CLI command.
113
114        :param cmd: The command to execute.
115        :param timeout: The command timeout.
116        :param silent: Whether to run the command silent without logging.
117        :returns: The command output as a list of lines.
118        """
119        if not silent:
120            self.log('info', '> %s', cmd)
121
122        output = self.__otcmd.execute_command(cmd, timeout)
123
124        if not silent:
125            for line in output:
126                self.log('info', '%s', line)
127
128        if cmd in ('reset', 'factoryreset'):
129            return output
130
131        if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'):
132            output = output[:-1]
133            return output
134        else:
135            raise CommandError(cmd, output)
136
137    def set_execute_command_retry(self, n: int):
138        assert n >= 0
139        self.__exec_command_retry = n
140
141    def shell(self, cmd: str, timeout: float = 10):
142        self.log('info', '# %s', cmd)
143        output = self.__otcmd.shell(cmd, timeout=timeout)
144        for line in output:
145            self.log('info', '%s', line)
146        return output
147
148    def set_logger(self, logger: logging.Logger):
149        """Set the logger for the OTCI instance, or None to disable logging."""
150        self.__logger = logger
151
152    def log(self, level, fmt, *args, **kwargs):
153        if self.__logger is not None:
154            getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs)
155
156    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
157        """Set the callback that will be called for each line output by the CLI."""
158        self.__otcmd.set_line_read_callback(callback)
159
160    #
161    # Constant properties
162    #
163    @constant_property
164    def version(self):
165        """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")"""
166        return self.__parse_str(self.execute_command('version'))
167
168    @constant_property
169    def thread_version(self):
170        """Get the Thread Version number."""
171        return self.__parse_int(self.execute_command('thread version'))
172
173    @constant_property
174    def api_version(self):
175        """Get API version number."""
176        try:
177            return self.__parse_int(self.execute_command('version api'))
178        except ValueError:
179            # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError.
180            return 0
181
182    #
183    # Basic device operations
184    #
185    def ifconfig_up(self):
186        """Bring up the IPv6 interface."""
187        self.execute_command('ifconfig up')
188
189    def ifconfig_down(self):
190        """Bring down the IPv6 interface."""
191        self.execute_command('ifconfig down')
192
193    def get_ifconfig_state(self) -> bool:
194        """Get the status of the IPv6 interface."""
195        return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False)
196
197    def thread_start(self):
198        """Enable Thread protocol operation and attach to a Thread network."""
199        self.execute_command('thread start')
200
201    def thread_stop(self):
202        """Disable Thread protocol operation and detach from a Thread network."""
203        self.execute_command('thread stop')
204
205    def reset(self):
206        """Signal a platform reset."""
207        self.execute_command('reset')
208
209    def factory_reset(self):
210        """Delete all stored settings, and signal a platform reset."""
211        self.execute_command('factoryreset')
212
213    #
214    # Network Operations
215    #
216    _PING_STATISTICS_PATTERN = re.compile(
217        r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$'
218    )
219
220    def ping(self,
221             ip: str,
222             size: int = 8,
223             count: int = 1,
224             interval: float = 1,
225             hoplimit: int = 64,
226             timeout: float = 3) -> Dict:
227        """Send an ICMPv6 Echo Request.
228        The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp.
229
230        :param ip: The target IPv6 address to ping.
231        :param size: The number of data bytes in the payload. Default is 8.
232        :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1.
233        :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1.
234        :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h.
235        :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3.
236        """
237        cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}'
238
239        timeout_allowance = 3
240        lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance)
241
242        statistics = {}
243        for line in lines:
244            m = OTCI._PING_STATISTICS_PATTERN.match(line)
245            if m is not None:
246                if m.group('transmitted') is not None:
247                    statistics['transmitted_packets'] = int(m.group('transmitted'))
248                    statistics['received_packets'] = int(m.group('received'))
249                if m.group('loss') is not None:
250                    statistics['packet_loss'] = float(m.group('loss')) / 100
251                if m.group('min') is not None:
252                    statistics['round_trip_time'] = {
253                        'min': int(m.group('min')),
254                        'avg': float(m.group('avg')),
255                        'max': int(m.group('max'))
256                    }
257        return statistics
258
259    def ping_stop(self):
260        """Stop sending ICMPv6 Echo Requests."""
261        self.execute_command('ping stop')
262
263    def discover(self, channel: int = None) -> List[Dict[str, Any]]:
264        """Perform an MLE Discovery operation."""
265        return self.__scan_networks('discover', channel)
266
267    def scan(self, channel: int = None) -> List[Dict[str, Any]]:
268        """Perform an IEEE 802.15.4 Active Scan."""
269        return self.__scan_networks('scan', channel)
270
271    def __scan_networks(self, cmd: str, channel: int = None) -> List[Dict[str, Any]]:
272        if channel is not None:
273            cmd += f' {channel}'
274
275        output = self.execute_command(cmd, timeout=10)
276        if len(output) < 2:
277            raise UnexpectedCommandOutput(output)
278
279        networks = []
280        for line in output[2:]:
281            fields = line.strip().split('|')
282
283            try:
284                _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields
285            except Exception:
286                logging.warning('ignored output: %r', line)
287                continue
288
289            networks.append({
290                'joinable': bool(int(J)),
291                'network_name': netname.strip(),
292                'extpanid': extpanid,
293                'panid': int(panid, 16),
294                'extaddr': extaddr,
295                'channel': int(ch),
296                'dbm': int(dbm),
297                'lqi': int(lqi),
298            })
299
300        return networks
301
302    def scan_energy(self, duration: float = None, channel: int = None) -> Dict[int, int]:
303        """Perform an IEEE 802.15.4 Energy Scan."""
304        cmd = 'scan energy'
305        if duration is not None:
306            cmd += f' {duration * 1000:d}'
307
308        if channel is not None:
309            cmd += f' {channel}'
310
311        output = self.execute_command(cmd, timeout=10)
312        if len(output) < 2:
313            raise UnexpectedCommandOutput(output)
314
315        channels = {}
316        for line in output[2:]:
317            fields = line.strip().split('|')
318
319            _, Ch, RSSI, _ = fields
320            channels[int(Ch)] = int(RSSI)
321
322        return channels
323
324    def mac_send_data_request(self):
325        """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent."""
326        self.execute_command('mac send datarequest')
327
328    def mac_send_empty_data(self):
329        """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent."""
330        self.execute_command('mac send emptydata')
331
332    # TODO: discover
333    # TODO: dns resolve <hostname> [DNS server IP] [DNS server port]
334    # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid>
335    # TODO: sntp query
336
337    #
338    # Set or get device/network parameters
339    #
340
341    def get_mode(self) -> str:
342        """Get the Thread Device Mode value.
343
344            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
345            r: rx-on-when-idle
346            d: Full Thread Device
347            n: Full Network Data
348        """
349        return self.__parse_str(self.execute_command('mode'))
350
351    def set_mode(self, mode: str):
352        """Set the Thread Device Mode value.
353
354            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
355            r: rx-on-when-idle
356            d: Full Thread Device
357            n: Full Network Data
358        """
359        self.execute_command(f'mode {DeviceMode(mode)}')
360
361    def get_extaddr(self) -> str:
362        """Get the IEEE 802.15.4 Extended Address."""
363        return self.__parse_extaddr(self.execute_command('extaddr'))
364
365    def set_extaddr(self, extaddr: str):
366        """Set the IEEE 802.15.4 Extended Address."""
367        self.__validate_hex64b(extaddr)
368        self.execute_command(f'extaddr {extaddr}')
369
370    def get_eui64(self) -> str:
371        """Get the factory-assigned IEEE EUI-64."""
372        return self.__parse_eui64(self.execute_command('eui64'))
373
374    def set_extpanid(self, extpanid: str):
375        """Set the Thread Extended PAN ID value."""
376        self.__validate_extpanid(extpanid)
377        self.execute_command(f'extpanid {extpanid}')
378
379    def get_extpanid(self) -> str:
380        """Get the Thread Extended PAN ID value."""
381        return self.__parse_extpanid(self.execute_command('extpanid'))
382
383    def set_channel(self, ch):
384        """Set the IEEE 802.15.4 Channel value."""
385        self.execute_command('channel %d' % ch)
386
387    def get_channel(self):
388        """Get the IEEE 802.15.4 Channel value."""
389        return self.__parse_int(self.execute_command('channel'))
390
391    def get_preferred_channel_mask(self) -> int:
392        """Get preferred channel mask."""
393        return self.__parse_int(self.execute_command('channel preferred'))
394
395    def get_supported_channel_mask(self):
396        """Get supported channel mask."""
397        return self.__parse_int(self.execute_command('channel supported'))
398
399    def get_panid(self):
400        """Get the IEEE 802.15.4 PAN ID value."""
401        return self.__parse_int(self.execute_command('panid'), 16)
402
403    def set_panid(self, panid):
404        """Get the IEEE 802.15.4 PAN ID value."""
405        self.execute_command('panid %d' % panid)
406
407    def set_network_name(self, name):
408        """Set network name."""
409        self.execute_command('networkname %s' % self.__escape_escapable(name))
410
411    def get_network_name(self):
412        """Get network name."""
413        return self.__parse_str(self.execute_command('networkname'))
414
415    def get_network_key(self) -> str:
416        """Get the network key."""
417        return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd()))
418
419    def set_network_key(self, networkkey: str):
420        """Set the network key."""
421        self.__validate_network_key(networkkey)
422        cmd = self.__detect_networkkey_cmd()
423        self.execute_command(f'{cmd} {networkkey}')
424
425    def get_key_sequence_counter(self) -> int:
426        """Get the Thread Key Sequence Counter."""
427        return self.__parse_int(self.execute_command('keysequence counter'))
428
429    def set_key_sequence_counter(self, counter: int):
430        """Set the Thread Key Sequence Counter."""
431        self.execute_command(f'keysequence counter {counter}')
432
433    def get_key_sequence_guard_time(self) -> int:
434        """Get Thread Key Switch Guard Time (in hours)."""
435        return self.__parse_int(self.execute_command('keysequence guardtime'))
436
437    def set_key_sequence_guard_time(self, hours: int):
438        """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match."""
439        self.execute_command(f'keysequence guardtime {hours}')
440
441    def get_cca_threshold(self) -> int:
442        """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
443        output = self.execute_command(f'ccathreshold')
444        val = self.__parse_str(output)
445        if not val.endswith(' dBm'):
446            raise UnexpectedCommandOutput(output)
447
448        return int(val[:-4])
449
450    def set_cca_threshold(self, val: int):
451        """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
452        self.execute_command(f'ccathreshold {val}')
453
454    def get_promiscuous(self) -> bool:
455        """Get radio promiscuous property."""
456        return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous'))
457
458    def enable_promiscuous(self):
459        """Enable radio promiscuous operation and print raw packet content."""
460        self.execute_command('promiscuous enable')
461
462    def disable_promiscuous(self):
463        """Disable radio promiscuous operation."""
464        self.execute_command('promiscuous disable')
465
466    def get_txpower(self) -> int:
467        """Get the transmit power in dBm."""
468        line = self.__parse_str(self.execute_command('txpower'))
469        if not line.endswith(' dBm'):
470            raise UnexpectedCommandOutput([line])
471
472        return int(line.split()[0])
473
474    def set_txpower(self, val: int):
475        """Set the transmit power in dBm."""
476        self.execute_command(f'txpower {val}')
477
478    # TODO: fem
479    # TODO: fem lnagain
480    # TODO: fem lnagain <LNA gain>
481    # TODO: mac retries direct
482    # TODO: mac retries direct
483    # TODO: mac retries indirect
484    # TODO: mac retries indirect <number>
485
486    #
487    # Basic Node states and properties
488    #
489
490    def get_state(self) -> ThreadState:
491        """Get the current Thread state."""
492        return ThreadState(self.__parse_str(self.execute_command('state')))
493
494    def set_state(self, state: str):
495        """Try to switch to state detached, child, router or leader."""
496        self.execute_command(f'state {state}')
497
498    def get_rloc16(self) -> int:
499        """Get the Thread RLOC16 value."""
500        return self.__parse_int(self.execute_command('rloc16'), 16)
501
502    def get_router_id(self) -> int:
503        """Get the Thread Router ID value."""
504        return self.get_rloc16() >> 10
505
506    def prefer_router_id(self, routerid: int):
507        """Prefer a Router ID when solicit router id from Leader."""
508        self.execute_command(f'preferrouterid {routerid}')
509
510    def is_singleton(self) -> bool:
511        return self.__parse_values(self.execute_command('singleton'), true=True, false=False)
512
513    #
514    # RCP related utilities
515    #
516
517    def get_rcp_version(self):
518        return self.__parse_str(self.execute_command('rcp version'))
519
520    #
521    # Unsecure port utilities
522    #
523
524    def get_unsecure_ports(self) -> List[int]:
525        """all ports from the allowed unsecured port list."""
526        return self.__parse_int_list(self.execute_command('unsecureport get'))
527
528    def add_unsecure_port(self, port: int):
529        """Add a port to the allowed unsecured port list."""
530        self.execute_command(f'unsecureport add {port}')
531
532    def remove_unsecure_port(self, port: int):
533        """Remove a port from the allowed unsecured port list."""
534        self.execute_command(f'unsecureport remove {port}')
535
536    def clear_unsecure_ports(self):
537        """Remove all ports from the allowed unsecured port list."""
538        self.execute_command('unsecureport remove all')
539
540    #
541    # Leader configurations
542    #
543
544    def get_preferred_partition_id(self) -> PartitionId:
545        """Get the preferred Thread Leader Partition ID."""
546        return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd())))
547
548    def set_preferred_partition_id(self, parid: int):
549        """Set the preferred Thread Leader Partition ID."""
550        self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}')
551
552    def __get_partition_preferred_cmd(self) -> str:
553        """"""
554        return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid'
555
556    def get_leader_weight(self) -> int:
557        """Get the Thread Leader Weight."""
558        return self.__parse_int(self.execute_command('leaderweight'))
559
560    def set_leader_weight(self, weight: int):
561        """Set the Thread Leader Weight."""
562        self.execute_command(f'leaderweight {weight}')
563
564    __LEADER_DATA_KEY_MAP = {
565        'Partition ID': 'partition_id',
566        'Weighting': 'weight',
567        'Data Version': 'data_ver',
568        'Stable Data Version': 'stable_data_ver',
569        'Leader Router ID': 'leader_id',
570    }
571
572    def get_leader_data(self) -> Dict[str, int]:
573        """Get the Thread Leader Data."""
574        data = {}
575        output = self.execute_command('leaderdata')
576
577        try:
578            for line in output:
579                k, v = line.split(': ')
580                data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v)
581        except KeyError:
582            raise UnexpectedCommandOutput(output)
583
584        return data
585
586    #
587    # Router configurations
588    #
589
590    def get_router_selection_jitter(self):
591        """Get the ROUTER_SELECTION_JITTER value."""
592        return self.__parse_int(self.execute_command('routerselectionjitter'))
593
594    def set_router_selection_jitter(self, jitter):
595        """Set the ROUTER_SELECTION_JITTER value."""
596        self.execute_command(f'routerselectionjitter {jitter}')
597
598    def get_network_id_timeout(self) -> int:
599        """Get the NETWORK_ID_TIMEOUT parameter used in the Router role."""
600        return self.__parse_int(self.execute_command('networkidtimeout'))
601
602    def set_network_id_timeout(self, timeout: int):
603        """Set the NETWORK_ID_TIMEOUT parameter used in the Router role."""
604        self.execute_command(f'networkidtimeout {timeout}')
605
606    def get_parent_priority(self) -> int:
607        """Get the assigned parent priority value, -2 means not assigned."""
608        return self.__parse_int(self.execute_command('parentpriority'))
609
610    def set_parent_priority(self, priority: int):
611        """Set the assigned parent priority value: 1, 0, -1 or -2."""
612        self.execute_command(f'parentpriority {priority}')
613
614    def get_router_upgrade_threshold(self) -> int:
615        """Get the ROUTER_UPGRADE_THRESHOLD value."""
616        return self.__parse_int(self.execute_command('routerupgradethreshold'))
617
618    def set_router_upgrade_threshold(self, threshold: int):
619        """Set the ROUTER_UPGRADE_THRESHOLD value."""
620        self.execute_command(f'routerupgradethreshold {threshold}')
621
622    def get_router_downgrade_threshold(self):
623        """Set the ROUTER_DOWNGRADE_THRESHOLD value."""
624        return self.__parse_int(self.execute_command('routerdowngradethreshold'))
625
626    def set_router_downgrade_threshold(self, threshold: int):
627        """Get the ROUTER_DOWNGRADE_THRESHOLD value."""
628        self.execute_command(f'routerdowngradethreshold {threshold}')
629
630    def get_router_eligible(self) -> bool:
631        """Indicates whether the router role is enabled or disabled."""
632        return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible'))
633
634    def enable_router_eligible(self):
635        """Disable the router role."""
636        self.execute_command('routereligible enable')
637
638    def disable_router_eligible(self):
639        """Disable the router role."""
640        self.execute_command('routereligible disable')
641
642    def get_router_list(self) -> List[RouterId]:
643        """Get allocated Router IDs."""
644        line = self.__parse_str(self.execute_command('router list'))
645        return list(map(RouterId, line.strip().split()))
646
647    def get_router_table(self) -> Dict[RouterId, RouterTableEntry]:
648        """table of routers."""
649        output = self.execute_command('router table')
650        if len(output) < 2:
651            raise UnexpectedCommandOutput(output)
652
653        #
654        # Example output:
655        #
656        # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC     |
657        # +----+--------+----------+-----------+-------+--------+-----+------------------+
658        # | 21 | 0x5400 |       21 |         0 |     3 |      3 |   5 | d28d7f875888fccb |
659        # | 56 | 0xe000 |       56 |         0 |     0 |      0 | 182 | f2d92a82c8d8fe43 |
660        # Done
661        #
662
663        headers = self.__split_table_row(output[0])
664
665        table = {}
666        for line in output[2:]:
667            line = line.strip()
668            if not line:
669                continue
670
671            fields = self.__split_table_row(line)
672            if len(fields) != len(headers):
673                raise UnexpectedCommandOutput(output)
674
675            col = lambda colname: self.__get_table_col(colname, headers, fields)
676            id = col('ID')
677
678            table[RouterId(id)] = router = RouterTableEntry({
679                'id': RouterId(id),
680                'rloc16': Rloc16(col('RLOC16'), 16),
681                'next_hop': int(col('Next Hop')),
682                'path_cost': int(col('Path Cost')),
683                'lq_in': int(col('LQ In')),
684                'lq_out': int(col('LQ Out')),
685                'age': int(col('Age')),
686                'extaddr': col('Extended MAC'),
687            })
688
689            if 'Link' in headers:
690                router['link'] = int(col('Link'))
691            else:
692                # support older version of OT which does not output `Link` field
693                router['link'] = self.get_router_info(router['id'], silent=True)['link']
694
695        return table
696
697    def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry:
698        cmd = f'router {id}'
699        info = {}
700        output = self.execute_command(cmd, silent=silent)
701        items = [line.strip().split(': ') for line in output]
702
703        headers = [h for h, _ in items]
704        fields = [f for _, f in items]
705        col = lambda colname: self.__get_table_col(colname, headers, fields)
706
707        return RouterTableEntry({
708            'id': RouterId(id),
709            'rloc16': Rloc16(col('Rloc'), 16),
710            'alloc': int(col('Alloc')),
711            'next_hop': int(col('Next Hop'), 16) >> 10,  # convert RLOC16 to Router ID
712            'link': int(col('Link')),
713        })
714
715    #
716    # Router utilities: Child management
717    #
718
719    def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]:
720        """Get the table of attached children."""
721        output = self.execute_command('child table')
722        if len(output) < 2:
723            raise UnexpectedCommandOutput(output)
724
725        #
726        # Example output:
727        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC     |
728        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+
729        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 | 4ecede68435358ac |
730        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 | a672a601d2ce37d8 |
731        # Done
732        #
733
734        headers = self.__split_table_row(output[0])
735
736        table = {}
737        for line in output[2:]:
738            line = line.strip()
739            if not line:
740                continue
741
742            fields = self.__split_table_row(line)
743            col = lambda colname: self.__get_table_col(colname, headers, fields)
744
745            id = int(col("ID"))
746            r, d, n = int(col("R")), int(col("D")), int(col("N"))
747            mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}')
748
749            child = {
750                'id': ChildId(id),
751                'rloc16': Rloc16(col('RLOC16'), 16),
752                'timeout': int(col('Timeout')),
753                'age': int(col('Age')),
754                'lq_in': int(col('LQ In')),
755                'c_vn': int(col('C_VN')),
756                'mode': mode,
757                'extaddr': col('Extended MAC')
758            }
759
760            if 'Ver' in headers:
761                child['ver'] = int(col('Ver'))
762
763            if 'CSL' in headers:
764                child['csl'] = bool(int(col('CSL')))
765
766            if 'QMsgCnt' in headers:
767                child['qmsgcnt'] = int(col('QMsgCnt'))
768
769            table[ChildId(id)] = child
770
771        return table
772
773    #
774    # DNS server & client utilities
775    #
776
777    _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)')
778
779    def dns_get_config(self):
780        """Get DNS client query config."""
781        output = self.execute_command('dns config')
782        config = {}
783        for line in output:
784            k, v = line.split(': ')
785            if k == 'Server':
786                ip, port = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v).groups()
787                config['server'] = (Ip6Addr(ip), int(port))
788            elif k == 'ResponseTimeout':
789                config['response_timeout'] = int(v[:-3])
790            elif k == 'MaxTxAttempts':
791                config['max_tx_attempts'] = int(v)
792            elif k == 'RecursionDesired':
793                config['recursion_desired'] = (v == 'yes')
794            else:
795                logging.warning("dns config ignored: %s", line)
796
797        return config
798
799    def dns_set_config(self,
800                       server: Tuple[Union[str, ipaddress.IPv6Address], int],
801                       response_timeout: int = None,
802                       max_tx_attempts: int = None,
803                       recursion_desired: bool = None):
804        """Set DNS client query config."""
805        cmd = f'dns config {str(server[0])} {server[1]}'
806        if response_timeout is not None:
807            cmd += f' {response_timeout}'
808
809        assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified."
810        if max_tx_attempts is not None:
811            cmd += f' {max_tx_attempts}'
812
813        assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.'
814        if recursion_desired is not None:
815            cmd += f' {1 if recursion_desired else 0}'
816
817        self.execute_command(cmd)
818
819    def dns_get_compression(self) -> bool:
820        """Get DNS compression mode."""
821        return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression'))
822
823    def dns_enable_compression(self):
824        """Enable DNS compression mode."""
825        self.execute_command('dns compression enable')
826
827    def dns_disable_compression(self):
828        """Disable DNS compression mode."""
829        self.execute_command('dns compression disable')
830
831    def dns_browse(self, service: str) -> List[Dict]:
832        """Browse DNS service instances."""
833        cmd = f'dns browse {service}'
834        output = '\n'.join(self.execute_command(cmd, 30.0))
835
836        result = []
837        for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall(
838                r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
839                output):
840            result.append({
841                'instance': ins,
842                'service': service,
843                'port': int(port),
844                'priority': int(priority),
845                'weight': int(weight),
846                'host': hostname,
847                'address': Ip6Addr(address),
848                'txt': self.__parse_srp_server_service_txt(txt_data),
849                'srv_ttl': int(srv_ttl),
850                'txt_ttl': int(txt_ttl),
851                'aaaa_ttl': int(aaaa_ttl),
852            })
853
854        return result
855
856    def dns_resolve(self, hostname: str) -> List[Dict]:
857        """Resolve a DNS host name."""
858        cmd = f'dns resolve {hostname}'
859        output = self.execute_command(cmd, 30.0)
860        dns_resp = output[0]
861        addrs = dns_resp.strip().split(' - ')[1].split(' ')
862        ips = [Ip6Addr(item.strip()) for item in addrs[::2]]
863        ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
864
865        return [{
866            'address': ip,
867            'ttl': ttl,
868        } for ip, ttl in zip(ips, ttls)]
869
870    def dns_resolve_service(self, instance: str, service: str) -> Dict:
871        """Resolves aservice instance."""
872        instance = self.__escape_escapable(instance)
873        cmd = f'dns service {instance} {service}'
874        output = self.execute_command(cmd, 30.0)
875
876        m = re.match(
877            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
878            '\t'.join(output))
879        if m:
880            port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
881            return {
882                'instance': instance,
883                'service': service,
884                'port': int(port),
885                'priority': int(priority),
886                'weight': int(weight),
887                'host': hostname,
888                'address': Ip6Addr(address),
889                'txt': self.__parse_srp_server_service_txt(txt_data),
890                'srv_ttl': int(srv_ttl),
891                'txt_ttl': int(txt_ttl),
892                'aaaa_ttl': int(aaaa_ttl),
893            }
894        else:
895            raise CommandError(cmd, output)
896
897    #
898    # SRP server & client utilities
899    #
900
901    def srp_server_get_state(self):
902        """Get the SRP server state"""
903        return self.__parse_str(self.execute_command('srp server state'))
904
905    def srp_server_enable(self):
906        """Enable SRP server."""
907        self.execute_command('srp server enable')
908
909    def srp_server_disable(self):
910        """Disable SRP server."""
911        self.execute_command('srp server disable')
912
913    def srp_server_get_domain(self) -> str:
914        """Get the SRP server domain."""
915        return self.__parse_str(self.execute_command('srp server domain'))
916
917    def srp_server_set_domain(self, domain: str):
918        """Set the SRP server domain."""
919        self.execute_command(f'srp server domain {domain}')
920
921    def srp_server_get_hosts(self) -> List[Dict]:
922        """Get SRP server registered hosts."""
923        return self.__parse_srp_server_hosts(self.execute_command('srp server host'))
924
925    def srp_server_get_services(self) -> List[Dict]:
926        """Get SRP server registered services."""
927        output = self.execute_command('srp server service')
928        return self.__parse_srp_server_services(output)
929
930    def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]:
931        result = []
932        info = None
933        for line in output:
934            if not line.startswith(' '):
935                info = {'host': line}
936                result.append(info)
937            else:
938                k, v = line.strip().split(': ')
939                if k == 'deleted':
940                    if v not in ('true', 'false'):
941                        raise UnexpectedCommandOutput(output)
942
943                    info['deleted'] = (v == 'true')
944
945                elif k == 'addresses':
946                    if not v.startswith('[') or not v.endswith(']'):
947                        raise UnexpectedCommandOutput(output)
948
949                    v = v[1:-1]
950                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
951                else:
952                    raise UnexpectedCommandOutput(output)
953
954        return result
955
956    def __parse_srp_server_services(self, output: List[str]) -> List[Dict]:
957        result = []
958        info = None
959        for line in output:
960            if not line.startswith(' '):
961                info = {'instance': line}
962                result.append(info)
963            else:
964                k, v = line.strip().split(': ')
965                if k == 'deleted':
966                    if v not in ('true', 'false'):
967                        raise UnexpectedCommandOutput(output)
968
969                    info['deleted'] = (v == 'true')
970
971                elif k == 'addresses':
972                    if not v.startswith('[') or not v.endswith(']'):
973                        raise UnexpectedCommandOutput(output)
974
975                    v = v[1:-1]
976                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
977                elif k == 'subtypes':
978                    info[k] = list() if v == '(null)' else list(v.split(','))
979                elif k in ('port', 'weight', 'priority', 'ttl'):
980                    info[k] = int(v)
981                elif k in ('host',):
982                    info[k] = v
983                elif k == 'TXT':
984                    info['txt'] = self.__parse_srp_server_service_txt(v)
985                else:
986                    raise UnexpectedCommandOutput(output)
987
988        return result
989
990    def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]:
991        # example value: [txt11=76616c3131, txt12=76616c3132]
992        assert txt.startswith('[') and txt.endswith(']')
993        txt_dict = {}
994        for entry in txt[1:-1].split(', '):
995            if not entry:
996                continue
997
998            equal_pos = entry.find('=')
999
1000            if equal_pos != -1:
1001                k, v = entry[:equal_pos], entry[equal_pos + 1:]
1002                txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2))
1003            else:
1004                txt_dict[entry] = True
1005
1006        return txt_dict
1007
1008    def srp_server_get_lease(self) -> Tuple[int, int, int, int]:
1009        """Get SRP server LEASE & KEY-LEASE range (in seconds)."""
1010        lines = self.execute_command(f'srp server lease')
1011        return tuple([int(line.split(':')[1].strip()) for line in lines])
1012
1013    def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int):
1014        """Configure SRP server LEASE & KEY-LEASE range (in seconds)."""
1015        self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1016
1017    def srp_client_get_state(self) -> bool:
1018        """Get SRP client state."""
1019        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state'))
1020
1021    def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int):
1022        """Start SRP client."""
1023        self.execute_command(f'srp client start {str(server_ip)} {server_port}')
1024
1025    def srp_client_stop(self):
1026        """Stop SRP client."""
1027        self.execute_command('srp client stop')
1028
1029    def srp_client_get_autostart(self) -> bool:
1030        """Get SRP client autostart mode."""
1031        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart'))
1032
1033    def srp_client_enable_autostart(self):
1034        """Enable SRP client autostart mode."""
1035        self.execute_command('srp client autostart enable')
1036
1037    def srp_client_disable_autostart(self):
1038        """Disable SRP client autostart mode."""
1039        self.execute_command('srp client autostart disable')
1040
1041    def srp_client_get_callback(self) -> bool:
1042        """Get SRP client callback mode."""
1043        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback'))
1044
1045    def srp_client_enable_callback(self):
1046        """Enable SRP client callback mode."""
1047        self.execute_command('srp client callback enable')
1048
1049    def srp_client_disable_callback(self):
1050        """Disable SRP client callback mode."""
1051        self.execute_command('srp client callback disable')
1052
1053    def srp_client_set_host_name(self, name: str):
1054        """Set SRP client host name."""
1055        self.execute_command(f'srp client host name {name}')
1056
1057    def srp_client_get_host(self) -> Dict:
1058        """Get SRP client host."""
1059        output = self.__parse_str(self.execute_command('srp client host'))
1060        return self.__parse_srp_client_host(output)
1061
1062    _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]')
1063
1064    def __parse_srp_client_host(self, line: str) -> Dict:
1065        m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line)
1066        if not m:
1067            raise UnexpectedCommandOutput([line])
1068
1069        _, host, _, state, addrs = m.groups()
1070        return {
1071            'host': host or '',
1072            'state': state,
1073            'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [],
1074        }
1075
1076    def srp_client_get_host_name(self) -> str:
1077        """Get SRP client host name."""
1078        name = self.__parse_str(self.execute_command('srp client host name'))
1079        return name if name != '(null)' else ''
1080
1081    def srp_client_get_host_addresses(self) -> List[Ip6Addr]:
1082        """Get SRP client host addresses."""
1083        return self.__parse_ip6addr_list(self.execute_command('srp client host address'))
1084
1085    def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]):
1086        """Set SRP client host addresses."""
1087        self.execute_command(f'srp client host address {" ".join(map(str, addrs))}')
1088
1089    def srp_client_get_host_state(self):
1090        """Get SRP client host state."""
1091        return self.__parse_str(self.execute_command('srp client host state'))
1092
1093    def srp_client_remove_host(self, remove_key_lease=False):
1094        """Remove SRP client host."""
1095        cmd = 'srp client host remove'
1096        if remove_key_lease:
1097            cmd += ' 1'
1098
1099        self.execute_command(cmd)
1100
1101    def srp_client_get_services(self) -> List[Dict]:
1102        """Get SRP client services."""
1103        output = self.execute_command('srp client service')
1104        return [self.__parse_srp_client_service(line) for line in output]
1105
1106    _SRP_CLIENT_SERVICE_PATTERN = re.compile(
1107        r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)')
1108
1109    def __parse_srp_client_service(self, line: str) -> Dict:
1110        # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2
1111        m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line)
1112        if m is None:
1113            raise UnexpectedCommandOutput([line])
1114
1115        instance, service, state, port, priority, weight = m.groups()
1116        port, priority, weight = int(port), int(priority), int(weight)
1117        return {
1118            'instance': instance,
1119            'service': service,
1120            'state': state,
1121            'port': port,
1122            'priority': priority,
1123            'weight': weight,
1124        }
1125
1126    def srp_client_add_service(self,
1127                               instance: str,
1128                               service: str,
1129                               port: int,
1130                               priority: int = 0,
1131                               weight: int = 0,
1132                               txt: Dict[str, Union[str, bytes, bool]] = None):
1133        instance = self.__escape_escapable(instance)
1134        cmd = f'srp client service add {instance} {service} {port} {priority} {weight}'
1135        if txt:
1136            cmd += f' {self.__txt_to_hex(txt)}'
1137        self.execute_command(cmd)
1138
1139    def srp_client_remove_service(self, instance: str, service: str):
1140        """Remove a service from SRP client."""
1141        self.execute_command(f'srp client service remove {instance} {service}')
1142
1143    def srp_client_clear_service(self, instance: str, service: str):
1144        """Remove a service from SRP client without notifying the SRP server."""
1145        self.execute_command(f'srp client service clear {instance} {service}')
1146
1147    def srp_client_get_key_lease_interval(self) -> int:
1148        """Get SRP client key lease interval (in seconds)."""
1149        return self.__parse_int(self.execute_command('srp client keyleaseinterval'))
1150
1151    def srp_client_set_key_lease_interval(self, interval: int):
1152        """Set SRP client key lease interval (in seconds)."""
1153        self.execute_command(f'srp client keyleaseinterval {interval}')
1154
1155    def srp_client_get_lease_interval(self) -> int:
1156        """Get SRP client lease interval (in seconds)."""
1157        return self.__parse_int(self.execute_command('srp client leaseinterval'))
1158
1159    def srp_client_set_lease_interval(self, interval: int):
1160        """Set SRP client lease interval (in seconds)."""
1161        self.execute_command(f'srp client leaseinterval {interval}')
1162
1163    def srp_client_get_server(self) -> Tuple[Ip6Addr, int]:
1164        """Get the SRP server (IP, port)."""
1165        result = self.__parse_str(self.execute_command('srp client server'))
1166        ip, port = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result).groups()
1167        return Ip6Addr(ip), int(port)
1168
1169    def srp_client_get_service_key(self) -> bool:
1170        """Get SRP client "service key record inclusion" mode."""
1171        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key'))
1172
1173    def srp_client_enable_service_key(self):
1174        """Enable SRP client "service key record inclusion" mode."""
1175        self.execute_command('srp client service key enable')
1176
1177    def srp_client_disable_service_key(self):
1178        """Disable SRP client "service key record inclusion" mode."""
1179        self.execute_command('srp client service key disable')
1180
1181    def __split_table_row(self, row: str) -> List[str]:
1182        if not (row.startswith('|') and row.endswith('|')):
1183            raise ValueError(row)
1184
1185        fields = row.split('|')
1186        fields = [x.strip() for x in fields[1:-1]]
1187        return fields
1188
1189    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
1190        return fields[headers.index(colname)]
1191
1192    def get_child_list(self) -> List[ChildId]:
1193        """Get attached Child IDs."""
1194        line = self.__parse_str(self.execute_command(f'child list'))
1195        return [ChildId(id) for id in line.strip().split()]
1196
1197    def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]:
1198        output = self.execute_command(f'child {child}')
1199
1200        info = {}
1201
1202        for line in output:
1203            k, v = line.split(': ')
1204            if k == 'Child ID':
1205                info['id'] = int(v)
1206            elif k == 'Rloc':
1207                info['rloc16'] = int(v, 16)
1208            elif k == 'Ext Addr':
1209                info['extaddr'] = v
1210            elif k == 'Mode':
1211                info['mode'] = DeviceMode(v)
1212            elif k == 'Net Data':
1213                info['c_vn'] = int(v)
1214            elif k == 'Timeout':
1215                info['timeout'] = int(v)
1216            elif k == 'Age':
1217                info['age'] = int(v)
1218            elif k == 'Link Quality In':
1219                info['lq_in'] = int(v)
1220            elif k == 'RSSI':
1221                info['rssi'] = int(v)
1222            else:
1223                self.log('warning', "Child info %s: %s ignored", k, v)
1224
1225        return info
1226
1227    def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]:
1228        """Get the list of IP addresses stored for MTD children.
1229
1230        Note: Each MTD child might has multiple IP addresses.
1231        """
1232        output = self.execute_command('childip')
1233
1234        ipaddrs = {}
1235
1236        for line in output:
1237            rloc16, ip = line.split(': ')
1238            rloc16 = Rloc16(rloc16, 16)
1239            ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip()))
1240
1241        return ipaddrs
1242
1243    #
1244    # Child configurations
1245    #
1246
1247    def get_max_children(self) -> int:
1248        """Get the Thread maximum number of allowed children."""
1249        return self.__parse_int(self.execute_command('childmax'))
1250
1251    def set_max_children(self, val: int):
1252        """Set the Thread maximum number of allowed children."""
1253        self.execute_command(f'childmax {val}')
1254
1255    def get_child_ip_max(self) -> int:
1256        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1257        return self.__parse_int(self.execute_command('childip max'))
1258
1259    def set_child_ip_max(self, val: int):
1260        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1261        self.execute_command(f'childip max {val}')
1262
1263    def get_child_timeout(self):
1264        """Get the Thread Child Timeout value."""
1265        return self.__parse_int(self.execute_command('childtimeout'))
1266
1267    def set_child_timeout(self, timeout):
1268        """Set the Thread Child Timeout value."""
1269        self.execute_command('childtimeout %d' % timeout)
1270
1271    def get_child_supervision_interval(self) -> int:
1272        """Get the Child Supervision Check Timeout value."""
1273        return self.__parse_int(self.execute_command('childsupervision interval'))
1274
1275    def set_child_supervision_interval(self, val: int):
1276        """Set the Child Supervision Interval value.
1277        This command can only be used with FTD devices.
1278        """
1279        self.execute_command(f'childsupervision interval {val}')
1280
1281    def get_child_supervision_check_timeout(self) -> int:
1282        """Get the Child Supervision Check Timeout value."""
1283        return self.__parse_int(self.execute_command('childsupervision checktimeout'))
1284
1285    def set_child_supervision_check_timeout(self, val: int):
1286        """Set the Child Supervision Check Timeout value."""
1287        self.execute_command(f'childsupervision checktimeout {val}')
1288
1289    #
1290    # Neighbor management
1291    #
1292
1293    def get_neighbor_list(self) -> List[Rloc16]:
1294        """Get a list of RLOC16 of neighbors"""
1295        line = self.__parse_str(self.execute_command('neighbor list')).strip()
1296        return [Rloc16(id, 16) for id in line.split()]
1297
1298    def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]:
1299        output = self.execute_command('neighbor table')
1300        if len(output) < 2:
1301            raise UnexpectedCommandOutput(output)
1302
1303        #
1304        # Example output:
1305        #
1306        # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC     |
1307        # +------+--------+-----+----------+-----------+-+-+-+------------------+
1308        # |   C  | 0xcc01 |  96 |      -46 |       -46 |1|1|1| 1eb9ba8a6522636b |
1309        # |   R  | 0xc800 |   2 |      -29 |       -29 |1|1|1| 9a91556102c39ddb |
1310        # |   R  | 0xf000 |   3 |      -28 |       -28 |1|1|1| 0ad7ed6beaa6016d |
1311        # Done
1312        #
1313
1314        headers = self.__split_table_row(output[0])
1315
1316        table = {}
1317        for line in output[2:]:
1318            line = line.strip()
1319            if not line:
1320                continue
1321
1322            fields = self.__split_table_row(line)
1323            col = lambda colname: self.__get_table_col(colname, headers, fields)
1324
1325            role = col('Role')
1326            is_router = role == 'R'
1327            r, d, n = int(col('R')), int(col('D')), int(col('N'))
1328            mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}')
1329
1330            rloc16 = Rloc16(col('RLOC16'), 16)
1331
1332            table[rloc16] = {
1333                'is_router': is_router,
1334                'rloc16': rloc16,
1335                'age': int(col('Age')),
1336                'avg_rssi': int(col('Avg RSSI')),
1337                'last_rssi': int(col('Last RSSI')),
1338                'mode': mode,
1339                'extaddr': col('Extended MAC'),
1340            }
1341
1342        return table
1343
1344    #
1345    # SED/SSED configuration
1346    #
1347
1348    def get_poll_period(self) -> int:
1349        """Get the customized data poll period of sleepy end device (milliseconds).
1350        Only for Reference Device."""
1351        return self.__parse_int(self.execute_command('pollperiod'))
1352
1353    def set_poll_period(self, poll_period: int):
1354        """Set the customized data poll period (in milliseconds) for sleepy end device.
1355
1356        Only for Reference Device."""
1357        self.execute_command(f'pollperiod {poll_period}')
1358
1359    # TODO: csl
1360    # TODO: csl channel <channel>
1361    # TODO: csl period <period>
1362    # TODO: csl timeout <timeout>
1363
1364    _CSL_PERIOD_PATTERN = re.compile(r'(\d+)\(in units of 10 symbols\), \d+ms')
1365    _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s')
1366
1367    def get_csl_config(self) -> Dict[str, int]:
1368        """Get the CSL configuration."""
1369        output = self.execute_command('csl')
1370
1371        cfg = {}
1372        for line in output:
1373            k, v = line.split(': ')
1374            if k == 'Channel':
1375                cfg['channel'] = int(v)
1376            elif k == 'Timeout':
1377                cfg['timeout'] = int(OTCI._CSL_TIMEOUT_PATTERN.match(v).group(1))
1378            elif k == 'Period':
1379                cfg['period'] = int(OTCI._CSL_PERIOD_PATTERN.match(v).group(1))
1380            else:
1381                logging.warning("Ignore unknown CSL parameter: %s: %s", k, v)
1382
1383        return cfg
1384
1385    def config_csl(self, channel: int = None, period: int = None, timeout: int = None):
1386        """Configure CSL parameters.
1387
1388        :param channel: Set CSL channel.
1389        :param period: Set CSL period in units of 10 symbols. Disable CSL by setting this parameter to 0.
1390        :param timeout: Set the CSL timeout in seconds.
1391        """
1392
1393        if channel is None and period is None and timeout is None:
1394            raise InvalidArgumentsError("Please specify at least 1 parameter to configure.")
1395
1396        if channel is not None:
1397            self.execute_command(f'csl channel {channel}')
1398
1399        if period is not None:
1400            self.execute_command(f'csl period {period}')
1401
1402        if timeout is not None:
1403            self.execute_command(f'csl timeout {timeout}')
1404
1405    #
1406    # Leader utilities
1407    #
1408
1409    def get_context_id_reuse_delay(self) -> int:
1410        """Get the CONTEXT_ID_REUSE_DELAY value."""
1411        return self.__parse_int(self.execute_command('contextreusedelay'))
1412
1413    def set_context_id_reuse_delay(self, val: int):
1414        """Set the CONTEXT_ID_REUSE_DELAY value."""
1415        self.execute_command(f'contextreusedelay {val}')
1416
1417    def release_router_id(self, routerid: int):
1418        """Release a Router ID that has been allocated by the device in the Leader role."""
1419        self.execute_command(f'releaserouterid {routerid}')
1420
1421    # Time Sync utilities
1422    # TODO: networktime
1423    # TODO: networktime <timesyncperiod> <xtalthreshold>
1424    # TODO: delaytimermin
1425    # TODO: delaytimermin <delaytimermin>
1426
1427    #
1428    # Commissioniner operations
1429    #
1430
1431    def commissioner_start(self):
1432        """Start the Commissioner role."""
1433        self.execute_command('commissioner start')
1434
1435    def commissioner_stop(self):
1436        """Stop the Commissioner role."""
1437        self.execute_command('commissioner stop')
1438
1439    def get_commissioiner_state(self) -> str:
1440        """Get current Commissioner state (active or petitioning or disabled)."""
1441        return self.__parse_str(self.execute_command('commissioner state'))
1442
1443    def get_commissioner_session_id(self) -> int:
1444        """Get current commissioner session id."""
1445        return self.__parse_int(self.execute_command('commissioner sessionid'))
1446
1447    def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None):
1448        """Add a Joiner entry.
1449
1450        :param pskd: Pre-Shared Key for the Joiner.
1451        :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner
1452        :param discerner: The Joiner discerner in format number/length.
1453        :param timeout: Joiner timeout in seconds.
1454        """
1455        if (eui64 is not None) == (discerner is not None):
1456            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1457
1458        if eui64 is not None and eui64 != '*':
1459            self.__validate_extaddr(eui64)
1460
1461        cmd = f'commissioner joiner add {eui64 or discerner} {pskd}'
1462
1463        if timeout is not None:
1464            cmd += f' {timeout}'
1465
1466        self.execute_command(cmd)
1467
1468    def commissioner_remove_jointer(self, eui64=None, discerner=None):
1469        if (eui64 is not None) == (discerner is not None):
1470            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1471
1472        if eui64 is not None and eui64 != '*':
1473            self.__validate_extaddr(eui64)
1474
1475        self.execute_command(f'commissioner joiner remove {eui64 or discerner}')
1476
1477    def set_commissioner_provisioning_url(self, url: str):
1478        self.execute_command(f'commissioner provisioningurl {url}')
1479
1480    # TODO: commissioner announce
1481    # TODO: commissioner energy
1482    # TODO: commissioner mgmtget
1483    # TODO: commissioner mgmtset
1484    # TODO: commissioner panid
1485
1486    #
1487    # Joiner operations
1488    #
1489    def joiner_start(self, psk: str, provisioning_url: str = None):
1490        """Start the Joiner."""
1491        cmd = f'joiner start {psk}'
1492        if provisioning_url is not None:
1493            cmd += f' {provisioning_url}'
1494
1495        self.execute_command(cmd)
1496
1497    def joiner_stop(self):
1498        """Stop the Joiner role."""
1499        self.execute_command('joiner stop')
1500
1501    def get_joiner_id(self) -> str:
1502        """Get the Joiner ID."""
1503        return self.__parse_joiner_id(self.execute_command('joiner id'))
1504
1505    def get_joiner_port(self) -> int:
1506        """Get the Joiner port."""
1507        return self.__parse_int(self.execute_command(f'joinerport'))
1508
1509    def set_joiner_port(self, port: int):
1510        """Set the Joiner port."""
1511        self.execute_command(f'joinerport {port}')
1512
1513    # TODO: joiner discerner
1514
1515    #
1516    # Network Data utilities
1517    #
1518    def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1519        """Get prefixes from local Network Data."""
1520        output = self.execute_command('prefix')
1521        return self.__parse_prefixes(output)
1522
1523    def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1524        prefixes = []
1525
1526        for line in output:
1527            if line.startswith('- '):
1528                line = line[2:]
1529
1530            prefix, flags, prf, rloc16 = line.split()[:4]
1531            prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16)))
1532
1533        return prefixes
1534
1535    def add_prefix(self, prefix: str, flags='paosr', prf='med'):
1536        """Add a valid prefix to the Network Data."""
1537        self.execute_command(f'prefix add {prefix} {flags} {prf}')
1538
1539    def remove_prefix(self, prefix: str):
1540        """Invalidate a prefix in the Network Data."""
1541        self.execute_command(f'prefix remove {prefix}')
1542
1543    def register_network_data(self):
1544        self.execute_command('netdata register')
1545
1546    def get_network_data(self) -> Dict[str, List]:
1547        output = self.execute_command('netdata show')
1548
1549        netdata = {}
1550        if output.pop(0) != 'Prefixes:':
1551            raise UnexpectedCommandOutput(output)
1552
1553        prefixes_output = []
1554        while True:
1555            line = output.pop(0)
1556            if line == 'Routes:':
1557                break
1558            else:
1559                prefixes_output.append(line)
1560
1561        netdata['prefixes'] = self.__parse_prefixes(prefixes_output)
1562
1563        routes_output = []
1564        while True:
1565            line = output.pop(0)
1566            if line == 'Services:':
1567                break
1568            else:
1569                routes_output.append(line)
1570
1571        netdata['routes'] = self.__parse_routes(routes_output)
1572        netdata['services'] = self.__parse_services(output)
1573
1574        return netdata
1575
1576    def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1577        """Get network prefixes from Thread Network Data."""
1578        network_data = self.get_network_data()
1579        return network_data['prefixes']
1580
1581    def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1582        """Get routes from Thread Network Data."""
1583        network_data = self.get_network_data()
1584        return network_data['routes']
1585
1586    def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1587        """Get services from Thread Network Data"""
1588        network_data = self.get_network_data()
1589        return network_data['services']
1590
1591    def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1592        services = []
1593        for line in output:
1594            line = line.split()
1595
1596            enterprise_number, service_data, server_data = line[:3]
1597            if line[3] == 's':
1598                stable, rloc16 = True, line[4]
1599            else:
1600                stable, rloc16 = False, line[3]
1601
1602            enterprise_number = int(enterprise_number)
1603            service_data = self.__hex_to_bytes(service_data)
1604            server_data = self.__hex_to_bytes(server_data)
1605            rloc16 = Rloc16(rloc16, 16)
1606
1607            services.append((enterprise_number, service_data, server_data, stable, rloc16))
1608
1609        return services
1610
1611    def get_network_data_bytes(self) -> bytes:
1612        """Get the raw Network Data."""
1613        hexstr = self.__parse_str(self.execute_command('netdata show -x'))
1614        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
1615
1616    def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1617        """Get routes from local Network Data."""
1618        return self.__parse_routes(self.execute_command('route'))
1619
1620    def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]:
1621        routes = []
1622        for line in output:
1623            line = line.split()
1624            if line[1] == 's':
1625                prefix, _, prf, rloc16 = line
1626                stable = True
1627            else:
1628                prefix, prf, rloc16 = line
1629                stable = False
1630
1631            rloc16 = Rloc16(rloc16, 16)
1632            routes.append((prefix, stable, prf, rloc16))
1633
1634        return routes
1635
1636    def add_route(self, prefix: str, stable=True, prf='med'):
1637        """Add a valid external route to the Network Data."""
1638        cmd = f'route add {prefix}'
1639        if stable:
1640            cmd += ' s'
1641
1642        cmd += f' {prf}'
1643        self.execute_command(cmd)
1644
1645    def remove_route(self, prefix: str):
1646        """Invalidate a external route in the Network Data."""
1647        self.execute_command(f'route remove {prefix}')
1648
1649    def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]):
1650        """Add service to the Network Data.
1651
1652        enterpriseNumber: IANA enterprise number
1653        serviceData: hex-encoded binary service data
1654        serverData: hex-encoded binary server data
1655        """
1656        service_data = self.__validate_hex_or_bytes(service_data)
1657        server_data = self.__validate_hex_or_bytes(server_data)
1658        self.execute_command(f'service add {enterprise_number} {service_data} {server_data}')
1659
1660    def remove_service(self, enterprise_number, service_data):
1661        """Remove service from Network Data.
1662
1663        enterpriseNumber: IANA enterprise number
1664        serviceData: hext-encoded binary service data
1665        """
1666        service_data = self.__validate_hex_or_bytes(service_data)
1667        self.execute_command(f'service remove {enterprise_number} {service_data}')
1668
1669    #
1670    # Dataset management
1671    #
1672
1673    def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False):
1674        """Initialize operational dataset buffer."""
1675        if get_active_dataset and get_pending_dataset:
1676            raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.")
1677
1678        if get_active_dataset:
1679            self.execute_command(f'dataset init active')
1680        elif get_pending_dataset:
1681            self.execute_command(f'dataset init pending')
1682        else:
1683            self.execute_command(f'dataset init new')
1684
1685    def dataset_commit_buffer(self, dataset: str):
1686        if dataset in ('active', 'pending'):
1687            cmd = f'dataset commit {dataset}'
1688        else:
1689            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1690
1691        self.execute_command(cmd)
1692
1693    def dataset_clear_buffer(self):
1694        """Reset operational dataset buffer."""
1695        self.execute_command('dataset clear')
1696
1697    def get_dataset(self, dataset: str = 'buffer'):
1698        if dataset in ('active', 'pending'):
1699            cmd = f'dataset {dataset}'
1700        elif dataset == 'buffer':
1701            cmd = 'dataset'
1702        else:
1703            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1704
1705        output = self.execute_command(cmd)
1706        return self.__parse_dataset(output)
1707
1708    def __parse_dataset(self, output: List[str]) -> Dict[str, Any]:
1709        # Example output:
1710        #
1711        # Active Timestamp: 1
1712        # Channel: 22
1713        # Channel Mask: 0x07fff800
1714        # Ext PAN ID: 5c93ae980ff22d35
1715        # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64
1716        # Network Key: d1a8348d59fb1fac1d6c4f95007d487a
1717        # Network Name: OpenThread-7caa
1718        # PAN ID: 0x7caa
1719        # PSKc: 167d89fd169e439ca0b8266de248090f
1720        # Security Policy: 0, onrc
1721
1722        dataset = {}
1723
1724        for line in output:
1725            line = line.split(': ')
1726            key, val = line[0], ': '.join(line[1:])
1727
1728            if key == 'Active Timestamp':
1729                dataset['active_timestamp'] = int(val)
1730            elif key == 'Channel':
1731                dataset['channel'] = int(val)
1732            elif key == 'Channel Mask':
1733                dataset['channel_mask'] = int(val, 16)
1734            elif key == 'Ext PAN ID':
1735                dataset['extpanid'] = val
1736            elif key == 'Mesh Local Prefix':
1737                dataset['mesh_local_prefix'] = val
1738            elif key in ('Network Key', 'Master Key'):
1739                dataset['networkkey'] = val
1740            elif key == 'Network Name':
1741                dataset['network_name'] = val
1742            elif key == 'PAN ID':
1743                dataset['panid'] = int(val, 16)
1744            elif key == 'PSKc':
1745                dataset['pskc'] = val
1746            elif key == 'Security Policy':
1747                rotation_time, flags = val.split(', ') if ', ' in val else val.split(' ')
1748                rotation_time = int(rotation_time)
1749                dataset['security_policy'] = SecurityPolicy(rotation_time, flags)
1750            else:
1751                raise UnexpectedCommandOutput(output)
1752
1753        return dataset
1754
1755    def get_dataset_bytes(self, dataset: str) -> bytes:
1756        if dataset in ('active', 'pending'):
1757            cmd = f'dataset {dataset} -x'
1758        else:
1759            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1760
1761        hexstr = self.__parse_str(self.execute_command(cmd))
1762        return self.__hex_to_bytes(hexstr)
1763
1764    def set_dataset_bytes(self, dataset: str, data: bytes) -> None:
1765        if dataset in ('active', 'pending'):
1766            cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}'
1767        else:
1768            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1769
1770        self.execute_command(cmd)
1771
1772    def dataset_set_buffer(self,
1773                           active_timestamp: int = None,
1774                           channel: int = None,
1775                           channel_mask: int = None,
1776                           extpanid: str = None,
1777                           mesh_local_prefix: str = None,
1778                           network_key: str = None,
1779                           network_name: str = None,
1780                           panid: int = None,
1781                           pskc: str = None,
1782                           security_policy: tuple = None,
1783                           pending_timestamp: int = None):
1784        if active_timestamp is not None:
1785            self.execute_command(f'dataset activetimestamp {active_timestamp}')
1786
1787        if channel is not None:
1788            self.execute_command(f'dataset channel {channel}')
1789
1790        if channel_mask is not None:
1791            self.execute_command(f'dataset channelmask {channel_mask}')
1792
1793        if extpanid is not None:
1794            self.execute_command(f'dataset extpanid {extpanid}')
1795
1796        if mesh_local_prefix is not None:
1797            self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}')
1798
1799        if network_key is not None:
1800            nwk_cmd = self.__detect_networkkey_cmd()
1801            self.execute_command(f'dataset {nwk_cmd} {network_key}')
1802
1803        if network_name is not None:
1804            self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}')
1805
1806        if panid is not None:
1807            self.execute_command(f'dataset panid {panid}')
1808
1809        if pskc is not None:
1810            self.execute_command(f'dataset pskc {pskc}')
1811
1812        if security_policy is not None:
1813            rotation_time, flags = security_policy
1814            self.execute_command(f'dataset securitypolicy {rotation_time} {flags}')
1815
1816        if pending_timestamp is not None:
1817            self.execute_command(f'dataset pendingtimestamp {pending_timestamp}')
1818
1819    # TODO: dataset mgmtgetcommand
1820    # TODO: dataset mgmtsetcommand
1821    # TODO: dataset set <active|pending> <dataset>
1822
1823    #
1824    # Allowlist management
1825    #
1826
1827    def enable_allowlist(self):
1828        self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}')
1829
1830    def disable_allowlist(self):
1831        self.execute_command('macfilter addr disable')
1832
1833    def add_allowlist(self, addr: str, rssi: int = None):
1834        cmd = f'macfilter addr add {addr}'
1835
1836        if rssi is not None:
1837            cmd += f' {rssi}'
1838
1839        self.execute_command(cmd)
1840
1841    def remove_allowlist(self, addr: str):
1842        self.execute_command(f'macfilter addr remove {addr}')
1843
1844    def clear_allowlist(self):
1845        self.execute_command('macfilter addr clear')
1846
1847    def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]):
1848        self.clear_allowlist()
1849
1850        if allowlist is None:
1851            self.disable_allowlist()
1852        else:
1853            self.enable_allowlist()
1854            for item in allowlist:
1855                if isinstance(item, str):
1856                    self.add_allowlist(item)
1857                else:
1858                    addr, rssi = item[0], item[1]
1859                    self.add_allowlist(addr, rssi)
1860
1861    # TODO: denylist
1862    # TODO: macfilter rss
1863    # TODO: macfilter rss add <extaddr> <rss>
1864    # TODO: macfilter rss add-lqi <extaddr> <lqi>
1865    # TODO: macfilter rss remove <extaddr>
1866    # TODO: macfilter rss clear
1867
1868    def __detect_allowlist_cmd(self):
1869        if self.api_version >= 28:
1870            return 'allowlist'
1871        else:
1872            return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74'
1873
1874    def __detect_networkkey_cmd(self) -> str:
1875        return 'networkkey' if self.api_version >= 126 else 'masterkey'
1876
1877    #
1878    # Unicast Addresses management
1879    #
1880    def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1881        """Add an IPv6 address to the Thread interface."""
1882        self.execute_command(f'ipaddr add {ip}')
1883
1884    def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1885        """Delete an IPv6 address from the Thread interface."""
1886        self.execute_command(f'ipaddr del {ip}')
1887
1888    def get_ipaddrs(self) -> Tuple[Ip6Addr]:
1889        """Get all IPv6 addresses assigned to the Thread interface."""
1890        return tuple(map(Ip6Addr, self.execute_command('ipaddr')))
1891
1892    def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1893        """Check if a IPv6 address was added to the Thread interface."""
1894        return ip in self.get_ipaddrs()
1895
1896    def get_ipaddr_mleid(self) -> Ip6Addr:
1897        """Get Thread Mesh Local EID address."""
1898        return self.__parse_ip6addr(self.execute_command('ipaddr mleid'))
1899
1900    def get_ipaddr_linklocal(self) -> Ip6Addr:
1901        """Get Thread link-local IPv6 address."""
1902        return self.__parse_ip6addr(self.execute_command('ipaddr linklocal'))
1903
1904    def get_ipaddr_rloc(self) -> Ip6Addr:
1905        """Get Thread Routing Locator (RLOC) address."""
1906        return self.__parse_ip6addr(self.execute_command('ipaddr rloc'))
1907
1908    #
1909    # Multicast Addresses management
1910    #
1911
1912    def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1913        """Subscribe the Thread interface to the IPv6 multicast address."""
1914        self.execute_command(f'ipmaddr add {ip}')
1915
1916    def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1917        """Unsubscribe the Thread interface to the IPv6 multicast address."""
1918        self.execute_command(f'ipmaddr del {ip}')
1919
1920    def get_ipmaddrs(self) -> Tuple[Ip6Addr]:
1921        """Get all IPv6 multicast addresses subscribed to the Thread interface."""
1922        return tuple(map(Ip6Addr, self.execute_command('ipmaddr')))
1923
1924    def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1925        """Check if a IPv6 multicast address was subscribed by the Thread interface."""
1926        return ip in self.get_ipmaddrs()
1927
1928    def get_ipmaddr_promiscuous(self) -> bool:
1929        """Get multicast promiscuous mode."""
1930        return self.__parse_Enabled_or_Disabled(self.execute_command("ipmaddr promiscuous"))
1931
1932    def enable_ipmaddr_promiscuous(self):
1933        """Enable multicast promiscuous mode."""
1934        self.execute_command('ipmaddr promiscuous enable')
1935
1936    def disable_ipmaddr_promiscuous(self):
1937        """Disable multicast promiscuous mode."""
1938        self.execute_command('ipmaddr promiscuous disable')
1939
1940    def get_ipmaddr_llatn(self) -> Ip6Addr:
1941        """Get Link Local All Thread Nodes Multicast Address"""
1942        return self.__parse_ip6addr(self.execute_command('ipmaddr llatn'))
1943
1944    def get_ipmaddr_rlatn(self) -> Ip6Addr:
1945        """Get Realm Local All Thread Nodes Multicast Address"""
1946        return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn'))
1947
1948    #
1949    # Backbone Router Utilities
1950    #
1951
1952    # TODO: bbr mgmt ...
1953
1954    def enable_backbone_router(self):
1955        """Enable Backbone Router Service for Thread 1.2 FTD.
1956
1957        SRV_DATA.ntf would be triggerred for attached device if there is no Backbone Router Service in Thread Network Data.
1958        """
1959        self.execute_command('bbr enable')
1960
1961    def disable_backbone_router(self):
1962        """Disable Backbone Router Service for Thread 1.2 FTD.
1963
1964        SRV_DATA.ntf would be triggerred if Backbone Router is Primary state.
1965        """
1966        self.execute_command('bbr disable')
1967
1968    def get_backbone_router_state(self) -> str:
1969        """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD."""
1970        return self.__parse_str(self.execute_command('bbr state'))
1971
1972    def get_primary_backbone_router_info(self) -> Optional[dict]:
1973        """Show current Primary Backbone Router information for Thread 1.2 device."""
1974        output = self.execute_command('bbr')
1975
1976        if len(output) < 1:
1977            raise UnexpectedCommandOutput(output)
1978
1979        line = output[0]
1980        if line == 'BBR Primary: None':
1981            return None
1982
1983        if line != 'BBR Primary:':
1984            raise UnexpectedCommandOutput(output)
1985
1986        # Example output:
1987        # BBR Primary:
1988        # server16: 0xE400
1989        # seqno:    10
1990        # delay:    120 secs
1991        # timeout:  300 secs
1992
1993        dataset = {}
1994
1995        for line in output[1:]:
1996            key, val = line.split(':')
1997            key, val = key.strip(), val.strip()
1998            if key == 'server16':
1999                dataset[key] = int(val, 16)
2000            elif key == 'seqno':
2001                dataset[key] = int(val)
2002            elif key == 'delay':
2003                if not val.endswith(' secs'):
2004                    raise UnexpectedCommandOutput(output)
2005                dataset[key] = int(val.split()[0])
2006            elif key == 'timeout':
2007                if not val.endswith(' secs'):
2008                    raise UnexpectedCommandOutput(output)
2009                dataset[key] = int(val.split()[0])
2010            else:
2011                raise UnexpectedCommandOutput(output)
2012
2013        return dataset
2014
2015    def register_backbone_router_dataset(self):
2016        """Register Backbone Router Service for Thread 1.2 FTD.
2017
2018        SRV_DATA.ntf would be triggerred for attached device.
2019        """
2020        self.execute_command('bbr register')
2021
2022    def get_backbone_router_config(self) -> dict:
2023        """Show local Backbone Router configuration for Thread 1.2 FTD."""
2024        output = self.execute_command('bbr config')
2025        # Example output:
2026        # seqno:    10
2027        # delay:    120 secs
2028        # timeout:  300 secs
2029
2030        config = {}
2031
2032        for line in output:
2033            key, val = line.split(':')
2034            key, val = key.strip(), val.strip()
2035            if key == 'seqno':
2036                config[key] = int(val)
2037            elif key in ('delay', 'timeout'):
2038                if not line.endswith(' secs'):
2039                    raise UnexpectedCommandOutput(output)
2040                config[key] = int(val.split()[0])
2041            else:
2042                raise UnexpectedCommandOutput(output)
2043
2044        return config
2045
2046    def set_backbone_router_config(self, seqno: int = None, delay: int = None, timeout: int = None):
2047        """Configure local Backbone Router configuration for Thread 1.2 FTD.
2048
2049        Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router.
2050        """
2051        if seqno is None and delay is None and timeout is None:
2052            raise InvalidArgumentsError("Please specify seqno or delay or timeout")
2053
2054        cmd = 'bbr config'
2055        if seqno is not None:
2056            cmd += f' seqno {seqno}'
2057
2058        if delay is not None:
2059            cmd += f' delay {delay}'
2060
2061        if timeout is not None:
2062            cmd += f' timeout {timeout}'
2063
2064        self.execute_command(cmd)
2065
2066    def get_backbone_router_jitter(self) -> int:
2067        """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2068        return self.__parse_int(self.execute_command('bbr jitter'))
2069
2070    def set_backbone_router_jitter(self, val: int):
2071        """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2072        self.execute_command(f'bbr jitter {val}')
2073
2074    def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]:
2075        """Get Backbone Router Multicast Listeners."""
2076        listeners = []
2077        for line in self.execute_command('bbr mgmt mlr listener'):
2078            ip, timeout = line.split()
2079            listeners.append((Ip6Addr(ip), int(timeout)))
2080
2081        return listeners
2082
2083    #
2084    # Thread 1.2 and DUA/MLR utilities
2085    #
2086
2087    def get_domain_name(self) -> str:
2088        """Get the Thread Domain Name for Thread 1.2 device."""
2089        return self.__parse_str(self.execute_command('domainname'))
2090
2091    def set_domain_name(self, name: str):
2092        """Set the Thread Domain Name for Thread 1.2 device."""
2093        self.execute_command('domainname %s' % self.__escape_escapable(name))
2094
2095    # TODO: dua iid
2096    # TODO: dua iid <iid>
2097    # TODO: dua iid clear
2098    # TODO: mlr reg <ipaddr> ... [timeout]
2099
2100    #
2101    # Link metrics management
2102    #
2103    # TODO: linkmetrics mgmt <ipaddr> forward <seriesid> [ldraX][pqmr]
2104    # TODO: linkmetrics probe <ipaddr> <seriesid> <length>
2105    # TODO: linkmetrics query <ipaddr> single [pqmr]
2106    # TODO: linkmetrics query <ipaddr> forward <seriesid>
2107    # TODO: linkquality <extaddr>
2108    # TODO: linkquality <extaddr> <linkquality>
2109    #
2110
2111    #
2112    # Logging
2113    #
2114
2115    def get_log_level(self) -> int:
2116        """Get the log level."""
2117        return self.__parse_int(self.execute_command('log level'))
2118
2119    def set_log_level(self, level: int):
2120        """Set the log level."""
2121        self.execute_command(f'log level {level}')
2122
2123    #
2124    # Device performance related information
2125    #
2126
2127    def get_message_buffer_info(self) -> dict:
2128        """Get the current message buffer information."""
2129        output = self.execute_command('bufferinfo')
2130
2131        info = {}
2132
2133        def _parse_val(val):
2134            vals = val.split()
2135            return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals))
2136
2137        for line in output:
2138            key, val = line.split(':')
2139            key, val = key.strip(), val.strip()
2140            info[key.replace(' ', '_')] = _parse_val(val)
2141
2142        return info
2143
2144    @constant_property
2145    def counter_names(self):
2146        """Get the supported counter names."""
2147        return tuple(self.execute_command('counters'))
2148
2149    def get_counter(self, name: str) -> Counter:
2150        """Reset the counter value."""
2151        output = self.execute_command(f'counters {name}')
2152
2153        counter = Counter()
2154        for line in output:
2155            k, v = line.strip().split(': ')
2156            counter[k] = int(v)
2157
2158        return counter
2159
2160    def reset_counter(self, name: str):
2161        """Reset the counter value."""
2162        self.execute_command(f'counters {name} reset')
2163
2164    def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]:
2165        """Get the EID-to-RLOC cache entries."""
2166        output = self.execute_command('eidcache')
2167        cache = {}
2168
2169        for line in output:
2170            ip, rloc16, _ = line.split(" ", 2)
2171
2172            cache[Ip6Addr(ip)] = Rloc16(rloc16, 16)
2173
2174        return cache
2175
2176    #
2177    # UDP utilities
2178    #
2179
2180    def udp_open(self):
2181        """Opens the example socket."""
2182        self.execute_command('udp open')
2183
2184    def udp_close(self):
2185        """Opens the example socket."""
2186        self.execute_command('udp close')
2187
2188    def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD):
2189        """Assigns a name (i.e. IPv6 address and port) to the example socket.
2190
2191        :param ip: the IPv6 address or the unspecified IPv6 address (::).
2192        :param port: the UDP port
2193        """
2194        bindarg = ''
2195        if netif == NetifIdentifier.UNSPECIFIED:
2196            bindarg += ' -u'
2197        elif netif == NetifIdentifier.BACKBONE:
2198            bindarg += ' -b'
2199
2200        self.execute_command(f'udp bind{bindarg} {ip} {port}')
2201
2202    def udp_connect(self, ip: str, port: int):
2203        """Specifies the peer with which the socket is to be associated.
2204
2205        ip: the peer's IPv6 address.
2206        port: the peer's UDP port.
2207        """
2208        self.execute_command(f'udp connect {ip} {port}')
2209
2210    def udp_send(self, ip: str = None, port: int = None, text: str = None, random_bytes: int = None, hex: str = None):
2211        """Send a few bytes over UDP.
2212
2213        ip: the IPv6 destination address.
2214        port: the UDP destination port.
2215        type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value.
2216        * -x: binary data in hexadecimal representation in the value.
2217        """
2218        if (ip is None) != (port is None):
2219            raise InvalidArgumentsError("Please specify both `ip` and `port`.")
2220
2221        if (text is not None) + (random_bytes is not None) + (hex is not None) != 1:
2222            raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.")
2223
2224        cmd = 'udp send'
2225
2226        if ip is not None:
2227            cmd += f' {ip} {port}'
2228
2229        if text is not None:
2230            cmd += f' -t {text}'
2231        elif random_bytes is not None:
2232            cmd += f' -s {random_bytes}'
2233        elif hex is not None:
2234            self.__validate_hex(hex)
2235            cmd += f' -x {hex}'
2236
2237        self.execute_command(cmd)
2238
2239    def udp_get_link_security(self) -> bool:
2240        """Gets whether the link security is enabled or disabled."""
2241        return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity'))
2242
2243    def udp_enable_link_security(self):
2244        """Enable link security."""
2245        self.execute_command('udp linksecurity enable')
2246
2247    def udp_disable_link_security(self):
2248        """Disable link security."""
2249        self.execute_command('udp linksecurity disable')
2250
2251    def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]:
2252        cmd = 'netstat'
2253        output = self.execute_command(cmd)
2254        if len(output) < 2:
2255            raise UnexpectedCommandOutput(output)
2256
2257        socks = []
2258        for line in output[2:]:
2259            _, sock_addr, peer_addr = line.strip().split('|')[:3]
2260            sock_addr = self.__parse_socket_addr(sock_addr.strip())
2261            peer_addr = self.__parse_socket_addr(peer_addr.strip())
2262            socks.append((sock_addr, peer_addr))
2263
2264        return socks
2265
2266    @staticmethod
2267    def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]:
2268        addr, port = addr.rsplit(':', 1)
2269        if addr.startswith('[') and addr.endswith(']'):
2270            addr = addr[1:-1]
2271
2272        return Ip6Addr(addr), int(port) if port != '*' else 0
2273
2274    #
2275    # CoAP CLI (test) utilities
2276    #
2277    def coap_start(self):
2278        """Starts the application coap service."""
2279        self.execute_command('coap start')
2280
2281    def coap_stop(self):
2282        """Stops the application coap service."""
2283        self.execute_command('coap stop')
2284
2285    def coap_get(self, addr: str, uri_path: str, type: str = "con"):
2286        cmd = f'coap get {addr} {uri_path} {type}'
2287        self.execute_command(cmd)
2288
2289    def coap_put(self, addr: str, uri_path: str, type: str = "con", payload: str = None):
2290        cmd = f'coap put {addr} {uri_path} {type}'
2291
2292        if payload is not None:
2293            cmd += f' {payload}'
2294
2295        self.execute_command(cmd)
2296
2297    def coap_post(self, addr: str, uri_path: str, type: str = "con", payload: str = None):
2298        cmd = f'coap post {addr} {uri_path} {type}'
2299
2300        if payload is not None:
2301            cmd += f' {payload}'
2302
2303        self.execute_command(cmd)
2304
2305    def coap_delete(self, addr: str, uri_path: str, type: str = "con", payload: str = None):
2306        cmd = f'coap delete {addr} {uri_path} {type}'
2307
2308        if payload is not None:
2309            cmd += f' {payload}'
2310
2311        self.execute_command(cmd)
2312
2313    def coap_get_test_resource_path(self) -> str:
2314        """Gets the URI path for the test resource."""
2315        return self.__parse_str(self.execute_command('coap resource'))
2316
2317    def coap_set_test_resource_path(self, path: str):
2318        """Sets the URI path for the test resource."""
2319        self.execute_command(f'coap resource {path}')
2320
2321    def coap_test_set_resource_content(self, content: str):
2322        """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client."""
2323        self.execute_command(f'coap set {content}')
2324
2325    # TODO: coap observe <address> <uri-path> [type]
2326    # TODO: coap cancel
2327    # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>]
2328    # TODO: CoAP Secure utilities
2329
2330    #
2331    # Other TODOs
2332    #
2333    # TODO: netstat
2334    # TODO: networkdiagnostic get <addr> <type> ..
2335    # TODO: networkdiagnostic reset <addr> <type> ..
2336    # TODO: parent
2337    # TODO: pskc [-p] <key>|<passphrase>
2338    #
2339
2340    #
2341    # Private methods
2342    #
2343
2344    def __parse_str(self, output: List[str]) -> str:
2345        if len(output) != 1:
2346            raise UnexpectedCommandOutput(output)
2347
2348        return output[0]
2349
2350    def __parse_int_list(self, output: List[str]) -> List[int]:
2351        line = self.__parse_str(output)
2352        return list(map(int, line.strip().split()))
2353
2354    def __parse_ip6addr(self, output: List[str]) -> Ip6Addr:
2355        return Ip6Addr(self.__parse_str(output))
2356
2357    def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]:
2358        return [Ip6Addr(line) for line in output]
2359
2360    def __parse_int(self, output: List[str], base=10) -> int:
2361        if len(output) != 1:
2362            raise UnexpectedCommandOutput(output)
2363
2364        return int(output[0], base)
2365
2366    def __parse_network_key(self, output: List[str]) -> str:
2367        networkkey = self.__parse_str(output)
2368
2369        try:
2370            self.__validate_network_key(networkkey)
2371        except ValueError:
2372            raise UnexpectedCommandOutput(output)
2373
2374        return networkkey
2375
2376    def __validate_network_key(self, networkkey: str):
2377        if len(networkkey) != 32:
2378            raise ValueError(networkkey)
2379
2380        int(networkkey, 16)
2381
2382    def __parse_hex64b(self, output: List[str]) -> str:
2383        extaddr = self.__parse_str(output)
2384
2385        try:
2386            self.__validate_hex64b(extaddr)
2387        except ValueError:
2388            raise UnexpectedCommandOutput(output)
2389
2390        return extaddr
2391
2392    __parse_extaddr = __parse_hex64b
2393    __parse_extpanid = __parse_hex64b
2394    __parse_eui64 = __parse_hex64b
2395    __parse_joiner_id = __parse_hex64b
2396
2397    def __validate_hex64b(self, extaddr: str):
2398        if len(extaddr) != 16:
2399            raise ValueError(extaddr)
2400
2401        self.__validate_hex(extaddr)
2402
2403    def __validate_hex(self, hexstr: str):
2404        if len(hexstr) % 2 != 0:
2405            raise ValueError(hexstr)
2406
2407        for i in range(0, len(hexstr), 2):
2408            int(hexstr[i:i + 2], 16)
2409
2410    __validate_extaddr = __validate_hex64b
2411    __validate_extpanid = __validate_hex64b
2412
2413    def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool:
2414        return self.__parse_values(output, Enabled=True, Disabled=False)
2415
2416    def __parse_values(self, output: List[str], **vals) -> Any:
2417        val = self.__parse_str(output)
2418        if val not in vals:
2419            raise UnexpectedCommandOutput(output)
2420
2421        return vals[val]
2422
2423    def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str:
2424        if isinstance(data, bytes):
2425            return ''.join('%02x' % c for c in data)
2426        else:
2427            self.__validate_hex(data)
2428            return data
2429
2430    def __hex_to_bytes(self, hexstr: str) -> bytes:
2431        self.__validate_hex(hexstr)
2432        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
2433
2434    def __bytes_to_hex(self, data: bytes) -> str:
2435        return ''.join('%02x' % b for b in data)
2436
2437    def __escape_escapable(self, s: str) -> str:
2438        """Escape CLI escapable characters in the given string.
2439        """
2440        escapable_chars = '\\ \t\r\n'
2441        for char in escapable_chars:
2442            s = s.replace(char, '\\%s' % char)
2443        return s
2444
2445    def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str:
2446        txt_bin = b''
2447        for k, v in txt.items():
2448            assert '=' not in k, 'TXT key must not contain `=`'
2449
2450            if isinstance(v, str):
2451                entry = f'{k}={v}'.encode('utf8')
2452            elif isinstance(v, bytes):
2453                entry = f'{k}='.encode('utf8') + v
2454            else:
2455                assert v is True, 'TXT val must be str or bytes or True'
2456                entry = k.encode('utf8')
2457
2458            assert len(entry) <= 255, 'TXT entry is too long'
2459
2460            txt_bin += bytes([len(entry)])
2461            txt_bin += entry
2462
2463        return ''.join('%02x' % b for b in txt_bin)
2464
2465
2466def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
2467    cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator)
2468    cmd_handler = OtCliCommandRunner(cli_handler)
2469    return OTCI(cmd_handler)
2470
2471
2472def connect_cli_serial(dev: str, baudrate=115200) -> OTCI:
2473    cli_handler = connectors.OtCliSerial(dev, baudrate)
2474    cmd_handler = OtCliCommandRunner(cli_handler)
2475    return OTCI(cmd_handler)
2476
2477
2478def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
2479    ncp_handler = connectors.OtNcpSim(executable, nodeid, simulator=simulator)
2480    cmd_handler = OtCliCommandRunner(ncp_handler, is_spinel_cli=True)
2481    return OTCI(cmd_handler)
2482
2483
2484def connect_otbr_ssh(host: str, port: int = 22, username='pi', password='raspberry', sudo=True):
2485    cmd_handler = OtbrSshCommandRunner(host, port, username, password, sudo=sudo)
2486    return OTCI(cmd_handler)
2487
2488
2489def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI:
2490    return OTCI(cmd_handler)
2491