# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import error, logging, os, serial, shutil, threading, time _power_play_data_file = '/tmp/power_play_data' class PowerPlay(object): """Class to record serial over USB data from Power Play (go/powerplay). It detects if powerplay is connected to the DUT over USB and opens the serial port to start receiving powerplay data. It also opens a text file to save this data after some formatting. """ version = 1 def __init__(self, test_obj, record_interval=0): """Initialize PowerPlay. @param test_obj: test object. @param record_interval: Power play data recording interval in seconds. """ self.test = test_obj self.ser = None self.recording_interval = record_interval self.momentary_curr_list = list() self.record_thread = None def extract_current(self, pp_data): """Extract momentary current value from each line of powerplay data. @param pp_data: Single line of powerplay data with eight comma separated values. @return list containing momentary current values. """ if pp_data[0].isdigit(): self.momentary_curr_list.append(float(pp_data[pp_data.index(',')+1:] [:pp_data[pp_data.index(',')+1:].index(',')])) return self.momentary_curr_list def start_recording_power_play_data(self): """Starts a new thread to record power play data.""" self.record_thread = threading.Thread(target=self.start_record_thread) self.record_thread.daemon = True self.record_thread.start() def start_record_thread(self): """Start recording power play data. Get a list of connected USB devices and try to establish a serial connection. Once the connection is established, open a text file and start reading serial data and write it to the text file after some formatting. """ devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')] for device in devices: device_link = '/dev/' + device try: if self.ser == None: logging.info('Trying ... %s', device_link) self.ser = serial.Serial(device_link, 115200) logging.info('Successfully connected to %s', device_link) break except serial.SerialException, e: raise error.TestError('Failed to connect to %s becuase of %s' % (device_link, str(e))) self.text_file = open(_power_play_data_file, 'w') if self.ser != None: title_row = ('time,powerplay_timestamp,momentary_current (A),' + 'momentary_charge (AH),average_current (A),' + 'total_standby_time,total_wake_time,num_wakes,is_awake?\n') self.text_file.write(title_row) start_time = time.time() while self.ser.readline(): current_timestamp = (('{:>10.3f}'. format(time.time() - start_time)).replace(' ', '')) pp_data = (self.ser.readline().replace('\00', ''). replace(' ', ',').replace('\r', '')) if (not pp_data.startswith('#') and (len(pp_data) > 30) and not self.text_file.closed): self.text_file.write(current_timestamp + ',' + pp_data) self.momentary_curr_list = self.extract_current(pp_data) time.sleep(self.recording_interval) self.ser.flushInput() else: self.text_file.write('No data from powerplay. Check connection.') def stop_recording_power_play_data(self): """Stop recording power play data. Close the text file and copy it to the test log results directory. Also report current data to the performance dashboard. """ if not self.text_file.closed: self.text_file.close() shutil.copy(_power_play_data_file, self.test.resultsdir) self.test.output_perf_value(description='momentary_current_draw', value=self.momentary_curr_list, units='Amps', higher_is_better=False)