# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import numpy

import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


class MBIMDataChannel(object):
    """
    Provides access to the data channel of a MBIM modem.

    The object is used to send and receive MBIM frames to/from the modem.
    The object uses the BULK-IN endpoint exposed in the data interface for any
    reads from the modem to the host.
    The object uses the BULK-OUT endpoint exposed in the data interface for any
    writes from the host to the modem.
    The channel does not deaggregate/aggregate packets into MBIM frames. The
    caller is expected to validate/provide MBIM frames to the channel. The
    channel is just used to send raw bytes to the device and read raw bytes from
    the device.

    """
    # Timeouts (in milliseconds) handed to the device read/write calls.
    _READ_TIMEOUT_MS = 10000
    _WRITE_TIMEOUT_MS = 10000

    def __init__(self,
                 device,
                 data_interface_number,
                 bulk_in_endpoint_address,
                 bulk_out_endpoint_address,
                 max_in_buffer_size):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param data_interface_number: Number of the data interface; passed as
                the |interface| argument of every read/write on |device|.
        @param bulk_in_endpoint_address: |bEndpointAddress| for the usb
                BULK IN endpoint from the data interface.
        @param bulk_out_endpoint_address: |bEndpointAddress| for the usb
                BULK OUT endpoint from the data interface.
        @param max_in_buffer_size: The (fixed) buffer size to be used for in
                data transfers.

        """
        self._device = device
        self._data_interface_number = data_interface_number
        self._bulk_in_endpoint_address = bulk_in_endpoint_address
        self._bulk_out_endpoint_address = bulk_out_endpoint_address
        self._max_in_buffer_size = max_in_buffer_size


    @staticmethod
    def _payload_for_logging(payload):
        """
        Wrap |payload| in a numpy array that prints its integers in hex.

        Note that this updates numpy's process-wide print options.

        @param payload: Sequence of byte values to be logged.
        @returns numpy array built from |payload|.

        """
        numpy.set_printoptions(formatter={'int': lambda x: hex(int(x))},
                               linewidth=1000)
        return numpy.array(payload)


    def send_ntb(self, ntb):
        """
        Send the specified payload down to the device using the bulk-out USB
        pipe.

        @param ntb: Byte array of complete MBIM NTB to be sent to the device.
        @raises MBIMComplianceDataTransferError if the complete |ntb| could not
                be sent.

        """
        ntb_length = len(ntb)
        written = self._device.write(endpoint=self._bulk_out_endpoint_address,
                                     data=ntb,
                                     timeout=self._WRITE_TIMEOUT_MS,
                                     interface=self._data_interface_number)
        logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
                      'Payload: %s',
                      written, ntb_length, self._payload_for_logging(ntb))
        if written < ntb_length:
            # The format arguments must be a tuple: a bare
            # `'...' % written, ntb_length` applies % to |written| only and
            # raises TypeError instead of the intended compliance error.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceDataTransferError,
                    'Could not send the complete NTB (%d/%d bytes sent)' %
                    (written, ntb_length))


    def receive_ntb(self):
        """
        Receive a payload from the device using the bulk-in USB pipe.

        This API will return any data it receives from the device within
        |_READ_TIMEOUT_MS| milliseconds. The API returns only one MBIM NTB
        received per invocation.

        NOTE(review): this method does not handle a read timeout itself;
        whether the caller gets an empty byte array or an exception when
        nothing is received depends on the |device| read implementation --
        confirm against the PyUSB version in use.

        @returns Byte array of complete MBIM NTB received from the device. This
                could be empty if nothing is received from the device.

        """
        ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
                                size=self._max_in_buffer_size,
                                timeout=self._READ_TIMEOUT_MS,
                                interface=self._data_interface_number)
        ntb_length = len(ntb)
        logging.debug('Data Channel: Received %d bytes response. Payload: %s',
                      ntb_length, self._payload_for_logging(ntb))
        return ntb