• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3if __name__ == '__main__':
4    import pytest
5    import sys
6    sys.exit(pytest.main([__file__] + sys.argv[1:]))
7
8import subprocess
9import os
10import sys
11import py
12import pytest
13import stat
14import shutil
15import filecmp
16import tempfile
17import time
18import errno
19import sys
20import platform
21from looseversion import LooseVersion
22from tempfile import NamedTemporaryFile
23from contextlib import contextmanager
24from util import (wait_for_mount, umount, cleanup, base_cmdline,
25                  safe_sleep, basename, fuse_test_marker, test_printcap,
26                  fuse_proto, powerset)
27from os.path import join as pjoin
28
29pytestmark = fuse_test_marker()
30
31TEST_FILE = __file__
32
33with open(TEST_FILE, 'rb') as fh:
34    TEST_DATA = fh.read()
35
36def name_generator(__ctr=[0]):
37    __ctr[0] += 1
38    return 'testfile_%d' % __ctr[0]
39
40options = []
41if sys.platform == 'linux':
42    options.append('clone_fd')
43
44def invoke_directly(mnt_dir, name, options):
45    cmdline = base_cmdline + [ pjoin(basename, 'example', name),
46                               '-f', mnt_dir, '-o', ','.join(options) ]
47    if name == 'hello_ll':
48        # supports single-threading only
49        cmdline.append('-s')
50
51    return cmdline
52
53def invoke_mount_fuse(mnt_dir, name, options):
54    return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'),
55                            name, mnt_dir, '-o', ','.join(options) ]
56
57def invoke_mount_fuse_drop_privileges(mnt_dir, name, options):
58    if os.getuid() != 0:
59        pytest.skip('drop_privileges requires root, skipping.')
60
61    return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',))
62
63class raii_tmpdir:
64    def __init__(self):
65        self.d = tempfile.mkdtemp()
66
67    def __str__(self):
68        return str(self.d)
69
70    def mkdir(self, path):
71        return py.path.local(str(self.d)).mkdir(path)
72
73@pytest.fixture
74def short_tmpdir():
75    return raii_tmpdir()
76
77def readdir_inode(dir):
78    cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ]
79    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
80                          universal_newlines=True) as proc:
81        lines = proc.communicate()[0].splitlines()
82    lines.sort()
83    return lines
84
85
86@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse,
87                                             invoke_mount_fuse_drop_privileges))
88@pytest.mark.parametrize("options", powerset(options))
89@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
90def test_hello(tmpdir, name, options, cmdline_builder, output_checker):
91    mnt_dir = str(tmpdir)
92    mount_process = subprocess.Popen(
93        cmdline_builder(mnt_dir, name, options),
94        stdout=output_checker.fd, stderr=output_checker.fd)
95    try:
96        wait_for_mount(mount_process, mnt_dir)
97        assert os.listdir(mnt_dir) == [ 'hello' ]
98        filename = pjoin(mnt_dir, 'hello')
99        with open(filename, 'r') as fh:
100            assert fh.read() == 'Hello World!\n'
101        with pytest.raises(IOError) as exc_info:
102            open(filename, 'r+')
103        assert exc_info.value.errno == errno.EACCES
104        with pytest.raises(IOError) as exc_info:
105            open(filename + 'does-not-exist', 'r+')
106        assert exc_info.value.errno == errno.ENOENT
107        if name == 'hello_ll':
108            tst_xattr(mnt_dir)
109    except:
110        cleanup(mount_process, mnt_dir)
111        raise
112    else:
113        umount(mount_process, mnt_dir)
114
115@pytest.mark.parametrize("writeback", (False, True))
116@pytest.mark.parametrize("name", ('passthrough', 'passthrough_plus',
117                           'passthrough_fh', 'passthrough_ll'))
118@pytest.mark.parametrize("debug", (False, True))
119def test_passthrough(short_tmpdir, name, debug, output_checker, writeback):
120    # Avoid false positives from libfuse debug messages
121    if debug:
122        output_checker.register_output(r'^   unique: [0-9]+, error: -[0-9]+ .+$',
123                                       count=0)
124
125    # test_syscalls prints "No error" under FreeBSD
126    output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]",
127                                   count=0)
128
129    mnt_dir = str(short_tmpdir.mkdir('mnt'))
130    src_dir = str(short_tmpdir.mkdir('src'))
131
132    if name == 'passthrough_plus':
133        cmdline = base_cmdline + \
134                  [ pjoin(basename, 'example', 'passthrough'),
135                    '--plus', '-f', mnt_dir ]
136    else:
137        cmdline = base_cmdline + \
138                  [ pjoin(basename, 'example', name),
139                    '-f', mnt_dir ]
140    if debug:
141        cmdline.append('-d')
142
143    if writeback:
144        if name != 'passthrough_ll':
145            pytest.skip('example does not support writeback caching')
146        cmdline.append('-o')
147        cmdline.append('writeback')
148
149    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
150                                     stderr=output_checker.fd)
151    try:
152        wait_for_mount(mount_process, mnt_dir)
153        work_dir = mnt_dir + src_dir
154
155        tst_statvfs(work_dir)
156        tst_readdir(src_dir, work_dir)
157        tst_readdir_big(src_dir, work_dir)
158        tst_open_read(src_dir, work_dir)
159        tst_open_write(src_dir, work_dir)
160        tst_create(work_dir)
161        tst_passthrough(src_dir, work_dir)
162        tst_append(src_dir, work_dir)
163        tst_seek(src_dir, work_dir)
164        tst_mkdir(work_dir)
165        tst_rmdir(work_dir, src_dir)
166        tst_unlink(work_dir, src_dir)
167        tst_symlink(work_dir)
168        if os.getuid() == 0:
169            tst_chown(work_dir)
170
171        # Underlying fs may not have full nanosecond resolution
172        tst_utimens(work_dir, ns_tol=1000)
173
174        tst_link(work_dir)
175        tst_truncate_path(work_dir)
176        tst_truncate_fd(work_dir)
177        tst_open_unlink(work_dir)
178
179        syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
180                             work_dir, ':' + src_dir ]
181        if writeback:
182            # When writeback caching is enabled, kernel has to open files for
183            # reading even when userspace opens with O_WDONLY. This fails if the
184            # filesystem process doesn't have special permission.
185            syscall_test_cmd.append('-53')
186        subprocess.check_call(syscall_test_cmd)
187    except:
188        cleanup(mount_process, mnt_dir)
189        raise
190    else:
191        umount(mount_process, mnt_dir)
192
193@pytest.mark.parametrize("cache", (False, True))
194def test_passthrough_hp(short_tmpdir, cache, output_checker):
195    mnt_dir = str(short_tmpdir.mkdir('mnt'))
196    src_dir = str(short_tmpdir.mkdir('src'))
197
198    cmdline = base_cmdline + \
199              [ pjoin(basename, 'example', 'passthrough_hp'),
200                src_dir, mnt_dir ]
201
202    cmdline.append('--foreground')
203
204    if not cache:
205        cmdline.append('--nocache')
206
207    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
208                                     stderr=output_checker.fd)
209    try:
210        wait_for_mount(mount_process, mnt_dir)
211
212        tst_statvfs(mnt_dir)
213        tst_readdir(src_dir, mnt_dir)
214        tst_readdir_big(src_dir, mnt_dir)
215        tst_open_read(src_dir, mnt_dir)
216        tst_open_write(src_dir, mnt_dir)
217        tst_create(mnt_dir)
218        if not cache:
219            tst_passthrough(src_dir, mnt_dir)
220        tst_append(src_dir, mnt_dir)
221        tst_seek(src_dir, mnt_dir)
222        tst_mkdir(mnt_dir)
223        if cache:
224            # if cache is enabled, no operations should go through
225            # src_dir as the cache will become stale.
226            tst_rmdir(mnt_dir)
227            tst_unlink(mnt_dir)
228        else:
229            tst_rmdir(mnt_dir, src_dir)
230            tst_unlink(mnt_dir, src_dir)
231        tst_symlink(mnt_dir)
232        if os.getuid() == 0:
233            tst_chown(mnt_dir)
234
235        # Underlying fs may not have full nanosecond resolution
236        tst_utimens(mnt_dir, ns_tol=1000)
237
238        tst_link(mnt_dir)
239        tst_truncate_path(mnt_dir)
240        tst_truncate_fd(mnt_dir)
241        tst_open_unlink(mnt_dir)
242
243        # test_syscalls assumes that changes in source directory
244        # will be reflected immediately in mountpoint, so we
245        # can't use it.
246        if not cache:
247            syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
248                             mnt_dir, ':' + src_dir ]
249            # unlinked testfiles check fails without kernel fix
250            # "fuse: fix illegal access to inode with reused nodeid"
251            # so opt-in for this test from kernel 5.14
252            if LooseVersion(platform.release()) >= '5.14':
253                syscall_test_cmd.append('-u')
254            subprocess.check_call(syscall_test_cmd)
255    except:
256        cleanup(mount_process, mnt_dir)
257        raise
258    else:
259        umount(mount_process, mnt_dir)
260
261
262@pytest.mark.skipif(fuse_proto < (7,11),
263                    reason='not supported by running kernel')
264def test_ioctl(tmpdir, output_checker):
265    progname = pjoin(basename, 'example', 'ioctl')
266    if not os.path.exists(progname):
267        pytest.skip('%s not built' % os.path.basename(progname))
268
269    mnt_dir = str(tmpdir)
270    testfile = pjoin(mnt_dir, 'fioc')
271    cmdline = base_cmdline + [progname, '-f', mnt_dir ]
272    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
273                                     stderr=output_checker.fd)
274    try:
275        wait_for_mount(mount_process, mnt_dir)
276
277        cmdline = base_cmdline + \
278                  [ pjoin(basename, 'example', 'ioctl_client'),
279                    testfile ]
280        assert subprocess.check_output(cmdline) == b'0\n'
281        with open(testfile, 'wb') as fh:
282            fh.write(b'foobar')
283        assert subprocess.check_output(cmdline) == b'6\n'
284        subprocess.check_call(cmdline + [ '3' ])
285        with open(testfile, 'rb') as fh:
286            assert fh.read()== b'foo'
287    except:
288        cleanup(mount_process, mnt_dir)
289        raise
290    else:
291        umount(mount_process, mnt_dir)
292
293def test_poll(tmpdir, output_checker):
294    mnt_dir = str(tmpdir)
295    cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
296               '-f', mnt_dir ]
297    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
298                                     stderr=output_checker.fd)
299    try:
300        wait_for_mount(mount_process, mnt_dir)
301        cmdline = base_cmdline + \
302                  [ pjoin(basename, 'example', 'poll_client') ]
303        subprocess.check_call(cmdline, cwd=mnt_dir)
304    except:
305        cleanup(mount_process, mnt_dir)
306        raise
307    else:
308        umount(mount_process, mnt_dir)
309
310def test_null(tmpdir, output_checker):
311    progname = pjoin(basename, 'example', 'null')
312    if not os.path.exists(progname):
313        pytest.skip('%s not built' % os.path.basename(progname))
314
315    mnt_file = str(tmpdir) + '/file'
316    with open(mnt_file, 'w') as fh:
317        fh.write('dummy')
318    cmdline = base_cmdline + [ progname, '-f', mnt_file ]
319    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
320                                     stderr=output_checker.fd)
321    def test_fn(name):
322        return os.stat(name).st_size > 4000
323    try:
324        wait_for_mount(mount_process, mnt_file, test_fn)
325        with open(mnt_file, 'rb') as fh:
326            assert fh.read(382) == b'\0' * 382
327        with open(mnt_file, 'wb') as fh:
328            fh.write(b'whatever')
329    except:
330        cleanup(mount_process, mnt_file)
331        raise
332    else:
333        umount(mount_process, mnt_file)
334
335
336@pytest.mark.skipif(fuse_proto < (7,12),
337                    reason='not supported by running kernel')
338@pytest.mark.parametrize("only_expire", ("invalidate_entries", "expire_entries"))
339@pytest.mark.parametrize("notify", (True, False))
340def test_notify_inval_entry(tmpdir, only_expire, notify, output_checker):
341    mnt_dir = str(tmpdir)
342    cmdline = base_cmdline + \
343              [ pjoin(basename, 'example', 'notify_inval_entry'),
344                '-f', '--update-interval=1',
345                '--timeout=5', mnt_dir ]
346    if not notify:
347        cmdline.append('--no-notify')
348    if only_expire == "expire_entries":
349        cmdline.append('--only-expire')
350        if fuse_proto < (7,38):
351            pytest.skip('only-expire not supported by running kernel')
352    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
353                                     stderr=output_checker.fd)
354    try:
355        wait_for_mount(mount_process, mnt_dir)
356        fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
357        try:
358            os.stat(fname)
359        except FileNotFoundError:
360            # We may have hit a race condition and issued
361            # readdir just before the name changed
362            fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
363            os.stat(fname)
364
365        safe_sleep(2)
366        if not notify:
367            os.stat(fname)
368            safe_sleep(5)
369        with pytest.raises(FileNotFoundError):
370            os.stat(fname)
371    except:
372        cleanup(mount_process, mnt_dir)
373        raise
374    else:
375        umount(mount_process, mnt_dir)
376
377@pytest.mark.parametrize("intended_user", ('root', 'non_root'))
378def test_dev_auto_unmount(short_tmpdir, output_checker, intended_user):
379    """Check that root can mount with dev and auto_unmount
380    (but non-root cannot).
381    Split into root vs non-root, so that the output of pytest
382    makes clear what functionality is being tested."""
383    if os.getuid() == 0 and intended_user == 'non_root':
384        pytest.skip('needs to run as non-root')
385    if os.getuid() != 0 and intended_user == 'root':
386        pytest.skip('needs to run as root')
387    mnt_dir = str(short_tmpdir.mkdir('mnt'))
388    src_dir = str('/dev')
389    cmdline = base_cmdline + \
390                [ pjoin(basename, 'example', 'passthrough_ll'),
391                '-o', f'source={src_dir},dev,auto_unmount',
392                '-f', mnt_dir ]
393    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
394                                     stderr=output_checker.fd)
395    try:
396        wait_for_mount(mount_process, mnt_dir)
397        if os.getuid() == 0:
398            open(pjoin(mnt_dir, 'null')).close()
399        else:
400            with pytest.raises(PermissionError):
401                open(pjoin(mnt_dir, 'null')).close()
402    except:
403        cleanup(mount_process, mnt_dir)
404        raise
405    else:
406        umount(mount_process, mnt_dir)
407
408@pytest.mark.skipif(os.getuid() != 0,
409                    reason='needs to run as root')
410def test_cuse(output_checker):
411
412    # Valgrind warns about unknown ioctls, that's ok
413    output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
414                                   r'==\1== \s{3}.+\n'
415                                   r'==\1== \s{3}.+$', count=0)
416
417    devname = 'cuse-test-%d' % os.getpid()
418    devpath = '/dev/%s' % devname
419    cmdline = base_cmdline + \
420              [ pjoin(basename, 'example', 'cuse'),
421                '-f', '--name=%s' % devname ]
422    mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
423                                     stderr=output_checker.fd)
424
425    cmdline = base_cmdline + \
426              [ pjoin(basename, 'example', 'cuse_client'),
427                devpath ]
428    try:
429        wait_for_mount(mount_process, devpath,
430                       test_fn=os.path.exists)
431        assert subprocess.check_output(cmdline + ['s']) == b'0\n'
432        data = b'some test data'
433        off = 5
434        proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
435                                stdin=subprocess.PIPE)
436        proc.stdin.write(data)
437        proc.stdin.close()
438        assert proc.wait(timeout=10) == 0
439        size = str(off + len(data)).encode() + b'\n'
440        assert subprocess.check_output(cmdline + ['s']) == size
441        out = subprocess.check_output(
442            cmdline + [ 'r', str(off + len(data) + 2), '0' ])
443        assert out == (b'\0' * off) + data
444    finally:
445        mount_process.terminate()
446
447def test_release_unlink_race(tmpdir, output_checker):
448    """test case for Issue #746
449
450    If RELEASE and UNLINK opcodes are sent back to back, and fuse_fs_release()
451    and fuse_fs_rename() are slow to execute, UNLINK will run while RELEASE is
452    still executing. UNLINK will try to rename the file and, while the rename
453    is happening, the RELEASE will finish executing. As a result, RELEASE will
454    not detect in time that UNLINK has happened, and UNLINK will not detect in
455    time that RELEASE has happened.
456
457
458    NOTE: This is triggered only when nullpath_ok is set.
459
460    If it is NOT SET then get_path_nullok() called by fuse_lib_release() will
461    call get_path_common() and lock the path, and then the fuse_lib_unlink()
462    will wait for the path to be unlocked before executing and thus synchronise
463    with fuse_lib_release().
464
465    If it is SET then get_path_nullok() will just set the path to null and
466    return without locking anything and thus allowing fuse_lib_unlink() to
467    eventually execute unimpeded while fuse_lib_release() is still running.
468    """
469
470    fuse_mountpoint = str(tmpdir)
471
472    fuse_binary_command = base_cmdline + \
473        [ pjoin(basename, 'test', 'release_unlink_race'),
474        "-f", fuse_mountpoint]
475
476    fuse_process = subprocess.Popen(fuse_binary_command,
477                                   stdout=output_checker.fd,
478                                   stderr=output_checker.fd)
479
480    try:
481        wait_for_mount(fuse_process, fuse_mountpoint)
482
483        temp_dir = tempfile.TemporaryDirectory(dir="/tmp/")
484        temp_dir_path = temp_dir.name
485
486        fuse_temp_file, fuse_temp_file_path = tempfile.mkstemp(dir=(fuse_mountpoint + temp_dir_path))
487
488        os.close(fuse_temp_file)
489        os.unlink(fuse_temp_file_path)
490
491        # needed for slow CI/CD pipelines for unlink OP to complete processing
492        safe_sleep(3)
493
494        assert os.listdir(temp_dir_path) == []
495
496    except:
497        temp_dir.cleanup()
498        cleanup(fuse_process, fuse_mountpoint)
499        raise
500
501    else:
502        temp_dir.cleanup()
503        umount(fuse_process, fuse_mountpoint)
504
505
506@contextmanager
507def os_open(name, flags):
508    fd = os.open(name, flags)
509    try:
510        yield fd
511    finally:
512        os.close(fd)
513
514def os_create(name):
515    os.close(os.open(name, os.O_CREAT | os.O_RDWR))
516
517def tst_unlink(mnt_dir, src_dir=None):
518    name = name_generator()
519    fullname = mnt_dir + "/" + name
520    srcname = fullname
521    if src_dir is not None:
522        srcname = pjoin(src_dir, name)
523    with open(srcname, 'wb') as fh:
524        fh.write(b'hello')
525    assert name in os.listdir(mnt_dir)
526    os.unlink(fullname)
527    with pytest.raises(OSError) as exc_info:
528        os.stat(fullname)
529    assert exc_info.value.errno == errno.ENOENT
530    assert name not in os.listdir(mnt_dir)
531
532def tst_mkdir(mnt_dir):
533    dirname = name_generator()
534    fullname = mnt_dir + "/" + dirname
535    os.mkdir(fullname)
536    fstat = os.stat(fullname)
537    assert stat.S_ISDIR(fstat.st_mode)
538    assert os.listdir(fullname) ==  []
539    # Some filesystem (e.g. BTRFS) don't track st_nlink for directories
540    assert fstat.st_nlink in (1,2)
541    assert dirname in os.listdir(mnt_dir)
542
543def tst_rmdir(mnt_dir, src_dir=None):
544    name = name_generator()
545    fullname = mnt_dir + "/" + name
546    srcname = fullname
547    if src_dir is not None:
548        srcname = pjoin(src_dir, name)
549    os.mkdir(srcname)
550    assert name in os.listdir(mnt_dir)
551    os.rmdir(fullname)
552    with pytest.raises(OSError) as exc_info:
553        os.stat(fullname)
554    assert exc_info.value.errno == errno.ENOENT
555    assert name not in os.listdir(mnt_dir)
556
557def tst_symlink(mnt_dir):
558    linkname = name_generator()
559    fullname = mnt_dir + "/" + linkname
560    os.symlink("/imaginary/dest", fullname)
561    fstat = os.lstat(fullname)
562    assert stat.S_ISLNK(fstat.st_mode)
563    assert os.readlink(fullname) == "/imaginary/dest"
564    assert fstat.st_nlink == 1
565    assert linkname in os.listdir(mnt_dir)
566
567def tst_create(mnt_dir):
568    name = name_generator()
569    fullname = pjoin(mnt_dir, name)
570    with pytest.raises(OSError) as exc_info:
571        os.stat(fullname)
572    assert exc_info.value.errno == errno.ENOENT
573    assert name not in os.listdir(mnt_dir)
574
575    fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
576    os.close(fd)
577
578    assert name in os.listdir(mnt_dir)
579    fstat = os.lstat(fullname)
580    assert stat.S_ISREG(fstat.st_mode)
581    assert fstat.st_nlink == 1
582    assert fstat.st_size == 0
583
584def tst_chown(mnt_dir):
585    filename = pjoin(mnt_dir, name_generator())
586    os.mkdir(filename)
587    fstat = os.lstat(filename)
588    uid = fstat.st_uid
589    gid = fstat.st_gid
590
591    uid_new = uid + 1
592    os.chown(filename, uid_new, -1)
593    fstat = os.lstat(filename)
594    assert fstat.st_uid == uid_new
595    assert fstat.st_gid == gid
596
597    gid_new = gid + 1
598    os.chown(filename, -1, gid_new)
599    fstat = os.lstat(filename)
600    assert fstat.st_uid == uid_new
601    assert fstat.st_gid == gid_new
602
603def tst_open_read(src_dir, mnt_dir):
604    name = name_generator()
605    with open(pjoin(src_dir, name), 'wb') as fh_out, \
606         open(TEST_FILE, 'rb') as fh_in:
607        shutil.copyfileobj(fh_in, fh_out)
608
609    assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
610
611def tst_open_write(src_dir, mnt_dir):
612    name = name_generator()
613    os_create(pjoin(src_dir, name))
614    fullname = pjoin(mnt_dir, name)
615    with open(fullname, 'wb') as fh_out, \
616         open(TEST_FILE, 'rb') as fh_in:
617        shutil.copyfileobj(fh_in, fh_out)
618
619    assert filecmp.cmp(fullname, TEST_FILE, False)
620
621def tst_append(src_dir, mnt_dir):
622    name = name_generator()
623    os_create(pjoin(src_dir, name))
624    fullname = pjoin(mnt_dir, name)
625    with os_open(fullname, os.O_WRONLY) as fd:
626        os.write(fd, b'foo\n')
627    with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
628        os.write(fd, b'bar\n')
629
630    with open(fullname, 'rb') as fh:
631        assert fh.read() == b'foo\nbar\n'
632
633def tst_seek(src_dir, mnt_dir):
634    name = name_generator()
635    os_create(pjoin(src_dir, name))
636    fullname = pjoin(mnt_dir, name)
637    with os_open(fullname, os.O_WRONLY) as fd:
638        os.lseek(fd, 1, os.SEEK_SET)
639        os.write(fd, b'foobar\n')
640    with os_open(fullname, os.O_WRONLY) as fd:
641        os.lseek(fd, 4, os.SEEK_SET)
642        os.write(fd, b'com')
643
644    with open(fullname, 'rb') as fh:
645        assert fh.read() == b'\0foocom\n'
646
647def tst_open_unlink(mnt_dir):
648    name = pjoin(mnt_dir, name_generator())
649    data1 = b'foo'
650    data2 = b'bar'
651    fullname = pjoin(mnt_dir, name)
652    with open(fullname, 'wb+', buffering=0) as fh:
653        fh.write(data1)
654        os.unlink(fullname)
655        with pytest.raises(OSError) as exc_info:
656            os.stat(fullname)
657        assert exc_info.value.errno == errno.ENOENT
658        assert name not in os.listdir(mnt_dir)
659        fh.write(data2)
660        fh.seek(0)
661        assert fh.read() == data1+data2
662
663def tst_statvfs(mnt_dir):
664    os.statvfs(mnt_dir)
665
666def tst_link(mnt_dir):
667    name1 = pjoin(mnt_dir, name_generator())
668    name2 = pjoin(mnt_dir, name_generator())
669    shutil.copyfile(TEST_FILE, name1)
670    assert filecmp.cmp(name1, TEST_FILE, False)
671
672    fstat1 = os.lstat(name1)
673    assert fstat1.st_nlink == 1
674
675    os.link(name1, name2)
676
677    fstat1 = os.lstat(name1)
678    fstat2 = os.lstat(name2)
679    assert fstat1 == fstat2
680    assert fstat1.st_nlink == 2
681    assert os.path.basename(name2) in os.listdir(mnt_dir)
682    assert filecmp.cmp(name1, name2, False)
683
684    # Since RELEASE requests are asynchronous, it is possible that
685    # libfuse still considers the file to be open at this point
686    # and (since -o hard_remove is not used) renames it instead of
687    # deleting it. In that case, the following lstat() call will
688    # still report an st_nlink value of 2 (cf. issue #157).
689    os.unlink(name2)
690
691    assert os.path.basename(name2) not in os.listdir(mnt_dir)
692    with pytest.raises(FileNotFoundError):
693        os.lstat(name2)
694
695    # See above, we may have to wait until RELEASE has been
696    # received before the st_nlink value is correct.
697    maxwait = time.time() + 2
698    fstat1 = os.lstat(name1)
699    while fstat1.st_nlink == 2 and time.time() < maxwait:
700        fstat1 = os.lstat(name1)
701        time.sleep(0.1)
702    assert fstat1.st_nlink == 1
703
704    os.unlink(name1)
705
706def tst_readdir(src_dir, mnt_dir):
707    newdir = name_generator()
708
709    src_newdir = pjoin(src_dir, newdir)
710    mnt_newdir = pjoin(mnt_dir, newdir)
711    file_ = src_newdir + "/" + name_generator()
712    subdir = src_newdir + "/" + name_generator()
713    subfile = subdir + "/" + name_generator()
714
715    os.mkdir(src_newdir)
716    shutil.copyfile(TEST_FILE, file_)
717    os.mkdir(subdir)
718    shutil.copyfile(TEST_FILE, subfile)
719
720    listdir_is = os.listdir(mnt_newdir)
721    listdir_is.sort()
722    listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
723    listdir_should.sort()
724    assert listdir_is == listdir_should
725
726    inodes_is = readdir_inode(mnt_newdir)
727    inodes_should = readdir_inode(src_newdir)
728    assert inodes_is == inodes_should
729
730    os.unlink(file_)
731    os.unlink(subfile)
732    os.rmdir(subdir)
733    os.rmdir(src_newdir)
734
735def tst_readdir_big(src_dir, mnt_dir):
736
737    # Add enough entries so that readdir needs to be called
738    # multiple times.
739    fnames = []
740    for i in range(500):
741        fname  = ('A rather long filename to make sure that we '
742                  'fill up the buffer - ' * 3) + str(i)
743        with open(pjoin(src_dir, fname), 'w') as fh:
744            fh.write('File %d' % i)
745        fnames.append(fname)
746
747    listdir_is = sorted(os.listdir(mnt_dir))
748    listdir_should = sorted(os.listdir(src_dir))
749    assert listdir_is == listdir_should
750
751    inodes_is = readdir_inode(mnt_dir)
752    inodes_should = readdir_inode(src_dir)
753    assert inodes_is == inodes_should
754
755    for fname in fnames:
756        stat_src = os.stat(pjoin(src_dir, fname))
757        stat_mnt = os.stat(pjoin(mnt_dir, fname))
758        assert stat_src.st_ino == stat_mnt.st_ino
759        assert stat_src.st_mtime == stat_mnt.st_mtime
760        assert stat_src.st_ctime == stat_mnt.st_ctime
761        assert stat_src.st_size == stat_mnt.st_size
762        os.unlink(pjoin(src_dir, fname))
763
764def tst_truncate_path(mnt_dir):
765    assert len(TEST_DATA) > 1024
766
767    filename = pjoin(mnt_dir, name_generator())
768    with open(filename, 'wb') as fh:
769        fh.write(TEST_DATA)
770
771    fstat = os.stat(filename)
772    size = fstat.st_size
773    assert size == len(TEST_DATA)
774
775    # Add zeros at the end
776    os.truncate(filename, size + 1024)
777    assert os.stat(filename).st_size == size + 1024
778    with open(filename, 'rb') as fh:
779        assert fh.read(size) == TEST_DATA
780        assert fh.read(1025) == b'\0' * 1024
781
782    # Truncate data
783    os.truncate(filename, size - 1024)
784    assert os.stat(filename).st_size == size - 1024
785    with open(filename, 'rb') as fh:
786        assert fh.read(size) == TEST_DATA[:size-1024]
787
788    os.unlink(filename)
789
790def tst_truncate_fd(mnt_dir):
791    assert len(TEST_DATA) > 1024
792    with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
793        fd = fh.fileno()
794        fh.write(TEST_DATA)
795        fstat = os.fstat(fd)
796        size = fstat.st_size
797        assert size == len(TEST_DATA)
798
799        # Add zeros at the end
800        os.ftruncate(fd, size + 1024)
801        assert os.fstat(fd).st_size == size + 1024
802        fh.seek(0)
803        assert fh.read(size) == TEST_DATA
804        assert fh.read(1025) == b'\0' * 1024
805
806        # Truncate data
807        os.ftruncate(fd, size - 1024)
808        assert os.fstat(fd).st_size == size - 1024
809        fh.seek(0)
810        assert fh.read(size) == TEST_DATA[:size-1024]
811
812def tst_utimens(mnt_dir, ns_tol=0):
813    filename = pjoin(mnt_dir, name_generator())
814    os.mkdir(filename)
815    fstat = os.lstat(filename)
816
817    atime = fstat.st_atime + 42.28
818    mtime = fstat.st_mtime - 42.23
819    if sys.version_info < (3,3):
820        os.utime(filename, (atime, mtime))
821    else:
822        atime_ns = fstat.st_atime_ns + int(42.28*1e9)
823        mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
824        os.utime(filename, None, ns=(atime_ns, mtime_ns))
825
826    fstat = os.lstat(filename)
827
828    assert abs(fstat.st_atime - atime) < 1
829    assert abs(fstat.st_mtime - mtime) < 1
830    if sys.version_info >= (3,3):
831        assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
832        assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
833
834def tst_passthrough(src_dir, mnt_dir):
835    name = name_generator()
836    src_name = pjoin(src_dir, name)
837    mnt_name = pjoin(src_dir, name)
838    assert name not in os.listdir(src_dir)
839    assert name not in os.listdir(mnt_dir)
840    with open(src_name, 'w') as fh:
841        fh.write('Hello, world')
842    assert name in os.listdir(src_dir)
843    assert name in os.listdir(mnt_dir)
844    assert os.stat(src_name) == os.stat(mnt_name)
845
846    name = name_generator()
847    src_name = pjoin(src_dir, name)
848    mnt_name = pjoin(src_dir, name)
849    assert name not in os.listdir(src_dir)
850    assert name not in os.listdir(mnt_dir)
851    with open(mnt_name, 'w') as fh:
852        fh.write('Hello, world')
853    assert name in os.listdir(src_dir)
854    assert name in os.listdir(mnt_dir)
855    assert os.stat(src_name) == os.stat(mnt_name)
856
857
858def tst_xattr(mnt_dir):
859    path = os.path.join(mnt_dir, 'hello')
860    os.setxattr(path, b'hello_ll_setxattr_name', b'hello_ll_setxattr_value')
861    assert os.getxattr(path, b'hello_ll_getxattr_name') == b'hello_ll_getxattr_value'
862    os.removexattr(path, b'hello_ll_removexattr_name')
863
864
865# avoid warning about unused import
866assert test_printcap
867