• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import error, logging, os, serial, shutil, threading, time
6
# Scratch file the recording thread writes to; it is copied into the test's
# results directory when recording is stopped.
_power_play_data_file = '/tmp/power_play_data'

class PowerPlay(object):
    """Class to record serial over USB data from Power Play (go/powerplay).

    It detects if powerplay is connected to the DUT over USB and opens the
    serial port to start receiving powerplay data. It also opens a text file to
    save this data after some formatting.

    Typical use: construct with the running test object, call
    start_recording_power_play_data() to begin logging on a background
    thread, then call stop_recording_power_play_data() to close the log, copy
    it to the results directory and report the collected current samples.
    """

    version = 1

    def __init__(self, test_obj, record_interval=0):
        """Initialize PowerPlay.

        @param test_obj: test object. Its resultsdir attribute and
                         output_perf_value() method are used when recording
                         is stopped.
        @param record_interval: Power play data recording interval in seconds.
        """
        self.test = test_obj
        self.ser = None
        # Opened by the recording thread. Kept as None until then so that
        # stop_recording_power_play_data() is safe to call at any time.
        self.text_file = None
        self.recording_interval = record_interval
        self.momentary_curr_list = list()
        self.record_thread = None

    def extract_current(self, pp_data):
        """Extract momentary current value from each line of powerplay data.

        Only lines whose first character is a digit are treated as samples;
        for those, the second comma separated field is parsed as a float and
        appended to the running list. Empty lines and any other lines are
        ignored.

        @param pp_data: Single line of powerplay data with eight comma separated
                        values.
        @return list containing momentary current values.
        @raises ValueError: if a sample line has fewer than two commas or its
                            second field is not a number.
        """
        if pp_data and pp_data[0].isdigit():
            # fields[1] is the text between the first and the second comma.
            fields = pp_data.split(',', 2)
            if len(fields) < 3:
                raise ValueError('Malformed powerplay data line: %r' % pp_data)
            self.momentary_curr_list.append(float(fields[1]))
        return self.momentary_curr_list

    def start_recording_power_play_data(self):
        """Starts a new thread to record power play data."""
        self.record_thread = threading.Thread(target=self.start_record_thread)
        # Daemon thread: it must not keep the process alive, because the
        # read loop in start_record_thread() can block on the serial port.
        self.record_thread.daemon = True
        self.record_thread.start()

    def start_record_thread(self):
        """Start recording power play data.

        Get a list of connected USB devices and try to establish a serial
        connection. Once the connection is established, open a text file and
        start reading serial data and write it to the text file after some
        formatting. If no serial connection is available, a single
        explanatory line is written to the text file instead.

        @raises error.TestError: if opening a ttyUSB device fails.
        """
        devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]

        # An already established connection is reused as is.
        if self.ser is None:
            for device in devices:
                device_link = '/dev/' + device
                logging.info('Trying ... %s', device_link)
                try:
                    self.ser = serial.Serial(device_link, 115200)
                except serial.SerialException as e:
                    # NOTE(review): a failure on the first device aborts the
                    # search instead of moving on to the next ttyUSB node.
                    raise error.TestError(
                            'Failed to connect to %s becuase of %s' %
                            (device_link, str(e)))
                logging.info('Successfully connected to %s', device_link)
                break

        self.text_file = open(_power_play_data_file, 'w')

        if self.ser is None:
            self.text_file.write('No data from powerplay. Check connection.')
            return

        title_row = ('time,powerplay_timestamp,momentary_current (A),' +
                'momentary_charge (AH),average_current (A),' +
                'total_standby_time,total_wake_time,num_wakes,is_awake?\n')
        self.text_file.write(title_row)
        start_time = time.time()
        # NOTE(review): the line consumed by the loop condition is discarded
        # and a second line is read for processing. Presumably this skips the
        # partial line left after flushInput() - confirm before changing.
        while self.ser.readline():
            # Seconds elapsed since recording started, three decimals.
            current_timestamp = '{:.3f}'.format(time.time() - start_time)
            # Drop NUL bytes and carriage returns and turn the space
            # separated fields into comma separated ones.
            pp_data = (self.ser.readline().replace('\00', '').
                    replace(' ', ',').replace('\r', ''))
            if self.text_file.closed:
                # The log was closed by stop_recording_power_play_data();
                # there is nothing left to record, so end the thread.
                break
            # Skip '#' comment lines and lines too short to be a full sample.
            if not pp_data.startswith('#') and len(pp_data) > 30:
                self.text_file.write(current_timestamp + ',' + pp_data)
                self.momentary_curr_list = self.extract_current(pp_data)
            time.sleep(self.recording_interval)
            # Discard whatever arrived while sleeping so the next sample is
            # a fresh one.
            self.ser.flushInput()

    def stop_recording_power_play_data(self):
        """Stop recording power play data.

        Close the text file and copy it to the test log results directory. Also
        report current data to the performance dashboard. If the recording
        thread never opened the text file, the close and copy steps are
        skipped and only the (possibly empty) current data is reported.
        """
        if self.text_file is None:
            logging.warning('Power play data file was never opened; '
                            'nothing to copy to the results directory.')
        else:
            if not self.text_file.closed:
                self.text_file.close()
            shutil.copy(_power_play_data_file, self.test.resultsdir)
        self.test.output_perf_value(description='momentary_current_draw',
                               value=self.momentary_curr_list,
                               units='Amps', higher_is_better=False)
105