# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import common from autotest_lib.client.bin import utils from autotest_lib.client.cros.cellular.mbim_compliance import mbim_channel from autotest_lib.client.cros.cellular.mbim_compliance \ import mbim_command_message from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors from autotest_lib.client.cros.cellular.mbim_compliance \ import mbim_message_request from autotest_lib.client.cros.cellular.mbim_compliance \ import mbim_message_response from autotest_lib.client.cros.cellular.mbim_compliance \ import mbim_test_base from autotest_lib.client.cros.cellular.mbim_compliance.sequences \ import get_descriptors_sequence from autotest_lib.client.cros.cellular.mbim_compliance.sequences \ import mbim_open_generic_sequence class cellular_MbimComplianceCM16(mbim_test_base.MbimTestBase): """ CM_16 Validation of fragmented message transmission in case of multiple fragmented messages. This test verifies that fragmented messages sent from the function are not intermixed. Note that this test is only applicable for devices that support multiple outstanding commands. Reference: [1] Universal Serial Bus Communication Class MBIM Compliance Testing: 44 http://www.usb.org/developers/docs/devclass_docs/MBIM-Compliance-1.0.pdf """ version = 1 def run_internal(self): """ Run CM_16 test. """ # Precondition desc_sequence = get_descriptors_sequence.GetDescriptorsSequence( self.device_context) descriptors = desc_sequence.run() self.device_context.update_descriptor_cache(descriptors) open_sequence = mbim_open_generic_sequence.MBIMOpenGenericSequence( self.device_context) open_sequence.run(max_control_transfer_size=64) device_context = self.device_context descriptor_cache = device_context.descriptor_cache self.channel = mbim_channel.MBIMChannel( device_context.device, descriptor_cache.mbim_communication_interface.bInterfaceNumber, descriptor_cache.interrupt_endpoint.bEndpointAddress, device_context.max_control_transfer_size) # Step 1 caps_command_message = mbim_command_message.MBIMDeviceCapsQuery() caps_packets = mbim_message_request.generate_request_packets( caps_command_message, device_context.max_control_transfer_size) self.caps_transaction_id = caps_command_message.transaction_id # Step 2 services_command_message = ( mbim_command_message.MBIMDeviceServicesQuery()) services_packets = mbim_message_request.generate_request_packets( services_command_message, device_context.max_control_transfer_size) self.services_transaction_id = services_command_message.transaction_id # Transmit the messages now self.channel.unidirectional_transaction(*caps_packets) self.channel.unidirectional_transaction(*services_packets) # Step 3 utils.poll_for_condition( self._get_response_packets, timeout=5, exception=mbim_errors.MBIMComplianceChannelError( 'Failed to retrieve the response packets to specific ' 'control messages.')) self.channel.close() caps_response_message = self.caps_response services_response_message = self.services_response is_caps_message_valid = isinstance( caps_response_message, mbim_command_message.MBIMDeviceCapsInfo) is_services_message_valid = isinstance( services_response_message, mbim_command_message.MBIMDeviceServicesInfo) if not ((is_caps_message_valid and is_services_message_valid) and (caps_response_message.transaction_id == caps_command_message.transaction_id) and (caps_response_message.device_service_id == caps_command_message.device_service_id) and caps_response_message.cid == caps_command_message.cid and (services_command_message.transaction_id == services_response_message.transaction_id) and (services_command_message.device_service_id == services_response_message.device_service_id) and services_command_message.cid == services_response_message.cid): mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError, 'mbim1.0:9.5#1') def _get_response_packets(self): """ Condition method for |poll_for_condition| to check the retrieval of target packets. @returns True if both caps response packet and services response packet are received, False otherwise. """ try: packets = self.channel.get_outstanding_packets() except mbim_errors.MBIMComplianceChannelError: logging.debug("Error in receiving response fragments from the device") mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError, 'mbim1.0:9.5#1') self.caps_response = None self.services_response = None for packet in packets: try: message_response = mbim_message_response.parse_response_packets( packet) except mbim_errors.MBIMComplianceControlMessageError: logging.debug("Error in parsing response fragments from the device") mbim_errors.log_and_raise( mbim_errors.MBIMComplianceAssertionError, 'mbim1.0:9.5#1') if message_response.transaction_id == self.caps_transaction_id: self.caps_response = message_response elif (message_response.transaction_id == self.services_transaction_id): self.services_response = message_response if self.caps_response and self.services_response: return True return False