'''
Tests for fileinput module.
Nick Mathewson
'''

import unittest
from test.test_support import verbose, TESTFN, run_unittest
from test.test_support import unlink as safe_unlink, check_warnings
import sys, re
from StringIO import StringIO
from fileinput import FileInput, hook_encoded

# The fileinput module has 2 interfaces: the FileInput class which does
# all the work, and a few functions (input, etc.) that use a global _state
# variable.  We only test the FileInput class, since the other functions
# only provide a thin facade over FileInput.

# Write lines (a list of lines) to temp file number i, and return the
# temp file's name.
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    name = TESTFN + str(i)
    # The with-block closes the file even if writelines() raises.
    with open(name, mode) as f:
        f.writelines(lines)
    return name

# Unlink every named temp file.  Empty/None names are skipped so that
# callers can pre-bind their temp-file variables to None and still run
# this from a ``finally`` clause when a file was never created.
def remove_tempfiles(*names):
    for name in names:
        if name:
            safe_unlink(name)

class LineReader(object):
    """Fake file object that logs every line handed out by readline().

    ``openhook`` treats the "filename" it receives as the file's content,
    so tests can feed FileInput from plain strings.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        # Reading the property returns a copy of the log and resets it, so
        # each access only reports what was read since the previous access.
        try:
            return self._linesread[:]
        finally:
            self._linesread = []

    def openhook(self, filename, mode):
        # keepends=True: lines keep their trailing newline.
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        # An exhausted iterator yields '' (end of file); that is logged too.
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            # With the default hint of -1 this is true after the first
            # line, so a single line is returned per call.
            if size >= hint:
                return lines

    def close(self):
        pass

class BufferSizesTests(unittest.TestCase):
    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        for round_no, bs in (0, 0), (1, 30):
            # Pre-bind the names so the finally clause cannot hit an
            # unbound local if one of the writeTmp() calls fails.
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    # Runs the six numbered scenarios against the four temp files using
    # buffer size `bs`; `round` only offsets the numbers printed in verbose
    # mode.  (The parameter name shadows the builtin but is kept so that
    # keyword callers keep working.)
    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('%s. Nextfile (bs=%s)' % (start+2, bs))
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            sys.stdin = savestdin
        fi.close()

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.nextfile()
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.close()

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
            for line in fi:
                # Rewrite each line upper-cased; print re-adds the newline.
                line = line[:-1].upper()
                print(line)
            fi.close()
        finally:
            sys.stdout = savestdout

        # Re-read the files and check that the in-place rewrite took effect.
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()

class FileInputTests(unittest.TestCase):
    def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            fi = FileInput(files=(t1, t2))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            fi.close()
        finally:
            remove_tempfiles(t1, t2)

    def test_unicode_filenames(self):
        t1 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            encoding = sys.getfilesystemencoding()
            if encoding is None:
                encoding = 'ascii'
            fi = FileInput(files=unicode(t1, encoding))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B"])
            fi.close()
        finally:
            remove_tempfiles(t1)

    def test_fileno(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            fi = FileInput(files=(t1, t2))
            self.assertEqual(fi.fileno(), -1)
            next(fi)
            self.assertNotEqual(fi.fileno(), -1)
            fi.nextfile()
            self.assertEqual(fi.fileno(), -1)
            list(fi)
            self.assertEqual(fi.fileno(), -1)
            fi.close()
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        try:
            # invalid mode, should raise ValueError
            FileInput(mode="w")
            self.fail("FileInput should reject invalid mode argument")
        except ValueError:
            pass
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, ["A\nB\r\nC\rD"], mode="wb")
            fi = FileInput(files=t1, mode="U")
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
            fi.close()
        finally:
            remove_tempfiles(t1)

    def test_file_opening_hook(self):
        try:
            # cannot use openhook and inplace mode
            FileInput(inplace=1, openhook=lambda f, m: None)
            self.fail("FileInput should raise if both inplace "
                      "and openhook arguments are given")
        except ValueError:
            pass
        try:
            FileInput(openhook=1)
            self.fail("FileInput should check openhook for being callable")
        except ValueError:
            pass
        t1 = None
        try:
            # UTF-7 is a convenient, seldom used encoding
            t1 = writeTmp(1, ['+AEE-\n+AEI-'], mode="wb")
            fi = FileInput(files=t1, openhook=hook_encoded("utf-7"))
            lines = list(fi)
            self.assertEqual(lines, [u'A\n', u'B'])
            fi.close()
        finally:
            remove_tempfiles(t1)

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        self.assertEqual(fi.readline(), u'A\n')
        self.assertEqual(fi.readline(), u'B\r\n')
        self.assertEqual(fi.readline(), u'C\r')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()

    def test_readline_buffering(self):
        # Each `src.linesread` access reports (and clears) the lines pulled
        # from the underlying reader since the previous access.
        src = LineReader()
        fi = FileInput(files=['line1\nline2', 'line3\n'], openhook=src.openhook)
        self.assertEqual(src.linesread, [])
        self.assertEqual(fi.readline(), 'line1\n')
        self.assertEqual(src.linesread, ['line1\n'])
        self.assertEqual(fi.readline(), 'line2')
        self.assertEqual(src.linesread, ['line2'])
        self.assertEqual(fi.readline(), 'line3\n')
        self.assertEqual(src.linesread, ['', 'line3\n'])
        self.assertEqual(fi.readline(), '')
        self.assertEqual(src.linesread, [''])
        self.assertEqual(fi.readline(), '')
        self.assertEqual(src.linesread, [])
        fi.close()

    def test_iteration_buffering(self):
        # Same expectations as test_readline_buffering, driven through the
        # iterator protocol instead of readline().
        src = LineReader()
        fi = FileInput(files=['line1\nline2', 'line3\n'], openhook=src.openhook)
        self.assertEqual(src.linesread, [])
        self.assertEqual(next(fi), 'line1\n')
        self.assertEqual(src.linesread, ['line1\n'])
        self.assertEqual(next(fi), 'line2')
        self.assertEqual(src.linesread, ['line2'])
        self.assertEqual(next(fi), 'line3\n')
        self.assertEqual(src.linesread, ['', 'line3\n'])
        self.assertRaises(StopIteration, next, fi)
        self.assertEqual(src.linesread, [''])
        self.assertRaises(StopIteration, next, fi)
        self.assertEqual(src.linesread, [])
        fi.close()

class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, expected_lines)

        # The same decoded lines are expected for every mode.
        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])

def test_main():
    run_unittest(BufferSizesTests, FileInputTests, Test_hook_encoded)

if __name__ == "__main__":
    test_main()