• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import logging
22import math
23import numpy
24import os
25import statistics
26from acts import asserts
27from acts import base_test
28from acts import context
29from acts import utils
30from acts.controllers.utils_lib import ssh
31from acts.controllers import iperf_server as ipf
32from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
33from acts_contrib.test_utils.wifi import ota_chamber
34from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
35from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
36from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
37from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
38from concurrent.futures import ThreadPoolExecutor
39from functools import partial
40
# Short delay. In this file it is used as the first-measurement delay of a
# test and as slack added to the iperf client timeout (presumably seconds -
# TODO confirm against wputils/iperf usage).
SHORT_SLEEP = 1
# Medium delay. get_traffic_timeout budgets this much per scan measurement
# and adds it once more as overall slack (presumably seconds - TODO confirm).
MED_SLEEP = 6
# Offset in dB (numerically 10*log10(2)) that is added to each per-chain mean
# RSSI before it is compared against the predicted RSSI.
CONST_3dB = 3.01029995664
# NaN sentinel reported as error/shift when no valid RSSI reading exists.
RSSI_ERROR_VAL = float('nan')
45
46
47class WifiRssiTest(base_test.BaseTestClass):
48    """Class to test WiFi RSSI reporting.
49
50    This class tests RSSI reporting on android devices. The class tests RSSI
51    accuracy by checking RSSI over a large attenuation range, checks for RSSI
52    stability over time when attenuation is fixed, and checks that RSSI quickly
    reacts to changes in attenuation by checking RSSI trajectories over
    configurable attenuation waveforms. For an example config file to run this
55    test class see example_connectivity_performance_ap_sta.json.
56    """
57
58    def __init__(self, controllers):
59        base_test.BaseTestClass.__init__(self, controllers)
60        self.testcase_metric_logger = (
61            BlackboxMappedMetricLogger.for_test_case())
62        self.testclass_metric_logger = (
63            BlackboxMappedMetricLogger.for_test_class())
64        self.publish_test_metrics = True
65
66    def setup_class(self):
67        self.dut = self.android_devices[0]
68        req_params = [
69            'RemoteServer', 'RetailAccessPoints', 'rssi_test_params',
70            'main_network', 'testbed_params'
71        ]
72        self.unpack_userparams(req_params)
73        self.testclass_params = self.rssi_test_params
74        self.num_atten = self.attenuators[0].instrument.num_atten
75        self.iperf_server = self.iperf_servers[0]
76        self.iperf_client = self.iperf_clients[0]
77        self.remote_server = ssh.connection.SshConnection(
78            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
79        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
80        self.log_path = os.path.join(logging.log_path, 'results')
81        os.makedirs(self.log_path, exist_ok=True)
82        self.log.info('Access Point Configuration: {}'.format(
83            self.access_point.ap_settings))
84        self.testclass_results = []
85
86        # Turn WiFi ON
87        if self.testclass_params.get('airplane_mode', 1):
88            self.log.info('Turning on airplane mode.')
89            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
90                                'Can not turn on airplane mode.')
91        wutils.wifi_toggle_state(self.dut, True)
92
93    def teardown_test(self):
94        self.iperf_server.stop()
95
96    def teardown_class(self):
97        # Turn WiFi OFF and reset AP
98        self.access_point.teardown()
99        for dev in self.android_devices:
100            wutils.wifi_toggle_state(dev, False)
101            dev.go_to_sleep()
102
103    def pass_fail_check_rssi_stability(self, testcase_params,
104                                       postprocessed_results):
105        """Check the test result and decide if it passed or failed.
106
107        Checks the RSSI test result and fails the test if the standard
108        deviation of signal_poll_rssi is beyond the threshold defined in the
109        config file.
110
111        Args:
112            testcase_params: dict containing test-specific parameters
113            postprocessed_results: compiled arrays of RSSI measurements
114        """
115        # Set Blackbox metric values
116        if self.publish_test_metrics:
117            self.testcase_metric_logger.add_metric(
118                'signal_poll_rssi_stdev',
119                max(postprocessed_results['signal_poll_rssi']['stdev']))
120            self.testcase_metric_logger.add_metric(
121                'chain_0_rssi_stdev',
122                max(postprocessed_results['chain_0_rssi']['stdev']))
123            self.testcase_metric_logger.add_metric(
124                'chain_1_rssi_stdev',
125                max(postprocessed_results['chain_1_rssi']['stdev']))
126
127        # Evaluate test pass/fail
128        test_failed = any([
129            stdev > self.testclass_params['stdev_tolerance']
130            for stdev in postprocessed_results['signal_poll_rssi']['stdev']
131        ])
132        test_message = (
133            'RSSI stability {0}. Standard deviation was {1} dB '
134            '(limit {2}), per chain standard deviation [{3}, {4}] dB'.format(
135                'failed' * test_failed + 'passed' * (not test_failed), [
136                    float('{:.2f}'.format(x))
137                    for x in postprocessed_results['signal_poll_rssi']['stdev']
138                ], self.testclass_params['stdev_tolerance'], [
139                    float('{:.2f}'.format(x))
140                    for x in postprocessed_results['chain_0_rssi']['stdev']
141                ], [
142                    float('{:.2f}'.format(x))
143                    for x in postprocessed_results['chain_1_rssi']['stdev']
144                ]))
145        if test_failed:
146            asserts.fail(test_message)
147        asserts.explicit_pass(test_message)
148
149    def pass_fail_check_rssi_accuracy(self, testcase_params,
150                                      postprocessed_results):
151        """Check the test result and decide if it passed or failed.
152
153        Checks the RSSI test result and compares and compute its deviation from
154        the predicted RSSI. This computation is done for all reported RSSI
155        values. The test fails if any of the RSSI values specified in
156        rssi_under_test have an average error beyond what is specified in the
157        configuration file.
158
159        Args:
160            postprocessed_results: compiled arrays of RSSI measurements
161            testcase_params: dict containing params such as list of RSSIs under
162            test, i.e., can cause test to fail and boolean indicating whether
163            to look at absolute RSSI accuracy, or centered RSSI accuracy.
164            Centered accuracy is computed after systematic RSSI shifts are
165            removed.
166        """
167        test_failed = False
168        test_message = ''
169        if testcase_params['absolute_accuracy']:
170            error_type = 'absolute'
171        else:
172            error_type = 'centered'
173
174        for key, val in postprocessed_results.items():
175            # Compute the error metrics ignoring invalid RSSI readings
176            # If all readings invalid, set error to RSSI_ERROR_VAL
177            if 'rssi' in key and 'predicted' not in key:
178                filtered_error = [x for x in val['error'] if not math.isnan(x)]
179                if filtered_error:
180                    avg_shift = statistics.mean(filtered_error)
181                    if testcase_params['absolute_accuracy']:
182                        avg_error = statistics.mean(
183                            [abs(x) for x in filtered_error])
184                    else:
185                        avg_error = statistics.mean(
186                            [abs(x - avg_shift) for x in filtered_error])
187                else:
188                    avg_error = RSSI_ERROR_VAL
189                    avg_shift = RSSI_ERROR_VAL
190                # Set Blackbox metric values
191                if self.publish_test_metrics:
192                    self.testcase_metric_logger.add_metric(
193                        '{}_error'.format(key), avg_error)
194                    self.testcase_metric_logger.add_metric(
195                        '{}_shift'.format(key), avg_shift)
196                # Evaluate test pass/fail
197                rssi_failure = (avg_error >
198                                self.testclass_params['abs_tolerance']
199                                ) or math.isnan(avg_error)
200                if rssi_failure and key in testcase_params['rssi_under_test']:
201                    test_message = test_message + (
202                        '{} failed ({} error = {:.2f} dB, '
203                        'shift = {:.2f} dB)\n').format(key, error_type,
204                                                       avg_error, avg_shift)
205                    test_failed = True
206                elif rssi_failure:
207                    test_message = test_message + (
208                        '{} failed (ignored) ({} error = {:.2f} dB, '
209                        'shift = {:.2f} dB)\n').format(key, error_type,
210                                                       avg_error, avg_shift)
211                else:
212                    test_message = test_message + (
213                        '{} passed ({} error = {:.2f} dB, '
214                        'shift = {:.2f} dB)\n').format(key, error_type,
215                                                       avg_error, avg_shift)
216        if test_failed:
217            asserts.fail(test_message)
218        asserts.explicit_pass(test_message)
219
220    def post_process_rssi_sweep(self, rssi_result):
221        """Postprocesses and saves JSON formatted results.
222
223        Args:
224            rssi_result: dict containing attenuation, rssi and other meta
225            data
226        Returns:
227            postprocessed_results: compiled arrays of RSSI data used in
228            pass/fail check
229        """
230        # Save output as text file
231        results_file_path = os.path.join(self.log_path, self.current_test_name)
232        with open(results_file_path, 'w') as results_file:
233            json.dump(wputils.serialize_dict(rssi_result),
234                      results_file,
235                      indent=4)
236        # Compile results into arrays of RSSIs suitable for plotting
237        # yapf: disable
238        postprocessed_results = collections.OrderedDict(
239            [('signal_poll_rssi', {}),
240             ('signal_poll_avg_rssi', {}),
241             ('scan_rssi', {}),
242             ('chain_0_rssi', {}),
243             ('chain_1_rssi', {}),
244             ('reported_rssi', {}),
245             ('total_attenuation', []),
246             ('predicted_rssi', [])])
247        # yapf: enable
248        for key, val in postprocessed_results.items():
249            if 'scan_rssi' in key:
250                postprocessed_results[key]['data'] = [
251                    x for data_point in rssi_result['rssi_result'] for x in
252                    data_point[key][rssi_result['connected_bssid']]['data']
253                ]
254                postprocessed_results[key]['mean'] = [
255                    x[key][rssi_result['connected_bssid']]['mean']
256                    for x in rssi_result['rssi_result']
257                ]
258                postprocessed_results[key]['stdev'] = [
259                    x[key][rssi_result['connected_bssid']]['stdev']
260                    for x in rssi_result['rssi_result']
261                ]
262            elif 'predicted_rssi' in key:
263                postprocessed_results['total_attenuation'] = [
264                    att + rssi_result['fixed_attenuation'] +
265                    rssi_result['dut_front_end_loss']
266                    for att in rssi_result['attenuation']
267                ]
268                postprocessed_results['predicted_rssi'] = [
269                    rssi_result['ap_tx_power'] - att
270                    for att in postprocessed_results['total_attenuation']
271                ]
272            elif 'rssi' in key:
273                postprocessed_results[key]['data'] = [
274                    x for data_point in rssi_result['rssi_result']
275                    for x in data_point[key]['data']
276                ]
277                postprocessed_results[key]['mean'] = [
278                    x[key]['mean'] for x in rssi_result['rssi_result']
279                ]
280                postprocessed_results[key]['stdev'] = [
281                    x[key]['stdev'] for x in rssi_result['rssi_result']
282                ]
283        # Compute RSSI errors
284        for key, val in postprocessed_results.items():
285            if 'chain' in key:
286                postprocessed_results[key]['error'] = [
287                    postprocessed_results[key]['mean'][idx] + CONST_3dB -
288                    postprocessed_results['predicted_rssi'][idx]
289                    for idx in range(
290                        len(postprocessed_results['predicted_rssi']))
291                ]
292            elif 'rssi' in key and 'predicted' not in key:
293                postprocessed_results[key]['error'] = [
294                    postprocessed_results[key]['mean'][idx] -
295                    postprocessed_results['predicted_rssi'][idx]
296                    for idx in range(
297                        len(postprocessed_results['predicted_rssi']))
298                ]
299        return postprocessed_results
300
301    def plot_rssi_vs_attenuation(self, postprocessed_results):
302        """Function to plot RSSI vs attenuation sweeps
303
304        Args:
305            postprocessed_results: compiled arrays of RSSI data.
306        """
307        figure = BokehFigure(self.current_test_name,
308                             x_label='Attenuation (dB)',
309                             primary_y_label='RSSI (dBm)')
310        figure.add_line(postprocessed_results['total_attenuation'],
311                        postprocessed_results['signal_poll_rssi']['mean'],
312                        'Signal Poll RSSI',
313                        marker='circle')
314        figure.add_line(postprocessed_results['total_attenuation'],
315                        postprocessed_results['scan_rssi']['mean'],
316                        'Scan RSSI',
317                        marker='circle')
318        figure.add_line(postprocessed_results['total_attenuation'],
319                        postprocessed_results['chain_0_rssi']['mean'],
320                        'Chain 0 RSSI',
321                        marker='circle')
322        figure.add_line(postprocessed_results['total_attenuation'],
323                        postprocessed_results['chain_1_rssi']['mean'],
324                        'Chain 1 RSSI',
325                        marker='circle')
326        figure.add_line(postprocessed_results['total_attenuation'],
327                        postprocessed_results['reported_rssi']['mean'],
328                        'Reported RSSI',
329                        marker='circle')
330        figure.add_line(postprocessed_results['total_attenuation'],
331                        postprocessed_results['predicted_rssi'],
332                        'Predicted RSSI',
333                        marker='circle')
334
335        output_file_path = os.path.join(self.log_path,
336                                        self.current_test_name + '.html')
337        figure.generate_figure(output_file_path)
338
339    def plot_rssi_vs_time(self, rssi_result, postprocessed_results,
340                          center_curves):
341        """Function to plot RSSI vs time.
342
343        Args:
344            rssi_result: dict containing raw RSSI data
345            postprocessed_results: compiled arrays of RSSI data
346            center_curvers: boolean indicating whether to shift curves to align
347            them with predicted RSSIs
348        """
349        figure = BokehFigure(
350            self.current_test_name,
351            x_label='Time (s)',
352            primary_y_label=center_curves * 'Centered' + 'RSSI (dBm)',
353        )
354
355        # yapf: disable
356        rssi_time_series = collections.OrderedDict(
357            [('signal_poll_rssi', []),
358             ('signal_poll_avg_rssi', []),
359             ('scan_rssi', []),
360             ('chain_0_rssi', []),
361             ('chain_1_rssi', []),
362             ('reported_rssi', []),
363             ('predicted_rssi', [])])
364        # yapf: enable
365        for key, val in rssi_time_series.items():
366            if 'predicted_rssi' in key:
367                rssi_time_series[key] = [
368                    x for x in postprocessed_results[key] for copies in range(
369                        len(rssi_result['rssi_result'][0]['signal_poll_rssi']
370                            ['data']))
371                ]
372            elif 'rssi' in key:
373                if center_curves:
374                    filtered_error = [
375                        x for x in postprocessed_results[key]['error']
376                        if not math.isnan(x)
377                    ]
378                    if filtered_error:
379                        avg_shift = statistics.mean(filtered_error)
380                    else:
381                        avg_shift = 0
382                    rssi_time_series[key] = [
383                        x - avg_shift
384                        for x in postprocessed_results[key]['data']
385                    ]
386                else:
387                    rssi_time_series[key] = postprocessed_results[key]['data']
388            time_vec = [
389                self.testclass_params['polling_frequency'] * x
390                for x in range(len(rssi_time_series[key]))
391            ]
392            if len(rssi_time_series[key]) > 0:
393                figure.add_line(time_vec, rssi_time_series[key], key)
394
395        output_file_path = os.path.join(self.log_path,
396                                        self.current_test_name + '.html')
397        figure.generate_figure(output_file_path)
398
399    def plot_rssi_distribution(self, postprocessed_results):
400        """Function to plot RSSI distributions.
401
402        Args:
403            postprocessed_results: compiled arrays of RSSI data
404        """
405        monitored_rssis = ['signal_poll_rssi', 'chain_0_rssi', 'chain_1_rssi',
406                           'reported_rssi']
407
408        rssi_dist = collections.OrderedDict()
409        for rssi_key in monitored_rssis:
410            rssi_data = postprocessed_results[rssi_key]
411            rssi_dist[rssi_key] = collections.OrderedDict()
412            unique_rssi = sorted(set(rssi_data['data']))
413            rssi_counts = []
414            for value in unique_rssi:
415                rssi_counts.append(rssi_data['data'].count(value))
416            total_count = sum(rssi_counts)
417            rssi_dist[rssi_key]['rssi_values'] = unique_rssi
418            rssi_dist[rssi_key]['rssi_pdf'] = [
419                x / total_count for x in rssi_counts
420            ]
421            rssi_dist[rssi_key]['rssi_cdf'] = []
422            cum_prob = 0
423            for prob in rssi_dist[rssi_key]['rssi_pdf']:
424                cum_prob += prob
425                rssi_dist[rssi_key]['rssi_cdf'].append(cum_prob)
426
427        figure = BokehFigure(self.current_test_name,
428                             x_label='RSSI (dBm)',
429                             primary_y_label='p(RSSI = x)',
430                             secondary_y_label='p(RSSI <= x)')
431        for rssi_key, rssi_data in rssi_dist.items():
432            figure.add_line(x_data=rssi_data['rssi_values'],
433                            y_data=rssi_data['rssi_cdf'],
434                            legend='{} CDF'.format(rssi_key))
435        output_file_path = os.path.join(self.log_path,
436                                        self.current_test_name + '_dist.html')
437        figure.generate_figure(output_file_path)
438
439    def run_rssi_test(self, testcase_params):
440        """Test function to run RSSI tests.
441
442        The function runs an RSSI test in the current device/AP configuration.
443        Function is called from another wrapper function that sets up the
444        testbed for the RvR test
445
446        Args:
447            testcase_params: dict containing test-specific parameters
448        Returns:
449            rssi_result: dict containing rssi_result and meta data
450        """
451        # Run test and log result
452        rssi_result = collections.OrderedDict()
453        rssi_result['test_name'] = self.current_test_name
454        rssi_result['testcase_params'] = testcase_params
455        rssi_result['ap_settings'] = self.access_point.ap_settings.copy()
456        rssi_result['attenuation'] = list(testcase_params['rssi_atten_range'])
457        rssi_result['connected_bssid'] = self.main_network[
458            testcase_params['band']].get('BSSID', '00:00:00:00')
459        channel_mode_combo = '{}_{}'.format(str(testcase_params['channel']),
460                                            testcase_params['mode'])
461        channel_str = str(testcase_params['channel'])
462        if channel_mode_combo in self.testbed_params['ap_tx_power']:
463            rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
464                channel_mode_combo]
465        else:
466            rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
467                str(testcase_params['channel'])]
468        rssi_result['fixed_attenuation'] = self.testbed_params[
469            'fixed_attenuation'][channel_str]
470        rssi_result['dut_front_end_loss'] = self.testbed_params[
471            'dut_front_end_loss'][channel_str]
472
473        self.log.info('Start running RSSI test.')
474        rssi_result['rssi_result'] = []
475        rssi_result['llstats'] = []
476        llstats_obj = wputils.LinkLayerStats(self.dut)
477        # Start iperf traffic if required by test
478        if testcase_params['active_traffic'] and testcase_params[
479                'traffic_type'] == 'iperf':
480            self.iperf_server.start(tag=0)
481            if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
482                iperf_server_address = self.dut_ip
483            else:
484                iperf_server_address = wputils.get_server_address(
485                    self.remote_server, self.dut_ip, '255.255.255.0')
486            executor = ThreadPoolExecutor(max_workers=1)
487            thread_future = executor.submit(
488                self.iperf_client.start, iperf_server_address,
489                testcase_params['iperf_args'], 0,
490                testcase_params['traffic_timeout'] + SHORT_SLEEP)
491            executor.shutdown(wait=False)
492        elif testcase_params['active_traffic'] and testcase_params[
493                'traffic_type'] == 'ping':
494            thread_future = wputils.get_ping_stats_nb(
495                self.remote_server, self.dut_ip,
496                testcase_params['traffic_timeout'], 0.02, 64)
497        else:
498            thread_future = wputils.get_ping_stats_nb(
499                self.remote_server, self.dut_ip,
500                testcase_params['traffic_timeout'], 0.5, 64)
501        llstats_obj.update_stats()
502        for atten in testcase_params['rssi_atten_range']:
503            # Set Attenuation
504            self.log.info('Setting attenuation to {} dB'.format(atten))
505            for attenuator in self.attenuators:
506                attenuator.set_atten(atten)
507            current_rssi = collections.OrderedDict()
508            current_rssi = wputils.get_connected_rssi(
509                self.dut, testcase_params['connected_measurements'],
510                self.testclass_params['polling_frequency'],
511                testcase_params['first_measurement_delay'])
512            current_rssi['scan_rssi'] = wputils.get_scan_rssi(
513                self.dut, testcase_params['tracked_bssid'],
514                testcase_params['scan_measurements'])
515            rssi_result['rssi_result'].append(current_rssi)
516            llstats_obj.update_stats()
517            curr_llstats = llstats_obj.llstats_incremental.copy()
518            rssi_result['llstats'].append(curr_llstats)
519            self.log.info(
520                'Connected RSSI at {0:.2f} dB is {1:.2f} [{2:.2f}, {3:.2f}] dB'
521                .format(atten, current_rssi['signal_poll_rssi']['mean'],
522                        current_rssi['chain_0_rssi']['mean'],
523                        current_rssi['chain_1_rssi']['mean']))
524        # Stop iperf traffic if needed
525        for attenuator in self.attenuators:
526            attenuator.set_atten(0)
527        thread_future.result()
528        if testcase_params['active_traffic'] and testcase_params[
529                'traffic_type'] == 'iperf':
530            self.iperf_server.stop()
531        return rssi_result
532
533    def setup_ap(self, testcase_params):
534        """Function that gets devices ready for the test.
535
536        Args:
537            testcase_params: dict containing test-specific parameters
538        """
539        band = self.access_point.band_lookup_by_channel(
540            testcase_params['channel'])
541        if '6G' in band:
542            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
543                testcase_params['channel'].strip('6g'))]
544        else:
545            if testcase_params['channel'] < 13:
546                frequency = wutils.WifiEnums.channel_2G_to_freq[
547                    testcase_params['channel']]
548            else:
549                frequency = wutils.WifiEnums.channel_5G_to_freq[
550                    testcase_params['channel']]
551        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
552            self.access_point.set_region(self.testbed_params['DFS_region'])
553        else:
554            self.access_point.set_region(self.testbed_params['default_region'])
555        self.access_point.set_channel(testcase_params['band'],
556                                      testcase_params['channel'])
557        self.access_point.set_bandwidth(testcase_params['band'],
558                                        testcase_params['mode'])
559        self.log.info('Access Point Configuration: {}'.format(
560            self.access_point.ap_settings))
561
562    def setup_dut(self, testcase_params):
563        """Sets up the DUT in the configuration required by the test."""
564        # Turn screen off to preserve battery
565        if self.testbed_params.get('screen_on',
566                                   False) or self.testclass_params.get(
567                                       'screen_on', False):
568            self.dut.droid.wakeLockAcquireDim()
569        else:
570            self.dut.go_to_sleep()
571        tune_code = None
572        if 'tune_code' in testcase_params:
573            tune_code= testcase_params['tune_code']
574        elif 'tune_code' in self.testbed_params and int(self.testbed_params['tune_code']['manual_tune_code']):
575            tune_code = self.testbed_params['tune_code'][testcase_params['band']]
576        if tune_code:
577            self.log.info('Forcing antenna tune code.')
578            wputils.write_antenna_tune_code(self.dut, tune_code, readback_check=False)
579        if wputils.validate_network(self.dut,
580                                    testcase_params['test_network']['SSID']):
581            self.log.info('Already connected to desired network')
582        else:
583            wutils.wifi_toggle_state(self.dut, True)
584            wutils.reset_wifi(self.dut)
585            self.main_network[testcase_params['band']][
586                'channel'] = testcase_params['channel']
587            wutils.set_wifi_country_code(self.dut,
588                                         self.testclass_params['country_code'])
589            if self.testbed_params.get('txbf_off', False):
590                wputils.disable_beamforming(self.dut)
591            wutils.wifi_connect(self.dut,
592                                self.main_network[testcase_params['band']],
593                                num_of_tries=5)
594        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
595
596    def setup_rssi_test(self, testcase_params):
597        """Main function to test RSSI.
598
599        The function sets up the AP in the correct channel and mode
600        configuration and called rssi_test to sweep attenuation and measure
601        RSSI
602
603        Args:
604            testcase_params: dict containing test-specific parameters
605        Returns:
606            rssi_result: dict containing rssi_results and meta data
607        """
608        # Configure AP
609        self.setup_ap(testcase_params)
610        # Initialize attenuators
611        for attenuator in self.attenuators:
612            attenuator.set_atten(testcase_params['rssi_atten_range'][0])
613        # Connect DUT to Network
614        self.setup_dut(testcase_params)
615
616    def get_traffic_timeout(self, testcase_params):
617        """Function to comput iperf session length required in RSSI test.
618
619        Args:
620            testcase_params: dict containing test-specific parameters
621        Returns:
622            traffic_timeout: length of iperf session required in rssi test
623        """
624        atten_step_duration = testcase_params['first_measurement_delay'] + (
625            testcase_params['connected_measurements'] *
626            self.testclass_params['polling_frequency']
627        ) + testcase_params['scan_measurements'] * MED_SLEEP
628        timeout = len(testcase_params['rssi_atten_range']
629                      ) * atten_step_duration + MED_SLEEP
630        return timeout
631
632    def compile_rssi_vs_atten_test_params(self, testcase_params):
633        """Function to complete compiling test-specific parameters
634
635        Args:
636            testcase_params: dict containing test-specific parameters
637        """
638        # Check if test should be skipped.
639        wputils.check_skip_conditions(testcase_params, self.dut,
640                                      self.access_point,
641                                      getattr(self, 'ota_chamber', None))
642
643        testcase_params.update(
644            connected_measurements=self.
645            testclass_params['rssi_vs_atten_connected_measurements'],
646            scan_measurements=self.
647            testclass_params['rssi_vs_atten_scan_measurements'],
648            first_measurement_delay=SHORT_SLEEP,
649            absolute_accuracy=1)
650        rssi_under_test = self.testclass_params['rssi_vs_atten_metrics']
651        if self.testclass_params[
652                'rssi_vs_atten_scan_measurements'] == 0 and 'scan_rssi' in rssi_under_test:
653            rssi_under_test.remove('scan_rssi')
654        testcase_params['rssi_under_test'] = rssi_under_test
655
656        testcase_params['band'] = self.access_point.band_lookup_by_channel(
657            testcase_params['channel'])
658        testcase_params['test_network'] = self.main_network[
659            testcase_params['band']]
660        testcase_params['tracked_bssid'] = [
661            self.main_network[testcase_params['band']].get(
662                'BSSID', '00:00:00:00')
663        ]
664
665        testcase_params['rssi_atten_range'] = numpy.arange(
666            self.testclass_params['rssi_vs_atten_start'],
667            self.testclass_params['rssi_vs_atten_stop'],
668            self.testclass_params['rssi_vs_atten_step']).tolist()
669
670        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
671            testcase_params)
672
673        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
674            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
675                testcase_params['traffic_timeout'])
676        else:
677            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
678                testcase_params['traffic_timeout'])
679        return testcase_params
680
681    def compile_rssi_stability_test_params(self, testcase_params):
682        """Function to complete compiling test-specific parameters
683
684        Args:
685            testcase_params: dict containing test-specific parameters
686        """
687        # Check if test should be skipped.
688        wputils.check_skip_conditions(testcase_params, self.dut,
689                                      self.access_point,
690                                      getattr(self, 'ota_chamber', None))
691        testcase_params.update(
692            connected_measurements=int(
693                self.testclass_params['rssi_stability_duration'] /
694                self.testclass_params['polling_frequency']),
695            scan_measurements=0,
696            first_measurement_delay=SHORT_SLEEP,
697            rssi_atten_range=self.testclass_params['rssi_stability_atten'])
698        testcase_params['band'] = self.access_point.band_lookup_by_channel(
699            testcase_params['channel'])
700        testcase_params['test_network'] = self.main_network[
701            testcase_params['band']]
702        testcase_params['tracked_bssid'] = [
703            self.main_network[testcase_params['band']].get(
704                'BSSID', '00:00:00:00')
705        ]
706
707        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
708            testcase_params)
709        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
710            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
711                testcase_params['traffic_timeout'])
712        else:
713            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
714                testcase_params['traffic_timeout'])
715        return testcase_params
716
717    def compile_rssi_tracking_test_params(self, testcase_params):
718        """Function to complete compiling test-specific parameters
719
720        Args:
721            testcase_params: dict containing test-specific parameters
722        """
723        # Check if test should be skipped.
724        wputils.check_skip_conditions(testcase_params, self.dut,
725                                      self.access_point,
726                                      getattr(self, 'ota_chamber', None))
727
728        testcase_params.update(connected_measurements=int(
729            1 / self.testclass_params['polling_frequency']),
730                               scan_measurements=0,
731                               first_measurement_delay=0,
732                               rssi_under_test=['signal_poll_rssi'],
733                               absolute_accuracy=0)
734        testcase_params['band'] = self.access_point.band_lookup_by_channel(
735            testcase_params['channel'])
736        testcase_params['test_network'] = self.main_network[
737            testcase_params['band']]
738        testcase_params['tracked_bssid'] = [
739            self.main_network[testcase_params['band']].get(
740                'BSSID', '00:00:00:00')
741        ]
742
743        rssi_atten_range = []
744        for waveform in self.testclass_params['rssi_tracking_waveforms']:
745            waveform_vector = []
746            for section in range(len(waveform['atten_levels']) - 1):
747                section_limits = waveform['atten_levels'][section:section + 2]
748                up_down = (1 - 2 * (section_limits[1] < section_limits[0]))
749                temp_section = list(
750                    range(section_limits[0], section_limits[1] + up_down,
751                          up_down * waveform['step_size']))
752                temp_section = [
753                    temp_section[idx] for idx in range(len(temp_section))
754                    for n in range(waveform['step_duration'])
755                ]
756                waveform_vector += temp_section
757            waveform_vector = waveform_vector * waveform['repetitions']
758            rssi_atten_range = rssi_atten_range + waveform_vector
759        testcase_params['rssi_atten_range'] = rssi_atten_range
760        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
761            testcase_params)
762
763        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
764            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
765                testcase_params['traffic_timeout'])
766        else:
767            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
768                testcase_params['traffic_timeout'])
769        return testcase_params
770
771    def _test_rssi_vs_atten(self, testcase_params):
772        """Function that gets called for each test case of rssi_vs_atten
773
774        The function gets called in each rssi test case. The function
775        customizes the test based on the test name of the test that called it
776
777        Args:
778            testcase_params: dict containing test-specific parameters
779        """
780        testcase_params = self.compile_rssi_vs_atten_test_params(
781            testcase_params)
782
783        self.setup_rssi_test(testcase_params)
784        rssi_result = self.run_rssi_test(testcase_params)
785        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
786            rssi_result)
787        self.testclass_results.append(rssi_result)
788        self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results'])
789        self.pass_fail_check_rssi_accuracy(
790            testcase_params, rssi_result['postprocessed_results'])
791
792    def _test_rssi_stability(self, testcase_params):
793        """ Function that gets called for each test case of rssi_stability
794
795        The function gets called in each stability test case. The function
796        customizes test based on the test name of the test that called it
797        """
798        testcase_params = self.compile_rssi_stability_test_params(
799            testcase_params)
800
801        self.setup_rssi_test(testcase_params)
802        rssi_result = self.run_rssi_test(testcase_params)
803        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
804            rssi_result)
805        self.testclass_results.append(rssi_result)
806        self.plot_rssi_vs_time(rssi_result,
807                               rssi_result['postprocessed_results'], 1)
808        self.plot_rssi_distribution(rssi_result['postprocessed_results'])
809        self.pass_fail_check_rssi_stability(
810            testcase_params, rssi_result['postprocessed_results'])
811
812    def _test_rssi_tracking(self, testcase_params):
813        """ Function that gets called for each test case of rssi_tracking
814
815        The function gets called in each rssi test case. The function
816        customizes the test based on the test name of the test that called it
817        """
818        testcase_params = self.compile_rssi_tracking_test_params(
819            testcase_params)
820
821        self.setup_rssi_test(testcase_params)
822        rssi_result = self.run_rssi_test(testcase_params)
823        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
824            rssi_result)
825        self.testclass_results.append(rssi_result)
826        self.plot_rssi_vs_time(rssi_result,
827                               rssi_result['postprocessed_results'], 1)
828        self.pass_fail_check_rssi_accuracy(
829            testcase_params, rssi_result['postprocessed_results'])
830
831    def generate_test_cases(self, test_types, channels, modes, traffic_modes):
832        """Function that auto-generates test cases for a test class."""
833        test_cases = []
834        allowed_configs = {
835            20: [
836                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
837                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
838            ],
839            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
840            80: [36, 100, 149, '6g37', '6g117', '6g213'],
841            160: [36, '6g37', '6g117', '6g213']
842        }
843
844        for channel, mode, traffic_mode, test_type in itertools.product(
845                channels, modes, traffic_modes, test_types):
846            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
847            if channel not in allowed_configs[bandwidth]:
848                continue
849            test_name = test_type + '_ch{}_{}_{}'.format(
850                channel, mode, traffic_mode)
851            testcase_params = collections.OrderedDict(
852                channel=channel,
853                mode=mode,
854                active_traffic=(traffic_mode == 'ActiveTraffic'),
855                traffic_type=self.user_params['rssi_test_params']
856                ['traffic_type'],
857            )
858            test_function = getattr(self, '_{}'.format(test_type))
859            setattr(self, test_name, partial(test_function, testcase_params))
860            test_cases.append(test_name)
861        return test_cases
862
863
class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest):
    """RSSI stability and RSSI vs. attenuation tests with active traffic.

    Test cases are generated for the 2.4 GHz, 5 GHz and 6 GHz channels listed
    in the constructor at 20, 40, 80 and 160 MHz bandwidths.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        modes = ['bw20', 'bw40', 'bw80', 'bw160']
        traffic_modes = ['ActiveTraffic']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              traffic_modes)
873
874
class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest):
    """RSSI stability and RSSI vs. attenuation tests without traffic.

    Test cases are generated for a sample of channels (6, 36, 149 and
    '6g37') at 20, 40, 80 and 160 MHz bandwidths.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [6, 36, 149, '6g37']
        modes = ['bw20', 'bw40', 'bw80', 'bw160']
        traffic_modes = ['NoTraffic']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              traffic_modes)
882
883
class WifiRssiTrackingTest(WifiRssiTest):
    """RSSI tracking tests with and without active traffic.

    Test cases are generated for channels 6, 36, 149 and '6g37' at 20, 40,
    80 and 160 MHz bandwidths.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_tracking']
        channels = [6, 36, 149, '6g37']
        modes = ['bw20', 'bw40', 'bw80', 'bw160']
        traffic_modes = ['ActiveTraffic', 'NoTraffic']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              traffic_modes)
892
893
# Over-the-air (OTA) version of the RSSI tests
class WifiOtaRssiTest(WifiRssiTest):
    """Class to run over-the-air RSSI tests.

    This class measures WiFi RSSI in an OTA chamber. It allows setting
    orientation and other chamber parameters to study performance in varying
    channel conditions.
    """

    def __init__(self, controllers):
        """Initializes the test class and its metric loggers.

        Args:
            controllers: passed unchanged to BaseTestClass.__init__
        """
        # This calls BaseTestClass.__init__ directly, not
        # WifiRssiTest.__init__, and sets up the metric loggers itself.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_test_metrics = False

    def setup_class(self):
        """Runs the parent class setup, then creates the OTA chamber.

        The chamber is the first object created from
        user_params['OTAChamber'].
        """
        WifiRssiTest.setup_class(self)
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Runs the parent teardown, resets the chamber, saves results."""
        WifiRssiTest.teardown_class(self)
        self.ota_chamber.reset_chamber()
        self.process_testclass_results()

    def teardown_test(self):
        """Resets the chamber after a test if it is in continuous mode."""
        if self.ota_chamber.current_mode == 'continuous':
            self.ota_chamber.reset_chamber()

    def extract_test_id(self, testcase_params, id_fields):
        """Builds an ordered test identifier from selected parameters.

        Args:
            testcase_params: dict containing test-specific parameters
            id_fields: iterable of keys of testcase_params to keep, in order
        Returns:
            test_id: OrderedDict mapping each id field to its value
        """
        test_id = collections.OrderedDict(
            (param, testcase_params[param]) for param in id_fields)
        return test_id

    def process_testclass_results(self):
        """Saves all test results to enable comparison.

        Results are grouped by channel. For every channel, the mean of the
        collected values of each RSSI metric is published through the test
        class metric logger. Unless the chamber mode is 'StirrersOn', one
        figure per channel is then saved to 'results.html' in the current
        output path, plotting each RSSI metric against the recorded
        orientation.

        If no test case recorded a result, nothing is published or plotted.
        """
        rssi_metrics = ('signal_poll_rssi', 'chain_0_rssi', 'chain_1_rssi')
        testclass_data = collections.OrderedDict()
        for test_result in self.testclass_results:
            current_params = test_result['testcase_params']
            postprocessed = test_result['postprocessed_results']

            channel = current_params['channel']
            if channel not in testclass_data:
                testclass_data[channel] = collections.OrderedDict(
                    orientation=[],
                    rssi=collections.OrderedDict(
                        (metric, []) for metric in rssi_metrics))
            channel_data = testclass_data[channel]

            channel_data['orientation'].append(current_params['orientation'])
            for metric in rssi_metrics:
                # Only the first entry of each metric's 'mean' list is kept.
                channel_data['rssi'][metric].append(
                    postprocessed[metric]['mean'][0])

        # Publish test class metrics
        for channel, channel_data in testclass_data.items():
            for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
                metric_name = 'ota_summary_ch{}.avg_{}'.format(
                    channel, rssi_metric)
                metric_value = numpy.mean(rssi_metric_value)
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)

        # Plot test class results. Without any recorded result there is no
        # chamber mode to read and nothing to plot.
        if not self.testclass_results:
            return
        chamber_mode = self.testclass_results[0]['testcase_params'][
            'chamber_mode']
        if chamber_mode == 'StirrersOn':
            return
        x_labels = {
            'orientation': 'Angle (deg)',
            'stepped stirrers': 'Position Index',
        }
        # Fall back to a generic label for chamber modes not listed above.
        x_label = x_labels.get(chamber_mode, 'Position')
        plots = []
        for channel, channel_data in testclass_data.items():
            current_plot = BokehFigure(
                title='Channel {} - Rssi vs. Position'.format(channel),
                x_label=x_label,
                primary_y_label='RSSI (dBm)',
            )
            for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
                legend = rssi_metric
                current_plot.add_line(channel_data['orientation'],
                                      rssi_metric_value, legend)
            current_plot.generate_figure()
            plots.append(current_plot)
        current_context = context.get_current_context().get_full_output_path()
        plot_file_path = os.path.join(current_context, 'results.html')
        BokehFigure.save_figures(plots, plot_file_path)

    def setup_rssi_test(self, testcase_params):
        """Runs the parent test setup, then configures the chamber.

        Args:
            testcase_params: dict containing test-specific parameters. With
                chamber_mode 'StirrersOn' the continuous stirrers are
                started; otherwise the chamber is set to the requested
                orientation.
        """
        # Test setup
        WifiRssiTest.setup_rssi_test(self, testcase_params)
        if testcase_params['chamber_mode'] == 'StirrersOn':
            self.ota_chamber.start_continuous_stirrers()
        else:
            self.ota_chamber.set_orientation(testcase_params['orientation'])

    def compile_ota_rssi_test_params(self, testcase_params):
        """Function to complete compiling test-specific parameters

        The test duration and attenuation values are selected from the
        current test name, which must contain 'rssi_over_orientation',
        'rssi_variation' or 'rssi_vs_atten'.

        Args:
            testcase_params: dict containing test-specific parameters; it is
                updated in place.
        Returns:
            testcase_params: the same dict with the compiled parameters added
        Raises:
            ValueError: if the test name matches none of the supported types
        """
        # Check if test should be skipped.
        wputils.check_skip_conditions(testcase_params, self.dut,
                                      self.access_point,
                                      getattr(self, 'ota_chamber', None))

        if 'rssi_over_orientation' in self.test_name:
            rssi_test_duration = self.testclass_params[
                'rssi_over_orientation_duration']
            rssi_ota_test_attenuation = [
                self.testclass_params['rssi_ota_test_attenuation']
            ]
        elif 'rssi_variation' in self.test_name:
            rssi_test_duration = self.testclass_params[
                'rssi_variation_duration']
            rssi_ota_test_attenuation = [
                self.testclass_params['rssi_ota_test_attenuation']
            ]
        elif 'rssi_vs_atten' in self.test_name:
            # NOTE(review): this reuses the over-orientation duration
            # setting for the attenuation sweep; confirm that is intended.
            rssi_test_duration = self.testclass_params[
                'rssi_over_orientation_duration']
            rssi_ota_test_attenuation = numpy.arange(
                self.testclass_params['rssi_vs_atten_start'],
                self.testclass_params['rssi_vs_atten_stop'],
                self.testclass_params['rssi_vs_atten_step']).tolist()
        else:
            # Fail with a clear message instead of an unbound local below.
            raise ValueError(
                'Unsupported OTA RSSI test type in test name: {}'.format(
                    self.test_name))

        testcase_params.update(connected_measurements=int(
            rssi_test_duration / self.testclass_params['polling_frequency']),
                               scan_measurements=0,
                               first_measurement_delay=SHORT_SLEEP,
                               rssi_atten_range=rssi_ota_test_attenuation)
        testcase_params['band'] = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[
            testcase_params['band']]
        testcase_params['tracked_bssid'] = [
            self.main_network[testcase_params['band']].get(
                'BSSID', '00:00:00:00')
        ]

        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
            testcase_params)
        # The '-R' flag is only added when the iperf server is not an
        # IPerfServerOverAdb instance.
        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
                testcase_params['traffic_timeout'])
        else:
            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
                testcase_params['traffic_timeout'])
        return testcase_params

    def _test_ota_rssi(self, testcase_params):
        """Function that gets called for each OTA RSSI test case.

        Compiles the parameters, sets up and runs the RSSI test,
        post-processes and stores the result, and plots RSSI versus time.
        An attenuation plot or a distribution plot is added depending on
        the test name.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        testcase_params = self.compile_ota_rssi_test_params(testcase_params)

        self.setup_rssi_test(testcase_params)
        rssi_result = self.run_rssi_test(testcase_params)
        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
            rssi_result)
        self.testclass_results.append(rssi_result)
        self.plot_rssi_vs_time(rssi_result,
                               rssi_result['postprocessed_results'], 1)
        if 'rssi_vs_atten' in self.test_name:
            self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results'])
        elif 'rssi_variation' in self.test_name:
            self.plot_rssi_distribution(rssi_result['postprocessed_results'])

    def generate_test_cases(self, test_types, channels, modes, traffic_modes,
                            chamber_modes, orientations):
        """Function that auto-generates OTA test cases for a test class.

        For every combination whose channel is allowed at the bandwidth
        encoded in the mode, a callable named
        '<test_type>_ch<channel>_<mode>_<traffic>_<orientation>deg' is
        attached to this instance. Every callable runs _test_ota_rssi with
        that combination's parameters. The chamber mode is stored in the
        parameters but is not part of the test name.

        Args:
            test_types: list of test type names used as test name prefixes
            channels: list of channels to test
            modes: list of mode strings; the digits in each one are read as
                the bandwidth
            traffic_modes: list of traffic mode strings; 'ActiveTraffic'
                sets active_traffic to True in the test parameters
            chamber_modes: list of chamber modes
            orientations: list of orientations
        Returns:
            test_cases: list of the generated test names
        """
        test_cases = []
        # Channels on which each bandwidth may be tested.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
            ],
            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
            80: [36, 100, 149, '6g37', '6g117', '6g213'],
            160: [36, '6g37', '6g117', '6g213']
        }

        for (channel, mode, traffic, chamber_mode, orientation,
             test_type) in itertools.product(channels, modes, traffic_modes,
                                             chamber_modes, orientations,
                                             test_types):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            test_name = test_type + '_ch{}_{}_{}_{}deg'.format(
                channel, mode, traffic, orientation)
            testcase_params = collections.OrderedDict(
                channel=channel,
                mode=mode,
                active_traffic=(traffic == 'ActiveTraffic'),
                traffic_type=self.user_params['rssi_test_params']
                ['traffic_type'],
                chamber_mode=chamber_mode,
                orientation=orientation)
            test_function = self._test_ota_rssi
            setattr(self, test_name, partial(test_function, testcase_params))
            test_cases.append(test_name)
        return test_cases
1103
class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest):
    """OTA RSSI variation tests run in the 'StirrersOn' chamber mode.

    Test cases are generated for channels 6, 36, 149 and '6g37' at 20, 80
    and 160 MHz bandwidths with active traffic, at orientation 0.
    """

    def __init__(self, controllers):
        WifiRssiTest.__init__(self, controllers)
        test_types = ['test_rssi_variation']
        channels = [6, 36, 149, '6g37']
        modes = ['bw20', 'bw80', 'bw160']
        traffic_modes = ['ActiveTraffic']
        chamber_modes = ['StirrersOn']
        orientations = [0]
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              traffic_modes, chamber_modes,
                                              orientations)
1113
1114
class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest):
    """OTA RSSI over orientation tests in ten degree steps.

    Test cases are generated for channels 6, 36, 149 and '6g37' at 20, 80
    and 160 MHz bandwidths with active traffic, for orientations 0 to 350
    degrees in steps of 10.
    """

    def __init__(self, controllers):
        WifiRssiTest.__init__(self, controllers)
        test_types = ['test_rssi_over_orientation']
        channels = [6, 36, 149, '6g37']
        modes = ['bw20', 'bw80', 'bw160']
        traffic_modes = ['ActiveTraffic']
        chamber_modes = ['orientation']
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              traffic_modes, chamber_modes,
                                              orientations)
1125
class WifiOtaRssi_TuneCodeSweep_Test(WifiOtaRssiTest):
    """OTA RSSI variation tests repeated for every tune code configuration.

    Tune code configurations are the rows of the CSV file named by
    user_params['testbed_params']['tune_code_file'].
    """

    def __init__(self, controllers):
        WifiRssiTest.__init__(self, controllers)
        sweep_channels = [6, 36, 149, '6g37']
        sweep_modes = ['bw20', 'bw80', 'bw160']
        sweep_traffic_modes = ['ActiveTraffic']
        self.tests = self.generate_test_cases('test_rssi_variation',
                                              sweep_channels, sweep_modes,
                                              sweep_traffic_modes,
                                              'StirrersOn', 0)

    def generate_test_cases(self, test_type, channels, modes, traffic_modes,
                            chamber_mode, orientation):
        """Function that auto-generates one test case per tune code row.

        For every combination of tune code row, channel, mode and traffic
        mode whose channel is allowed at the bandwidth encoded in the mode,
        a callable named
        '<test_type>_ch<channel>_<mode>_<traffic>_tc_<band>' is attached to
        this instance, where <band> is the row's 'band' column. Every
        callable runs _test_ota_rssi with that combination's parameters.

        Args:
            test_type: single test type name used as the test name prefix
            channels: list of channels to test
            modes: list of mode strings; the digits in each one are read as
                the bandwidth
            traffic_modes: list of traffic mode strings; 'ActiveTraffic'
                sets active_traffic to True in the test parameters
            chamber_mode: chamber mode stored in every test's parameters
            orientation: orientation stored in every test's parameters
        Returns:
            test_cases: list of the generated test names
        """
        # Channels on which each bandwidth may be tested.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
            ],
            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
            80: [36, 100, 149, '6g37', '6g117', '6g213'],
            160: [36, '6g37', '6g117', '6g213']
        }

        # Read every tune code row up front so the file can be closed
        # before the test cases are generated.
        tune_code_path = self.user_params['testbed_params']['tune_code_file']
        with open(tune_code_path, 'r') as csvfile:
            tune_code_rows = list(csv.DictReader(csvfile))

        generated_names = []
        combinations = itertools.product(tune_code_rows, channels, modes,
                                         traffic_modes)
        for tune_code_config, channel, mode, traffic in combinations:
            bandwidth = int(''.join(char for char in mode if char.isdigit()))
            if channel in allowed_configs[bandwidth]:
                test_name = test_type + '_ch{}_{}_{}_tc_{}'.format(
                    channel, mode, traffic, tune_code_config['band'])
                testcase_params = collections.OrderedDict([
                    ('channel', channel),
                    ('mode', mode),
                    ('active_traffic', traffic == 'ActiveTraffic'),
                    ('traffic_type',
                     self.user_params['rssi_test_params']['traffic_type']),
                    ('chamber_mode', chamber_mode),
                    ('orientation', orientation),
                    ('tune_code', tune_code_config),
                ])
                handler = self._test_ota_rssi
                setattr(self, test_name, partial(handler, testcase_params))
                generated_names.append(test_name)
        return generated_names