# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. ''' USB to I2C controller. ''' import glob import logging import os import re import serial import time from autotest_lib.client.cros import tty # Least significant bit of I2C address. WRITE_BIT = 0 READ_BIT = 1 def create_i2c_controller(chipset_config): '''Factory method for I2CController. This function is a factory method to create an I2CController instance. @param chipset_config: Chipset configuration. @return An I2CController if succeeded. @throws AssertionError if a valid instance cannot be created. ''' if chipset_config.split(':')[0] == 'SC18IM700': usb_uart_driver = chipset_config.split(':')[1] # Try to find a tty terminal that driver name matches usb_uart_driver. tty_path = tty.find_tty_by_driver(usb_uart_driver) if tty_path: return _I2CControllerSC18IM700(tty_path) assert False, "Unsupported configuration: %s" % chipset_config class I2CController(object): ''' The base class of I2C controller. ''' # Constants indicate I2C bus status. I2C_OK = 1 I2C_NACK_ON_ADDRESS = 2 I2C_NACK_ON_DATA = 3 I2C_TIME_OUT = 4 def send_and_check_status(self, slave_addr, int_array): '''Sends data to I2C slave device and checks the bus status. @param slave_addr: The address of slave in 7bits format. @param int_array: The data to send in integer array. @param status_check: Whether to check I2C bus status. @return An integer indicates I2C bus status. ''' self.send(slave_addr, int_array) return self.read_bus_status() def read_bus_status(self): '''Returns the I2C bus status.''' raise NotImplementedError def send(self, slave_addr, int_array): '''Sends data to I2C slave device. Caller should call read_bus_status() explicitly to confirm whether the data sent successfully. @param slave_addr: The address of slave in 7bits format. @param int_array: The data to send in integer array. ''' raise NotImplementedError def read(self, slave_addr, bytes_to_read): '''Reads data from I2C slave device. @param slave_addr: The address of slave in 7bits format. @param bytes_to_read: The number of bytes to read from device. @return An array of data. ''' raise NotImplementedError class _I2CControllerSC18IM700(I2CController): ''' Implementation of I2C Controller for NXP SC18IM700. ''' SEC_WAIT_I2C = 0.1 # Constants from official datasheet. # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf I2C_STATUS = {0b11110000: I2CController.I2C_OK, 0b11110001: I2CController.I2C_NACK_ON_ADDRESS, 0b11110011: I2CController.I2C_NACK_ON_DATA, 0b11111000: I2CController.I2C_TIME_OUT} def __init__(self, device_path): '''Connects to NXP via serial port. @param device_path: The device path of serial port. ''' self.logger = logging.getLogger('SC18IM700') self.logger.info('Setup serial device... [%s]', device_path) self.device_path = device_path self.serial = serial.Serial(port=self.device_path, baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, xonxoff=False, rtscts=True, interCharTimeout=1) self.logger.info('pySerial [%s] configuration : %s', serial.VERSION, self.serial.__repr__()) # Clean the buffer. self.serial.flush() def _write(self, data): '''Converts data to bytearray and writes to the serial port.''' self.serial.write(bytearray(data)) self.serial.flush() def _read(self): '''Reads data from serial port(Non-Blocking).''' ret = self.serial.read(self.serial.inWaiting()) self.logger.info('Hex and binary dump of datas - ') for char in ret: self.logger.info(' %x - %s', ord(char), bin(ord(char))) return ret @staticmethod def _convert_to_8bits_addr(slave_addr_7bits, lsb): '''Converts slave_addr from 7 bits to 8 bits with given LSB.''' assert (slave_addr_7bits >> 7) == 0, "Address must not exceed 7 bits." assert (lsb & ~0x01) == 0, "lsb must not exceed one bit." return (slave_addr_7bits << 1) | lsb def read_bus_status(self): cmd = [ord('R'), 0x0A, ord('P')] self._write(cmd) time.sleep(self.SEC_WAIT_I2C) ret = self._read() if (len(ret) == 1) and (ord(ret[0]) in self.I2C_STATUS): return self.I2C_STATUS[ord(ret[0])] raise IOError("I2C_STATUS_READ_FAILED") def send(self, slave_addr, int_array): cmd = ([ord('S'), self._convert_to_8bits_addr(slave_addr, WRITE_BIT), len(int_array)] + int_array + [ord('P')]) self._write(cmd) def read(self, slave_addr, bytes_to_read): cmd = ([ord('S'), self._convert_to_8bits_addr(slave_addr, READ_BIT), bytes_to_read, ord('P')]) self._write(cmd) time.sleep(self.SEC_WAIT_I2C) return self._read() def write_gpio(self, data): self._write([ord('O'), data, ord('P')]) def read_gpio(self): self._write([ord('I'), ord('P')]) time.sleep(self.SEC_WAIT_I2C) return self._read() def write_register(self, regs, datas): assert len(regs) == len(datas) cmd = [ord('W')] for i in range(len(regs)): cmd.append(regs[i]) cmd.append(datas[i]) cmd.append(ord('P')) self._write(cmd) def read_register(self, regs): cmd = [ord('R')] + regs + [ord('P')] self._write(cmd) time.sleep(self.SEC_WAIT_I2C) return self._read()