• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import array
2import unittest
3from test.support import get_attribute
4from test.support.import_helper import import_module
5import os, struct
6fcntl = import_module('fcntl')
7termios = import_module('termios')
8get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature
9
# Every test in this module talks to the controlling terminal, so skip the
# whole module when /dev/tty cannot be opened or is not attached to us.
try:
    term = open("/dev/tty", "rb")
except OSError:
    raise unittest.SkipTest("Unable to open /dev/tty")

with term:
    # The reply is a packed C int holding the id TIOCGPGRP reports for
    # the terminal.
    raw = fcntl.ioctl(term, termios.TIOCGPGRP, "    ")
(owner_id,) = struct.unpack("i", raw)

# Skip if another process is in foreground
attached_ids = (os.getpgrp(), os.getsid(0))
if owner_id not in attached_ids:
    raise unittest.SkipTest("Neither the process group nor the session "
                            "are attached to /dev/tty")

# Do not leave the probe's temporaries in the module namespace.
del term, raw, owner_id, attached_ids
23
# pty is optional: start from the "unavailable" marker and let a successful
# import replace it.  Tests that need a pseudo-terminal check for None.
pty = None
try:
    import pty
except ImportError:
    pass
28
class IoctlTests(unittest.TestCase):
    """Tests for fcntl.ioctl() using /dev/tty and, when available, a pty."""

    def test_ioctl(self):
        """TIOCGPGRP with an immutable str buffer returns the packed id."""
        # If this process has been put into the background, TIOCGPGRP returns
        # the session ID instead of the process group id.
        ids = (os.getpgrp(), os.getsid(0))
        # Size the scratch buffer from the format unpacked below instead of
        # hard-coding four bytes.
        scratch = " " * struct.calcsize("i")
        with open("/dev/tty", "rb") as tty:
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, scratch)
        rpgrp = struct.unpack("i", r)[0]
        self.assertIn(rpgrp, ids)

    def _check_ioctl_mutate_len(self, nbytes=None):
        """Call TIOCGPGRP with a mutable int array and check it was updated.

        nbytes is the total size of the buffer in bytes and must be a
        multiple of the array's item size; when it is None a buffer holding
        a single int is used.  The ioctl must return 0 and store one of the
        expected ids in the first element of the buffer.
        """
        buf = array.array('i')
        intsize = buf.itemsize
        ids = (os.getpgrp(), os.getsid(0))
        # A fill value unlikely to be in `ids`
        fill = -12345
        if nbytes is not None:
            # Extend the buffer so that it is exactly `nbytes` bytes long
            buf.extend([fill] * (nbytes // intsize))
            self.assertEqual(len(buf) * intsize, nbytes)   # sanity check
        else:
            buf.append(fill)
        with open("/dev/tty", "rb") as tty:
            # The trailing True asks ioctl() to write the result back into
            # `buf` instead of returning a copy.
            r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, True)
        rpgrp = buf[0]
        self.assertEqual(r, 0)
        self.assertIn(rpgrp, ids)

    def test_ioctl_mutate(self):
        """Mutable-buffer ioctl with the default single-int buffer."""
        self._check_ioctl_mutate_len()

    def test_ioctl_mutate_1024(self):
        """Mutable-buffer ioctl with a buffer of exactly 1024 bytes."""
        # Issue #9758: a mutable buffer of exactly 1024 bytes wouldn't be
        # copied back after the system call.
        self._check_ioctl_mutate_len(1024)

    def test_ioctl_mutate_2048(self):
        """Mutable-buffer ioctl with a 2048-byte buffer."""
        # Test with a larger buffer, just for the record.
        self._check_ioctl_mutate_len(2048)

    @unittest.skipIf(pty is None, 'pty module required')
    def test_ioctl_signed_unsigned_code_param(self):
        """ioctl() accepts the request code in signed and unsigned form.

        The test passes when neither call raises.
        """
        mfd, sfd = pty.openpty()
        # Cleanups run in reverse order, so mfd is closed first, and sfd is
        # still closed even if closing mfd fails.
        self.addCleanup(os.close, sfd)
        self.addCleanup(os.close, mfd)

        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Reinterpret the unsigned 32-bit code as a signed int; the
            # result is negative when the top bit is set.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))

        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
89
90
# Run the tests in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
93