• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test program for the fcntl C module.
2"""
3import platform
4import os
5import struct
6import sys
7import unittest
8from multiprocessing import Process
9from test.support import verbose, cpython_only
10from test.support.import_helper import import_module
11from test.support.os_helper import TESTFN, unlink
12
13
14# Skip test if no fcntl module.
15fcntl = import_module('fcntl')
16
17
18
19def get_lockdata():
20    try:
21        os.O_LARGEFILE
22    except AttributeError:
23        start_len = "ll"
24    else:
25        start_len = "qq"
26
27    if (sys.platform.startswith(('netbsd', 'freebsd', 'openbsd'))
28        or sys.platform == 'darwin'):
29        if struct.calcsize('l') == 8:
30            off_t = 'l'
31            pid_t = 'i'
32        else:
33            off_t = 'lxxxx'
34            pid_t = 'l'
35        lockdata = struct.pack(off_t + off_t + pid_t + 'hh', 0, 0, 0,
36                               fcntl.F_WRLCK, 0)
37    elif sys.platform.startswith('gnukfreebsd'):
38        lockdata = struct.pack('qqihhi', 0, 0, 0, fcntl.F_WRLCK, 0, 0)
39    elif sys.platform in ['hp-uxB', 'unixware7']:
40        lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
41    else:
42        lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
43    if lockdata:
44        if verbose:
45            print('struct.pack: ', repr(lockdata))
46    return lockdata
47
48lockdata = get_lockdata()
49
50class BadFile:
51    def __init__(self, fn):
52        self.fn = fn
53    def fileno(self):
54        return self.fn
55
56def try_lockf_on_other_process_fail(fname, cmd):
57    f = open(fname, 'wb+')
58    try:
59        fcntl.lockf(f, cmd)
60    except BlockingIOError:
61        pass
62    finally:
63        f.close()
64
65def try_lockf_on_other_process(fname, cmd):
66    f = open(fname, 'wb+')
67    fcntl.lockf(f, cmd)
68    fcntl.lockf(f, fcntl.LOCK_UN)
69    f.close()
70
71class TestFcntl(unittest.TestCase):
72
73    def setUp(self):
74        self.f = None
75
76    def tearDown(self):
77        if self.f and not self.f.closed:
78            self.f.close()
79        unlink(TESTFN)
80
81    def test_fcntl_fileno(self):
82        # the example from the library docs
83        self.f = open(TESTFN, 'wb')
84        rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
85        if verbose:
86            print('Status from fcntl with O_NONBLOCK: ', rv)
87        rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
88        if verbose:
89            print('String from fcntl with F_SETLKW: ', repr(rv))
90        self.f.close()
91
92    def test_fcntl_file_descriptor(self):
93        # again, but pass the file rather than numeric descriptor
94        self.f = open(TESTFN, 'wb')
95        rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
96        if verbose:
97            print('Status from fcntl with O_NONBLOCK: ', rv)
98        rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
99        if verbose:
100            print('String from fcntl with F_SETLKW: ', repr(rv))
101        self.f.close()
102
103    def test_fcntl_bad_file(self):
104        with self.assertRaises(ValueError):
105            fcntl.fcntl(-1, fcntl.F_SETFL, os.O_NONBLOCK)
106        with self.assertRaises(ValueError):
107            fcntl.fcntl(BadFile(-1), fcntl.F_SETFL, os.O_NONBLOCK)
108        with self.assertRaises(TypeError):
109            fcntl.fcntl('spam', fcntl.F_SETFL, os.O_NONBLOCK)
110        with self.assertRaises(TypeError):
111            fcntl.fcntl(BadFile('spam'), fcntl.F_SETFL, os.O_NONBLOCK)
112
113    @cpython_only
114    def test_fcntl_bad_file_overflow(self):
115        from _testcapi import INT_MAX, INT_MIN
116        # Issue 15989
117        with self.assertRaises(OverflowError):
118            fcntl.fcntl(INT_MAX + 1, fcntl.F_SETFL, os.O_NONBLOCK)
119        with self.assertRaises(OverflowError):
120            fcntl.fcntl(BadFile(INT_MAX + 1), fcntl.F_SETFL, os.O_NONBLOCK)
121        with self.assertRaises(OverflowError):
122            fcntl.fcntl(INT_MIN - 1, fcntl.F_SETFL, os.O_NONBLOCK)
123        with self.assertRaises(OverflowError):
124            fcntl.fcntl(BadFile(INT_MIN - 1), fcntl.F_SETFL, os.O_NONBLOCK)
125
126    @unittest.skipIf(
127        platform.machine().startswith('arm') and platform.system() == 'Linux',
128        "ARM Linux returns EINVAL for F_NOTIFY DN_MULTISHOT")
129    def test_fcntl_64_bit(self):
130        # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
131        # C 'long' but not in a C 'int'.
132        try:
133            cmd = fcntl.F_NOTIFY
134            # This flag is larger than 2**31 in 64-bit builds
135            flags = fcntl.DN_MULTISHOT
136        except AttributeError:
137            self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
138        fd = os.open(os.path.dirname(os.path.abspath(TESTFN)), os.O_RDONLY)
139        try:
140            fcntl.fcntl(fd, cmd, flags)
141        finally:
142            os.close(fd)
143
144    def test_flock(self):
145        # Solaris needs readable file for shared lock
146        self.f = open(TESTFN, 'wb+')
147        fileno = self.f.fileno()
148        fcntl.flock(fileno, fcntl.LOCK_SH)
149        fcntl.flock(fileno, fcntl.LOCK_UN)
150        fcntl.flock(self.f, fcntl.LOCK_SH | fcntl.LOCK_NB)
151        fcntl.flock(self.f, fcntl.LOCK_UN)
152        fcntl.flock(fileno, fcntl.LOCK_EX)
153        fcntl.flock(fileno, fcntl.LOCK_UN)
154
155        self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
156        self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
157
158    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
159    def test_lockf_exclusive(self):
160        self.f = open(TESTFN, 'wb+')
161        cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
162        fcntl.lockf(self.f, cmd)
163        p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
164        p.start()
165        p.join()
166        fcntl.lockf(self.f, fcntl.LOCK_UN)
167        self.assertEqual(p.exitcode, 0)
168
169    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
170    def test_lockf_share(self):
171        self.f = open(TESTFN, 'wb+')
172        cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
173        fcntl.lockf(self.f, cmd)
174        p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
175        p.start()
176        p.join()
177        fcntl.lockf(self.f, fcntl.LOCK_UN)
178        self.assertEqual(p.exitcode, 0)
179
180    @cpython_only
181    def test_flock_overflow(self):
182        import _testcapi
183        self.assertRaises(OverflowError, fcntl.flock, _testcapi.INT_MAX+1,
184                          fcntl.LOCK_SH)
185
186    @unittest.skipIf(sys.platform != 'darwin', "F_GETPATH is only available on macos")
187    def test_fcntl_f_getpath(self):
188        self.f = open(TESTFN, 'wb')
189        expected = os.path.abspath(TESTFN).encode('utf-8')
190        res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected)))
191        self.assertEqual(expected, res)
192
193    @unittest.skipUnless(
194        hasattr(fcntl, "F_SETPIPE_SZ") and hasattr(fcntl, "F_GETPIPE_SZ"),
195        "F_SETPIPE_SZ and F_GETPIPE_SZ are not available on all platforms.")
196    def test_fcntl_f_pipesize(self):
197        test_pipe_r, test_pipe_w = os.pipe()
198        try:
199            # Get the default pipesize with F_GETPIPE_SZ
200            pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
201            pipesize = pipesize_default // 2  # A new value to detect change.
202            if pipesize < 512:  # the POSIX minimum
203                raise unittest.SkitTest(
204                    'default pipesize too small to perform test.')
205            fcntl.fcntl(test_pipe_w, fcntl.F_SETPIPE_SZ, pipesize)
206            self.assertEqual(fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ),
207                             pipesize)
208        finally:
209            os.close(test_pipe_r)
210            os.close(test_pipe_w)
211
212
213if __name__ == '__main__':
214    unittest.main()
215