• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17"""Generates a Python test case from a snoop log."""
18
19import json
20import math
21import os
22
23from parse_log import FullApduEntry, NfcType, PollingLoopEntry
24
# Number of spaces that make up one indentation level in the generated code
# (see create_indent()).
INDENT_SIZE = 4
26
27
def generate_test(
    log: list[FullApduEntry | PollingLoopEntry], name: str
) -> str:
  """Generates a Python test case from a snoop log parsed by the replay tool.

  Two files are written to the directory that contains this module (not the
  caller's working directory): `<name>_test.py`, the generated test, and
  `<name>_apdus.json`, the APDU commands and responses the test replays.
  Afterwards update_android_bp() is called to register the generated test.

  Args:
    log: The parsed snoop log. Entries are replayed in list order; an empty
      log produces a test with no polling loop or APDU steps.
    name: The name of the file containing the snoop log. It is used as the
      prefix of both generated files and as the name of the test.

  Returns:
    The name (not the full path) of the JSON file containing APDUs needed to
    run the test.

  Raises:
    RuntimeError: If the test file cannot be opened for writing.
  """
  # Both generated files live next to this module.
  output_dir = os.path.dirname(os.path.realpath(__file__))
  # The name of the test file is based on the name of the snoop log
  python_local_file = name + "_test.py"
  file_path = output_dir + "/" + python_local_file
  apdu_local_file = name + "_apdus.json"
  apdu_file_path = output_dir + "/" + apdu_local_file

  try:
    file = open(file_path, "wt")
  except Exception as e:
    raise RuntimeError(
        "Error occurred while opening file: {}".format(file_path)
    ) from e

  json_list = []
  # The context manager flushes and closes the test file even if generating
  # one of the steps raises.
  with file:
    file.write(create_imports())
    file.write(create_polling_loop_methods())
    file.write(create_apdu_exchange_method())
    file.write(create_setup())
    file.write(create_test_opening(name))

    # Each step sleeps for the gap since the previous entry, so the first
    # entry is measured against itself. Guard against an empty log, which
    # has no first entry.
    last_timestamp = log[0].ts if log else 0
    for entry in log:
      if isinstance(entry, PollingLoopEntry):
        file.write(create_polling_loop_test(entry, last_timestamp))
      else:  # isinstance(entry, FullApduEntry):
        file.write(create_apdu_test(entry, last_timestamp))
        json_list.append(create_apdu_dict(entry))
      last_timestamp = entry.ts

    file.write(create_teardown_test())
    file.write(create_main_function())

  # One JSON object per APDU exchange, in the order the test consumes them.
  with open(apdu_file_path, "wt") as apdu_file:
    apdu_file.write(json.dumps(json_list))

  print()
  print(
      "Test generated at {}. To run the test, copy the test file to"
      " packages/modules/Nfc/NfcNci/tests/testcases/multidevices/.".format(file_path)
  )
  update_android_bp(python_local_file, name)

  return apdu_local_file
89
90
def update_android_bp(local_file_path, test_name):
  """Creates a new python_test_host entry in Android.bp for the generated test.

  The entry is appended to the file named "Android.bp", resolved against the
  current working directory.

  Args:
    local_file_path: File name of the generated test. It is used both as the
      `main` of the module and as its only `srcs` item.
    test_name: The `name` given to the python_test_host module.

  Raises:
    RuntimeError: If Android.bp cannot be opened for appending.
  """
  # (text, indentation level) for every line of the new module definition.
  # The two leading blank lines separate it from the existing file content.
  entry_lines = (
      ("", 0),
      ("", 0),
      ("python_test_host {", 0),
      ('name: "{}",'.format(test_name), 1),
      ('main: "{}",'.format(local_file_path), 1),
      ('srcs: ["{}"],'.format(local_file_path), 1),
      ('test_config: "AndroidTest.xml",', 1),
      ('device_common_data: [', 1),
      ('":NfcEmulatorApduApp",', 2),
      ('"config.yaml",', 2),
      ('],', 1),
      ("test_options: {", 1),
      ("unit_test: false,", 2),
      ('runner: "mobly",', 2),
      ("},", 1),
      ('defaults: ["GeneratedTestsPythonDefaults"],', 1),
      ("}", 0),
  )
  entry = "".join(
      create_line(text, indent=level) for text, level in entry_lines
  )

  try:
    android_bp = open("Android.bp", "a")
  except Exception as e:
    raise RuntimeError("Error occurred while opening Android.bp") from e

  # Close the file deterministically so the appended entry is flushed to disk.
  with android_bp:
    android_bp.write(entry)
116
117
def create_apdu_dict(entry: FullApduEntry):
  """Converts an APDU entry into a dictionary of hex strings.

  Every command is hex-encoded. A response that is already a string is kept
  unchanged; any other response is hex-encoded.

  Args:
    entry: The APDU entry whose `command` and `response` items are converted.

  Returns:
    A dictionary with the keys "commands" and "responses", each mapping to a
    list of strings.
  """
  return {
      "commands": [command.hex() for command in entry.command],
      "responses": [
          response if isinstance(response, str) else response.hex()
          for response in entry.response
      ],
  }
134
135
def create_test_opening(name: str):
  """Builds the beginning of the generated test method.

  The emitted code defines `test_<name>`, reads the "file_path" user
  parameter and, when it is set, passes the file's content to the emulator
  snippet's startMainActivity() and loads the APDU commands and responses
  from the same file into `apdu_cmds` and `apdu_rsps`.

  Args:
    name: The name of the snoop log, used as the suffix of the test method.

  Returns:
    The generated source lines as a single string.
  """
  # (indentation level, text) for each generated line, in output order.
  opening = (
      (1, f"def test_{name}(self):"),
      (2, "# Read in APDU commands and responses from file"),
      (2, 'file_path_name = self.user_params.get("file_path", "")'),
      (2, "apdu_cmds = []"),
      (2, "apdu_rsps = []"),
      (2, "if file_path_name:"),
      (3, 'with open(file_path_name, "r") as json_str:'),
      (4, 'self.emulator.nfc_emulator.startMainActivity(json_str.read())'),
      (0, ""),
      (3, 'with open(file_path_name, "r") as json_data:'),
      (4, "d = json.load(json_data)"),
      (4, "for entry in d:"),
      (5, "apdu_cmds.append("),
      (6, '[bytearray.fromhex(cmd) for cmd in entry["commands"]]'),
      (5, ")"),
      (5, "apdu_rsps.append("),
      (6, '[bytearray.fromhex(rsp) for rsp in entry["responses"]]'),
      (5, ")"),
      (0, ""),
  )
  return "".join(create_line(text, indent=depth) for depth, text in opening)
164
165
def create_polling_loop_test(entry: PollingLoopEntry, last_timestamp: int):
  """Builds the test step that sends a polling loop from the reader.

  The emitted code sleeps for the gap since the previous log entry, sends a
  type A, type B or custom polling loop depending on `entry.type`, and
  asserts that the loop was seen.

  Args:
    entry: The polling loop entry to replay.
    last_timestamp: The timestamp of the previous log entry.

  Returns:
    The generated source lines as a single string.
  """
  pieces = [
      create_line(f"# Sending {entry.type} polling loop", indent=2),
  ]

  delay = calculate_time_to_sleep(entry.ts, last_timestamp)
  pieces.append(create_line(f"time.sleep({delay})", indent=2))

  loop_type = entry.type
  if loop_type == NfcType.NFC_A:
    pieces.append(
        create_line("saw_loop = send_polling_loop_a(self.reader)", indent=2)
    )
  elif loop_type == NfcType.NFC_B:
    pieces.append(
        create_line("saw_loop = send_polling_loop_b(self.reader)", indent=2)
    )
  else:  # NfcType.UNKNOWN
    # Any other type is replayed as a custom frame built from the raw data.
    pieces.append(
        create_line(f'custom_data = "{entry.data.hex()}"', indent=2)
    )
    pieces.append(
        create_line(
            "saw_loop = send_custom_polling_loop(self.reader, custom_data)",
            indent=2,
        )
    )

  pieces.append(
      create_line(
          'asserts.assert_true(saw_loop, "Did not see polling loop")',
          indent=2,
      )
  )
  pieces.append(create_line())
  return "".join(pieces)
195
196
def create_apdu_test(entry: FullApduEntry, last_timestamp: int):
  """Builds the test step that runs one APDU exchange.

  The emitted code sleeps for the gap since the previous log entry, runs
  conduct_apdu_exchange() with the first remaining command/response lists,
  asserts that a tag was found and that the transaction succeeded, and then
  removes those lists so the next step uses the following ones.

  Args:
    entry: The APDU entry to replay; only its timestamp is used here.
    last_timestamp: The timestamp of the previous log entry.

  Returns:
    The generated source lines as a single string.
  """
  delay = calculate_time_to_sleep(entry.ts, last_timestamp)
  # (indentation level, text) for each generated line, in output order.
  step = (
      (2, "# Conducting APDU exchange"),
      (2, f"time.sleep({delay})"),
      (2, "commands = apdu_cmds[0]"),
      (2, "responses = apdu_rsps[0]"),
      (
          2,
          "tag_found, transacted = conduct_apdu_exchange(self.reader,"
          " commands, responses)",
      ),
      (0, ""),
      (2, "asserts.assert_true("),
      (3, 'tag_found, "Reader did not detect tag, transaction not attempted."'),
      (2, ")"),
      (2, "asserts.assert_true("),
      (3, "transacted,"),
      (3, '"Transaction failed, check device logs for more information."'),
      (2, ")"),
      (0, ""),
      (2, "apdu_cmds.pop(0)"),
      (2, "apdu_rsps.pop(0)"),
      (0, ""),
  )
  return "".join(create_line(text, indent=depth) for depth, text in step)
233
234
def create_imports():
  """Builds the header of the generated test file.

  The header consists of the module docstring, the import statements and the
  `_NUM_POLLING_LOOPS` constant.

  Returns:
    The generated source lines as a single string.
  """
  header = (
      '"""Test generated from the NFC Replay Tool."""',
      "",
      "import json",
      "import time",
      "from mobly import asserts",
      "from mobly import base_test",
      "from mobly import test_runner",
      "from mobly.controllers import android_device",
      "import pn532",
      "",
      "# Number of polling loops to perform.",
      "_NUM_POLLING_LOOPS = 50",
      "",
  )
  return "".join(create_line(text) for text in header)
250
def create_polling_loop_methods():
  """Builds the module-level polling loop helpers of the generated test.

  Three functions are emitted: send_polling_loop_a(), send_polling_loop_b()
  and send_custom_polling_loop(). Each one polls up to _NUM_POLLING_LOOPS
  times, muting the reader between attempts, and returns whether a tag was
  seen.

  Returns:
    The generated source lines as a single string.
  """
  # (indentation level, text) for each generated line, in output order.
  methods = (
      (0, ""),
      (0, ""),
      (0, "def send_polling_loop_a(reader: pn532.PN532) -> bool:"),
      (1, "saw_loop = False"),
      (1, "for i in range(_NUM_POLLING_LOOPS):"),
      (2, "tag = reader.poll_a()"),
      (2, "if tag is not None:"),
      (3, "saw_loop = True"),
      (3, "break"),
      (2, "reader.mute()"),
      (1, "return saw_loop"),
      (0, ""),
      (0, ""),
      (0, "def send_polling_loop_b(reader: pn532.PN532) -> bool:"),
      (1, "saw_loop = False"),
      (1, "for i in range(_NUM_POLLING_LOOPS):"),
      (2, "tag = reader.poll_b()"),
      (2, "if tag is not None:"),
      (3, "saw_loop = True"),
      (3, "break"),
      (2, "reader.mute()"),
      (1, "return saw_loop"),
      (0, ""),
      (0, ""),
      (
          0,
          "def send_custom_polling_loop(reader: pn532.PN532,"
          " custom_data_hex: str) -> bool:",
      ),
      (1, "saw_loop = False"),
      (1, "for i in range(_NUM_POLLING_LOOPS):"),
      (2, "tag = reader.poll_a()"),
      (2, "if tag is not None:"),
      (3, "reader.send_broadcast(bytearray.fromhex(custom_data_hex))"),
      (3, "saw_loop = True"),
      (3, "break"),
      (2, "reader.poll_b()"),
      (2, "reader.mute()"),
      (1, "return saw_loop"),
  )
  return "".join(create_line(text, indent=depth) for depth, text in methods)
298
299
def create_apdu_exchange_method():
  """Builds the conduct_apdu_exchange() helper of the generated test.

  The emitted function polls for a type A tag up to _NUM_POLLING_LOOPS times.
  Once a tag is found it transacts the given commands and responses and
  returns the tag with the outcome; when no response is expected the outcome
  is always True. If no tag is ever found it returns (None, False).

  Returns:
    The generated source lines as a single string.
  """
  # (indentation level, text) for each generated line, in output order.
  method = (
      (0, ""),
      (0, ""),
      (0, "def conduct_apdu_exchange("),
      (
          2,
          "reader: pn532.PN532, commands: list[bytearray],"
          " responses: list[bytearray]",
      ),
      (0, ") -> tuple[pn532.tag.TypeATag | None, bool]:"),
      (1, '"""Conducts an APDU exchange with the PN532 reader."""'),
      (1, 'for _ in range(_NUM_POLLING_LOOPS):'),
      (2, 'tag = reader.poll_a()'),
      (2, 'if tag is not None:'),
      (3, 'transacted = tag.transact(commands, responses)'),
      (3, 'reader.mute()'),
      (3, '# edge case: expect no response'),
      (3, 'if not responses or responses[0] == bytearray.fromhex(""):'),
      (4, 'return tag, True'),
      (3, 'return tag, transacted'),
      (2, 'reader.mute()'),
      (1, 'return None, False'),
  )
  return "".join(create_line(text, indent=depth) for depth, text in method)
326
327
def create_setup():
  """Builds the test class header and its setup_class() method.

  The emitted setup_class() registers the Android device as the emulator,
  takes the PN532 serial path from the emulator's dimensions when present and
  from the user parameters otherwise, loads the emulator snippet, disables
  and re-enables NFC on the emulator, and creates and mutes the PN532 reader.

  Returns:
    The generated source lines as a single string.
  """
  # (indentation level, text) for each generated line, in output order.
  setup = (
      (0, ""),
      (0, ""),
      (0, "class GeneratedMultiDeviceTestCases(base_test.BaseTestClass):"),
      (0, ""),
      (1, "def setup_class(self):"),
      (2, "self.emulator = self.register_controller(android_device)[0]"),
      (2, 'self.emulator.debug_tag = "emulator"'),
      (
          2,
          'if (hasattr(self.emulator, "dimensions") and "pn532_serial_path"'
          ' in self.emulator.dimensions):',
      ),
      (
          3,
          'pn532_serial_path = self.emulator.dimensions["pn532_serial_path"]',
      ),
      (2, 'else:'),
      (3, 'pn532_serial_path = self.user_params.get("pn532_serial_path", "")'),
      (
          2,
          'self.emulator.load_snippet("nfc_emulator",'
          ' "com.android.nfc.emulatorapp")',
      ),
      (2, 'self.emulator.adb.shell(["svc", "nfc", "disable"])'),
      (2, 'self.emulator.adb.shell(["svc", "nfc", "enable"])'),
      (2, "self.reader = pn532.PN532(pn532_serial_path)"),
      (2, "self.reader.mute()"),
      (0, ""),
  )
  return "".join(create_line(text, indent=depth) for depth, text in setup)
366
367
def create_teardown_test():
  """Builds the teardown_test() method, which mutes the reader.

  Returns:
    The generated source lines as a single string.
  """
  header = create_line("def teardown_test(self):", indent=1)
  body = create_line("self.reader.mute()", indent=2)
  return header + body
372
373
def create_main_function():
  """Builds the `__main__` guard that starts the test runner.

  Returns:
    The generated source lines as a single string, surrounded by blank lines.
  """
  guard = (
      (0, ""),
      (0, 'if __name__ == "__main__":'),
      (1, "test_runner.main()"),
      (0, ""),
  )
  return "".join(create_line(text, indent=depth) for depth, text in guard)
380
381
def create_line(s: str = "", indent: int = 0):
  """Returns `s` indented by `indent` levels and terminated by a newline.

  Called without arguments it returns a single newline.
  """
  prefix = create_indent(indent)
  return f"{prefix}{s}\n"
384
385
def create_indent(multiplier: int):
  """Returns the whitespace for `multiplier` levels of INDENT_SIZE spaces."""
  width = multiplier * INDENT_SIZE
  return " " * width
388
389
def calculate_time_to_sleep(current_ts: int, last_ts: int) -> int:
  """Returns the number of whole seconds to sleep between two log entries.

  The timestamp difference is divided by 1,000,000 and rounded up (the
  timestamps are presumably in microseconds; confirm against the log
  parser). The result is never smaller than 1.

  Args:
    current_ts: The timestamp of the entry about to be replayed.
    last_ts: The timestamp of the previously replayed entry.

  Returns:
    The sleep duration in seconds, at least 1.
  """
  elapsed_seconds = math.ceil((current_ts - last_ts) / 1000000)
  return max(1, elapsed_seconds)
395