• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16import re
17
18
class Device(object):
    """Locate the tty device node of USB-attached lab equipment.

    This class provides an interface to locate lab equipment without encoding
    knowledge of the USB bus topology in the lab equipment device drivers.
    Devices are identified by USB vendor ID, product ID and (optional) serial
    number read from sysfs, and resolved to a path under /dev.

    The class is a singleton: every construction returns the same object.
    Note that __init__ still runs on each construction, so the lab device
    list is rebuilt from the most recent constructor arguments.
    """

    # Keys of the dictionaries describing a device.
    KEY_VID = 'vendor_id'
    KEY_PID = 'product_id'
    KEY_SN = 'serial_no'
    KEY_INF = 'inf'
    KEY_CFG = 'config'
    KEY_NAME = 'name'
    KEY_TTY = 'tty_path'
    KEY_MFG = 'mfg'
    KEY_PRD = 'product'
    KEY_VER = 'version'

    _instance = None

    _USB_DEVICE_SYS_ROOT = '/sys/bus/usb/devices'
    _DEV_ROOT = '/dev'

    # Names of the sysfs attribute files that are read.
    _SYS_VENDOR_ID = 'idVendor'
    _SYS_PRODUCT_ID = 'idProduct'
    _SYS_SERIAL_NO = 'serial'
    _INF_CLASS = 'bInterfaceClass'
    _INF_SUB_CLASS = 'bInterfaceSubClass'
    _INF_PROTOCOL = 'bInterfaceProtocol'
    _MFG_STRING = 'manufacturer'
    _PRODUCT_STRING = 'product'
    _VERSION_STRING = 'version'

    # Class/subclass/protocol triple that identifies a CDC ACM interface.
    _USB_CDC_ACM_CLASS = 0x02
    _USB_CDC_ACM_SUB_CLASS = 0x02
    _USB_CDC_ACM_PROTOCOL = 0x01

    # Patterns for sysfs directory names, compiled once instead of on every
    # call.  Devices look like '1-2' or '1-2.3'; interfaces append
    # ':<cfg>.<inf>'.  The device pattern deliberately excludes interfaces
    # and root hubs.
    _USB_DEVICE_RE = re.compile(r'\d+-\d+(\.\d+)*$')
    _USB_INF_RE = re.compile(
        r'\d+-\d+(\.\d+)*:(?P<cfg>\d+)\.(?P<inf>\d+)$')
    _TTY_USB_RE = re.compile(r'ttyUSB\d+$')

    def __init__(self, name, vid, pid, cfg, inf):
        """Register one lab device and resolve its tty path.

        Args:
            name:   friendly name used later with get_tty_path()
            vid:    vendor ID as a hexadecimal string
            pid:    product ID as a hexadecimal string
            cfg:    USB configuration number
            inf:    USB interface number
        """
        # Runs on every construction of the singleton, so any previously
        # registered device is dropped here.
        self._device_list = []

        self._build_device(name, vid, pid, cfg, inf)

        self._walk_usb_tree(self._init_device_list_callback, None)

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        # The Device class should be a singleton.  A lab test procedure may
        # use multiple pieces of lab equipment and we do not want to have to
        # create a new instance of the Device for each device.
        if cls._instance is None:
            # object.__new__ must not be handed the constructor arguments:
            # Python 3 raises TypeError for them when __new__ is overridden.
            cls._instance = super(Device, cls).__new__(cls)
        return cls._instance

    def __enter__(self):
        """Allow use as a context manager; returns the instance itself."""
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Nothing to release; exceptions are not suppressed."""
        pass

    def _build_device(self, name, vid, pid, cfg, inf):
        """Build relay device information and add it to the device list.

        Args:
            name:   device
            vid:    vendor ID (hexadecimal string)
            pid:    product ID (hexadecimal string)
            cfg:    configuration
            inf:    interface

        Returns:
            Nothing
        """
        entry = {
            self.KEY_NAME: name,
            self.KEY_VID: int(vid, 16),
            self.KEY_PID: int(pid, 16),
            # The serial number string is optional in USB and not all
            # devices use it.  The relay devices do not use it, so the
            # entry always carries None and only matches USB devices that
            # expose no serial attribute.
            self.KEY_SN: None,
            self.KEY_CFG: int(cfg),
            self.KEY_INF: int(inf),
            # Filled in later by _init_device_list_callback.
            self.KEY_TTY: None,
        }

        self._device_list.append(entry)

    def _find_lab_device_entry(self, vendor_id, product_id, serial_no):
        """Find a device in the lab device list.

        Args:
            vendor_id: unique vendor id for device
            product_id: unique product id for device
            serial_no: serial string for the device (may be None)

        Returns:
            The first matching device entry or None
        """
        for device in self._device_list:
            if (device[self.KEY_VID] == vendor_id and
                    device[self.KEY_PID] == product_id and
                    device[self.KEY_SN] == serial_no):
                return device

        return None

    def _read_sys_attr(self, root, attr):
        """Read the first line of a sysfs attribute.

        Args:
            root: path of the sysfs directory
            attr: attribute to read

        Returns:
            Attribute value with trailing whitespace removed, or None if the
            attribute cannot be opened or read
        """
        try:
            with open(os.path.join(root, attr)) as f:
                return f.readline().rstrip()
        except IOError:
            return None

    def _read_sys_hex_attr(self, root, attr):
        """Read a sysfs hexadecimal integer attribute.

        Args:
            root: path of the sysfs directory
            attr: attribute to read

        Returns:
            Attribute value as an int, or None if the attribute cannot be
            read or does not contain a hexadecimal number
        """
        text = self._read_sys_attr(root, attr)
        if text is None:
            return None

        try:
            return int(text, 16)
        except ValueError:
            # An empty or malformed attribute must not abort the USB walk.
            return None

    def _is_cdc_acm(self, inf_path):
        """Determine if the interface implements the CDC ACM class.

        Args:
            inf_path: directory entry for the inf under /sys/bus/usb/devices

        Returns:
            True if the inf is CDC ACM, False otherwise (including when any
            of the three descriptor attributes is unreadable)
        """
        actual = (self._read_sys_hex_attr(inf_path, self._INF_CLASS),
                  self._read_sys_hex_attr(inf_path, self._INF_SUB_CLASS),
                  self._read_sys_hex_attr(inf_path, self._INF_PROTOCOL))
        expected = (self._USB_CDC_ACM_CLASS,
                    self._USB_CDC_ACM_SUB_CLASS,
                    self._USB_CDC_ACM_PROTOCOL)

        return actual == expected

    def _read_tty_name(self, dir_entry, inf, cfg):
        """Get the path to the associated tty device.

        Args:
            dir_entry: directory entry for the device under /sys/bus/usb/devices
            inf: Interface number of the device
            cfg: Configuration number of the device

        Returns:
            Path to a tty device, or None if the interface does not exist or
            has no tty associated with it
        """
        inf_path = os.path.join(self._USB_DEVICE_SYS_ROOT,
                                '%s:%d.%d' % (dir_entry, cfg, inf))

        # first determine if this is a CDC-ACM or USB Serial device.
        if self._is_cdc_acm(inf_path):
            try:
                tty_list = os.listdir(os.path.join(inf_path, 'tty'))
            except OSError:
                # No 'tty' directory under this interface.
                return None

            if not tty_list:
                return None

            # Each CDC-ACM interface should only have one tty device
            # associated with it so just return the first item in the list.
            return os.path.join(self._DEV_ROOT, tty_list[0])

        # USB Serial devices have a link to their ttyUSB* device in the inf
        # directory
        try:
            dir_list = os.listdir(inf_path)
        except OSError:
            # The requested configuration/interface is not present.
            return None

        for entry in dir_list:
            if self._TTY_USB_RE.match(entry):
                return os.path.join(self._DEV_ROOT, entry)

        return None

    def _init_device_list_callback(self, _, dir_entry):
        """Callback function used with _walk_usb_tree for device list init.

        Args:
            _: Callback context (unused)
            dir_entry: Directory entry reported by _walk_usb_tree

        """
        path = os.path.join(self._USB_DEVICE_SYS_ROOT, dir_entry)

        # The combination of vendor id, product id, and serial number
        # should be sufficient to uniquely identify each piece of lab
        # equipment.
        vendor_id = self._read_sys_hex_attr(path, self._SYS_VENDOR_ID)
        product_id = self._read_sys_hex_attr(path, self._SYS_PRODUCT_ID)
        serial_no = self._read_sys_attr(path, self._SYS_SERIAL_NO)

        # For each device try to match it with a device entry in the lab
        # configuration.
        device = self._find_lab_device_entry(vendor_id, product_id, serial_no)
        if device:
            # If the device is in the lab configuration then determine
            # which tty device it associated with.
            device[self.KEY_TTY] = self._read_tty_name(dir_entry,
                                                       device[self.KEY_INF],
                                                       device[self.KEY_CFG])

    def _list_all_tty_devices_callback(self, dev_list, dir_entry):
        """Callback for _walk_usb_tree when listing all USB serial devices.

        Appends one dictionary to dev_list for every interface of the device
        that has a tty associated with it.  The KEY_NAME key is only present
        when the device also matches an entry in the lab device list.

        Args:
            dev_list: Device list to fill
            dir_entry: Directory entry reported by _walk_usb_tree

        """
        dev_path = os.path.join(self._USB_DEVICE_SYS_ROOT, dir_entry)

        # Determine if there are any interfaces in the sys directory for the
        # USB Device.
        try:
            inf_dir_list = os.listdir(dev_path)
        except OSError:
            # The device directory vanished after the tree was listed.
            return

        for inf_entry in inf_dir_list:
            inf_match = self._USB_INF_RE.match(inf_entry)
            if inf_match is None:
                continue

            inf = int(inf_match.group('inf'))
            cfg = int(inf_match.group('cfg'))

            # Check to see if there is a tty device associated with this
            # interface.
            tty_path = self._read_tty_name(dir_entry, inf, cfg)
            if tty_path is None:
                continue

            # This is a TTY interface, create a dictionary of the relevant
            # sysfs attributes for this device.
            entry = {
                self.KEY_TTY: tty_path,
                self.KEY_INF: inf,
                self.KEY_CFG: cfg,
                self.KEY_VID: self._read_sys_hex_attr(dev_path,
                                                      self._SYS_VENDOR_ID),
                self.KEY_PID: self._read_sys_hex_attr(dev_path,
                                                      self._SYS_PRODUCT_ID),
                self.KEY_SN: self._read_sys_attr(dev_path,
                                                 self._SYS_SERIAL_NO),
                self.KEY_MFG: self._read_sys_attr(dev_path,
                                                  self._MFG_STRING),
                self.KEY_PRD: self._read_sys_attr(dev_path,
                                                  self._PRODUCT_STRING),
                self.KEY_VER: self._read_sys_attr(dev_path,
                                                  self._VERSION_STRING),
            }

            # If this device is also in the lab device list then add the
            # friendly name for it.
            lab_device = self._find_lab_device_entry(entry[self.KEY_VID],
                                                     entry[self.KEY_PID],
                                                     entry[self.KEY_SN])
            if lab_device is not None:
                entry[self.KEY_NAME] = lab_device[self.KEY_NAME]

            dev_list.append(entry)

    def _walk_usb_tree(self, callback, context):
        """Walk the USB device tree and report each device to a callback.

           Traverse the USB device tree in /sys/bus/usb/devices and invoke
           the callback for every entry that names a USB device (interfaces
           and root hubs are skipped).

        Args:
            callback: Callback to invoke when a USB device is found; called
                as callback(context, dir_entry).
            context: Context variable for callback.

        Returns:
            Nothing
        """
        for dir_entry in os.listdir(self._USB_DEVICE_SYS_ROOT):
            if self._USB_DEVICE_RE.match(dir_entry):
                callback(context, dir_entry)

    def get_tty_path(self, name):
        """Get the path to the tty device for a given lab device.

        Args:
            name: lab device identifier, e.g. 'rail', or 'bt_trigger'

        Returns:
            Path to the tty device otherwise None
        """
        for dev in self._device_list:
            dev_name = dev[self.KEY_NAME]
            # An entry without a name never matches, even for name=None.
            if dev_name is not None and dev_name == name:
                return dev[self.KEY_TTY]

        return None

    def get_tty_devices(self):
        """Get a list of all USB based tty devices attached to the machine.

        Returns:
            List of dictionaries where each dictionary contains a description of
            the USB TTY device.
        """
        all_dev_list = []
        self._walk_usb_tree(self._list_all_tty_devices_callback, all_dev_list)

        return all_dev_list
348
349