• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19import math
20import os
21import shlex
22import subprocess
23import threading
24import time
25
26from acts import context
27from acts import utils
28from acts.controllers.android_device import AndroidDevice
29from acts.controllers.utils_lib.ssh import connection
30from acts.controllers.utils_lib.ssh import settings
31from acts.event import event_bus
32from acts.event.decorators import subscribe_static
33from acts.event.event import TestClassBeginEvent
34from acts.event.event import TestClassEndEvent
35from acts.libs.proc import job
36
# Names identifying this controller module (judging by the constant names,
# they are looked up by the Mobly and ACTS frameworks respectively).
MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer'
ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'

# Divisors used to scale a bits-per-second figure into larger units. The
# kilo/mega/giga steps are powers of 1024.
BITS_IN_BYTE = 8
KILOBITS = 2**10
MEGABITS = 2**10 * KILOBITS
GIGABITS = 2**10 * MEGABITS
43
44
def create(configs):
    """Factory method for iperf servers.

    Builds one iperf server object per entry in ``configs``:

    * A port number (an int, or a string made only of digits) yields a
      regular local IPerfServer.
    * A dict containing 'AndroidDevice' and 'port' yields an
      IPerfServerOverAdb.
    * A dict containing 'ssh_config' and 'port' yields an
      IPerfServerOverSsh; the optional 'test_interface' and 'use_killall'
      entries are forwarded to it.

    Args:
        configs: An iterable of config entries for the iperf servers.

    Returns:
        A list with one iperf server object per entry, in the same order.

    Raises:
        ValueError: If an entry matches none of the supported forms.
    """
    results = []
    for c in configs:
        # isinstance (instead of an exact type() comparison) also accepts
        # subclasses, e.g. OrderedDict entries produced by a config loader.
        if isinstance(c, (str, int)) and str(c).isdigit():
            results.append(IPerfServer(int(c)))
        elif isinstance(c, dict) and 'AndroidDevice' in c and 'port' in c:
            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
        elif isinstance(c, dict) and 'ssh_config' in c and 'port' in c:
            results.append(
                IPerfServerOverSsh(c['ssh_config'],
                                   c['port'],
                                   test_interface=c.get('test_interface'),
                                   use_killall=c.get('use_killall')))
        else:
            raise ValueError(
                'Config entry %s in %s is not a valid IPerfServer '
                'config.' % (repr(c), configs))
    return results
73
74
def get_info(iperf_servers):
    """Stub for reporting information about iperf servers.

    Args:
        iperf_servers: The iperf server objects; currently unused.

    Returns:
        Always None.
    """
82
83
def destroy(iperf_server_list):
    """Stops every iperf server in the list on a best-effort basis.

    A failure while stopping one server is logged together with its
    traceback and does not prevent the remaining servers from being stopped.

    Args:
        iperf_server_list: The iperf server objects to stop.
    """
    for iperf_server in iperf_server_list:
        try:
            iperf_server.stop()
        except Exception:
            # Deliberately broad: cleanup must continue with the next server.
            # The server is passed as a logging argument so the message is
            # only formatted when the record is actually emitted.
            logging.exception('Unable to properly clean up %s.', iperf_server)
90
91
class IPerfResult(object):
    """The parsed JSON report of a single iperf run.

    Rates found in the report (bits per second) are converted to the unit
    selected by ``reporting_speed_units`` before being returned.
    """

    def __init__(self, result_path, reporting_speed_units='Mbytes'):
        """Loads iperf result from file.

        Loads iperf result from JSON formatted server log. File can be accessed
        before or after server is stopped. Note that only the first JSON object
        will be loaded and this function is not intended to be used with files
        containing multiple iperf client runs.

        Args:
            result_path: Path of the JSON log to load. If no file exists at
                this path, the value itself is parsed as a JSON string.
            reporting_speed_units: Unit used when reporting rates; see
                _get_reporting_speed for the supported values.

        Raises:
            ValueError: If the content cannot be parsed as JSON.
        """
        self.reporting_speed_units = reporting_speed_units
        # if result_path isn't a path, treat it as JSON
        if not os.path.exists(result_path):
            self.result = json.loads(result_path)
            return

        with open(result_path, 'r') as f:
            all_lines = f.readlines()

        try:
            # Keep only the first JSON object: it ends at the first line that
            # consists of a lone closing brace.
            first_object = all_lines
            if '}\n' in all_lines:
                first_object = all_lines[:all_lines.index('}\n') + 1]
            # 'nan' is not valid JSON, so it is replaced with 0 before parsing.
            iperf_string = ''.join(first_object).replace('nan', '0')
            self.result = json.loads(iperf_string)
        except ValueError:
            # Possibly a result from interrupted iperf run,
            # skip first line and try again.
            self.result = json.loads(''.join(all_lines[1:]))

    def _has_data(self):
        """Checks if the iperf result has valid throughput data.

        Returns:
            True if the result contains throughput data. False otherwise.
        """
        return ('end' in self.result) and ('sum_received' in self.result['end']
                                           or 'sum' in self.result['end'])

    def _get_reporting_speed(self, network_speed_in_bits_per_second):
        """Converts a speed in bits per second to the reporting units.

        The units are chosen when the object is created and default to
        megabytes per second. Supported values (case insensitive): bits per
        second (bits), kilobits per second (kbits), megabits per second
        (mbits), gigabits per second (gbits), bytes per second (bytes),
        kilobytes per second (kbytes), megabytes per second (mbytes),
        gigabytes per second (gbytes).

        Args:
            network_speed_in_bits_per_second: The network speed from iperf in
                bits per second.

        Returns:
            The value of the throughput in the appropriate units.
        """
        units = self.reporting_speed_units.lower()
        speed_divisor = 1
        # Plain 'bytes' has no prefix character, so it has to be matched as a
        # whole; the prefixed forms are '<prefix>bytes'.
        if units == 'bytes' or units[1:] == 'bytes':
            speed_divisor = speed_divisor * BITS_IN_BYTE
        prefix = units[0:1]
        if prefix == 'k':
            speed_divisor = speed_divisor * KILOBITS
        elif prefix == 'm':
            speed_divisor = speed_divisor * MEGABITS
        elif prefix == 'g':
            speed_divisor = speed_divisor * GIGABITS
        return network_speed_in_bits_per_second / speed_divisor

    def get_json(self):
        """Returns the raw json output from iPerf."""
        return self.result

    @property
    def error(self):
        """The 'error' entry of the iperf result, or None if there is none."""
        return self.result.get('error', None)

    @property
    def avg_rate(self):
        """Average UDP rate, in the reporting units, over the entire run.

        This is the average UDP rate observed at the terminal the iperf result
        is pulled from. According to iperf3 documentation this is calculated
        based on bytes sent and thus is not a good representation of the
        quality of the link. If the result is not from a success run, this
        property is None.
        """
        if not self._has_data() or 'sum' not in self.result['end']:
            return None
        bps = self.result['end']['sum']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_receive_rate(self):
        """Average receiving rate, in the reporting units, over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_received' not in self.result['end']:
            return None
        bps = self.result['end']['sum_received']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_send_rate(self):
        """Average sending rate, in the reporting units, over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_sent' not in self.result['end']:
            return None
        bps = self.result['end']['sum_sent']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def instantaneous_rates(self):
        """Per-interval rates, in the reporting units, over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data():
            return None
        return [
            self._get_reporting_speed(interval['sum']['bits_per_second'])
            for interval in self.result['intervals']
        ]

    @property
    def std_deviation(self):
        """Standard deviation of rates, in the reporting units, over the run.

        Equivalent to get_std_deviation(0). This data may not exist if iperf
        was interrupted. If the result is not from a success run, this
        property is None.
        """
        return self.get_std_deviation(0)

    def get_std_deviation(self, iperf_ignored_interval):
        """Sample standard deviation of the per-interval rates.

        A configurable number of beginning (and the single last) intervals are
        ignored in the calculation as they are inaccurate (e.g. the last is
        from a very small interval).

        Args:
            iperf_ignored_interval: number of iperf intervals to ignore at the
                beginning when calculating the standard deviation.

        Returns:
            The standard deviation in the reporting units, or None if the
            result is not from a success run or fewer than two intervals
            remain after the ignored ones are dropped.
        """
        if not self._has_data():
            return None
        rates = self.instantaneous_rates[iperf_ignored_interval:-1]
        # The sample standard deviation divides by (n - 1), so it is undefined
        # for fewer than two samples.
        if len(rates) < 2:
            return None
        avg_rate = math.fsum(rates) / len(rates)
        sqd_deviations = [(rate - avg_rate)**2 for rate in rates]
        return math.sqrt(math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
254
255
class IPerfServerBase(object):
    """Shared interface and log-file bookkeeping for iperf servers."""

    # Keeps track of the number of IPerfServer logs to prevent file name
    # collisions.
    __log_file_counter = 0

    # Guards reads and updates of the counter above.
    __log_file_lock = threading.Lock()

    def __init__(self, port):
        """Stores the port and starts with an empty list of log files.

        Args:
            port: The port the iperf server is meant to use.
        """
        # TODO(markdr): We shouldn't be storing the log files in an array like
        # this. Nobody should be reading this property either. Instead, the
        # IPerfResult should be returned in stop() with all the necessary info.
        # See aosp/1012824 for a WIP implementation.
        self.log_files = []
        self._port = port

    @property
    def port(self):
        """The server port; subclasses have to provide it."""
        raise NotImplementedError('port must be specified.')

    @property
    def started(self):
        """Whether the server is running; subclasses have to provide it."""
        raise NotImplementedError('started must be specified.')

    def start(self, extra_args='', tag=''):
        """Starts an iperf3 server.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        raise NotImplementedError('start() must be specified.')

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file generated from the terminated session.
        """
        raise NotImplementedError('stop() must be specified.')

    def _get_full_file_path(self, tag=None):
        """Builds a fresh log file path and records it in self.log_files.

        Note: If the directory for the file path does not exist, it will be
        created (by the log_path property).

        Args:
            tag: The tag passed in to the server run. Left out of the file
                name when it is None or an empty string.

        Returns:
            The full path for the new IPerfServer log file.
        """
        directory = self.log_path

        with IPerfServerBase.__log_file_lock:
            sequence_number = IPerfServerBase.__log_file_counter
            name_parts = [
                str(part) for part in (tag, sequence_number)
                if part != '' and part is not None
            ]
            IPerfServerBase.__log_file_counter = sequence_number + 1

        full_path = os.path.join(
            directory, 'IPerfServer,%s.log' % ','.join(name_parts))
        self.log_files.append(full_path)
        return full_path

    @property
    def log_path(self):
        """Directory for this server's logs; created if it does not exist."""
        output_root = context.get_current_context().get_full_output_path()
        directory = os.path.join(output_root, 'IPerfServer%s' % self.port)

        # Ensure the directory exists.
        os.makedirs(directory, exist_ok=True)

        return directory
329
330
331def _get_port_from_ss_output(ss_output, pid):
332    pid = str(pid)
333    lines = ss_output.split('\n')
334    for line in lines:
335        if pid in line:
336            # Expected format:
337            # tcp LISTEN  0 5 *:<PORT>  *:* users:(("cmd",pid=<PID>,fd=3))
338            return line.split()[4].split(':')[-1]
339    else:
340        raise ProcessLookupError('Could not find started iperf3 process.')
341
342
class IPerfServer(IPerfServerBase):
    """Class that handles iperf server commands on localhost."""

    # Seconds to wait for iperf3 to exit after terminate() before killing it.
    _TERMINATE_TIMEOUT_SECONDS = 5

    def __init__(self, port=5201):
        """Creates a local iperf server object without starting it.

        Args:
            port: The port requested for the server. The port reported by the
                ``port`` property is updated from the running process on
                start().
        """
        super().__init__(port)
        self._hinted_port = port
        self._current_log_file = None
        self._iperf_process = None
        self._last_opened_file = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    def start(self, extra_args='', tag=''):
        """Starts iperf server on local machine.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.

        Raises:
            ProcessLookupError: If the port of the started process cannot be
                found in the ``ss`` output after three attempts.
        """
        if self._iperf_process is not None:
            return

        self._current_log_file = self._get_full_file_path(tag)

        # Run an iperf3 server on the hinted port with JSON output.
        command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J']

        command.extend(shlex.split(extra_args))

        if self._last_opened_file:
            self._last_opened_file.close()
        self._last_opened_file = open(self._current_log_file, 'w')
        try:
            self._iperf_process = subprocess.Popen(
                command,
                stdout=self._last_opened_file,
                stderr=subprocess.DEVNULL)
        except Exception:
            # Do not leak the log file handle when iperf3 cannot be launched.
            self._last_opened_file.close()
            self._last_opened_file = None
            raise

        # The process needs a moment before its socket shows up, so the lookup
        # is retried a few times.
        # NOTE(review): if job.run raises when grep matches nothing, that
        # error is not a ProcessLookupError and is not retried - confirm.
        for attempts_left in reversed(range(3)):
            try:
                self._port = int(
                    _get_port_from_ss_output(
                        job.run('ss -l -p -n | grep iperf').stdout,
                        self._iperf_process.pid))
                break
            except ProcessLookupError:
                if attempts_left == 0:
                    raise
                logging.debug('iperf3 process not started yet.')
                time.sleep(.01)

    def stop(self):
        """Stops the iperf server.

        Waits for the process to exit so that it is reaped and has finished
        writing its output before the log file name is handed back.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not running.
        """
        if self._iperf_process is None:
            return

        process = self._iperf_process
        self._iperf_process = None

        process.terminate()
        try:
            process.wait(timeout=self._TERMINATE_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            # The process ignored the termination request; force it.
            process.kill()
            process.wait()

        if self._last_opened_file:
            self._last_opened_file.close()
            self._last_opened_file = None

        return self._current_log_file

    def __del__(self):
        # __init__ may not have run to completion, so the attribute can be
        # missing; there is nothing to stop in that case.
        if getattr(self, '_iperf_process', None) is not None:
            self.stop()
418
419
class IPerfServerOverSsh(IPerfServerBase):
    """Class that handles iperf3 operations on remote machines."""

    def __init__(self,
                 ssh_config,
                 port,
                 test_interface=None,
                 use_killall=False):
        """Creates the server object and opens the ssh session.

        Args:
            ssh_config: The ssh config, passed to settings.from_config.
            port: The port number to run the iperf server on.
            test_interface: Name of the test interface, ie eth0. If not given,
                it is looked up from the ssh hostname.
            use_killall: If True (or the string 'true', in any case), stop()
                runs 'killall iperf3' instead of killing the recorded pid.
        """
        super().__init__(port)
        self.ssh_settings = settings.from_config(ssh_config)
        self._ssh_session = None
        self.start_ssh()

        self._iperf_pid = None
        self._current_tag = None
        self.hostname = self.ssh_settings.hostname
        self._use_killall = str(use_killall).lower() == 'true'
        try:
            # A test interface can only be found if an ip address is specified.
            # A fully qualified hostname will return None for the
            # test_interface.
            self.test_interface = self._get_test_interface_based_on_ip(
                test_interface)
        except Exception:
            # Best effort: the server is still usable without a known test
            # interface.
            self.test_interface = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_pid is not None

    def _get_remote_log_path(self):
        """Returns the path of the iperf log file on the remote machine."""
        return '/tmp/iperf_server_port%s.log' % self.port

    def _get_test_interface_based_on_ip(self, test_interface):
        """Gets the test interface for a particular IP if the test interface
            passed in test_interface is None

        Args:
            test_interface: Either a interface name, ie eth0, or None

        Returns:
            The name of the test interface.
        """
        if test_interface:
            return test_interface
        return utils.get_interface_based_on_ip(self._ssh_session,
                                               self.hostname)

    def get_interface_ip_addresses(self, interface):
        """Gets all of the ip addresses, ipv4 and ipv6, associated with a
           particular interface name.

        Args:
            interface: The interface name on the device, ie eth0

        Returns:
            A list of dictionaries of the the various IP addresses:
                ipv4_private_local_addresses: Any 192.168, 172.16, or 10
                    addresses
                ipv4_public_addresses: Any IPv4 public addresses
                ipv6_link_local_addresses: Any fe80:: addresses
                ipv6_private_local_addresses: Any fd00:: addresses
                ipv6_public_addresses: Any publicly routable addresses
        """
        if not self._ssh_session:
            self.start_ssh()

        return utils.get_interface_ip_addresses(self._ssh_session, interface)

    def renew_test_interface_ip_address(self):
        """Renews the test interface's IP address.  Necessary for changing
           DHCP scopes during a test.
        """
        if not self._ssh_session:
            self.start_ssh()
        utils.renew_linux_ip_address(self._ssh_session, self.test_interface)

    def _cleanup_iperf_port(self):
        """Checks and kills zombie iperf servers occupying intended port."""
        iperf_check_cmd = ('netstat -tulpn | grep LISTEN | grep iperf3'
                           ' | grep :{}').format(self.port)
        iperf_check = self._ssh_session.run(iperf_check_cmd,
                                            ignore_status=True)
        port_suffix = ':{}'.format(self.port)
        stale_pids = []
        for line in (iperf_check.stdout or '').splitlines():
            # Splitting on any whitespace copes with the column padding that
            # netstat adds, including padding after the last column.
            fields = line.split()
            # The grep above also matches longer ports that merely start with
            # this port, so require an address column ending in ':<port>'.
            if not any(field.endswith(port_suffix) for field in fields):
                continue
            # The last column is expected to be '<pid>/<program name>'.
            pid = fields[-1].split('/')[0]
            if pid and pid not in stale_pids:
                stale_pids.append(pid)
        for pid in stale_pids:
            logging.debug('Killing zombie server on port {}'.format(self.port))
            self._ssh_session.run('kill -9 {}'.format(pid))

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on specified machine and port.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self.started:
            return

        if not self._ssh_session:
            self.start_ssh()
        self._cleanup_iperf_port()
        if not iperf_binary:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        else:
            logging.debug('Using iperf3 binary located at %s', iperf_binary)
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        # The JSON output is redirected to a file on the remote machine and
        # fetched in stop().
        cmd = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=iperf_command,
            extra_flags=extra_args,
            log_file=self._get_remote_log_path())

        job_result = self._ssh_session.run_async(cmd)
        # The stdout of the async run is stored as the pid to kill in stop().
        self._iperf_pid = job_result.stdout
        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the remote iperf3 process, copies the remote log into a local
        log file and removes the remote log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not running.
        """
        if not self.started:
            return

        if self._use_killall:
            self._ssh_session.run('killall iperf3', ignore_status=True)
        else:
            self._ssh_session.run_async('kill -9 {}'.format(
                str(self._iperf_pid)))

        iperf_result = self._ssh_session.run('cat {}'.format(
            self._get_remote_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result.stdout)

        self._ssh_session.run_async('rm {}'.format(
            self._get_remote_log_path()))
        self._iperf_pid = None
        return log_file

    def start_ssh(self):
        """Starts an ssh session to the iperf server."""
        if not self._ssh_session:
            self._ssh_session = connection.SshConnection(self.ssh_settings)

    def close_ssh(self):
        """Closes the ssh session to the iperf server, if one exists, preventing
        connection reset errors when rebooting server device.
        """
        if self.started:
            self.stop()
        if self._ssh_session:
            self._ssh_session.close()
            self._ssh_session = None
587
588
589# TODO(markdr): Remove this after automagic controller creation has been
590# removed.
class _AndroidDeviceBridge(object):
    """A helper class for connecting serial numbers to AndroidDevices."""

    # The test class of the most recent TestClassBeginEvent; reset to None
    # when a TestClassEndEvent arrives.
    _test_class = None

    @staticmethod
    @subscribe_static(TestClassBeginEvent)
    def on_test_begin(event):
        """Remembers the test class carried by the begin event."""
        _AndroidDeviceBridge._test_class = event.test_class

    @staticmethod
    @subscribe_static(TestClassEndEvent)
    def on_test_end(_):
        """Forgets the remembered test class."""
        _AndroidDeviceBridge._test_class = None

    @staticmethod
    def android_devices():
        """A dict of serial -> AndroidDevice, where AndroidDevice is a device
        found in the current TestClass's controllers.

        The dict is empty while no test class is remembered.
        """
        test_class = _AndroidDeviceBridge._test_class
        if test_class:
            return dict(
                (device.serial, device)
                for device in test_class.android_devices)
        return {}


# Hook both handlers into the event bus, begin handler first.
event_bus.register_subscription(
    _AndroidDeviceBridge.on_test_begin.subscription
)
event_bus.register_subscription(
    _AndroidDeviceBridge.on_test_end.subscription
)
622
623
class IPerfServerOverAdb(IPerfServerBase):
    """Class that handles iperf3 operations over ADB devices."""

    def __init__(self, android_device_or_serial, port):
        """Creates a new IPerfServerOverAdb object.

        Args:
            android_device_or_serial: Either an AndroidDevice object, or the
                serial that corresponds to the AndroidDevice. Note that the
                serial must be present in an AndroidDevice entry in the ACTS
                config.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._android_device_or_serial = android_device_or_serial

        self._iperf_process = None
        # The pid of iperf3 on the device, as reported by pgrep in start().
        self._iperf_process_adb_pid = ''
        self._current_tag = ''

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    @property
    def _android_device(self):
        """The AndroidDevice, looked up by serial when only a serial is known."""
        if isinstance(self._android_device_or_serial, AndroidDevice):
            return self._android_device_or_serial
        else:
            return _AndroidDeviceBridge.android_devices()[
                self._android_device_or_serial]

    def _get_device_log_path(self):
        """Returns the path of the iperf log file on the device."""
        return '~/data/iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on an ADB device.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self._iperf_process is not None:
            return

        if not iperf_binary:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        else:
            logging.debug('Using iperf3 binary located at %s', iperf_binary)
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        self._iperf_process = self._android_device.adb.shell_nb(
            '{cmd} {extra_flags} > {log_file}'.format(
                cmd=iperf_command,
                extra_flags=extra_args,
                log_file=self._get_device_log_path()))

        # Poll until pgrep reports the newest iperf3 process on the device.
        self._iperf_process_adb_pid = ''
        while len(self._iperf_process_adb_pid) == 0:
            self._iperf_process_adb_pid = self._android_device.adb.shell(
                'pgrep iperf3 -n')

        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the local adb process and the iperf3 process on the device,
        copies the device log into a local log file and removes the device
        log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not running.
        """
        if self._iperf_process is None:
            return

        job.run('kill -9 {}'.format(self._iperf_process.pid))

        # TODO(markdr): update with definitive kill method
        # The pid is compared against whole entries of the pgrep output; a
        # substring test would also match longer pids that contain it (and an
        # empty pid would match anything), which could keep this loop running
        # forever.
        device_pid = self._iperf_process_adb_pid.strip()
        while device_pid:
            running_pids = self._android_device.adb.shell(
                'pgrep iperf3').split()
            if device_pid not in running_pids:
                break
            self._android_device.adb.shell("kill -9 {}".format(device_pid))

        iperf_result = self._android_device.adb.shell('cat {}'.format(
            self._get_device_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result)

        self._android_device.adb.shell('rm {}'.format(
            self._get_device_log_path()))

        self._iperf_process = None
        return log_file
728