• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2import subprocess
3import pytest
4import os
5import stat
6import time
7from os.path import join as pjoin
8import sys
9import re
10import itertools
11
# Parent of the directory that contains this file. The 'util' and
# 'example' paths used throughout this module are resolved against it.
basename = pjoin(os.path.dirname(__file__), '..')
13
def get_printcap():
    '''Run the ``printcap`` example and parse its output

    Returns a tuple ``(proto, caps)``. *proto* is the ``(major, minor)``
    pair of integers from the last "Protocol version: X.Y" line of the
    output, or `None` if no such line was printed. *caps* is the set of
    all tab-indented output lines, stripped of surrounding whitespace.

    Raises `subprocess.TimeoutExpired` if the program does not finish
    within 30 seconds, and `AssertionError` if it exits with a non-zero
    status.
    '''

    cmdline = base_cmdline + [ pjoin(basename, 'example', 'printcap') ]

    # The context manager closes the pipe and reaps the child on every
    # exit path.
    with subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                          universal_newlines=True) as proc:
        try:
            (stdout, _) = proc.communicate(timeout=30)
        except subprocess.TimeoutExpired:
            # Kill the hung child first: leaving the `with` block waits
            # for the process and would otherwise block indefinitely.
            proc.kill()
            raise
    assert proc.returncode == 0

    proto = None
    caps = set()
    for line in stdout.split('\n'):
        # Capability names are the tab-indented lines of the report
        if line.startswith('\t'):
            caps.add(line.strip())
            continue

        hit = re.match(r'Protocol version: (\d+)\.(\d+)$', line)
        if hit:
            proto = (int(hit.group(1)), int(hit.group(2)))

    return (proto, caps)
33
def test_printcap():
    '''Smoke test: `get_printcap` must run without raising

    The returned protocol version and capabilities are not inspected
    here; any failure surfaces as an exception from `get_printcap`.
    '''
    get_printcap()
36
def wait_for_mount(mount_process, mnt_dir,
                   test_fn=os.path.ismount):
    '''Poll until *test_fn(mnt_dir)* becomes true

    *test_fn* is called with *mnt_dir* before each 0.1 second sleep, for
    a nominal total of 30 seconds. Returns `True` as soon as the check
    succeeds.

    Calls `pytest.fail` if *mount_process* has exited (its ``poll()``
    returns something other than `None`) before the check succeeds, or
    if the check never succeeds within the polling budget.
    '''

    poll_interval = 0.1
    budget = 30

    # The budget is tracked by summing the nominal sleep intervals
    waited = 0
    while waited < budget:
        if test_fn(mnt_dir):
            return True

        exited = mount_process.poll() is not None
        if exited:
            pytest.fail('file system process terminated prematurely')

        time.sleep(poll_interval)
        waited += poll_interval

    pytest.fail("mountpoint failed to come up")
48
def cleanup(mount_process, mnt_dir):
    '''Best-effort teardown of a mount and its file system process

    Runs a forced (BSD / DragonFly) or lazy (``fusermount3 -z -u``)
    unmount of *mnt_dir*, discarding the command's output and ignoring
    its exit status. Then terminates *mount_process* and kills it if it
    has not exited within one second.
    '''
    # Don't bother trying Valgrind if things already went wrong

    is_bsd_like = 'bsd' in sys.platform or 'dragonfly' in sys.platform
    if not is_bsd_like:
        fusermount = pjoin(basename, 'util', 'fusermount3')
        unmount_cmd = [fusermount, '-z', '-u', mnt_dir]
    else:
        unmount_cmd = ['umount', '-f', mnt_dir]

    # stderr is folded into stdout, which is discarded
    subprocess.call(unmount_cmd,
                    stderr=subprocess.STDOUT,
                    stdout=subprocess.DEVNULL)

    mount_process.terminate()
    try:
        mount_process.wait(1)
    except subprocess.TimeoutExpired:
        # Did not react to terminate() in time
        mount_process.kill()
64
def umount(mount_process, mnt_dir):
    '''Unmount *mnt_dir* and wait for *mount_process* to exit cleanly

    Uses ``umount`` on BSD / DragonFly and the local ``fusermount3 -z -u``
    elsewhere (prefixed with `base_cmdline` only when running as root).

    Raises `subprocess.CalledProcessError` if the unmount command fails
    and `AssertionError` if *mnt_dir* is still a mountpoint afterwards.
    Calls `pytest.fail` if *mount_process* does not exit within 30
    seconds or exits with a non-zero code.
    '''

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        cmdline = [ 'umount', mnt_dir ]
    else:
        # fusermount3 will be setuid root, so we can only trace it with
        # valgrind if we're root
        if os.getuid() == 0:
            cmdline = base_cmdline
        else:
            cmdline = []
        cmdline = cmdline + [ pjoin(basename, 'util', 'fusermount3'),
                              '-z', '-u', mnt_dir ]

    subprocess.check_call(cmdline)
    assert not os.path.ismount(mnt_dir)

    # Give mount process a little while to terminate. The rest of this
    # module already relies on Popen.wait(timeout), so use it here too
    # instead of a hand-rolled polling loop.
    try:
        code = mount_process.wait(timeout=30)
    except subprocess.TimeoutExpired:
        code = None

    # Report failures outside the `except` block so that they are not
    # chained to the TimeoutExpired exception.
    if code is None:
        pytest.fail('mount process did not terminate')
    if code != 0:
        pytest.fail('file system process terminated with code %s' % (code,))
94
95
def safe_sleep(secs):
    '''Like time.sleep(), but sleep for at least *secs*

    `time.sleep` may sleep less than the given period if a signal is
    received. This function ensures that we sleep for at least the
    desired time.

    Elapsed time is measured with `time.monotonic`, so adjustments of
    the system clock can neither shorten nor prolong the sleep. For
    zero or negative *secs* the function returns immediately.
    '''

    now = time.monotonic()
    end = now + secs
    while now < end:
        # Sleep only for the remainder; loop again if we woke up early
        time.sleep(end - now)
        now = time.monotonic()
109
def fuse_test_marker():
    '''Return a pytest.marker that indicates FUSE availability

    If system/user/environment does not support FUSE, return
    a `pytest.mark.skip` object with more details. If FUSE is
    supported, return `pytest.mark.uses_fuse()`.

    On BSD / DragonFly no checks are made. Elsewhere the checks are, in
    order: ``fusermount3`` is found on PATH, ``/dev/fuse`` exists, and
    (unless running as root) the executable is setuid and ``/dev/fuse``
    can be opened read-write.
    '''

    # Local import keeps this function independent of the module's
    # import block.
    import shutil

    def skip(reason):
        return pytest.mark.skip(reason=reason)

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        return pytest.mark.uses_fuse()

    # Search PATH in-process instead of spawning the external `which`
    # program, which is not installed on every system.
    fusermount_path = shutil.which('fusermount3')
    if not fusermount_path:
        return skip("Can't find fusermount executable")

    if not os.path.exists('/dev/fuse'):
        return skip("FUSE kernel module does not seem to be loaded")

    # Root needs neither a setuid helper nor device permissions
    if os.getuid() == 0:
        return pytest.mark.uses_fuse()

    mode = os.stat(fusermount_path).st_mode
    if mode & stat.S_ISUID == 0:
        return skip('fusermount executable not setuid, and we are not root.')

    try:
        fd = os.open('/dev/fuse', os.O_RDWR)
    except OSError as exc:
        return skip('Unable to open /dev/fuse: %s' % exc.strerror)
    else:
        os.close(fd)

    return pytest.mark.uses_fuse()
148
def powerset(iterable):
    '''Return an iterator over all subsets of *iterable*

    Subsets are yielded as tuples, ordered by increasing size, starting
    with the empty tuple and ending with the tuple of all elements.
    *iterable* is consumed into a list when the function is called.
    '''
    items = list(iterable)
    sizes = range(len(items) + 1)
    subsets_by_size = (itertools.combinations(items, size) for size in sizes)
    return itertools.chain.from_iterable(subsets_by_size)
153
154
# Use valgrind if requested
if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
   not in ('no', 'false', '0'):
    base_cmdline = [ 'valgrind', '-q', '--' ]
else:
    base_cmdline = []

# Try to use local fusermount3
os.environ['PATH'] = '%s:%s' % (pjoin(basename, 'util'), os.environ['PATH'])
# Put example binaries on PATH
os.environ['PATH'] = '%s:%s' % (pjoin(basename, 'example'), os.environ['PATH'])

# Probe the protocol version and capabilities once, at import time
try:
    (fuse_proto, fuse_caps) = get_printcap()
except Exception:
    # Rely on test to raise error. Only ordinary errors are swallowed
    # here, so KeyboardInterrupt and SystemExit still abort the import.
    fuse_proto = (0,0)
    fuse_caps = set()
173
174