1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import multiprocessing 7import Queue 8import struct 9import time 10 11import common 12from autotest_lib.client.bin import utils 13from autotest_lib.client.cros.cellular.mbim_compliance \ 14 import mbim_channel_endpoint 15from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors 16 17 18class MBIMChannel(object): 19 """ 20 Provide synchronous access to the modem with MBIM command level interaction. 21 22 This object should simplify your interaction over the MBIM channel as 23 follows: 24 - Use |bidirectional_transaction| to send MBIM packets that are part of a 25 transaction. This function will block until the transaction completes and 26 return the MBIM packets received in response. 27 - |bidirectional_transaction| will filter out packets that do not correspond 28 to your transaction. This way, you don't have to worry about unsolicited 29 notifications and/or stale packets when interacting with the modem. 30 - All filtered out packets can be grabbed using the 31 |get_outstanding_packets| function. Use this function to receive error 32 notifications, status notifications, etc. 33 - Use |unidirectional_transaction| to send MBIM packets for which you don't 34 expect a response. 35 - Use |flush| to clean out all pipes before starting a new transaction. 36 37 Note that "MBIM packets" here really means MBIM fragments. This object does 38 not (de)fragment packets for you. Out of necessity, it does check that 39 received fragments are contiguous and in-order. 40 41 So, this object houses the minimum information necessary about the MBIM 42 fragments to provide you a comfortable synchronous packet level channel. 43 44 """ 45 46 ENDPOINT_JOIN_TIMEOUT_S = 5 47 FRAGMENT_TIMEOUT_S = 3 48 # TODO(pprabhu) Consider allowing each transaction to specify its own 49 # timeout. 50 TRANSACTION_TIMEOUT_S = 5 51 52 MESSAGE_HEADER_FORMAT = '<LLL' 53 FRAGMENT_HEADER_FORMAT = '<LL' 54 MBIM_FRAGMENTED_MESSAGES = [ 55 0x00000003, # MBIM_COMMAND_MSG 56 0x80000003, # MBIM_COMMAND_DONE 57 0x80000007] # MBIM_INDICATE_STATUS 58 59 def __init__(self, 60 device, 61 interface_number, 62 interrupt_endpoint_address, 63 in_buffer_size, 64 process_class=None): 65 """ 66 @param device: Device handle returned by PyUSB for the modem to test. 67 @param interface_number: |bInterfaceNumber| of the MBIM interface. 68 @param interrupt_endpoint_address: |bEndpointAddress| for the usb 69 INTERRUPT IN endpoint for notifications. 70 @param in_buffer_size: The (fixed) buffer size to use for in control 71 transfers. 72 @param process_class: The class to instantiate to create a subprocess. 73 This is used by tests only, to easily mock out the process 74 ceation. 75 76 """ 77 self._stop_request_event = multiprocessing.Event() 78 self._request_queue = multiprocessing.Queue() 79 self._response_queue = multiprocessing.Queue() 80 self._outstanding_packets = [] 81 self._last_response = [] 82 self._stashed_first_fragment = None 83 if process_class is None: 84 process_class = multiprocessing.Process 85 self._endpoint_process = process_class( 86 target=mbim_channel_endpoint.MBIMChannelEndpoint, 87 args=(device, 88 interface_number, 89 interrupt_endpoint_address, 90 in_buffer_size, 91 self._request_queue, 92 self._response_queue, 93 self._stop_request_event)) 94 self._endpoint_process.start() 95 96 97 def __del__(self): 98 """ 99 The destructor. 100 101 Note that it is not guaranteed that |__del__| is called for objects that 102 exist when the interpreter exits. It is recommended to call |close| 103 explicitly. 104 105 """ 106 self.close() 107 108 109 def close(self): 110 """ 111 Cleanly close the MBIMChannel. 112 113 MBIMChannel forks a subprocess to communicate with the USB device. It is 114 recommended that |close| be called explicitly. 115 116 """ 117 if not self._endpoint_process: 118 return 119 120 if self._endpoint_process.is_alive(): 121 self._stop_request_event.set() 122 self._endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S) 123 if self._endpoint_process.is_alive(): 124 self._endpoint_process.terminate() 125 126 self._endpoint_process = None 127 128 129 def bidirectional_transaction(self, *args): 130 """ 131 Execute a synchronous bidirectional transaction. 132 133 @param *args: Fragments of a single MBIM transaction. An MBIM 134 transaction may consist of multiple fragments - each fragment is 135 the payload for a USB control message. It should be an 136 |array.array| object. It is your responsibility (and choice) to 137 keep the fragments in-order, and to send all the fragments. 138 For more details, see "Fragmentation of messages" in the MBIM 139 spec. 140 @returns: A list of fragments in the same order as received that 141 correspond to the given transaction. If we receive less 142 fragments than claimed, we will return what we get. If we 143 receive non-contiguous / out-of-order fragments, we'll complain. 144 @raises: MBIMComplianceChannelError if received fragments are 145 out-of-order or non-contigouos. 146 147 """ 148 self._verify_endpoint_open() 149 if not args: 150 mbim_errors.log_and_raise( 151 mbim_errors.MBIMComplianceChannelError, 152 'No data given to |bidirectional_transaction|.') 153 154 transaction_id, _, _ = self._fragment_metadata(args[0]) 155 for fragment in args: 156 self._request_queue.put_nowait(fragment) 157 return self._get_response_fragments(transaction_id) 158 159 160 def unidirectional_transaction(self, *args): 161 """ 162 Execute a synchronous unidirectional transaction. No return value. 163 164 @param *args: Fragments of a single MBIM transaction. An MBIM 165 transaction may consist of multiple fragments - each fragment is 166 the payload for a USB control message. It should be an 167 |array.array| object. It is your responsibility (and choice) to 168 keep the fragments in-order, and to send all the fragments. 169 For more details, see "Fragmentation of messages" in the MBIM 170 spec. 171 172 """ 173 self._verify_endpoint_open() 174 if not args: 175 mbim_errors.log_and_raise( 176 mbim_errors.MBIMComplianceChannelError, 177 'No data given to |unidirectional_transaction|.') 178 179 for fragment in args: 180 self._request_queue.put_nowait(fragment) 181 182 183 def flush(self): 184 """ 185 Clean out all queues. 186 187 This waits till all outgoing packets have been sent, and then waits some 188 more to give the channel time to settle down. 189 190 @raises: MBIMComplianceChannelError if things don't settle down fast 191 enough. 192 """ 193 self._verify_endpoint_open() 194 num_remaining_fragments = self._request_queue.qsize() 195 try: 196 timeout = self.FRAGMENT_TIMEOUT_S * num_remaining_fragments 197 utils.poll_for_condition(lambda: self._request_queue.empty(), 198 timeout=timeout) 199 except utils.TimeoutError: 200 mbim_errors.log_and_raise( 201 mbim_errors.MBIMComplianceChannelError, 202 'Could not flush request queue.') 203 204 # Now wait for the response queue to settle down. 205 # In the worst case, each request fragment that was remaining at the 206 # time flush was called belonged to a different transaction, and each of 207 # these transactions would serially timeout in |TRANSACTION_TIMEOUT_S|. 208 # To avoid sleeping for long times, we cap this value arbitrarily to 5 209 # transactions. 210 num_remaining_transactions = min(5, num_remaining_fragments) 211 time.sleep(num_remaining_fragments * self.TRANSACTION_TIMEOUT_S) 212 extra_packets = self.get_outstanding_packets() 213 for packet in extra_packets: 214 logging.debug('flush: discarding packet: %s', packet) 215 216 217 def get_outstanding_packets(self): 218 """ 219 Get all received packets that were not part of an explicit transaction. 220 221 @returns: A list of packets. Each packet is a list of fragments, so you 222 perhaps want to do something like: 223 for packet in channel.get_outstanding_packets(): 224 for fragment in packet: 225 # handle fragment. 226 227 """ 228 self._verify_endpoint_open() 229 # Try to get more packets from the response queue. 230 # This can block forever if the modem keeps spewing trash at us. 231 while True: 232 packet = self._get_packet_fragments() 233 if not packet: 234 break 235 self._outstanding_packets.append(packet) 236 237 packets = self._outstanding_packets 238 self._outstanding_packets = [] 239 return packets 240 241 242 def _get_response_fragments(self, transaction_id): 243 """ 244 Get response for the given |transaction_id|. 245 246 @returns: A list of fragments. 247 @raises: MBIMComplianceChannelError if response is not recieved. 248 249 """ 250 def _poll_response(): 251 packet = self._get_packet_fragments() 252 if not packet: 253 return False 254 first_fragment = packet[0] 255 response_id, _, _ = self._fragment_metadata(first_fragment) 256 if response_id == transaction_id: 257 self._last_response = packet 258 return True 259 self._outstanding_packets.append(packet) 260 return False 261 262 try: 263 utils.poll_for_condition( 264 _poll_response, 265 timeout=self.TRANSACTION_TIMEOUT_S) 266 except utils.TimeoutError: 267 mbim_errors.log_and_raise( 268 mbim_errors.MBIMComplianceChannelError, 269 'Did not receive timely reply to transaction %d' % 270 transaction_id) 271 return self._last_response 272 273 274 def _get_packet_fragments(self): 275 """ 276 Get all fragements of the next packet from the modem. 277 278 This function is responsible for putting together fragments of one 279 packet, and checking that fragments are continguous and in-order. 280 281 """ 282 fragments = [] 283 if self._stashed_first_fragment is not None: 284 first_fragment = self._stashed_first_fragment 285 self._stashed_first_fragment = None 286 else: 287 try: 288 first_fragment = self._response_queue.get( 289 True, self.FRAGMENT_TIMEOUT_S) 290 except Queue.Empty: 291 # *Don't fail* Just return nothing. 292 return fragments 293 294 transaction_id, total_fragments, current_fragment = ( 295 self._fragment_metadata(first_fragment)) 296 if current_fragment != 0: 297 mbim_errors.log_and_raise( 298 mbim_errors.MBIMComplianceChannelError, 299 'First fragment reports fragment number %d' % 300 current_fragment) 301 302 fragments.append(first_fragment) 303 304 last_fragment = 0 305 while last_fragment < total_fragments - 1: 306 try: 307 fragment = self._response_queue.get(True, 308 self.FRAGMENT_TIMEOUT_S) 309 except Queue.Empty: 310 # *Don't fail* Just return the fragments we got so far. 311 break 312 313 fragment_id, fragment_total, fragment_current = ( 314 self._fragment_metadata(fragment)) 315 if fragment_id != transaction_id: 316 # *Don't fail* Treat a different transaction id as indicating 317 # that the next packet has already arrived. 318 logging.warning('Recieved only %d out of %d fragments for ' 319 'transaction %d.', 320 last_fragment, 321 total_fragments, 322 transaction_id) 323 self._stashed_first_fragment = fragment 324 break 325 326 if fragment_total != total_fragments: 327 mbim_errors.log_and_raise( 328 mbim_errors.MBIMComplianceChannelError, 329 'Fragment number %d reports incorrect total (%d/%d)' % 330 (last_fragment + 1, fragment_total, total_fragments)) 331 332 if fragment_current != last_fragment + 1: 333 mbim_errors.log_and_raise( 334 mbim_errors.MBIMComplianceChannelError, 335 'Received reordered fragments. Expected %d, got %d' % 336 (last_fragment + 1, fragment_current)) 337 338 last_fragment += 1 339 fragments.append(fragment) 340 341 return fragments 342 343 344 def _fragment_metadata(self, fragment): 345 """ This function houses all the MBIM packet knowledge. """ 346 # All packets have a message header. 347 if len(fragment) < struct.calcsize(self.MESSAGE_HEADER_FORMAT): 348 mbim_errors.log_and_raise( 349 mbim_errors.MBIMComplianceChannelError, 350 'Corrupted fragment |%s| does not have an MBIM header.' % 351 fragment) 352 353 message_type, _, transaction_id = struct.unpack_from( 354 self.MESSAGE_HEADER_FORMAT, 355 fragment) 356 357 if message_type in self.MBIM_FRAGMENTED_MESSAGES: 358 fragment = fragment[struct.calcsize(self.MESSAGE_HEADER_FORMAT):] 359 if len(fragment) < struct.calcsize(self.FRAGMENT_HEADER_FORMAT): 360 mbim_errors.log_and_raise( 361 mbim_errors.MBIMComplianceChannelError, 362 'Corrupted fragment |%s| does not have a fragment ' 363 'header. ' % 364 fragment) 365 366 total_fragments, current_fragment = struct.unpack_from( 367 self.FRAGMENT_HEADER_FORMAT, 368 fragment) 369 else: 370 # For other types, there is only one 'fragment'. 371 total_fragments = 1 372 current_fragment = 0 373 374 return transaction_id, total_fragments, current_fragment 375 376 377 def _verify_endpoint_open(self): 378 if not self._endpoint_process.is_alive(): 379 mbim_errors.log_and_raise( 380 mbim_errors.MBIMComplianceChannelError, 381 'MBIMChannelEndpoint died unexpectedly. ' 382 'The actual exception can be found in log entries from the ' 383 'subprocess.') 384