1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# Lint as: python3 16 17"""Generates a Python test case from a snoop log.""" 18 19import json 20import math 21import os 22 23from parse_log import FullApduEntry, NfcType, PollingLoopEntry 24 25INDENT_SIZE = 4 26 27 28def generate_test( 29 log: list[FullApduEntry | PollingLoopEntry], name: str 30) -> str: 31 """Generates a Python test case from a snoop log parsed by the replay tool. 32 33 The generated test will be placed in the current directory. 34 35 Args: 36 log: The parsed snoop log. 37 name: The name of the file containing the snoop log. 38 39 Returns: 40 The name of the JSON file containing APDUs needed to run the test. 41 """ 42 # The name of the test file is based on the name of the snoop log 43 python_local_file = name + "_test.py" 44 file_path = ( 45 os.path.dirname(os.path.realpath(__file__)) + "/" + python_local_file 46 ) 47 48 try: 49 file = open(file_path, "wt") 50 except Exception as e: 51 raise RuntimeError( 52 "Error occurred while opening file: {}".format(file_path) 53 ) from e 54 file.write(create_imports()) 55 file.write(create_polling_loop_methods()) 56 file.write(create_apdu_exchange_method()) 57 file.write(create_setup()) 58 file.write(create_test_opening(name)) 59 60 last_timestamp = log[0].ts 61 json_list = [] 62 for entry in log: 63 if isinstance(entry, PollingLoopEntry): 64 file.write(create_polling_loop_test(entry, last_timestamp)) 65 else: # isinstance(entry, FullApduEntry): 66 file.write(create_apdu_test(entry, last_timestamp)) 67 json_list.append(create_apdu_dict(entry)) 68 last_timestamp = entry.ts 69 70 json_dump = json.dumps(json_list) 71 apdu_local_file = name + "_apdus.json" 72 apdu_file_path = ( 73 os.path.dirname(os.path.realpath(__file__)) + "/" + apdu_local_file 74 ) 75 apdu_file = open(apdu_file_path, "wt") 76 apdu_file.write(json_dump) 77 78 file.write(create_teardown_test()) 79 file.write(create_main_function()) 80 81 print() 82 print( 83 "Test generated at {}. To run the test, copy the test file to" 84 " packages/modules/Nfc/NfcNci/tests/testcases/multidevices/.".format(file_path) 85 ) 86 update_android_bp(python_local_file, name) 87 88 return apdu_local_file 89 90 91def update_android_bp(local_file_path, test_name): 92 """Creates a new python_test_host entry in Android.bp for the generated test.""" 93 try: 94 android_bp = open("Android.bp", "a") 95 except Exception as e: 96 raise RuntimeError("Error occurred while opening Android.bp") from e 97 98 s = create_line() 99 s += create_line() 100 s += create_line("python_test_host {") 101 s += create_line('name: "{}",'.format(test_name), indent=1) 102 s += create_line('main: "{}",'.format(local_file_path), indent=1) 103 s += create_line('srcs: ["{}"],'.format(local_file_path), indent=1) 104 s += create_line('test_config: "AndroidTest.xml",', indent=1) 105 s += create_line('device_common_data: [', indent=1) 106 s += create_line('":NfcEmulatorApduApp",', indent=2) 107 s += create_line('"config.yaml",', indent=2) 108 s += create_line('],', indent=1) 109 s += create_line("test_options: {", indent=1) 110 s += create_line("unit_test: false,", indent=2) 111 s += create_line('runner: "mobly",', indent=2) 112 s += create_line("},", indent=1) 113 s += create_line('defaults: ["GeneratedTestsPythonDefaults"],', indent=1) 114 s += create_line("}") 115 android_bp.write(s) 116 117 118def create_apdu_dict(entry: FullApduEntry): 119 """Creates a dictionary representation of an APDU entry.""" 120 command_arr = [] 121 for cmd in entry.command: 122 command_arr.append(cmd.hex()) 123 response_arr = [] 124 for rsp in entry.response: 125 if isinstance(rsp, str): 126 response_arr.append(rsp) 127 else: 128 response_arr.append(rsp.hex()) 129 apdu_dict = { 130 "commands": command_arr, 131 "responses": response_arr, 132 } 133 return apdu_dict 134 135 136def create_test_opening(name: str): 137 """Creates the opening of the test file.""" 138 s = create_line("def test_{}(self):".format(name), indent=1) 139 s += create_line("# Read in APDU commands and responses from file", indent=2) 140 s += create_line( 141 'file_path_name = self.user_params.get("file_path", "")', indent=2 142 ) 143 s += create_line("apdu_cmds = []", indent=2) 144 s += create_line("apdu_rsps = []", indent=2) 145 s += create_line("if file_path_name:", indent=2) 146 s += create_line('with open(file_path_name, "r") as json_str:', indent=3) 147 s += create_line('self.emulator.nfc_emulator.startMainActivity(json_str.read())', indent=4) 148 s += create_line() 149 s += create_line('with open(file_path_name, "r") as json_data:', indent=3) 150 s += create_line("d = json.load(json_data)", indent=4) 151 s += create_line("for entry in d:", indent=4) 152 s += create_line("apdu_cmds.append(", indent=5) 153 s += create_line( 154 '[bytearray.fromhex(cmd) for cmd in entry["commands"]]', indent=6 155 ) 156 s += create_line(")", indent=5) 157 s += create_line("apdu_rsps.append(", indent=5) 158 s += create_line( 159 '[bytearray.fromhex(rsp) for rsp in entry["responses"]]', indent=6 160 ) 161 s += create_line(")", indent=5) 162 s += create_line() 163 return s 164 165 166def create_polling_loop_test(entry: PollingLoopEntry, last_timestamp: int): 167 """Adds code to send a polling loop from the reader to the emulator. 168 169 The test will check to ensure that the polling loop is successfully received. 170 """ 171 s = create_line( 172 "# Sending {} polling loop".format(entry.type), 173 indent=2, 174 ) 175 176 sleep_time = calculate_time_to_sleep(entry.ts, last_timestamp) 177 s += create_line("time.sleep({})".format(sleep_time), indent=2) 178 179 match entry.type: 180 case NfcType.NFC_A: 181 s += create_line("saw_loop = send_polling_loop_a(self.reader)", indent=2) 182 case NfcType.NFC_B: 183 s += create_line("saw_loop = send_polling_loop_b(self.reader)", indent=2) 184 case _: # NfcType.UNKNOWN 185 s += create_line('custom_data = "{}"'.format(entry.data.hex()), indent=2) 186 s += create_line( 187 "saw_loop = send_custom_polling_loop(self.reader, custom_data)", 188 indent=2, 189 ) 190 s += create_line( 191 'asserts.assert_true(saw_loop, "Did not see polling loop")', indent=2 192 ) 193 s += create_line() 194 return s 195 196 197def create_apdu_test(entry: FullApduEntry, last_timestamp: int): 198 """Adds code to conduct an APDU exchange between the reader and emulator. 199 200 The test will check to ensure that the expected response is received from the 201 emulator. 202 """ 203 s = create_line("# Conducting APDU exchange", indent=2) 204 205 sleep_time = calculate_time_to_sleep(entry.ts, last_timestamp) 206 s += create_line("time.sleep({})".format(sleep_time), indent=2) 207 208 s += create_line("commands = apdu_cmds[0]", indent=2) 209 s += create_line("responses = apdu_rsps[0]", indent=2) 210 s += create_line( 211 "tag_found, transacted = conduct_apdu_exchange(self.reader, commands," 212 " responses)", 213 indent=2, 214 ) 215 s += create_line() 216 s += create_line("asserts.assert_true(", indent=2) 217 s += create_line( 218 'tag_found, "Reader did not detect tag, transaction not attempted."', 219 indent=3, 220 ) 221 s += create_line(")", indent=2) 222 s += create_line("asserts.assert_true(", indent=2) 223 s += create_line("transacted,", indent=3) 224 s += create_line( 225 '"Transaction failed, check device logs for more information."', indent=3 226 ) 227 s += create_line(")", indent=2) 228 s += create_line() 229 s += create_line("apdu_cmds.pop(0)", indent=2) 230 s += create_line("apdu_rsps.pop(0)", indent=2) 231 s += create_line() 232 return s 233 234 235def create_imports(): 236 s = create_line('"""Test generated from the NFC Replay Tool."""') 237 s += create_line() 238 s += create_line("import json") 239 s += create_line("import time") 240 s += create_line("from mobly import asserts") 241 s += create_line("from mobly import base_test") 242 s += create_line("from mobly import test_runner") 243 s += create_line("from mobly.controllers import android_device") 244 s += create_line("import pn532") 245 s += create_line() 246 s += create_line("# Number of polling loops to perform.") 247 s += create_line("_NUM_POLLING_LOOPS = 50") 248 s += create_line() 249 return s 250 251def create_polling_loop_methods(): 252 """Create methods that send polling loops to the reader. 253 254 Specifically, three methods are created: send_polling_loop_a(), 255 send_polling_loop_b(), and send_custom_polling_loop(). 256 """ 257 s = create_line() 258 s += create_line() 259 s += create_line("def send_polling_loop_a(reader: pn532.PN532) -> bool:") 260 s += create_line("saw_loop = False", indent=1) 261 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 262 s += create_line("tag = reader.poll_a()", indent=2) 263 s += create_line("if tag is not None:", indent=2) 264 s += create_line("saw_loop = True", indent=3) 265 s += create_line("break", indent=3) 266 s += create_line("reader.mute()", indent=2) 267 s += create_line("return saw_loop", indent=1) 268 s += create_line() 269 s += create_line() 270 s += create_line("def send_polling_loop_b(reader: pn532.PN532) -> bool:") 271 s += create_line("saw_loop = False", indent=1) 272 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 273 s += create_line("tag = reader.poll_b()", indent=2) 274 s += create_line("if tag is not None:", indent=2) 275 s += create_line("saw_loop = True", indent=3) 276 s += create_line("break", indent=3) 277 s += create_line("reader.mute()", indent=2) 278 s += create_line("return saw_loop", indent=1) 279 s += create_line() 280 s += create_line() 281 s += create_line( 282 "def send_custom_polling_loop(reader: pn532.PN532, custom_data_hex: str)" 283 " -> bool:" 284 ) 285 s += create_line("saw_loop = False", indent=1) 286 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 287 s += create_line("tag = reader.poll_a()", indent=2) 288 s += create_line("if tag is not None:", indent=2) 289 s += create_line( 290 "reader.send_broadcast(bytearray.fromhex(custom_data_hex))", indent=3 291 ) 292 s += create_line("saw_loop = True", indent=3) 293 s += create_line("break", indent=3) 294 s += create_line("reader.poll_b()", indent=2) 295 s += create_line("reader.mute()", indent=2) 296 s += create_line("return saw_loop", indent=1) 297 return s 298 299 300def create_apdu_exchange_method(): 301 """Creates method to conduct an APDU exchange between the emulator and reader.""" 302 s = create_line() 303 s += create_line() 304 s += create_line("def conduct_apdu_exchange(") 305 s += create_line( 306 "reader: pn532.PN532, commands: list[bytearray], responses:" 307 " list[bytearray]", 308 indent=2, 309 ) 310 s += create_line(") -> tuple[pn532.tag.TypeATag | None, bool]:") 311 s += create_line( 312 '"""Conducts an APDU exchange with the PN532 reader."""', indent=1 313 ) 314 s += create_line('for _ in range(_NUM_POLLING_LOOPS):', indent=1) 315 s += create_line('tag = reader.poll_a()', indent=2) 316 s += create_line('if tag is not None:', indent=2) 317 s += create_line('transacted = tag.transact(commands, responses)', indent=3) 318 s += create_line('reader.mute()', indent=3) 319 s += create_line('# edge case: expect no response', indent=3) 320 s += create_line('if not responses or responses[0] == bytearray.fromhex(""):', indent=3) 321 s += create_line('return tag, True', indent=4) 322 s += create_line('return tag, transacted', indent=3) 323 s += create_line('reader.mute()', indent=2) 324 s += create_line('return None, False', indent=1) 325 return s 326 327 328def create_setup(): 329 """Creates methods to prepare the PN532 reader and emulator before the test. 330 331 This involves checking to ensure that the raeder and emulator are both 332 present, and enabling NFC on the emulator. 333 334 Args: 335 name: The name of the original snoop log file. 336 """ 337 s = create_line() 338 s += create_line() 339 s += create_line( 340 "class GeneratedMultiDeviceTestCases(base_test.BaseTestClass):" 341 ) 342 s += create_line() 343 s += create_line("def setup_class(self):", indent=1) 344 s += create_line( 345 "self.emulator = self.register_controller(android_device)[0]", indent=2 346 ) 347 s += create_line('self.emulator.debug_tag = "emulator"', indent=2) 348 s += create_line('if (hasattr(self.emulator, "dimensions") and "pn532_serial_path" in self.emulator.dimensions):', indent=2) 349 s += create_line('pn532_serial_path = self.emulator.dimensions["pn532_serial_path"]', indent=3) 350 s += create_line('else:', indent=2) 351 s += create_line( 352 'pn532_serial_path = self.user_params.get("pn532_serial_path", "")', 353 indent=3, 354 ) 355 s += create_line('self.emulator.load_snippet("nfc_emulator", "com.android.nfc.emulatorapp")', indent=2) 356 s += create_line( 357 'self.emulator.adb.shell(["svc", "nfc", "disable"])', indent=2 358 ) 359 s += create_line( 360 'self.emulator.adb.shell(["svc", "nfc", "enable"])', indent=2 361 ) 362 s += create_line("self.reader = pn532.PN532(pn532_serial_path)", indent=2) 363 s += create_line("self.reader.mute()", indent=2) 364 s += create_line() 365 return s 366 367 368def create_teardown_test(): 369 s = create_line("def teardown_test(self):", indent=1) 370 s += create_line("self.reader.mute()", indent=2) 371 return s 372 373 374def create_main_function(): 375 s = create_line() 376 s += create_line('if __name__ == "__main__":') 377 s += create_line("test_runner.main()", indent=1) 378 s += create_line() 379 return s 380 381 382def create_line(s: str = "", indent: int = 0): 383 return "{}{}\n".format(create_indent(indent), s) 384 385 386def create_indent(multiplier: int): 387 return " " * multiplier * INDENT_SIZE 388 389 390def calculate_time_to_sleep(current_ts: int, last_ts: int) -> int: 391 num_seconds = math.ceil((current_ts - last_ts) / 1000000) 392 if num_seconds < 1: 393 return 1 394 return num_seconds 395