#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import multiprocessing
import time

from datetime import datetime
from uuid import uuid4

from acts import signals
from acts import tracelogger
from acts import utils
from acts.controllers import iperf_client
from acts.controllers import iperf_server

AC_VO = 'AC_VO'
AC_VI = 'AC_VI'
AC_BE = 'AC_BE'
AC_BK = 'AC_BK'

# TODO(fxb/61421): Add tests to check all DSCP classes are mapped to the correct
# AC (there are many that aren't included here). Requires implementation of
# sniffer.
DEFAULT_AC_TO_TOS_TAG_MAP = {
    AC_VO: '0xC0',
    AC_VI: '0x80',
    AC_BE: '0x0',
    AC_BK: '0x20'
}
UDP = 'udp'
TCP = 'tcp'
DEFAULT_IPERF_PORT = 5201
DEFAULT_STREAM_TIME = 10
DEFAULT_IP_ADDR_TIMEOUT = 15
PROCESS_JOIN_TIMEOUT = 60
AVAILABLE = True
UNAVAILABLE = False


class WmmTransceiverError(signals.ControllerError):
    """Raised for WmmTransceiver configuration and connectivity failures."""
    pass


def create(config, identifier=None, wlan_devices=None, access_points=None):
    """Creates a WmmTransceiver from a config.

    Args:
        config: dict, config parameters for the transceiver. Contains:
            - iperf_config: dict, the config to use for creating IPerfClients
                and IPerfServers (excluding port).
            - port_range_start: int, the lower bound of the port range to use
                for creating IPerfServers. Defaults to 5201.
            - wlan_device: string, the identifier of the wlan_device used for
                this WmmTransceiver (optional)
            - access_point: string, the ip of the access_point used for this
                WmmTransceiver (optional, only consulted when wlan_device is
                not present)
            - identifier: string, identifier for the WmmTransceiver (only
                required when not passed as an argument)

        identifier: string, identifier for the WmmTransceiver. Must be provided
            either as arg or in the config.
        wlan_devices: list of WlanDevice objects from which to get the
            wlan_device, if any, used as this transceiver
        access_points: list of AccessPoint objects from which to get the
            access_point, if any, used as this transceiver

    Returns:
        WmmTransceiver, built from the config

    Raises:
        WmmTransceiverError, if identifier or iperf_config is missing, or if
            the configured wlan_device/access_point cannot be found
    """
    try:
        # If identifier is not provided as func arg, it must be provided via
        # config file.
        if not identifier:
            identifier = config['identifier']
        iperf_config = config['iperf_config']
    except KeyError as err:
        raise WmmTransceiverError(
            'Parameter not provided as func arg, nor found in config: %s' %
            err) from err

    if wlan_devices is None:
        wlan_devices = []

    if access_points is None:
        access_points = []

    port_range_start = config.get('port_range_start', DEFAULT_IPERF_PORT)

    wd = None
    ap = None
    if 'wlan_device' in config:
        wd = _find_wlan_device(config['wlan_device'], wlan_devices)
    elif 'access_point' in config:
        ap = _find_access_point(config['access_point'], access_points)

    return WmmTransceiver(iperf_config,
                          identifier,
                          wlan_device=wd,
                          access_point=ap,
                          port_range_start=port_range_start)


def _find_wlan_device(wlan_device_identifier, wlan_devices):
    """Returns WlanDevice based on string identifier (e.g. ip, serial, etc.)

    Args:
        wlan_device_identifier: string, identifier for the desired WlanDevice
        wlan_devices: list, WlanDevices to search through

    Returns:
        WlanDevice, with identifier matching wlan_device_identifier

    Raises:
        WmmTransceiverError, if no WlanDevice matches identifier
    """
    for wd in wlan_devices:
        if wlan_device_identifier == wd.identifier:
            return wd
    raise WmmTransceiverError('No WlanDevice with identifier: %s' %
                              wlan_device_identifier)


def _find_access_point(access_point_ip, access_points):
    """Returns AccessPoint based on string ip address

    Args:
        access_point_ip: string, control plane ip addr of the desired AP,
        access_points: list, AccessPoints to search through

    Returns:
        AccessPoint, with hostname matching access_point_ip

    Raises:
        WmmTransceiverError, if no AccessPoint matches ip
    """
    for ap in access_points:
        if ap.ssh_settings.hostname == access_point_ip:
            return ap
    raise WmmTransceiverError('No AccessPoint with ip: %s' % access_point_ip)


class WmmTransceiver(object):
    """Object for handling WMM tagged streams between devices"""
    def __init__(self,
                 iperf_config,
                 identifier,
                 wlan_device=None,
                 access_point=None,
                 port_range_start=DEFAULT_IPERF_PORT):
        """Sets up bookkeeping for iperf clients, servers and streams.

        Args:
            iperf_config: dict, config used to create IPerfClients and
                IPerfServers (excluding port)
            identifier: string, identifier for this WmmTransceiver, used in
                log output
            wlan_device: WlanDevice used as this transceiver, if any
            access_point: AccessPoint used as this transceiver, if any
            port_range_start: int, first port handed to a created IPerfServer
        """
        self.identifier = identifier
        self.log = tracelogger.TraceLogger(
            WmmTransceiverLoggerAdapter(logging.getLogger(),
                                        {'identifier': self.identifier}))
        # WlanDevice or AccessPoint, that is used as the transceiver. Only one
        # will be set. This helps consolidate association, setup, teardown, etc.
        self.wlan_device = wlan_device
        self.access_point = access_point

        # Parameters used to create IPerfClient and IPerfServer objects on
        # device
        self._iperf_config = iperf_config
        self._test_interface = self._iperf_config.get('test_interface')
        self._port_range_start = port_range_start
        self._next_server_port = port_range_start

        # Maps IPerfClients, used for streams from this device, to True if
        # available, False if reserved
        self._iperf_clients = {}

        # Maps IPerfServers, used to receive streams from other devices, to True
        # if available, False if reserved
        self._iperf_servers = {}

        # Maps ports of servers, which are provided to other transceivers, to
        # the actual IPerfServer objects
        self._iperf_server_ports = {}

        # Maps stream UUIDs to IPerfClients reserved for that stream's use
        self._reserved_clients = {}

        # Maps stream UUIDs to (WmmTransceiver, server port) tuples, where the
        # server on that port is reserved on the transceiver for that stream's
        # use
        self._reserved_servers = {}

        # Maps with shared memory functionality to be used across the parallel
        # streams. active_streams holds UUIDs of streams that are currently
        # running on this device (mapped to True, since there is no
        # multiprocessing set). stream_results maps UUIDs of streams completed
        # on this device to IPerfResult results for that stream.
        self._manager = multiprocessing.Manager()
        self._active_streams = self._manager.dict()
        self._stream_results = self._manager.dict()

        # Holds parameters for streams that are prepared to run asynchronously
        # (i.e. resources have been allocated). Maps UUIDs of the future streams
        # to a dict, containing the stream parameters.
        self._pending_async_streams = {}

        # Set of UUIDs of asynchronous streams that have at least started, but
        # have not had their resources reclaimed yet
        self._ran_async_streams = set()

        # Set of stream parallel process, which can be joined if completed
        # successfully, or terminated and joined in the event of an error
        self._running_processes = set()

    def run_synchronous_traffic_stream(self, stream_parameters, subnet):
        """Runs a traffic stream with IPerf3 between two WmmTransceivers and
        saves the results.

        Args:
            stream_parameters: dict, containing parameters to used for the
                stream. See _parse_stream_parameters for details.
            subnet: string, the subnet of the network to use for the stream

        Returns:
            uuid: UUID object, identifier of the stream
        """
        (receiver, access_category, bandwidth,
         stream_time) = self._parse_stream_parameters(stream_parameters)
        uuid = uuid4()

        (client, server_ip,
         server_port) = self._get_stream_resources(uuid, receiver, subnet)

        # Releases the stream's resources itself if the server is unreachable.
        self._validate_server_address(server_ip, uuid)

        self.log.info('Running synchronous stream to %s WmmTransceiver' %
                      receiver.identifier)
        try:
            self._run_traffic(uuid,
                              client,
                              server_ip,
                              server_port,
                              self._active_streams,
                              self._stream_results,
                              access_category=access_category,
                              bandwidth=bandwidth,
                              stream_time=stream_time)
        finally:
            # Hand the client and server back even if the stream failed, so a
            # later stream can reuse them.
            self._return_stream_resources(uuid)
        return uuid

    def prepare_asynchronous_stream(self, stream_parameters, subnet):
        """Reserves resources and saves configs for upcoming asynchronous
        traffic streams, so they can be started more simultaneously.

        Args:
            stream_parameters: dict, containing parameters to used for the
                stream. See _parse_stream_parameters for details.
            subnet: string, the subnet of the network to use for the stream

        Returns:
            uuid: UUID object, identifier of the stream
        """
        (receiver, access_category, bandwidth,
         stream_time) = self._parse_stream_parameters(stream_parameters)
        uuid = uuid4()

        (client, server_ip,
         server_port) = self._get_stream_resources(uuid, receiver, subnet)

        self._validate_server_address(server_ip, uuid)

        pending_stream_config = {
            'client': client,
            'server_ip': server_ip,
            'server_port': server_port,
            'access_category': access_category,
            'bandwidth': bandwidth,
            'time': stream_time
        }

        self._pending_async_streams[uuid] = pending_stream_config
        self.log.info('Stream to %s WmmTransceiver prepared.' %
                      receiver.identifier)
        return uuid

    def start_asynchronous_streams(self, start_time=None):
        """Starts pending asynchronous streams between two WmmTransceivers as
        parallel processes.

        Args:
            start_time: float, time, seconds since epoch, at which to start the
                stream (for better synchronicity). If None, start immediately.
        """
        for uuid, pending_stream_config in self._pending_async_streams.items():
            process = multiprocessing.Process(
                target=self._run_traffic,
                args=(uuid, pending_stream_config['client'],
                      pending_stream_config['server_ip'],
                      pending_stream_config['server_port'],
                      self._active_streams, self._stream_results),
                kwargs={
                    'access_category':
                    pending_stream_config['access_category'],
                    'bandwidth': pending_stream_config['bandwidth'],
                    'stream_time': pending_stream_config['time'],
                    'start_time': start_time
                })

            # This needs to be set here to ensure its marked active before
            # it even starts.
            self._active_streams[uuid] = True
            process.start()
            self._ran_async_streams.add(uuid)
            self._running_processes.add(process)

        self._pending_async_streams.clear()

    def cleanup_asynchronous_streams(self, timeout=PROCESS_JOIN_TIMEOUT):
        """Releases reservations on resources (IPerfClients and IPerfServers)
        that were held for asynchronous streams, both pending and finished.
        Attempts to join any running processes, logging an error if timeout is
        exceeded.

        Args:
            timeout: time, in seconds, to wait for each running process, if any,
                to join
        """
        self.log.info('Cleaning up any asynchronous streams.')

        # Releases resources for any streams that were prepared, but not run
        for uuid in self._pending_async_streams:
            self.log.error(
                'Pending asynchronous stream %s never ran. Cleaning.' % uuid)
            self._return_stream_resources(uuid)
        self._pending_async_streams.clear()

        # Attempts to join any running streams, terminating them after timeout
        # if necessary.
        while self._running_processes:
            process = self._running_processes.pop()
            process.join(timeout)
            if process.is_alive():
                self.log.error(
                    'Stream process failed to join in %s seconds. Terminating.'
                    % timeout)
                process.terminate()
                process.join()
        # A terminated process never removes its own entry, so drop every
        # marker now that no stream process is left running.
        self._active_streams.clear()

        # Release resources for any finished streams
        while self._ran_async_streams:
            uuid = self._ran_async_streams.pop()
            self._return_stream_resources(uuid)

    def get_results(self, uuid):
        """Retrieves a streams IPerfResults from stream_results

        Args:
            uuid: UUID object, identifier of the stream

        Returns:
            The result stored for the stream, or None if there is none
        """
        return self._stream_results.get(uuid, None)

    def destroy_resources(self):
        """Stops every IPerfServer and forgets all clients, servers, ports
        and stored stream results, resetting the next server port to the start
        of the port range."""
        for server in self._iperf_servers:
            server.stop()
        self._iperf_servers.clear()
        self._iperf_server_ports.clear()
        self._iperf_clients.clear()
        self._next_server_port = self._port_range_start
        self._stream_results.clear()

    @property
    def has_active_streams(self):
        """True if any stream is currently marked active on this device."""
        return bool(self._active_streams)

    # Helper Functions

    def _run_traffic(self,
                     uuid,
                     client,
                     server_ip,
                     server_port,
                     active_streams,
                     stream_results,
                     access_category=None,
                     bandwidth=None,
                     stream_time=DEFAULT_STREAM_TIME,
                     start_time=None):
        """Runs an iperf3 stream.

        1. Adds stream UUID to active_streams
        2. Runs stream
        3. Saves results to stream_results
        4. Removes stream UUID from active_streams

        Steps 3 is skipped if the stream raises, but step 4 (and closing the
        SSH session, when one was opened) always happens, so a failed stream
        does not stay marked as active.

        Args:
            uuid: UUID object, identifier for stream
            client: IPerfClient object on device
            server_ip: string, ip address of IPerfServer for stream
            server_port: int, port of the IPerfServer for stream
            active_streams: multiprocessing.Manager.dict, which holds stream
                UUIDs of active streams on the device
            stream_results: multiprocessing.Manager.dict, which maps stream
                UUIDs of streams to IPerfResult objects
            access_category: string, WMM access category to use with iperf
                (AC_BK, AC_BE, AC_VI, AC_VO). Unset if None.
            bandwidth: int, bandwidth in mbps to use with iperf. Implies UDP.
                Unlimited if None.
            stream_time: int, time in seconds, to run iperf stream
            start_time: float, time, seconds since epoch, at which to start the
                stream (for better synchronicity). If None, start immediately.
        """
        active_streams[uuid] = True
        uses_ssh = isinstance(client, iperf_client.IPerfClientOverSsh)
        try:
            # SSH sessions must be started within the process that is going to
            # use it.
            if uses_ssh:
                with utils.SuppressLogOutput():
                    client.start_ssh()
            try:
                ac_flag = ''
                bandwidth_flag = ''
                time_flag = '-t %s' % stream_time

                if access_category:
                    ac_flag = ' -S %s' % DEFAULT_AC_TO_TOS_TAG_MAP[
                        access_category]

                if bandwidth:
                    bandwidth_flag = ' -u -b %sM' % bandwidth

                iperf_flags = '-p %s -i 1 %s%s%s -J' % (
                    server_port, time_flag, ac_flag, bandwidth_flag)
                if not start_time:
                    start_time = time.time()
                time_str = datetime.fromtimestamp(start_time).strftime(
                    '%H:%M:%S.%f')
                self.log.info(
                    'At %s, starting %s second stream to %s:%s with (AC: %s, Bandwidth: %s)'
                    % (time_str, stream_time, server_ip, server_port,
                       access_category,
                       bandwidth if bandwidth else 'Unlimited'))

                # Wait for the requested start time, sleeping rather than
                # spinning so parallel streams do not compete for CPU.
                delay = start_time - time.time()
                if delay > 0:
                    time.sleep(delay)

                path = client.start(server_ip, iperf_flags, '%s' % uuid)
                stream_results[uuid] = iperf_server.IPerfResult(
                    path, reporting_speed_units='mbps')
            finally:
                if uses_ssh:
                    client.close_ssh()
        finally:
            active_streams.pop(uuid, None)

    def _get_stream_resources(self, uuid, receiver, subnet):
        """Reserves an IPerfClient and IPerfServer for a stream.

        Args:
            uuid: UUID object, identifier of the stream
            receiver: WmmTransceiver object, which will be the streams receiver
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (IPerfClient, string, int) representing the client, server address,
            and server port to use for the stream
        """
        client = self._get_client(uuid)
        try:
            server_ip, server_port = self._get_server(receiver, uuid, subnet)
        except Exception:
            # No server could be reserved, so the stream will never run; hand
            # the client back rather than leaving it reserved forever.
            self._iperf_clients[self._reserved_clients.pop(uuid)] = AVAILABLE
            raise
        return (client, server_ip, server_port)

    def _return_stream_resources(self, uuid):
        """Releases reservations on a streams IPerfClient and IPerfServer, so
        they can be used by a future stream.

        Args:
            uuid: UUID object, identifier of the stream

        Raises:
            EnvironmentError, if the stream is still marked as active
        """
        if uuid in self._active_streams:
            raise EnvironmentError('Resource still being used by stream %s' %
                                   uuid)
        (receiver, server_port) = self._reserved_servers.pop(uuid)
        receiver._release_server(server_port)
        client = self._reserved_clients.pop(uuid)
        self._iperf_clients[client] = AVAILABLE

    def _get_client(self, uuid):
        """Retrieves and reserves IPerfClient for use in a stream. If none are
        available, a new one is created.

        Args:
            uuid: UUID object, identifier for stream, used to link client to
                stream for teardown

        Returns:
            IPerfClient on device
        """
        reserved_client = None
        for client in self._iperf_clients:
            if self._iperf_clients[client] == AVAILABLE:
                reserved_client = client
                break
        else:
            reserved_client = iperf_client.create([self._iperf_config])[0]
            # Due to the nature of multiprocessing, ssh connections must
            # be started inside the parallel processes, so it must be closed
            # here.
            if isinstance(reserved_client, iperf_client.IPerfClientOverSsh):
                reserved_client.close_ssh()

        self._iperf_clients[reserved_client] = UNAVAILABLE
        self._reserved_clients[uuid] = reserved_client
        return reserved_client

    def _get_server(self, receiver, uuid, subnet):
        """Retrieves the address and port of a reserved IPerfServer object from
        the receiver object for use in a stream.

        Args:
            receiver: WmmTransceiver, to get an IPerfServer from
            uuid: UUID, identifier for stream, used to link server to stream
                for teardown
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (string, int) representing the IPerfServer address and port
        """
        (server_ip, server_port) = receiver._reserve_server(subnet)
        self._reserved_servers[uuid] = (receiver, server_port)
        return (server_ip, server_port)

    def _reserve_server(self, subnet):
        """Reserves an available IPerfServer for use in a stream from another
        WmmTransceiver. If none are available, a new one is created.

        Args:
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (string, int) representing the IPerfServer address and port

        Raises:
            AttributeError, if no ipv4 address in the subnet is found within
                DEFAULT_IP_ADDR_TIMEOUT seconds. The server is released again
                before raising.
        """
        reserved_server = None
        for server in self._iperf_servers:
            if self._iperf_servers[server] == AVAILABLE:
                reserved_server = server
                break
        else:
            # Copy the shared config so the per-server port does not leak into
            # the caller's dict or into configs used for later clients/servers.
            iperf_server_config = dict(self._iperf_config)
            iperf_server_config['port'] = self._next_server_port
            self._next_server_port += 1
            reserved_server = iperf_server.create([iperf_server_config])[0]
            self._iperf_server_ports[reserved_server.port] = reserved_server

        self._iperf_servers[reserved_server] = UNAVAILABLE
        reserved_server.start()
        end_time = time.time() + DEFAULT_IP_ADDR_TIMEOUT
        while time.time() < end_time:
            if self.wlan_device:
                addresses = utils.get_interface_ip_addresses(
                    self.wlan_device.device, self._test_interface)
            else:
                addresses = reserved_server.get_interface_ip_addresses(
                    self._test_interface)
            for addr in addresses['ipv4_private']:
                if utils.ip_in_subnet(addr, subnet):
                    return (addr, reserved_server.port)

        # The caller never learns which server was picked, so it cannot
        # release it; stop it and mark it available before reporting failure.
        self._release_server(reserved_server.port)
        raise AttributeError(
            'Reserved server has no ipv4 address in the %s subnet' % subnet)

    def _release_server(self, server_port):
        """Releases reservation on IPerfServer, which was held for a stream
        from another WmmTransceiver.

        Args:
            server_port: int, the port of the IPerfServer being returned (since
                it is the identifying characteristic)
        """
        server = self._iperf_server_ports[server_port]
        server.stop()
        self._iperf_servers[server] = AVAILABLE

    def _validate_server_address(self, server_ip, uuid, timeout=60):
        """ Verifies server address can be pinged before attempting to run
        traffic, since iperf is unforgiving when the server is unreachable.

        Args:
            server_ip: string, ip address of the iperf server
            uuid: UUID object, identifier of the stream to use this server
            timeout: int, time in seconds to wait for server to respond to pings

        Raises:
            WmmTransceiverError, if, after timeout, server ip is unreachable.
                The stream's resources are released before raising.
        """
        self.log.info('Verifying server address (%s) is reachable.' %
                      server_ip)
        end_time = time.time() + timeout
        while time.time() < end_time:
            if self.can_ping(server_ip):
                break
            else:
                self.log.debug(
                    'Could not ping server address (%s). Retrying in 1 second.'
                    % (server_ip))
                time.sleep(1)
        else:
            # Only reached when the loop timed out without a successful ping.
            self._return_stream_resources(uuid)
            raise WmmTransceiverError('IPerfServer address (%s) unreachable.' %
                                      server_ip)

    def can_ping(self, dest_ip):
        """ Utilizes can_ping function in wlan_device or access_point device to
        ping dest_ip

        Args:
            dest_ip: string, ip address to ping

        Returns:
            True, if dest address is reachable
            False, otherwise
        """
        if self.wlan_device:
            return self.wlan_device.can_ping(dest_ip)
        else:
            return self.access_point.can_ping(dest_ip)

    def _parse_stream_parameters(self, stream_parameters):
        """Parses stream_parameters from dictionary.

        Args:
            stream_parameters: dict of stream parameters
                'receiver': WmmTransceiver, the receiver for the stream
                'access_category': String, the access category to use for the
                    stream. Unset if None.
                'bandwidth': int, bandwidth in mbps for the stream. If set,
                    implies UDP. If unset, implies TCP and unlimited bandwidth.
                'time': int, time in seconds to run stream. Defaults to
                    DEFAULT_STREAM_TIME.

        Returns:
            (receiver, access_category, bandwidth, time) as
            (WmmTransceiver, String, int, int)

        Raises:
            KeyError, if 'receiver' is missing
        """
        receiver = stream_parameters['receiver']
        access_category = stream_parameters.get('access_category', None)
        bandwidth = stream_parameters.get('bandwidth', None)
        stream_time = stream_parameters.get('time', DEFAULT_STREAM_TIME)
        return (receiver, access_category, bandwidth, stream_time)


class WmmTransceiverLoggerAdapter(logging.LoggerAdapter):
    """Prefixes log messages with the WmmTransceiver's identifier."""
    def process(self, msg, kwargs):
        """Prepends '[WmmTransceiver | <identifier>]' to msg; the identifier
        part is omitted when the adapter's 'identifier' extra is falsy."""
        if self.extra['identifier']:
            log_identifier = ' | %s' % self.extra['identifier']
        else:
            log_identifier = ''
        msg = "[WmmTransceiver%s] %s" % (log_identifier, msg)
        return (msg, kwargs)