• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import json
17import logging
18import math
19import os
20import re
21import time
22
23import acts.controllers.power_monitor as power_monitor_lib
24import acts.controllers.monsoon as monsoon_controller
25import acts.controllers.iperf_server as ipf
26from acts import asserts
27from acts import base_test
28from acts import utils
29from acts.metrics.loggers.blackbox import BlackboxMetricLogger
30from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
31from acts_contrib.test_utils.power import plot_utils
32
# adb shell command issued right before each power measurement starts.
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
# NOTE(review): not referenced in this part of the file; presumably a timeout
# in seconds for iperf runs - confirm against the users of this constant.
IPERF_TIMEOUT = 180
# Default relative tolerance (fraction of the threshold) used for the
# pass/fail check when 'pass_fail_tolerance' is not given in the config.
THRESHOLD_TOLERANCE_DEFAULT = 0.2
# NOTE(review): not referenced in this part of the file; presumably selectors
# for where a value is read from (DUT vs. access point) - confirm.
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
# Default value for the 'mon_voltage' user parameter.
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
# Current limit for the Monsoon power monitor (presumably in amperes -
# confirm).
MONSOON_MAX_CURRENT = 8.0
# Default value for the 'mon_freq' user parameter, which is passed to the
# power monitor as the sampling rate ('hz').
DEFAULT_MONSOON_FREQUENCY = 500
# NOTE(review): not referenced in this part of the file; these look like
# key prefixes of a WiFi driver configuration file - confirm.
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
# File on the DUT that is pulled as the iperf client output.
TEMP_FILE = '/sdcard/Download/tmp.log'
44
45
class ObjNew(object):
    """Simple attribute bag built from arbitrary keyword arguments.

    Every keyword passed to the constructor becomes an instance attribute,
    and the ``in`` operator reports whether an attribute name is available.
    """
    def __init__(self, **kwargs):
        vars(self).update(kwargs)

    def __contains__(self, item):
        """Report whether the object exposes an attribute named ``item``.

        Args:
            item: name of the attribute to look for
        Return:
            True if the attribute exists, False otherwise
        """
        # A private sentinel distinguishes "missing" from any real value,
        # including None.
        missing = object()
        return getattr(self, item, missing) is not missing
62
63
64class PowerBaseTest(base_test.BaseTestClass):
65    """Base class for all wireless power related tests.
66
67    """
68    def __init__(self, controllers):
69
70        super().__init__(controllers)
71        self.power_result = BlackboxMetricLogger.for_test_case(
72            metric_name='avg_power')
73        self.start_meas_time = 0
74        self.rockbottom_script = None
75        self.img_name = ''
76        self.dut = None
77        self.power_logger = PowerMetricLogger.for_test_case()
78        self.power_monitor = None
79
80    @property
81    def final_test(self):
82        return len(
83            self.results.requested
84        ) > 0 and self.current_test_name == self.results.requested[-1]
85
86    @property
87    def display_name_test_suite(self):
88        return getattr(self, '_display_name_test_suite',
89                       self.__class__.__name__)
90
91    @display_name_test_suite.setter
92    def display_name_test_suite(self, name):
93        self._display_name_test_suite = name
94
95    @property
96    def display_name_test_case(self):
97        default_test_name = getattr(self, 'test_name', None)
98        return getattr(self, '_display_name_test_case', default_test_name)
99
100    @display_name_test_case.setter
101    def display_name_test_case(self, name):
102        self._display_name_test_case = name
103
104    def initialize_power_monitor(self):
105        """ Initializes the power monitor object.
106
107        Raises an exception if there are no controllers available.
108        """
109        if hasattr(self, 'bitses'):
110            if hasattr(self, 'monsoons'):
111                self.log.info('Destroying monsoon controller.')
112                monsoon_controller.destroy(self.monsoons)
113                time.sleep(2)
114            self.power_monitor = self.bitses[0]
115            self.power_monitor.setup(registry=self.user_params)
116        elif hasattr(self, 'monsoons'):
117            self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
118                self.monsoons[0])
119            self.monsoons[0].set_max_current(8.0)
120            self.monsoons[0].set_voltage(self.mon_voltage)
121        else:
122            raise RuntimeError('No power monitors available.')
123
124    def setup_class(self):
125
126        super().setup_class()
127
128        self.log = logging.getLogger()
129        self.tests = self.get_existing_test_names()
130
131        # Obtain test parameters from user_params
132        TEST_PARAMS = self.TAG + '_params'
133        self.test_params = self.user_params.get(TEST_PARAMS, {})
134        if not self.test_params:
135            self.log.warning(TEST_PARAMS + ' was not found in the user '
136                             'parameters defined in the config file.')
137
138        # Override user_param values with test parameters
139        self.user_params.update(self.test_params)
140
141        # Unpack user_params with default values. All the usages of user_params
142        # as self attributes need to be included either as a required parameter
143        # or as a parameter with a default value.
144        req_params = ['custom_files', 'mon_duration']
145        self.unpack_userparams(req_params,
146                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
147                               mon_offset=0,
148                               bug_report=False,
149                               extra_wait=None,
150                               iperf_duration=None,
151                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
152                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT,
153                               ap_dtim_period=None)
154
155        # Setup the must have controllers, phone and monsoon
156        self.dut = self.android_devices[0]
157        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
158        os.makedirs(self.mon_data_path, exist_ok=True)
159
160        # Initialize the power monitor object that will be used to measure
161        self.initialize_power_monitor()
162
163        # Unpack the thresholds file or fail class setup if it can't be found
164        for file in self.custom_files:
165            if 'pass_fail_threshold_' + self.dut.model in file:
166                self.threshold_file = file
167                break
168        else:
169            raise RuntimeError('Required test pass/fail threshold file is '
170                               'missing')
171
172        # Unpack the rockbottom script or fail class setup if it can't be found
173        for file in self.custom_files:
174            if 'rockbottom_' + self.dut.model in file:
175                self.rockbottom_script = file
176                break
177        else:
178            raise RuntimeError('Required rockbottom script is missing.')
179
180        # Unpack optional custom files
181        for file in self.custom_files:
182            if 'attenuator_setting' in file:
183                self.attenuation_file = file
184            elif 'network_config' in file:
185                self.network_file = file
186
187        if hasattr(self, 'attenuators'):
188            self.num_atten = self.attenuators[0].instrument.num_atten
189            self.atten_level = self.unpack_custom_file(self.attenuation_file)
190        self.threshold = self.unpack_custom_file(self.threshold_file)
191        self.mon_info = self.create_monsoon_info()
192
193        # Sync device time, timezone and country code
194        utils.require_sl4a((self.dut, ))
195        utils.sync_device_time(self.dut)
196
197        screen_on_img = self.user_params.get('screen_on_img', [])
198        if screen_on_img:
199            img_src = screen_on_img[0]
200            img_dest = '/sdcard/Pictures/'
201            success = self.dut.push_system_file(img_src, img_dest)
202            if success:
203                self.img_name = os.path.basename(img_src)
204
205    def setup_test(self):
206        """Set up test specific parameters or configs.
207
208        """
209        super().setup_test()
210
211        # Reset result variables
212        self.avg_current = 0
213        self.samples = []
214        self.power_result.metric_value = 0
215
216        # Set the device into rockbottom state
217        self.dut_rockbottom()
218
219        # Wait for extra time if needed for the first test
220        if self.extra_wait:
221            self.more_wait_first_test()
222
223    def teardown_test(self):
224        """Tear down necessary objects after test case is finished.
225
226        """
227        self.log.info('Tearing down the test case')
228        self.power_monitor.connect_usb()
229        self.power_logger.set_avg_power(self.power_result.metric_value)
230        self.power_logger.set_avg_current(self.avg_current)
231        self.power_logger.set_voltage(self.mon_voltage)
232        self.power_logger.set_testbed(self.testbed_name)
233
234        # If a threshold was provided, log it in the power proto
235        if self.threshold and self.test_name in self.threshold:
236            avg_current_threshold = self.threshold[self.test_name]
237            self.power_logger.set_avg_current_threshold(avg_current_threshold)
238
239        build_id = self.dut.build_info.get('build_id', '')
240        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
241        branch = self.user_params.get('branch', '')
242        target = self.dut.device_info.get('flavor', '')
243
244        self.power_logger.set_branch(branch)
245        self.power_logger.set_build_id(build_id)
246        self.power_logger.set_incremental_build_id(incr_build_id)
247        self.power_logger.set_target(target)
248
249        # Log the display name of the test suite and test case
250        if self.display_name_test_suite:
251            name = self.display_name_test_suite
252            self.power_logger.set_test_suite_display_name(name)
253
254        if self.display_name_test_case:
255            name = self.display_name_test_case
256            self.power_logger.set_test_case_display_name(name)
257
258        # Take Bugreport
259        if self.bug_report:
260            begin_time = utils.get_current_epoch_time()
261            self.dut.take_bug_report(self.test_name, begin_time)
262
263        # Allow the device to cooldown before executing the next test
264        cooldown = self.test_params.get('cooldown', None)
265        if cooldown and not self.final_test:
266            time.sleep(cooldown)
267
268    def teardown_class(self):
269        """Clean up the test class after tests finish running
270
271        """
272        self.log.info('Tearing down the test class')
273        if self.power_monitor:
274            self.power_monitor.connect_usb()
275
276    def on_fail(self, test_name, begin_time):
277        self.power_logger.set_pass_fail_status('FAIL')
278
279    def on_pass(self, test_name, begin_time):
280        self.power_logger.set_pass_fail_status('PASS')
281
282    def dut_rockbottom(self):
283        """Set the dut to rockbottom state
284
285        """
286        # The rockbottom script might include a device reboot, so it is
287        # necessary to stop SL4A during its execution.
288        self.dut.stop_services()
289        self.log.info('Executing rockbottom script for ' + self.dut.model)
290        os.chmod(self.rockbottom_script, 0o777)
291        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
292                                    self.img_name))
293        # Make sure the DUT is in root mode after coming back
294        self.dut.root_adb()
295        # Restart SL4A
296        self.dut.start_services()
297
298    def unpack_custom_file(self, file, test_specific=True):
299        """Unpack the pass_fail_thresholds from a common file.
300
301        Args:
302            file: the common file containing pass fail threshold.
303            test_specific: if True, returns the JSON element within the file
304                that starts with the test class name.
305        """
306        with open(file, 'r') as f:
307            params = json.load(f)
308        if test_specific:
309            try:
310                return params[self.TAG]
311            except KeyError:
312                pass
313        else:
314            return params
315
316    def decode_test_configs(self, attrs, indices):
317        """Decode the test config/params from test name.
318
319        Remove redundant function calls when tests are similar.
320        Args:
321            attrs: a list of the attrs of the test config obj
322            indices: a list of the location indices of keyword in the test name.
323        """
324        # Decode test parameters for the current test
325        test_params = self.current_test_name.split('_')
326        values = [test_params[x] for x in indices]
327        config_dict = dict(zip(attrs, values))
328        self.test_configs = ObjNew(**config_dict)
329
330    def more_wait_first_test(self):
331        # For the first test, increase the offset for longer wait time
332        if self.current_test_name == self.tests[0]:
333            self.mon_info.offset = self.mon_offset + self.extra_wait
334        else:
335            self.mon_info.offset = self.mon_offset
336
337    def set_attenuation(self, atten_list):
338        """Function to set the attenuator to desired attenuations.
339
340        Args:
341            atten_list: list containing the attenuation for each attenuator.
342        """
343        if len(atten_list) != self.num_atten:
344            raise Exception('List given does not have the correct length')
345        for i in range(self.num_atten):
346            self.attenuators[i].set_atten(atten_list[i])
347
348    def measure_power_and_validate(self):
349        """The actual test flow and result processing and validate.
350
351        """
352        self.collect_power_data()
353        self.pass_fail_check(self.avg_current)
354
355    def collect_power_data(self):
356        """Measure power, plot and take log if needed.
357
358        Returns:
359            A MonsoonResult object.
360        """
361        # Collecting current measurement data and plot
362        samples = self.power_monitor_data_collect_save()
363
364        current = [sample[1] for sample in samples]
365        average_current = sum(current) * 1000 / len(current)
366
367        self.power_result.metric_value = (average_current * self.mon_voltage)
368        self.avg_current = average_current
369
370        plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
371                                       self.dut.build_info['build_id'])
372        plot_utils.current_waveform_plot(samples, self.mon_voltage,
373                                         self.mon_info.data_path, plot_title)
374
375        return samples
376
377    def pass_fail_check(self, average_current=None):
378        """Check the test result and decide if it passed or failed.
379
380        The threshold is provided in the config file. In this class, result is
381        current in mA.
382        """
383
384        if not self.threshold or self.test_name not in self.threshold:
385            self.log.error("No threshold is provided for the test '{}' in "
386                           "the configuration file.".format(self.test_name))
387            return
388
389        current_threshold = self.threshold[self.test_name]
390        if average_current:
391            asserts.assert_true(
392                abs(average_current - current_threshold) / current_threshold <
393                self.pass_fail_tolerance,
394                'Measured average current in [{}]: {:.2f}mA, which is '
395                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
396                    self.test_name, average_current, current_threshold,
397                    self.pass_fail_tolerance * current_threshold))
398            asserts.explicit_pass(
399                'Measurement finished for [{}]: {:.2f}mA, which is '
400                'within the acceptable range {:.2f}±{:.2f}'.format(
401                    self.test_name, average_current, current_threshold,
402                    self.pass_fail_tolerance * current_threshold))
403        else:
404            asserts.fail(
405                'Something happened, measurement is not complete, test failed')
406
407    def create_monsoon_info(self):
408        """Creates the config dictionary for monsoon
409
410        Returns:
411            mon_info: Dictionary with the monsoon packet config
412        """
413        mon_info = ObjNew(freq=self.mon_freq,
414                          duration=self.mon_duration,
415                          offset=self.mon_offset,
416                          data_path=self.mon_data_path)
417        return mon_info
418
419    def power_monitor_data_collect_save(self):
420        """Current measurement and save the log file.
421
422        Collect current data using Monsoon box and return the path of the
423        log file. Take bug report if requested.
424
425        Returns:
426            A list of tuples in which the first element is a timestamp and the
427            second element is the sampled current in Amperes at that time.
428        """
429
430        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
431                                self.dut.build_info['build_id'])
432
433        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
434
435        # If the specified Monsoon data file already exists (e.g., multiple
436        # measurements in a single test), write the results to a new file with
437        # the postfix "_#".
438        if os.path.exists(data_path):
439            highest_value = 1
440            for filename in os.listdir(os.path.dirname(data_path)):
441                match = re.match(r'{}_(\d+).txt'.format(tag), filename)
442                if match:
443                    highest_value = max(highest_value, int(match.group(1)))
444
445            data_path = os.path.join(self.mon_info.data_path,
446                                     '%s_%s.txt' % (tag, highest_value + 1))
447
448        # Resets the battery status right before the test starts.
449        self.dut.adb.shell(RESET_BATTERY_STATS)
450        self.log.info('Starting power measurement. Duration: {}s. Offset: '
451                      '{}s. Voltage: {} V.'.format(self.mon_info.duration,
452                                                   self.mon_info.offset,
453                                                   self.mon_voltage))
454
455        # TODO(b/155426729): Create an accurate host-to-device time difference
456        # measurement.
457        device_time_cmd = 'echo $EPOCHREALTIME'
458        device_time = self.dut.adb.shell(device_time_cmd)
459        host_time = time.time()
460        self.log.debug('device start time %s, host start time %s', device_time,
461                       host_time)
462        device_to_host_offset = float(device_time) - host_time
463
464        # Start the power measurement using monsoon.
465        self.dut.stop_services()
466        time.sleep(1)
467        self.power_monitor.disconnect_usb()
468        measurement_args = dict(duration=self.mon_info.duration,
469                                measure_after_seconds=self.mon_info.offset,
470                                hz=self.mon_info.freq)
471        self.power_monitor.measure(measurement_args=measurement_args,
472                                   measurement_name=self.test_name,
473                                   start_time=device_to_host_offset,
474                                   monsoon_output_path=data_path)
475        self.power_monitor.release_resources()
476        self.power_monitor.connect_usb()
477        self.dut.wait_for_boot_completion()
478        time.sleep(10)
479        self.dut.start_services()
480
481        return self.power_monitor.get_waveform(file_path=data_path)
482
483    def process_iperf_results(self):
484        """Get the iperf results and process.
485
486        Returns:
487             throughput: the average throughput during tests.
488        """
489        # Get IPERF results and add this to the plot title
490        RESULTS_DESTINATION = os.path.join(
491            self.iperf_server.log_path,
492            'iperf_client_output_{}.log'.format(self.current_test_name))
493        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
494        # Calculate the average throughput
495        if self.use_client_output:
496            iperf_file = RESULTS_DESTINATION
497        else:
498            iperf_file = self.iperf_server.log_files[-1]
499        try:
500            iperf_result = ipf.IPerfResult(iperf_file)
501
502            # Compute the throughput in Mbit/s
503            throughput = (math.fsum(
504                iperf_result.instantaneous_rates[self.start_meas_time:-1]
505            ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
506                          ) * 8 * (1.024**2)
507
508            self.log.info('The average throughput is {}'.format(throughput))
509        except ValueError:
510            self.log.warning('Cannot get iperf result. Setting to 0')
511            throughput = 0
512        return throughput
513