1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29import ipaddress 30import logging 31import re 32from collections import Counter 33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any 34 35from . import connectors 36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner 37from .connectors import Simulator 38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError 39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \ 40 RouterTableEntry, NetifIdentifier 41from .utils import match_line, constant_property 42 43 44class OTCI(object): 45 """ 46 This class represents an OpenThread Controller Interface instance that provides versatile interfaces to 47 manipulate an OpenThread device. 48 """ 49 50 DEFAULT_EXEC_COMMAND_RETRY = 4 # A command is retried 4 times if failed. 51 52 __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY 53 54 def __init__(self, otcmd: OTCommandHandler): 55 """ 56 This method initializes an OTCI instance. 57 58 :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands. 59 """ 60 self.__otcmd: OTCommandHandler = otcmd 61 self.__logger = logging.getLogger(name=str(self)) 62 63 def __repr__(self): 64 """Gets the string representation of the OTCI instance.""" 65 return repr(self.__otcmd) 66 67 def wait(self, duration: float, expect_line: Union[str, Pattern, Collection[Any]] = None): 68 """Wait for a given duration. 69 70 :param duration: The duration (in seconds) wait for. 71 :param expect_line: The line expected to output if given. 72 Raise ExpectLineTimeoutError if expect_line is not found within the given duration. 73 """ 74 self.log('info', "wait for %.3f seconds", duration) 75 if expect_line is None: 76 self.__otcmd.wait(duration) 77 else: 78 success = False 79 80 while duration > 0: 81 output = self.__otcmd.wait(1) 82 if any(match_line(line, expect_line) for line in output): 83 success = True 84 break 85 86 duration -= 1 87 88 if not success: 89 raise ExpectLineTimeoutError(expect_line) 90 91 def close(self): 92 """Close the OTCI instance.""" 93 self.__otcmd.close() 94 95 def execute_command(self, 96 cmd: str, 97 timeout: float = 10, 98 silent: bool = False, 99 already_is_ok: bool = True) -> List[str]: 100 for i in range(self.__exec_command_retry + 1): 101 try: 102 return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok) 103 except Exception: 104 if i == self.__exec_command_retry: 105 raise 106 107 def __execute_command(self, 108 cmd: str, 109 timeout: float = 10, 110 silent: bool = False, 111 already_is_ok: bool = True) -> List[str]: 112 """Execute the OpenThread CLI command. 113 114 :param cmd: The command to execute. 115 :param timeout: The command timeout. 116 :param silent: Whether to run the command silent without logging. 117 :returns: The command output as a list of lines. 118 """ 119 if not silent: 120 self.log('info', '> %s', cmd) 121 122 output = self.__otcmd.execute_command(cmd, timeout) 123 124 if not silent: 125 for line in output: 126 self.log('info', '%s', line) 127 128 if cmd in ('reset', 'factoryreset'): 129 return output 130 131 if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'): 132 output = output[:-1] 133 return output 134 else: 135 raise CommandError(cmd, output) 136 137 def set_execute_command_retry(self, n: int): 138 assert n >= 0 139 self.__exec_command_retry = n 140 141 def shell(self, cmd: str, timeout: float = 10): 142 self.log('info', '# %s', cmd) 143 output = self.__otcmd.shell(cmd, timeout=timeout) 144 for line in output: 145 self.log('info', '%s', line) 146 return output 147 148 def set_logger(self, logger: logging.Logger): 149 """Set the logger for the OTCI instance, or None to disable logging.""" 150 self.__logger = logger 151 152 def log(self, level, fmt, *args, **kwargs): 153 if self.__logger is not None: 154 getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs) 155 156 def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]): 157 """Set the callback that will be called for each line output by the CLI.""" 158 self.__otcmd.set_line_read_callback(callback) 159 160 # 161 # Constant properties 162 # 163 @constant_property 164 def version(self): 165 """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")""" 166 return self.__parse_str(self.execute_command('version')) 167 168 @constant_property 169 def thread_version(self): 170 """Get the Thread Version number.""" 171 return self.__parse_int(self.execute_command('thread version')) 172 173 @constant_property 174 def api_version(self): 175 """Get API version number.""" 176 try: 177 return self.__parse_int(self.execute_command('version api')) 178 except ValueError: 179 # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError. 180 return 0 181 182 # 183 # Basic device operations 184 # 185 def ifconfig_up(self): 186 """Bring up the IPv6 interface.""" 187 self.execute_command('ifconfig up') 188 189 def ifconfig_down(self): 190 """Bring down the IPv6 interface.""" 191 self.execute_command('ifconfig down') 192 193 def get_ifconfig_state(self) -> bool: 194 """Get the status of the IPv6 interface.""" 195 return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False) 196 197 def thread_start(self): 198 """Enable Thread protocol operation and attach to a Thread network.""" 199 self.execute_command('thread start') 200 201 def thread_stop(self): 202 """Disable Thread protocol operation and detach from a Thread network.""" 203 self.execute_command('thread stop') 204 205 def reset(self): 206 """Signal a platform reset.""" 207 self.execute_command('reset') 208 209 def factory_reset(self): 210 """Delete all stored settings, and signal a platform reset.""" 211 self.execute_command('factoryreset') 212 213 # 214 # Network Operations 215 # 216 _PING_STATISTICS_PATTERN = re.compile( 217 r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$' 218 ) 219 220 def ping(self, 221 ip: str, 222 size: int = 8, 223 count: int = 1, 224 interval: float = 1, 225 hoplimit: int = 64, 226 timeout: float = 3) -> Dict: 227 """Send an ICMPv6 Echo Request. 228 The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp. 229 230 :param ip: The target IPv6 address to ping. 231 :param size: The number of data bytes in the payload. Default is 8. 232 :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1. 233 :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1. 234 :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h. 235 :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3. 236 """ 237 cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}' 238 239 timeout_allowance = 3 240 lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance) 241 242 statistics = {} 243 for line in lines: 244 m = OTCI._PING_STATISTICS_PATTERN.match(line) 245 if m is not None: 246 if m.group('transmitted') is not None: 247 statistics['transmitted_packets'] = int(m.group('transmitted')) 248 statistics['received_packets'] = int(m.group('received')) 249 if m.group('loss') is not None: 250 statistics['packet_loss'] = float(m.group('loss')) / 100 251 if m.group('min') is not None: 252 statistics['round_trip_time'] = { 253 'min': int(m.group('min')), 254 'avg': float(m.group('avg')), 255 'max': int(m.group('max')) 256 } 257 return statistics 258 259 def ping_stop(self): 260 """Stop sending ICMPv6 Echo Requests.""" 261 self.execute_command('ping stop') 262 263 def discover(self, channel: int = None) -> List[Dict[str, Any]]: 264 """Perform an MLE Discovery operation.""" 265 return self.__scan_networks('discover', channel) 266 267 def scan(self, channel: int = None) -> List[Dict[str, Any]]: 268 """Perform an IEEE 802.15.4 Active Scan.""" 269 return self.__scan_networks('scan', channel) 270 271 def __scan_networks(self, cmd: str, channel: int = None) -> List[Dict[str, Any]]: 272 if channel is not None: 273 cmd += f' {channel}' 274 275 output = self.execute_command(cmd, timeout=10) 276 if len(output) < 2: 277 raise UnexpectedCommandOutput(output) 278 279 networks = [] 280 for line in output[2:]: 281 fields = line.strip().split('|') 282 283 try: 284 _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields 285 except Exception: 286 logging.warning('ignored output: %r', line) 287 continue 288 289 networks.append({ 290 'joinable': bool(int(J)), 291 'network_name': netname.strip(), 292 'extpanid': extpanid, 293 'panid': int(panid, 16), 294 'extaddr': extaddr, 295 'channel': int(ch), 296 'dbm': int(dbm), 297 'lqi': int(lqi), 298 }) 299 300 return networks 301 302 def scan_energy(self, duration: float = None, channel: int = None) -> Dict[int, int]: 303 """Perform an IEEE 802.15.4 Energy Scan.""" 304 cmd = 'scan energy' 305 if duration is not None: 306 cmd += f' {duration * 1000:d}' 307 308 if channel is not None: 309 cmd += f' {channel}' 310 311 output = self.execute_command(cmd, timeout=10) 312 if len(output) < 2: 313 raise UnexpectedCommandOutput(output) 314 315 channels = {} 316 for line in output[2:]: 317 fields = line.strip().split('|') 318 319 _, Ch, RSSI, _ = fields 320 channels[int(Ch)] = int(RSSI) 321 322 return channels 323 324 def mac_send_data_request(self): 325 """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent.""" 326 self.execute_command('mac send datarequest') 327 328 def mac_send_empty_data(self): 329 """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent.""" 330 self.execute_command('mac send emptydata') 331 332 # TODO: discover 333 # TODO: dns resolve <hostname> [DNS server IP] [DNS server port] 334 # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid> 335 # TODO: sntp query 336 337 # 338 # Set or get device/network parameters 339 # 340 341 def get_mode(self) -> str: 342 """Get the Thread Device Mode value. 343 344 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 345 r: rx-on-when-idle 346 d: Full Thread Device 347 n: Full Network Data 348 """ 349 return self.__parse_str(self.execute_command('mode')) 350 351 def set_mode(self, mode: str): 352 """Set the Thread Device Mode value. 353 354 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 355 r: rx-on-when-idle 356 d: Full Thread Device 357 n: Full Network Data 358 """ 359 self.execute_command(f'mode {DeviceMode(mode)}') 360 361 def get_extaddr(self) -> str: 362 """Get the IEEE 802.15.4 Extended Address.""" 363 return self.__parse_extaddr(self.execute_command('extaddr')) 364 365 def set_extaddr(self, extaddr: str): 366 """Set the IEEE 802.15.4 Extended Address.""" 367 self.__validate_hex64b(extaddr) 368 self.execute_command(f'extaddr {extaddr}') 369 370 def get_eui64(self) -> str: 371 """Get the factory-assigned IEEE EUI-64.""" 372 return self.__parse_eui64(self.execute_command('eui64')) 373 374 def set_extpanid(self, extpanid: str): 375 """Set the Thread Extended PAN ID value.""" 376 self.__validate_extpanid(extpanid) 377 self.execute_command(f'extpanid {extpanid}') 378 379 def get_extpanid(self) -> str: 380 """Get the Thread Extended PAN ID value.""" 381 return self.__parse_extpanid(self.execute_command('extpanid')) 382 383 def set_channel(self, ch): 384 """Set the IEEE 802.15.4 Channel value.""" 385 self.execute_command('channel %d' % ch) 386 387 def get_channel(self): 388 """Get the IEEE 802.15.4 Channel value.""" 389 return self.__parse_int(self.execute_command('channel')) 390 391 def get_preferred_channel_mask(self) -> int: 392 """Get preferred channel mask.""" 393 return self.__parse_int(self.execute_command('channel preferred')) 394 395 def get_supported_channel_mask(self): 396 """Get supported channel mask.""" 397 return self.__parse_int(self.execute_command('channel supported')) 398 399 def get_panid(self): 400 """Get the IEEE 802.15.4 PAN ID value.""" 401 return self.__parse_int(self.execute_command('panid'), 16) 402 403 def set_panid(self, panid): 404 """Get the IEEE 802.15.4 PAN ID value.""" 405 self.execute_command('panid %d' % panid) 406 407 def set_network_name(self, name): 408 """Set network name.""" 409 self.execute_command('networkname %s' % self.__escape_escapable(name)) 410 411 def get_network_name(self): 412 """Get network name.""" 413 return self.__parse_str(self.execute_command('networkname')) 414 415 def get_network_key(self) -> str: 416 """Get the network key.""" 417 return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd())) 418 419 def set_network_key(self, networkkey: str): 420 """Set the network key.""" 421 self.__validate_network_key(networkkey) 422 cmd = self.__detect_networkkey_cmd() 423 self.execute_command(f'{cmd} {networkkey}') 424 425 def get_key_sequence_counter(self) -> int: 426 """Get the Thread Key Sequence Counter.""" 427 return self.__parse_int(self.execute_command('keysequence counter')) 428 429 def set_key_sequence_counter(self, counter: int): 430 """Set the Thread Key Sequence Counter.""" 431 self.execute_command(f'keysequence counter {counter}') 432 433 def get_key_sequence_guard_time(self) -> int: 434 """Get Thread Key Switch Guard Time (in hours).""" 435 return self.__parse_int(self.execute_command('keysequence guardtime')) 436 437 def set_key_sequence_guard_time(self, hours: int): 438 """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match.""" 439 self.execute_command(f'keysequence guardtime {hours}') 440 441 def get_cca_threshold(self) -> int: 442 """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 443 output = self.execute_command(f'ccathreshold') 444 val = self.__parse_str(output) 445 if not val.endswith(' dBm'): 446 raise UnexpectedCommandOutput(output) 447 448 return int(val[:-4]) 449 450 def set_cca_threshold(self, val: int): 451 """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 452 self.execute_command(f'ccathreshold {val}') 453 454 def get_promiscuous(self) -> bool: 455 """Get radio promiscuous property.""" 456 return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous')) 457 458 def enable_promiscuous(self): 459 """Enable radio promiscuous operation and print raw packet content.""" 460 self.execute_command('promiscuous enable') 461 462 def disable_promiscuous(self): 463 """Disable radio promiscuous operation.""" 464 self.execute_command('promiscuous disable') 465 466 def get_txpower(self) -> int: 467 """Get the transmit power in dBm.""" 468 line = self.__parse_str(self.execute_command('txpower')) 469 if not line.endswith(' dBm'): 470 raise UnexpectedCommandOutput([line]) 471 472 return int(line.split()[0]) 473 474 def set_txpower(self, val: int): 475 """Set the transmit power in dBm.""" 476 self.execute_command(f'txpower {val}') 477 478 # TODO: fem 479 # TODO: fem lnagain 480 # TODO: fem lnagain <LNA gain> 481 # TODO: mac retries direct 482 # TODO: mac retries direct 483 # TODO: mac retries indirect 484 # TODO: mac retries indirect <number> 485 486 # 487 # Basic Node states and properties 488 # 489 490 def get_state(self) -> ThreadState: 491 """Get the current Thread state.""" 492 return ThreadState(self.__parse_str(self.execute_command('state'))) 493 494 def set_state(self, state: str): 495 """Try to switch to state detached, child, router or leader.""" 496 self.execute_command(f'state {state}') 497 498 def get_rloc16(self) -> int: 499 """Get the Thread RLOC16 value.""" 500 return self.__parse_int(self.execute_command('rloc16'), 16) 501 502 def get_router_id(self) -> int: 503 """Get the Thread Router ID value.""" 504 return self.get_rloc16() >> 10 505 506 def prefer_router_id(self, routerid: int): 507 """Prefer a Router ID when solicit router id from Leader.""" 508 self.execute_command(f'preferrouterid {routerid}') 509 510 def is_singleton(self) -> bool: 511 return self.__parse_values(self.execute_command('singleton'), true=True, false=False) 512 513 # 514 # RCP related utilities 515 # 516 517 def get_rcp_version(self): 518 return self.__parse_str(self.execute_command('rcp version')) 519 520 # 521 # Unsecure port utilities 522 # 523 524 def get_unsecure_ports(self) -> List[int]: 525 """all ports from the allowed unsecured port list.""" 526 return self.__parse_int_list(self.execute_command('unsecureport get')) 527 528 def add_unsecure_port(self, port: int): 529 """Add a port to the allowed unsecured port list.""" 530 self.execute_command(f'unsecureport add {port}') 531 532 def remove_unsecure_port(self, port: int): 533 """Remove a port from the allowed unsecured port list.""" 534 self.execute_command(f'unsecureport remove {port}') 535 536 def clear_unsecure_ports(self): 537 """Remove all ports from the allowed unsecured port list.""" 538 self.execute_command('unsecureport remove all') 539 540 # 541 # Leader configurations 542 # 543 544 def get_preferred_partition_id(self) -> PartitionId: 545 """Get the preferred Thread Leader Partition ID.""" 546 return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd()))) 547 548 def set_preferred_partition_id(self, parid: int): 549 """Set the preferred Thread Leader Partition ID.""" 550 self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}') 551 552 def __get_partition_preferred_cmd(self) -> str: 553 """""" 554 return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid' 555 556 def get_leader_weight(self) -> int: 557 """Get the Thread Leader Weight.""" 558 return self.__parse_int(self.execute_command('leaderweight')) 559 560 def set_leader_weight(self, weight: int): 561 """Set the Thread Leader Weight.""" 562 self.execute_command(f'leaderweight {weight}') 563 564 __LEADER_DATA_KEY_MAP = { 565 'Partition ID': 'partition_id', 566 'Weighting': 'weight', 567 'Data Version': 'data_ver', 568 'Stable Data Version': 'stable_data_ver', 569 'Leader Router ID': 'leader_id', 570 } 571 572 def get_leader_data(self) -> Dict[str, int]: 573 """Get the Thread Leader Data.""" 574 data = {} 575 output = self.execute_command('leaderdata') 576 577 try: 578 for line in output: 579 k, v = line.split(': ') 580 data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v) 581 except KeyError: 582 raise UnexpectedCommandOutput(output) 583 584 return data 585 586 # 587 # Router configurations 588 # 589 590 def get_router_selection_jitter(self): 591 """Get the ROUTER_SELECTION_JITTER value.""" 592 return self.__parse_int(self.execute_command('routerselectionjitter')) 593 594 def set_router_selection_jitter(self, jitter): 595 """Set the ROUTER_SELECTION_JITTER value.""" 596 self.execute_command(f'routerselectionjitter {jitter}') 597 598 def get_network_id_timeout(self) -> int: 599 """Get the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 600 return self.__parse_int(self.execute_command('networkidtimeout')) 601 602 def set_network_id_timeout(self, timeout: int): 603 """Set the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 604 self.execute_command(f'networkidtimeout {timeout}') 605 606 def get_parent_priority(self) -> int: 607 """Get the assigned parent priority value, -2 means not assigned.""" 608 return self.__parse_int(self.execute_command('parentpriority')) 609 610 def set_parent_priority(self, priority: int): 611 """Set the assigned parent priority value: 1, 0, -1 or -2.""" 612 self.execute_command(f'parentpriority {priority}') 613 614 def get_router_upgrade_threshold(self) -> int: 615 """Get the ROUTER_UPGRADE_THRESHOLD value.""" 616 return self.__parse_int(self.execute_command('routerupgradethreshold')) 617 618 def set_router_upgrade_threshold(self, threshold: int): 619 """Set the ROUTER_UPGRADE_THRESHOLD value.""" 620 self.execute_command(f'routerupgradethreshold {threshold}') 621 622 def get_router_downgrade_threshold(self): 623 """Set the ROUTER_DOWNGRADE_THRESHOLD value.""" 624 return self.__parse_int(self.execute_command('routerdowngradethreshold')) 625 626 def set_router_downgrade_threshold(self, threshold: int): 627 """Get the ROUTER_DOWNGRADE_THRESHOLD value.""" 628 self.execute_command(f'routerdowngradethreshold {threshold}') 629 630 def get_router_eligible(self) -> bool: 631 """Indicates whether the router role is enabled or disabled.""" 632 return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible')) 633 634 def enable_router_eligible(self): 635 """Disable the router role.""" 636 self.execute_command('routereligible enable') 637 638 def disable_router_eligible(self): 639 """Disable the router role.""" 640 self.execute_command('routereligible disable') 641 642 def get_router_list(self) -> List[RouterId]: 643 """Get allocated Router IDs.""" 644 line = self.__parse_str(self.execute_command('router list')) 645 return list(map(RouterId, line.strip().split())) 646 647 def get_router_table(self) -> Dict[RouterId, RouterTableEntry]: 648 """table of routers.""" 649 output = self.execute_command('router table') 650 if len(output) < 2: 651 raise UnexpectedCommandOutput(output) 652 653 # 654 # Example output: 655 # 656 # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC | 657 # +----+--------+----------+-----------+-------+--------+-----+------------------+ 658 # | 21 | 0x5400 | 21 | 0 | 3 | 3 | 5 | d28d7f875888fccb | 659 # | 56 | 0xe000 | 56 | 0 | 0 | 0 | 182 | f2d92a82c8d8fe43 | 660 # Done 661 # 662 663 headers = self.__split_table_row(output[0]) 664 665 table = {} 666 for line in output[2:]: 667 line = line.strip() 668 if not line: 669 continue 670 671 fields = self.__split_table_row(line) 672 if len(fields) != len(headers): 673 raise UnexpectedCommandOutput(output) 674 675 col = lambda colname: self.__get_table_col(colname, headers, fields) 676 id = col('ID') 677 678 table[RouterId(id)] = router = RouterTableEntry({ 679 'id': RouterId(id), 680 'rloc16': Rloc16(col('RLOC16'), 16), 681 'next_hop': int(col('Next Hop')), 682 'path_cost': int(col('Path Cost')), 683 'lq_in': int(col('LQ In')), 684 'lq_out': int(col('LQ Out')), 685 'age': int(col('Age')), 686 'extaddr': col('Extended MAC'), 687 }) 688 689 if 'Link' in headers: 690 router['link'] = int(col('Link')) 691 else: 692 # support older version of OT which does not output `Link` field 693 router['link'] = self.get_router_info(router['id'], silent=True)['link'] 694 695 return table 696 697 def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry: 698 cmd = f'router {id}' 699 info = {} 700 output = self.execute_command(cmd, silent=silent) 701 items = [line.strip().split(': ') for line in output] 702 703 headers = [h for h, _ in items] 704 fields = [f for _, f in items] 705 col = lambda colname: self.__get_table_col(colname, headers, fields) 706 707 return RouterTableEntry({ 708 'id': RouterId(id), 709 'rloc16': Rloc16(col('Rloc'), 16), 710 'alloc': int(col('Alloc')), 711 'next_hop': int(col('Next Hop'), 16) >> 10, # convert RLOC16 to Router ID 712 'link': int(col('Link')), 713 }) 714 715 # 716 # Router utilities: Child management 717 # 718 719 def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]: 720 """Get the table of attached children.""" 721 output = self.execute_command('child table') 722 if len(output) < 2: 723 raise UnexpectedCommandOutput(output) 724 725 # 726 # Example output: 727 # | ID | RLOC16 | Timeout | Age | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC | 728 # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+ 729 # | 1 | 0xc801 | 240 | 24 | 3 | 131 |1|0|0| 3| 0 | 0 | 4ecede68435358ac | 730 # | 2 | 0xc802 | 240 | 2 | 3 | 131 |0|0|0| 3| 1 | 0 | a672a601d2ce37d8 | 731 # Done 732 # 733 734 headers = self.__split_table_row(output[0]) 735 736 table = {} 737 for line in output[2:]: 738 line = line.strip() 739 if not line: 740 continue 741 742 fields = self.__split_table_row(line) 743 col = lambda colname: self.__get_table_col(colname, headers, fields) 744 745 id = int(col("ID")) 746 r, d, n = int(col("R")), int(col("D")), int(col("N")) 747 mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}') 748 749 child = { 750 'id': ChildId(id), 751 'rloc16': Rloc16(col('RLOC16'), 16), 752 'timeout': int(col('Timeout')), 753 'age': int(col('Age')), 754 'lq_in': int(col('LQ In')), 755 'c_vn': int(col('C_VN')), 756 'mode': mode, 757 'extaddr': col('Extended MAC') 758 } 759 760 if 'Ver' in headers: 761 child['ver'] = int(col('Ver')) 762 763 if 'CSL' in headers: 764 child['csl'] = bool(int(col('CSL'))) 765 766 if 'QMsgCnt' in headers: 767 child['qmsgcnt'] = int(col('QMsgCnt')) 768 769 table[ChildId(id)] = child 770 771 return table 772 773 # 774 # DNS server & client utilities 775 # 776 777 _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)') 778 779 def dns_get_config(self): 780 """Get DNS client query config.""" 781 output = self.execute_command('dns config') 782 config = {} 783 for line in output: 784 k, v = line.split(': ') 785 if k == 'Server': 786 ip, port = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v).groups() 787 config['server'] = (Ip6Addr(ip), int(port)) 788 elif k == 'ResponseTimeout': 789 config['response_timeout'] = int(v[:-3]) 790 elif k == 'MaxTxAttempts': 791 config['max_tx_attempts'] = int(v) 792 elif k == 'RecursionDesired': 793 config['recursion_desired'] = (v == 'yes') 794 else: 795 logging.warning("dns config ignored: %s", line) 796 797 return config 798 799 def dns_set_config(self, 800 server: Tuple[Union[str, ipaddress.IPv6Address], int], 801 response_timeout: int = None, 802 max_tx_attempts: int = None, 803 recursion_desired: bool = None): 804 """Set DNS client query config.""" 805 cmd = f'dns config {str(server[0])} {server[1]}' 806 if response_timeout is not None: 807 cmd += f' {response_timeout}' 808 809 assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified." 810 if max_tx_attempts is not None: 811 cmd += f' {max_tx_attempts}' 812 813 assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.' 814 if recursion_desired is not None: 815 cmd += f' {1 if recursion_desired else 0}' 816 817 self.execute_command(cmd) 818 819 def dns_get_compression(self) -> bool: 820 """Get DNS compression mode.""" 821 return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression')) 822 823 def dns_enable_compression(self): 824 """Enable DNS compression mode.""" 825 self.execute_command('dns compression enable') 826 827 def dns_disable_compression(self): 828 """Disable DNS compression mode.""" 829 self.execute_command('dns compression disable') 830 831 def dns_browse(self, service: str) -> List[Dict]: 832 """Browse DNS service instances.""" 833 cmd = f'dns browse {service}' 834 output = '\n'.join(self.execute_command(cmd, 30.0)) 835 836 result = [] 837 for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall( 838 r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 839 output): 840 result.append({ 841 'instance': ins, 842 'service': service, 843 'port': int(port), 844 'priority': int(priority), 845 'weight': int(weight), 846 'host': hostname, 847 'address': Ip6Addr(address), 848 'txt': self.__parse_srp_server_service_txt(txt_data), 849 'srv_ttl': int(srv_ttl), 850 'txt_ttl': int(txt_ttl), 851 'aaaa_ttl': int(aaaa_ttl), 852 }) 853 854 return result 855 856 def dns_resolve(self, hostname: str) -> List[Dict]: 857 """Resolve a DNS host name.""" 858 cmd = f'dns resolve {hostname}' 859 output = self.execute_command(cmd, 30.0) 860 dns_resp = output[0] 861 addrs = dns_resp.strip().split(' - ')[1].split(' ') 862 ips = [Ip6Addr(item.strip()) for item in addrs[::2]] 863 ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]] 864 865 return [{ 866 'address': ip, 867 'ttl': ttl, 868 } for ip, ttl in zip(ips, ttls)] 869 870 def dns_resolve_service(self, instance: str, service: str) -> Dict: 871 """Resolves aservice instance.""" 872 instance = self.__escape_escapable(instance) 873 cmd = f'dns service {instance} {service}' 874 output = self.execute_command(cmd, 30.0) 875 876 m = re.match( 877 r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 878 '\t'.join(output)) 879 if m: 880 port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups() 881 return { 882 'instance': instance, 883 'service': service, 884 'port': int(port), 885 'priority': int(priority), 886 'weight': int(weight), 887 'host': hostname, 888 'address': Ip6Addr(address), 889 'txt': self.__parse_srp_server_service_txt(txt_data), 890 'srv_ttl': int(srv_ttl), 891 'txt_ttl': int(txt_ttl), 892 'aaaa_ttl': int(aaaa_ttl), 893 } 894 else: 895 raise CommandError(cmd, output) 896 897 # 898 # SRP server & client utilities 899 # 900 901 def srp_server_get_state(self): 902 """Get the SRP server state""" 903 return self.__parse_str(self.execute_command('srp server state')) 904 905 def srp_server_enable(self): 906 """Enable SRP server.""" 907 self.execute_command('srp server enable') 908 909 def srp_server_disable(self): 910 """Disable SRP server.""" 911 self.execute_command('srp server disable') 912 913 def srp_server_get_domain(self) -> str: 914 """Get the SRP server domain.""" 915 return self.__parse_str(self.execute_command('srp server domain')) 916 917 def srp_server_set_domain(self, domain: str): 918 """Set the SRP server domain.""" 919 self.execute_command(f'srp server domain {domain}') 920 921 def srp_server_get_hosts(self) -> List[Dict]: 922 """Get SRP server registered hosts.""" 923 return self.__parse_srp_server_hosts(self.execute_command('srp server host')) 924 925 def srp_server_get_services(self) -> List[Dict]: 926 """Get SRP server registered services.""" 927 output = self.execute_command('srp server service') 928 return self.__parse_srp_server_services(output) 929 930 def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]: 931 result = [] 932 info = None 933 for line in output: 934 if not line.startswith(' '): 935 info = {'host': line} 936 result.append(info) 937 else: 938 k, v = line.strip().split(': ') 939 if k == 'deleted': 940 if v not in ('true', 'false'): 941 raise UnexpectedCommandOutput(output) 942 943 info['deleted'] = (v == 'true') 944 945 elif k == 'addresses': 946 if not v.startswith('[') or not v.endswith(']'): 947 raise UnexpectedCommandOutput(output) 948 949 v = v[1:-1] 950 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 951 else: 952 raise UnexpectedCommandOutput(output) 953 954 return result 955 956 def __parse_srp_server_services(self, output: List[str]) -> List[Dict]: 957 result = [] 958 info = None 959 for line in output: 960 if not line.startswith(' '): 961 info = {'instance': line} 962 result.append(info) 963 else: 964 k, v = line.strip().split(': ') 965 if k == 'deleted': 966 if v not in ('true', 'false'): 967 raise UnexpectedCommandOutput(output) 968 969 info['deleted'] = (v == 'true') 970 971 elif k == 'addresses': 972 if not v.startswith('[') or not v.endswith(']'): 973 raise UnexpectedCommandOutput(output) 974 975 v = v[1:-1] 976 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 977 elif k == 'subtypes': 978 info[k] = list() if v == '(null)' else list(v.split(',')) 979 elif k in ('port', 'weight', 'priority', 'ttl'): 980 info[k] = int(v) 981 elif k in ('host',): 982 info[k] = v 983 elif k == 'TXT': 984 info['txt'] = self.__parse_srp_server_service_txt(v) 985 else: 986 raise UnexpectedCommandOutput(output) 987 988 return result 989 990 def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]: 991 # example value: [txt11=76616c3131, txt12=76616c3132] 992 assert txt.startswith('[') and txt.endswith(']') 993 txt_dict = {} 994 for entry in txt[1:-1].split(', '): 995 if not entry: 996 continue 997 998 equal_pos = entry.find('=') 999 1000 if equal_pos != -1: 1001 k, v = entry[:equal_pos], entry[equal_pos + 1:] 1002 txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2)) 1003 else: 1004 txt_dict[entry] = True 1005 1006 return txt_dict 1007 1008 def srp_server_get_lease(self) -> Tuple[int, int, int, int]: 1009 """Get SRP server LEASE & KEY-LEASE range (in seconds).""" 1010 lines = self.execute_command(f'srp server lease') 1011 return tuple([int(line.split(':')[1].strip()) for line in lines]) 1012 1013 def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int): 1014 """Configure SRP server LEASE & KEY-LEASE range (in seconds).""" 1015 self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}') 1016 1017 def srp_client_get_state(self) -> bool: 1018 """Get SRP client state.""" 1019 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state')) 1020 1021 def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int): 1022 """Start SRP client.""" 1023 self.execute_command(f'srp client start {str(server_ip)} {server_port}') 1024 1025 def srp_client_stop(self): 1026 """Stop SRP client.""" 1027 self.execute_command('srp client stop') 1028 1029 def srp_client_get_autostart(self) -> bool: 1030 """Get SRP client autostart mode.""" 1031 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart')) 1032 1033 def srp_client_enable_autostart(self): 1034 """Enable SRP client autostart mode.""" 1035 self.execute_command('srp client autostart enable') 1036 1037 def srp_client_disable_autostart(self): 1038 """Disable SRP client autostart mode.""" 1039 self.execute_command('srp client autostart disable') 1040 1041 def srp_client_get_callback(self) -> bool: 1042 """Get SRP client callback mode.""" 1043 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback')) 1044 1045 def srp_client_enable_callback(self): 1046 """Enable SRP client callback mode.""" 1047 self.execute_command('srp client callback enable') 1048 1049 def srp_client_disable_callback(self): 1050 """Disable SRP client callback mode.""" 1051 self.execute_command('srp client callback disable') 1052 1053 def srp_client_set_host_name(self, name: str): 1054 """Set SRP client host name.""" 1055 self.execute_command(f'srp client host name {name}') 1056 1057 def srp_client_get_host(self) -> Dict: 1058 """Get SRP client host.""" 1059 output = self.__parse_str(self.execute_command('srp client host')) 1060 return self.__parse_srp_client_host(output) 1061 1062 _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]') 1063 1064 def __parse_srp_client_host(self, line: str) -> Dict: 1065 m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line) 1066 if not m: 1067 raise UnexpectedCommandOutput([line]) 1068 1069 _, host, _, state, addrs = m.groups() 1070 return { 1071 'host': host or '', 1072 'state': state, 1073 'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [], 1074 } 1075 1076 def srp_client_get_host_name(self) -> str: 1077 """Get SRP client host name.""" 1078 name = self.__parse_str(self.execute_command('srp client host name')) 1079 return name if name != '(null)' else '' 1080 1081 def srp_client_get_host_addresses(self) -> List[Ip6Addr]: 1082 """Get SRP client host addresses.""" 1083 return self.__parse_ip6addr_list(self.execute_command('srp client host address')) 1084 1085 def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]): 1086 """Set SRP client host addresses.""" 1087 self.execute_command(f'srp client host address {" ".join(map(str, addrs))}') 1088 1089 def srp_client_get_host_state(self): 1090 """Get SRP client host state.""" 1091 return self.__parse_str(self.execute_command('srp client host state')) 1092 1093 def srp_client_remove_host(self, remove_key_lease=False): 1094 """Remove SRP client host.""" 1095 cmd = 'srp client host remove' 1096 if remove_key_lease: 1097 cmd += ' 1' 1098 1099 self.execute_command(cmd) 1100 1101 def srp_client_get_services(self) -> List[Dict]: 1102 """Get SRP client services.""" 1103 output = self.execute_command('srp client service') 1104 return [self.__parse_srp_client_service(line) for line in output] 1105 1106 _SRP_CLIENT_SERVICE_PATTERN = re.compile( 1107 r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)') 1108 1109 def __parse_srp_client_service(self, line: str) -> Dict: 1110 # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2 1111 m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line) 1112 if m is None: 1113 raise UnexpectedCommandOutput([line]) 1114 1115 instance, service, state, port, priority, weight = m.groups() 1116 port, priority, weight = int(port), int(priority), int(weight) 1117 return { 1118 'instance': instance, 1119 'service': service, 1120 'state': state, 1121 'port': port, 1122 'priority': priority, 1123 'weight': weight, 1124 } 1125 1126 def srp_client_add_service(self, 1127 instance: str, 1128 service: str, 1129 port: int, 1130 priority: int = 0, 1131 weight: int = 0, 1132 txt: Dict[str, Union[str, bytes, bool]] = None): 1133 instance = self.__escape_escapable(instance) 1134 cmd = f'srp client service add {instance} {service} {port} {priority} {weight}' 1135 if txt: 1136 cmd += f' {self.__txt_to_hex(txt)}' 1137 self.execute_command(cmd) 1138 1139 def srp_client_remove_service(self, instance: str, service: str): 1140 """Remove a service from SRP client.""" 1141 self.execute_command(f'srp client service remove {instance} {service}') 1142 1143 def srp_client_clear_service(self, instance: str, service: str): 1144 """Remove a service from SRP client without notifying the SRP server.""" 1145 self.execute_command(f'srp client service clear {instance} {service}') 1146 1147 def srp_client_get_key_lease_interval(self) -> int: 1148 """Get SRP client key lease interval (in seconds).""" 1149 return self.__parse_int(self.execute_command('srp client keyleaseinterval')) 1150 1151 def srp_client_set_key_lease_interval(self, interval: int): 1152 """Set SRP client key lease interval (in seconds).""" 1153 self.execute_command(f'srp client keyleaseinterval {interval}') 1154 1155 def srp_client_get_lease_interval(self) -> int: 1156 """Get SRP client lease interval (in seconds).""" 1157 return self.__parse_int(self.execute_command('srp client leaseinterval')) 1158 1159 def srp_client_set_lease_interval(self, interval: int): 1160 """Set SRP client lease interval (in seconds).""" 1161 self.execute_command(f'srp client leaseinterval {interval}') 1162 1163 def srp_client_get_server(self) -> Tuple[Ip6Addr, int]: 1164 """Get the SRP server (IP, port).""" 1165 result = self.__parse_str(self.execute_command('srp client server')) 1166 ip, port = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result).groups() 1167 return Ip6Addr(ip), int(port) 1168 1169 def srp_client_get_service_key(self) -> bool: 1170 """Get SRP client "service key record inclusion" mode.""" 1171 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key')) 1172 1173 def srp_client_enable_service_key(self): 1174 """Enable SRP client "service key record inclusion" mode.""" 1175 self.execute_command('srp client service key enable') 1176 1177 def srp_client_disable_service_key(self): 1178 """Disable SRP client "service key record inclusion" mode.""" 1179 self.execute_command('srp client service key disable') 1180 1181 def __split_table_row(self, row: str) -> List[str]: 1182 if not (row.startswith('|') and row.endswith('|')): 1183 raise ValueError(row) 1184 1185 fields = row.split('|') 1186 fields = [x.strip() for x in fields[1:-1]] 1187 return fields 1188 1189 def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str: 1190 return fields[headers.index(colname)] 1191 1192 def get_child_list(self) -> List[ChildId]: 1193 """Get attached Child IDs.""" 1194 line = self.__parse_str(self.execute_command(f'child list')) 1195 return [ChildId(id) for id in line.strip().split()] 1196 1197 def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]: 1198 output = self.execute_command(f'child {child}') 1199 1200 info = {} 1201 1202 for line in output: 1203 k, v = line.split(': ') 1204 if k == 'Child ID': 1205 info['id'] = int(v) 1206 elif k == 'Rloc': 1207 info['rloc16'] = int(v, 16) 1208 elif k == 'Ext Addr': 1209 info['extaddr'] = v 1210 elif k == 'Mode': 1211 info['mode'] = DeviceMode(v) 1212 elif k == 'Net Data': 1213 info['c_vn'] = int(v) 1214 elif k == 'Timeout': 1215 info['timeout'] = int(v) 1216 elif k == 'Age': 1217 info['age'] = int(v) 1218 elif k == 'Link Quality In': 1219 info['lq_in'] = int(v) 1220 elif k == 'RSSI': 1221 info['rssi'] = int(v) 1222 else: 1223 self.log('warning', "Child info %s: %s ignored", k, v) 1224 1225 return info 1226 1227 def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]: 1228 """Get the list of IP addresses stored for MTD children. 1229 1230 Note: Each MTD child might has multiple IP addresses. 1231 """ 1232 output = self.execute_command('childip') 1233 1234 ipaddrs = {} 1235 1236 for line in output: 1237 rloc16, ip = line.split(': ') 1238 rloc16 = Rloc16(rloc16, 16) 1239 ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip())) 1240 1241 return ipaddrs 1242 1243 # 1244 # Child configurations 1245 # 1246 1247 def get_max_children(self) -> int: 1248 """Get the Thread maximum number of allowed children.""" 1249 return self.__parse_int(self.execute_command('childmax')) 1250 1251 def set_max_children(self, val: int): 1252 """Set the Thread maximum number of allowed children.""" 1253 self.execute_command(f'childmax {val}') 1254 1255 def get_child_ip_max(self) -> int: 1256 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1257 return self.__parse_int(self.execute_command('childip max')) 1258 1259 def set_child_ip_max(self, val: int): 1260 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1261 self.execute_command(f'childip max {val}') 1262 1263 def get_child_timeout(self): 1264 """Get the Thread Child Timeout value.""" 1265 return self.__parse_int(self.execute_command('childtimeout')) 1266 1267 def set_child_timeout(self, timeout): 1268 """Set the Thread Child Timeout value.""" 1269 self.execute_command('childtimeout %d' % timeout) 1270 1271 def get_child_supervision_interval(self) -> int: 1272 """Get the Child Supervision Check Timeout value.""" 1273 return self.__parse_int(self.execute_command('childsupervision interval')) 1274 1275 def set_child_supervision_interval(self, val: int): 1276 """Set the Child Supervision Interval value. 1277 This command can only be used with FTD devices. 1278 """ 1279 self.execute_command(f'childsupervision interval {val}') 1280 1281 def get_child_supervision_check_timeout(self) -> int: 1282 """Get the Child Supervision Check Timeout value.""" 1283 return self.__parse_int(self.execute_command('childsupervision checktimeout')) 1284 1285 def set_child_supervision_check_timeout(self, val: int): 1286 """Set the Child Supervision Check Timeout value.""" 1287 self.execute_command(f'childsupervision checktimeout {val}') 1288 1289 # 1290 # Neighbor management 1291 # 1292 1293 def get_neighbor_list(self) -> List[Rloc16]: 1294 """Get a list of RLOC16 of neighbors""" 1295 line = self.__parse_str(self.execute_command('neighbor list')).strip() 1296 return [Rloc16(id, 16) for id in line.split()] 1297 1298 def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]: 1299 output = self.execute_command('neighbor table') 1300 if len(output) < 2: 1301 raise UnexpectedCommandOutput(output) 1302 1303 # 1304 # Example output: 1305 # 1306 # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC | 1307 # +------+--------+-----+----------+-----------+-+-+-+------------------+ 1308 # | C | 0xcc01 | 96 | -46 | -46 |1|1|1| 1eb9ba8a6522636b | 1309 # | R | 0xc800 | 2 | -29 | -29 |1|1|1| 9a91556102c39ddb | 1310 # | R | 0xf000 | 3 | -28 | -28 |1|1|1| 0ad7ed6beaa6016d | 1311 # Done 1312 # 1313 1314 headers = self.__split_table_row(output[0]) 1315 1316 table = {} 1317 for line in output[2:]: 1318 line = line.strip() 1319 if not line: 1320 continue 1321 1322 fields = self.__split_table_row(line) 1323 col = lambda colname: self.__get_table_col(colname, headers, fields) 1324 1325 role = col('Role') 1326 is_router = role == 'R' 1327 r, d, n = int(col('R')), int(col('D')), int(col('N')) 1328 mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}') 1329 1330 rloc16 = Rloc16(col('RLOC16'), 16) 1331 1332 table[rloc16] = { 1333 'is_router': is_router, 1334 'rloc16': rloc16, 1335 'age': int(col('Age')), 1336 'avg_rssi': int(col('Avg RSSI')), 1337 'last_rssi': int(col('Last RSSI')), 1338 'mode': mode, 1339 'extaddr': col('Extended MAC'), 1340 } 1341 1342 return table 1343 1344 # 1345 # SED/SSED configuration 1346 # 1347 1348 def get_poll_period(self) -> int: 1349 """Get the customized data poll period of sleepy end device (milliseconds). 1350 Only for Reference Device.""" 1351 return self.__parse_int(self.execute_command('pollperiod')) 1352 1353 def set_poll_period(self, poll_period: int): 1354 """Set the customized data poll period (in milliseconds) for sleepy end device. 1355 1356 Only for Reference Device.""" 1357 self.execute_command(f'pollperiod {poll_period}') 1358 1359 # TODO: csl 1360 # TODO: csl channel <channel> 1361 # TODO: csl period <period> 1362 # TODO: csl timeout <timeout> 1363 1364 _CSL_PERIOD_PATTERN = re.compile(r'(\d+)\(in units of 10 symbols\), \d+ms') 1365 _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s') 1366 1367 def get_csl_config(self) -> Dict[str, int]: 1368 """Get the CSL configuration.""" 1369 output = self.execute_command('csl') 1370 1371 cfg = {} 1372 for line in output: 1373 k, v = line.split(': ') 1374 if k == 'Channel': 1375 cfg['channel'] = int(v) 1376 elif k == 'Timeout': 1377 cfg['timeout'] = int(OTCI._CSL_TIMEOUT_PATTERN.match(v).group(1)) 1378 elif k == 'Period': 1379 cfg['period'] = int(OTCI._CSL_PERIOD_PATTERN.match(v).group(1)) 1380 else: 1381 logging.warning("Ignore unknown CSL parameter: %s: %s", k, v) 1382 1383 return cfg 1384 1385 def config_csl(self, channel: int = None, period: int = None, timeout: int = None): 1386 """Configure CSL parameters. 1387 1388 :param channel: Set CSL channel. 1389 :param period: Set CSL period in units of 10 symbols. Disable CSL by setting this parameter to 0. 1390 :param timeout: Set the CSL timeout in seconds. 1391 """ 1392 1393 if channel is None and period is None and timeout is None: 1394 raise InvalidArgumentsError("Please specify at least 1 parameter to configure.") 1395 1396 if channel is not None: 1397 self.execute_command(f'csl channel {channel}') 1398 1399 if period is not None: 1400 self.execute_command(f'csl period {period}') 1401 1402 if timeout is not None: 1403 self.execute_command(f'csl timeout {timeout}') 1404 1405 # 1406 # Leader utilities 1407 # 1408 1409 def get_context_id_reuse_delay(self) -> int: 1410 """Get the CONTEXT_ID_REUSE_DELAY value.""" 1411 return self.__parse_int(self.execute_command('contextreusedelay')) 1412 1413 def set_context_id_reuse_delay(self, val: int): 1414 """Set the CONTEXT_ID_REUSE_DELAY value.""" 1415 self.execute_command(f'contextreusedelay {val}') 1416 1417 def release_router_id(self, routerid: int): 1418 """Release a Router ID that has been allocated by the device in the Leader role.""" 1419 self.execute_command(f'releaserouterid {routerid}') 1420 1421 # Time Sync utilities 1422 # TODO: networktime 1423 # TODO: networktime <timesyncperiod> <xtalthreshold> 1424 # TODO: delaytimermin 1425 # TODO: delaytimermin <delaytimermin> 1426 1427 # 1428 # Commissioniner operations 1429 # 1430 1431 def commissioner_start(self): 1432 """Start the Commissioner role.""" 1433 self.execute_command('commissioner start') 1434 1435 def commissioner_stop(self): 1436 """Stop the Commissioner role.""" 1437 self.execute_command('commissioner stop') 1438 1439 def get_commissioiner_state(self) -> str: 1440 """Get current Commissioner state (active or petitioning or disabled).""" 1441 return self.__parse_str(self.execute_command('commissioner state')) 1442 1443 def get_commissioner_session_id(self) -> int: 1444 """Get current commissioner session id.""" 1445 return self.__parse_int(self.execute_command('commissioner sessionid')) 1446 1447 def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None): 1448 """Add a Joiner entry. 1449 1450 :param pskd: Pre-Shared Key for the Joiner. 1451 :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner 1452 :param discerner: The Joiner discerner in format number/length. 1453 :param timeout: Joiner timeout in seconds. 1454 """ 1455 if (eui64 is not None) == (discerner is not None): 1456 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1457 1458 if eui64 is not None and eui64 != '*': 1459 self.__validate_extaddr(eui64) 1460 1461 cmd = f'commissioner joiner add {eui64 or discerner} {pskd}' 1462 1463 if timeout is not None: 1464 cmd += f' {timeout}' 1465 1466 self.execute_command(cmd) 1467 1468 def commissioner_remove_jointer(self, eui64=None, discerner=None): 1469 if (eui64 is not None) == (discerner is not None): 1470 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1471 1472 if eui64 is not None and eui64 != '*': 1473 self.__validate_extaddr(eui64) 1474 1475 self.execute_command(f'commissioner joiner remove {eui64 or discerner}') 1476 1477 def set_commissioner_provisioning_url(self, url: str): 1478 self.execute_command(f'commissioner provisioningurl {url}') 1479 1480 # TODO: commissioner announce 1481 # TODO: commissioner energy 1482 # TODO: commissioner mgmtget 1483 # TODO: commissioner mgmtset 1484 # TODO: commissioner panid 1485 1486 # 1487 # Joiner operations 1488 # 1489 def joiner_start(self, psk: str, provisioning_url: str = None): 1490 """Start the Joiner.""" 1491 cmd = f'joiner start {psk}' 1492 if provisioning_url is not None: 1493 cmd += f' {provisioning_url}' 1494 1495 self.execute_command(cmd) 1496 1497 def joiner_stop(self): 1498 """Stop the Joiner role.""" 1499 self.execute_command('joiner stop') 1500 1501 def get_joiner_id(self) -> str: 1502 """Get the Joiner ID.""" 1503 return self.__parse_joiner_id(self.execute_command('joiner id')) 1504 1505 def get_joiner_port(self) -> int: 1506 """Get the Joiner port.""" 1507 return self.__parse_int(self.execute_command(f'joinerport')) 1508 1509 def set_joiner_port(self, port: int): 1510 """Set the Joiner port.""" 1511 self.execute_command(f'joinerport {port}') 1512 1513 # TODO: joiner discerner 1514 1515 # 1516 # Network Data utilities 1517 # 1518 def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1519 """Get prefixes from local Network Data.""" 1520 output = self.execute_command('prefix') 1521 return self.__parse_prefixes(output) 1522 1523 def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1524 prefixes = [] 1525 1526 for line in output: 1527 if line.startswith('- '): 1528 line = line[2:] 1529 1530 prefix, flags, prf, rloc16 = line.split()[:4] 1531 prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16))) 1532 1533 return prefixes 1534 1535 def add_prefix(self, prefix: str, flags='paosr', prf='med'): 1536 """Add a valid prefix to the Network Data.""" 1537 self.execute_command(f'prefix add {prefix} {flags} {prf}') 1538 1539 def remove_prefix(self, prefix: str): 1540 """Invalidate a prefix in the Network Data.""" 1541 self.execute_command(f'prefix remove {prefix}') 1542 1543 def register_network_data(self): 1544 self.execute_command('netdata register') 1545 1546 def get_network_data(self) -> Dict[str, List]: 1547 output = self.execute_command('netdata show') 1548 1549 netdata = {} 1550 if output.pop(0) != 'Prefixes:': 1551 raise UnexpectedCommandOutput(output) 1552 1553 prefixes_output = [] 1554 while True: 1555 line = output.pop(0) 1556 if line == 'Routes:': 1557 break 1558 else: 1559 prefixes_output.append(line) 1560 1561 netdata['prefixes'] = self.__parse_prefixes(prefixes_output) 1562 1563 routes_output = [] 1564 while True: 1565 line = output.pop(0) 1566 if line == 'Services:': 1567 break 1568 else: 1569 routes_output.append(line) 1570 1571 netdata['routes'] = self.__parse_routes(routes_output) 1572 netdata['services'] = self.__parse_services(output) 1573 1574 return netdata 1575 1576 def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1577 """Get network prefixes from Thread Network Data.""" 1578 network_data = self.get_network_data() 1579 return network_data['prefixes'] 1580 1581 def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1582 """Get routes from Thread Network Data.""" 1583 network_data = self.get_network_data() 1584 return network_data['routes'] 1585 1586 def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1587 """Get services from Thread Network Data""" 1588 network_data = self.get_network_data() 1589 return network_data['services'] 1590 1591 def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1592 services = [] 1593 for line in output: 1594 line = line.split() 1595 1596 enterprise_number, service_data, server_data = line[:3] 1597 if line[3] == 's': 1598 stable, rloc16 = True, line[4] 1599 else: 1600 stable, rloc16 = False, line[3] 1601 1602 enterprise_number = int(enterprise_number) 1603 service_data = self.__hex_to_bytes(service_data) 1604 server_data = self.__hex_to_bytes(server_data) 1605 rloc16 = Rloc16(rloc16, 16) 1606 1607 services.append((enterprise_number, service_data, server_data, stable, rloc16)) 1608 1609 return services 1610 1611 def get_network_data_bytes(self) -> bytes: 1612 """Get the raw Network Data.""" 1613 hexstr = self.__parse_str(self.execute_command('netdata show -x')) 1614 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 1615 1616 def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1617 """Get routes from local Network Data.""" 1618 return self.__parse_routes(self.execute_command('route')) 1619 1620 def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]: 1621 routes = [] 1622 for line in output: 1623 line = line.split() 1624 if line[1] == 's': 1625 prefix, _, prf, rloc16 = line 1626 stable = True 1627 else: 1628 prefix, prf, rloc16 = line 1629 stable = False 1630 1631 rloc16 = Rloc16(rloc16, 16) 1632 routes.append((prefix, stable, prf, rloc16)) 1633 1634 return routes 1635 1636 def add_route(self, prefix: str, stable=True, prf='med'): 1637 """Add a valid external route to the Network Data.""" 1638 cmd = f'route add {prefix}' 1639 if stable: 1640 cmd += ' s' 1641 1642 cmd += f' {prf}' 1643 self.execute_command(cmd) 1644 1645 def remove_route(self, prefix: str): 1646 """Invalidate a external route in the Network Data.""" 1647 self.execute_command(f'route remove {prefix}') 1648 1649 def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]): 1650 """Add service to the Network Data. 1651 1652 enterpriseNumber: IANA enterprise number 1653 serviceData: hex-encoded binary service data 1654 serverData: hex-encoded binary server data 1655 """ 1656 service_data = self.__validate_hex_or_bytes(service_data) 1657 server_data = self.__validate_hex_or_bytes(server_data) 1658 self.execute_command(f'service add {enterprise_number} {service_data} {server_data}') 1659 1660 def remove_service(self, enterprise_number, service_data): 1661 """Remove service from Network Data. 1662 1663 enterpriseNumber: IANA enterprise number 1664 serviceData: hext-encoded binary service data 1665 """ 1666 service_data = self.__validate_hex_or_bytes(service_data) 1667 self.execute_command(f'service remove {enterprise_number} {service_data}') 1668 1669 # 1670 # Dataset management 1671 # 1672 1673 def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False): 1674 """Initialize operational dataset buffer.""" 1675 if get_active_dataset and get_pending_dataset: 1676 raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.") 1677 1678 if get_active_dataset: 1679 self.execute_command(f'dataset init active') 1680 elif get_pending_dataset: 1681 self.execute_command(f'dataset init pending') 1682 else: 1683 self.execute_command(f'dataset init new') 1684 1685 def dataset_commit_buffer(self, dataset: str): 1686 if dataset in ('active', 'pending'): 1687 cmd = f'dataset commit {dataset}' 1688 else: 1689 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1690 1691 self.execute_command(cmd) 1692 1693 def dataset_clear_buffer(self): 1694 """Reset operational dataset buffer.""" 1695 self.execute_command('dataset clear') 1696 1697 def get_dataset(self, dataset: str = 'buffer'): 1698 if dataset in ('active', 'pending'): 1699 cmd = f'dataset {dataset}' 1700 elif dataset == 'buffer': 1701 cmd = 'dataset' 1702 else: 1703 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1704 1705 output = self.execute_command(cmd) 1706 return self.__parse_dataset(output) 1707 1708 def __parse_dataset(self, output: List[str]) -> Dict[str, Any]: 1709 # Example output: 1710 # 1711 # Active Timestamp: 1 1712 # Channel: 22 1713 # Channel Mask: 0x07fff800 1714 # Ext PAN ID: 5c93ae980ff22d35 1715 # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64 1716 # Network Key: d1a8348d59fb1fac1d6c4f95007d487a 1717 # Network Name: OpenThread-7caa 1718 # PAN ID: 0x7caa 1719 # PSKc: 167d89fd169e439ca0b8266de248090f 1720 # Security Policy: 0, onrc 1721 1722 dataset = {} 1723 1724 for line in output: 1725 line = line.split(': ') 1726 key, val = line[0], ': '.join(line[1:]) 1727 1728 if key == 'Active Timestamp': 1729 dataset['active_timestamp'] = int(val) 1730 elif key == 'Channel': 1731 dataset['channel'] = int(val) 1732 elif key == 'Channel Mask': 1733 dataset['channel_mask'] = int(val, 16) 1734 elif key == 'Ext PAN ID': 1735 dataset['extpanid'] = val 1736 elif key == 'Mesh Local Prefix': 1737 dataset['mesh_local_prefix'] = val 1738 elif key in ('Network Key', 'Master Key'): 1739 dataset['networkkey'] = val 1740 elif key == 'Network Name': 1741 dataset['network_name'] = val 1742 elif key == 'PAN ID': 1743 dataset['panid'] = int(val, 16) 1744 elif key == 'PSKc': 1745 dataset['pskc'] = val 1746 elif key == 'Security Policy': 1747 rotation_time, flags = val.split(', ') if ', ' in val else val.split(' ') 1748 rotation_time = int(rotation_time) 1749 dataset['security_policy'] = SecurityPolicy(rotation_time, flags) 1750 else: 1751 raise UnexpectedCommandOutput(output) 1752 1753 return dataset 1754 1755 def get_dataset_bytes(self, dataset: str) -> bytes: 1756 if dataset in ('active', 'pending'): 1757 cmd = f'dataset {dataset} -x' 1758 else: 1759 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1760 1761 hexstr = self.__parse_str(self.execute_command(cmd)) 1762 return self.__hex_to_bytes(hexstr) 1763 1764 def set_dataset_bytes(self, dataset: str, data: bytes) -> None: 1765 if dataset in ('active', 'pending'): 1766 cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}' 1767 else: 1768 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1769 1770 self.execute_command(cmd) 1771 1772 def dataset_set_buffer(self, 1773 active_timestamp: int = None, 1774 channel: int = None, 1775 channel_mask: int = None, 1776 extpanid: str = None, 1777 mesh_local_prefix: str = None, 1778 network_key: str = None, 1779 network_name: str = None, 1780 panid: int = None, 1781 pskc: str = None, 1782 security_policy: tuple = None, 1783 pending_timestamp: int = None): 1784 if active_timestamp is not None: 1785 self.execute_command(f'dataset activetimestamp {active_timestamp}') 1786 1787 if channel is not None: 1788 self.execute_command(f'dataset channel {channel}') 1789 1790 if channel_mask is not None: 1791 self.execute_command(f'dataset channelmask {channel_mask}') 1792 1793 if extpanid is not None: 1794 self.execute_command(f'dataset extpanid {extpanid}') 1795 1796 if mesh_local_prefix is not None: 1797 self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}') 1798 1799 if network_key is not None: 1800 nwk_cmd = self.__detect_networkkey_cmd() 1801 self.execute_command(f'dataset {nwk_cmd} {network_key}') 1802 1803 if network_name is not None: 1804 self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}') 1805 1806 if panid is not None: 1807 self.execute_command(f'dataset panid {panid}') 1808 1809 if pskc is not None: 1810 self.execute_command(f'dataset pskc {pskc}') 1811 1812 if security_policy is not None: 1813 rotation_time, flags = security_policy 1814 self.execute_command(f'dataset securitypolicy {rotation_time} {flags}') 1815 1816 if pending_timestamp is not None: 1817 self.execute_command(f'dataset pendingtimestamp {pending_timestamp}') 1818 1819 # TODO: dataset mgmtgetcommand 1820 # TODO: dataset mgmtsetcommand 1821 # TODO: dataset set <active|pending> <dataset> 1822 1823 # 1824 # Allowlist management 1825 # 1826 1827 def enable_allowlist(self): 1828 self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}') 1829 1830 def disable_allowlist(self): 1831 self.execute_command('macfilter addr disable') 1832 1833 def add_allowlist(self, addr: str, rssi: int = None): 1834 cmd = f'macfilter addr add {addr}' 1835 1836 if rssi is not None: 1837 cmd += f' {rssi}' 1838 1839 self.execute_command(cmd) 1840 1841 def remove_allowlist(self, addr: str): 1842 self.execute_command(f'macfilter addr remove {addr}') 1843 1844 def clear_allowlist(self): 1845 self.execute_command('macfilter addr clear') 1846 1847 def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]): 1848 self.clear_allowlist() 1849 1850 if allowlist is None: 1851 self.disable_allowlist() 1852 else: 1853 self.enable_allowlist() 1854 for item in allowlist: 1855 if isinstance(item, str): 1856 self.add_allowlist(item) 1857 else: 1858 addr, rssi = item[0], item[1] 1859 self.add_allowlist(addr, rssi) 1860 1861 # TODO: denylist 1862 # TODO: macfilter rss 1863 # TODO: macfilter rss add <extaddr> <rss> 1864 # TODO: macfilter rss add-lqi <extaddr> <lqi> 1865 # TODO: macfilter rss remove <extaddr> 1866 # TODO: macfilter rss clear 1867 1868 def __detect_allowlist_cmd(self): 1869 if self.api_version >= 28: 1870 return 'allowlist' 1871 else: 1872 return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74' 1873 1874 def __detect_networkkey_cmd(self) -> str: 1875 return 'networkkey' if self.api_version >= 126 else 'masterkey' 1876 1877 # 1878 # Unicast Addresses management 1879 # 1880 def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1881 """Add an IPv6 address to the Thread interface.""" 1882 self.execute_command(f'ipaddr add {ip}') 1883 1884 def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1885 """Delete an IPv6 address from the Thread interface.""" 1886 self.execute_command(f'ipaddr del {ip}') 1887 1888 def get_ipaddrs(self) -> Tuple[Ip6Addr]: 1889 """Get all IPv6 addresses assigned to the Thread interface.""" 1890 return tuple(map(Ip6Addr, self.execute_command('ipaddr'))) 1891 1892 def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1893 """Check if a IPv6 address was added to the Thread interface.""" 1894 return ip in self.get_ipaddrs() 1895 1896 def get_ipaddr_mleid(self) -> Ip6Addr: 1897 """Get Thread Mesh Local EID address.""" 1898 return self.__parse_ip6addr(self.execute_command('ipaddr mleid')) 1899 1900 def get_ipaddr_linklocal(self) -> Ip6Addr: 1901 """Get Thread link-local IPv6 address.""" 1902 return self.__parse_ip6addr(self.execute_command('ipaddr linklocal')) 1903 1904 def get_ipaddr_rloc(self) -> Ip6Addr: 1905 """Get Thread Routing Locator (RLOC) address.""" 1906 return self.__parse_ip6addr(self.execute_command('ipaddr rloc')) 1907 1908 # 1909 # Multicast Addresses management 1910 # 1911 1912 def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1913 """Subscribe the Thread interface to the IPv6 multicast address.""" 1914 self.execute_command(f'ipmaddr add {ip}') 1915 1916 def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1917 """Unsubscribe the Thread interface to the IPv6 multicast address.""" 1918 self.execute_command(f'ipmaddr del {ip}') 1919 1920 def get_ipmaddrs(self) -> Tuple[Ip6Addr]: 1921 """Get all IPv6 multicast addresses subscribed to the Thread interface.""" 1922 return tuple(map(Ip6Addr, self.execute_command('ipmaddr'))) 1923 1924 def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1925 """Check if a IPv6 multicast address was subscribed by the Thread interface.""" 1926 return ip in self.get_ipmaddrs() 1927 1928 def get_ipmaddr_promiscuous(self) -> bool: 1929 """Get multicast promiscuous mode.""" 1930 return self.__parse_Enabled_or_Disabled(self.execute_command("ipmaddr promiscuous")) 1931 1932 def enable_ipmaddr_promiscuous(self): 1933 """Enable multicast promiscuous mode.""" 1934 self.execute_command('ipmaddr promiscuous enable') 1935 1936 def disable_ipmaddr_promiscuous(self): 1937 """Disable multicast promiscuous mode.""" 1938 self.execute_command('ipmaddr promiscuous disable') 1939 1940 def get_ipmaddr_llatn(self) -> Ip6Addr: 1941 """Get Link Local All Thread Nodes Multicast Address""" 1942 return self.__parse_ip6addr(self.execute_command('ipmaddr llatn')) 1943 1944 def get_ipmaddr_rlatn(self) -> Ip6Addr: 1945 """Get Realm Local All Thread Nodes Multicast Address""" 1946 return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn')) 1947 1948 # 1949 # Backbone Router Utilities 1950 # 1951 1952 # TODO: bbr mgmt ... 1953 1954 def enable_backbone_router(self): 1955 """Enable Backbone Router Service for Thread 1.2 FTD. 1956 1957 SRV_DATA.ntf would be triggerred for attached device if there is no Backbone Router Service in Thread Network Data. 1958 """ 1959 self.execute_command('bbr enable') 1960 1961 def disable_backbone_router(self): 1962 """Disable Backbone Router Service for Thread 1.2 FTD. 1963 1964 SRV_DATA.ntf would be triggerred if Backbone Router is Primary state. 1965 """ 1966 self.execute_command('bbr disable') 1967 1968 def get_backbone_router_state(self) -> str: 1969 """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD.""" 1970 return self.__parse_str(self.execute_command('bbr state')) 1971 1972 def get_primary_backbone_router_info(self) -> Optional[dict]: 1973 """Show current Primary Backbone Router information for Thread 1.2 device.""" 1974 output = self.execute_command('bbr') 1975 1976 if len(output) < 1: 1977 raise UnexpectedCommandOutput(output) 1978 1979 line = output[0] 1980 if line == 'BBR Primary: None': 1981 return None 1982 1983 if line != 'BBR Primary:': 1984 raise UnexpectedCommandOutput(output) 1985 1986 # Example output: 1987 # BBR Primary: 1988 # server16: 0xE400 1989 # seqno: 10 1990 # delay: 120 secs 1991 # timeout: 300 secs 1992 1993 dataset = {} 1994 1995 for line in output[1:]: 1996 key, val = line.split(':') 1997 key, val = key.strip(), val.strip() 1998 if key == 'server16': 1999 dataset[key] = int(val, 16) 2000 elif key == 'seqno': 2001 dataset[key] = int(val) 2002 elif key == 'delay': 2003 if not val.endswith(' secs'): 2004 raise UnexpectedCommandOutput(output) 2005 dataset[key] = int(val.split()[0]) 2006 elif key == 'timeout': 2007 if not val.endswith(' secs'): 2008 raise UnexpectedCommandOutput(output) 2009 dataset[key] = int(val.split()[0]) 2010 else: 2011 raise UnexpectedCommandOutput(output) 2012 2013 return dataset 2014 2015 def register_backbone_router_dataset(self): 2016 """Register Backbone Router Service for Thread 1.2 FTD. 2017 2018 SRV_DATA.ntf would be triggerred for attached device. 2019 """ 2020 self.execute_command('bbr register') 2021 2022 def get_backbone_router_config(self) -> dict: 2023 """Show local Backbone Router configuration for Thread 1.2 FTD.""" 2024 output = self.execute_command('bbr config') 2025 # Example output: 2026 # seqno: 10 2027 # delay: 120 secs 2028 # timeout: 300 secs 2029 2030 config = {} 2031 2032 for line in output: 2033 key, val = line.split(':') 2034 key, val = key.strip(), val.strip() 2035 if key == 'seqno': 2036 config[key] = int(val) 2037 elif key in ('delay', 'timeout'): 2038 if not line.endswith(' secs'): 2039 raise UnexpectedCommandOutput(output) 2040 config[key] = int(val.split()[0]) 2041 else: 2042 raise UnexpectedCommandOutput(output) 2043 2044 return config 2045 2046 def set_backbone_router_config(self, seqno: int = None, delay: int = None, timeout: int = None): 2047 """Configure local Backbone Router configuration for Thread 1.2 FTD. 2048 2049 Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router. 2050 """ 2051 if seqno is None and delay is None and timeout is None: 2052 raise InvalidArgumentsError("Please specify seqno or delay or timeout") 2053 2054 cmd = 'bbr config' 2055 if seqno is not None: 2056 cmd += f' seqno {seqno}' 2057 2058 if delay is not None: 2059 cmd += f' delay {delay}' 2060 2061 if timeout is not None: 2062 cmd += f' timeout {timeout}' 2063 2064 self.execute_command(cmd) 2065 2066 def get_backbone_router_jitter(self) -> int: 2067 """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2068 return self.__parse_int(self.execute_command('bbr jitter')) 2069 2070 def set_backbone_router_jitter(self, val: int): 2071 """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2072 self.execute_command(f'bbr jitter {val}') 2073 2074 def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]: 2075 """Get Backbone Router Multicast Listeners.""" 2076 listeners = [] 2077 for line in self.execute_command('bbr mgmt mlr listener'): 2078 ip, timeout = line.split() 2079 listeners.append((Ip6Addr(ip), int(timeout))) 2080 2081 return listeners 2082 2083 # 2084 # Thread 1.2 and DUA/MLR utilities 2085 # 2086 2087 def get_domain_name(self) -> str: 2088 """Get the Thread Domain Name for Thread 1.2 device.""" 2089 return self.__parse_str(self.execute_command('domainname')) 2090 2091 def set_domain_name(self, name: str): 2092 """Set the Thread Domain Name for Thread 1.2 device.""" 2093 self.execute_command('domainname %s' % self.__escape_escapable(name)) 2094 2095 # TODO: dua iid 2096 # TODO: dua iid <iid> 2097 # TODO: dua iid clear 2098 # TODO: mlr reg <ipaddr> ... [timeout] 2099 2100 # 2101 # Link metrics management 2102 # 2103 # TODO: linkmetrics mgmt <ipaddr> forward <seriesid> [ldraX][pqmr] 2104 # TODO: linkmetrics probe <ipaddr> <seriesid> <length> 2105 # TODO: linkmetrics query <ipaddr> single [pqmr] 2106 # TODO: linkmetrics query <ipaddr> forward <seriesid> 2107 # TODO: linkquality <extaddr> 2108 # TODO: linkquality <extaddr> <linkquality> 2109 # 2110 2111 # 2112 # Logging 2113 # 2114 2115 def get_log_level(self) -> int: 2116 """Get the log level.""" 2117 return self.__parse_int(self.execute_command('log level')) 2118 2119 def set_log_level(self, level: int): 2120 """Set the log level.""" 2121 self.execute_command(f'log level {level}') 2122 2123 # 2124 # Device performance related information 2125 # 2126 2127 def get_message_buffer_info(self) -> dict: 2128 """Get the current message buffer information.""" 2129 output = self.execute_command('bufferinfo') 2130 2131 info = {} 2132 2133 def _parse_val(val): 2134 vals = val.split() 2135 return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals)) 2136 2137 for line in output: 2138 key, val = line.split(':') 2139 key, val = key.strip(), val.strip() 2140 info[key.replace(' ', '_')] = _parse_val(val) 2141 2142 return info 2143 2144 @constant_property 2145 def counter_names(self): 2146 """Get the supported counter names.""" 2147 return tuple(self.execute_command('counters')) 2148 2149 def get_counter(self, name: str) -> Counter: 2150 """Reset the counter value.""" 2151 output = self.execute_command(f'counters {name}') 2152 2153 counter = Counter() 2154 for line in output: 2155 k, v = line.strip().split(': ') 2156 counter[k] = int(v) 2157 2158 return counter 2159 2160 def reset_counter(self, name: str): 2161 """Reset the counter value.""" 2162 self.execute_command(f'counters {name} reset') 2163 2164 def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]: 2165 """Get the EID-to-RLOC cache entries.""" 2166 output = self.execute_command('eidcache') 2167 cache = {} 2168 2169 for line in output: 2170 ip, rloc16, _ = line.split(" ", 2) 2171 2172 cache[Ip6Addr(ip)] = Rloc16(rloc16, 16) 2173 2174 return cache 2175 2176 # 2177 # UDP utilities 2178 # 2179 2180 def udp_open(self): 2181 """Opens the example socket.""" 2182 self.execute_command('udp open') 2183 2184 def udp_close(self): 2185 """Opens the example socket.""" 2186 self.execute_command('udp close') 2187 2188 def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD): 2189 """Assigns a name (i.e. IPv6 address and port) to the example socket. 2190 2191 :param ip: the IPv6 address or the unspecified IPv6 address (::). 2192 :param port: the UDP port 2193 """ 2194 bindarg = '' 2195 if netif == NetifIdentifier.UNSPECIFIED: 2196 bindarg += ' -u' 2197 elif netif == NetifIdentifier.BACKBONE: 2198 bindarg += ' -b' 2199 2200 self.execute_command(f'udp bind{bindarg} {ip} {port}') 2201 2202 def udp_connect(self, ip: str, port: int): 2203 """Specifies the peer with which the socket is to be associated. 2204 2205 ip: the peer's IPv6 address. 2206 port: the peer's UDP port. 2207 """ 2208 self.execute_command(f'udp connect {ip} {port}') 2209 2210 def udp_send(self, ip: str = None, port: int = None, text: str = None, random_bytes: int = None, hex: str = None): 2211 """Send a few bytes over UDP. 2212 2213 ip: the IPv6 destination address. 2214 port: the UDP destination port. 2215 type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value. 2216 * -x: binary data in hexadecimal representation in the value. 2217 """ 2218 if (ip is None) != (port is None): 2219 raise InvalidArgumentsError("Please specify both `ip` and `port`.") 2220 2221 if (text is not None) + (random_bytes is not None) + (hex is not None) != 1: 2222 raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.") 2223 2224 cmd = 'udp send' 2225 2226 if ip is not None: 2227 cmd += f' {ip} {port}' 2228 2229 if text is not None: 2230 cmd += f' -t {text}' 2231 elif random_bytes is not None: 2232 cmd += f' -s {random_bytes}' 2233 elif hex is not None: 2234 self.__validate_hex(hex) 2235 cmd += f' -x {hex}' 2236 2237 self.execute_command(cmd) 2238 2239 def udp_get_link_security(self) -> bool: 2240 """Gets whether the link security is enabled or disabled.""" 2241 return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity')) 2242 2243 def udp_enable_link_security(self): 2244 """Enable link security.""" 2245 self.execute_command('udp linksecurity enable') 2246 2247 def udp_disable_link_security(self): 2248 """Disable link security.""" 2249 self.execute_command('udp linksecurity disable') 2250 2251 def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]: 2252 cmd = 'netstat' 2253 output = self.execute_command(cmd) 2254 if len(output) < 2: 2255 raise UnexpectedCommandOutput(output) 2256 2257 socks = [] 2258 for line in output[2:]: 2259 _, sock_addr, peer_addr = line.strip().split('|')[:3] 2260 sock_addr = self.__parse_socket_addr(sock_addr.strip()) 2261 peer_addr = self.__parse_socket_addr(peer_addr.strip()) 2262 socks.append((sock_addr, peer_addr)) 2263 2264 return socks 2265 2266 @staticmethod 2267 def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]: 2268 addr, port = addr.rsplit(':', 1) 2269 if addr.startswith('[') and addr.endswith(']'): 2270 addr = addr[1:-1] 2271 2272 return Ip6Addr(addr), int(port) if port != '*' else 0 2273 2274 # 2275 # CoAP CLI (test) utilities 2276 # 2277 def coap_start(self): 2278 """Starts the application coap service.""" 2279 self.execute_command('coap start') 2280 2281 def coap_stop(self): 2282 """Stops the application coap service.""" 2283 self.execute_command('coap stop') 2284 2285 def coap_get(self, addr: str, uri_path: str, type: str = "con"): 2286 cmd = f'coap get {addr} {uri_path} {type}' 2287 self.execute_command(cmd) 2288 2289 def coap_put(self, addr: str, uri_path: str, type: str = "con", payload: str = None): 2290 cmd = f'coap put {addr} {uri_path} {type}' 2291 2292 if payload is not None: 2293 cmd += f' {payload}' 2294 2295 self.execute_command(cmd) 2296 2297 def coap_post(self, addr: str, uri_path: str, type: str = "con", payload: str = None): 2298 cmd = f'coap post {addr} {uri_path} {type}' 2299 2300 if payload is not None: 2301 cmd += f' {payload}' 2302 2303 self.execute_command(cmd) 2304 2305 def coap_delete(self, addr: str, uri_path: str, type: str = "con", payload: str = None): 2306 cmd = f'coap delete {addr} {uri_path} {type}' 2307 2308 if payload is not None: 2309 cmd += f' {payload}' 2310 2311 self.execute_command(cmd) 2312 2313 def coap_get_test_resource_path(self) -> str: 2314 """Gets the URI path for the test resource.""" 2315 return self.__parse_str(self.execute_command('coap resource')) 2316 2317 def coap_set_test_resource_path(self, path: str): 2318 """Sets the URI path for the test resource.""" 2319 self.execute_command(f'coap resource {path}') 2320 2321 def coap_test_set_resource_content(self, content: str): 2322 """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client.""" 2323 self.execute_command(f'coap set {content}') 2324 2325 # TODO: coap observe <address> <uri-path> [type] 2326 # TODO: coap cancel 2327 # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>] 2328 # TODO: CoAP Secure utilities 2329 2330 # 2331 # Other TODOs 2332 # 2333 # TODO: netstat 2334 # TODO: networkdiagnostic get <addr> <type> .. 2335 # TODO: networkdiagnostic reset <addr> <type> .. 2336 # TODO: parent 2337 # TODO: pskc [-p] <key>|<passphrase> 2338 # 2339 2340 # 2341 # Private methods 2342 # 2343 2344 def __parse_str(self, output: List[str]) -> str: 2345 if len(output) != 1: 2346 raise UnexpectedCommandOutput(output) 2347 2348 return output[0] 2349 2350 def __parse_int_list(self, output: List[str]) -> List[int]: 2351 line = self.__parse_str(output) 2352 return list(map(int, line.strip().split())) 2353 2354 def __parse_ip6addr(self, output: List[str]) -> Ip6Addr: 2355 return Ip6Addr(self.__parse_str(output)) 2356 2357 def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]: 2358 return [Ip6Addr(line) for line in output] 2359 2360 def __parse_int(self, output: List[str], base=10) -> int: 2361 if len(output) != 1: 2362 raise UnexpectedCommandOutput(output) 2363 2364 return int(output[0], base) 2365 2366 def __parse_network_key(self, output: List[str]) -> str: 2367 networkkey = self.__parse_str(output) 2368 2369 try: 2370 self.__validate_network_key(networkkey) 2371 except ValueError: 2372 raise UnexpectedCommandOutput(output) 2373 2374 return networkkey 2375 2376 def __validate_network_key(self, networkkey: str): 2377 if len(networkkey) != 32: 2378 raise ValueError(networkkey) 2379 2380 int(networkkey, 16) 2381 2382 def __parse_hex64b(self, output: List[str]) -> str: 2383 extaddr = self.__parse_str(output) 2384 2385 try: 2386 self.__validate_hex64b(extaddr) 2387 except ValueError: 2388 raise UnexpectedCommandOutput(output) 2389 2390 return extaddr 2391 2392 __parse_extaddr = __parse_hex64b 2393 __parse_extpanid = __parse_hex64b 2394 __parse_eui64 = __parse_hex64b 2395 __parse_joiner_id = __parse_hex64b 2396 2397 def __validate_hex64b(self, extaddr: str): 2398 if len(extaddr) != 16: 2399 raise ValueError(extaddr) 2400 2401 self.__validate_hex(extaddr) 2402 2403 def __validate_hex(self, hexstr: str): 2404 if len(hexstr) % 2 != 0: 2405 raise ValueError(hexstr) 2406 2407 for i in range(0, len(hexstr), 2): 2408 int(hexstr[i:i + 2], 16) 2409 2410 __validate_extaddr = __validate_hex64b 2411 __validate_extpanid = __validate_hex64b 2412 2413 def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool: 2414 return self.__parse_values(output, Enabled=True, Disabled=False) 2415 2416 def __parse_values(self, output: List[str], **vals) -> Any: 2417 val = self.__parse_str(output) 2418 if val not in vals: 2419 raise UnexpectedCommandOutput(output) 2420 2421 return vals[val] 2422 2423 def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str: 2424 if isinstance(data, bytes): 2425 return ''.join('%02x' % c for c in data) 2426 else: 2427 self.__validate_hex(data) 2428 return data 2429 2430 def __hex_to_bytes(self, hexstr: str) -> bytes: 2431 self.__validate_hex(hexstr) 2432 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 2433 2434 def __bytes_to_hex(self, data: bytes) -> str: 2435 return ''.join('%02x' % b for b in data) 2436 2437 def __escape_escapable(self, s: str) -> str: 2438 """Escape CLI escapable characters in the given string. 2439 """ 2440 escapable_chars = '\\ \t\r\n' 2441 for char in escapable_chars: 2442 s = s.replace(char, '\\%s' % char) 2443 return s 2444 2445 def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str: 2446 txt_bin = b'' 2447 for k, v in txt.items(): 2448 assert '=' not in k, 'TXT key must not contain `=`' 2449 2450 if isinstance(v, str): 2451 entry = f'{k}={v}'.encode('utf8') 2452 elif isinstance(v, bytes): 2453 entry = f'{k}='.encode('utf8') + v 2454 else: 2455 assert v is True, 'TXT val must be str or bytes or True' 2456 entry = k.encode('utf8') 2457 2458 assert len(entry) <= 255, 'TXT entry is too long' 2459 2460 txt_bin += bytes([len(entry)]) 2461 txt_bin += entry 2462 2463 return ''.join('%02x' % b for b in txt_bin) 2464 2465 2466def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 2467 cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator) 2468 cmd_handler = OtCliCommandRunner(cli_handler) 2469 return OTCI(cmd_handler) 2470 2471 2472def connect_cli_serial(dev: str, baudrate=115200) -> OTCI: 2473 cli_handler = connectors.OtCliSerial(dev, baudrate) 2474 cmd_handler = OtCliCommandRunner(cli_handler) 2475 return OTCI(cmd_handler) 2476 2477 2478def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 2479 ncp_handler = connectors.OtNcpSim(executable, nodeid, simulator=simulator) 2480 cmd_handler = OtCliCommandRunner(ncp_handler, is_spinel_cli=True) 2481 return OTCI(cmd_handler) 2482 2483 2484def connect_otbr_ssh(host: str, port: int = 22, username='pi', password='raspberry', sudo=True): 2485 cmd_handler = OtbrSshCommandRunner(host, port, username, password, sudo=sudo) 2486 return OTCI(cmd_handler) 2487 2488 2489def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI: 2490 return OTCI(cmd_handler) 2491