1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging, multiprocessing, os, time 6import numpy 7from autotest_lib.client.bin import test 8from autotest_lib.client.cros.camera import camera_utils 9from autotest_lib.client.cros.power import sys_power 10from autotest_lib.client.cros.video import device_capability 11from autotest_lib.client.common_lib import error 12 13try: 14 # HACK: We need to succeed if OpenCV is missing to allow "emerge 15 # autotest-tests" to succeed, as OpenCV is not available in the build 16 # environment. It is available on the target where the test actually runs. 17 import cv2 18except ImportError: 19 pass 20 21 22def async_suspend(): 23 try: 24 time.sleep(5) # allow some time to start capturing 25 sys_power.kernel_suspend(10) 26 except: 27 # Any exception will be re-raised in main process, but the stack trace 28 # will be wrong. Log it here with the correct stack trace. 29 logging.exception('suspend raised exception') 30 raise 31 32 33class power_CameraSuspend(test.test): 34 """Test camera before & after suspend.""" 35 36 version = 1 37 38 def run_once(self, capability, save_images=False): 39 device_capability.DeviceCapability().ensure_capability(capability) 40 # open the camera via opencv 41 cam_name, cam_index = camera_utils.find_camera() 42 if cam_index is None: 43 raise error.TestError('no camera found') 44 cam = cv2.VideoCapture(cam_index) 45 46 # kick off async suspend 47 logging.info('starting subprocess to suspend system') 48 pool = multiprocessing.Pool(processes=1) 49 # TODO(spang): Move async suspend to library. 50 result = pool.apply_async(async_suspend) 51 52 # capture images concurrently with suspend 53 capture_start = time.time() 54 logging.info('start capturing at %d', capture_start) 55 image_count = 0 56 resume_count = None 57 last_image = None 58 59 while True: 60 # terminate if we've captured a few frames after resume 61 if result.ready() and resume_count is None: 62 result.get() # reraise exception, if any 63 resume_count = image_count 64 logging.info('suspend task finished') 65 if resume_count is not None and image_count - resume_count >= 10: 66 break 67 68 # capture one frame 69 image_ok, image = cam.read() 70 image_count += 1 71 if not image_ok: 72 logging.error('failed capture at image %d', image_count) 73 raise error.TestFail('image capture failed from %s', cam_name) 74 75 # write image to disk, if requested 76 if save_images and image_count <= 200: 77 path = os.path.join(self.outputdir, '%03d.jpg' % image_count) 78 cv2.imwrite(path, image) 79 80 # verify camera produces a unique image on each capture 81 if last_image is not None and numpy.array_equal(image, last_image): 82 raise error.TestFail('camera produced two identical images') 83 last_image = image 84 85 capture_end = time.time() 86 logging.info('done capturing at %d', capture_end) 87 88 logging.info('captured %d frames in %d seconds', 89 image_count, capture_end - capture_start) 90