1#!/usr/bin/env python3 2 3if __name__ == '__main__': 4 import pytest 5 import sys 6 sys.exit(pytest.main([__file__] + sys.argv[1:])) 7 8import subprocess 9import os 10import sys 11import py 12import pytest 13import stat 14import shutil 15import filecmp 16import tempfile 17import time 18import errno 19import sys 20import platform 21from looseversion import LooseVersion 22from tempfile import NamedTemporaryFile 23from contextlib import contextmanager 24from util import (wait_for_mount, umount, cleanup, base_cmdline, 25 safe_sleep, basename, fuse_test_marker, test_printcap, 26 fuse_proto, powerset) 27from os.path import join as pjoin 28 29pytestmark = fuse_test_marker() 30 31TEST_FILE = __file__ 32 33with open(TEST_FILE, 'rb') as fh: 34 TEST_DATA = fh.read() 35 36def name_generator(__ctr=[0]): 37 __ctr[0] += 1 38 return 'testfile_%d' % __ctr[0] 39 40options = [] 41if sys.platform == 'linux': 42 options.append('clone_fd') 43 44def invoke_directly(mnt_dir, name, options): 45 cmdline = base_cmdline + [ pjoin(basename, 'example', name), 46 '-f', mnt_dir, '-o', ','.join(options) ] 47 if name == 'hello_ll': 48 # supports single-threading only 49 cmdline.append('-s') 50 51 return cmdline 52 53def invoke_mount_fuse(mnt_dir, name, options): 54 return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'), 55 name, mnt_dir, '-o', ','.join(options) ] 56 57def invoke_mount_fuse_drop_privileges(mnt_dir, name, options): 58 if os.getuid() != 0: 59 pytest.skip('drop_privileges requires root, skipping.') 60 61 return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',)) 62 63class raii_tmpdir: 64 def __init__(self): 65 self.d = tempfile.mkdtemp() 66 67 def __str__(self): 68 return str(self.d) 69 70 def mkdir(self, path): 71 return py.path.local(str(self.d)).mkdir(path) 72 73@pytest.fixture 74def short_tmpdir(): 75 return raii_tmpdir() 76 77def readdir_inode(dir): 78 cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ] 79 with subprocess.Popen(cmd, stdout=subprocess.PIPE, 80 universal_newlines=True) as proc: 81 lines = proc.communicate()[0].splitlines() 82 lines.sort() 83 return lines 84 85 86@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse, 87 invoke_mount_fuse_drop_privileges)) 88@pytest.mark.parametrize("options", powerset(options)) 89@pytest.mark.parametrize("name", ('hello', 'hello_ll')) 90def test_hello(tmpdir, name, options, cmdline_builder, output_checker): 91 mnt_dir = str(tmpdir) 92 mount_process = subprocess.Popen( 93 cmdline_builder(mnt_dir, name, options), 94 stdout=output_checker.fd, stderr=output_checker.fd) 95 try: 96 wait_for_mount(mount_process, mnt_dir) 97 assert os.listdir(mnt_dir) == [ 'hello' ] 98 filename = pjoin(mnt_dir, 'hello') 99 with open(filename, 'r') as fh: 100 assert fh.read() == 'Hello World!\n' 101 with pytest.raises(IOError) as exc_info: 102 open(filename, 'r+') 103 assert exc_info.value.errno == errno.EACCES 104 with pytest.raises(IOError) as exc_info: 105 open(filename + 'does-not-exist', 'r+') 106 assert exc_info.value.errno == errno.ENOENT 107 if name == 'hello_ll': 108 tst_xattr(mnt_dir) 109 except: 110 cleanup(mount_process, mnt_dir) 111 raise 112 else: 113 umount(mount_process, mnt_dir) 114 115@pytest.mark.parametrize("writeback", (False, True)) 116@pytest.mark.parametrize("name", ('passthrough', 'passthrough_plus', 117 'passthrough_fh', 'passthrough_ll')) 118@pytest.mark.parametrize("debug", (False, True)) 119def test_passthrough(short_tmpdir, name, debug, output_checker, writeback): 120 # Avoid false positives from libfuse debug messages 121 if debug: 122 output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$', 123 count=0) 124 125 # test_syscalls prints "No error" under FreeBSD 126 output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]", 127 count=0) 128 129 mnt_dir = str(short_tmpdir.mkdir('mnt')) 130 src_dir = str(short_tmpdir.mkdir('src')) 131 132 if name == 'passthrough_plus': 133 cmdline = base_cmdline + \ 134 [ pjoin(basename, 'example', 'passthrough'), 135 '--plus', '-f', mnt_dir ] 136 else: 137 cmdline = base_cmdline + \ 138 [ pjoin(basename, 'example', name), 139 '-f', mnt_dir ] 140 if debug: 141 cmdline.append('-d') 142 143 if writeback: 144 if name != 'passthrough_ll': 145 pytest.skip('example does not support writeback caching') 146 cmdline.append('-o') 147 cmdline.append('writeback') 148 149 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 150 stderr=output_checker.fd) 151 try: 152 wait_for_mount(mount_process, mnt_dir) 153 work_dir = mnt_dir + src_dir 154 155 tst_statvfs(work_dir) 156 tst_readdir(src_dir, work_dir) 157 tst_readdir_big(src_dir, work_dir) 158 tst_open_read(src_dir, work_dir) 159 tst_open_write(src_dir, work_dir) 160 tst_create(work_dir) 161 tst_passthrough(src_dir, work_dir) 162 tst_append(src_dir, work_dir) 163 tst_seek(src_dir, work_dir) 164 tst_mkdir(work_dir) 165 tst_rmdir(work_dir, src_dir) 166 tst_unlink(work_dir, src_dir) 167 tst_symlink(work_dir) 168 if os.getuid() == 0: 169 tst_chown(work_dir) 170 171 # Underlying fs may not have full nanosecond resolution 172 tst_utimens(work_dir, ns_tol=1000) 173 174 tst_link(work_dir) 175 tst_truncate_path(work_dir) 176 tst_truncate_fd(work_dir) 177 tst_open_unlink(work_dir) 178 179 syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), 180 work_dir, ':' + src_dir ] 181 if writeback: 182 # When writeback caching is enabled, kernel has to open files for 183 # reading even when userspace opens with O_WDONLY. This fails if the 184 # filesystem process doesn't have special permission. 185 syscall_test_cmd.append('-53') 186 subprocess.check_call(syscall_test_cmd) 187 except: 188 cleanup(mount_process, mnt_dir) 189 raise 190 else: 191 umount(mount_process, mnt_dir) 192 193@pytest.mark.parametrize("cache", (False, True)) 194def test_passthrough_hp(short_tmpdir, cache, output_checker): 195 mnt_dir = str(short_tmpdir.mkdir('mnt')) 196 src_dir = str(short_tmpdir.mkdir('src')) 197 198 cmdline = base_cmdline + \ 199 [ pjoin(basename, 'example', 'passthrough_hp'), 200 src_dir, mnt_dir ] 201 202 cmdline.append('--foreground') 203 204 if not cache: 205 cmdline.append('--nocache') 206 207 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 208 stderr=output_checker.fd) 209 try: 210 wait_for_mount(mount_process, mnt_dir) 211 212 tst_statvfs(mnt_dir) 213 tst_readdir(src_dir, mnt_dir) 214 tst_readdir_big(src_dir, mnt_dir) 215 tst_open_read(src_dir, mnt_dir) 216 tst_open_write(src_dir, mnt_dir) 217 tst_create(mnt_dir) 218 if not cache: 219 tst_passthrough(src_dir, mnt_dir) 220 tst_append(src_dir, mnt_dir) 221 tst_seek(src_dir, mnt_dir) 222 tst_mkdir(mnt_dir) 223 if cache: 224 # if cache is enabled, no operations should go through 225 # src_dir as the cache will become stale. 226 tst_rmdir(mnt_dir) 227 tst_unlink(mnt_dir) 228 else: 229 tst_rmdir(mnt_dir, src_dir) 230 tst_unlink(mnt_dir, src_dir) 231 tst_symlink(mnt_dir) 232 if os.getuid() == 0: 233 tst_chown(mnt_dir) 234 235 # Underlying fs may not have full nanosecond resolution 236 tst_utimens(mnt_dir, ns_tol=1000) 237 238 tst_link(mnt_dir) 239 tst_truncate_path(mnt_dir) 240 tst_truncate_fd(mnt_dir) 241 tst_open_unlink(mnt_dir) 242 243 # test_syscalls assumes that changes in source directory 244 # will be reflected immediately in mountpoint, so we 245 # can't use it. 246 if not cache: 247 syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), 248 mnt_dir, ':' + src_dir ] 249 # unlinked testfiles check fails without kernel fix 250 # "fuse: fix illegal access to inode with reused nodeid" 251 # so opt-in for this test from kernel 5.14 252 if LooseVersion(platform.release()) >= '5.14': 253 syscall_test_cmd.append('-u') 254 subprocess.check_call(syscall_test_cmd) 255 except: 256 cleanup(mount_process, mnt_dir) 257 raise 258 else: 259 umount(mount_process, mnt_dir) 260 261 262@pytest.mark.skipif(fuse_proto < (7,11), 263 reason='not supported by running kernel') 264def test_ioctl(tmpdir, output_checker): 265 progname = pjoin(basename, 'example', 'ioctl') 266 if not os.path.exists(progname): 267 pytest.skip('%s not built' % os.path.basename(progname)) 268 269 mnt_dir = str(tmpdir) 270 testfile = pjoin(mnt_dir, 'fioc') 271 cmdline = base_cmdline + [progname, '-f', mnt_dir ] 272 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 273 stderr=output_checker.fd) 274 try: 275 wait_for_mount(mount_process, mnt_dir) 276 277 cmdline = base_cmdline + \ 278 [ pjoin(basename, 'example', 'ioctl_client'), 279 testfile ] 280 assert subprocess.check_output(cmdline) == b'0\n' 281 with open(testfile, 'wb') as fh: 282 fh.write(b'foobar') 283 assert subprocess.check_output(cmdline) == b'6\n' 284 subprocess.check_call(cmdline + [ '3' ]) 285 with open(testfile, 'rb') as fh: 286 assert fh.read()== b'foo' 287 except: 288 cleanup(mount_process, mnt_dir) 289 raise 290 else: 291 umount(mount_process, mnt_dir) 292 293def test_poll(tmpdir, output_checker): 294 mnt_dir = str(tmpdir) 295 cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'), 296 '-f', mnt_dir ] 297 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 298 stderr=output_checker.fd) 299 try: 300 wait_for_mount(mount_process, mnt_dir) 301 cmdline = base_cmdline + \ 302 [ pjoin(basename, 'example', 'poll_client') ] 303 subprocess.check_call(cmdline, cwd=mnt_dir) 304 except: 305 cleanup(mount_process, mnt_dir) 306 raise 307 else: 308 umount(mount_process, mnt_dir) 309 310def test_null(tmpdir, output_checker): 311 progname = pjoin(basename, 'example', 'null') 312 if not os.path.exists(progname): 313 pytest.skip('%s not built' % os.path.basename(progname)) 314 315 mnt_file = str(tmpdir) + '/file' 316 with open(mnt_file, 'w') as fh: 317 fh.write('dummy') 318 cmdline = base_cmdline + [ progname, '-f', mnt_file ] 319 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 320 stderr=output_checker.fd) 321 def test_fn(name): 322 return os.stat(name).st_size > 4000 323 try: 324 wait_for_mount(mount_process, mnt_file, test_fn) 325 with open(mnt_file, 'rb') as fh: 326 assert fh.read(382) == b'\0' * 382 327 with open(mnt_file, 'wb') as fh: 328 fh.write(b'whatever') 329 except: 330 cleanup(mount_process, mnt_file) 331 raise 332 else: 333 umount(mount_process, mnt_file) 334 335 336@pytest.mark.skipif(fuse_proto < (7,12), 337 reason='not supported by running kernel') 338@pytest.mark.parametrize("only_expire", ("invalidate_entries", "expire_entries")) 339@pytest.mark.parametrize("notify", (True, False)) 340def test_notify_inval_entry(tmpdir, only_expire, notify, output_checker): 341 mnt_dir = str(tmpdir) 342 cmdline = base_cmdline + \ 343 [ pjoin(basename, 'example', 'notify_inval_entry'), 344 '-f', '--update-interval=1', 345 '--timeout=5', mnt_dir ] 346 if not notify: 347 cmdline.append('--no-notify') 348 if only_expire == "expire_entries": 349 cmdline.append('--only-expire') 350 if fuse_proto < (7,38): 351 pytest.skip('only-expire not supported by running kernel') 352 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 353 stderr=output_checker.fd) 354 try: 355 wait_for_mount(mount_process, mnt_dir) 356 fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) 357 try: 358 os.stat(fname) 359 except FileNotFoundError: 360 # We may have hit a race condition and issued 361 # readdir just before the name changed 362 fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) 363 os.stat(fname) 364 365 safe_sleep(2) 366 if not notify: 367 os.stat(fname) 368 safe_sleep(5) 369 with pytest.raises(FileNotFoundError): 370 os.stat(fname) 371 except: 372 cleanup(mount_process, mnt_dir) 373 raise 374 else: 375 umount(mount_process, mnt_dir) 376 377@pytest.mark.parametrize("intended_user", ('root', 'non_root')) 378def test_dev_auto_unmount(short_tmpdir, output_checker, intended_user): 379 """Check that root can mount with dev and auto_unmount 380 (but non-root cannot). 381 Split into root vs non-root, so that the output of pytest 382 makes clear what functionality is being tested.""" 383 if os.getuid() == 0 and intended_user == 'non_root': 384 pytest.skip('needs to run as non-root') 385 if os.getuid() != 0 and intended_user == 'root': 386 pytest.skip('needs to run as root') 387 mnt_dir = str(short_tmpdir.mkdir('mnt')) 388 src_dir = str('/dev') 389 cmdline = base_cmdline + \ 390 [ pjoin(basename, 'example', 'passthrough_ll'), 391 '-o', f'source={src_dir},dev,auto_unmount', 392 '-f', mnt_dir ] 393 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 394 stderr=output_checker.fd) 395 try: 396 wait_for_mount(mount_process, mnt_dir) 397 if os.getuid() == 0: 398 open(pjoin(mnt_dir, 'null')).close() 399 else: 400 with pytest.raises(PermissionError): 401 open(pjoin(mnt_dir, 'null')).close() 402 except: 403 cleanup(mount_process, mnt_dir) 404 raise 405 else: 406 umount(mount_process, mnt_dir) 407 408@pytest.mark.skipif(os.getuid() != 0, 409 reason='needs to run as root') 410def test_cuse(output_checker): 411 412 # Valgrind warns about unknown ioctls, that's ok 413 output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n' 414 r'==\1== \s{3}.+\n' 415 r'==\1== \s{3}.+$', count=0) 416 417 devname = 'cuse-test-%d' % os.getpid() 418 devpath = '/dev/%s' % devname 419 cmdline = base_cmdline + \ 420 [ pjoin(basename, 'example', 'cuse'), 421 '-f', '--name=%s' % devname ] 422 mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, 423 stderr=output_checker.fd) 424 425 cmdline = base_cmdline + \ 426 [ pjoin(basename, 'example', 'cuse_client'), 427 devpath ] 428 try: 429 wait_for_mount(mount_process, devpath, 430 test_fn=os.path.exists) 431 assert subprocess.check_output(cmdline + ['s']) == b'0\n' 432 data = b'some test data' 433 off = 5 434 proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ], 435 stdin=subprocess.PIPE) 436 proc.stdin.write(data) 437 proc.stdin.close() 438 assert proc.wait(timeout=10) == 0 439 size = str(off + len(data)).encode() + b'\n' 440 assert subprocess.check_output(cmdline + ['s']) == size 441 out = subprocess.check_output( 442 cmdline + [ 'r', str(off + len(data) + 2), '0' ]) 443 assert out == (b'\0' * off) + data 444 finally: 445 mount_process.terminate() 446 447def test_release_unlink_race(tmpdir, output_checker): 448 """test case for Issue #746 449 450 If RELEASE and UNLINK opcodes are sent back to back, and fuse_fs_release() 451 and fuse_fs_rename() are slow to execute, UNLINK will run while RELEASE is 452 still executing. UNLINK will try to rename the file and, while the rename 453 is happening, the RELEASE will finish executing. As a result, RELEASE will 454 not detect in time that UNLINK has happened, and UNLINK will not detect in 455 time that RELEASE has happened. 456 457 458 NOTE: This is triggered only when nullpath_ok is set. 459 460 If it is NOT SET then get_path_nullok() called by fuse_lib_release() will 461 call get_path_common() and lock the path, and then the fuse_lib_unlink() 462 will wait for the path to be unlocked before executing and thus synchronise 463 with fuse_lib_release(). 464 465 If it is SET then get_path_nullok() will just set the path to null and 466 return without locking anything and thus allowing fuse_lib_unlink() to 467 eventually execute unimpeded while fuse_lib_release() is still running. 468 """ 469 470 fuse_mountpoint = str(tmpdir) 471 472 fuse_binary_command = base_cmdline + \ 473 [ pjoin(basename, 'test', 'release_unlink_race'), 474 "-f", fuse_mountpoint] 475 476 fuse_process = subprocess.Popen(fuse_binary_command, 477 stdout=output_checker.fd, 478 stderr=output_checker.fd) 479 480 try: 481 wait_for_mount(fuse_process, fuse_mountpoint) 482 483 temp_dir = tempfile.TemporaryDirectory(dir="/tmp/") 484 temp_dir_path = temp_dir.name 485 486 fuse_temp_file, fuse_temp_file_path = tempfile.mkstemp(dir=(fuse_mountpoint + temp_dir_path)) 487 488 os.close(fuse_temp_file) 489 os.unlink(fuse_temp_file_path) 490 491 # needed for slow CI/CD pipelines for unlink OP to complete processing 492 safe_sleep(3) 493 494 assert os.listdir(temp_dir_path) == [] 495 496 except: 497 temp_dir.cleanup() 498 cleanup(fuse_process, fuse_mountpoint) 499 raise 500 501 else: 502 temp_dir.cleanup() 503 umount(fuse_process, fuse_mountpoint) 504 505 506@contextmanager 507def os_open(name, flags): 508 fd = os.open(name, flags) 509 try: 510 yield fd 511 finally: 512 os.close(fd) 513 514def os_create(name): 515 os.close(os.open(name, os.O_CREAT | os.O_RDWR)) 516 517def tst_unlink(mnt_dir, src_dir=None): 518 name = name_generator() 519 fullname = mnt_dir + "/" + name 520 srcname = fullname 521 if src_dir is not None: 522 srcname = pjoin(src_dir, name) 523 with open(srcname, 'wb') as fh: 524 fh.write(b'hello') 525 assert name in os.listdir(mnt_dir) 526 os.unlink(fullname) 527 with pytest.raises(OSError) as exc_info: 528 os.stat(fullname) 529 assert exc_info.value.errno == errno.ENOENT 530 assert name not in os.listdir(mnt_dir) 531 532def tst_mkdir(mnt_dir): 533 dirname = name_generator() 534 fullname = mnt_dir + "/" + dirname 535 os.mkdir(fullname) 536 fstat = os.stat(fullname) 537 assert stat.S_ISDIR(fstat.st_mode) 538 assert os.listdir(fullname) == [] 539 # Some filesystem (e.g. BTRFS) don't track st_nlink for directories 540 assert fstat.st_nlink in (1,2) 541 assert dirname in os.listdir(mnt_dir) 542 543def tst_rmdir(mnt_dir, src_dir=None): 544 name = name_generator() 545 fullname = mnt_dir + "/" + name 546 srcname = fullname 547 if src_dir is not None: 548 srcname = pjoin(src_dir, name) 549 os.mkdir(srcname) 550 assert name in os.listdir(mnt_dir) 551 os.rmdir(fullname) 552 with pytest.raises(OSError) as exc_info: 553 os.stat(fullname) 554 assert exc_info.value.errno == errno.ENOENT 555 assert name not in os.listdir(mnt_dir) 556 557def tst_symlink(mnt_dir): 558 linkname = name_generator() 559 fullname = mnt_dir + "/" + linkname 560 os.symlink("/imaginary/dest", fullname) 561 fstat = os.lstat(fullname) 562 assert stat.S_ISLNK(fstat.st_mode) 563 assert os.readlink(fullname) == "/imaginary/dest" 564 assert fstat.st_nlink == 1 565 assert linkname in os.listdir(mnt_dir) 566 567def tst_create(mnt_dir): 568 name = name_generator() 569 fullname = pjoin(mnt_dir, name) 570 with pytest.raises(OSError) as exc_info: 571 os.stat(fullname) 572 assert exc_info.value.errno == errno.ENOENT 573 assert name not in os.listdir(mnt_dir) 574 575 fd = os.open(fullname, os.O_CREAT | os.O_RDWR) 576 os.close(fd) 577 578 assert name in os.listdir(mnt_dir) 579 fstat = os.lstat(fullname) 580 assert stat.S_ISREG(fstat.st_mode) 581 assert fstat.st_nlink == 1 582 assert fstat.st_size == 0 583 584def tst_chown(mnt_dir): 585 filename = pjoin(mnt_dir, name_generator()) 586 os.mkdir(filename) 587 fstat = os.lstat(filename) 588 uid = fstat.st_uid 589 gid = fstat.st_gid 590 591 uid_new = uid + 1 592 os.chown(filename, uid_new, -1) 593 fstat = os.lstat(filename) 594 assert fstat.st_uid == uid_new 595 assert fstat.st_gid == gid 596 597 gid_new = gid + 1 598 os.chown(filename, -1, gid_new) 599 fstat = os.lstat(filename) 600 assert fstat.st_uid == uid_new 601 assert fstat.st_gid == gid_new 602 603def tst_open_read(src_dir, mnt_dir): 604 name = name_generator() 605 with open(pjoin(src_dir, name), 'wb') as fh_out, \ 606 open(TEST_FILE, 'rb') as fh_in: 607 shutil.copyfileobj(fh_in, fh_out) 608 609 assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False) 610 611def tst_open_write(src_dir, mnt_dir): 612 name = name_generator() 613 os_create(pjoin(src_dir, name)) 614 fullname = pjoin(mnt_dir, name) 615 with open(fullname, 'wb') as fh_out, \ 616 open(TEST_FILE, 'rb') as fh_in: 617 shutil.copyfileobj(fh_in, fh_out) 618 619 assert filecmp.cmp(fullname, TEST_FILE, False) 620 621def tst_append(src_dir, mnt_dir): 622 name = name_generator() 623 os_create(pjoin(src_dir, name)) 624 fullname = pjoin(mnt_dir, name) 625 with os_open(fullname, os.O_WRONLY) as fd: 626 os.write(fd, b'foo\n') 627 with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd: 628 os.write(fd, b'bar\n') 629 630 with open(fullname, 'rb') as fh: 631 assert fh.read() == b'foo\nbar\n' 632 633def tst_seek(src_dir, mnt_dir): 634 name = name_generator() 635 os_create(pjoin(src_dir, name)) 636 fullname = pjoin(mnt_dir, name) 637 with os_open(fullname, os.O_WRONLY) as fd: 638 os.lseek(fd, 1, os.SEEK_SET) 639 os.write(fd, b'foobar\n') 640 with os_open(fullname, os.O_WRONLY) as fd: 641 os.lseek(fd, 4, os.SEEK_SET) 642 os.write(fd, b'com') 643 644 with open(fullname, 'rb') as fh: 645 assert fh.read() == b'\0foocom\n' 646 647def tst_open_unlink(mnt_dir): 648 name = pjoin(mnt_dir, name_generator()) 649 data1 = b'foo' 650 data2 = b'bar' 651 fullname = pjoin(mnt_dir, name) 652 with open(fullname, 'wb+', buffering=0) as fh: 653 fh.write(data1) 654 os.unlink(fullname) 655 with pytest.raises(OSError) as exc_info: 656 os.stat(fullname) 657 assert exc_info.value.errno == errno.ENOENT 658 assert name not in os.listdir(mnt_dir) 659 fh.write(data2) 660 fh.seek(0) 661 assert fh.read() == data1+data2 662 663def tst_statvfs(mnt_dir): 664 os.statvfs(mnt_dir) 665 666def tst_link(mnt_dir): 667 name1 = pjoin(mnt_dir, name_generator()) 668 name2 = pjoin(mnt_dir, name_generator()) 669 shutil.copyfile(TEST_FILE, name1) 670 assert filecmp.cmp(name1, TEST_FILE, False) 671 672 fstat1 = os.lstat(name1) 673 assert fstat1.st_nlink == 1 674 675 os.link(name1, name2) 676 677 fstat1 = os.lstat(name1) 678 fstat2 = os.lstat(name2) 679 assert fstat1 == fstat2 680 assert fstat1.st_nlink == 2 681 assert os.path.basename(name2) in os.listdir(mnt_dir) 682 assert filecmp.cmp(name1, name2, False) 683 684 # Since RELEASE requests are asynchronous, it is possible that 685 # libfuse still considers the file to be open at this point 686 # and (since -o hard_remove is not used) renames it instead of 687 # deleting it. In that case, the following lstat() call will 688 # still report an st_nlink value of 2 (cf. issue #157). 689 os.unlink(name2) 690 691 assert os.path.basename(name2) not in os.listdir(mnt_dir) 692 with pytest.raises(FileNotFoundError): 693 os.lstat(name2) 694 695 # See above, we may have to wait until RELEASE has been 696 # received before the st_nlink value is correct. 697 maxwait = time.time() + 2 698 fstat1 = os.lstat(name1) 699 while fstat1.st_nlink == 2 and time.time() < maxwait: 700 fstat1 = os.lstat(name1) 701 time.sleep(0.1) 702 assert fstat1.st_nlink == 1 703 704 os.unlink(name1) 705 706def tst_readdir(src_dir, mnt_dir): 707 newdir = name_generator() 708 709 src_newdir = pjoin(src_dir, newdir) 710 mnt_newdir = pjoin(mnt_dir, newdir) 711 file_ = src_newdir + "/" + name_generator() 712 subdir = src_newdir + "/" + name_generator() 713 subfile = subdir + "/" + name_generator() 714 715 os.mkdir(src_newdir) 716 shutil.copyfile(TEST_FILE, file_) 717 os.mkdir(subdir) 718 shutil.copyfile(TEST_FILE, subfile) 719 720 listdir_is = os.listdir(mnt_newdir) 721 listdir_is.sort() 722 listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ] 723 listdir_should.sort() 724 assert listdir_is == listdir_should 725 726 inodes_is = readdir_inode(mnt_newdir) 727 inodes_should = readdir_inode(src_newdir) 728 assert inodes_is == inodes_should 729 730 os.unlink(file_) 731 os.unlink(subfile) 732 os.rmdir(subdir) 733 os.rmdir(src_newdir) 734 735def tst_readdir_big(src_dir, mnt_dir): 736 737 # Add enough entries so that readdir needs to be called 738 # multiple times. 739 fnames = [] 740 for i in range(500): 741 fname = ('A rather long filename to make sure that we ' 742 'fill up the buffer - ' * 3) + str(i) 743 with open(pjoin(src_dir, fname), 'w') as fh: 744 fh.write('File %d' % i) 745 fnames.append(fname) 746 747 listdir_is = sorted(os.listdir(mnt_dir)) 748 listdir_should = sorted(os.listdir(src_dir)) 749 assert listdir_is == listdir_should 750 751 inodes_is = readdir_inode(mnt_dir) 752 inodes_should = readdir_inode(src_dir) 753 assert inodes_is == inodes_should 754 755 for fname in fnames: 756 stat_src = os.stat(pjoin(src_dir, fname)) 757 stat_mnt = os.stat(pjoin(mnt_dir, fname)) 758 assert stat_src.st_ino == stat_mnt.st_ino 759 assert stat_src.st_mtime == stat_mnt.st_mtime 760 assert stat_src.st_ctime == stat_mnt.st_ctime 761 assert stat_src.st_size == stat_mnt.st_size 762 os.unlink(pjoin(src_dir, fname)) 763 764def tst_truncate_path(mnt_dir): 765 assert len(TEST_DATA) > 1024 766 767 filename = pjoin(mnt_dir, name_generator()) 768 with open(filename, 'wb') as fh: 769 fh.write(TEST_DATA) 770 771 fstat = os.stat(filename) 772 size = fstat.st_size 773 assert size == len(TEST_DATA) 774 775 # Add zeros at the end 776 os.truncate(filename, size + 1024) 777 assert os.stat(filename).st_size == size + 1024 778 with open(filename, 'rb') as fh: 779 assert fh.read(size) == TEST_DATA 780 assert fh.read(1025) == b'\0' * 1024 781 782 # Truncate data 783 os.truncate(filename, size - 1024) 784 assert os.stat(filename).st_size == size - 1024 785 with open(filename, 'rb') as fh: 786 assert fh.read(size) == TEST_DATA[:size-1024] 787 788 os.unlink(filename) 789 790def tst_truncate_fd(mnt_dir): 791 assert len(TEST_DATA) > 1024 792 with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh: 793 fd = fh.fileno() 794 fh.write(TEST_DATA) 795 fstat = os.fstat(fd) 796 size = fstat.st_size 797 assert size == len(TEST_DATA) 798 799 # Add zeros at the end 800 os.ftruncate(fd, size + 1024) 801 assert os.fstat(fd).st_size == size + 1024 802 fh.seek(0) 803 assert fh.read(size) == TEST_DATA 804 assert fh.read(1025) == b'\0' * 1024 805 806 # Truncate data 807 os.ftruncate(fd, size - 1024) 808 assert os.fstat(fd).st_size == size - 1024 809 fh.seek(0) 810 assert fh.read(size) == TEST_DATA[:size-1024] 811 812def tst_utimens(mnt_dir, ns_tol=0): 813 filename = pjoin(mnt_dir, name_generator()) 814 os.mkdir(filename) 815 fstat = os.lstat(filename) 816 817 atime = fstat.st_atime + 42.28 818 mtime = fstat.st_mtime - 42.23 819 if sys.version_info < (3,3): 820 os.utime(filename, (atime, mtime)) 821 else: 822 atime_ns = fstat.st_atime_ns + int(42.28*1e9) 823 mtime_ns = fstat.st_mtime_ns - int(42.23*1e9) 824 os.utime(filename, None, ns=(atime_ns, mtime_ns)) 825 826 fstat = os.lstat(filename) 827 828 assert abs(fstat.st_atime - atime) < 1 829 assert abs(fstat.st_mtime - mtime) < 1 830 if sys.version_info >= (3,3): 831 assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol 832 assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol 833 834def tst_passthrough(src_dir, mnt_dir): 835 name = name_generator() 836 src_name = pjoin(src_dir, name) 837 mnt_name = pjoin(src_dir, name) 838 assert name not in os.listdir(src_dir) 839 assert name not in os.listdir(mnt_dir) 840 with open(src_name, 'w') as fh: 841 fh.write('Hello, world') 842 assert name in os.listdir(src_dir) 843 assert name in os.listdir(mnt_dir) 844 assert os.stat(src_name) == os.stat(mnt_name) 845 846 name = name_generator() 847 src_name = pjoin(src_dir, name) 848 mnt_name = pjoin(src_dir, name) 849 assert name not in os.listdir(src_dir) 850 assert name not in os.listdir(mnt_dir) 851 with open(mnt_name, 'w') as fh: 852 fh.write('Hello, world') 853 assert name in os.listdir(src_dir) 854 assert name in os.listdir(mnt_dir) 855 assert os.stat(src_name) == os.stat(mnt_name) 856 857 858def tst_xattr(mnt_dir): 859 path = os.path.join(mnt_dir, 'hello') 860 os.setxattr(path, b'hello_ll_setxattr_name', b'hello_ll_setxattr_value') 861 assert os.getxattr(path, b'hello_ll_getxattr_name') == b'hello_ll_getxattr_value' 862 os.removexattr(path, b'hello_ll_removexattr_name') 863 864 865# avoid warning about unused import 866assert test_printcap 867