• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit test for atft manager."""
16import base64
17import unittest
18
19import atftman
20
21from atftman import EncryptionAlgorithm
22from atftman import ProductInfo
23from atftman import ProvisionState
24from atftman import ProvisionStatus
25from fastboot_exceptions import DeviceNotFoundException
26from fastboot_exceptions import FastbootFailure
27from fastboot_exceptions import NoAlgorithmAvailableException
28from fastboot_exceptions import ProductAttributesFileFormatError
29from fastboot_exceptions import ProductNotSpecifiedException
30from mock import call
31from mock import MagicMock
32from mock import patch
33
# In-memory list of paths standing in for the filesystem. The _AppendFile,
# _CheckFile and _RemoveFile helpers of AtftManTest add to, query and remove
# from this list.
files = []
35
36
37class AtftManTest(unittest.TestCase):
  # Serial number given to the fake ATFA device.
  ATFA_TEST_SERIAL = 'ATFA_TEST_SERIAL'
  # Value returned by the patched tempfile.mkdtemp in the transfer test.
  TEST_TMP_FOLDER = '/tmp/TMPTEST/'
  # Serial numbers given to the fake target devices.
  TEST_SERIAL = 'TEST_SERIAL'
  TEST_SERIAL2 = 'TEST_SERIAL2'
  TEST_SERIAL3 = 'TEST_SERIAL3'
  # Value returned by the patched uuid.uuid1 in the transfer test.
  TEST_UUID = 'TEST-UUID'
  # Location strings associated with serial numbers by the fake serial mapper.
  TEST_LOCATION = 'BUS1-PORT1'
  TEST_LOCATION2 = 'BUS2-PORT1'
  TEST_LOCATION3 = 'BUS1-PORT2'
  # Product id as a hex string, and the same id decoded into raw bytes.
  TEST_ID = '00000000000000000000000000000000'
  TEST_ID_ARRAY = bytearray.fromhex(TEST_ID)
  TEST_NAME = 'name'
  TEST_FILE_NAME = 'filename'
  # Zero-filled buffers (1052 and 128 bytes) and their base64 encodings.
  TEST_ATTRIBUTE_ARRAY = bytearray(1052)
  TEST_VBOOT_KEY_ARRAY = bytearray(128)
  TEST_ATTRIBUTE_STRING = base64.standard_b64encode(TEST_ATTRIBUTE_ARRAY)
  TEST_VBOOT_KEY_STRING = base64.standard_b64encode(TEST_VBOOT_KEY_ARRAY)
55
56  class FastbootDeviceTemplate(object):
57
58    @staticmethod
59    def ListDevices():
60      pass
61
62    def __init__(self, serial_number):
63      self.serial_number = serial_number
64
65    def Oem(self, oem_command, err_to_out):
66      pass
67
68    def Upload(self, file_path):
69      pass
70
71    def GetVar(self, var):
72      pass
73
74    def Download(self, file_path):
75      pass
76
77    def Disconnect(self):
78      pass
79
80    def GetHostOs(self):
81      return 'Windows'
82
83    def __del__(self):
84      pass
85
86  def MockInit(self, serial_number):
87    mock_instance = MagicMock()
88    mock_instance.serial_number = serial_number
89    mock_instance.GetHostOs = MagicMock()
90    mock_instance.GetHostOs.return_value = 'Windows'
91    return mock_instance
92
93  def setUp(self):
94    self.mock_serial_mapper = MagicMock()
95    self.mock_serial_instance = MagicMock()
96    self.mock_serial_mapper.return_value = self.mock_serial_instance
97    self.mock_serial_instance.get_serial_map.return_value = []
98    self.status_map = {}
99    self.mock_timer_instance = None
100    self.configs = {}
101    self.configs['ATFA_REBOOT_TIMEOUT'] = 30
102    self.configs['DEFAULT_KEY_THRESHOLD'] = 100
103
104  # Test AtftManager.ListDevices
105  class MockInstantTimer(object):
106    def __init__(self, timeout, callback):
107      self.timeout = timeout
108      self.callback = callback
109
110    def start(self):
111      self.callback()
112
113  def MockCreateInstantTimer(self, timeout, callback):
114    return self.MockInstantTimer(timeout, callback)
115
116  @patch('threading.Timer')
117  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
118  def testListDevicesNormal(self, mock_list_devices, mock_create_timer):
119    mock_create_timer.side_effect = self.MockCreateInstantTimer
120    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
121                                       self.mock_serial_mapper, self.configs)
122    atft_manager._GetOs = MagicMock()
123    atft_manager._SetOs = MagicMock()
124    atft_manager._GetOs.return_value = 'Windows'
125    mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL]
126    atft_manager.ListDevices()
127    atft_manager.ListDevices()
128    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
129    self.assertEqual(1, len(atft_manager.target_devs))
130    self.assertEqual(atft_manager.target_devs[0].serial_number,
131                     self.TEST_SERIAL)
132    self.assertEqual(None, atft_manager._atfa_dev_setting)
133    self.assertEqual(True, atft_manager._atfa_reboot_lock.acquire(False))
134    atft_manager._SetOs.assert_not_called()
135
136  @patch('threading.Timer')
137  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
138  def testListDevicesNormalSetOs(self, mock_list_devices, mock_create_timer):
139    mock_create_timer.side_effect = self.MockCreateInstantTimer
140    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
141                                       self.mock_serial_mapper, self.configs)
142    atft_manager._GetOs = MagicMock()
143    atft_manager._SetOs = MagicMock()
144    atft_manager._GetOs.return_value = 'Linux'
145    mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL]
146    atft_manager.ListDevices()
147    atft_manager.ListDevices()
148    atft_manager._GetOs.assert_called()
149    atft_manager._SetOs.assert_called_once_with(
150        atft_manager.atfa_dev, 'Windows')
151    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
152
153  @patch('threading.Timer')
154  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
155  def testListDevicesATFA(self, mock_list_devices, mock_create_timer):
156    mock_create_timer.side_effect = self.MockCreateInstantTimer
157    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
158                                       self.mock_serial_mapper, self.configs)
159    atft_manager._GetOs = MagicMock()
160    atft_manager._GetOs.return_value = 'Windows'
161    mock_list_devices.return_value = [self.ATFA_TEST_SERIAL]
162    atft_manager.ListDevices()
163    atft_manager.ListDevices()
164    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
165    self.assertEqual(0, len(atft_manager.target_devs))
166
167  @patch('threading.Timer')
168  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
169  def testListDevicesTarget(self, mock_list_devices, mock_create_timer):
170    mock_create_timer.side_effect = self.MockCreateInstantTimer
171    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
172                                       self.mock_serial_mapper, self.configs)
173    atft_manager._GetOs = MagicMock()
174    atft_manager._GetOs.return_value = 'Windows'
175    mock_list_devices.return_value = [self.TEST_SERIAL]
176    atft_manager.ListDevices()
177    atft_manager.ListDevices()
178    self.assertEqual(atft_manager.atfa_dev, None)
179    self.assertEqual(1, len(atft_manager.target_devs))
180    self.assertEqual(atft_manager.target_devs[0].serial_number,
181                     self.TEST_SERIAL)
182
183  @patch('threading.Timer')
184  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
185  def testListDevicesMultipleTargets(self, mock_list_devices, mock_create_timer):
186    mock_create_timer.side_effect = self.MockCreateInstantTimer
187    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
188                                       self.mock_serial_mapper, self.configs)
189    atft_manager._GetOs = MagicMock()
190    atft_manager._GetOs.return_value = 'Windows'
191    mock_list_devices.return_value = [self.TEST_SERIAL, self.TEST_SERIAL2]
192    atft_manager.ListDevices()
193    atft_manager.ListDevices()
194    self.assertEqual(atft_manager.atfa_dev, None)
195    self.assertEqual(2, len(atft_manager.target_devs))
196    self.assertEqual(atft_manager.target_devs[0].serial_number,
197                     self.TEST_SERIAL)
198    self.assertEqual(atft_manager.target_devs[1].serial_number,
199                     self.TEST_SERIAL2)
200
201  @patch('threading.Timer')
202  def testListDevicesChangeNorm(self, mock_create_timer):
203    mock_create_timer.side_effect = self.MockCreateInstantTimer
204    mock_fastboot = MagicMock()
205    mock_fastboot.side_effect = self.MockInit
206    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
207                                       self.configs)
208    atft_manager._GetOs = MagicMock()
209    atft_manager._GetOs.return_value = 'Windows'
210    mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL]
211    atft_manager.ListDevices()
212    atft_manager.ListDevices()
213    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
214    atft_manager.ListDevices()
215    atft_manager.ListDevices()
216    self.assertEqual(atft_manager.atfa_dev, None)
217    self.assertEqual(1, len(atft_manager.target_devs))
218    self.assertEqual(atft_manager.target_devs[0].serial_number,
219                     self.TEST_SERIAL)
220    self.assertEqual(2, mock_fastboot.call_count)
221
222  @patch('threading.Timer')
223  def testListDevicesChangeAdd(self, mock_create_timer):
224    mock_create_timer.side_effect = self.MockCreateInstantTimer
225    mock_fastboot = MagicMock()
226    mock_fastboot.side_effect = self.MockInit
227    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
228                                       self.configs)
229    atft_manager._GetOs = MagicMock()
230    atft_manager._GetOs.return_value = 'Windows'
231    mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL]
232    atft_manager.ListDevices()
233    atft_manager.ListDevices()
234    mock_fastboot.ListDevices.return_value = [
235        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
236    ]
237    atft_manager.ListDevices()
238    atft_manager.ListDevices()
239    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
240    self.assertEqual(1, len(atft_manager.target_devs))
241    self.assertEqual(atft_manager.target_devs[0].serial_number,
242                     self.TEST_SERIAL)
243    self.assertEqual(2, mock_fastboot.call_count)
244
245  @patch('threading.Timer')
246  def testListDevicesChangeAddATFA(self, mock_create_timer):
247    mock_create_timer.side_effect = self.MockCreateInstantTimer
248    mock_fastboot = MagicMock()
249    mock_fastboot.side_effect = self.MockInit
250    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
251                                       self.configs)
252    atft_manager._GetOs = MagicMock()
253    atft_manager._GetOs.return_value = 'Windows'
254    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
255    atft_manager.ListDevices()
256    atft_manager.ListDevices()
257    mock_fastboot.ListDevices.return_value = [
258        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
259    ]
260    atft_manager.ListDevices()
261    atft_manager.ListDevices()
262    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
263    self.assertEqual(1, len(atft_manager.target_devs))
264    self.assertEqual(atft_manager.target_devs[0].serial_number,
265                     self.TEST_SERIAL)
266    self.assertEqual(2, mock_fastboot.call_count)
267
268  @patch('threading.Timer')
269  def testListDevicesChangeCommon(self, mock_create_timer):
270    mock_create_timer.side_effect = self.MockCreateInstantTimer
271    mock_fastboot = MagicMock()
272    mock_fastboot.side_effect = self.MockInit
273    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
274                                       self.configs)
275    atft_manager._GetOs = MagicMock()
276    atft_manager._GetOs.return_value = 'Windows'
277    mock_fastboot.ListDevices.return_value = [
278        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
279    ]
280    atft_manager.ListDevices()
281    atft_manager.ListDevices()
282    mock_fastboot.ListDevices.return_value = [
283        self.TEST_SERIAL2, self.TEST_SERIAL
284    ]
285    atft_manager.ListDevices()
286    atft_manager.ListDevices()
287    self.assertEqual(atft_manager.atfa_dev, None)
288    self.assertEqual(2, len(atft_manager.target_devs))
289    self.assertEqual(atft_manager.target_devs[0].serial_number,
290                     self.TEST_SERIAL)
291    self.assertEqual(atft_manager.target_devs[1].serial_number,
292                     self.TEST_SERIAL2)
293    self.assertEqual(3, mock_fastboot.call_count)
294
295  @patch('threading.Timer')
296  def testListDevicesChangeCommonATFA(self, mock_create_timer):
297    mock_create_timer.side_effect = self.MockCreateInstantTimer
298    mock_fastboot = MagicMock()
299    mock_fastboot.side_effect = self.MockInit
300    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
301                                       self.configs)
302    atft_manager._GetOs = MagicMock()
303    atft_manager._GetOs.return_value = 'Windows'
304    mock_fastboot.ListDevices.return_value = [
305        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
306    ]
307    atft_manager.ListDevices()
308    atft_manager.ListDevices()
309    mock_fastboot.ListDevices.return_value = [
310        self.ATFA_TEST_SERIAL, self.TEST_SERIAL2
311    ]
312    atft_manager.ListDevices()
313    # First refresh, TEST_SERIAL should disappear
314    self.assertEqual(0, len(atft_manager.target_devs))
315    # Second refresh, TEST_SERIAL2 should be added
316    atft_manager.ListDevices()
317    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
318    self.assertEqual(1, len(atft_manager.target_devs))
319    self.assertEqual(atft_manager.target_devs[0].serial_number,
320                     self.TEST_SERIAL2)
321    self.assertEqual(3, mock_fastboot.call_count)
322
323  @patch('threading.Timer')
324  def testListDevicesRemoveATFA(self, mock_create_timer):
325    mock_create_timer.side_effect = self.MockCreateInstantTimer
326    mock_fastboot = MagicMock()
327    mock_fastboot.side_effect = self.MockInit
328    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
329                                       self.configs)
330    atft_manager._GetOs = MagicMock()
331    atft_manager._GetOs.return_value = 'Windows'
332    mock_fastboot.ListDevices.return_value = [
333        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
334    ]
335    atft_manager.ListDevices()
336    atft_manager.ListDevices()
337    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
338    atft_manager.ListDevices()
339    self.assertEqual(atft_manager.atfa_dev, None)
340    self.assertEqual(1, len(atft_manager.target_devs))
341    self.assertEqual(atft_manager.target_devs[0].serial_number,
342                     self.TEST_SERIAL)
343
344  @patch('threading.Timer')
345  def testListDevicesRemoveDevice(self, mock_create_timer):
346    mock_create_timer.side_effect = self.MockCreateInstantTimer
347    mock_fastboot = MagicMock()
348    mock_fastboot.side_effect = self.MockInit
349    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
350                                       self.configs)
351    atft_manager._GetOs = MagicMock()
352    atft_manager._GetOs.return_value = 'Windows'
353    mock_fastboot.ListDevices.return_value = [
354        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
355    ]
356    atft_manager.ListDevices()
357    atft_manager.ListDevices()
358    mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL]
359    atft_manager.ListDevices()
360    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
361    self.assertEqual(0, len(atft_manager.target_devs))
362
363  @patch('threading.Timer')
364  def testListDevicesPendingRemove(self, mock_create_timer):
365    mock_create_timer.side_effect = self.MockCreateInstantTimer
366    mock_fastboot = MagicMock()
367    mock_fastboot.side_effect = self.MockInit
368    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
369                                       self.configs)
370    atft_manager._GetOs = MagicMock()
371    atft_manager._GetOs.return_value = 'Windows'
372    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
373    atft_manager.ListDevices()
374    # Just appear once, should not be in target device list.
375    self.assertEqual(0, len(atft_manager.target_devs))
376
377  @patch('threading.Timer')
378  def testListDevicesPendingAdd(self, mock_create_timer):
379    mock_create_timer.side_effect = self.MockCreateInstantTimer
380    mock_fastboot = MagicMock()
381    mock_fastboot.side_effect = self.MockInit
382    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
383                                       self.configs)
384    atft_manager._GetOs = MagicMock()
385    atft_manager._GetOs.return_value = 'Windows'
386    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
387    atft_manager.ListDevices()
388    self.assertEqual(0, len(atft_manager.target_devs))
389    mock_fastboot.ListDevices.return_value = [
390        self.TEST_SERIAL, self.TEST_SERIAL2
391    ]
392    # TEST_SERIAL appears twice, should be in the list.
393    # TEST_SERIAL2 just appears once, should not be in the list.
394    atft_manager.ListDevices()
395    self.assertEqual(1, len(atft_manager.target_devs))
396
397  @patch('threading.Timer')
398  def testListDevicesPendingTemp(self, mock_create_timer):
399    mock_create_timer.side_effect = self.MockCreateInstantTimer
400    mock_fastboot = MagicMock()
401    mock_fastboot.side_effect = self.MockInit
402    atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper,
403                                       self.configs)
404    atft_manager._GetOs = MagicMock()
405    atft_manager._GetOs.return_value = 'Windows'
406    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL]
407    atft_manager.ListDevices()
408    self.assertEqual(0, len(atft_manager.target_devs))
409    mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL2]
410    atft_manager.ListDevices()
411    # Nothing appears twice.
412    self.assertEqual(0, len(atft_manager.target_devs))
413
414  def mockSetSerialMapper(self, serial_map):
415    self.serial_map = {}
416    for serial in serial_map:
417      self.serial_map[serial.lower()] = serial_map[serial]
418
419  def mockGetLocation(self, serial):
420    serial_lower = serial.lower()
421    if serial_lower in self.serial_map:
422      return self.serial_map[serial_lower]
423    return None
424
425  @patch('threading.Timer')
426  def testListDevicesLocation(self, mock_create_timer):
427    mock_create_timer.side_effect = self.MockCreateInstantTimer
428    mock_serial_mapper = MagicMock()
429    smap = {
430        self.ATFA_TEST_SERIAL: self.TEST_LOCATION,
431        self.TEST_SERIAL: self.TEST_LOCATION2
432    }
433    mock_serial_instance = MagicMock()
434    mock_serial_mapper.return_value = mock_serial_instance
435    mock_serial_instance.refresh_serial_map.side_effect = (
436        lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
437    mock_serial_instance.get_location.side_effect = self.mockGetLocation
438    mock_fastboot = MagicMock()
439    mock_fastboot.side_effect = self.MockInit
440    atft_manager = atftman.AtftManager(
441      mock_fastboot, mock_serial_mapper, self.configs)
442    atft_manager._GetOs = MagicMock()
443    atft_manager._GetOs.return_value = 'Windows'
444    mock_fastboot.ListDevices.return_value = [
445        self.ATFA_TEST_SERIAL, self.TEST_SERIAL
446    ]
447    atft_manager.ListDevices()
448    atft_manager.ListDevices()
449    self.assertEqual(atft_manager.atfa_dev.location, self.TEST_LOCATION)
450    self.assertEqual(atft_manager.target_devs[0].location, self.TEST_LOCATION2)
451
452  # Test _SortTargetDevices
453  def testSortDevicesDefault(self):
454    mock_serial_mapper = MagicMock()
455    mock_serial_instance = MagicMock()
456    mock_serial_mapper.return_value = mock_serial_instance
457    smap = {
458        self.TEST_SERIAL: self.TEST_LOCATION,
459        self.TEST_SERIAL2: self.TEST_LOCATION2,
460        self.TEST_SERIAL3: self.TEST_LOCATION3
461    }
462    mock_serial_instance.refresh_serial_map.side_effect = (
463        lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
464    mock_serial_instance.get_location.side_effect = self.mockGetLocation
465    mock_fastboot = MagicMock()
466    mock_fastboot.side_effect = self.MockInit
467    atft_manager = atftman.AtftManager(
468        mock_fastboot, mock_serial_mapper, self.configs)
469    mock_fastboot.ListDevices.return_value = [
470        self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3
471    ]
472    atft_manager.ListDevices()
473    atft_manager.ListDevices()
474    self.assertEqual(3, len(atft_manager.target_devs))
475    self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location)
476    self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location)
477    self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location)
478
479  def testSortDevicesLocation(self):
480    mock_serial_mapper = MagicMock()
481    mock_serial_instance = MagicMock()
482    mock_serial_mapper.return_value = mock_serial_instance
483    smap = {
484        self.TEST_SERIAL: self.TEST_LOCATION,
485        self.TEST_SERIAL2: self.TEST_LOCATION2,
486        self.TEST_SERIAL3: self.TEST_LOCATION3
487    }
488    mock_serial_instance.refresh_serial_map.side_effect = (
489        lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
490    mock_serial_instance.get_location.side_effect = self.mockGetLocation
491    mock_fastboot = MagicMock()
492    mock_fastboot.side_effect = self.MockInit
493    atft_manager = atftman.AtftManager(
494        mock_fastboot, mock_serial_mapper, self.configs)
495    mock_fastboot.ListDevices.return_value = [
496        self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3
497    ]
498    atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION)
499    atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION)
500    self.assertEqual(3, len(atft_manager.target_devs))
501    self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location)
502    self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location)
503    self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location)
504
505  def testSortDevicesSerial(self):
506    mock_serial_mapper = MagicMock()
507    mock_serial_instance = MagicMock()
508    mock_serial_mapper.return_value = mock_serial_instance
509    smap = {
510        self.TEST_SERIAL: self.TEST_LOCATION,
511        self.TEST_SERIAL2: self.TEST_LOCATION2,
512        self.TEST_SERIAL3: self.TEST_LOCATION3
513    }
514    mock_serial_instance.refresh_serial_map.side_effect = (
515        lambda serial_map=smap: self.mockSetSerialMapper(serial_map))
516    mock_serial_instance.get_location.side_effect = self.mockGetLocation
517    mock_fastboot = MagicMock()
518    mock_fastboot_instance = MagicMock()
519    mock_fastboot.side_effect = self.MockInit
520    mock_fastboot.return_value = mock_fastboot_instance
521    mock_fastboot_instance.GetVar = MagicMock()
522    atft_manager = atftman.AtftManager(
523        mock_fastboot, mock_serial_mapper, self.configs)
524    mock_fastboot.ListDevices.return_value = [
525        self.TEST_SERIAL2, self.TEST_SERIAL3, self.TEST_SERIAL
526    ]
527    atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL)
528    atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL)
529    self.assertEqual(3, len(atft_manager.target_devs))
530    self.assertEqual(self.TEST_SERIAL,
531                     atft_manager.target_devs[0].serial_number)
532    self.assertEqual(self.TEST_SERIAL2,
533                     atft_manager.target_devs[1].serial_number)
534    self.assertEqual(self.TEST_SERIAL3,
535                     atft_manager.target_devs[2].serial_number)
536
537  # Test AtftManager.TransferContent
538
539  @staticmethod
540  def _AppendFile(file_path):
541    files.append(file_path)
542
543  @staticmethod
544  def _CheckFile(file_path):
545    assert file_path in files
546    return True
547
548  @staticmethod
549  def _RemoveFile(file_path):
550    assert file_path in files
551    files.remove(file_path)
552
553  @patch('os.rmdir')
554  @patch('os.remove')
555  @patch('os.path.exists')
556  @patch('tempfile.mkdtemp')
557  @patch('uuid.uuid1')
558  @patch('__main__.AtftManTest.FastbootDeviceTemplate.Upload')
559  @patch('__main__.AtftManTest.FastbootDeviceTemplate.Download')
560  def testTransferContentNormal(self, mock_download, mock_upload, mock_uuid,
561                                mock_create_folder, mock_exists, mock_remove,
562                                mock_rmdir):
563    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
564                                       self.mock_serial_mapper, self.configs)
565    # upload (to fs): create a temporary file
566    mock_upload.side_effect = AtftManTest._AppendFile
567    # download (from fs): check if the temporary file exists
568    mock_download.side_effect = AtftManTest._CheckFile
569    mock_exists.side_effect = AtftManTest._CheckFile
570    # remove: remove the file
571    mock_remove.side_effect = AtftManTest._RemoveFile
572    mock_rmdir.side_effect = AtftManTest._RemoveFile
573    mock_create_folder.return_value = self.TEST_TMP_FOLDER
574    files.append(self.TEST_TMP_FOLDER)
575    mock_uuid.return_value = self.TEST_UUID
576    tmp_path = self.TEST_TMP_FOLDER + self.TEST_UUID
577    src = self.FastbootDeviceTemplate(self.TEST_SERIAL)
578    dst = self.FastbootDeviceTemplate(self.TEST_SERIAL)
579    atft_manager.TransferContent(src, dst)
580    src.Upload.assert_called_once_with(tmp_path)
581    src.Download.assert_called_once_with(tmp_path)
582    # we should have no temporary file at the end
583    self.assertTrue(not files)
584
585  # Test AtftManager._ChooseAlgorithm
586  def testChooseAlgorithm(self):
587    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
588                                       self.mock_serial_mapper, self.configs)
589    p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256
590    curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519
591    algorithm = atft_manager._ChooseAlgorithm([p256, curve])
592    self.assertEqual(curve, algorithm)
593
594  def testChooseAlgorithmP256(self):
595    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
596                                       self.mock_serial_mapper, self.configs)
597    p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256
598    algorithm = atft_manager._ChooseAlgorithm([p256])
599    self.assertEqual(p256, algorithm)
600
601  def testChooseAlgorithmCurve(self):
602    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
603                                       self.mock_serial_mapper, self.configs)
604    curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519
605    algorithm = atft_manager._ChooseAlgorithm([curve])
606    self.assertEqual(curve, algorithm)
607
608  def testChooseAlgorithmException(self):
609    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
610                                       self.mock_serial_mapper, self.configs)
611    with self.assertRaises(NoAlgorithmAvailableException):
612      atft_manager._ChooseAlgorithm([])
613
614  def testChooseAlgorithmExceptionNoAvailable(self):
615    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
616                                       self.mock_serial_mapper, self.configs)
617    with self.assertRaises(NoAlgorithmAvailableException):
618      atft_manager._ChooseAlgorithm(['abcd'])
619
620  # Test AtftManager._GetAlgorithmList
621  def testGetAlgorithmList(self):
622    mock_target = MagicMock()
623    mock_target.GetVar.return_value = '1:p256,2:curve25519'
624    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
625                                       self.mock_serial_mapper, self.configs)
626    algorithm_list = atft_manager._GetAlgorithmList(mock_target)
627    self.assertEqual(2, len(algorithm_list))
628    self.assertEqual(1, algorithm_list[0])
629    self.assertEqual(2, algorithm_list[1])
630
631  # Test DeviceInfo.__eq__
632  def testDeviceInfoEqual(self):
633    test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL,
634                                      self.TEST_LOCATION)
635    test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2,
636                                      self.TEST_LOCATION2)
637    test_device3 = atftman.DeviceInfo(None, self.TEST_SERIAL,
638                                      self.TEST_LOCATION2)
639    test_device4 = atftman.DeviceInfo(None, self.TEST_SERIAL2,
640                                      self.TEST_LOCATION)
641    test_device5 = atftman.DeviceInfo(None, self.TEST_SERIAL,
642                                      self.TEST_LOCATION)
643    self.assertEqual(test_device1, test_device5)
644    self.assertNotEqual(test_device1, test_device2)
645    self.assertNotEqual(test_device1, test_device3)
646    self.assertNotEqual(test_device1, test_device4)
647
648  # Test DeviceInfo.Copy
649  def testDeviceInfoCopy(self):
650    test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL,
651                                      self.TEST_LOCATION)
652    test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2,
653                                      self.TEST_LOCATION2)
654    test_device3 = test_device1.Copy()
655    self.assertEqual(test_device3, test_device1)
656    self.assertNotEqual(test_device3, test_device2)
657
658  # Test AtfaDeviceManager.CheckStatus
659  def testCheckStatus(self):
660    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
661                                       self.mock_serial_mapper, self.configs)
662    mock_atfa_dev = MagicMock()
663    atft_manager.atfa_dev = mock_atfa_dev
664    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
665    atft_manager.product_info = MagicMock()
666    atft_manager.product_info.product_id = self.TEST_ID
667    mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) 100\nTEST'
668    test_number = test_atfa_device_manager.CheckStatus()
669    mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True)
670    self.assertEqual(100, atft_manager.GetATFAKeysLeft())
671
672  def testCheckStatusCRLF(self):
673    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
674                                       self.mock_serial_mapper, self.configs)
675    mock_atfa_dev = MagicMock()
676    atft_manager.atfa_dev = mock_atfa_dev
677    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
678    atft_manager.product_info = MagicMock()
679    atft_manager.product_info.product_id = self.TEST_ID
680    mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST'
681    test_number = test_atfa_device_manager.CheckStatus()
682    mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True)
683    self.assertEqual(100, atft_manager.GetATFAKeysLeft())
684
685  def testCheckStatusNoProductId(self):
686    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
687                                       self.mock_serial_mapper, self.configs)
688    mock_atfa_dev = MagicMock()
689    atft_manager.atfa_dev = mock_atfa_dev
690    atft_manager.product_info = None
691    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
692    mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST'
693    with self.assertRaises(ProductNotSpecifiedException):
694      test_atfa_device_manager.CheckStatus()
695
696  def testCheckStatusNoATFA(self):
697    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
698                                       self.mock_serial_mapper, self.configs)
699    atft_manager.atfa_dev = None
700    atft_manager.product_info = MagicMock()
701    atft_manager.product_info.product_id = self.TEST_ID
702    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
703    with self.assertRaises(DeviceNotFoundException):
704      test_atfa_device_manager.CheckStatus()
705
706  def testCheckStatusInvalidFormat(self):
707    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
708                                       self.mock_serial_mapper, self.configs)
709    mock_atfa_dev = MagicMock()
710    atft_manager.atfa_dev = mock_atfa_dev
711    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
712    atft_manager.product_info = MagicMock()
713    atft_manager.product_info.product_id = self.TEST_ID
714    mock_atfa_dev.Oem.return_value = 'TEST\nTEST'
715    with self.assertRaises(FastbootFailure):
716      test_atfa_device_manager.CheckStatus()
717
718  def testCheckStatusInvalidNumber(self):
719    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
720                                       self.mock_serial_mapper, self.configs)
721    mock_atfa_dev = MagicMock()
722    atft_manager.atfa_dev = mock_atfa_dev
723    test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager)
724    atft_manager.product_info = MagicMock()
725    atft_manager.product_info.product_id = self.TEST_ID
726    mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) abcd\nTEST'
727    with self.assertRaises(FastbootFailure):
728      test_atfa_device_manager.CheckStatus()
729
730  # Test AtftManager.CheckProvisionStatus
731  def MockGetVar(self, variable):
732    return self.status_map.get(variable)
733
734  def testCheckProvisionStatus(self):
735    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
736                                       self.mock_serial_mapper, self.configs)
737    # All initial state
738    self.status_map = {}
739    self.status_map['at-vboot-state'] = (
740      '(bootloader) bootloader-locked: 0\n'
741      '(bootloader) bootloader-min-versions: -1,0,3\n'
742      '(bootloader) avb-perm-attr-set: 0\n'
743      '(bootloader) avb-locked: 0\n'
744      '(bootloader) avb-unlock-disabled: 0\n'
745      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
746    self.status_map['at-attest-uuid'] = ''
747    mock_device = MagicMock()
748    mock_device.GetVar.side_effect = self.MockGetVar
749    atft_manager.CheckProvisionStatus(mock_device)
750    self.assertEqual(ProvisionStatus.IDLE, mock_device.provision_status)
751
752    # Attestation key provisioned
753    self.status_map['at-attest-uuid'] = self.TEST_UUID
754    atft_manager.CheckProvisionStatus(mock_device)
755    self.assertEqual(ProvisionStatus.PROVISION_SUCCESS,
756                     mock_device.provision_status)
757
758    # AVB locked
759    self.status_map['at-vboot-state'] = (
760      '(bootloader) bootloader-locked: 0\n'
761      '(bootloader) bootloader-min-versions: -1,0,3\n'
762      '(bootloader) avb-perm-attr-set: 0\n'
763      '(bootloader) avb-locked: 1\n'
764      '(bootloader) avb-unlock-disabled: 0\n'
765      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
766    self.status_map['at-attest-uuid'] = ''
767    atft_manager.CheckProvisionStatus(mock_device)
768    self.assertEqual(ProvisionStatus.LOCKAVB_SUCCESS,
769                     mock_device.provision_status)
770
771    # Permanent attributes fused
772    self.status_map['at-vboot-state'] = (
773      '(bootloader) bootloader-locked: 0\n'
774      '(bootloader) bootloader-min-versions: -1,0,3\n'
775      '(bootloader) avb-perm-attr-set: 1\n'
776      '(bootloader) avb-locked: 0\n'
777      '(bootloader) avb-unlock-disabled: 0\n'
778      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
779    self.status_map['at-attest-uuid'] = ''
780    atft_manager.CheckProvisionStatus(mock_device)
781    self.assertEqual(ProvisionStatus.FUSEATTR_SUCCESS,
782                     mock_device.provision_status)
783
784    # Bootloader locked
785    self.status_map['at-vboot-state'] = (
786      '(bootloader) bootloader-locked: 1\n'
787      '(bootloader) bootloader-min-versions: -1,0,3\n'
788      '(bootloader) avb-perm-attr-set: 0\n'
789      '(bootloader) avb-locked: 0\n'
790      '(bootloader) avb-unlock-disabled: 0\n'
791      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
792    self.status_map['at-attest-uuid'] = ''
793    atft_manager.CheckProvisionStatus(mock_device)
794    self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS,
795                     mock_device.provision_status)
796
797  def testCheckProvisionStatusFormat(self):
798    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
799                                       self.mock_serial_mapper, self.configs)
800    # All initial state
801    self.status_map = {}
802    self.status_map['at-vboot-state'] = (
803      '(bootloader) bootloader-locked=1\n'
804      '(bootloader) bootloader-min-versions=-1,0,3\n'
805      '(bootloader) avb-perm-attr-set=0\n'
806      '(bootloader) avb-locked=0\n'
807      '(bootloader) avb-unlock-disabled=0\n'
808      '(bootloader) avb-min-versions=0:1,1:1,2:1,4097 :2,4098:2\n')
809    self.status_map['at-attest-uuid'] = ''
810    mock_device = MagicMock()
811    mock_device.GetVar.side_effect = self.MockGetVar
812    atft_manager.CheckProvisionStatus(mock_device)
813    self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status)
814    self.assertEqual(True, mock_device.provision_state.bootloader_locked)
815
816    self.status_map['at-vboot-state'] = (
817      '(bootloader) bootloader-locked:1\n'
818      '(bootloader) bootloader-min-versions:-1,0,3\n'
819      '(bootloader) avb-perm-attr-set:0\n'
820      '(bootloader) avb-locked:0\n'
821      '(bootloader) avb-unlock-disabled:0\n'
822      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
823    self.status_map['at-attest-uuid'] = ''
824    mock_device = MagicMock()
825    mock_device.GetVar.side_effect = self.MockGetVar
826    atft_manager.CheckProvisionStatus(mock_device)
827    self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status)
828    self.assertEqual(True, mock_device.provision_state.bootloader_locked)
829
830    self.status_map['at-vboot-state'] = (
831      '(bootloader) bootloader-locked:\t1\n'
832      '(bootloader) bootloader-min-versions:\t-1,0,3\n'
833      '(bootloader) avb-perm-attr-set:\t0\n'
834      '(bootloader) avb-locked:\t0\n'
835      '(bootloader) avb-unlock-disabled:\t0\n'
836      '(bootloader) avb-min-versions:\t0:1,1:1,2:1,4097 :2,4098:2\n')
837    self.status_map['at-attest-uuid'] = ''
838    mock_device = MagicMock()
839    mock_device.GetVar.side_effect = self.MockGetVar
840    atft_manager.CheckProvisionStatus(mock_device)
841    self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status)
842    self.assertEqual(True, mock_device.provision_state.bootloader_locked)
843
844  def testCheckProvisionState(self):
845    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
846                                       self.mock_serial_mapper, self.configs)
847    # All initial state
848    self.status_map = {}
849    self.status_map['at-vboot-state'] = (
850      '(bootloader) bootloader-locked: 0\n'
851      '(bootloader) bootloader-min-versions: -1,0,3\n'
852      '(bootloader) avb-perm-attr-set: 0\n'
853      '(bootloader) avb-locked: 0\n'
854      '(bootloader) avb-unlock-disabled: 0\n'
855      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
856    self.status_map['at-attest-uuid'] = ''
857    mock_device = MagicMock()
858    mock_device.GetVar.side_effect = self.MockGetVar
859    atft_manager.CheckProvisionStatus(mock_device)
860    self.assertEqual(False, mock_device.provision_state.bootloader_locked)
861    self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set)
862    self.assertEqual(False, mock_device.provision_state.avb_locked)
863    self.assertEqual(False, mock_device.provision_state.provisioned)
864
865    # Attestation key provisioned
866    self.status_map['at-attest-uuid'] = self.TEST_UUID
867    atft_manager.CheckProvisionStatus(mock_device)
868    self.assertEqual(False, mock_device.provision_state.bootloader_locked)
869    self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set)
870    self.assertEqual(False, mock_device.provision_state.avb_locked)
871    self.assertEqual(True, mock_device.provision_state.provisioned)
872
873    # AVB locked and attestation key provisioned
874    self.status_map['at-vboot-state'] = (
875      '(bootloader) bootloader-locked: 0\n'
876      '(bootloader) bootloader-min-versions: -1,0,3\n'
877      '(bootloader) avb-perm-attr-set: 0\n'
878      '(bootloader) avb-locked: 1\n'
879      '(bootloader) avb-unlock-disabled: 0\n'
880      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
881    self.status_map['at-attest-uuid'] = self.TEST_UUID
882    atft_manager.CheckProvisionStatus(mock_device)
883    self.assertEqual(False, mock_device.provision_state.bootloader_locked)
884    self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set)
885    self.assertEqual(True, mock_device.provision_state.avb_locked)
886    self.assertEqual(True, mock_device.provision_state.provisioned)
887    self.assertEqual(ProvisionStatus.PROVISION_SUCCESS,
888                     mock_device.provision_status)
889
890    # Permanent attributes fused
891    self.status_map['at-vboot-state'] = (
892      '(bootloader) bootloader-locked: 0\n'
893      '(bootloader) bootloader-min-versions: -1,0,3\n'
894      '(bootloader) avb-perm-attr-set: 1\n'
895      '(bootloader) avb-locked: 0\n'
896      '(bootloader) avb-unlock-disabled: 0\n'
897      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
898    self.status_map['at-attest-uuid'] = ''
899    atft_manager.CheckProvisionStatus(mock_device)
900    self.assertEqual(False, mock_device.provision_state.bootloader_locked)
901    self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set)
902    self.assertEqual(False, mock_device.provision_state.avb_locked)
903    self.assertEqual(False, mock_device.provision_state.provisioned)
904
905    # All status set
906    self.status_map['at-vboot-state'] = (
907      '(bootloader) bootloader-locked: 1\n'
908      '(bootloader) bootloader-min-versions: -1,0,3\n'
909      '(bootloader) avb-perm-attr-set: 1\n'
910      '(bootloader) avb-locked: 1\n'
911      '(bootloader) avb-unlock-disabled: 0\n'
912      '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n')
913    self.status_map['at-attest-uuid'] = self.TEST_UUID
914    atft_manager.CheckProvisionStatus(mock_device)
915    self.assertEqual(True, mock_device.provision_state.bootloader_locked)
916    self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set)
917    self.assertEqual(True, mock_device.provision_state.avb_locked)
918    self.assertEqual(True, mock_device.provision_state.provisioned)
919    self.assertEqual(ProvisionStatus.PROVISION_SUCCESS,
920                     mock_device.provision_status)
921
922  # Test AtftManager.Provision
923  def MockSetProvisionSuccess(self, target):
924    target.provision_status = ProvisionStatus.PROVISION_SUCCESS
925    target.provision_state = ProvisionState()
926    target.provision_state.provisioned = True
927
928  def MockSetProvisionFail(self, target):
929    target.provision_status = ProvisionStatus.PROVISION_FAILED
930    target.provision_state = ProvisionState()
931    target.provision_state.provisioned = False
932
933  def testProvision(self):
934    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
935                                       self.mock_serial_mapper, self.configs)
936    mock_atfa = MagicMock()
937    mock_target = MagicMock()
938    atft_manager._atfa_dev_manager = MagicMock()
939    atft_manager.atfa_dev = mock_atfa
940    atft_manager._GetAlgorithmList = MagicMock()
941    atft_manager._GetAlgorithmList.return_value = [
942        EncryptionAlgorithm.ALGORITHM_CURVE25519
943    ]
944    atft_manager.TransferContent = MagicMock()
945    atft_manager.CheckProvisionStatus = MagicMock()
946    atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionSuccess
947
948    atft_manager.Provision(mock_target)
949
950    # Make sure atfa.SetTime is called.
951    atft_manager._atfa_dev_manager.SetTime.assert_called_once()
952    # Transfer content should be ATFA->target, target->ATFA, ATFA->target
953    transfer_content_calls = [
954        call(mock_atfa, mock_target),
955        call(mock_target, mock_atfa),
956        call(mock_atfa, mock_target)
957    ]
958    atft_manager.TransferContent.assert_has_calls(transfer_content_calls)
959    atfa_oem_calls = [
960        call('atfa-start-provisioning ' +
961             str(EncryptionAlgorithm.ALGORITHM_CURVE25519)),
962        call('atfa-finish-provisioning')
963    ]
964    target_oem_calls = [
965        call('at-get-ca-request'),
966        call('at-set-ca-response')
967    ]
968    mock_atfa.Oem.assert_has_calls(atfa_oem_calls)
969    mock_target.Oem.assert_has_calls(target_oem_calls)
970
971  def testProvisionFailed(self):
972    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
973                                       self.mock_serial_mapper, self.configs)
974    mock_atfa = MagicMock()
975    mock_target = MagicMock()
976    atft_manager._atfa_dev_manager = MagicMock()
977    atft_manager.atfa_dev = mock_atfa
978    atft_manager._GetAlgorithmList = MagicMock()
979    atft_manager._GetAlgorithmList.return_value = [
980        EncryptionAlgorithm.ALGORITHM_CURVE25519
981    ]
982    atft_manager.TransferContent = MagicMock()
983    atft_manager.CheckProvisionStatus = MagicMock()
984    atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionFail
985    with self.assertRaises(FastbootFailure):
986      atft_manager.Provision(mock_target)
987
988  # Test AtftManager.FuseVbootKey
989  def MockSetFuseVbootSuccess(self, target):
990    target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS
991    target.provision_state = ProvisionState()
992    target.provision_state.bootloader_locked = True
993
994  def MockSetFuseVbootFail(self, target):
995    target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED
996    target.provision_state = ProvisionState()
997    target.provision_state.bootloader_locked = False
998
999  @patch('os.remove')
1000  @patch('tempfile.NamedTemporaryFile')
1001  def testFuseVbootKey(self, mock_create_temp_file, mock_remove):
1002    mock_file = MagicMock()
1003    mock_create_temp_file.return_value = mock_file
1004    mock_file.name = self.TEST_FILE_NAME
1005
1006    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1007                                       self.mock_serial_mapper, self.configs)
1008    atft_manager.product_info = ProductInfo(
1009        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1010        self.TEST_VBOOT_KEY_ARRAY)
1011    mock_target = MagicMock()
1012    atft_manager.CheckProvisionStatus = MagicMock()
1013    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess
1014
1015    atft_manager.FuseVbootKey(mock_target)
1016
1017    mock_file.write.assert_called_once_with(self.TEST_VBOOT_KEY_ARRAY)
1018    mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME)
1019    mock_remove.assert_called_once_with(self.TEST_FILE_NAME)
1020    mock_target.Oem.assert_called_once_with('fuse at-bootloader-vboot-key')
1021
1022  @patch('os.remove')
1023  @patch('tempfile.NamedTemporaryFile')
1024  def testFuseVbootKeyFailed(self, mock_create_temp_file, _):
1025    mock_file = MagicMock()
1026    mock_create_temp_file.return_value = mock_file
1027    mock_file.name = self.TEST_FILE_NAME
1028
1029    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1030                                       self.mock_serial_mapper, self.configs)
1031    atft_manager.product_info = ProductInfo(
1032        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1033        self.TEST_VBOOT_KEY_ARRAY)
1034    mock_target = MagicMock()
1035    atft_manager.CheckProvisionStatus = MagicMock()
1036    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootFail
1037    with self.assertRaises(FastbootFailure):
1038      atft_manager.FuseVbootKey(mock_target)
1039
1040  def testFuseVbootKeyNoProduct(self):
1041    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1042                                       self.mock_serial_mapper, self.configs)
1043    mock_target = MagicMock()
1044    atft_manager.product_info = None
1045    with self.assertRaises(ProductNotSpecifiedException):
1046      atft_manager.FuseVbootKey(mock_target)
1047
1048  @patch('os.remove')
1049  @patch('tempfile.NamedTemporaryFile')
1050  def testFuseVbootKeyFastbootFailure(self, mock_create_temp_file, _):
1051    mock_file = MagicMock()
1052    mock_create_temp_file.return_value = mock_file
1053    mock_file.name = self.TEST_FILE_NAME
1054
1055    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1056                                       self.mock_serial_mapper, self.configs)
1057    atft_manager.product_info = ProductInfo(
1058        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1059        self.TEST_VBOOT_KEY_ARRAY)
1060    mock_target = MagicMock()
1061    atft_manager.CheckProvisionStatus = MagicMock()
1062    mock_target.Oem.side_effect = FastbootFailure('')
1063
1064    with self.assertRaises(FastbootFailure):
1065      atft_manager.FuseVbootKey(mock_target)
1066    self.assertEqual(
1067        ProvisionStatus.FUSEVBOOT_FAILED, mock_target.provision_status)
1068
1069  # Test AtftManager.FusePermAttr
1070  def MockSetFuseAttrSuccess(self, target):
1071    target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS
1072    target.provision_state = ProvisionState()
1073    target.provision_state.avb_perm_attr_set = True
1074
1075  def MockSetFuseAttrFail(self, target):
1076    target.provision_status = ProvisionStatus.FUSEATTR_FAILED
1077    target.provision_state = ProvisionState()
1078    target.provision_state.avb_perm_attr_set = False
1079
1080  @patch('os.remove')
1081  @patch('tempfile.NamedTemporaryFile')
1082  def testFusePermAttr(self, mock_create_temp_file, mock_remove):
1083    mock_file = MagicMock()
1084    mock_create_temp_file.return_value = mock_file
1085    mock_file.name = self.TEST_FILE_NAME
1086
1087    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1088                                       self.mock_serial_mapper, self.configs)
1089    atft_manager.product_info = ProductInfo(
1090        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1091        self.TEST_VBOOT_KEY_ARRAY)
1092    mock_target = MagicMock()
1093    atft_manager.CheckProvisionStatus = MagicMock()
1094    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrSuccess
1095
1096    atft_manager.FusePermAttr(mock_target)
1097
1098    mock_file.write.assert_called_once_with(self.TEST_ATTRIBUTE_ARRAY)
1099    mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME)
1100    mock_remove.assert_called_once_with(self.TEST_FILE_NAME)
1101    mock_target.Oem.assert_called_once_with('fuse at-perm-attr')
1102
1103  @patch('os.remove')
1104  @patch('tempfile.NamedTemporaryFile')
1105  def testFusePermAttrFail(self, mock_create_temp_file, _):
1106    mock_file = MagicMock()
1107    mock_create_temp_file.return_value = mock_file
1108    mock_file.name = self.TEST_FILE_NAME
1109
1110    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1111                                       self.mock_serial_mapper, self.configs)
1112    atft_manager.product_info = ProductInfo(
1113        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1114        self.TEST_VBOOT_KEY_ARRAY)
1115    mock_target = MagicMock()
1116    atft_manager.CheckProvisionStatus = MagicMock()
1117    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrFail
1118    with self.assertRaises(FastbootFailure):
1119      atft_manager.FusePermAttr(mock_target)
1120
1121  def testFusePermAttrNoProduct(self):
1122    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1123                                       self.mock_serial_mapper, self.configs)
1124    mock_target = MagicMock()
1125    atft_manager.product_info = None
1126    with self.assertRaises(ProductNotSpecifiedException):
1127      atft_manager.FusePermAttr(mock_target)
1128
1129  @patch('os.remove')
1130  @patch('tempfile.NamedTemporaryFile')
1131  def testFusePermAttrFastbootFailure(self, mock_create_temp_file, _):
1132    mock_file = MagicMock()
1133    mock_create_temp_file.return_value = mock_file
1134    mock_file.name = self.TEST_FILE_NAME
1135
1136    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1137                                       self.mock_serial_mapper, self.configs)
1138    atft_manager.product_info = ProductInfo(
1139        self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY,
1140        self.TEST_VBOOT_KEY_ARRAY)
1141    mock_target = MagicMock()
1142    atft_manager.CheckProvisionStatus = MagicMock()
1143    mock_target.Oem.side_effect = FastbootFailure('')
1144
1145    with self.assertRaises(FastbootFailure):
1146      atft_manager.FusePermAttr(mock_target)
1147    self.assertEqual(
1148        ProvisionStatus.FUSEATTR_FAILED, mock_target.provision_status)
1149
1150  # Test AtftManager.LockAvb
1151  def MockSetLockAvbSuccess(self, target):
1152    target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS
1153    target.provision_state = ProvisionState()
1154    target.provision_state.avb_locked = True
1155
1156  def MockSetLockAvbFail(self, target):
1157    target.provision_status = ProvisionStatus.LOCKAVB_FAILED
1158    target.provision_state = ProvisionState()
1159    target.provision_state.avb_locked = False
1160
1161  def testLockAvb(self):
1162    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1163                                       self.mock_serial_mapper, self.configs)
1164    mock_target = MagicMock()
1165    atft_manager.CheckProvisionStatus = MagicMock()
1166    atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess
1167    atft_manager.LockAvb(mock_target)
1168    mock_target.Oem.assert_called_once_with('at-lock-vboot')
1169    self.assertEqual(
1170        ProvisionStatus.LOCKAVB_SUCCESS, mock_target.provision_status)
1171
1172  def testLockAvbFail(self):
1173    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1174                                       self.mock_serial_mapper, self.configs)
1175    mock_target = MagicMock()
1176    atft_manager.CheckProvisionStatus = MagicMock()
1177    atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbFail
1178    with self.assertRaises(FastbootFailure):
1179      atft_manager.LockAvb(mock_target)
1180
1181  def testLockAvbFastbootFailure(self):
1182    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1183                                       self.mock_serial_mapper, self.configs)
1184    mock_target = MagicMock()
1185    atft_manager.CheckProvisionStatus = MagicMock()
1186    atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess
1187    mock_target.Oem.side_effect = FastbootFailure('')
1188    with self.assertRaises(FastbootFailure):
1189      atft_manager.LockAvb(mock_target)
1190    self.assertEqual(
1191        ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status)
1192
1193  # Test AtftManager.Reboot
1194  class MockTimer(object):
1195    def __init__(self, interval, callback):
1196      self.interval = interval
1197      self.callback = callback
1198
1199    def start(self):
1200      pass
1201
1202    def refresh(self):
1203      if self.callback:
1204        self.callback()
1205
1206    def cancel(self):
1207      self.callback = None
1208
1209  def mock_create_timer(self, interval, callback):
1210    self.mock_timer_instance = self.MockTimer(interval, callback)
1211    return self.mock_timer_instance
1212
1213  @patch('threading.Timer')
1214  def testRebootSuccess(self, mock_timer):
1215    self.mock_timer_instance = None
1216    atft_manager = atftman.AtftManager(
1217      self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs)
1218    timeout = 1
1219    atft_manager.stable_serials = [self.TEST_SERIAL]
1220    mock_fastboot = MagicMock()
1221    test_device = atftman.DeviceInfo(
1222        mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION)
1223
1224    atft_manager.target_devs.append(test_device)
1225    mock_success = MagicMock()
1226    mock_fail = MagicMock()
1227    mock_timer.side_effect = self.mock_create_timer
1228
1229    atft_manager.Reboot(test_device, timeout, mock_success, mock_fail)
1230
1231    # During the reboot, the status should be REBOOT_ING.
1232    self.assertEqual(1, len(atft_manager.target_devs))
1233    self.assertEqual(
1234        ProvisionStatus.REBOOT_ING,
1235        atft_manager.target_devs[0].provision_status)
1236    self.assertEqual(
1237        self.TEST_SERIAL, atft_manager.target_devs[0].serial_number)
1238
1239    # After the device reappear, the status should be REBOOT_SUCCESS.
1240    atft_manager.stable_serials = [self.TEST_SERIAL]
1241    atft_manager._HandleRebootCallbacks()
1242    # mock timeout event.
1243    self.mock_timer_instance.refresh()
1244
1245    self.assertEqual(1, len(atft_manager.target_devs))
1246    self.assertEqual(
1247        ProvisionStatus.REBOOT_SUCCESS,
1248        atft_manager.target_devs[0].provision_status)
1249    mock_fastboot.Reboot.assert_called_once()
1250
1251    # Success should be called, fail should not.
1252    mock_success.assert_called_once()
1253    mock_fail.assert_not_called()
1254
1255  @patch('threading.Timer')
1256  def testRebootTimeout(self, mock_timer):
1257    self.mock_timer_instance = None
1258    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1259                                       self.mock_serial_mapper, self.configs)
1260    timeout = 1
1261    atft_manager.stable_serials.append(self.TEST_SERIAL)
1262    mock_fastboot = MagicMock()
1263    test_device = atftman.DeviceInfo(
1264        mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION)
1265    atft_manager.target_devs.append(test_device)
1266    mock_success = MagicMock()
1267    mock_fail = MagicMock()
1268    # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS
1269    atft_manager.CheckProvisionStatus = MagicMock()
1270    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess
1271    mock_timer.side_effect = self.mock_create_timer
1272
1273    atft_manager.Reboot(test_device, timeout, mock_success, mock_fail)
1274    atft_manager.stable_serials = []
1275
1276    atft_manager._HandleRebootCallbacks()
1277
1278    # mock timeout event.
1279    self.mock_timer_instance.refresh()
1280
1281    self.assertEqual(0, len(atft_manager.target_devs))
1282    mock_success.assert_not_called()
1283    mock_fail.assert_called_once()
1284
1285  @patch('threading.Timer')
1286  def testRebootTimeoutBeforeRefresh(self, mock_timer):
1287    self.mock_timer_instance = None
1288    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1289                                       self.mock_serial_mapper, self.configs)
1290    timeout = 1
1291    atft_manager.stable_serials.append(self.TEST_SERIAL)
1292    mock_fastboot = MagicMock()
1293    test_device = atftman.DeviceInfo(
1294        mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION)
1295    atft_manager.target_devs.append(test_device)
1296    mock_success = MagicMock()
1297    mock_fail = MagicMock()
1298    # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS
1299    atft_manager.CheckProvisionStatus = MagicMock()
1300    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess
1301    mock_timer.side_effect = self.mock_create_timer
1302
1303    atft_manager.Reboot(test_device, timeout, mock_success, mock_fail)
1304    atft_manager.stable_serials = []
1305     # mock timeout event.
1306    self.mock_timer_instance.refresh()
1307    # mock refresh event.
1308    atft_manager._HandleRebootCallbacks()
1309
1310    self.assertEqual(0, len(atft_manager.target_devs))
1311    mock_success.assert_not_called()
1312    mock_fail.assert_called_once()
1313
1314  @patch('threading.Timer')
1315  def testRebootFailure(self, mock_timer):
1316    self.mock_timer_instance = None
1317    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1318                                       self.mock_serial_mapper, self.configs)
1319    mock_target = MagicMock()
1320    mock_target.serial_number = self.TEST_SERIAL
1321    timeout = 1
1322    atft_manager.stable_serials.append(self.TEST_SERIAL)
1323    test_device = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION)
1324    atft_manager.target_devs.append(test_device)
1325    mock_success = MagicMock()
1326    mock_fail = MagicMock()
1327    # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS
1328    atft_manager.CheckProvisionStatus = MagicMock()
1329    atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess
1330    mock_timer.side_effect = self.mock_create_timer
1331    mock_target.Reboot.side_effect = FastbootFailure('')
1332
1333    with self.assertRaises(FastbootFailure):
1334      atft_manager.Reboot(mock_target, timeout, mock_success, mock_fail)
1335
1336    # There should be no timeout timer.
1337    self.assertEqual(None, self.mock_timer_instance)
1338    # mock refresh event.
1339    atft_manager._HandleRebootCallbacks()
1340    mock_success.assert_not_called()
1341    mock_fail.assert_not_called()
1342
1343  # Test AtftManager.ProcessProductAttributesFile
1344  def testProcessProductAttributesFile(self):
1345    test_content = (
1346        '{'
1347        '  "productName": "%s",'
1348        '  "productConsoleId": "%s",'
1349        '  "productPermanentAttribute": "%s",'
1350        '  "bootloaderPublicKey": "%s",'
1351        '  "creationTime": ""'
1352        '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING,
1353                self.TEST_VBOOT_KEY_STRING)
1354    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1355                                       self.mock_serial_mapper, self.configs)
1356    atft_manager.ProcessProductAttributesFile(test_content)
1357    self.assertEqual(self.TEST_NAME, atft_manager.product_info.product_name)
1358    self.assertEqual(self.TEST_ID, atft_manager.product_info.product_id)
1359    self.assertEqual(self.TEST_ATTRIBUTE_ARRAY,
1360                     atft_manager.product_info.product_attributes)
1361    self.assertEqual(self.TEST_VBOOT_KEY_ARRAY,
1362                     atft_manager.product_info.vboot_key)
1363
1364  def testProcessProductAttributesFileWrongJSON(self):
1365    test_content = (
1366        '{'
1367        '  "productName": "%s",'
1368        '  "productConsoleId": "%s",'
1369        '  "productPermanentAttribute": "%s",'
1370        '  "bootloaderPublicKey": "%s",'
1371        '  "creationTime": ""'
1372        '') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING,
1373               self.TEST_VBOOT_KEY_STRING)
1374    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1375                                       self.mock_serial_mapper, self.configs)
1376    with self.assertRaises(ProductAttributesFileFormatError):
1377      atft_manager.ProcessProductAttributesFile(test_content)
1378
1379  def testProcessProductAttributesFileMissingField(self):
1380    test_content = (
1381        '{'
1382        '  "productConsoleId": "%s",'
1383        '  "productPermanentAttribute": "%s",'
1384        '  "bootloaderPublicKey": "%s",'
1385        '  "creationTime": ""'
1386        '}') % (self.TEST_ID, self.TEST_ATTRIBUTE_STRING,
1387                self.TEST_VBOOT_KEY_STRING)
1388    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1389                                       self.mock_serial_mapper, self.configs)
1390    with self.assertRaises(ProductAttributesFileFormatError):
1391      atft_manager.ProcessProductAttributesFile(test_content)
1392
1393  def testProcessProductAttributesFileWrongLength(self):
1394    test_content = (
1395        '{'
1396        '  "productName": "%s",'
1397        '  "productConsoleId": "%s",'
1398        '  "productPermanentAttribute": "%s",'
1399        '  "bootloaderPublicKey": "%s",'
1400        '  "creationTime": ""'
1401        '}') % (self.TEST_NAME, self.TEST_ID,
1402                base64.standard_b64encode(bytearray(1053)),
1403                self.TEST_VBOOT_KEY_STRING)
1404    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1405                                       self.mock_serial_mapper, self.configs)
1406    with self.assertRaises(ProductAttributesFileFormatError) as e:
1407      atft_manager.ProcessProductAttributesFile(test_content)
1408
1409  def testProcessProductAttributesFileWrongBase64(self):
1410    test_content = (
1411        '{'
1412        '  "productName": "%s",'
1413        '  "productConsoleId": "%s",'
1414        '  "productPermanentAttribute": "%s",'
1415        '  "bootloaderPublicKey": "%s",'
1416        '  "creationTime": ""'
1417        '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING,
1418                '12')
1419    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
1420                                       self.mock_serial_mapper, self.configs)
1421    with self.assertRaises(ProductAttributesFileFormatError):
1422      atft_manager.ProcessProductAttributesFile(test_content)
1423
if __name__ == '__main__':
  # Executed as a script: hand control to unittest's command-line runner.
  unittest.main()
1426