• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import os
22import statistics
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers.utils_lib import ssh
28from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
29from acts_contrib.test_utils.wifi import ota_chamber
30from acts_contrib.test_utils.wifi import ota_sniffer
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
33from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
34from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
35from functools import partial
36
37
38class WifiPingTest(base_test.BaseTestClass):
39    """Class for ping-based Wifi performance tests.
40
41    This class implements WiFi ping performance tests such as range and RTT.
    The class sets up the AP in the desired configurations, configures
    and connects the phone to the AP, and runs ping tests. For an example
    config file to run this test class see
    example_connectivity_performance_ap_sta.json.
45    """
46
47    TEST_TIMEOUT = 10
48    RSSI_POLL_INTERVAL = 0.2
49    SHORT_SLEEP = 1
50    MED_SLEEP = 5
51    MAX_CONSECUTIVE_ZEROS = 5
52    DISCONNECTED_PING_RESULT = {
53        'connected': 0,
54        'rtt': [],
55        'time_stamp': [],
56        'ping_interarrivals': [],
57        'packet_loss_percentage': 100
58    }
59
60    def __init__(self, controllers):
61        base_test.BaseTestClass.__init__(self, controllers)
62        self.testcase_metric_logger = (
63            BlackboxMappedMetricLogger.for_test_case())
64        self.testclass_metric_logger = (
65            BlackboxMappedMetricLogger.for_test_class())
66        self.publish_testcase_metrics = True
67
68    def setup_class(self):
69        self.dut = self.android_devices[-1]
70        req_params = [
71            'ping_test_params', 'testbed_params', 'main_network',
72            'RetailAccessPoints', 'RemoteServer'
73        ]
74        opt_params = ['OTASniffer']
75        self.unpack_userparams(req_params, opt_params)
76        self.testclass_params = self.ping_test_params
77        self.num_atten = self.attenuators[0].instrument.num_atten
78        self.ping_server = ssh.connection.SshConnection(
79            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
80        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
81        if hasattr(self,
82                   'OTASniffer') and self.testbed_params['sniffer_enable']:
83            try:
84                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
85            except:
86                self.log.warning('Could not start sniffer. Disabling sniffs.')
87                self.testbed_params['sniffer_enable'] = 0
88        self.log.info('Access Point Configuration: {}'.format(
89            self.access_point.ap_settings))
90        self.log_path = os.path.join(logging.log_path, 'results')
91        os.makedirs(self.log_path, exist_ok=True)
92        self.atten_dut_chain_map = {}
93        self.testclass_results = []
94
95        # Turn WiFi ON
96        if self.testclass_params.get('airplane_mode', 1):
97            self.log.info('Turning on airplane mode.')
98            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
99                                'Can not turn on airplane mode.')
100        wutils.wifi_toggle_state(self.dut, True)
101
102        # Configure test retries
103        self.user_params['retry_tests'] = [self.__class__.__name__]
104
105    def teardown_class(self):
106        for attenuator in self.attenuators:
107            attenuator.set_atten(0, strict=False, retry=True)
108        # Turn WiFi OFF and reset AP
109        self.access_point.teardown()
110        for dev in self.android_devices:
111            wutils.wifi_toggle_state(dev, False)
112            dev.go_to_sleep()
113        self.process_testclass_results()
114
115    def setup_test(self):
116        self.retry_flag = False
117
118    def teardown_test(self):
119        self.retry_flag = False
120
121    def on_retry(self):
122        """Function to control test logic on retried tests.
123
124        This function is automatically executed on tests that are being
125        retried. In this case the function resets wifi, toggles it off and on
126        and sets a retry_flag to enable further tweaking the test logic on
127        second attempts.
128        """
129        self.retry_flag = True
130        for dev in self.android_devices:
131            wutils.reset_wifi(dev)
132            wutils.toggle_wifi_off_and_on(dev)
133
134    def process_testclass_results(self):
135        """Saves all test results to enable comparison."""
136        testclass_summary = {}
137        for test in self.testclass_results:
138            if 'range' in test['test_name']:
139                testclass_summary[test['test_name']] = test['range']
140        # Save results
141        results_file_path = os.path.join(self.log_path,
142                                         'testclass_summary.json')
143        with open(results_file_path, 'w') as results_file:
144            json.dump(wputils.serialize_dict(testclass_summary),
145                      results_file,
146                      indent=4)
147
148    def pass_fail_check_ping_rtt(self, result):
149        """Check the test result and decide if it passed or failed.
150
151        The function computes RTT statistics and fails any tests in which the
152        tail of the ping latency results exceeds the threshold defined in the
153        configuration file.
154
155        Args:
156            result: dict containing ping results and other meta data
157        """
158        ignored_fraction = (self.testclass_params['rtt_ignored_interval'] /
159                            self.testclass_params['rtt_ping_duration'])
160        sorted_rtt = [
161            sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
162            for x in result['ping_results']
163        ]
164        disconnected = any([len(x) == 0 for x in sorted_rtt])
165        if disconnected:
166            asserts.fail('Test failed. DUT disconnected at least once.')
167
168        rtt_at_test_percentile = [
169            x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
170                  len(x))] for x in sorted_rtt
171        ]
172        # Set blackbox metric
173        if self.publish_testcase_metrics:
174            self.testcase_metric_logger.add_metric('ping_rtt',
175                                                   max(rtt_at_test_percentile))
176        # Evaluate test pass/fail
177        rtt_failed = any([
178            rtt > self.testclass_params['rtt_threshold'] * 1000
179            for rtt in rtt_at_test_percentile
180        ])
181        if rtt_failed:
182            #TODO: figure out how to cleanly exclude RTT tests from retry
183            asserts.explicit_pass(
184                'Test failed. RTTs at test percentile = {}'.format(
185                    rtt_at_test_percentile))
186        else:
187            asserts.explicit_pass(
188                'Test Passed. RTTs at test percentile = {}'.format(
189                    rtt_at_test_percentile))
190
191    def pass_fail_check_ping_range(self, result):
192        """Check the test result and decide if it passed or failed.
193
194        Checks whether the attenuation at which ping packet losses begin to
195        exceed the threshold matches the range derived from golden
196        rate-vs-range result files. The test fails is ping range is
197        range_gap_threshold worse than RvR range.
198
199        Args:
200            result: dict containing ping results and meta data
201        """
202        # Evaluate test pass/fail
203        test_message = ('Attenuation at range is {}dB. '
204                        'LLStats at Range: {}'.format(
205                            result['range'], result['llstats_at_range']))
206        if result['peak_throughput_pct'] < 95:
207            asserts.fail('(RESULT NOT RELIABLE) {}'.format(test_message))
208
209        # If pass, set Blackbox metric
210        if self.publish_testcase_metrics:
211            self.testcase_metric_logger.add_metric('ping_range',
212                                                   result['range'])
213        asserts.explicit_pass(test_message)
214
215    def pass_fail_check(self, result):
216        if 'range' in result['testcase_params']['test_type']:
217            self.pass_fail_check_ping_range(result)
218        else:
219            self.pass_fail_check_ping_rtt(result)
220
221    def process_ping_results(self, testcase_params, ping_range_result):
222        """Saves and plots ping results.
223
224        Args:
225            ping_range_result: dict containing ping results and metadata
226        """
227        # Compute range
228        ping_loss_over_att = [
229            x['packet_loss_percentage']
230            for x in ping_range_result['ping_results']
231        ]
232        ping_loss_above_threshold = [
233            x > self.testclass_params['range_ping_loss_threshold']
234            for x in ping_loss_over_att
235        ]
236        for idx in range(len(ping_loss_above_threshold)):
237            if all(ping_loss_above_threshold[idx:]):
238                range_index = max(idx, 1) - 1
239                break
240        else:
241            range_index = -1
242        ping_range_result['atten_at_range'] = testcase_params['atten_range'][
243            range_index]
244        ping_range_result['peak_throughput_pct'] = 100 - min(
245            ping_loss_over_att)
246        ping_range_result['total_attenuation'] = [
247            ping_range_result['fixed_attenuation'] + att
248            for att in testcase_params['atten_range']
249        ]
250        ping_range_result['range'] = (ping_range_result['atten_at_range'] +
251                                      ping_range_result['fixed_attenuation'])
252        ping_range_result['llstats_at_range'] = (
253            'TX MCS = {0} ({1:.1f}%). '
254            'RX MCS = {2} ({3:.1f}%)'.format(
255                ping_range_result['llstats'][range_index]['summary']
256                ['common_tx_mcs'], ping_range_result['llstats'][range_index]
257                ['summary']['common_tx_mcs_freq'] * 100,
258                ping_range_result['llstats'][range_index]['summary']
259                ['common_rx_mcs'], ping_range_result['llstats'][range_index]
260                ['summary']['common_rx_mcs_freq'] * 100))
261
262        # Save results
263        results_file_path = os.path.join(
264            self.log_path, '{}.json'.format(self.current_test_name))
265        with open(results_file_path, 'w') as results_file:
266            json.dump(wputils.serialize_dict(ping_range_result),
267                      results_file,
268                      indent=4)
269
270        # Plot results
271        if 'rtt' in self.current_test_name:
272            figure = BokehFigure(self.current_test_name,
273                                 x_label='Timestamp (s)',
274                                 primary_y_label='Round Trip Time (ms)')
275            for idx, result in enumerate(ping_range_result['ping_results']):
276                if len(result['rtt']) > 1:
277                    x_data = [
278                        t - result['time_stamp'][0]
279                        for t in result['time_stamp']
280                    ]
281                    figure.add_line(
282                        x_data, result['rtt'], 'RTT @ {}dB'.format(
283                            ping_range_result['attenuation'][idx]))
284
285            output_file_path = os.path.join(
286                self.log_path, '{}.html'.format(self.current_test_name))
287            figure.generate_figure(output_file_path)
288
    def run_ping_test(self, testcase_params):
        """Main function to test ping.

        The function sweeps the attenuators over testcase_params['atten_range']
        and, at each step, collects ping statistics (from the DUT or from the
        ping server), RSSI (when 'monitor_rssi' is enabled) and incremental
        link layer stats. The sweep stops early once the number of
        consecutive steps with 100% loss or a disconnected DUT reaches the
        allowed count; the remaining steps are then filled with
        DISCONNECTED_PING_RESULT. The AP and DUT are expected to be already
        configured by the caller.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing ping results and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        if self.testbed_params['sniffer_enable']:
            # Capture for the whole sweep plus a TEST_TIMEOUT margin.
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=testcase_params['channel'],
                bw=testcase_params['bandwidth'],
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Run ping and sweep attenuation as needed
        zero_counter = 0  # consecutive steps with 100% loss / disconnected
        pending_first_ping = 1  # cleared once a step has less than 100% loss
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False, retry=True)
            if self.testclass_params.get('monitor_rssi', 1):
                # Start RSSI polling before pinging; the result is collected
                # from the future after the ping completes.
                rssi_future = wputils.get_connected_rssi_nb(
                    self.dut,
                    int(testcase_params['ping_duration'] / 2 /
                        self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL,
                    testcase_params['ping_duration'] / 2)
            # Refresh link layer stats
            llstats_obj.update_stats()
            if testcase_params.get('ping_from_dut', False):
                current_ping_stats = wputils.get_ping_stats(
                    self.dut,
                    wputils.get_server_address(self.ping_server, self.dut_ip,
                                               '255.255.255.0'),
                    testcase_params['ping_duration'],
                    testcase_params['ping_interval'],
                    testcase_params['ping_size'])
            else:
                current_ping_stats = wputils.get_ping_stats(
                    self.ping_server, self.dut_ip,
                    testcase_params['ping_duration'],
                    testcase_params['ping_interval'],
                    testcase_params['ping_size'])
            if self.testclass_params.get('monitor_rssi', 1):
                current_rssi = rssi_future.result()
            else:
                # RSSI monitoring disabled: store an empty placeholder with
                # the same keys so downstream code can index it.
                current_rssi = collections.OrderedDict([
                    ('time_stamp', []), ('bssid', []), ('ssid', []),
                    ('frequency', []),
                    ('signal_poll_rssi', wputils.empty_rssi_result()),
                    ('signal_poll_avg_rssi', wputils.empty_rssi_result()),
                    ('chain_0_rssi', wputils.empty_rssi_result()),
                    ('chain_1_rssi', wputils.empty_rssi_result())
                ])
            test_result['rssi_results'].append(current_rssi)
            # Second refresh so llstats_incremental is read right after the
            # ping at this attenuation step.
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            if current_ping_stats['connected']:
                llstats_str = 'TX MCS = {0} ({1:.1f}%). RX MCS = {2} ({3:.1f}%)'.format(
                    curr_llstats['summary']['common_tx_mcs'],
                    curr_llstats['summary']['common_tx_mcs_freq'] * 100,
                    curr_llstats['summary']['common_rx_mcs'],
                    curr_llstats['summary']['common_rx_mcs_freq'] * 100)
                self.log.info(
                    'Attenuation = {0}dB\tPacket Loss = {1:.1f}%\t'
                    'Avg RTT = {2:.2f}ms\tRSSI = {3:.1f} [{4:.1f},{5:.1f}]\t{6}\t'
                    .format(atten,
                            current_ping_stats['packet_loss_percentage'],
                            statistics.mean(current_ping_stats['rtt']),
                            current_rssi['signal_poll_rssi']['mean'],
                            current_rssi['chain_0_rssi']['mean'],
                            current_rssi['chain_1_rssi']['mean'], llstats_str))
                if current_ping_stats['packet_loss_percentage'] == 100:
                    zero_counter = zero_counter + 1
                else:
                    zero_counter = 0
                    pending_first_ping = 0
            else:
                self.log.info(
                    'Attenuation = {}dB. Disconnected.'.format(atten))
                zero_counter = zero_counter + 1
            test_result['ping_results'].append(current_ping_stats.as_dict())
            # Test ends when ping loss is stable at 100%. If the test has
            # successfully started, it ends after MAX_CONSECUTIVE_ZEROS such
            # steps. In case of a retry, extra steps are allowed (until the
            # first successful ping) to ensure the test properly starts.
            if self.retry_flag and pending_first_ping:
                allowable_zeros = self.MAX_CONSECUTIVE_ZEROS**2
            else:
                allowable_zeros = self.MAX_CONSECUTIVE_ZEROS
            if zero_counter == allowable_zeros:
                self.log.info('Ping loss stable at 100%. Stopping test now.')
                # Pad the skipped attenuation steps so ping_results has one
                # entry per step.
                # NOTE(review): the same class-level DISCONNECTED_PING_RESULT
                # dict object is appended for every padded step -- confirm
                # nothing mutates these entries.
                for idx in range(
                        len(testcase_params['atten_range']) -
                        len(test_result['ping_results'])):
                    test_result['ping_results'].append(
                        self.DISCONNECTED_PING_RESULT)
                break
        # Set attenuator to initial setting
        for attenuator in self.attenuators:
            attenuator.set_atten(testcase_params['atten_range'][0],
                                 strict=False,
                                 retry=True)
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result
412
413    def setup_ap(self, testcase_params):
414        """Sets up the access point in the configuration required by the test.
415
416        Args:
417            testcase_params: dict containing AP and other test params
418        """
419        band = self.access_point.band_lookup_by_channel(
420            testcase_params['channel'])
421        if '6G' in band:
422            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
423                testcase_params['channel'].strip('6g'))]
424        else:
425            if testcase_params['channel'] < 13:
426                frequency = wutils.WifiEnums.channel_2G_to_freq[
427                    testcase_params['channel']]
428            else:
429                frequency = wutils.WifiEnums.channel_5G_to_freq[
430                    testcase_params['channel']]
431        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
432            self.access_point.set_region(self.testbed_params['DFS_region'])
433        else:
434            self.access_point.set_region(self.testbed_params['default_region'])
435        self.access_point.set_channel(band, testcase_params['channel'])
436        self.access_point.set_bandwidth(band, testcase_params['mode'])
437        if 'low' in testcase_params['ap_power']:
438            self.log.info('Setting low AP power.')
439            self.access_point.set_power(
440                band, self.testclass_params['low_ap_tx_power'])
441        self.log.info('Access Point Configuration: {}'.format(
442            self.access_point.ap_settings))
443
444    def validate_and_connect(self, testcase_params):
445        if wputils.validate_network(self.dut,
446                                    testcase_params['test_network']['SSID']):
447            self.log.info('Already connected to desired network')
448        else:
449            current_country = wputils.get_country_code(self.dut)
450            if current_country != self.testclass_params['country_code']:
451                self.log.warning(
452                    'Requested CC: {}, Current CC: {}. Resetting WiFi'.format(
453                        self.testclass_params['country_code'],
454                        current_country))
455                wutils.wifi_toggle_state(self.dut, False)
456                wutils.set_wifi_country_code(
457                    self.dut, self.testclass_params['country_code'])
458                wutils.wifi_toggle_state(self.dut, True)
459                wutils.reset_wifi(self.dut)
460                wutils.set_wifi_country_code(
461                    self.dut, self.testclass_params['country_code'])
462            if self.testbed_params.get('txbf_off', False):
463                wputils.disable_beamforming(self.dut)
464            testcase_params['test_network']['channel'] = testcase_params[
465                'channel']
466            wutils.wifi_connect(self.dut,
467                                testcase_params['test_network'],
468                                num_of_tries=5,
469                                check_connectivity=True)
470
471    def setup_dut(self, testcase_params):
472        """Sets up the DUT in the configuration required by the test.
473
474        Args:
475            testcase_params: dict containing AP and other test params
476        """
477        # Turn screen off to preserve battery
478        if self.testbed_params.get('screen_on',
479                                   False) or self.testclass_params.get(
480                                       'screen_on', False):
481            self.dut.droid.wakeLockAcquireDim()
482        else:
483            self.dut.go_to_sleep()
484        self.validate_and_connect(testcase_params)
485        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
486        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
487            self.atten_dut_chain_map[testcase_params[
488                'channel']] = wputils.get_current_atten_dut_chain_map(
489                    self.attenuators, self.dut, self.ping_server)
490        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
491            self.atten_dut_chain_map[testcase_params['channel']]))
492        for idx, atten in enumerate(self.attenuators):
493            if self.atten_dut_chain_map[testcase_params['channel']][
494                    idx] == testcase_params['attenuated_chain']:
495                atten.offset = atten.instrument.max_atten
496            else:
497                atten.offset = 0
498
499    def setup_ping_test(self, testcase_params):
500        """Function that gets devices ready for the test.
501
502        Args:
503            testcase_params: dict containing test-specific parameters
504        """
505        # Configure AP
506        self.setup_ap(testcase_params)
507        # Set attenuator to 0 dB
508        for attenuator in self.attenuators:
509            attenuator.set_atten(testcase_params['atten_range'][0],
510                                 strict=False,
511                                 retry=True)
512        # Reset, configure, and connect DUT
513        self.setup_dut(testcase_params)
514
515    def get_range_start_atten(self, testcase_params):
516        """Gets the starting attenuation for this ping test.
517
518        This function is used to get the starting attenuation for ping range
519        tests. This implementation returns the default starting attenuation,
520        however, defining this function enables a more involved configuration
521        for over-the-air test classes.
522
523        Args:
524            testcase_params: dict containing all test params
525        """
526        return self.testclass_params['range_atten_start']
527
528    def compile_test_params(self, testcase_params):
529        # Check if test should be skipped.
530        wputils.check_skip_conditions(testcase_params, self.dut,
531                                      self.access_point,
532                                      getattr(self, 'ota_chamber', None))
533
534        band = self.access_point.band_lookup_by_channel(
535            testcase_params['channel'])
536        testcase_params['test_network'] = self.main_network[band]
537        if testcase_params['chain_mask'] in ['0', '1']:
538            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
539                1 if testcase_params['chain_mask'] == '0' else 0)
540        else:
541            # Set attenuated chain to -1. Do not set to None as this will be
542            # compared to RF chain map which may include None
543            testcase_params['attenuated_chain'] = -1
544        if testcase_params['test_type'] == 'test_ping_range':
545            testcase_params.update(
546                ping_interval=self.testclass_params['range_ping_interval'],
547                ping_duration=self.testclass_params['range_ping_duration'],
548                ping_size=self.testclass_params['ping_size'],
549            )
550        elif testcase_params['test_type'] == 'test_fast_ping_rtt':
551            testcase_params.update(
552                ping_interval=self.testclass_params['rtt_ping_interval']
553                ['fast'],
554                ping_duration=self.testclass_params['rtt_ping_duration'],
555                ping_size=self.testclass_params['ping_size'],
556            )
557        elif testcase_params['test_type'] == 'test_slow_ping_rtt':
558            testcase_params.update(
559                ping_interval=self.testclass_params['rtt_ping_interval']
560                ['slow'],
561                ping_duration=self.testclass_params['rtt_ping_duration'],
562                ping_size=self.testclass_params['ping_size'])
563
564        if testcase_params['test_type'] == 'test_ping_range':
565            start_atten = self.get_range_start_atten(testcase_params)
566            num_atten_steps = int(
567                (self.testclass_params['range_atten_stop'] - start_atten) /
568                self.testclass_params['range_atten_step'])
569            testcase_params['atten_range'] = [
570                start_atten + x * self.testclass_params['range_atten_step']
571                for x in range(0, num_atten_steps)
572            ]
573        else:
574            testcase_params['atten_range'] = self.testclass_params[
575                'rtt_test_attenuation']
576        return testcase_params
577
578    def _test_ping(self, testcase_params):
579        """ Function that gets called for each range test case
580
581        The function gets called in each range test case. It customizes the
582        range test based on the test name of the test that called it
583
584        Args:
585            testcase_params: dict containing preliminary set of parameters
586        """
587        # Compile test parameters from config and test name
588        testcase_params = self.compile_test_params(testcase_params)
589        # Run ping test
590        self.setup_ping_test(testcase_params)
591        ping_result = self.run_ping_test(testcase_params)
592        # Postprocess results
593        self.process_ping_results(testcase_params, ping_result)
594        self.testclass_results.append(ping_result)
595        self.pass_fail_check(ping_result)
596
597    def generate_test_cases(self, ap_power, channels, modes, chain_mask,
598                            test_types):
599        """Function that auto-generates test cases for a test class."""
600        test_cases = []
601        allowed_configs = {
602            20: [
603                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
604                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
605            ],
606            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
607            80: [36, 100, 149, '6g37', '6g117', '6g213'],
608            160: [36, '6g37', '6g117', '6g213']
609        }
610
611        for channel, mode, chain, test_type in itertools.product(
612                channels, modes, chain_mask, test_types):
613            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
614            if channel not in allowed_configs[bandwidth]:
615                continue
616            testcase_name = '{}_ch{}_{}_ch{}'.format(test_type, channel, mode,
617                                                     chain)
618            testcase_params = collections.OrderedDict(test_type=test_type,
619                                                      ap_power=ap_power,
620                                                      channel=channel,
621                                                      mode=mode,
622                                                      bandwidth=bandwidth,
623                                                      chain_mask=chain)
624            setattr(self, testcase_name,
625                    partial(self._test_ping, testcase_params))
626            test_cases.append(testcase_name)
627        return test_cases
628
629
class WifiPing_TwoChain_Test(WifiPingTest):
    """Range and RTT ping tests generated with the '2x2' chain mask."""
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        test_types = [
            'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt'
        ]
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=channels,
                                              modes=['bw20', 'bw40', 'bw80'],
                                              test_types=test_types,
                                              chain_mask=['2x2'])
646
647
class WifiPing_PerChainRange_Test(WifiPingTest):
    """Range ping tests generated for chain masks '0', '1' and '2x2'."""
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        self.tests = self.generate_test_cases(ap_power='standard',
                                              chain_mask=['0', '1', '2x2'],
                                              channels=channels,
                                              modes=['bw20', 'bw40', 'bw80'],
                                              test_types=['test_ping_range'])
660
661
class WifiPing_LowPowerAP_Test(WifiPingTest):
    """Ping range test cases requested with ap_power='low_power'.

    Requests only 'test_ping_range' test cases for chain masks '0', '1' and
    '2x2' over the channels and modes passed to generate_test_cases().
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channel_list = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        mode_list = ['bw20', 'bw40', 'bw80']
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              chain_mask=['0', '1', '2x2'],
                                              channels=channel_list,
                                              modes=mode_list,
                                              test_types=['test_ping_range'])
671
672
# Over-the-air (OTA) version of the ping tests
class WifiOtaPingTest(WifiPingTest):
    """Class to test over-the-air ping

    This class tests WiFi ping performance in an OTA chamber. It enables
    setting turntable orientation and other chamber parameters to study
    performance in varying channel conditions
    """
    def __init__(self, controllers):
        # BaseTestClass.__init__ is called directly (not WifiPingTest.__init__)
        # and the metric loggers are created here, with per-test-case metric
        # publishing turned off.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the WifiPingTest class setup, then creates the OTA chamber.

        The chamber is the first object created from the 'OTAChamber' entry
        of the user params.
        """
        WifiPingTest.setup_class(self)
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Tears down the test class, processes results, resets the chamber.

        The chamber is reset even if result processing raises, so that a
        reporting failure does not skip the reset.
        """
        WifiPingTest.teardown_class(self)
        try:
            self.process_testclass_results()
        finally:
            self.ota_chamber.reset_chamber()

    def process_testclass_results(self):
        """Saves all test results to enable comparison.

        Groups the recorded results by (channel, mode, chain_mask), plots
        range versus chamber position for each group, logs and publishes the
        average range of each group, and writes the grouped data to
        'testclass_summary.json' in the current output path. Nothing beyond
        the parent-class processing is done when no results were recorded.
        """
        WifiPingTest.process_testclass_results(self)

        if not self.testclass_results:
            # Nothing to plot or summarize; indexing the first result below
            # would otherwise raise IndexError.
            self.log.warning(
                'No test results available. Skipping OTA range summary.')
            return

        range_vs_angle = collections.OrderedDict()
        for test in self.testclass_results:
            curr_params = test['testcase_params']
            curr_config = wputils.extract_sub_dict(
                curr_params, ['channel', 'mode', 'chain_mask'])
            curr_config_id = tuple(curr_config.items())
            position = curr_params['position']
            config_data = range_vs_angle.setdefault(
                curr_config_id,
                {
                    'position': [],
                    'range': [],
                    'llstats_at_range': []
                })
            if position in config_data['position']:
                # Repeated position (e.g. a retried test): replace the entry
                # recorded for that same position with the latest result.
                entry_index = config_data['position'].index(position)
                config_data['range'][entry_index] = test['range']
                config_data['llstats_at_range'][entry_index] = test[
                    'llstats_at_range']
            else:
                config_data['position'].append(position)
                config_data['range'].append(test['range'])
                config_data['llstats_at_range'].append(
                    test['llstats_at_range'])

        chamber_mode = self.testclass_results[0]['testcase_params'][
            'chamber_mode']
        x_labels = {
            'orientation': 'Angle (deg)',
            'stepped stirrers': 'Position Index'
        }
        # Fall back to a generic label so an unrecognized chamber mode does
        # not leave the axis label undefined.
        x_label = x_labels.get(chamber_mode, 'Position')
        figure = BokehFigure(
            title='Range vs. Position',
            x_label=x_label,
            primary_y_label='Range (dB)',
        )
        for curr_config_id, curr_config_data in range_vs_angle.items():
            curr_config = collections.OrderedDict(curr_config_id)
            figure.add_line(x_data=curr_config_data['position'],
                            y_data=curr_config_data['range'],
                            hover_text=curr_config_data['llstats_at_range'],
                            legend='{}'.format(curr_config_id))
            average_range = sum(curr_config_data['range']) / len(
                curr_config_data['range'])
            self.log.info('Average range for {} is: {}dB'.format(
                curr_config_id, average_range))
            metric_name = 'ota_summary_ch{}_{}_ch{}.avg_range'.format(
                curr_config['channel'], curr_config['mode'],
                curr_config['chain_mask'])
            self.testclass_metric_logger.add_metric(metric_name, average_range)
        current_context = context.get_current_context().get_full_output_path()
        plot_file_path = os.path.join(current_context, 'results.html')
        figure.generate_figure(plot_file_path)

        # Save results
        results_file_path = os.path.join(current_context,
                                         'testclass_summary.json')
        with open(results_file_path, 'w') as results_file:
            json.dump(wputils.serialize_dict(range_vs_angle),
                      results_file,
                      indent=4)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Applies the chain mask, keeps the screen on or puts the DUT to sleep
        depending on the 'screen_on' testbed/testclass parameter, connects
        the DUT and stores its first IPv4 address on 'wlan0' in self.dut_ip.

        Args:
            testcase_params: dict containing AP and other test params
        """
        wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
        # Turn screen off to preserve battery
        keep_screen_on = (self.testbed_params.get('screen_on', False)
                          or self.testclass_params.get('screen_on', False))
        if keep_screen_on:
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        self.validate_and_connect(testcase_params)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_ping_test(self, testcase_params):
        """Positions the chamber, then runs the WifiPingTest ping setup.

        Args:
            testcase_params: dict containing 'chamber_mode' plus either
                'position' (orientation mode) or 'total_positions' (stepped
                stirrers mode), and the other ping test params
        """
        # Setup turntable
        chamber_mode = testcase_params['chamber_mode']
        if chamber_mode == 'orientation':
            self.ota_chamber.set_orientation(testcase_params['position'])
        elif chamber_mode == 'stepped stirrers':
            self.ota_chamber.step_stirrers(testcase_params['total_positions'])
        # Continue setting up ping test
        WifiPingTest.setup_ping_test(self, testcase_params)

    def get_range_start_atten(self, testcase_params):
        """Gets the starting attenuation for this ping test.

        The function gets the starting attenuation by checking whether a test
        at the same configuration has executed. If so it sets the starting
        point a configurable number of dBs below the reference test.

        Args:
            testcase_params: dict containing AP and other test params

        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['range_atten_start']
        # Get the current and reference test config. The reference test is the
        # most recent result with the same channel, mode and chain mask.
        config_keys = ['channel', 'mode', 'chain_mask']
        ref_test_params = wputils.extract_sub_dict(testcase_params,
                                                   config_keys)
        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            wputils.extract_sub_dict(result['testcase_params'], config_keys)
            for result in self.testclass_results
        ]
        try:
            # Search the reversed list so the latest matching result wins,
            # then convert back to an index into the original list.
            ref_index = previous_params[::-1].index(ref_test_params)
            ref_index = len(previous_params) - 1 - ref_index
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_range_test_gap'])
        except ValueError:
            self.log.info(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['range_atten_start']))
            start_atten = self.testclass_params['range_atten_start']
        return start_atten

    def generate_test_cases(self, ap_power, channels, modes, chain_masks,
                            chamber_mode, positions):
        """Generates ping range test cases and binds them to this instance.

        One test case is created per (channel, mode, chain_mask, position)
        combination whose channel is allowed for the bandwidth parsed from
        the mode string. Each test case is attached to self as a partial of
        self._test_ping.

        Args:
            ap_power: AP power setting stored in each test case's params
            channels: list of channels to test
            modes: list of mode strings containing the bandwidth digits,
                e.g. 'bw20'
            chain_masks: list of chain masks to test
            chamber_mode: chamber mode stored in each test case's params
            positions: list of chamber positions to test
        Returns:
            test_cases: list of generated test case names
        """
        test_cases = []
        # Maps the numeric bandwidth parsed from the mode string to the
        # channels for which test cases are generated.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
            ],
            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
            80: [36, 100, 149, '6g37', '6g117', '6g213'],
            160: [36, '6g37', '6g117', '6g213']
        }
        total_positions = len(positions)
        for channel, mode, chain_mask, position in itertools.product(
                channels, modes, chain_masks, positions):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = 'test_ping_range_ch{}_{}_ch{}_pos{}'.format(
                channel, mode, chain_mask, position)
            testcase_params = collections.OrderedDict(
                test_type='test_ping_range',
                ap_power=ap_power,
                channel=channel,
                mode=mode,
                bandwidth=bandwidth,
                chain_mask=chain_mask,
                chamber_mode=chamber_mode,
                total_positions=total_positions,
                position=position)
            setattr(self, testcase_name,
                    partial(self._test_ping, testcase_params))
            test_cases.append(testcase_name)
        return test_cases
861
862
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range test cases at positions 0 to 350 in steps of 10.

    Uses chamber_mode='orientation', ap_power='standard', mode 'bw20' and
    the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        channel_list = [6, 36, 149, '6g37', '6g117', '6g213']
        position_list = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=channel_list,
                                              modes=['bw20'],
                                              chain_masks=['2x2'],
                                              chamber_mode='orientation',
                                              positions=position_list)
873
874
class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
    """OTA ping range test cases at positions 0 to 315 in steps of 45.

    Uses chamber_mode='orientation', ap_power='standard', mode 'bw20' and
    the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        channel_list = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        position_list = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=channel_list,
            modes=['bw20'],
            chain_masks=['2x2'],
            chamber_mode='orientation',
            positions=position_list)
889
890
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
    """OTA ping range test cases over 100 stepped-stirrer positions.

    Uses chamber_mode='stepped stirrers', ap_power='standard', mode 'bw20'
    and the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        position_list = list(range(100))
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=['2x2'],
            chamber_mode='stepped stirrers',
            positions=position_list)
900
901
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
    """Low-power-AP OTA ping range cases at positions 0 to 350, step 10.

    Uses chamber_mode='orientation', ap_power='low_power', mode 'bw20' and
    the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        position_list = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=['2x2'],
            chamber_mode='orientation',
            positions=position_list)
912
913
class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
    """Low-power-AP OTA ping range cases at positions 0 to 315, step 45.

    Uses chamber_mode='orientation', ap_power='low_power', mode 'bw20' and
    the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        channel_list = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        position_list = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=channel_list,
                                              modes=['bw20'],
                                              chain_masks=['2x2'],
                                              chamber_mode='orientation',
                                              positions=position_list)
924
925
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
    """Low-power-AP OTA ping range cases over 100 stepped-stirrer positions.

    Uses chamber_mode='stepped stirrers', ap_power='low_power', mode 'bw20'
    and the '2x2' chain mask.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        position_list = list(range(100))
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=['2x2'],
            chamber_mode='stepped stirrers',
            positions=position_list)
935
936
class WifiOtaPing_LowPowerAP_PerChain_TenDegree_Test(WifiOtaPingTest):
    """Low-power-AP per-chain OTA ping range cases, positions 0 to 350.

    Uses chamber_mode='orientation', ap_power='low_power', mode 'bw20' and
    chain masks 0, 1 and '2x2', with positions in steps of 10.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        position_list = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=[0, 1, '2x2'],
            chamber_mode='orientation',
            positions=position_list)
947
948
class WifiOtaPing_PerChain_TenDegree_Test(WifiOtaPingTest):
    """Per-chain OTA ping range test cases at positions 0 to 350, step 10.

    Uses chamber_mode='orientation', ap_power='standard', mode 'bw20' and
    chain masks 0, 1 and '2x2'.
    """
    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        channel_list = [6, 36, 149, '6g37', '6g117', '6g213']
        position_list = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=channel_list,
                                              modes=['bw20'],
                                              chain_masks=[0, 1, '2x2'],
                                              chamber_mode='orientation',
                                              positions=position_list)
959