• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, multiprocessing, os, time
6import numpy
7from autotest_lib.client.bin import test
8from autotest_lib.client.cros.camera import camera_utils
9from autotest_lib.client.cros.power import sys_power
10from autotest_lib.client.cros.video import device_capability
11from autotest_lib.client.common_lib import error
12
13try:
14    # HACK: We need to succeed if OpenCV is missing to allow "emerge
15    # autotest-tests" to succeed, as OpenCV is not available in the build
16    # environment. It is available on the target where the test actually runs.
17    import cv2
18except ImportError:
19    pass
20
21
def async_suspend():
    """Sleep briefly, then suspend the system via sys_power.

    Intended to be run in a worker process while the caller keeps
    capturing camera frames. Any exception is logged here (so the log
    carries the worker's own stack trace) and then re-raised so that it
    propagates to whoever collects the result.
    """
    # Give the caller a head start so capturing is already under way.
    capture_warmup_secs = 5
    # Passed straight to kernel_suspend(); presumably the suspend duration
    # in seconds -- confirm against sys_power.
    suspend_arg = 10
    try:
        time.sleep(capture_warmup_secs)
        sys_power.kernel_suspend(suspend_arg)
    except:
        # The exception will be re-raised in the main process, but with the
        # wrong stack trace; record the correct one before passing it on.
        logging.exception('suspend raised exception')
        raise
31
32
class power_CameraSuspend(test.test):
    """Test camera before & after suspend.

    Frames are captured continuously from the camera while a worker
    process suspends the system; the test passes if capturing keeps
    producing distinct frames across the suspend/resume cycle.
    """

    version = 1

    def run_once(self, capability, save_images=False):
        """Capture frames while the system suspends and resumes.

        Args:
            capability: capability name handed to
                DeviceCapability.ensure_capability() before the test body
                runs (presumably skips/fails the test when the device lacks
                it -- confirm against device_capability).
            save_images: when true, the first 200 captured frames are
                written to self.outputdir as 001.jpg, 002.jpg, ...

        Raises:
            error.TestError: no camera was found.
            error.TestFail: a frame could not be captured, or two
                consecutive frames were identical.
            Exception: anything raised by the suspend worker is re-raised
                here via the async result.
        """
        device_capability.DeviceCapability().ensure_capability(capability)
        # open the camera via opencv
        cam_name, cam_index = camera_utils.find_camera()
        if cam_index is None:
            raise error.TestError('no camera found')
        cam = cv2.VideoCapture(cam_index)
        pool = None

        try:
            # kick off async suspend
            logging.info('starting subprocess to suspend system')
            pool = multiprocessing.Pool(processes=1)
            # TODO(spang): Move async suspend to library.
            result = pool.apply_async(async_suspend)

            # capture images concurrently with suspend
            capture_start = time.time()
            logging.info('start capturing at %d', capture_start)
            image_count = 0
            # Frame count at the moment the suspend task was seen finished;
            # None until then.
            resume_count = None
            last_image = None

            while True:
                # terminate if we've captured a few frames after resume
                if result.ready() and resume_count is None:
                    result.get() # reraise exception, if any
                    resume_count = image_count
                    logging.info('suspend task finished')
                if (resume_count is not None and
                        image_count - resume_count >= 10):
                    break

                # capture one frame
                image_ok, image = cam.read()
                image_count += 1
                if not image_ok:
                    logging.error('failed capture at image %d', image_count)
                    # Exceptions do not interpolate their arguments the way
                    # logging calls do; format the message explicitly.
                    raise error.TestFail(
                            'image capture failed from %s' % cam_name)

                # write image to disk, if requested
                if save_images and image_count <= 200:
                    path = os.path.join(self.outputdir,
                                        '%03d.jpg' % image_count)
                    cv2.imwrite(path, image)

                # verify camera produces a unique image on each capture
                if (last_image is not None and
                        numpy.array_equal(image, last_image)):
                    raise error.TestFail(
                            'camera produced two identical images')
                last_image = image

            capture_end = time.time()
            logging.info('done capturing at %d', capture_end)

            logging.info('captured %d frames in %d seconds',
                         image_count, capture_end - capture_start)
        finally:
            # Always give back the worker process and the camera device,
            # including when the test fails part-way through.
            if pool is not None:
                pool.terminate()
                pool.join()
            cam.release()
90