# Copyright 2017 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections import contextlib import logging import os import signal import socket import sys import mock import pytest import subprocess32 from lucifer import leasing logger = logging.getLogger(__name__) # 9999-01-01T00:00:00+00:00 _THE_END = 253370764800 def test_obtain_lease(tmpdir): """Test obtain_lease. Provides basic test coverage metrics. The slower subprocess tests provide better functional coverage. """ path = _make_lease(tmpdir, 124) with leasing.obtain_lease(path): pass assert not os.path.exists(path) @pytest.mark.slow def test_obtain_lease_succesfully_removes_file(tmpdir): """Test obtain_lease cleans up lease file if successful.""" path = _make_lease(tmpdir, 124) with _obtain_lease(path) as lease_proc: lease_proc.finish() assert not os.path.exists(path) @pytest.mark.slow def test_obtain_lease_with_error_removes_files(tmpdir): """Test obtain_lease removes file if it errors.""" path = _make_lease(tmpdir, 124) with _obtain_lease(path) as lease_proc: lease_proc.proc.send_signal(signal.SIGINT) lease_proc.proc.wait() assert not os.path.exists(path) @pytest.mark.slow def test_Lease_expired(tmpdir, end_time): """Test Lease.Expired().""" _make_lease(tmpdir, 123) path = _make_lease(tmpdir, 124) with _obtain_lease(path): leases = _leases_dict(str(tmpdir)) assert leases[123].expired() assert not leases[124].expired() def test_unlocked_fresh_leases_are_not_expired(tmpdir): """Test get_expired_leases().""" path = _make_lease(tmpdir, 123) os.utime(path, (_THE_END, _THE_END)) leases = _leases_dict(str(tmpdir)) assert not leases[123].expired() def test_leases_iter_with_sock_files(tmpdir): """Test leases_iter() ignores sock files.""" _make_lease(tmpdir, 123) tmpdir.join('124.sock').write('') leases = _leases_dict(str(tmpdir)) assert 124 not in leases def test_Lease_cleanup(tmpdir): """Test Lease.cleanup().""" lease_path = _make_lease(tmpdir, 123) tmpdir.join('123.sock').write('') sock_path = str(tmpdir.join('123.sock')) for lease in leasing.leases_iter(str(tmpdir)): logger.debug('Cleaning up %r', lease) lease.cleanup() assert not os.path.exists(lease_path) assert not os.path.exists(sock_path) def test_Lease_cleanup_does_not_raise_on_error(tmpdir): """Test Lease.cleanup().""" lease_path = _make_lease(tmpdir, 123) tmpdir.join('123.sock').write('') sock_path = str(tmpdir.join('123.sock')) for lease in leasing.leases_iter(str(tmpdir)): os.unlink(lease_path) os.unlink(sock_path) lease.cleanup() @pytest.mark.slow def test_Lease_abort(tmpdir): """Test Lease.abort().""" _make_lease(tmpdir, 123) with _abort_socket(tmpdir, 123) as proc: expired = list(leasing.leases_iter(str(tmpdir))) assert len(expired) > 0 for lease in expired: lease.abort() proc.wait() assert proc.returncode == 0 @pytest.mark.slow def test_Lease_abort_with_closed_socket(tmpdir): """Test Lease.abort() with closed socket.""" _make_lease(tmpdir, 123) with _abort_socket(tmpdir, 123) as proc: proc.terminate() proc.wait() expired = list(leasing.leases_iter(str(tmpdir))) assert len(expired) > 0 for lease in expired: with pytest.raises(socket.error): lease.abort() @pytest.mark.slow def test_Lease_abort_with_blocked_socket(tmpdir): """Test Lease.abort() with blocked socket. If the behavior this test is looking for is missing (a raised error for nonblock write timeout), this test will hang indefinitely on a blocking socket read. """ _make_lease(tmpdir, 123) with _abort_socket_norecv(tmpdir, 123): expired = list(leasing.leases_iter(str(tmpdir))) assert len(expired) == 1 lease = expired[0] with pytest.raises(socket.error): while True: lease.abort() @pytest.fixture def end_time(): """Mock out time.time to return a time in the future.""" with mock.patch('time.time', return_value=_THE_END) as t: yield t _LeaseProc = collections.namedtuple('_LeaseProc', 'finish proc') @contextlib.contextmanager def _obtain_lease(path): """Lock a lease file. Yields a _LeaseProc. finish is a function that can be called to finish the process normally. proc is a Popen instance. This uses a slow subprocess; any test that uses this should be marked slow. """ with subprocess32.Popen( [sys.executable, '-um', 'lucifer.cmd.test.obtain_lease', path], stdin=subprocess32.PIPE, stdout=subprocess32.PIPE) as proc: # Wait for lock grab. proc.stdout.readline() def finish(): """Finish lease process normally.""" proc.stdin.write('\n') # Wait for lease release. proc.stdout.readline() try: yield _LeaseProc(finish, proc) finally: proc.terminate() @contextlib.contextmanager def _abort_socket(tmpdir, job_id): """Open a testing abort socket and listener for a job. As a context manager, returns the Popen instance for the listener process when entering. This uses a slow subprocess; any test that uses this should be marked slow. """ path = os.path.join(str(tmpdir), '%d.sock' % job_id) logger.debug('Making abort socket at %s', path) with subprocess32.Popen( [sys.executable, '-um', 'lucifer.cmd.test.abort_socket', path], stdout=subprocess32.PIPE) as proc: # Wait for socket bind. proc.stdout.readline() try: yield proc finally: proc.terminate() @contextlib.contextmanager def _abort_socket_norecv(tmpdir, job_id): """Open a testing abort socket and bad listener for a job. The listening process doesn't actually call recv(). As a context manager, returns the Popen instance for the listener process when entering. This uses a slow subprocess; any test that uses this should be marked slow. """ path = os.path.join(str(tmpdir), '%d.sock' % job_id) logger.debug('Making abort socket at %s', path) with subprocess32.Popen( [sys.executable, '-um', 'lucifer.cmd.test.abort_socket_norecv', path], stdout=subprocess32.PIPE) as proc: # Wait for socket bind. proc.stdout.readline() try: yield proc finally: proc.terminate() def _leases_dict(jobdir): """Convenience method for tests.""" return {lease.id: lease for lease in leasing.leases_iter(jobdir)} def _make_lease(tmpdir, job_id): return _make_lease_file(str(tmpdir), job_id) def _make_lease_file(jobdir, job_id): """Make lease file corresponding to a job. @param jobdir: job lease file directory @param job_id: Job ID """ path = os.path.join(jobdir, str(job_id)) with open(path, 'w'): pass return path class _TestError(Exception): """Error for tests."""