• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16"""Utility classes and functions used for testing polling frame notifications
17"""
18
19import time
20from typing import Collection, Optional
21from dataclasses import dataclass
22from .reader import TransceiveConfiguration
23
24
25@dataclass
26class PollingFrameTestCase:
27    """Defines a test case for polling frame tests,
28    containing data and transceive configuration to send the frame with
29    To verify against lists of expected types and data values
30    """
31    configuration: TransceiveConfiguration
32    data: str
33
34    success_types: Collection = ()
35    success_data: Collection = ()
36    warning_data: Collection = ()
37
38    def __init__(
39        self,
40        configuration,
41        data,
42        success_types=(),
43        success_data=(),
44        warning_data=(),
45    ):
46
47        self.configuration = configuration
48        self.data = data
49        if len(success_types) == 0:
50            success_types = (configuration.type,)
51        # If no success data variations were given,
52        # assume only original is allowed
53        if len(success_data) == 0:
54            success_data = (data,)
55        self.success_types = success_types
56        self.success_data = success_data
57        self.warning_data = warning_data
58
59    def format_for_error(self, **kwargs):
60        """Formats testcase value for pretty reporting in errors"""
61        extras = {**kwargs}
62        if self.configuration.type not in {"O", "X"}:
63            extras["crc"] = self.configuration.crc
64            extras["bits"] = self.configuration.bits
65        if self.configuration.bitrate != 106:
66            extras["bitrate"] = self.configuration.bitrate
67        return {"type": self.configuration.type, "data": self.data, **extras}
68
69    @property
70    def expected_data(self):
71        """Returns all data variations that should not cause a test to fail"""
72        return [*self.success_data, *self.warning_data]
73
74    @property
75    def expected_types(self):
76        """Returns all types that should not cause a test to fail"""
77        return self.success_types
78
79
80@dataclass
81class PollingFrame:
82    """Represents PollingFrame object returned from an Android device"""
83    type: str
84    data: bytes = b""
85    timestamp: int = 0
86    triggered_auto_transact: bool = False
87    vendor_specific_gain: int = 0
88
89    @staticmethod
90    def from_dict(json: dict):
91        """Creates a PollingFrame object from dict"""
92        return PollingFrame(
93            type=json.get("type"),
94            data=bytes.fromhex(json.get("data")),
95            timestamp=json.get("timestamp"),
96            triggered_auto_transact=json.get(
97                "triggeredAutoTransact", json.get("triggered_auto_transact")
98            ),
99            vendor_specific_gain=json.get(
100                "vendorSpecificGain", json.get("vendor_specific_gain")
101            ),
102        )
103
104    def to_dict(self):
105        """Dumps PollingFrame object into a dict"""
106        return {
107            "type": self.type,
108            "data": self.data.hex().upper(),
109            "timestamp": self.timestamp,
110            "triggeredAutoTransact": self.triggered_auto_transact,
111            "vendorSpecificGain": self.vendor_specific_gain,
112        }
113
114    def __repr__(self) -> str:
115        return (
116            f"{self.__class__.__name__}"
117            + f"({', '.join(f'{k}={v}' for k, v in self.to_dict().items())})"
118        )
119
120
121_CARRIER = 13.56e6
122_A_TIMEOUT = (1236 + 384) / _CARRIER
123_B_TIMEOUT = 7680 / _CARRIER
124_F_TIMEOUT = 6800 / _CARRIER
125
126
127_GUARD_TIME_A = 0.0051
128_GUARD_TIME_B = 0.0051
129_GUARD_TIME_F = 0.02
130_GUARD_TIME = max(_GUARD_TIME_A, _GUARD_TIME_B, _GUARD_TIME_F)
131GUARD_TIME_PER_TECH = {
132    "O": _GUARD_TIME,
133    "X": _GUARD_TIME,
134    "A": _GUARD_TIME_A,
135    "B": _GUARD_TIME_B,
136    "F": _GUARD_TIME_F,
137}
138
139
140# Placeholder values for ON and OFF events
141_O = TransceiveConfiguration(type="O")
142_X = TransceiveConfiguration(type="X")
143
144# Possible transceive configurations for polling frames
145CONFIGURATION_A_LONG = _A = TransceiveConfiguration(
146    type="A", crc=True, bits=8, timeout=_A_TIMEOUT
147)
148_A_SHORT = TransceiveConfiguration(
149    type="A", crc=False, bits=7, timeout=_A_TIMEOUT
150)
151_A_NOCRC = TransceiveConfiguration(
152    type="A", crc=False, bits=8, timeout=_A_TIMEOUT
153)
154
155CONFIGURATION_B_LONG = _B = TransceiveConfiguration(
156    type="B", crc=True, bits=8, timeout=_B_TIMEOUT
157)
158_B_NOCRC = TransceiveConfiguration(
159    type="B", crc=False, bits=8, timeout=_B_TIMEOUT
160)
161
162_F = TransceiveConfiguration(
163    type="F", crc=True, bits=8, bitrate=212, timeout=_F_TIMEOUT
164)
165_F_424 = TransceiveConfiguration(
166    type="F", crc=True, bits=8, bitrate=424, timeout=_F_TIMEOUT
167)
168
169
170# Possible polling frame configurations
171# 1) Frames with special meaning like wakeup/request:
172#    - WUPA/REQA WUPB/REQB, SENSF_REQ, etc.
173# 2) Special cases:
174#    - 7-bit short frames (Type A only);
175#    - 424 kbps (Type F only)
176# 3) Full frames without CRC (Types A,B only)
177# 4) Full frames with CRC (Types A,B only, F cannot serve as annotation)
178
179# Placeholder test cases for ON/OFF
180POLLING_FRAME_ON = PollingFrameTestCase(_O, "01", ["O"], ["01"])
181POLLING_FRAME_OFF = PollingFrameTestCase(_X, "00", ["X"], ["00"])
182
183# Type A
184# 1)
185POLLING_FRAMES_TYPE_A_SPECIAL = [
186    # * Device MUST recognize all common Type A frames properly
187    # REQA
188    PollingFrameTestCase(_A_SHORT, "26", ["A"], ["26"], ["52"]),
189    # WUPA
190    PollingFrameTestCase(_A_SHORT, "52", ["A"], ["52"], ["26"]),
191    # Some readers send SLP_REQ in the polling loop
192    PollingFrameTestCase(_A, "5000", ["A", "U"], ["5000"]),
193]
194# 2) 7-bit short frames
195POLLING_FRAMES_TYPE_A_SHORT = [
196    # * Device SHOULD detect custom short polling frames properly
197    #   Verify that WUPA/REQA are detected via full byte value
198    #   And that other short frames do not confuse the detection
199    PollingFrameTestCase(_A_SHORT, "20", ["U"]),
200    PollingFrameTestCase(_A_SHORT, "06", ["U"]),
201    PollingFrameTestCase(_A_SHORT, "50", ["U"]),
202    PollingFrameTestCase(_A_SHORT, "02", ["U"]),
203    PollingFrameTestCase(_A_SHORT, "70", ["U"]),
204    PollingFrameTestCase(_A_SHORT, "7a", ["U"]),
205]
206# 3)
207POLLING_FRAMES_TYPE_A_NOCRC = [
208    # * Device SHOULD keep all bytes for frames less than or 2 bytes long
209    PollingFrameTestCase(_A_NOCRC, "aa", ["U"], ["aa"], [""]),
210    PollingFrameTestCase(_A_NOCRC, "55aa", ["U"], ["55aa"], [""]),
211    PollingFrameTestCase(_A_NOCRC, "aa55aa", ["U"], ["aa55aa"], ["aa"]),
212    PollingFrameTestCase(_A_NOCRC, "55aa55aa", ["U"], ["55aa55aa"], ["55aa"]),
213]
214# 4)
215POLLING_FRAMES_TYPE_A_LONG = [
216    # * Device MUST detect custom <= 20 byte long Type A frames with CRC as U
217    PollingFrameTestCase(_A, "02f1", ["U"]),
218    PollingFrameTestCase(_A, "ff00", ["U"]),
219    PollingFrameTestCase(_A, "ff001122", ["U"]),
220    PollingFrameTestCase(_A, "ff00112233445566", ["U"]),
221    PollingFrameTestCase(_A, "ff00112233445566778899aa", ["U"]),
222    PollingFrameTestCase(_A, "000102030405060708090a0b0c0d", ["U"]),
223    PollingFrameTestCase(_A, "101112131415161718191a1b1c1d1e", ["U"]),
224    PollingFrameTestCase(_A, "202122232425262728292a2b2c2d2e2f", ["U"]),
225    PollingFrameTestCase(_A, "303132333435363738393a3b3c3d3e3f30", ["U"]),
226    PollingFrameTestCase(_A, "404142434445464748494a4b4c4d4e4f4041", ["U"]),
227    PollingFrameTestCase(_A, "505152535455565758595a5b5c5d5e5f505152", ["U"]),
228    PollingFrameTestCase(_A, "606162636465666768696a6b6c6d6e6f60616263", ["U"]),
229]
230
231# Type B
232# 1) Verifies that device properly detects all WUPB/REQB variations
233POLLING_FRAMES_TYPE_B_SPECIAL = [
234    # * Device MUST recognize all common Type B frames properly
235    # 1.1) Common cases
236    #   REQB, AFI 0x00, TS 0x00
237    PollingFrameTestCase(_B, "050000", ["B"]),
238    #   WUPB, AFI 0x00, TS 0x00
239    PollingFrameTestCase(_B, "050008", ["B"]),
240    # 1.2) Different AFI values
241    #   REQB, AFI 0x01, TS 0x00; Transit
242    PollingFrameTestCase(_B, "050100", ["B"]),
243    #   WUPB, AFI 0x02, TS 0x00; Financial
244    PollingFrameTestCase(_B, "050208", ["B"]),
245    #   REQB, AFI 0x03, TS 0x00; Identification
246    PollingFrameTestCase(_B, "050300", ["B"]),
247    # 1.3) Different Timeslot counts
248    #   REQB, AFI 0x00, TS 0x01 (2)
249    PollingFrameTestCase(_B, "050001", ["B"]),
250    #   WUPB, AFI 0x00, TS 0x02 (4)
251    PollingFrameTestCase(_B, "05000a", ["B"]),
252    # 1.4) Non-default AFI and Timeslot values
253    #   REQB, AFI 0x01, TS 0x01 (2)
254    PollingFrameTestCase(_B, "050101", ["B"]),
255    #   WUPB, AFI 0x02, TS 0x02 (4)
256    PollingFrameTestCase(_B, "05020a", ["B"]),
257]
258# 3)
259POLLING_FRAMES_TYPE_B_NOCRC = [
260    # * Device SHOULD keep all bytes for frames less than or 2 bytes long
261    #   This allows the use of legacy Type-B proprietary polling commands
262    #   as polling loop annotations
263    PollingFrameTestCase(_B_NOCRC, "aa", ["U"], ["aa"], [""]),
264    PollingFrameTestCase(_B_NOCRC, "55aa", ["U"], ["55aa"], [""]),
265    # * Device SHOULD NOT cut off 2 last bytes for frames shorter than 3 bytes
266    PollingFrameTestCase(_B_NOCRC, "aa55aa", ["U"], ["aa55aa"], ["aa"]),
267    PollingFrameTestCase(_B_NOCRC, "55aa55aa", ["U"], ["55aa55aa"], ["55aa"]),
268    # * Device SHOULD NOT confuse B_NOCRC frames starting with PBF as WUPB/REQB
269    #   Check that lack of CRC, or invalid length is detected as U
270    PollingFrameTestCase(_B_NOCRC, "05000001", ["U"], ["05000001"], ["0500"]),
271    PollingFrameTestCase(_B_NOCRC, "05000801", ["U"], ["05000801"], ["0500"]),
272    PollingFrameTestCase(_B_NOCRC, "050000", ["U"], ["050000"], ["05"]),
273    PollingFrameTestCase(_B_NOCRC, "050008", ["U"], ["050008"], ["05"]),
274]
275# 4)
276POLLING_FRAMES_TYPE_B_LONG = [
277    # * Device MUST detect Type B frames with valid PBf and invalid length as U
278    PollingFrameTestCase(_B, "05000001", ["U"]),
279    PollingFrameTestCase(_B, "05000801", ["U"]),
280    # * Device MUST detect custom <= 20 byte long Type B frames with CRC as U
281    PollingFrameTestCase(_B, "02f1", ["U"]),
282    PollingFrameTestCase(_B, "ff00", ["U"]),
283    PollingFrameTestCase(_B, "ff001122", ["U"]),
284    PollingFrameTestCase(_B, "ff00112233445566", ["U"]),
285    PollingFrameTestCase(_B, "ff00112233445566778899aa", ["U"]),
286    PollingFrameTestCase(_B, "ff00112233445566778899aabbccddee", ["U"]),
287    PollingFrameTestCase(_B, "ff00112233445566778899aabbccddeeff001122", ["U"]),
288]
289
290# Type F
291# 1)
292POLLING_FRAMES_TYPE_F_SPECIAL = [
293    # * Device MUST recognize all common Type F frames properly
294    # 1.0) Common
295    #   SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x00
296    PollingFrameTestCase(_F, "00ffff0000", ["F"]),
297    #   SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x00
298    PollingFrameTestCase(_F, "0000030000", ["F"]),
299    # 1.1) Different request codes
300    #   SENSF_REQ, SC, 0xffff, RC 0x01, TS 0x00
301    PollingFrameTestCase(_F, "00ffff0100", ["F"]),
302    #   SENSF_REQ, SC, 0x0003, RC 0x01, TS 0x00
303    PollingFrameTestCase(_F, "0000030100", ["F"]),
304    # 1.2) Different Timeslot counts
305    #   SENSF_REQ, SC, 0xffff, RC 0x00, TS 0x01 (2)
306    PollingFrameTestCase(_F, "00ffff0001", ["F"]),
307    #   SENSF_REQ, SC, 0x0003, RC 0x00, TS 0x02 (4)
308    PollingFrameTestCase(_F, "0000030002", ["F"]),
309    # 2) 424 kbps
310    #   SENSF_REQ, SC, 0xffff
311    PollingFrameTestCase(_F_424, "00ffff0100", ["F"]),
312    #   SENSF_REQ, SC, 0x0003
313    PollingFrameTestCase(_F_424, "00ffff0100", ["F"]),
314]
315
316POLLING_FRAME_ALL_TEST_CASES = [
317    POLLING_FRAME_ON,
318    *POLLING_FRAMES_TYPE_A_SPECIAL,
319    *POLLING_FRAMES_TYPE_A_SHORT,
320    *POLLING_FRAMES_TYPE_A_NOCRC,
321    *POLLING_FRAMES_TYPE_A_LONG,
322    *POLLING_FRAMES_TYPE_B_SPECIAL,
323    *POLLING_FRAMES_TYPE_B_NOCRC,
324    *POLLING_FRAMES_TYPE_B_LONG,
325    *POLLING_FRAMES_TYPE_F_SPECIAL,
326    POLLING_FRAME_OFF,
327]
328
329
330EXPEDITABLE_POLLING_LOOP_EVENT_TYPES = ["F", "U"]
331
332
333def get_expedited_frames(frames):
334    """Finds and collects all expedited polling frames.
335    Expedited frames belong to F, U types and they get reported
336    to the service while the OS might still be evaluating the loop
337    """
338    expedited_frames = []
339    # Expedited frames come at the beginning
340    for frame in frames:
341        if frame.type not in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES:
342            break
343        expedited_frames.append(frame)
344    return expedited_frames
345
346
347def split_frames_by_timestamp_wrap(frames, pivot_timestamp=None):
348    """Returns two lists of polling frames
349    split based on the timestamp value wrapping over to lower value
350    assuming that frames were provided in the way they arrived
351    """
352    if not frames:
353        return [], []
354    # Take the first timestamp from first frame (or the one provided)
355    # And check that timestamp for all frames that come afterwards is bigger
356    # otherwise consider them wrapped
357    pivot_timestamp = pivot_timestamp or frames[0].timestamp
358    not_wrapped = []
359    for frame in frames:
360        if frame.timestamp < pivot_timestamp:
361            break
362        not_wrapped.append(frame)
363    wrapped = frames[len(not_wrapped) :]
364    return not_wrapped, wrapped
365
366
367def apply_expedited_frame_ordering(frames, limit=3):
368    """Attempts to replicate expedited frame delivery behavior
369    of HostEmulationManager for type F, U events
370    """
371    leave, expedite = [], []
372
373    for frame in frames:
374        if frame.type in EXPEDITABLE_POLLING_LOOP_EVENT_TYPES \
375            and len(expedite) < limit:
376            expedite.append(frame)
377        else:
378            leave.append(frame)
379    return expedite + leave
380
381
382def apply_original_frame_ordering(frames):
383    """Reverts expedited frame ordering caused by HostEmulationManager,
384    useful when having the original polling frame order is preferable in a test
385
386    Call this function ONLY with a list of frames resembling a full polling loop
387    with possible expedited F, U events at the beginning.
388    """
389    if len(frames) == 0:
390        return []
391
392    expedited_frames = get_expedited_frames(frames)
393    # If no expedited frames were found at the beginning, leave
394    if len(expedited_frames) == 0:
395        return frames
396
397    # Original frames come after expedited ones
398    original_frames = frames[len(expedited_frames) :]
399
400    # In between expedited and original frames,
401    # which should be pre-sorted in their category
402    # there might be a timestamp wrap
403    original_not_wrapped, original_wrapped = split_frames_by_timestamp_wrap(
404        original_frames
405    )
406    # Non-expedited, original frame should be the first one in the loop
407    # so we can use the timestamp of the first expedited frame as a pivot
408    expedited_not_wrapped, expedited_wrapped = split_frames_by_timestamp_wrap(
409        expedited_frames,
410        pivot_timestamp=(
411            original_not_wrapped[0].timestamp
412            if len(original_not_wrapped) > 0 else None
413        ),
414    )
415
416    return sorted(
417        original_not_wrapped + expedited_not_wrapped, key=lambda f: f.timestamp
418    ) + sorted(original_wrapped + expedited_wrapped, key=lambda f: f.timestamp)
419
420
421def _test_apply_original_frame_ordering():
422    """Verifies that 'apply_original_frame_ordering' works properly"""
423    testcases = [
424        # Overflow after Normal B
425        (
426            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
427            ("U", 0), ("F", 1), ("U", 2), ("X", 3)
428        ),
429        # Overflow after Expeditable
430        (
431            ("O", 4), ("A", 5), ("U", 6), ("B", 7),
432            ("U", 8), ("F", 0), ("U", 1), ("X", 2)
433        ),
434        # Overflow after Normal O
435        (("O", 4), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
436        # Overflow after Normal A
437        (("O", 4), ("A", 5), ("B", 0), ("F", 1), ("X", 2)),
438        # Overflow after Expeditable U
439        (("O", 4), ("U", 5), ("A", 0), ("B", 1), ("F", 2), ("X", 3)),
440        # No overflow
441        (("O", 0), ("A", 1), ("B", 2), ("X", 3)),
442        # No overflow
443        (("O", 0), ("A", 1), ("B", 2), ("F", 3), ("X", 4)),
444        # No overflow
445        (("O", 0), ("A", 1), ("U", 2), ("B", 3), ("U", 4), ("F", 5), ("X", 6)),
446    ]
447
448    for testcase in testcases:
449        original_frames = [
450            PollingFrame(type_, b"", timestamp)
451            for (type_, timestamp) in testcase
452        ]
453        # Test for case where none or all frames get expedited
454        for limit in range(len(original_frames)):
455            expedited_frames = apply_expedited_frame_ordering(
456                original_frames, limit=limit
457            )
458            restored_frames = apply_original_frame_ordering(expedited_frames)
459            assert original_frames == restored_frames
460
461
462# This should not raise anything when module is imported
463_test_apply_original_frame_ordering()
464
465
466_FRAME_EVENT_TIMEOUT_SEC = 1
467
468
469def poll_and_observe_frames(
470    pn532,
471    emulator,
472    testcases,
473    *,
474    restore_original_frame_ordering=False,
475    ignore_field_off_event_timeout=False,
476    **kwargs,
477):
478    """Handles broadcasting polling loop events for provided list of test cases.
479    Provided set of test cases MUST contain a complete polling loop, starting
480    with 'O' and ending with 'X' event.
481    """
482
483    assert len(testcases) > 2
484    assert testcases[0].configuration.type == "O"
485    assert testcases[-1].configuration.type == "X"
486
487    off_event_handler = None
488    for idx, testcase in enumerate(testcases):
489        configuration = testcase.configuration
490
491        # On last 'X' Event, create handler
492        if idx == len(testcases) - 1 and configuration.type == "X":
493            off_event_handler = emulator.asyncWaitForPollingFrameOff("XEvent")
494
495        time.sleep(GUARD_TIME_PER_TECH[configuration.type])
496
497        if configuration.type == "O":
498            pn532.unmute()
499        elif configuration.type == "X":
500            pn532.mute()
501        else:
502            if "power_level" in kwargs:
503                configuration = configuration.replace(
504                    power=kwargs["power_level"]
505                )
506            pn532.send_broadcast(
507                data=bytes.fromhex(testcase.data),
508                configuration=configuration
509            )
510        if configuration.type in {"O", "X"}:
511            time.sleep(GUARD_TIME_PER_TECH[configuration.type])
512
513    try:
514        if off_event_handler is not None:
515            off_event_handler.waitAndGet("XEvent", _FRAME_EVENT_TIMEOUT_SEC)
516    except (Exception, ) as e:
517        if not ignore_field_off_event_timeout:
518            emulator.log.warning( f"Timed out waiting for 'X' event due to {e}")
519
520    frames = [PollingFrame.from_dict(f) for f in emulator.getPollingFrames()]
521
522    if restore_original_frame_ordering:
523        # Attempt to revert expedited frame delivery ordering for U and F frames
524        # while keeping timestamp wrapping into account
525        frames = apply_original_frame_ordering(frames)
526
527    return frames
528
529
530def get_frame_test_stats(testcases, frames, timestamps=()):
531    """Creates a dict containing test info for error output"""
532    if len(timestamps) == 0:
533        timestamps = [-1] * len(testcases)
534
535    return  {
536        "frames_sent_count": len(testcases),
537        "frames_received_count": len(frames),
538        "frames_sent": [
539            testcase.format_for_error(timestamp=timestamp)
540            for timestamp, testcase in zip(timestamps, testcases)
541        ],
542        "frames_received": [frame.to_dict() for frame in frames],
543    }
544