• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import error, logging, os, serial, shutil, threading, time
7
# Temporary text file that the recording thread writes powerplay data to; it
# is copied into the test results directory when recording is stopped.
_power_play_data_file = '/tmp/power_play_data'
9
class PowerPlay(object):
    """Class to record serial over USB data from Power Play (go/powerplay).

    It detects if powerplay is connected to the DUT over USB and opens the
    serial port to start receiving powerplay data. It also opens a text file to
    save this data after some formatting.
    """

    version = 1

    def __init__(self, test_obj, record_interval=0):
        """Initialize PowerPlay.

        @param test_obj: test object.
        @param record_interval: Power play data recording interval in seconds.
        """
        self.test = test_obj
        self.ser = None
        self.recording_interval = record_interval
        self.momentary_curr_list = list()
        self.record_thread = None
        # Set by start_record_thread(); kept as None until then so that
        # stop_recording_power_play_data() can tell the file was never opened.
        self.text_file = None

    def extract_current(self, pp_data):
        """Extract momentary current value from each line of powerplay data.

        Only lines whose first character is a digit are parsed; for those the
        second comma separated field is converted to float and appended to
        self.momentary_curr_list. Empty lines and all other lines are ignored.

        @param pp_data: Single line of powerplay data with eight comma separated
                        values.
        @return list containing momentary current values.
        @raises ValueError: if a line starting with a digit has fewer than
                            three comma separated fields or its second field
                            is not a number.
        """
        if pp_data and pp_data[0].isdigit():
            # Unpacking into exactly three names raises ValueError when the
            # line has fewer than two commas.
            _, momentary_current, _ = pp_data.split(',', 2)
            self.momentary_curr_list.append(float(momentary_current))
        return self.momentary_curr_list

    @staticmethod
    def _format_line(raw_line):
        """Turn one raw serial line into a comma separated text line.

        @param raw_line: line as returned by the serial port; bytes on
                         python3, str on python2.
        @return str with NUL characters and carriage returns removed and
                spaces replaced by commas.
        """
        if not isinstance(raw_line, str):
            # python3 serial reads return bytes; decode before using str
            # methods. Non-ASCII bytes are dropped.
            raw_line = raw_line.decode('ascii', 'ignore')
        return (raw_line.replace('\x00', '').replace(' ', ',').
                replace('\r', ''))

    def start_recording_power_play_data(self):
        """Starts a new thread to record power play data."""
        self.record_thread = threading.Thread(target=self.start_record_thread)
        # Daemon thread, so it does not keep the process alive on exit.
        self.record_thread.daemon = True
        self.record_thread.start()

    def start_record_thread(self):
        """Start recording power play data.

        Get a list of connected USB devices and try to establish a serial
        connection. Once the connection is established, open a text file and
        start reading serial data and write it to the text file after some
        formatting.

        @raises error.TestError: if opening a ttyUSB device raises
                                 serial.SerialException.
        """
        devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]

        for device in devices:
            device_link = '/dev/' + device
            try:
                if self.ser is None:
                    logging.info('Trying ... %s', device_link)
                    self.ser = serial.Serial(device_link, 115200)
                    logging.info('Successfully connected to %s', device_link)
                    break
            except serial.SerialException as e:
                raise error.TestError('Failed to connect to %s becuase of %s' %
                                     (device_link, str(e)))

        self.text_file = open(_power_play_data_file, 'w')

        if self.ser is None:
            self.text_file.write('No data from powerplay. Check connection.')
            return

        title_row = ('time,powerplay_timestamp,momentary_current (A),' +
                'momentary_charge (AH),average_current (A),' +
                'total_standby_time,total_wake_time,num_wakes,is_awake?\n')
        self.text_file.write(title_row)
        start_time = time.time()
        # The line consumed by the loop condition is discarded; only the line
        # read inside the body is recorded.
        # NOTE(review): presumably this skips the partial line left over after
        # flushInput() below -- confirm before changing.
        while self.ser.readline():
            current_timestamp = '{:.3f}'.format(time.time() - start_time)
            pp_data = self._format_line(self.ser.readline())
            # Skip '#' lines and short lines, and stop writing once
            # stop_recording_power_play_data() has closed the file.
            if (not pp_data.startswith('#') and (len(pp_data) > 30) and
                    not self.text_file.closed):
                self.text_file.write(current_timestamp + ',' + pp_data)
                self.momentary_curr_list = self.extract_current(pp_data)
            time.sleep(self.recording_interval)
            # Drop whatever arrived during the sleep.
            self.ser.flushInput()

    def stop_recording_power_play_data(self):
        """Stop recording power play data.

        Close the text file and copy it to the test log results directory. Also
        report current data to the performance dashboard.

        @raises error.TestError: if the data file was never opened, i.e. the
                                 recording thread has not reached that point.
        """
        if self.text_file is None:
            raise error.TestError('Power play data file was never opened; '
                                  'recording has not started.')
        if not self.text_file.closed:
            self.text_file.close()
        shutil.copy(_power_play_data_file, self.test.resultsdir)
        self.test.output_perf_value(description='momentary_current_draw',
                               value=self.momentary_curr_list,
                               units='Amps', higher_is_better=False)
106