# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""Utility classes and functions used for testing polling frame notifications
"""

import time
from typing import Collection, Optional
from dataclasses import dataclass
from .reader import TransceiveConfiguration


@dataclass
class PollingFrameTestCase:
    """Defines a test case for polling frame tests.

    Holds the data and the transceive configuration to send the frame with,
    together with the lists of types and data values the received frame
    is verified against.
    """
    # Configuration the frame is sent with
    configuration: TransceiveConfiguration
    # Hex string of the payload to send
    data: str

    # Frame types that are considered a success
    success_types: Collection = ()
    # Data values that are considered a success
    success_data: Collection = ()
    # Data values that are tolerated, but are not a clean success
    warning_data: Collection = ()

    # NOTE: an explicitly defined __init__ is kept by @dataclass,
    # which allows deriving the defaults from the other arguments.
    def __init__(
        self,
        configuration,
        data,
        success_types=(),
        success_data=(),
        warning_data=(),
    ):
        """Creates a test case.

        Args:
            configuration: transceive configuration to send the frame with.
            data: hex string of the payload to send.
            success_types: accepted frame types;
                defaults to the type of the configuration when empty.
            success_data: accepted data values;
                defaults to the original data when empty.
            warning_data: data values that are additionally tolerated.
        """
        self.configuration = configuration
        self.data = data
        # If no success types were given,
        # assume only the type of the configuration is allowed
        if not success_types:
            success_types = (configuration.type,)
        # If no success data variations were given,
        # assume only original is allowed
        if not success_data:
            success_data = (data,)
        self.success_types = success_types
        self.success_data = success_data
        self.warning_data = warning_data

    def format_for_error(self, **kwargs):
        """Formats testcase value for pretty reporting in errors.

        Any keyword arguments are included into the returned dict as is.
        """
        extras = {**kwargs}
        # CRC and bit count are not reported for field ON/OFF placeholders
        if self.configuration.type not in {"O", "X"}:
            extras["crc"] = self.configuration.crc
            extras["bits"] = self.configuration.bits
        # Bitrate is only reported when it differs from 106
        if self.configuration.bitrate != 106:
            extras["bitrate"] = self.configuration.bitrate
        return {"type": self.configuration.type, "data": self.data, **extras}

    @property
    def expected_data(self):
        """Returns all data variations that should not cause a test to fail"""
        return [*self.success_data, *self.warning_data]

    @property
    def expected_types(self):
        """Returns all types that should not cause a test to fail"""
        return self.success_types


@dataclass
class PollingFrame:
    """Represents PollingFrame object returned from an Android device"""
    type: str
    data: bytes = b""
    timestamp: int = 0
    triggered_auto_transact: bool = False
    vendor_specific_gain: int = 0

    @staticmethod
    def from_dict(json: dict):
        """Creates a PollingFrame object from dict.

        Both camelCase and snake_case keys are accepted for
        'triggeredAutoTransact' and 'vendorSpecificGain'. When neither
        variant is present, the field falls back to the dataclass default
        (False and 0 respectively) instead of None.

        Raises:
            TypeError: if the "data" key is missing.
            ValueError: if "data" is not a valid hex string.
        """
        return PollingFrame(
            type=json.get("type"),
            data=bytes.fromhex(json.get("data")),
            timestamp=json.get("timestamp"),
            triggered_auto_transact=json.get(
                "triggeredAutoTransact",
                json.get("triggered_auto_transact", False),
            ),
            vendor_specific_gain=json.get(
                "vendorSpecificGain", json.get("vendor_specific_gain", 0)
            ),
        )

    def to_dict(self):
        """Dumps PollingFrame object into a dict with camelCase keys"""
        return {
            "type": self.type,
            "data": self.data.hex().upper(),
            "timestamp": self.timestamp,
            "triggeredAutoTransact": self.triggered_auto_transact,
            "vendorSpecificGain": self.vendor_specific_gain,
        }

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}"
            + f"({', '.join(f'{k}={v}' for k, v in self.to_dict().items())})"
        )


_CARRIER = 13.56e6
_A_TIMEOUT = (1236 + 384) / _CARRIER
_B_TIMEOUT = 7680 / _CARRIER
_F_TIMEOUT = 6800 / _CARRIER


_GUARD_TIME_A = 0.0051
_GUARD_TIME_B = 0.0051
_GUARD_TIME_F = 0.02
_GUARD_TIME = max(_GUARD_TIME_A, _GUARD_TIME_B, _GUARD_TIME_F)
# Time to sleep before an event of a given type is performed.
# Field ON/OFF events use the longest guard time of all technologies
GUARD_TIME_PER_TECH = {
    "O": _GUARD_TIME,
    "X": _GUARD_TIME,
    "A": _GUARD_TIME_A,
    "B": _GUARD_TIME_B,
    "F": _GUARD_TIME_F,
}


# Placeholder values for ON and OFF events
_O = TransceiveConfiguration(type="O")
_X = TransceiveConfiguration(type="X")

# Possible transceive configurations for polling frames
CONFIGURATION_A_LONG = _A = TransceiveConfiguration(
    type="A", crc=True, bits=8, timeout=_A_TIMEOUT
)
_A_SHORT = TransceiveConfiguration(
    type="A", crc=False, bits=7, timeout=_A_TIMEOUT
)
_A_NOCRC = TransceiveConfiguration(
    type="A", crc=False, bits=8, timeout=_A_TIMEOUT
)

CONFIGURATION_B_LONG = _B = TransceiveConfiguration(
    type="B", crc=True, bits=8, timeout=_B_TIMEOUT
)
_B_NOCRC = TransceiveConfiguration(
    type="B", crc=False, bits=8, timeout=_B_TIMEOUT
)

_F = TransceiveConfiguration(
    type="F", crc=True, bits=8, bitrate=212, timeout=_F_TIMEOUT
)
_F_424 = TransceiveConfiguration(
    type="F", crc=True, bits=8, bitrate=424, timeout=_F_TIMEOUT
)


# Possible polling frame configurations
# 1) Frames with special meaning like wakeup/request:
#    - WUPA/REQA WUPB/REQB, SENSF_REQ, etc.
# 2) Special cases:
#    - 7-bit short frames (Type A only);
#    - 424 kbps (Type F only)
# 3) Full frames without CRC (Types A,B only)
# 4) Full frames with CRC (Types A,B only, F cannot serve as annotation)

# Placeholder test cases for ON/OFF
POLLING_FRAME_ON = PollingFrameTestCase(_O, "01", ["O"], ["01"])
POLLING_FRAME_OFF = PollingFrameTestCase(_X, "00", ["X"], ["00"])

# Type A
# 1)
POLLING_FRAMES_TYPE_A_SPECIAL = [
    # * Device MUST recognize all common Type A frames properly
    # REQA
    PollingFrameTestCase(_A_SHORT, "26", ["A"], ["26"], ["52"]),
    # WUPA
    PollingFrameTestCase(_A_SHORT, "52", ["A"], ["52"], ["26"]),
    # Some readers send SLP_REQ in the polling loop
    PollingFrameTestCase(_A, "5000", ["A", "U"], ["5000"]),
]
# 2) 7-bit short frames
POLLING_FRAMES_TYPE_A_SHORT = [
    # * Device SHOULD detect custom short polling frames properly
    # Verify that WUPA/REQA are detected via full byte value
    # And that other short frames do not confuse the detection
    PollingFrameTestCase(_A_SHORT, "20", ["U"]),
    PollingFrameTestCase(_A_SHORT, "06", ["U"]),
    PollingFrameTestCase(_A_SHORT, "50", ["U"]),
    PollingFrameTestCase(_A_SHORT, "02", ["U"]),
    PollingFrameTestCase(_A_SHORT, "70", ["U"]),
    PollingFrameTestCase(_A_SHORT, "7a", ["U"]),
]
# 3)
POLLING_FRAMES_TYPE_A_NOCRC = [
    # * Device SHOULD keep all bytes for frames less than or 2 bytes long
    PollingFrameTestCase(_A_NOCRC, "aa", ["U"], ["aa"], [""]),
    PollingFrameTestCase(_A_NOCRC, "55aa", ["U"], ["55aa"], [""]),
    PollingFrameTestCase(_A_NOCRC, "aa55aa", ["U"], ["aa55aa"], ["aa"]),
    PollingFrameTestCase(_A_NOCRC, "55aa55aa", ["U"], ["55aa55aa"], ["55aa"]),
]
# 4)
POLLING_FRAMES_TYPE_A_LONG = [
    # * Device MUST detect custom <= 20 byte long Type A frames with CRC as U
    PollingFrameTestCase(_A, "02f1", ["U"]),
    PollingFrameTestCase(_A, "ff00", ["U"]),
    PollingFrameTestCase(_A, "ff001122", ["U"]),
    PollingFrameTestCase(_A, "ff00112233445566", ["U"]),
    PollingFrameTestCase(_A, "ff00112233445566778899aa", ["U"]),
    PollingFrameTestCase(_A, "000102030405060708090a0b0c0d", ["U"]),
    PollingFrameTestCase(_A, "101112131415161718191a1b1c1d1e", ["U"]),
    PollingFrameTestCase(_A, "202122232425262728292a2b2c2d2e2f", ["U"]),
    PollingFrameTestCase(_A, "303132333435363738393a3b3c3d3e3f30", ["U"]),
    PollingFrameTestCase(_A, "404142434445464748494a4b4c4d4e4f4041", ["U"]),
    PollingFrameTestCase(_A, "505152535455565758595a5b5c5d5e5f505152", ["U"]),
    PollingFrameTestCase(_A, "606162636465666768696a6b6c6d6e6f60616263", ["U"]),
]

# Type B
# 1) Verifies that device properly detects all WUPB/REQB variations
POLLING_FRAMES_TYPE_B_SPECIAL = [
    # * Device MUST recognize all common Type B frames properly
    # 1.1) Common cases
    # REQB, AFI 0x00, TS 0x00
    PollingFrameTestCase(_B, "050000", ["B"]),
    # WUPB, AFI 0x00, TS 0x00
    PollingFrameTestCase(_B, "050008", ["B"]),
    # 1.2) Different AFI values
    # REQB, AFI 0x01, TS 0x00; Transit
    PollingFrameTestCase(_B, "050100", ["B"]),
    # WUPB, AFI 0x02, TS 0x00; Financial
    PollingFrameTestCase(_B, "050208", ["B"]),
    # REQB, AFI 0x03, TS 0x00; Identification
    PollingFrameTestCase(_B, "050300", ["B"]),
    # 1.3) Different Timeslot counts
    # REQB, AFI 0x00, TS 0x01 (2)
    PollingFrameTestCase(_B, "050001", ["B"]),
    # WUPB, AFI 0x00, TS 0x02 (4)
    PollingFrameTestCase(_B, "05000a", ["B"]),
    # 1.4) Non-default AFI and Timeslot values
    # REQB, AFI 0x01, TS 0x01 (2)
    PollingFrameTestCase(_B, "050101", ["B"]),
    # WUPB, AFI 0x02, TS 0x02 (4)
    PollingFrameTestCase(_B, "05020a", ["B"]),
]
# 3)
POLLING_FRAMES_TYPE_B_NOCRC = [
    # * Device SHOULD keep all bytes for frames less than or 2 bytes long
    # This allows the use of legacy Type-B proprietary polling commands
    # as polling loop annotations
    PollingFrameTestCase(_B_NOCRC, "aa", ["U"], ["aa"], [""]),
    PollingFrameTestCase(_B_NOCRC, "55aa", ["U"], ["55aa"], [""]),
    # * Device SHOULD NOT cut off 2 last bytes for frames 3 bytes or longer
    PollingFrameTestCase(_B_NOCRC, "aa55aa", ["U"], ["aa55aa"], ["aa"]),
    PollingFrameTestCase(_B_NOCRC, "55aa55aa", ["U"], ["55aa55aa"], ["55aa"]),
    # * Device SHOULD NOT confuse B_NOCRC frames starting with PBF as WUPB/REQB
    # Check that lack of CRC, or invalid length is detected as U
    PollingFrameTestCase(_B_NOCRC, "05000001", ["U"], ["05000001"], ["0500"]),
    PollingFrameTestCase(_B_NOCRC, "05000801", ["U"], ["05000801"], ["0500"]),
    PollingFrameTestCase(_B_NOCRC, "050000", ["U"], ["050000"], ["05"]),
    PollingFrameTestCase(_B_NOCRC, "050008", ["U"], ["050008"], ["05"]),
]
# 4)
POLLING_FRAMES_TYPE_B_LONG = [
    # * Device MUST detect Type B frames with valid PBF and invalid length as U
    PollingFrameTestCase(_B, "05000001", ["U"]),
    PollingFrameTestCase(_B, "05000801", ["U"]),
    # * Device MUST detect custom <= 20 byte long Type B frames with CRC as U
    PollingFrameTestCase(_B, "02f1", ["U"]),
    PollingFrameTestCase(_B, "ff00", ["U"]),
    PollingFrameTestCase(_B, "ff001122", ["U"]),
    PollingFrameTestCase(_B, "ff00112233445566", ["U"]),
    PollingFrameTestCase(_B, "ff00112233445566778899aa", ["U"]),
    PollingFrameTestCase(_B, "ff00112233445566778899aabbccddee", ["U"]),
    PollingFrameTestCase(_B, "ff00112233445566778899aabbccddeeff001122", ["U"]),
]

# Type F
# 1)
POLLING_FRAMES_TYPE_F_SPECIAL = [
    # * Device MUST recognize all common Type F frames properly
    # 1.0) Common
    # SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x00
    PollingFrameTestCase(_F, "00ffff0000", ["F"]),
    # SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x00
    PollingFrameTestCase(_F, "0000030000", ["F"]),
    # 1.1) Different request codes
    # SENSF_REQ, SC, 0xffff, RC 0x01, TS 0x00
    PollingFrameTestCase(_F, "00ffff0100", ["F"]),
    # SENSF_REQ, SC, 0x0003, RC 0x01, TS 0x00
    PollingFrameTestCase(_F, "0000030100", ["F"]),
    # 1.2) Different Timeslot counts
    # SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x01 (2)
    PollingFrameTestCase(_F, "00ffff0001", ["F"]),
    # SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x02 (4)
    PollingFrameTestCase(_F, "0000030002", ["F"]),
    # 2) 424 kbps
    # SENSF_REQ, SC, 0xffff
    PollingFrameTestCase(_F_424, "00ffff0100", ["F"]),
    # SENSF_REQ, SC, 0x0003
    # (payload used to duplicate the 0xffff case above)
    PollingFrameTestCase(_F_424, "0000030100", ["F"]),
]

POLLING_FRAME_ALL_TEST_CASES = [
    POLLING_FRAME_ON,
    *POLLING_FRAMES_TYPE_A_SPECIAL,
    *POLLING_FRAMES_TYPE_A_SHORT,
    *POLLING_FRAMES_TYPE_A_NOCRC,
    *POLLING_FRAMES_TYPE_A_LONG,
    *POLLING_FRAMES_TYPE_B_SPECIAL,
    *POLLING_FRAMES_TYPE_B_NOCRC,
    *POLLING_FRAMES_TYPE_B_LONG,
    *POLLING_FRAMES_TYPE_F_SPECIAL,
    POLLING_FRAME_OFF,
]


EXPEDITABLE_POLLING_LOOP_EVENT_TYPES = ["F", "U"]


def get_expedited_frames(frames):
    """Finds and collects all expedited polling frames.

    Expedited frames belong to F, U types and they get reported
    to the service while the OS might still be evaluating the loop.

    Returns:
        The leading run of frames with an expeditable type; collection
        stops at the first frame of any other type.
    """
    expedited_frames = []
    # Expedited frames come at the beginning
    for frame in frames:
        if frame.type not in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES:
            break
        expedited_frames.append(frame)
    return expedited_frames


def split_frames_by_timestamp_wrap(frames, pivot_timestamp=None):
    """Returns two lists of polling frames
    split based on the timestamp value wrapping over to lower value
    assuming that frames were provided in the way they arrived.

    Args:
        frames: list of frames in the order of arrival.
        pivot_timestamp: timestamp to compare against; when None,
            the timestamp of the first frame is used.

    Returns:
        A (not_wrapped, wrapped) tuple of lists. 'wrapped' starts at the
        first frame with timestamp lower than the pivot and contains
        every frame after it.
    """
    if not frames:
        return [], []
    # Take the first timestamp from first frame (or the one provided)
    # And check that timestamp for all frames that come afterwards is bigger
    # otherwise consider them wrapped.
    # Compare against None explicitly: 0 is a valid pivot timestamp
    if pivot_timestamp is None:
        pivot_timestamp = frames[0].timestamp
    not_wrapped = []
    for frame in frames:
        if frame.timestamp < pivot_timestamp:
            break
        not_wrapped.append(frame)
    wrapped = frames[len(not_wrapped):]
    return not_wrapped, wrapped


def apply_expedited_frame_ordering(frames, limit=3):
    """Attempts to replicate expedited frame delivery behavior
    of HostEmulationManager for type F, U events.

    Args:
        frames: frames in their original order.
        limit: maximum amount of frames that get expedited.

    Returns:
        A new list with up to 'limit' F, U frames moved to the beginning,
        followed by the remaining frames in their original order.
    """
    leave, expedite = [], []

    for frame in frames:
        if frame.type in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES \
            and len(expedite) < limit:
            expedite.append(frame)
        else:
            leave.append(frame)
    return expedite + leave


def apply_original_frame_ordering(frames):
    """Reverts expedited frame ordering caused by HostEmulationManager,
    useful when having the original polling frame order is preferable in a test

    Call this function ONLY with a list of frames resembling a full polling loop
    with possible expedited F, U events at the beginning.
    """
    if not frames:
        return []

    expedited_frames = get_expedited_frames(frames)
    # If no expedited frames were found at the beginning, leave
    if not expedited_frames:
        return frames

    # Original frames come after expedited ones
    original_frames = frames[len(expedited_frames):]

    # In between expedited and original frames,
    # which should be pre-sorted in their category
    # there might be a timestamp wrap
    original_not_wrapped, original_wrapped = split_frames_by_timestamp_wrap(
        original_frames
    )
    # Non-expedited, original frame should be the first one in the loop
    # so we can use the timestamp of the first original frame as a pivot
    expedited_not_wrapped, expedited_wrapped = split_frames_by_timestamp_wrap(
        expedited_frames,
        pivot_timestamp=(
            original_not_wrapped[0].timestamp
            if original_not_wrapped else None
        ),
    )

    return sorted(
        original_not_wrapped + expedited_not_wrapped, key=lambda f: f.timestamp
    ) + sorted(original_wrapped + expedited_wrapped, key=lambda f: f.timestamp)


def _test_apply_original_frame_ordering():
    """Verifies that 'apply_original_frame_ordering' works properly"""
    testcases = [
        # Overflow after Normal B
        (
            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
            ("U", 0), ("F", 1), ("U", 2), ("X", 3)
        ),
        # Overflow after Expeditable
        (
            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
            ("U", 8), ("F", 0), ("U", 1), ("X", 2)
        ),
        # Overflow after Normal O
        (("O", 4), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
        # Overflow after Normal A
        (("O", 4), ("A", 5), ("B", 0), ("F", 1), ("X", 2)),
        # Overflow after Expeditable U
        (("O", 4), ("U", 5), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
        # No overflow
        (("O", 0), ("A", 1), ("B", 2), ("X", 3)),
        # No overflow
        (("O", 0), ("A", 1), ("B", 2), ("F", 3), ("X", 4)),
        # No overflow
        (("O", 0), ("A", 1), ("U", 2), ("B", 3), ("U", 4), ("F", 5), ("X", 6)),
    ]

    for testcase in testcases:
        original_frames = [
            PollingFrame(type_, b"", timestamp)
            for (type_, timestamp) in testcase
        ]
        # Test for case where none or all frames get expedited
        for limit in range(len(original_frames)):
            expedited_frames = apply_expedited_frame_ordering(
                original_frames, limit=limit
            )
            restored_frames = apply_original_frame_ordering(expedited_frames)
            assert original_frames == restored_frames


# This should not raise anything when module is imported
_test_apply_original_frame_ordering()


_FRAME_EVENT_TIMEOUT_SEC = 1


def poll_and_observe_frames(
    pn532,
    emulator,
    testcases,
    *,
    restore_original_frame_ordering=False,
    ignore_field_off_event_timeout=False,
    **kwargs,
):
    """Handles broadcasting polling loop events for provided list of test cases.
    Provided set of test cases MUST contain a complete polling loop, starting
    with 'O' and ending with 'X' event.

    Args:
        pn532: reader object, used to unmute/mute the field
            and to broadcast the frames.
        emulator: emulator object, used to wait for the field off event
            and to retrieve the observed polling frames.
        testcases: list of PollingFrameTestCase forming a polling loop.
        restore_original_frame_ordering: when True, reverts expedited
            delivery ordering of the received frames.
        ignore_field_off_event_timeout: when True, a failure to observe
            the final 'X' event is not logged.
        **kwargs: 'power_level', when given, is applied as 'power'
            to the configuration of every broadcasted frame.

    Returns:
        List of PollingFrame objects reported by the emulator.
    """

    assert len(testcases) > 2
    assert testcases[0].configuration.type == "O"
    assert testcases[-1].configuration.type == "X"

    off_event_handler = None
    for idx, testcase in enumerate(testcases):
        configuration = testcase.configuration

        # On last 'X' Event, create handler
        if idx == len(testcases) - 1 and configuration.type == "X":
            off_event_handler = emulator.asyncWaitForPollingFrameOff("XEvent")

        time.sleep(GUARD_TIME_PER_TECH[configuration.type])

        if configuration.type == "O":
            pn532.unmute()
        elif configuration.type == "X":
            pn532.mute()
        else:
            if "power_level" in kwargs:
                configuration = configuration.replace(
                    power=kwargs["power_level"]
                )
            pn532.send_broadcast(
                data=bytes.fromhex(testcase.data),
                configuration=configuration
            )
        # Field ON/OFF events are additionally followed by a guard time
        if configuration.type in {"O", "X"}:
            time.sleep(GUARD_TIME_PER_TECH[configuration.type])

    # Waiting for the 'X' event is best-effort:
    # a failure is only logged and never propagated to the caller
    try:
        if off_event_handler is not None:
            off_event_handler.waitAndGet("XEvent", _FRAME_EVENT_TIMEOUT_SEC)
    except Exception as e:
        if not ignore_field_off_event_timeout:
            emulator.log.warning(
                f"Timed out waiting for 'X' event due to {e}"
            )

    frames = [PollingFrame.from_dict(f) for f in emulator.getPollingFrames()]

    if restore_original_frame_ordering:
        # Attempt to revert expedited frame delivery ordering for U and F frames
        # while keeping timestamp wrapping into account
        frames = apply_original_frame_ordering(frames)

    return frames


def get_frame_test_stats(testcases, frames, timestamps=()):
    """Creates a dict containing test info for error output.

    Args:
        testcases: list of PollingFrameTestCase that were sent.
        frames: list of PollingFrame that were received.
        timestamps: timestamps of the sent test cases;
            when empty, -1 is reported for every test case.
    """
    if not timestamps:
        timestamps = [-1] * len(testcases)

    return {
        "frames_sent_count": len(testcases),
        "frames_received_count": len(frames),
        "frames_sent": [
            testcase.format_for_error(timestamp=timestamp)
            for timestamp, testcase in zip(timestamps, testcases)
        ],
        "frames_received": [frame.to_dict() for frame in frames],
    }