• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5'''
6USB to I2C controller.
7'''
8
9import glob
10import logging
11import os
12import re
13import serial
14import time
15
16from autotest_lib.client.cros import tty
17
# Least significant bit of the 8-bit I2C address byte. One of these values is
# OR-ed into bit 0 after the 7-bit slave address has been shifted left by one
# (see _I2CControllerSC18IM700._convert_to_8bits_addr): WRITE_BIT is used by
# send() and READ_BIT by read().
WRITE_BIT = 0
READ_BIT = 1
21
22
def create_i2c_controller(chipset_config):
    '''Factory method for I2CController.

    Parses a configuration string of the form '<chipset>:<usb_uart_driver>'
    and returns a controller attached to the tty whose driver name matches
    <usb_uart_driver>. Only the 'SC18IM700' chipset is recognized; any fields
    after the second one are ignored.

    @param chipset_config: Chipset configuration string.
    @return An I2CController if succeeded.
    @throws AssertionError if a valid instance cannot be created (unknown
            chipset, missing driver field, or no matching tty found).
    '''
    fields = chipset_config.split(':')
    # Require both the chipset and the driver field so that a configuration
    # without ':' is reported as unsupported instead of raising IndexError.
    if len(fields) >= 2 and fields[0] == 'SC18IM700':
        usb_uart_driver = fields[1]
        # Try to find a tty terminal that driver name matches usb_uart_driver.
        tty_path = tty.find_tty_by_driver(usb_uart_driver)
        if tty_path:
            return _I2CControllerSC18IM700(tty_path)

    # Raised explicitly rather than through `assert False`: assert statements
    # are stripped when Python runs with -O, which would make this function
    # silently return None.
    raise AssertionError("Unsupported configuration: %s" % chipset_config)
40
41
class I2CController(object):
    '''
    Abstract interface shared by every I2C controller implementation.

    Concrete controllers override send(), read() and read_bus_status();
    send_and_check_status() is built on top of those.
    '''

    # Bus status codes returned by read_bus_status().
    I2C_OK = 1
    I2C_NACK_ON_ADDRESS = 2
    I2C_NACK_ON_DATA = 3
    I2C_TIME_OUT = 4

    def send_and_check_status(self, slave_addr, int_array):
        '''Writes data to a slave device, then reports the bus status.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.

        @return One of the I2C_* status constants, as returned by
                read_bus_status().
        '''
        self.send(slave_addr, int_array)
        bus_status = self.read_bus_status()
        return bus_status

    def read_bus_status(self):
        '''Returns the I2C bus status. Must be overridden by subclasses.'''
        raise NotImplementedError()

    def send(self, slave_addr, int_array):
        '''Writes data to a slave device. Must be overridden by subclasses.

        This call does not report whether the transfer succeeded; callers
        should invoke read_bus_status() afterwards to find out.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.
        '''
        raise NotImplementedError()

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from a slave device. Must be overridden by subclasses.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return An array of data.
        '''
        raise NotImplementedError()
88
89
class _I2CControllerSC18IM700(I2CController):
    '''
    Implementation of I2C Controller for NXP SC18IM700.

    The chip is driven over a serial port. Every command is a short byte
    sequence that starts with an ASCII command character and ends with 'P'.
    Replies are collected by sleeping SEC_WAIT_I2C seconds and then draining
    whatever bytes are waiting on the port.
    '''
    # Seconds to wait between issuing a command and reading back its reply.
    SEC_WAIT_I2C = 0.1

    # Constants from official datasheet.
    # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf
    # Maps the raw status byte reported by the chip to an I2CController
    # status constant.
    I2C_STATUS = {0b11110000: I2CController.I2C_OK,
                  0b11110001: I2CController.I2C_NACK_ON_ADDRESS,
                  0b11110011: I2CController.I2C_NACK_ON_DATA,
                  0b11111000: I2CController.I2C_TIME_OUT}

    def __init__(self, device_path):
        '''Connects to NXP via serial port.

        Opens the port at 9600 baud, 8 data bits, no parity, one stop bit,
        with RTS/CTS hardware flow control.

        @param device_path: The device path of serial port.
        '''
        self.logger = logging.getLogger('SC18IM700')
        self.logger.info('Setup serial device... [%s]', device_path)
        self.device_path = device_path
        self.serial = serial.Serial(port=self.device_path,
                                    baudrate=9600,
                                    bytesize=serial.EIGHTBITS,
                                    parity=serial.PARITY_NONE,
                                    stopbits=serial.STOPBITS_ONE,
                                    xonxoff=False,
                                    rtscts=True,
                                    interCharTimeout=1)
        self.logger.info('pySerial [%s] configuration : %s',
                         serial.VERSION, repr(self.serial))
        # Clean the buffer.
        # NOTE(review): pySerial documents flush() as waiting until pending
        # output has been written; it does not discard stale input. Confirm
        # whether discarding input was the intent here.
        self.serial.flush()

    def _write(self, data):
        '''Converts data to bytearray and writes to the serial port.

        @param data: An iterable of integers, each in the range 0-255.
        '''
        self.serial.write(bytearray(data))
        self.serial.flush()

    def _read(self):
        '''Reads data from serial port(Non-Blocking).

        Only the bytes already waiting on the port are read, so the result
        may be empty. Each byte is logged in hex and binary.

        @return The raw data as returned by the serial port.
        '''
        ret = self.serial.read(self.serial.inWaiting())
        self.logger.info('Hex and binary dump of datas - ')
        # Iterating a bytearray yields integers on both Python 2 and 3,
        # whereas calling ord() on the elements of a Python 3 bytes object
        # raises TypeError.
        for value in bytearray(ret):
            self.logger.info('  %x - %s', value, bin(value))
        return ret

    @staticmethod
    def _convert_to_8bits_addr(slave_addr_7bits, lsb):
        '''Converts slave_addr from 7 bits to 8 bits with given LSB.

        @param slave_addr_7bits: The address of slave in 7bits format.
        @param lsb: WRITE_BIT or READ_BIT; becomes bit 0 of the result.
        @return The 8-bit address byte.
        @throws AssertionError if either argument is out of range.
        '''
        # Explicit raises keep the validation active under `python -O`,
        # which strips assert statements.
        if (slave_addr_7bits >> 7) != 0:
            raise AssertionError("Address must not exceed 7 bits.")
        if (lsb & ~0x01) != 0:
            raise AssertionError("lsb must not exceed one bit.")
        return (slave_addr_7bits << 1) | lsb

    def read_bus_status(self):
        '''Queries the chip for the I2C bus status.

        Reads register 0x0A and translates the single reply byte through
        I2C_STATUS.

        @return One of the I2CController.I2C_* status constants.
        @throws IOError if the reply is not exactly one known status byte.
        '''
        cmd = [ord('R'), 0x0A, ord('P')]
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        # bytearray() gives integer elements on both Python 2 and 3.
        reply = bytearray(self._read())
        if len(reply) == 1 and reply[0] in self.I2C_STATUS:
            return self.I2C_STATUS[reply[0]]
        raise IOError("I2C_STATUS_READ_FAILED")

    def send(self, slave_addr, int_array):
        '''Sends data to I2C slave device.

        Caller should call read_bus_status() explicitly to confirm whether
        the data sent successfully.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send, as a sequence of integers.
        '''
        # Copy into a list so tuples and bytearrays are accepted as well.
        payload = list(int_array)
        cmd = ([ord('S'),
                self._convert_to_8bits_addr(slave_addr, WRITE_BIT),
                len(payload)] +
               payload + [ord('P')])
        self._write(cmd)

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from I2C slave device.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return The raw data available on the serial port after waiting
                SEC_WAIT_I2C seconds.
        '''
        cmd = [ord('S'),
               self._convert_to_8bits_addr(slave_addr, READ_BIT),
               bytes_to_read,
               ord('P')]
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()

    def write_gpio(self, data):
        '''Sends the GPIO write command with the given value.

        @param data: The value byte sent with the 'O' command.
        '''
        self._write([ord('O'), data, ord('P')])

    def read_gpio(self):
        '''Sends the GPIO read command and returns the reply.

        @return The raw data available on the serial port after waiting
                SEC_WAIT_I2C seconds.
        '''
        self._write([ord('I'), ord('P')])
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()

    def write_register(self, regs, datas):
        '''Writes values to the chip's internal registers.

        @param regs: Sequence of register numbers.
        @param datas: Sequence of values, one for each entry of regs.
        @throws AssertionError if regs and datas differ in length.
        '''
        # Explicit raise so the check survives `python -O`.
        if len(regs) != len(datas):
            raise AssertionError("regs and datas must have the same length.")
        cmd = [ord('W')]
        # The command carries interleaved (register, value) pairs.
        for reg, value in zip(regs, datas):
            cmd.extend((reg, value))
        cmd.append(ord('P'))
        self._write(cmd)

    def read_register(self, regs):
        '''Reads the chip's internal registers.

        @param regs: Sequence of register numbers to read.
        @return The raw data available on the serial port after waiting
                SEC_WAIT_I2C seconds.
        '''
        # list() lets callers pass any sequence, not only a list.
        cmd = [ord('R')] + list(regs) + [ord('P')]
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()
191