• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import logging
21import numpy
22import os
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers import iperf_client
28from acts.controllers.utils_lib import ssh
29from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
30from acts_contrib.test_utils.wifi import ota_chamber
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
33from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
34from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
35from acts_contrib.test_utils.wifi import ota_sniffer
36from functools import partial
37from WifiRvrTest import WifiRvrTest
38from WifiPingTest import WifiPingTest
39
40
41class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
42    """Class to test WiFi sensitivity tests.
43
    This class measures WiFi sensitivity per rate. It heavily leverages the
    WifiRvrTest class and introduces minor differences to set specific rates
    on the access point, and implements a different pass/fail check. For an
    example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
49    """
50
    # NOTE(review): MAX_CONSECUTIVE_ZEROS and RSSI_POLL_INTERVAL are not
    # referenced by the methods visible in this class; presumably they are
    # consumed by the WifiRvrTest/WifiPingTest parents -- confirm.
    MAX_CONSECUTIVE_ZEROS = 5
    RSSI_POLL_INTERVAL = 0.2
    # Maps a channel number to the modes that generate_test_cases() will
    # accept for that channel; requested modes not listed here are dropped.
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    # One testable rate: the rate/MCS index, the number of spatial streams and
    # the data rate written to the 'Rate (Mbps)' column of results.csv. For
    # the legacy tables the 'mcs' field holds the rate itself.
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
    # Rate tables keyed by mode ('legacy_2GHz'/'legacy_5GHz' for legacy mode).
    # NOTE(review): HT20 lists 26 for MCS3 and 43.4 for MCS10 while the
    # corresponding VHT20 entries are 28.9 and 43.3 -- confirm these values.
    # NOTE(review): the VHT40 and VHT80 tables repeat the VHT20 data rates --
    # confirm whether that is intentional.
    #yapf:disable
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable
128
129    def __init__(self, controllers):
130        base_test.BaseTestClass.__init__(self, controllers)
131        self.testcase_metric_logger = (
132            BlackboxMappedMetricLogger.for_test_case())
133        self.testclass_metric_logger = (
134            BlackboxMappedMetricLogger.for_test_class())
135        self.publish_testcase_metrics = True
136
137    def setup_class(self):
138        """Initializes common test hardware and parameters.
139
140        This function initializes hardwares and compiles parameters that are
141        common to all tests in this class.
142        """
143        self.dut = self.android_devices[-1]
144        self.sta_dut = self.android_devices[-1]
145        req_params = [
146            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
147            'RemoteServer'
148        ]
149        opt_params = ['main_network', 'OTASniffer']
150        self.unpack_userparams(req_params, opt_params)
151        self.testclass_params = self.sensitivity_test_params
152        self.num_atten = self.attenuators[0].instrument.num_atten
153        self.ping_server = ssh.connection.SshConnection(
154            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
155        if hasattr(self,
156                   'OTASniffer') and self.testbed_params['sniffer_enable']:
157            try:
158                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
159            except:
160                self.log.warning('Could not start sniffer. Disabling sniffs.')
161                self.testbed_params['sniffer_enable'] = 0
162        self.remote_server = self.ping_server
163        self.iperf_server = self.iperf_servers[0]
164        self.iperf_client = self.iperf_clients[0]
165        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
166        self.log.info('Access Point Configuration: {}'.format(
167            self.access_point.ap_settings))
168        self.log_path = os.path.join(logging.log_path, 'results')
169        os.makedirs(self.log_path, exist_ok=True)
170        self.atten_dut_chain_map = {}
171        self.testclass_results = []
172
173        # Turn WiFi ON
174        if self.testclass_params.get('airplane_mode', 1):
175            self.log.info('Turning on airplane mode.')
176            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
177                                'Can not turn on airplane mode.')
178        wutils.wifi_toggle_state(self.dut, True)
179
180        # Configure test retries
181        self.user_params['retry_tests'] = [self.__class__.__name__]
182
183    def teardown_class(self):
184        self.access_point.teardown()
185        # Turn WiFi OFF
186        for dev in self.android_devices:
187            wutils.wifi_toggle_state(dev, False)
188            dev.go_to_sleep()
189        self.process_testclass_results()
190
191    def setup_test(self):
192        self.retry_flag = False
193
194    def teardown_test(self):
195        self.retry_flag = False
196
197    def on_retry(self):
198        """Function to control test logic on retried tests.
199
200        This function is automatically executed on tests that are being
201        retried. In this case the function resets wifi, toggles it off and on
202        and sets a retry_flag to enable further tweaking the test logic on
203        second attempts.
204        """
205        self.retry_flag = True
206        for dev in self.android_devices:
207            wutils.reset_wifi(dev)
208            wutils.toggle_wifi_off_and_on(dev)
209
210    def pass_fail_check(self, result):
211        """Checks sensitivity results and decides on pass/fail.
212
213        Args:
214            result: dict containing attenuation, throughput and other meta
215                data
216        """
217        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
218            result['peak_throughput_pct'], result['sensitivity']))
219        if result['peak_throughput_pct'] < 95:
220            asserts.fail('Result unreliable. {}'.format(result_string))
221        else:
222            asserts.explicit_pass('Test Passed. {}'.format(result_string))
223
224    def plot_per_curves(self):
225        """Plots PER curves to help debug sensitivity."""
226
227        plots = collections.OrderedDict()
228        id_fields = ['channel', 'mode', 'num_streams']
229        for result in self.testclass_results:
230            testcase_params = result['testcase_params']
231            plot_id = self.extract_test_id(testcase_params, id_fields)
232            plot_id = tuple(plot_id.items())
233            if plot_id not in plots:
234                plots[plot_id] = BokehFigure(
235                    title='Channel {} {} Nss{}'.format(
236                        result['testcase_params']['channel'],
237                        result['testcase_params']['mode'],
238                        result['testcase_params']['num_streams']),
239                    x_label='Attenuation (dB)',
240                    primary_y_label='PER (%)')
241            per = [stat['summary']['rx_per'] for stat in result['llstats']]
242            if len(per) < len(result['total_attenuation']):
243                per.extend([100] *
244                           (len(result['total_attenuation']) - len(per)))
245            plots[plot_id].add_line(result['total_attenuation'], per,
246                                    result['test_name'])
247        figure_list = []
248        for plot_id, plot in plots.items():
249            plot.generate_figure()
250            figure_list.append(plot)
251        output_file_path = os.path.join(self.log_path, 'results.html')
252        BokehFigure.save_figures(figure_list, output_file_path)
253
254    def process_testclass_results(self):
255        """Saves and plots test results from all executed test cases."""
256        # write json output
257        self.plot_per_curves()
258        testclass_results_dict = collections.OrderedDict()
259        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
260        channels_tested = []
261        for result in self.testclass_results:
262            testcase_params = result['testcase_params']
263            test_id = self.extract_test_id(testcase_params, id_fields)
264            test_id = tuple(test_id.items())
265            if test_id not in testclass_results_dict:
266                testclass_results_dict[test_id] = collections.OrderedDict()
267            channel = testcase_params['channel']
268            if channel not in channels_tested:
269                channels_tested.append(channel)
270            if result['peak_throughput_pct'] >= 95:
271                testclass_results_dict[test_id][channel] = result[
272                    'sensitivity']
273            else:
274                testclass_results_dict[test_id][channel] = ''
275
276        # calculate average metrics
277        metrics_dict = collections.OrderedDict()
278        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
279        for test_id in testclass_results_dict.keys():
280            for channel in testclass_results_dict[test_id].keys():
281                metric_tag = collections.OrderedDict(test_id, channel=channel)
282                metric_tag = self.extract_test_id(metric_tag, id_fields)
283                metric_tag = tuple(metric_tag.items())
284                metrics_dict.setdefault(metric_tag, [])
285                sensitivity_result = testclass_results_dict[test_id][channel]
286                if sensitivity_result != '':
287                    metrics_dict[metric_tag].append(sensitivity_result)
288        for metric_tag_tuple, metric_data in metrics_dict.items():
289            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
290            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
291                metric_tag_dict['channel'], metric_tag_dict['mode'],
292                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
293            metric_key = '{}.avg_sensitivity'.format(metric_tag)
294            metric_value = numpy.mean(metric_data)
295            self.testclass_metric_logger.add_metric(metric_key, metric_value)
296
297        # write csv
298        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
299        for channel in channels_tested:
300            csv_header.append('Ch. ' + str(channel))
301        results_file_path = os.path.join(self.log_path, 'results.csv')
302        with open(results_file_path, mode='w') as csv_file:
303            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
304            writer.writeheader()
305            for test_id, test_results in testclass_results_dict.items():
306                test_id_dict = dict(test_id)
307                if 'legacy' in test_id_dict['mode']:
308                    rate_list = self.VALID_RATES['legacy_2GHz']
309                else:
310                    rate_list = self.VALID_RATES[test_id_dict['mode']]
311                data_rate = next(rate.data_rate for rate in rate_list
312                                 if rate[:-1] == (test_id_dict['rate'],
313                                                  test_id_dict['num_streams']))
314                row_value = {
315                    'Mode': test_id_dict['mode'],
316                    'MCS': test_id_dict['rate'],
317                    'Streams': test_id_dict['num_streams'],
318                    'Chain': test_id_dict['chain_mask'],
319                    'Rate (Mbps)': data_rate,
320                }
321                for channel in channels_tested:
322                    row_value['Ch. ' + str(channel)] = test_results.pop(
323                        channel, ' ')
324                writer.writerow(row_value)
325
326        if not self.testclass_params['traffic_type'].lower() == 'ping':
327            WifiRvrTest.process_testclass_results(self)
328
329    def process_rvr_test_results(self, testcase_params, rvr_result):
330        """Post processes RvR results to compute sensitivity.
331
332        Takes in the results of the RvR tests and computes the sensitivity of
333        the current rate by looking at the point at which throughput drops
334        below the percentage specified in the config file. The function then
335        calls on its parent class process_test_results to plot the result.
336
337        Args:
338            rvr_result: dict containing attenuation, throughput and other meta
339            data
340        """
341        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
342        rvr_result['peak_throughput_pct'] = 100
343        throughput_check = [
344            throughput < rvr_result['peak_throughput'] *
345            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
346            for throughput in rvr_result['throughput_receive']
347        ]
348        consistency_check = [
349            idx for idx in range(len(throughput_check))
350            if all(throughput_check[idx:])
351        ]
352        rvr_result['atten_at_range'] = rvr_result['attenuation'][
353            consistency_check[0] - 1]
354        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
355            rvr_result['atten_at_range'])
356        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
357            self.testbed_params['ap_tx_power_offset'][str(
358                testcase_params['channel'])] - rvr_result['range'])
359        WifiRvrTest.process_test_results(self, rvr_result)
360
361    def process_ping_test_results(self, testcase_params, ping_result):
362        """Post processes RvR results to compute sensitivity.
363
364        Takes in the results of the RvR tests and computes the sensitivity of
365        the current rate by looking at the point at which throughput drops
366        below the percentage specified in the config file. The function then
367        calls on its parent class process_test_results to plot the result.
368
369        Args:
370            rvr_result: dict containing attenuation, throughput and other meta
371            data
372        """
373        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
374        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
375            self.testbed_params['ap_tx_power_offset'][str(
376                testcase_params['channel'])] - ping_result['range'])
377
378    def setup_sensitivity_test(self, testcase_params):
379        # Setup test
380        if testcase_params['traffic_type'].lower() == 'ping':
381            self.setup_ping_test(testcase_params)
382            self.run_sensitivity_test = self.run_ping_test
383            self.process_sensitivity_test_results = (
384                self.process_ping_test_results)
385        else:
386            self.setup_rvr_test(testcase_params)
387            self.run_sensitivity_test = self.run_rvr_test
388            self.process_sensitivity_test_results = (
389                self.process_rvr_test_results)
390
391    def setup_ap(self, testcase_params):
392        """Sets up the AP and attenuator to compensate for AP chain imbalance.
393
394        Args:
395            testcase_params: dict containing AP and other test params
396        """
397        band = self.access_point.band_lookup_by_channel(
398            testcase_params['channel'])
399        if '2G' in band:
400            frequency = wutils.WifiEnums.channel_2G_to_freq[
401                testcase_params['channel']]
402        else:
403            frequency = wutils.WifiEnums.channel_5G_to_freq[
404                testcase_params['channel']]
405        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
406            self.access_point.set_region(self.testbed_params['DFS_region'])
407        else:
408            self.access_point.set_region(self.testbed_params['default_region'])
409        self.access_point.set_channel(band, testcase_params['channel'])
410        self.access_point.set_bandwidth(band, testcase_params['mode'])
411        self.access_point.set_power(band, testcase_params['ap_tx_power'])
412        self.access_point.set_rate(band, testcase_params['mode'],
413                                   testcase_params['num_streams'],
414                                   testcase_params['rate'],
415                                   testcase_params['short_gi'])
416        # Set attenuator offsets and set attenuators to initial condition
417        atten_offsets = self.testbed_params['chain_offset'][str(
418            testcase_params['channel'])]
419        for atten in self.attenuators:
420            if 'AP-Chain-0' in atten.path:
421                atten.offset = atten_offsets[0]
422            elif 'AP-Chain-1' in atten.path:
423                atten.offset = atten_offsets[1]
424            else:
425                atten.offset = 0
426        self.log.info('Access Point Configuration: {}'.format(
427            self.access_point.ap_settings))
428
429    def setup_dut(self, testcase_params):
430        """Sets up the DUT in the configuration required by the test.
431
432        Args:
433            testcase_params: dict containing AP and other test params
434        """
435        # Turn screen off to preserve battery
436        if self.testbed_params.get('screen_on',
437                                   False) or self.testclass_params.get(
438                                       'screen_on', False):
439            self.dut.droid.wakeLockAcquireDim()
440        else:
441            self.dut.go_to_sleep()
442        if wputils.validate_network(self.dut,
443                                    testcase_params['test_network']['SSID']):
444            self.log.info('Already connected to desired network')
445        else:
446            wutils.wifi_toggle_state(self.dut, False)
447            wutils.set_wifi_country_code(self.dut,
448                                         self.testclass_params['country_code'])
449            wutils.wifi_toggle_state(self.dut, True)
450            wutils.reset_wifi(self.dut)
451            wutils.set_wifi_country_code(self.dut,
452                                         self.testclass_params['country_code'])
453            testcase_params['test_network']['channel'] = testcase_params[
454                'channel']
455            wutils.wifi_connect(self.dut,
456                                testcase_params['test_network'],
457                                num_of_tries=5,
458                                check_connectivity=False)
459        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
460        # Activate/attenuate the correct chains
461        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
462            self.atten_dut_chain_map[testcase_params[
463                'channel']] = wputils.get_current_atten_dut_chain_map(
464                    self.attenuators, self.dut, self.ping_server)
465        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
466            self.atten_dut_chain_map[testcase_params['channel']]))
467        for idx, atten in enumerate(self.attenuators):
468            if self.atten_dut_chain_map[testcase_params['channel']][
469                    idx] == testcase_params['attenuated_chain']:
470                atten.offset = atten.instrument.max_atten
471
472    def extract_test_id(self, testcase_params, id_fields):
473        test_id = collections.OrderedDict(
474            (param, testcase_params[param]) for param in id_fields)
475        return test_id
476
477    def get_start_atten(self, testcase_params):
478        """Gets the starting attenuation for this sensitivity test.
479
480        The function gets the starting attenuation by checking whether a test
481        as the next higher MCS has been executed. If so it sets the starting
482        point a configurable number of dBs below the next MCS's sensitivity.
483
484        Returns:
485            start_atten: starting attenuation for current test
486        """
487        # If the test is being retried, start from the beginning
488        if self.retry_flag:
489            self.log.info('Retry flag set. Setting attenuation to minimum.')
490            return self.testclass_params['atten_start']
491        # Get the current and reference test config. The reference test is the
492        # one performed at the current MCS+1
493        current_rate = testcase_params['rate']
494        ref_test_params = self.extract_test_id(
495            testcase_params,
496            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
497        if 'legacy' in testcase_params['mode']:
498            if testcase_params['channel'] <= 13:
499                rate_list = self.VALID_RATES['legacy_2GHz']
500            else:
501                rate_list = self.VALID_RATES['legacy_5GHz']
502            ref_index = max(
503                0,
504                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
505                - 1)
506            ref_test_params['rate'] = rate_list[ref_index].mcs
507        else:
508            ref_test_params['rate'] = current_rate + 1
509
510        # Check if reference test has been run and set attenuation accordingly
511        previous_params = [
512            self.extract_test_id(
513                result['testcase_params'],
514                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
515            for result in self.testclass_results
516        ]
517
518        try:
519            ref_index = previous_params.index(ref_test_params)
520            start_atten = self.testclass_results[ref_index][
521                'atten_at_range'] - (
522                    self.testclass_params['adjacent_mcs_range_gap'])
523        except ValueError:
524            self.log.warning(
525                'Reference test not found. Starting from {} dB'.format(
526                    self.testclass_params['atten_start']))
527            start_atten = self.testclass_params['atten_start']
528            start_atten = max(start_atten, 0)
529        return start_atten
530
531    def compile_test_params(self, testcase_params):
532        """Function that generates test params based on the test name."""
533        # Check if test should be skipped.
534        wputils.check_skip_conditions(testcase_params, self.dut,
535                                      self.access_point,
536                                      getattr(self, 'ota_chamber', None))
537
538        band = self.access_point.band_lookup_by_channel(
539            testcase_params['channel'])
540        testcase_params['band'] = band
541        testcase_params['test_network'] = self.main_network[band]
542        if testcase_params['chain_mask'] in ['0', '1']:
543            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
544                1 if testcase_params['chain_mask'] == '0' else 0)
545        else:
546            # Set attenuated chain to -1. Do not set to None as this will be
547            # compared to RF chain map which may include None
548            testcase_params['attenuated_chain'] = -1
549
550        self.testclass_params[
551            'range_ping_loss_threshold'] = 100 - self.testclass_params[
552                'throughput_pct_at_sensitivity']
553        if self.testclass_params['traffic_type'] == 'UDP':
554            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
555                self.testclass_params['iperf_duration'],
556                self.testclass_params['UDP_rates'][testcase_params['mode']])
557        elif self.testclass_params['traffic_type'] == 'TCP':
558            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
559                self.testclass_params['iperf_duration'])
560
561        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
562                self.iperf_client, iperf_client.IPerfClientOverAdb):
563            testcase_params['iperf_args'] += ' -R'
564            testcase_params['use_client_output'] = True
565        else:
566            testcase_params['use_client_output'] = False
567
568        return testcase_params
569
570    def _test_sensitivity(self, testcase_params):
571        """Function that gets called for each test case
572
573        The function gets called in each rvr test case. The function customizes
574        the rvr test based on the test name of the test that called it
575        """
576        # Compile test parameters from config and test name
577        testcase_params = self.compile_test_params(testcase_params)
578        testcase_params.update(self.testclass_params)
579        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
580        num_atten_steps = int(
581            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
582            testcase_params['atten_step'])
583        testcase_params['atten_range'] = [
584            testcase_params['atten_start'] + x * testcase_params['atten_step']
585            for x in range(0, num_atten_steps)
586        ]
587
588        # Prepare devices and run test
589        self.setup_sensitivity_test(testcase_params)
590        result = self.run_sensitivity_test(testcase_params)
591        self.process_sensitivity_test_results(testcase_params, result)
592
593        # Post-process results
594        self.testclass_results.append(result)
595        self.pass_fail_check(result)
596
597    def generate_test_cases(self, channels, modes, chain_mask):
598        """Function that auto-generates test cases for a test class."""
599        test_cases = []
600        for channel in channels:
601            requested_modes = [
602                mode for mode in modes
603                if mode in self.VALID_TEST_CONFIGS[channel]
604            ]
605            for mode in requested_modes:
606                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
607                if 'VHT' in mode:
608                    rates = self.VALID_RATES[mode]
609                elif 'HT' in mode:
610                    rates = self.VALID_RATES[mode]
611                elif 'legacy' in mode and channel < 14:
612                    rates = self.VALID_RATES['legacy_2GHz']
613                elif 'legacy' in mode and channel > 14:
614                    rates = self.VALID_RATES['legacy_5GHz']
615                else:
616                    raise ValueError('Invalid test mode.')
617                for chain, rate in itertools.product(chain_mask, rates):
618                    testcase_params = collections.OrderedDict(
619                        channel=channel,
620                        mode=mode,
621                        bandwidth=bandwidth,
622                        rate=rate.mcs,
623                        num_streams=rate.streams,
624                        short_gi=1,
625                        chain_mask=chain)
626                    if chain in ['0', '1'] and rate[1] == 2:
627                        # Do not test 2-stream rates in single chain mode
628                        continue
629                    if 'legacy' in mode:
630                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
631                                         '_ch{}'.format(
632                                             channel, mode,
633                                             str(rate.mcs).replace('.', 'p'),
634                                             rate.streams, chain))
635                    else:
636                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
637                                         '_ch{}'.format(
638                                             channel, mode, rate.mcs,
639                                             rate.streams, chain))
640                    setattr(self, testcase_name,
641                            partial(self._test_sensitivity, testcase_params))
642                    test_cases.append(testcase_name)
643        return test_cases
644
645
class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 6, 36-48 and 149-161."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
652
653
class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on a sample of channels: 6, 36 and 149.

    Test cases are requested for the VHT20, VHT40 and VHT80 modes with chain
    masks '0', '1' and '2x2'.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 149]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
660
661
class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 1, 2, 6, 10 and 11.

    Test cases are requested for the VHT20 mode only, with chain masks '0',
    '1' and '2x2'.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [1, 2, 6, 10, 11]
        modes = ['VHT20']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
667
668
class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36-48 and 149-161.

    Test cases are requested for the VHT20, VHT40 and VHT80 modes with chain
    masks '0', '1' and '2x2'.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
675
676
class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36, 40, 44 and 48.

    Test cases are requested for the VHT20, VHT40 and VHT80 modes with chain
    masks '0', '1' and '2x2'.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
683
684
class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 149, 153, 157 and 161.

    Test cases are requested for the VHT20, VHT40 and VHT80 modes with chain
    masks '0', '1' and '2x2'.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
691
692
# Over-the-air version of sensitivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
    """Class to test over-the-air sensitivity.

    This class measures WiFi sensitivity in an OTA chamber. It allows setting
    orientation and other chamber parameters to study performance in varying
    channel conditions.
    """
    def __init__(self, controllers):
        """Initializes the test class and its blackbox metric loggers.

        Note that base_test.BaseTestClass.__init__ is called directly here.

        Args:
            controllers: passed through to base_test.BaseTestClass.__init__
        """
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the parent class setup, then creates the OTA chamber.

        The chamber is the first object returned by ota_chamber.create for the
        'OTAChamber' entry of user_params.
        """
        WifiSensitivityTest.setup_class(self)
        self.current_chain_mask = '2x2'
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Runs the parent class teardown and resets the OTA chamber."""
        try:
            WifiSensitivityTest.teardown_class(self)
        finally:
            # Reset the chamber even if the parent teardown raises, so the
            # chamber is not left in the state of the last test case.
            self.ota_chamber.reset_chamber()

    def setup_sensitivity_test(self, testcase_params):
        """Sets the chamber orientation, then runs the parent test setup.

        Args:
            testcase_params: dict containing test params; its 'orientation'
            entry is passed to the chamber
        """
        # Setup turntable
        self.ota_chamber.set_orientation(testcase_params['orientation'])
        # Continue test setup
        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Configure the right INI settings
        wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
        # Turn screen off to preserve battery, unless 'screen_on' is set in
        # either the testbed or the test class params
        keep_screen_on = (self.testbed_params.get('screen_on', False)
                          or self.testclass_params.get('screen_on', False))
        if keep_screen_on:
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        self.validate_and_connect(testcase_params)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases.

        Results are grouped by (channel, mode, rate). Within each group, one
        line per (chain mask, number of streams) holds sensitivity versus
        orientation. One figure is generated per group, the average
        sensitivity of each line (ignoring NaN entries) is added as a test
        class metric, and all figures are saved together in results.html.
        """
        self.plot_per_curves()
        # Resolve the output directory up front so that it is defined even
        # when no results were collected.
        output_path = context.get_current_context().get_full_output_path()
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'rate']
        plots = []
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = self.extract_test_id(testcase_params, id_fields)
            test_id = tuple(test_id.items())
            line_id = (testcase_params['chain_mask'],
                       testcase_params['num_streams'])
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            if line_id not in testclass_results_dict[test_id]:
                testclass_results_dict[test_id][line_id] = {
                    'orientation': [],
                    'sensitivity': []
                }
            line_results = testclass_results_dict[test_id][line_id]
            orientation = testcase_params['orientation']
            # Results that did not reach 95% peak throughput are recorded as
            # NaN so that they are ignored in the averages below
            if result['peak_throughput_pct'] >= 95:
                sensitivity = result['sensitivity']
            else:
                sensitivity = float('nan')
            if orientation not in line_results['orientation']:
                line_results['orientation'].append(orientation)
                line_results['sensitivity'].append(sensitivity)
            else:
                # The orientation was already measured (e.g. a retried test):
                # overwrite the entry for that orientation with the latest
                # value rather than assuming it is the last one in the list.
                repeat_index = line_results['orientation'].index(orientation)
                line_results['sensitivity'][repeat_index] = sensitivity

        for test_id, test_data in testclass_results_dict.items():
            test_id_dict = dict(test_id)
            is_legacy = 'legacy' in test_id_dict['mode']
            if is_legacy:
                test_id_str = 'Channel {} - {} {}Mbps'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            else:
                test_id_str = 'Channel {} - {} MCS{}'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            curr_plot = BokehFigure(title=str(test_id_str),
                                    x_label='Orientation (deg)',
                                    primary_y_label='Sensitivity (dBm)')
            for line_id, line_results in test_data.items():
                # line_id is (chain_mask, num_streams)
                curr_plot.add_line(line_results['orientation'],
                                   line_results['sensitivity'],
                                   legend='Nss{} - Chain Mask {}'.format(
                                       line_id[1], line_id[0]),
                                   marker='circle')
                if is_legacy:
                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[0])
                else:
                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[1], line_id[0])

                metric_name = metric_tag + '.avg_sensitivity'
                metric_value = numpy.nanmean(line_results['sensitivity'])
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)
                self.log.info(('Average Sensitivity for {}: {:.1f}').format(
                    metric_tag, metric_value))
            output_file_path = os.path.join(output_path,
                                            str(test_id_str) + '.html')
            curr_plot.generate_figure(output_file_path)
            plots.append(curr_plot)
        if not plots:
            # Nothing was plotted, so there is no summary page to write
            self.log.info('No sensitivity results to plot.')
            return
        output_file_path = os.path.join(output_path, 'results.html')
        BokehFigure.save_figures(plots, output_file_path)

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the same rate configuration has executed. If so it sets the starting
        point a configurable number of dBs below the reference test.

        Args:
            testcase_params: dict containing the current test params
        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # Get the current test config. The reference test is the most recent
        # one with the same channel, mode, rate, number of streams and chain
        # mask; orientation is not part of the comparison.
        id_fields = ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']
        ref_test_params = self.extract_test_id(testcase_params, id_fields)
        # Check if reference test has been run and set attenuation accordingly
        for result in reversed(self.testclass_results):
            previous_params = self.extract_test_id(result['testcase_params'],
                                                   id_fields)
            if previous_params == ref_test_params:
                start_atten = result['atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
                break
        else:
            self.log.info(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        # Attenuation cannot be negative
        return max(start_atten, 0)

    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
                            angles):
        """Function that auto-generates test cases for a test class.

        Each generated test case is attached to this instance as a partial of
        _test_sensitivity bound to its parameters.

        Args:
            channels: list of channels to test
            modes: list of modes to test; modes not listed in
            VALID_TEST_CONFIGS for a channel are skipped for that channel
            requested_rates: rates to test; valid rates for a mode that are
            not in this collection are skipped
            chain_mask: list of chain masks to test
            angles: list of orientations to test
        Returns:
            test_cases: list of generated test case names
        Raises:
            ValueError: if a mode is neither an HT/VHT mode nor a legacy mode
            on a channel below or above 14
        """
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for chain, mode in itertools.product(chain_mask, requested_modes):
                # NOTE(review): a mode name without digits (e.g. a bare
                # 'legacy') would make int('') raise ValueError here - confirm
                # legacy mode names carry a bandwidth.
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'HT' in mode:
                    # Matches both HT and VHT mode names
                    valid_rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    valid_rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    valid_rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                for rate, angle in itertools.product(valid_rates, angles):
                    if rate not in requested_rates:
                        continue
                    if str(chain) in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain,
                        orientation=angle)
                    if 'legacy' in mode:
                        # Legacy rates may be fractional; use 'p' in place of
                        # the decimal point in the test name
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain, angle))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain, angle))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases
911
912
class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 10 degree orientation steps.

    Test cases are requested on channels 6, 36 and 149 for the VHT20 and
    VHT80 modes with the '2x2' chain mask, at orientations 0 to 350.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        # Positional RateTuple arguments for each requested rate
        rate_specs = [
            (8, 1, 86.7),
            (6, 1, 65),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (6, 2, 130.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              orientations)
929
930
class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
    """Per-chain OTA sensitivity tests in 10 degree orientation steps.

    Test cases are requested on channels 6, 36 and 149 for the VHT20 and
    VHT80 modes with chain masks 0, 1 and '2x2', at orientations 0 to 350.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        # Positional RateTuple arguments for each requested rate
        rate_specs = [
            (9, 1, 96),
            (9, 2, 192),
            (6, 1, 65),
            (6, 2, 130.3),
            (2, 1, 21.7),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        chain_masks = [0, 1, '2x2']
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT80'],
                                              requested_rates, chain_masks,
                                              orientations)
947
948
class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 30 degree orientation steps.

    Test cases are requested on channels 6, 36 and 149 for the VHT20 and
    VHT80 modes with the '2x2' chain mask, at orientations 0 to 330.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        # Positional RateTuple arguments for each requested rate
        rate_specs = [
            (9, 1, 96),
            (8, 1, 86.7),
            (7, 1, 72.2),
            (4, 1, 43.3),
            (2, 1, 21.7),
            (0, 1, 7.2),
            (9, 2, 192),
            (8, 2, 173.3),
            (7, 2, 144.4),
            (4, 2, 86.7),
            (2, 2, 43.3),
            (0, 2, 14.4),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        orientations = list(range(0, 360, 30))
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              orientations)
971
972
class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 45 degree orientation steps.

    Test cases are requested on channels 1, 6, 11, 36-48 and 149-161 for the
    VHT20 and VHT80 modes with the '2x2' chain mask, at orientations 0 to 315.
    """
    def __init__(self, controllers):
        super().__init__(controllers)
        # Positional RateTuple arguments for each requested rate
        rate_specs = [
            (8, 1, 86.7),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(channels, ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              orientations)
985