# Lint as: python2, python3
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import shutil
import threading
import time

import serial

import error

_power_play_data_file = '/tmp/power_play_data'


class PowerPlay(object):
    """Class to record serial over USB data from Power Play (go/powerplay).

    It detects if powerplay is connected to the DUT over USB and opens the
    serial port to start receiving powerplay data. It also opens a text file to
    save this data after some formatting.
    """

    version = 1

    def __init__(self, test_obj, record_interval=0):
        """Initialize PowerPlay.

        @param test_obj: test object. Its resultsdir attribute and
                output_perf_value() method are used when recording stops.
        @param record_interval: Power play data recording interval in seconds.
        """
        self.test = test_obj
        self.ser = None
        # Created by the recorder thread; kept as None until then so that
        # stop_recording_power_play_data() can tell that no file was opened.
        self.text_file = None
        self.recording_interval = record_interval
        self.momentary_curr_list = list()
        self.record_thread = None

    def extract_current(self, pp_data):
        """Extract momentary current value from each line of powerplay data.

        The value is the second comma separated field of the line. Lines that
        do not start with a digit, have no second field, or whose second field
        is not a number leave the list untouched.

        @param pp_data: Single line of powerplay data with eight comma separated
                values.
        @return list containing momentary current values.
        """
        fields = pp_data.split(',')
        # pp_data[:1] (rather than pp_data[0]) keeps an empty line from
        # raising IndexError.
        if pp_data[:1].isdigit() and len(fields) > 1:
            try:
                self.momentary_curr_list.append(float(fields[1]))
            except ValueError:
                # One corrupted serial line must not abort the recording.
                logging.warning('Skipping malformed powerplay line: %r',
                                pp_data)
        return self.momentary_curr_list

    def start_recording_power_play_data(self):
        """Starts a new thread to record power play data."""
        self.record_thread = threading.Thread(target=self.start_record_thread)
        # Daemon thread: it does not keep the interpreter alive at exit.
        self.record_thread.daemon = True
        self.record_thread.start()

    def start_record_thread(self):
        """Start recording power play data.

        Get a list of connected USB devices and try to establish a serial
        connection. Once the connection is established, open a text file and
        start reading serial data and write it to the text file after some
        formatting. The loop ends once the text file has been closed (see
        stop_recording_power_play_data) or the serial port stops returning
        data.

        @raises error.TestError: if opening a ttyUSB device fails. When this
                method runs as the recorder thread target, the exception only
                terminates that thread.
        """
        devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]

        # An already open port (e.g. from an earlier recording) is reused.
        if self.ser is None:
            for device in devices:
                device_link = '/dev/' + device
                logging.info('Trying ... %s', device_link)
                try:
                    self.ser = serial.Serial(device_link, 115200)
                except serial.SerialException as e:
                    raise error.TestError(
                            'Failed to connect to %s becuase of %s' %
                            (device_link, str(e)))
                logging.info('Successfully connected to %s', device_link)
                break

        # Keep a local reference: if recording is restarted, self.text_file is
        # rebound to a new file and this thread must not write into it.
        out_file = open(_power_play_data_file, 'w')
        self.text_file = out_file

        if self.ser is None:
            out_file.write('No data from powerplay. Check connection.')
            return

        title_row = ('time,powerplay_timestamp,momentary_current (A),' +
                     'momentary_charge (AH),average_current (A),' +
                     'total_standby_time,total_wake_time,num_wakes,is_awake?\n')
        out_file.write(title_row)
        start_time = time.time()
        # NOTE(review): the line consumed by the loop condition is discarded;
        # presumably it is the partial line left behind by flushInput() at the
        # end of the previous iteration -- confirm before changing.
        while self.ser.readline():
            if out_file.closed:
                # Recording was stopped; nothing more can be written, so let
                # the thread finish instead of draining the port forever.
                break
            current_timestamp = '{:.3f}'.format(time.time() - start_time)
            raw_line = self.ser.readline()
            # On python3 the serial port returns bytes, which cannot be mixed
            # with the str arguments below. On python2 bytes is str and the
            # line is used as is. Non-ASCII noise is dropped so that the line
            # can always be written to the text file.
            if not isinstance(raw_line, str):
                raw_line = raw_line.decode('ascii', 'ignore')
            pp_data = (raw_line.replace('\00', '').replace(' ', ',')
                       .replace('\r', ''))
            # Skip comment lines and lines too short to be a full data row.
            if (not pp_data.startswith('#') and len(pp_data) > 30 and
                    not out_file.closed):
                out_file.write(current_timestamp + ',' + pp_data)
                self.momentary_curr_list = self.extract_current(pp_data)
            time.sleep(self.recording_interval)
            # Drop whatever arrived while sleeping so the next sample is
            # current rather than buffered.
            self.ser.flushInput()

    def stop_recording_power_play_data(self):
        """Stop recording power play data.

        Close the text file and copy it to the test log results directory. Also
        report current data to the performance dashboard.

        @raises error.TestError: if the recorder never opened the data file
                (recording was not started, or it failed before the file was
                created).
        """
        if self.text_file is None:
            raise error.TestError(
                    'Power play data file %s was never opened; recording was '
                    'not started or failed to start.' % _power_play_data_file)
        if not self.text_file.closed:
            self.text_file.close()
        shutil.copy(_power_play_data_file, self.test.resultsdir)
        self.test.output_perf_value(description='momentary_current_draw',
                                    value=self.momentary_curr_list,
                                    units='Amps', higher_is_better=False)