• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test program for the fcntl C module.
2"""
3import multiprocessing
4import platform
5import os
6import struct
7import sys
8import unittest
9from test.support import (
10    cpython_only, get_pagesize, is_apple, requires_subprocess, verbose
11)
12from test.support.import_helper import import_module
13from test.support.os_helper import TESTFN, unlink
14
15
16# Skip test if no fcntl module.
17fcntl = import_module('fcntl')
18
19
20
class BadFile:
    """Minimal object exposing ``fileno()`` that returns a caller-chosen value.

    The value handed to the constructor is stored as-is and returned by
    ``fileno()`` without any validation, so it need not be a valid (or even
    an integer) file descriptor.
    """

    def __init__(self, fn):
        # Kept verbatim; no type or range checking is done here.
        self.fn = fn

    def fileno(self):
        """Return the value that was passed to the constructor."""
        return self.fn
26
def try_lockf_on_other_process_fail(fname, cmd):
    """Attempt ``fcntl.lockf`` on *fname*, ignoring ``BlockingIOError``.

    The file is opened in ``'wb+'`` mode and ``fcntl.lockf`` is called on it
    with *cmd*.  A ``BlockingIOError`` is swallowed; any other exception
    propagates to the caller.  The file is closed in every case.
    """
    with open(fname, 'wb+') as stream:
        try:
            fcntl.lockf(stream, cmd)
        except BlockingIOError:
            # Deliberately ignored: a blocked lock attempt is an acceptable
            # outcome for this helper.
            pass
35
def try_lockf_on_other_process(fname, cmd):
    """Lock *fname* with *cmd*, then unlock it; every failure propagates.

    The file is opened in ``'wb+'`` mode, ``fcntl.lockf`` is applied with
    *cmd*, and the lock is released again with ``fcntl.LOCK_UN``.  Unlike
    ``try_lockf_on_other_process_fail``, no exception is swallowed here.
    """
    # The context manager closes the file even when one of the lockf() calls
    # raises, matching the cleanup guarantee of the sibling helper instead of
    # leaving the open file to the garbage collector.
    with open(fname, 'wb+') as f:
        fcntl.lockf(f, cmd)
        fcntl.lockf(f, fcntl.LOCK_UN)
41
class TestFcntl(unittest.TestCase):
    # Tests for fcntl.fcntl(), fcntl.flock() and fcntl.lockf().
    #
    # Tests that open TESTFN keep the file object in ``self.f`` so that
    # tearDown() can close it before the file is unlinked.

    def setUp(self):
        self.f = None

    def tearDown(self):
        opened = self.f
        if opened and not opened.closed:
            opened.close()
        unlink(TESTFN)

    @staticmethod
    def get_lockdata():
        """Return the packed bytes passed to fcntl() with F_SETLKW.

        The struct format depends on ``sys.platform``, ``is_apple``, the size
        of a C ``long`` and whether ``os.O_LARGEFILE`` exists.  Every variant
        packs ``fcntl.F_WRLCK`` together with zeros (presumably mirroring the
        platform's C ``struct flock`` -- confirm against the platform headers).
        """
        # Only consumed by the generic (final) branch below.
        start_len = "qq" if hasattr(os, 'O_LARGEFILE') else "ll"

        if (
            sys.platform.startswith(('netbsd', 'freebsd', 'openbsd'))
            or is_apple
        ):
            if struct.calcsize('l') == 8:
                off_t, pid_t = 'l', 'i'
            else:
                off_t, pid_t = 'lxxxx', 'l'
            layout = off_t + off_t + pid_t + 'hh'
            values = (0, 0, 0, fcntl.F_WRLCK, 0)
        elif sys.platform.startswith('gnukfreebsd'):
            layout = 'qqihhi'
            values = (0, 0, 0, fcntl.F_WRLCK, 0, 0)
        elif sys.platform in ['hp-uxB', 'unixware7']:
            layout = 'hhlllii'
            values = (fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
        else:
            layout = 'hh' + start_len + 'hh'
            values = (fcntl.F_WRLCK, 0, 0, 0, 0, 0)

        lockdata = struct.pack(layout, *values)
        if lockdata and verbose:
            print('struct.pack: ', repr(lockdata))
        return lockdata

    def test_fcntl_fileno(self):
        # the example from the library docs
        self.f = open(TESTFN, 'wb')
        status = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
        if verbose:
            print('Status from fcntl with O_NONBLOCK: ', status)
        packed = self.get_lockdata()
        reply = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, packed)
        if verbose:
            print('String from fcntl with F_SETLKW: ', repr(reply))
        self.f.close()

    def test_fcntl_file_descriptor(self):
        # again, but pass the file rather than numeric descriptor
        self.f = open(TESTFN, 'wb')
        status = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
        if verbose:
            print('Status from fcntl with O_NONBLOCK: ', status)
        packed = self.get_lockdata()
        reply = fcntl.fcntl(self.f, fcntl.F_SETLKW, packed)
        if verbose:
            print('String from fcntl with F_SETLKW: ', repr(reply))
        self.f.close()

    def test_fcntl_bad_file(self):
        # Invalid descriptors, given directly or through fileno().
        op, arg = fcntl.F_SETFL, os.O_NONBLOCK
        with self.assertRaises(ValueError):
            fcntl.fcntl(-1, op, arg)
        with self.assertRaises(ValueError):
            fcntl.fcntl(BadFile(-1), op, arg)
        with self.assertRaises(TypeError):
            fcntl.fcntl('spam', op, arg)
        with self.assertRaises(TypeError):
            fcntl.fcntl(BadFile('spam'), op, arg)

    @cpython_only
    def test_fcntl_bad_file_overflow(self):
        capi = import_module("_testcapi")
        above_int_max = capi.INT_MAX + 1
        below_int_min = capi.INT_MIN - 1
        op, arg = fcntl.F_SETFL, os.O_NONBLOCK
        # Issue 15989
        with self.assertRaises(OverflowError):
            fcntl.fcntl(above_int_max, op, arg)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(BadFile(above_int_max), op, arg)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(below_int_min, op, arg)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(BadFile(below_int_min), op, arg)

    @unittest.skipIf(
        (platform.machine().startswith("arm") and platform.system() == "Linux")
        or platform.system() == "Android",
        "this platform returns EINVAL for F_NOTIFY DN_MULTISHOT")
    def test_fcntl_64_bit(self):
        # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
        # C 'long' but not in a C 'int'.
        if not (hasattr(fcntl, 'F_NOTIFY') and hasattr(fcntl, 'DN_MULTISHOT')):
            self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
        cmd = fcntl.F_NOTIFY
        # This flag is larger than 2**31 in 64-bit builds
        flags = fcntl.DN_MULTISHOT
        containing_dir = os.path.dirname(os.path.abspath(TESTFN))
        dir_fd = os.open(containing_dir, os.O_RDONLY)
        try:
            fcntl.fcntl(dir_fd, cmd, flags)
        finally:
            os.close(dir_fd)

    def test_flock(self):
        # Solaris needs readable file for shared lock
        self.f = open(TESTFN, 'wb+')
        fd = self.f.fileno()
        # Each entry is locked with the given mode and then unlocked again,
        # alternating between the raw descriptor and the file object.
        lock_cases = (
            (fd, fcntl.LOCK_SH),
            (self.f, fcntl.LOCK_SH | fcntl.LOCK_NB),
            (fd, fcntl.LOCK_EX),
        )
        for target, mode in lock_cases:
            fcntl.flock(target, mode)
            fcntl.flock(target, fcntl.LOCK_UN)

        self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
        self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)

    def _lockf_with_child(self, cmd, child_target):
        """Hold a lockf() lock on TESTFN while a spawned child runs.

        The child process runs ``child_target(TESTFN, cmd)``.  After it has
        been joined the lock is released and the child's exit code is
        asserted to be 0.
        """
        self.f = open(TESTFN, 'wb+')
        fcntl.lockf(self.f, cmd)
        spawn_ctx = multiprocessing.get_context('spawn')
        child = spawn_ctx.Process(target=child_target, args=(TESTFN, cmd))
        child.start()
        child.join()
        fcntl.lockf(self.f, fcntl.LOCK_UN)
        self.assertEqual(child.exitcode, 0)

    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    @requires_subprocess()
    def test_lockf_exclusive(self):
        self._lockf_with_child(
            fcntl.LOCK_EX | fcntl.LOCK_NB,
            try_lockf_on_other_process_fail,
        )

    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    @requires_subprocess()
    def test_lockf_share(self):
        self._lockf_with_child(
            fcntl.LOCK_SH | fcntl.LOCK_NB,
            try_lockf_on_other_process,
        )

    @cpython_only
    def test_flock_overflow(self):
        capi = import_module("_testcapi")
        above_int_max = capi.INT_MAX + 1
        self.assertRaises(OverflowError, fcntl.flock, above_int_max,
                          fcntl.LOCK_SH)

    @unittest.skipUnless(sys.platform == 'darwin', "F_GETPATH is only available on macos")
    def test_fcntl_f_getpath(self):
        self.f = open(TESTFN, 'wb')
        expected = os.path.abspath(TESTFN).encode('utf-8')
        # Zero-filled buffer of the same length as the expected path.
        path_buffer = bytes(len(expected))
        res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, path_buffer)
        self.assertEqual(expected, res)

    @unittest.skipUnless(
        hasattr(fcntl, "F_SETPIPE_SZ") and hasattr(fcntl, "F_GETPIPE_SZ"),
        "F_SETPIPE_SZ and F_GETPIPE_SZ are not available on all platforms.")
    def test_fcntl_f_pipesize(self):
        read_end, write_end = os.pipe()
        try:
            # Get the default pipesize with F_GETPIPE_SZ
            default_size = fcntl.fcntl(write_end, fcntl.F_GETPIPE_SZ)
            new_size = default_size // 2  # A new value to detect change.
            page_size = get_pagesize()
            if new_size < page_size:  # the POSIX minimum
                raise unittest.SkipTest(
                    'default pipesize too small to perform test.')
            fcntl.fcntl(write_end, fcntl.F_SETPIPE_SZ, new_size)
            reported_size = fcntl.fcntl(write_end, fcntl.F_GETPIPE_SZ)
            self.assertEqual(reported_size, new_size)
        finally:
            os.close(read_end)
            os.close(write_end)
224
225
# Run the tests via unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
228