• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import numpy
7
8import common
9from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
10
11
12class MBIMDataChannel(object):
13    """
14    Provides access to the data channel of a MBIM modem.
15
16    The object is used to send and receive MBIM frames to/from the modem.
17    The object uses the BULK-IN endpoint exposed in the data interface for any
18    reads from the modem to the host.
19    The object uses the BULK-OUT endpoint exposed in the data interface for any
20    writes from the host to the modem.
21    The channel does not deaggregate/aggregate packets into MBIM frames. The
22    caller is expected to validate/provide MBIM frames to the channel. The
23    channel is just used to send raw bytes to the device and read raw bytes from
24    the device.
25
26    """
27    _READ_TIMEOUT_MS = 10000
28    _WRITE_TIMEOUT_MS = 10000
29
30    def __init__(self,
31                 device,
32                 data_interface_number,
33                 bulk_in_endpoint_address,
34                 bulk_out_endpoint_address,
35                 max_in_buffer_size):
36        """
37        @param device: Device handle returned by PyUSB for the modem to test.
38        @param bulk_in_endpoint_address: |bEndpointAddress| for the usb
39                BULK IN endpoint from the data interface.
40        @param bulk_out_endpoint_address: |bEndpointAddress| for the usb
41                BULK OUT endpoint from the data interface.
42        @param max_in_buffer_size: The (fixed) buffer size to used for in
43                data transfers.
44
45        """
46        self._device = device
47        self._data_interface_number = data_interface_number
48        self._bulk_in_endpoint_address = bulk_in_endpoint_address
49        self._bulk_out_endpoint_address = bulk_out_endpoint_address
50        self._max_in_buffer_size = max_in_buffer_size
51
52
53    def send_ntb(self, ntb):
54        """
55        Send the specified payload down to the device using the bulk-out USB
56        pipe.
57
58        @param ntb: Byte array of complete MBIM NTB to be sent to the device.
59        @raises MBIMComplianceDataTransferError if the complete |ntb| could not
60                be sent.
61
62        """
63        ntb_length = len(ntb)
64        written = self._device.write(endpoint=self._bulk_out_endpoint_address,
65                                     data=ntb,
66                                     timeout=self._WRITE_TIMEOUT_MS,
67                                     interface=self._data_interface_number)
68        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
69                               linewidth=1000)
70        logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
71                      'Payload: %s',
72                       written, ntb_length, numpy.array(ntb))
73        if written < ntb_length:
74            mbim_errors.log_and_raise(
75                    mbim_errors.MBIMComplianceDataTransferError,
76                    'Could not send the complete NTB (%d/%d bytes sent)' %
77                    written, ntb_length)
78
79
80    def receive_ntb(self):
81        """
82        Receive a payload from the device using the bulk-in USB pipe.
83
84        This API will return any data it receives from the device within
85        |_READ_TIMEOUT_S| seconds. If nothing is received within this duration,
86        it returns an empty byte array. The API returns only one MBIM NTB
87        received per invocation.
88
89        @returns Byte array of complete MBIM NTB received from the device. This
90                could be empty if nothing is received from the device.
91
92        """
93        ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
94                                size=self._max_in_buffer_size,
95                                timeout=self._READ_TIMEOUT_MS,
96                                interface=self._data_interface_number)
97        ntb_length = len(ntb)
98        numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
99                               linewidth=1000)
100        logging.debug('Data Channel: Received %d bytes response. Payload: %s',
101                       ntb_length, numpy.array(ntb))
102        return ntb
103