blob: 457c2a01bcc1d18429f5e362791ec7bf9dbbee93 [file] [log] [blame]
#!/usr/bin/env python3
if __name__ == '__main__':
import pytest
import sys
sys.exit(pytest.main([__file__] + sys.argv[1:]))
import subprocess
import os
import sys
import pytest
import stat
import shutil
import filecmp
import errno
import platform
from distutils.version import LooseVersion
from tempfile import NamedTemporaryFile
from util import (wait_for_mount, umount, cleanup, base_cmdline,
safe_sleep, basename)
from os.path import join as pjoin
TEST_FILE = __file__
with open(TEST_FILE, 'rb') as fh:
TEST_DATA = fh.read()
def name_generator(__ctr=[0]):
__ctr[0] += 1
return 'testfile_%d' % __ctr[0]
@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
@pytest.mark.parametrize("options", ([], ['-o', 'clone_fd']))
def test_hello(tmpdir, name, options):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', name),
'-f', mnt_dir ] + options
if name == 'hello_ll':
# supports single-threading only
cmdline.append('-s')
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
assert os.listdir(mnt_dir) == [ 'hello' ]
filename = pjoin(mnt_dir, 'hello')
with open(filename, 'r') as fh:
assert fh.read() == 'Hello World!\n'
with pytest.raises(IOError) as exc_info:
open(filename, 'r+')
assert exc_info.value.errno == errno.EACCES
with pytest.raises(IOError) as exc_info:
open(filename + 'does-not-exist', 'r+')
assert exc_info.value.errno == errno.ENOENT
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("name", ('passthrough', 'passthrough_fh',
'passthrough_ll'))
@pytest.mark.parametrize("debug", (True, False))
def test_passthrough(tmpdir, name, debug):
mnt_dir = str(tmpdir.mkdir('mnt'))
src_dir = str(tmpdir.mkdir('src'))
cmdline = base_cmdline + \
[ pjoin(basename, 'example', name),
'-f', mnt_dir ]
if debug:
cmdline.append('-d')
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
work_dir = pjoin(mnt_dir, src_dir)
subprocess.check_call([ os.path.join(basename, 'test', 'test'),
work_dir, ':' + src_dir ])
tst_write(work_dir)
tst_mkdir(work_dir)
tst_symlink(work_dir)
tst_mknod(work_dir)
if os.getuid() == 0:
tst_chown(work_dir)
# Underlying fs may not have full nanosecond resolution
tst_utimens(work_dir, ns_tol=1000)
tst_link(work_dir)
tst_readdir(work_dir)
tst_statvfs(work_dir)
tst_truncate_path(work_dir)
tst_truncate_fd(work_dir)
tst_unlink(work_dir)
tst_passthrough(src_dir, work_dir)
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
def test_ioctl(tmpdir):
mnt_dir = str(tmpdir)
testfile = pjoin(mnt_dir, 'fioc')
cmdline = base_cmdline + \
[pjoin(basename, 'example', 'ioctl'), '-f', mnt_dir ]
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'ioctl_client'),
testfile ]
assert subprocess.check_output(cmdline) == b'0\n'
with open(testfile, 'wb') as fh:
fh.write(b'foobar')
assert subprocess.check_output(cmdline) == b'6\n'
subprocess.check_call(cmdline + [ '3' ])
with open(testfile, 'rb') as fh:
assert fh.read()== b'foo'
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
def test_poll(tmpdir):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
'-f', mnt_dir ]
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'poll_client') ]
subprocess.check_call(cmdline, cwd=mnt_dir)
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("name",
('notify_inval_inode',
'notify_store_retrieve'))
@pytest.mark.parametrize("notify", (True, False))
def test_notify1(tmpdir, name, notify):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', name),
'-f', '--update-interval=1', mnt_dir ]
if not notify:
cmdline.append('--no-notify')
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
filename = pjoin(mnt_dir, 'current_time')
with open(filename, 'r') as fh:
read1 = fh.read()
safe_sleep(2)
with open(filename, 'r') as fh:
read2 = fh.read()
if notify:
assert read1 != read2
else:
assert read1 == read2
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("notify", (True, False))
def test_notify_inval_entry(tmpdir, notify):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'notify_inval_entry'),
'-f', '--update-interval=1',
'--timeout=5', mnt_dir ]
if not notify:
cmdline.append('--no-notify')
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
try:
os.stat(fname)
except FileNotFoundError:
# We may have hit a race condition and issued
# readdir just before the name changed
fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
os.stat(fname)
safe_sleep(2)
if not notify:
os.stat(fname)
safe_sleep(5)
with pytest.raises(FileNotFoundError):
os.stat(fname)
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("writeback", (False, True))
def test_write_cache(tmpdir, writeback):
if writeback and LooseVersion(platform.release()) < '3.14':
pytest.skip('Requires kernel 3.14 or newer')
# This test hangs under Valgrind when running close(fd)
# test_write_cache.c:test_fs(). Most likely this is because of an internal
# deadlock in valgrind, it probably assumes that until close() returns,
# control does not come to the program.
mnt_dir = str(tmpdir)
cmdline = [ pjoin(basename, 'test', 'test_write_cache'),
mnt_dir ]
if writeback:
cmdline.append('-owriteback_cache')
subprocess.check_call(cmdline)
@pytest.mark.skipif(os.getuid() != 0,
reason='needs to run as root')
def test_cuse(capfd):
# Valgrind warns about unknown ioctls, that's ok
capfd.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
r'==\1== \s{3}.+\n'
r'==\1== \s{3}.+$', count=0)
devname = 'cuse-test-%d' % os.getpid()
devpath = '/dev/%s' % devname
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'cuse'),
'-f', '--name=%s' % devname ]
mount_process = subprocess.Popen(cmdline)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'cuse_client'),
devpath ]
try:
wait_for_mount(mount_process, devpath,
test_fn=os.path.exists)
assert subprocess.check_output(cmdline + ['s']) == b'0\n'
data = b'some test data'
off = 5
proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
stdin=subprocess.PIPE)
proc.stdin.write(data)
proc.stdin.close()
assert proc.wait(timeout=10) == 0
size = str(off + len(data)).encode() + b'\n'
assert subprocess.check_output(cmdline + ['s']) == size
out = subprocess.check_output(
cmdline + [ 'r', str(off + len(data) + 2), '0' ])
assert out == (b'\0' * off) + data
finally:
mount_process.terminate()
def checked_unlink(filename, path, isdir=False):
fullname = pjoin(path, filename)
if isdir:
os.rmdir(fullname)
else:
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert filename not in os.listdir(path)
def tst_mkdir(mnt_dir):
dirname = name_generator()
fullname = mnt_dir + "/" + dirname
os.mkdir(fullname)
fstat = os.stat(fullname)
assert stat.S_ISDIR(fstat.st_mode)
assert os.listdir(fullname) == []
assert fstat.st_nlink in (1,2)
assert dirname in os.listdir(mnt_dir)
checked_unlink(dirname, mnt_dir, isdir=True)
def tst_symlink(mnt_dir):
linkname = name_generator()
fullname = mnt_dir + "/" + linkname
os.symlink("/imaginary/dest", fullname)
fstat = os.lstat(fullname)
assert stat.S_ISLNK(fstat.st_mode)
assert os.readlink(fullname) == "/imaginary/dest"
assert fstat.st_nlink == 1
assert linkname in os.listdir(mnt_dir)
checked_unlink(linkname, mnt_dir)
def tst_mknod(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
shutil.copyfile(TEST_FILE, filename)
fstat = os.lstat(filename)
assert stat.S_ISREG(fstat.st_mode)
assert fstat.st_nlink == 1
assert os.path.basename(filename) in os.listdir(mnt_dir)
assert filecmp.cmp(TEST_FILE, filename, False)
checked_unlink(filename, mnt_dir)
def tst_chown(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
uid = fstat.st_uid
gid = fstat.st_gid
uid_new = uid + 1
os.chown(filename, uid_new, -1)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid
gid_new = gid + 1
os.chown(filename, -1, gid_new)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid_new
checked_unlink(filename, mnt_dir, isdir=True)
def tst_write(mnt_dir):
name = pjoin(mnt_dir, name_generator())
shutil.copyfile(TEST_FILE, name)
assert filecmp.cmp(name, TEST_FILE, False)
checked_unlink(name, mnt_dir)
def tst_unlink(mnt_dir):
name = pjoin(mnt_dir, name_generator())
data1 = b'foo'
data2 = b'bar'
with open(pjoin(mnt_dir, name), 'wb+', buffering=0) as fh:
fh.write(data1)
checked_unlink(name, mnt_dir)
fh.write(data2)
fh.seek(0)
assert fh.read() == data1+data2
def tst_statvfs(mnt_dir):
os.statvfs(mnt_dir)
def tst_link(mnt_dir):
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
shutil.copyfile(TEST_FILE, name1)
assert filecmp.cmp(name1, TEST_FILE, False)
os.link(name1, name2)
fstat1 = os.lstat(name1)
fstat2 = os.lstat(name2)
assert fstat1 == fstat2
assert fstat1.st_nlink == 2
assert os.path.basename(name2) in os.listdir(mnt_dir)
assert filecmp.cmp(name1, name2, False)
os.unlink(name2)
fstat1 = os.lstat(name1)
assert fstat1.st_nlink == 1
os.unlink(name1)
def tst_readdir(mnt_dir):
dir_ = pjoin(mnt_dir, name_generator())
file_ = dir_ + "/" + name_generator()
subdir = dir_ + "/" + name_generator()
subfile = subdir + "/" + name_generator()
os.mkdir(dir_)
shutil.copyfile(TEST_FILE, file_)
os.mkdir(subdir)
shutil.copyfile(TEST_FILE, subfile)
listdir_is = os.listdir(dir_)
listdir_is.sort()
listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
listdir_should.sort()
assert listdir_is == listdir_should
os.unlink(file_)
os.unlink(subfile)
os.rmdir(subdir)
os.rmdir(dir_)
def tst_truncate_path(mnt_dir):
assert len(TEST_DATA) > 1024
filename = pjoin(mnt_dir, name_generator())
with open(filename, 'wb') as fh:
fh.write(TEST_DATA)
fstat = os.stat(filename)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.truncate(filename, size + 1024)
assert os.stat(filename).st_size == size + 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.truncate(filename, size - 1024)
assert os.stat(filename).st_size == size - 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA[:size-1024]
os.unlink(filename)
def tst_truncate_fd(mnt_dir):
assert len(TEST_DATA) > 1024
with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
fd = fh.fileno()
fh.write(TEST_DATA)
fstat = os.fstat(fd)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.ftruncate(fd, size + 1024)
assert os.fstat(fd).st_size == size + 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.ftruncate(fd, size - 1024)
assert os.fstat(fd).st_size == size - 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA[:size-1024]
def tst_utimens(mnt_dir, ns_tol=0):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
atime = fstat.st_atime + 42.28
mtime = fstat.st_mtime - 42.23
if sys.version_info < (3,3):
os.utime(filename, (atime, mtime))
else:
atime_ns = fstat.st_atime_ns + int(42.28*1e9)
mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
os.utime(filename, None, ns=(atime_ns, mtime_ns))
fstat = os.lstat(filename)
assert abs(fstat.st_atime - atime) < 1e-3
assert abs(fstat.st_mtime - mtime) < 1e-3
if sys.version_info >= (3,3):
assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
checked_unlink(filename, mnt_dir, isdir=True)
def tst_passthrough(src_dir, mnt_dir):
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(src_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(mnt_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)