• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19import math
20import os
21import threading
22
23from acts import context
24from acts import utils
25from acts.controllers.android_device import AndroidDevice
26from acts.controllers.utils_lib.ssh import connection
27from acts.controllers.utils_lib.ssh import settings
28from acts.event import event_bus
29from acts.event.decorators import subscribe_static
30from acts.event.event import TestClassBeginEvent
31from acts.event.event import TestClassEndEvent
32from acts.libs.proc import job
33
34ACTS_CONTROLLER_CONFIG_NAME = 'IPerfServer'
35ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'
36
37
def create(configs):
    """Factory method for iperf servers.

    Builds one iperf server object per entry in configs:

    * A port number (an int, or a string made up only of digits) creates a
      regular local IPerfServer.
    * A dict containing 'AndroidDevice' and 'port' creates an
      IPerfServerOverAdb. This form is checked before the ssh form.
    * A dict containing 'ssh_config' and 'port' creates an
      IPerfServerOverSsh.

    Args:
        configs: An iterable of config entries for the iperf servers.

    Returns:
        A list with one iperf server object per entry, in input order.

    Raises:
        ValueError: If an entry matches none of the forms above.
    """
    results = []
    for c in configs:
        # isinstance() instead of an exact type() comparison so subclasses
        # (for example an OrderedDict built by a config loader) are accepted.
        if isinstance(c, (str, int)) and str(c).isdigit():
            results.append(IPerfServer(int(c)))
        elif isinstance(c, dict) and 'AndroidDevice' in c and 'port' in c:
            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
        elif isinstance(c, dict) and 'ssh_config' in c and 'port' in c:
            results.append(IPerfServerOverSsh(c['ssh_config'], c['port']))
        else:
            raise ValueError(
                'Config entry %s in %s is not a valid IPerfServer '
                'config.' % (repr(c), configs))
    return results
62
63
def destroy(iperf_server_list):
    """Stops every iperf server in the list.

    Cleanup is best-effort: a failure to stop one server is logged together
    with its traceback and the remaining servers are still stopped.

    Args:
        iperf_server_list: The iperf server objects to stop.
    """
    for iperf_server in iperf_server_list:
        try:
            iperf_server.stop()
        except Exception:
            # Pass the server as a logging argument so the message is
            # formatted by the logging machinery rather than inside this
            # handler.
            logging.exception('Unable to properly clean up %s.', iperf_server)
70
71
class IPerfResult(object):
    """Accessors over the JSON result written by an iperf run.

    Every rate exposed by this class is a 'bits_per_second' value from the
    result divided by 8 * 1024 * 1024.
    """

    def __init__(self, result_path):
        """Loads iperf result from file.

        Loads iperf result from JSON formatted server log. File can be accessed
        before or after server is stopped. Note that only the first JSON object
        will be loaded and this function is not intended to be used with files
        containing multiple iperf client runs.

        Args:
            result_path: Path of the JSON formatted iperf log to load.

        Raises:
            ValueError: If the log cannot be decoded as JSON, even after
                skipping its first line.
        """
        # Read the log once; both parse attempts below use the same lines.
        with open(result_path, 'r') as f:
            iperf_output = f.readlines()

        first_object = iperf_output
        if '}\n' in iperf_output:
            # Keep everything up to and including the first line that is
            # exactly '}' (presumably the end of the first JSON object).
            first_object = iperf_output[:iperf_output.index('}\n') + 1]

        try:
            self.result = self._parse_lines(first_object)
        except ValueError:
            # Possibly a result from interrupted iperf run, skip first line
            # and try again.
            self.result = self._parse_lines(iperf_output[1:])

    @staticmethod
    def _parse_lines(lines):
        """Joins log lines and decodes them as JSON.

        Every occurrence of 'nan' is replaced by '0' first, because the json
        module does not accept a bare nan token.

        Args:
            lines: The lines of the log to decode.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the text is not valid JSON.
        """
        return json.loads(''.join(lines).replace('nan', '0'))

    def _has_data(self):
        """Checks if the iperf result has valid throughput data.

        Returns:
            True if the result contains throughput data. False otherwise.
        """
        if 'end' not in self.result:
            return False
        end = self.result['end']
        return 'sum_received' in end or 'sum' in end

    def _end_rate(self, section):
        """Returns the rate stored under result['end'][section].

        Args:
            section: Key inside the 'end' object, e.g. 'sum' or 'sum_sent'.

        Returns:
            bits_per_second / 8 / 1024 / 1024 for that section, or None if
            the result has no throughput data or lacks the section.
        """
        if not self._has_data() or section not in self.result['end']:
            return None
        bps = self.result['end'][section]['bits_per_second']
        return bps / 8 / 1024 / 1024

    def get_json(self):
        """Returns the raw json output from iPerf."""
        return self.result

    @property
    def error(self):
        """The 'error' entry of the result, or None if there is none."""
        return self.result.get('error', None)

    @property
    def avg_rate(self):
        """Average UDP rate in MB/s over the entire run.

        This is the average UDP rate observed at the terminal the iperf result
        is pulled from. According to iperf3 documentation this is calculated
        based on bytes sent and thus is not a good representation of the
        quality of the link. If the result is not from a success run, this
        property is None.
        """
        return self._end_rate('sum')

    @property
    def avg_receive_rate(self):
        """Average receiving rate in MB/s over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        return self._end_rate('sum_received')

    @property
    def avg_send_rate(self):
        """Average sending rate in MB/s over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        return self._end_rate('sum_sent')

    @property
    def instantaneous_rates(self):
        """Instantaneous received rate in MB/s over entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data():
            return None
        return [
            interval['sum']['bits_per_second'] / 8 / 1024 / 1024
            for interval in self.result['intervals']
        ]

    @property
    def std_deviation(self):
        """Standard deviation of rates in MB/s over entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, or holds too few intervals, this property is
        None.
        """
        return self.get_std_deviation(0)

    def get_std_deviation(self, iperf_ignored_interval):
        """Standard deviation of rates in MB/s over entire run.

        This data may not exist if iperf was interrupted. A configurable
        number of beginning (and the single last) intervals are ignored in
        the calculation as they are inaccurate (e.g. the last is from a very
        small interval).

        Args:
            iperf_ignored_interval: number of leading iperf intervals to
                ignore when calculating the standard deviation.

        Returns:
            The sample standard deviation of the remaining rates, or None if
            the result has no throughput data or fewer than two intervals
            remain after trimming.
        """
        if not self._has_data():
            return None
        rates = self.instantaneous_rates[iperf_ignored_interval:-1]
        # The sample standard deviation divides by (count - 1), so it is
        # undefined for fewer than two samples.
        if len(rates) < 2:
            return None
        avg_rate = math.fsum(rates) / len(rates)
        sqd_deviations = [(rate - avg_rate)**2 for rate in rates]
        return math.sqrt(
            math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
201
202
class IPerfServerBase(object):
    """Interface shared by all iperf servers, plus log file bookkeeping.

    Subclasses provide port, started, start() and stop(); this class hands
    out log file paths and records them in log_files.
    """

    # Shared across all instances so that log file names never collide.
    __log_file_counter = 0

    # Guards reads and updates of the counter above.
    __log_file_lock = threading.Lock()

    def __init__(self, port):
        # TODO(markdr): Remove this after migration to the new iperf APIs.
        self.log_files = []
        self._port = port

    @property
    def port(self):
        raise NotImplementedError('port must be specified.')

    @property
    def started(self):
        raise NotImplementedError('started must be specified.')

    def start(self, extra_args='', tag=''):
        """Starts an iperf3 server.

        Args:
            extra_args: Extra command line arguments, as a single string, to
                start the iperf server with.
            tag: Added to the log file name so logs from different iperf
                runs can be told apart.
        """
        raise NotImplementedError('start() must be specified.')

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file produced by the session that was
            stopped.
        """
        raise NotImplementedError('stop() must be specified.')

    def _get_full_file_path(self, tag=None):
        """Builds the path of the next IPerfServer log file.

        The path is also appended to log_files. Reading log_path creates the
        log directory if it does not exist yet.

        Args:
            tag: The tag passed in to the server run. None and '' are left
                out of the file name.
        """
        directory = self.log_path

        with IPerfServerBase.__log_file_lock:
            name_parts = []
            if tag != '' and tag is not None:
                name_parts.append(str(tag))
            name_parts.append(str(IPerfServerBase.__log_file_counter))
            IPerfServerBase.__log_file_counter += 1

        full_path = os.path.join(
            directory, 'IPerfServer,%s.log' % ','.join(name_parts))
        self.log_files.append(full_path)
        return full_path

    @property
    def log_path(self):
        output_root = context.get_current_context().get_full_output_path()
        server_dir = os.path.join(output_root, 'IPerfServer%s' % self.port)

        # Make sure the directory is there before handing it out.
        utils.create_dir(server_dir)

        return server_dir
273
274
class IPerfServer(IPerfServerBase):
    """Runs an iperf3 server as a standing subprocess on localhost."""

    def __init__(self, port):
        super().__init__(port)
        self._iperf_process = None
        self._current_log_file = None
        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    def start(self, extra_args='', tag=''):
        """Launches the iperf server on the local machine.

        Does nothing if a server process is already being tracked. The
        server's output is redirected into a newly allocated log file.

        Args:
            extra_args: Extra command line arguments, as a single string, to
                start the iperf server with.
            tag: Added to the log file name so logs from different iperf
                runs can be told apart.
        """
        if self._iperf_process is not None:
            return

        log_file = self._get_full_file_path(tag)
        self._current_log_file = log_file

        command_line = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=self._iperf_command,
            extra_flags=extra_args,
            log_file=log_file)
        self._iperf_process = utils.start_standing_subprocess(command_line)

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file produced by the session that was
            stopped, or None if no server was running.
        """
        process = self._iperf_process
        if process is None:
            return

        utils.stop_standing_subprocess(process)
        self._iperf_process = None

        return self._current_log_file
326
327
class IPerfServerOverSsh(IPerfServerBase):
    """Runs an iperf3 server on a remote machine through an ssh session."""

    def __init__(self, ssh_config, port):
        super().__init__(port)
        self._ssh_session = connection.SshConnection(
            settings.from_config(ssh_config))

        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)
        self._iperf_pid = None
        self._current_tag = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_pid is not None

    def _get_remote_log_path(self):
        return 'iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag=''):
        """Launches the iperf server on the remote machine.

        Does nothing if the server is already started. The server's output
        is redirected into a log file on the remote machine.

        Args:
            extra_args: Extra command line arguments, as a single string, to
                start the iperf server with.
            tag: Added to the local log file name so logs from different
                iperf runs can be told apart.
        """
        if self.started:
            return

        remote_command = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=self._iperf_command,
            extra_flags=extra_args,
            log_file=self._get_remote_log_path())

        # The stdout of the async job is kept as the pid to kill in stop().
        self._iperf_pid = self._ssh_session.run_async(remote_command).stdout
        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the remote process, copies the remote log's contents into a
        new local log file and removes the remote log.

        Returns:
            The name of the log file produced by the session that was
            stopped, or None if the server was not started.
        """
        if not self.started:
            return

        session = self._ssh_session
        remote_log = self._get_remote_log_path()

        session.run_async('kill -9 {}'.format(str(self._iperf_pid)))
        remote_contents = session.run('cat {}'.format(remote_log))

        local_log = self._get_full_file_path(self._current_tag)
        with open(local_log, 'w') as local_file:
            local_file.write(remote_contents.stdout)

        session.run_async('rm {}'.format(remote_log))
        self._iperf_pid = None
        return local_log
393
394
395# TODO(markdr): Remove this after automagic controller creation has been
396# removed.
class _AndroidDeviceBridge(object):
    """Lets a serial number be resolved to its AndroidDevice object."""

    # Maps serial -> AndroidDevice for the devices listed in the
    # android_devices attribute of the test class that is currently running.
    android_devices = {}

    @staticmethod
    @subscribe_static(TestClassBeginEvent)
    def on_test_begin(event):
        """Records the android devices of the test class that is starting."""
        registry = _AndroidDeviceBridge.android_devices
        # A test class without an android_devices attribute adds nothing.
        for android_device in getattr(event.test_class, 'android_devices',
                                      []):
            registry[android_device.serial] = android_device

    @staticmethod
    @subscribe_static(TestClassEndEvent)
    def on_test_end(_):
        """Forgets all recorded devices by installing a fresh dict."""
        _AndroidDeviceBridge.android_devices = dict()
414
415
# Register both bridge handlers with the event bus at import time, begin
# handler first.
for _bridge_handler in (_AndroidDeviceBridge.on_test_begin,
                        _AndroidDeviceBridge.on_test_end):
    event_bus.register_subscription(_bridge_handler.subscription)
del _bridge_handler
419
420
class IPerfServerOverAdb(IPerfServerBase):
    """Class that handles iperf3 operations over ADB devices."""

    def __init__(self, android_device_or_serial, port):
        """Creates a new IPerfServerOverAdb object.

        Args:
            android_device_or_serial: Either an AndroidDevice object, or the
                serial that corresponds to the AndroidDevice. Note that the
                serial must be present in an AndroidDevice entry in the ACTS
                config.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._android_device_or_serial = android_device_or_serial

        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)
        self._iperf_process = None
        # PID of iperf3 on the device, as reported by pgrep; set by start().
        self._iperf_process_adb_pid = ''
        self._current_tag = ''

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    @property
    def _android_device(self):
        """The AndroidDevice to run on.

        A serial is resolved through _AndroidDeviceBridge at access time, so
        it raises KeyError if the bridge holds no device with that serial.
        """
        if isinstance(self._android_device_or_serial, AndroidDevice):
            return self._android_device_or_serial
        else:
            return _AndroidDeviceBridge.android_devices[
                self._android_device_or_serial]

    def _get_device_log_path(self):
        return '~/data/iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag=''):
        """Starts iperf server on an ADB device.

        Does nothing if a server process is already being tracked.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        if self._iperf_process is not None:
            return

        self._iperf_process = self._android_device.adb.shell_nb(
            '{cmd} {extra_flags} > {log_file}'.format(
                cmd=self._iperf_command,
                extra_flags=extra_args,
                log_file=self._get_device_log_path()))

        # Poll until pgrep reports the newest iperf3 process on the device.
        # NOTE(review): this loop has no timeout and never ends if iperf3
        # fails to start on the device.
        self._iperf_process_adb_pid = ''
        while not self._iperf_process_adb_pid:
            self._iperf_process_adb_pid = self._android_device.adb.shell(
                'pgrep iperf3 -n')

        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the local process returned by shell_nb, kills iperf3 on the
        device until pgrep no longer lists its PID, copies the device log
        into a new local log file and removes the device log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not started.
        """
        if self._iperf_process is None:
            return

        job.run('kill -9 {}'.format(self._iperf_process.pid))

        # TODO(markdr): update with definitive kill method
        device_pid = self._iperf_process_adb_pid
        while True:
            # Compare whole PID tokens. A substring search would let PID
            # '123' match an unrelated iperf3 with PID '1234' and keep this
            # loop running forever.
            running_pids = self._android_device.adb.shell(
                'pgrep iperf3').split()
            if device_pid.strip() not in running_pids:
                break
            self._android_device.adb.shell("kill -9 {}".format(device_pid))

        iperf_result = self._android_device.adb.shell('cat {}'.format(
            self._get_device_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result)

        self._android_device.adb.shell('rm {}'.format(
            self._get_device_log_path()))

        self._iperf_process = None
        return log_file
516