#!/usr/bin/env python3
import subprocess
import pytest
import os
import stat
import time
from os.path import join as pjoin
import sys
import re
import itertools

# Parent of the directory containing this file. The 'example' and 'util'
# directories referenced below are resolved relative to it.
basename = pjoin(os.path.dirname(__file__), '..')


def get_printcap():
    '''Run the `printcap` example and parse its output

    Returns a tuple ``(proto, caps)``. *proto* is a ``(major, minor)``
    tuple of ints taken from the last ``Protocol version: X.Y`` line, or
    `None` if no such line was printed. *caps* is the set of all
    tab-indented output lines with surrounding whitespace stripped.

    Raises `AssertionError` if the program exits with a non-zero status
    and `subprocess.TimeoutExpired` if it does not finish within 30
    seconds.
    '''

    cmdline = base_cmdline + [ pjoin(basename, 'example', 'printcap') ]
    with subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                          universal_newlines=True) as proc:
        try:
            (stdout, _) = proc.communicate(timeout=30)
        except subprocess.TimeoutExpired:
            # communicate() does not kill the child on timeout; do it
            # ourselves so that no stray process is left behind.
            proc.kill()
            proc.communicate()
            raise
    assert proc.returncode == 0

    proto = None
    caps = set()
    for line in stdout.split('\n'):
        # Tab-indented lines are collected as capability names
        if line.startswith('\t'):
            caps.add(line.strip())
            continue

        hit = re.match(r'Protocol version: (\d+)\.(\d+)$', line)
        if hit:
            proto = (int(hit.group(1)), int(hit.group(2)))

    return (proto, caps)


def test_printcap():
    '''Check that the `printcap` example runs successfully'''
    get_printcap()


def wait_for_mount(mount_process, mnt_dir,
                   test_fn=os.path.ismount):
    '''Wait until *test_fn(mnt_dir)* becomes true

    *test_fn* is polled roughly every 0.1 seconds, for up to 300 polls.
    Returns `True` as soon as it succeeds. The current test is failed
    (via `pytest.fail`) if *mount_process* terminates before that, or if
    the polls are exhausted.
    '''

    elapsed = 0
    while elapsed < 30:
        if test_fn(mnt_dir):
            return True
        if mount_process.poll() is not None:
            pytest.fail('file system process terminated prematurely')
        time.sleep(0.1)
        elapsed += 0.1
    pytest.fail("mountpoint failed to come up")


def cleanup(mount_process, mnt_dir):
    '''Best-effort unmount of *mnt_dir* and shutdown of *mount_process*

    The exit status and output of the unmount command are ignored. The
    process is sent a terminate request and killed if it has not exited
    within one second.
    '''

    # Don't bother trying Valgrind if things already went wrong

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        cmd = [ 'umount', '-f', mnt_dir ]
    else:
        cmd = [pjoin(basename, 'util', 'fusermount3'),
               '-z', '-u', mnt_dir]
    subprocess.call(cmd, stdout=subprocess.DEVNULL,
                    stderr=subprocess.STDOUT)
    mount_process.terminate()
    try:
        mount_process.wait(1)
    except subprocess.TimeoutExpired:
        mount_process.kill()


def umount(mount_process, mnt_dir):
    '''Unmount *mnt_dir* and wait for *mount_process* to exit cleanly

    Raises `subprocess.CalledProcessError` if the unmount command fails
    and `AssertionError` if *mnt_dir* is still a mountpoint afterwards.
    The current test is failed (via `pytest.fail`) if the process exits
    with a non-zero code or is still running after about 30 seconds of
    polling.
    '''

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        cmdline = [ 'umount', mnt_dir ]
    else:
        # fusermount3 will be setuid root, so we can only trace it with
        # valgrind if we're root
        if os.getuid() == 0:
            cmdline = base_cmdline
        else:
            cmdline = []
        # Concatenation builds a new list, base_cmdline is not modified
        cmdline = cmdline + [ pjoin(basename, 'util', 'fusermount3'),
                              '-z', '-u', mnt_dir ]

    subprocess.check_call(cmdline)
    assert not os.path.ismount(mnt_dir)

    # Give mount process a little while to terminate. Popen.wait(timeout)
    # was only added in 3.3...
    elapsed = 0
    while elapsed < 30:
        code = mount_process.poll()
        if code is not None:
            if code == 0:
                return
            pytest.fail('file system process terminated with code %s' % (code,))
        time.sleep(0.1)
        elapsed += 0.1
    pytest.fail('mount process did not terminate')


def safe_sleep(secs):
    '''Like time.sleep(), but sleep for at least *secs*

    `time.sleep` may sleep less than the given period if a signal is
    received. This function ensures that we sleep for at least the
    desired time.
    '''

    now = time.time()
    end = now + secs
    while now < end:
        time.sleep(end - now)
        now = time.time()


def fuse_test_marker():
    '''Return a pytest mark that indicates FUSE availability

    If system/user/environment does not support FUSE, return
    a `pytest.mark.skip` object with more details. If FUSE is
    supported, return `pytest.mark.uses_fuse()`.
    '''

    def skip(reason):
        return pytest.mark.skip(reason=reason)

    # No further checks are performed on these platforms
    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        return pytest.mark.uses_fuse()

    with subprocess.Popen(['which', 'fusermount3'], stdout=subprocess.PIPE,
                          universal_newlines=True) as which:
        fusermount_path = which.communicate()[0].strip()

    if not fusermount_path or which.returncode != 0:
        return skip("Can't find fusermount executable")

    if not os.path.exists('/dev/fuse'):
        return skip("FUSE kernel module does not seem to be loaded")

    # The remaining checks are only made for non-root users
    if os.getuid() == 0:
        return pytest.mark.uses_fuse()

    mode = os.stat(fusermount_path).st_mode
    if mode & stat.S_ISUID == 0:
        return skip('fusermount executable not setuid, and we are not root.')

    try:
        fd = os.open('/dev/fuse', os.O_RDWR)
    except OSError as exc:
        return skip('Unable to open /dev/fuse: %s' % exc.strerror)
    else:
        os.close(fd)

    return pytest.mark.uses_fuse()


def powerset(iterable):
    '''Return an iterator over all subsets of *iterable*

    Subsets are yielded as tuples, ordered by increasing size and
    starting with the empty tuple.
    '''
    s = list(iterable)
    return itertools.chain.from_iterable(
        itertools.combinations(s, r) for r in range(len(s)+1))


# Use valgrind if requested
if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
   not in ('no', 'false', '0'):
    base_cmdline = [ 'valgrind', '-q', '--' ]
else:
    base_cmdline = []

# Try to use local fusermount3 and put example binaries on PATH. The
# example directory comes first, then util, then the previous PATH. Fall
# back to os.defpath so that importing this module does not fail when PATH
# is unset.
os.environ['PATH'] = os.pathsep.join([
    pjoin(basename, 'example'),
    pjoin(basename, 'util'),
    os.environ.get('PATH', os.defpath),
])

try:
    (fuse_proto, fuse_caps) = get_printcap()
except Exception:
    # Rely on test to raise error. KeyboardInterrupt and SystemExit are
    # deliberately not swallowed here.
    fuse_proto = (0,0)
    fuse_caps = set()