# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, multiprocessing, os, time
import numpy
from autotest_lib.client.bin import test
from autotest_lib.client.cros import sys_power
from autotest_lib.client.cros.camera import camera_utils
from autotest_lib.client.common_lib import error

try:
    # HACK: We need to succeed if OpenCV is missing to allow "emerge
    # autotest-tests" to succeed, as OpenCV is not available in the build
    # environment. It is available on the target where the test actually runs.
    import cv2
except ImportError:
    pass


# Seconds the suspend subprocess waits before suspending, so that the main
# process has time to start capturing.
_PRE_SUSPEND_DELAY_SECS = 5

# Value passed to sys_power.kernel_suspend().
_SUSPEND_SECS = 10

# Number of frames to capture after the suspend task has finished.
_FRAMES_AFTER_RESUME = 10

# Only the first this-many frames are written to disk when saving images.
_MAX_SAVED_IMAGES = 200


def async_suspend():
    """Sleep briefly, then suspend the system via sys_power.kernel_suspend().

    Intended to run in a subprocess while the main process captures frames.
    Any exception is logged here and then re-raised.
    """
    try:
        time.sleep(_PRE_SUSPEND_DELAY_SECS)  # allow some time to start capturing
        sys_power.kernel_suspend(_SUSPEND_SECS)
    except:
        # Any exception will be re-raised in main process, but the stack trace
        # will be wrong. Log it here with the correct stack trace.
        # The bare except is deliberate: nothing is swallowed, we only log.
        logging.exception('suspend raised exception')
        raise


class power_CameraSuspend(test.test):
    """Test camera before & after suspend."""

    version = 1

    def run_once(self, save_images=False):
        """Capture camera frames while the system suspends and resumes.

        @param save_images: if True, write the first _MAX_SAVED_IMAGES
                captured frames to self.outputdir as JPEG files.

        @raises error.TestError: if no camera is found.
        @raises error.TestFail: if a capture fails or two consecutive frames
                are identical.
        """
        # open the camera via opencv
        cam_name, cam_index = camera_utils.find_camera()
        if cam_index is None:
            raise error.TestError('no camera found')
        cam = cv2.VideoCapture(cam_index)

        try:
            # kick off async suspend
            logging.info('starting subprocess to suspend system')
            pool = multiprocessing.Pool(processes=1)
            try:
                # TODO(spang): Move async suspend to library.
                result = pool.apply_async(async_suspend)

                # capture images concurrently with suspend
                capture_start = time.time()
                logging.info('start capturing at %d', capture_start)

                image_count = self._capture_until_resumed(
                        cam, cam_name, result, save_images)

                capture_end = time.time()
                logging.info('done capturing at %d', capture_end)

                logging.info('captured %d frames in %d seconds',
                             image_count, capture_end - capture_start)
            finally:
                # Do not leave the worker process behind, whether the test
                # passed or failed.
                pool.terminate()
                pool.join()
        finally:
            # Always give the camera device back.
            cam.release()

    def _capture_until_resumed(self, cam, cam_name, result, save_images):
        """Read frames until a few have been captured after the suspend task.

        @param cam: opened capture object providing read().
        @param cam_name: camera name, used only in the failure message.
        @param result: async result of the suspend task.
        @param save_images: if True, write early frames to self.outputdir.

        @returns the number of frames read.

        @raises error.TestFail: if a capture fails or two consecutive frames
                are identical.
        """
        image_count = 0
        resume_count = None
        last_image = None

        while True:
            # terminate if we've captured a few frames after resume
            if result.ready() and resume_count is None:
                result.get()  # reraise exception, if any
                resume_count = image_count
                logging.info('suspend task finished')
            if (resume_count is not None and
                    image_count - resume_count >= _FRAMES_AFTER_RESUME):
                break

            # capture one frame
            image_ok, image = cam.read()
            image_count += 1
            if not image_ok:
                logging.error('failed capture at image %d', image_count)
                raise error.TestFail(
                        'image capture failed from %s' % cam_name)

            # write image to disk, if requested
            if save_images and image_count <= _MAX_SAVED_IMAGES:
                path = os.path.join(self.outputdir, '%03d.jpg' % image_count)
                cv2.imwrite(path, image)

            # verify camera produces a unique image on each capture
            if last_image is not None and numpy.array_equal(image, last_image):
                raise error.TestFail('camera produced two identical images')
            last_image = image

        return image_count