1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import os 18import re 19import select 20import subprocess 21import sys 22import time 23import uuid 24from logging import Logger 25from threading import Thread 26 27import serial 28from serial.tools import list_ports 29 30from acts.controllers.buds_lib import tako_trace_logger 31 32logging = tako_trace_logger.TakoTraceLogger(Logger(__file__)) 33 34RETRIES = 0 35 36 37class LogSerialException(Exception): 38 """LogSerial Exception.""" 39 40 41class PortCheck(object): 42 def get_serial_ports(self): 43 """Gets the computer available serial ports. 44 45 Returns: 46 Dictionary object with all the serial port names. 47 """ 48 result = {} 49 ports = list_ports.comports() 50 for port_name, description, address in ports: 51 result[port_name] = (description, address) 52 return result 53 54 # TODO: Clean up this function. The boolean logic can be simplified. 55 def search_port_by_property(self, search_params): 56 """Search ports by a dictionary of the search parameters. 57 58 Args: 59 search_params: Dictionary object with the parameters 60 to search. i.e: 61 {'ID_SERIAL_SHORT':'213213', 62 'ID_USB_INTERFACE_NUM': '01'} 63 Returns: 64 Array with the ports found 65 """ 66 ports_result = [] 67 for port in self.get_serial_ports(): 68 properties = self.get_port_properties(port=port) 69 if properties: 70 properties_exists = True 71 for port_property in search_params: 72 properties_exists *= (port_property in properties) 73 properties_exists = True if properties_exists == 1 else False 74 if properties_exists: 75 found = True 76 for port_property in search_params.keys(): 77 search_value = search_params[port_property] 78 if properties[port_property] == search_value: 79 found *= True 80 else: 81 found = False 82 break 83 found = True if found == 1 else False 84 if found: 85 ports_result.append(port) 86 return ports_result 87 88 def get_port_properties(self, port): 89 """Get all the properties from a given port. 90 91 Args: 92 port: String object with the port name. i.e. '/dev/ttyACM1' 93 94 Returns: 95 dictionary object with all the properties. 96 """ 97 ports = self.get_serial_ports() 98 if port in ports: 99 result = {} 100 port_address = ports[port][1] 101 property_list = None 102 if sys.platform.startswith('linux') or sys.platform.startswith( 103 'cygwin'): 104 try: 105 command = 'udevadm info -q property -n {}'.format(port) 106 property_list = subprocess.check_output(command, shell=True) 107 property_list = property_list.decode(errors='replace') 108 except subprocess.CalledProcessError as error: 109 logging.error(error) 110 if property_list: 111 properties = filter(None, property_list.split('\n')) 112 for prop in properties: 113 p = prop.split('=') 114 result[p[0]] = p[1] 115 elif sys.platform.startswith('win'): 116 regex = ('(?P<type>[A-Z]*)\sVID\:PID\=(?P<vid>\w*)' 117 '\:(?P<pid>\w*)\s+(?P<adprop>.*$)') 118 m = re.search(regex, port_address) 119 if m: 120 result['type'] = m.group('type') 121 result['vid'] = m.group('vid') 122 result['pid'] = m.group('pid') 123 adprop = m.group('adprop').strip() 124 if adprop: 125 prop_array = adprop.split(' ') 126 for prop in prop_array: 127 p = prop.split('=') 128 result[p[0]] = p[1] 129 if 'LOCATION' in result: 130 interface = int(result['LOCATION'].split('.')[1]) 131 if interface < 10: 132 result['ID_USB_INTERFACE_NUM'] = '0{}'.format( 133 interface) 134 else: 135 result['ID_USB_INTERFACE_NUM'] = '{}'.format( 136 interface) 137 win_vid_pid = '*VID_{}*PID_{}*'.format(result['vid'], 138 result['pid']) 139 command = ( 140 'powershell gwmi "Win32_USBControllerDevice |' + 141 ' %{[wmi]($_.Dependent)} |' + 142 ' Where-Object -Property PNPDeviceID -Like "' + 143 win_vid_pid + '" |' + 144 ' Where-Object -Property Service -Eq "usbccgp" |' + 145 ' Select-Object -Property PNPDeviceID"') 146 res = subprocess.check_output(command, shell=True) 147 r = res.decode('ascii') 148 m = re.search('USB\\\\.*', r) 149 if m: 150 result['ID_SERIAL_SHORT'] = ( 151 m.group().strip().split('\\')[2]) 152 return result 153 154 def port_exists(self, port): 155 """Check if a serial port exists in the computer by the port name. 156 157 Args: 158 port: String object with the port name. i.e. '/dev/ttyACM1' 159 160 Returns: 161 True if it was found, False if not. 162 """ 163 exists = port in self.get_serial_ports() 164 return exists 165 166 167class LogSerial(object): 168 def __init__(self, 169 port, 170 baudrate, 171 bytesize=8, 172 parity='N', 173 stopbits=1, 174 timeout=0.15, 175 retries=0, 176 flush_output=True, 177 terminator='\n', 178 output_path=None, 179 serial_logger=None): 180 global RETRIES 181 self.set_log = False 182 self.output_path = None 183 self.set_output_path(output_path) 184 if serial_logger: 185 self.set_logger(serial_logger) 186 self.monitor_port = PortCheck() 187 if self.monitor_port.port_exists(port=port): 188 self.connection_handle = serial.Serial() 189 RETRIES = retries 190 self.reading = True 191 self.log = [] 192 self.log_thread = Thread() 193 self.command_ini_index = None 194 self.is_logging = False 195 self.flush_output = flush_output 196 self.terminator = terminator 197 if port: 198 self.connection_handle.port = port 199 if baudrate: 200 self.connection_handle.baudrate = baudrate 201 if bytesize: 202 self.connection_handle.bytesize = bytesize 203 if parity: 204 self.connection_handle.parity = parity 205 if stopbits: 206 self.connection_handle.stopbits = stopbits 207 if timeout: 208 self.connection_handle.timeout = timeout 209 try: 210 self.open() 211 except Exception as e: 212 self.close() 213 logging.error(e) 214 else: 215 raise LogSerialException( 216 'The port {} does not exist'.format(port)) 217 218 def set_logger(self, serial_logger): 219 global logging 220 logging = serial_logger 221 self.set_output_path(getattr(logging, 'output_path', '/tmp')) 222 self.set_log = True 223 224 def set_output_path(self, output_path): 225 """Set the output path for the flushed log. 226 227 Args: 228 output_path: String object with the path 229 """ 230 if output_path: 231 if os.path.exists(output_path): 232 self.output_path = output_path 233 else: 234 raise LogSerialException('The output path does not exist.') 235 236 def refresh_port_connection(self, port): 237 """Will update the port connection without closing the read thread. 238 239 Args: 240 port: String object with the new port name. i.e. '/dev/ttyACM1' 241 242 Raises: 243 LogSerialException if the port is not alive. 244 """ 245 if self.monitor_port.port_exists(port=port): 246 self.connection_handle.port = port 247 self.open() 248 else: 249 raise LogSerialException( 250 'The port {} does not exist'.format(port)) 251 252 def is_port_alive(self): 253 """Verify if the current port is alive in the computer. 254 255 Returns: 256 True if its alive, False if its missing. 257 """ 258 alive = self.monitor_port.port_exists(port=self.connection_handle.port) 259 return alive 260 261 # @retry(Exception, tries=RETRIES, delay=1, backoff=2) 262 def open(self): 263 """Will open the connection with the current port settings.""" 264 while self.connection_handle.isOpen(): 265 self.connection_handle.close() 266 time.sleep(0.5) 267 self.connection_handle.open() 268 if self.flush_output: 269 self.flush() 270 self.start_reading() 271 logging.info('Connection Open') 272 273 def close(self): 274 """Will close the connection and the read thread.""" 275 self.stop_reading() 276 if self.connection_handle: 277 self.connection_handle.close() 278 if not self.set_log: 279 logging.flush_log() 280 self.flush_log() 281 logging.info('Connection Closed') 282 283 def flush(self): 284 """Will flush any input from the serial connection.""" 285 self.write('\n') 286 self.connection_handle.flushInput() 287 self.connection_handle.flush() 288 flushed = 0 289 while True: 290 ready_r, _, ready_x = (select.select([self.connection_handle], [], 291 [self.connection_handle], 0)) 292 if ready_x: 293 logging.exception('exception from serial port') 294 return 295 elif ready_r: 296 flushed += 1 297 # This may cause underlying buffering. 298 self.connection_handle.read(1) 299 # Flush the underlying buffer too. 300 self.connection_handle.flush() 301 else: 302 break 303 if flushed > 0: 304 logging.debug('dropped >{} bytes'.format(flushed)) 305 306 def write(self, command, wait_time=0.2): 307 """Will write into the serial connection. 308 309 Args: 310 command: String object with the text to write. 311 wait_time: Float object with the seconds to wait after the 312 command was issued. 313 """ 314 if command: 315 if self.terminator: 316 command += self.terminator 317 self.command_ini_index = len(self.log) 318 self.connection_handle.write(command.encode()) 319 if wait_time: 320 time.sleep(wait_time) 321 logging.info('cmd [{}] sent.'.format(command.strip())) 322 323 def flush_log(self): 324 """Will output the log into a CSV file.""" 325 if len(self.log) > 0: 326 path = '' 327 if not self.output_path: 328 self.output_path = os.getcwd() 329 elif not os.path.exists(self.output_path): 330 self.output_path = os.getcwd() 331 path = os.path.join(self.output_path, 332 str(uuid.uuid4()) + '_serial.log') 333 with open(path, 'a') as log_file: 334 for info in self.log: 335 log_file.write('{}, {}\n'.format(info[0], info[1])) 336 337 def read(self): 338 """Will read from the log the output from the serial connection 339 after a write command was issued. It will take the initial time 340 of the command as a reference. 341 342 Returns: 343 Array object with the log lines. 344 """ 345 buf_read = [] 346 command_end_index = len(self.log) 347 info = self.query_serial_log(self.command_ini_index, command_end_index) 348 for line in info: 349 buf_read.append(line[1]) 350 self.command_ini_index = command_end_index 351 return buf_read 352 353 def get_all_log(self): 354 """Gets the log object that collects the logs. 355 356 Returns: 357 DataFrame object with all the logs. 358 """ 359 return self.log 360 361 def query_serial_log(self, from_index, to_index): 362 """Will query the session log from a given time in EPOC format. 363 364 Args: 365 from_timestamp: Double value with the EPOC timestamp to start 366 the search. 367 to_timestamp: Double value with the EPOC timestamp to finish the 368 rearch. 369 370 Returns: 371 DataFrame with the result query. 372 """ 373 if from_index < to_index: 374 info = self.log[from_index:to_index] 375 return info 376 377 def _start_reading_thread(self): 378 if self.connection_handle.isOpen(): 379 self.reading = True 380 while self.reading: 381 try: 382 data = self.connection_handle.readline().decode('utf-8') 383 if data: 384 self.is_logging = True 385 data.replace('/n', '') 386 data.replace('/r', '') 387 data = data.strip() 388 self.log.append([time.time(), data]) 389 else: 390 self.is_logging = False 391 except Exception: 392 time.sleep(1) 393 logging.info('Read thread closed') 394 395 def start_reading(self): 396 """Method to start the log collection.""" 397 if not self.log_thread.isAlive(): 398 self.log_thread = Thread(target=self._start_reading_thread, args=()) 399 self.log_thread.daemon = True 400 try: 401 self.log_thread.start() 402 except(KeyboardInterrupt, SystemExit): 403 self.close() 404 else: 405 logging.warning('Not running log thread, is already alive') 406 407 def stop_reading(self): 408 """Method to stop the log collection.""" 409 self.reading = False 410 self.log_thread.join(timeout=600) 411