# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import numpy

import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


class MBIMDataChannel(object):
    """
    Provides access to the data channel of a MBIM modem.

    The object is used to send and receive MBIM frames to/from the modem.
    The object uses the BULK-IN endpoint exposed in the data interface for any
    reads from the modem to the host.
    The object uses the BULK-OUT endpoint exposed in the data interface for any
    writes from the host to the modem.
    The channel does not deaggregate/aggregate packets into MBIM frames. The
    caller is expected to validate/provide MBIM frames to the channel. The
    channel is just used to send raw bytes to the device and read raw bytes
    from the device.

    """
    # Timeouts handed to the device read/write calls, in milliseconds.
    _READ_TIMEOUT_MS = 10000
    _WRITE_TIMEOUT_MS = 10000

    def __init__(self,
                 device,
                 data_interface_number,
                 bulk_in_endpoint_address,
                 bulk_out_endpoint_address,
                 max_in_buffer_size):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param data_interface_number: Number of the data interface; it is
                passed as the |interface| argument of every read/write issued
                on the device.
        @param bulk_in_endpoint_address: |bEndpointAddress| for the usb
                BULK IN endpoint from the data interface.
        @param bulk_out_endpoint_address: |bEndpointAddress| for the usb
                BULK OUT endpoint from the data interface.
        @param max_in_buffer_size: The (fixed) buffer size to use for in
                data transfers.

        """
        self._device = device
        self._data_interface_number = data_interface_number
        self._bulk_in_endpoint_address = bulk_in_endpoint_address
        self._bulk_out_endpoint_address = bulk_out_endpoint_address
        self._max_in_buffer_size = max_in_buffer_size


    @staticmethod
    def _hex_payload(payload):
        """
        Wrap |payload| in a numpy array that prints as hex for debug logs.

        Note that this changes numpy's process-wide print options (integer
        formatter and line width); it is not scoped to this call.

        @param payload: Byte array to be rendered in a log message.
        @returns numpy array built from |payload|.

        """
        numpy.set_printoptions(formatter={'int': lambda x: hex(int(x))},
                               linewidth=1000)
        return numpy.array(payload)


    def send_ntb(self, ntb):
        """
        Send the specified payload down to the device using the bulk-out USB
        pipe.

        @param ntb: Byte array of complete MBIM NTB to be sent to the device.
        @raises MBIMComplianceDataTransferError if the complete |ntb| could not
                be sent.

        """
        ntb_length = len(ntb)
        written = self._device.write(endpoint=self._bulk_out_endpoint_address,
                                     data=ntb,
                                     timeout=self._WRITE_TIMEOUT_MS,
                                     interface=self._data_interface_number)
        logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
                      'Payload: %s',
                      written, ntb_length, self._hex_payload(ntb))
        if written < ntb_length:
            # Both counters must be passed to '%' as one tuple. Without the
            # parentheses only |written| is formatted, which raises TypeError
            # instead of the intended compliance error.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceDataTransferError,
                    'Could not send the complete NTB (%d/%d bytes sent)' %
                    (written, ntb_length))


    def receive_ntb(self):
        """
        Receive a payload from the device using the bulk-in USB pipe.

        This API will return any data it receives from the device within
        |_READ_TIMEOUT_MS| milliseconds. If nothing is received within this
        duration, it returns an empty byte array. The API returns only one MBIM
        NTB received per invocation.

        NOTE(review): what happens on timeout is decided by the device's
        read() implementation, not by this method -- confirm that it returns
        an empty array rather than raising.

        @returns Byte array of complete MBIM NTB received from the device. This
                could be empty if nothing is received from the device.

        """
        ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
                                size=self._max_in_buffer_size,
                                timeout=self._READ_TIMEOUT_MS,
                                interface=self._data_interface_number)
        ntb_length = len(ntb)
        logging.debug('Data Channel: Received %d bytes response. Payload: %s',
                      ntb_length, self._hex_payload(ntb))
        return ntb