1# 2# test_multibytecodec.py 3# Unit test for multibytecodec itself 4# 5 6import _multibytecodec 7import codecs 8import io 9import sys 10import textwrap 11import unittest 12from test import support 13from test.support import os_helper 14from test.support.os_helper import TESTFN 15 16ALL_CJKENCODINGS = [ 17# _codecs_cn 18 'gb2312', 'gbk', 'gb18030', 'hz', 19# _codecs_hk 20 'big5hkscs', 21# _codecs_jp 22 'cp932', 'shift_jis', 'euc_jp', 'euc_jisx0213', 'shift_jisx0213', 23 'euc_jis_2004', 'shift_jis_2004', 24# _codecs_kr 25 'cp949', 'euc_kr', 'johab', 26# _codecs_tw 27 'big5', 'cp950', 28# _codecs_iso2022 29 'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004', 30 'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr', 31] 32 33class Test_MultibyteCodec(unittest.TestCase): 34 35 def test_nullcoding(self): 36 for enc in ALL_CJKENCODINGS: 37 self.assertEqual(b''.decode(enc), '') 38 self.assertEqual(str(b'', enc), '') 39 self.assertEqual(''.encode(enc), b'') 40 41 def test_str_decode(self): 42 for enc in ALL_CJKENCODINGS: 43 self.assertEqual('abcd'.encode(enc), b'abcd') 44 45 def test_errorcallback_longindex(self): 46 dec = codecs.getdecoder('euc-kr') 47 myreplace = lambda exc: ('', sys.maxsize+1) 48 codecs.register_error('test.cjktest', myreplace) 49 self.assertRaises(IndexError, dec, 50 b'apple\x92ham\x93spam', 'test.cjktest') 51 52 def test_errorcallback_custom_ignore(self): 53 # Issue #23215: MemoryError with custom error handlers and multibyte codecs 54 data = 100 * "\udc00" 55 codecs.register_error("test.ignore", codecs.ignore_errors) 56 for enc in ALL_CJKENCODINGS: 57 self.assertEqual(data.encode(enc, "test.ignore"), b'') 58 59 def test_codingspec(self): 60 try: 61 for enc in ALL_CJKENCODINGS: 62 code = '# coding: {}\n'.format(enc) 63 exec(code) 64 finally: 65 os_helper.unlink(TESTFN) 66 67 def test_init_segfault(self): 68 # bug #3305: this used to segfault 69 self.assertRaises(AttributeError, 70 _multibytecodec.MultibyteStreamReader, None) 71 self.assertRaises(AttributeError, 72 _multibytecodec.MultibyteStreamWriter, None) 73 74 def test_decode_unicode(self): 75 # Trying to decode a unicode string should raise a TypeError 76 for enc in ALL_CJKENCODINGS: 77 self.assertRaises(TypeError, codecs.getdecoder(enc), "") 78 79class Test_IncrementalEncoder(unittest.TestCase): 80 81 def test_stateless(self): 82 # cp949 encoder isn't stateful at all. 83 encoder = codecs.getincrementalencoder('cp949')() 84 self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'), 85 b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb') 86 self.assertEqual(encoder.reset(), None) 87 self.assertEqual(encoder.encode('\u2606\u223c\u2606', True), 88 b'\xa1\xd9\xa1\xad\xa1\xd9') 89 self.assertEqual(encoder.reset(), None) 90 self.assertEqual(encoder.encode('', True), b'') 91 self.assertEqual(encoder.encode('', False), b'') 92 self.assertEqual(encoder.reset(), None) 93 94 def test_stateful(self): 95 # jisx0213 encoder is stateful for a few code points. eg) 96 # U+00E6 => A9DC 97 # U+00E6 U+0300 => ABC4 98 # U+0300 => ABDC 99 100 encoder = codecs.getincrementalencoder('jisx0213')() 101 self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4') 102 self.assertEqual(encoder.encode('\u00e6'), b'') 103 self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4') 104 self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc') 105 106 self.assertEqual(encoder.reset(), None) 107 self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc') 108 109 self.assertEqual(encoder.encode('\u00e6'), b'') 110 self.assertEqual(encoder.encode('', True), b'\xa9\xdc') 111 self.assertEqual(encoder.encode('', True), b'') 112 113 def test_stateful_keep_buffer(self): 114 encoder = codecs.getincrementalencoder('jisx0213')() 115 self.assertEqual(encoder.encode('\u00e6'), b'') 116 self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123') 117 self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4') 118 self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123') 119 self.assertEqual(encoder.reset(), None) 120 self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc') 121 self.assertEqual(encoder.encode('\u00e6'), b'') 122 self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123') 123 self.assertEqual(encoder.encode('', True), b'\xa9\xdc') 124 125 def test_state_methods_with_buffer_state(self): 126 # euc_jis_2004 stores state as a buffer of pending bytes 127 encoder = codecs.getincrementalencoder('euc_jis_2004')() 128 129 initial_state = encoder.getstate() 130 self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4') 131 encoder.setstate(initial_state) 132 self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4') 133 134 self.assertEqual(encoder.encode('\u00e6'), b'') 135 partial_state = encoder.getstate() 136 self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4') 137 encoder.setstate(partial_state) 138 self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4') 139 140 def test_state_methods_with_non_buffer_state(self): 141 # iso2022_jp stores state without using a buffer 142 encoder = codecs.getincrementalencoder('iso2022_jp')() 143 144 self.assertEqual(encoder.encode('z'), b'z') 145 en_state = encoder.getstate() 146 147 self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22') 148 jp_state = encoder.getstate() 149 self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z') 150 151 encoder.setstate(jp_state) 152 self.assertEqual(encoder.encode('\u3042'), b'\x24\x22') 153 154 encoder.setstate(en_state) 155 self.assertEqual(encoder.encode('z'), b'z') 156 157 def test_getstate_returns_expected_value(self): 158 # Note: getstate is implemented such that these state values 159 # are expected to be the same across all builds of Python, 160 # regardless of x32/64 bit, endianness and compiler. 161 162 # euc_jis_2004 stores state as a buffer of pending bytes 163 buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')() 164 self.assertEqual(buffer_state_encoder.getstate(), 0) 165 buffer_state_encoder.encode('\u00e6') 166 self.assertEqual(buffer_state_encoder.getstate(), 167 int.from_bytes( 168 b"\x02" 169 b"\xc3\xa6" 170 b"\x00\x00\x00\x00\x00\x00\x00\x00", 171 'little')) 172 buffer_state_encoder.encode('\u0300') 173 self.assertEqual(buffer_state_encoder.getstate(), 0) 174 175 # iso2022_jp stores state without using a buffer 176 non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')() 177 self.assertEqual(non_buffer_state_encoder.getstate(), 178 int.from_bytes( 179 b"\x00" 180 b"\x42\x42\x00\x00\x00\x00\x00\x00", 181 'little')) 182 non_buffer_state_encoder.encode('\u3042') 183 self.assertEqual(non_buffer_state_encoder.getstate(), 184 int.from_bytes( 185 b"\x00" 186 b"\xc2\x42\x00\x00\x00\x00\x00\x00", 187 'little')) 188 189 def test_setstate_validates_input_size(self): 190 encoder = codecs.getincrementalencoder('euc_jp')() 191 pending_size_nine = int.from_bytes( 192 b"\x09" 193 b"\x00\x00\x00\x00\x00\x00\x00\x00" 194 b"\x00\x00\x00\x00\x00\x00\x00\x00", 195 'little') 196 self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine) 197 198 def test_setstate_validates_input_bytes(self): 199 encoder = codecs.getincrementalencoder('euc_jp')() 200 invalid_utf8 = int.from_bytes( 201 b"\x01" 202 b"\xff" 203 b"\x00\x00\x00\x00\x00\x00\x00\x00", 204 'little') 205 self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8) 206 207 def test_issue5640(self): 208 encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace') 209 self.assertEqual(encoder.encode('\xff'), b'\\xff') 210 self.assertEqual(encoder.encode('\n'), b'\n') 211 212 @support.cpython_only 213 def test_subinterp(self): 214 # bpo-42846: Test a CJK codec in a subinterpreter 215 import _testcapi 216 encoding = 'cp932' 217 text = "Python の開発は、1990 年ごろから開始されています。" 218 code = textwrap.dedent(""" 219 import codecs 220 encoding = %r 221 text = %r 222 encoder = codecs.getincrementalencoder(encoding)() 223 text2 = encoder.encode(text).decode(encoding) 224 if text2 != text: 225 raise ValueError(f"encoding issue: {text2!a} != {text!a}") 226 """) % (encoding, text) 227 res = _testcapi.run_in_subinterp(code) 228 self.assertEqual(res, 0) 229 230class Test_IncrementalDecoder(unittest.TestCase): 231 232 def test_dbcs(self): 233 # cp949 decoder is simple with only 1 or 2 bytes sequences. 234 decoder = codecs.getincrementaldecoder('cp949')() 235 self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'), 236 '\ud30c\uc774') 237 self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'), 238 '\uc36c \ub9c8\uc744') 239 self.assertEqual(decoder.decode(b''), '') 240 241 def test_dbcs_keep_buffer(self): 242 decoder = codecs.getincrementaldecoder('cp949')() 243 self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c') 244 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True) 245 self.assertEqual(decoder.decode(b'\xcc'), '\uc774') 246 247 self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c') 248 self.assertRaises(UnicodeDecodeError, decoder.decode, 249 b'\xcc\xbd', True) 250 self.assertEqual(decoder.decode(b'\xcc'), '\uc774') 251 252 def test_iso2022(self): 253 decoder = codecs.getincrementaldecoder('iso2022-jp')() 254 ESC = b'\x1b' 255 self.assertEqual(decoder.decode(ESC + b'('), '') 256 self.assertEqual(decoder.decode(b'B', True), '') 257 self.assertEqual(decoder.decode(ESC + b'$'), '') 258 self.assertEqual(decoder.decode(b'B@$'), '\u4e16') 259 self.assertEqual(decoder.decode(b'@$@'), '\u4e16') 260 self.assertEqual(decoder.decode(b'$', True), '\u4e16') 261 self.assertEqual(decoder.reset(), None) 262 self.assertEqual(decoder.decode(b'@$'), '@$') 263 self.assertEqual(decoder.decode(ESC + b'$'), '') 264 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True) 265 self.assertEqual(decoder.decode(b'B@$'), '\u4e16') 266 267 def test_decode_unicode(self): 268 # Trying to decode a unicode string should raise a TypeError 269 for enc in ALL_CJKENCODINGS: 270 decoder = codecs.getincrementaldecoder(enc)() 271 self.assertRaises(TypeError, decoder.decode, "") 272 273 def test_state_methods(self): 274 decoder = codecs.getincrementaldecoder('euc_jp')() 275 276 # Decode a complete input sequence 277 self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046') 278 pending1, _ = decoder.getstate() 279 self.assertEqual(pending1, b'') 280 281 # Decode first half of a partial input sequence 282 self.assertEqual(decoder.decode(b'\xa4'), '') 283 pending2, flags2 = decoder.getstate() 284 self.assertEqual(pending2, b'\xa4') 285 286 # Decode second half of a partial input sequence 287 self.assertEqual(decoder.decode(b'\xa6'), '\u3046') 288 pending3, _ = decoder.getstate() 289 self.assertEqual(pending3, b'') 290 291 # Jump back and decode second half of partial input sequence again 292 decoder.setstate((pending2, flags2)) 293 self.assertEqual(decoder.decode(b'\xa6'), '\u3046') 294 pending4, _ = decoder.getstate() 295 self.assertEqual(pending4, b'') 296 297 # Ensure state values are preserved correctly 298 decoder.setstate((b'abc', 123456789)) 299 self.assertEqual(decoder.getstate(), (b'abc', 123456789)) 300 301 def test_setstate_validates_input(self): 302 decoder = codecs.getincrementaldecoder('euc_jp')() 303 self.assertRaises(TypeError, decoder.setstate, 123) 304 self.assertRaises(TypeError, decoder.setstate, ("invalid", 0)) 305 self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid")) 306 self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0)) 307 308class Test_StreamReader(unittest.TestCase): 309 def test_bug1728403(self): 310 try: 311 f = open(TESTFN, 'wb') 312 try: 313 f.write(b'\xa1') 314 finally: 315 f.close() 316 f = codecs.open(TESTFN, encoding='cp949') 317 try: 318 self.assertRaises(UnicodeDecodeError, f.read, 2) 319 finally: 320 f.close() 321 finally: 322 os_helper.unlink(TESTFN) 323 324class Test_StreamWriter(unittest.TestCase): 325 def test_gb18030(self): 326 s= io.BytesIO() 327 c = codecs.getwriter('gb18030')(s) 328 c.write('123') 329 self.assertEqual(s.getvalue(), b'123') 330 c.write('\U00012345') 331 self.assertEqual(s.getvalue(), b'123\x907\x959') 332 c.write('\uac00\u00ac') 333 self.assertEqual(s.getvalue(), 334 b'123\x907\x959\x827\xcf5\x810\x851') 335 336 def test_utf_8(self): 337 s= io.BytesIO() 338 c = codecs.getwriter('utf-8')(s) 339 c.write('123') 340 self.assertEqual(s.getvalue(), b'123') 341 c.write('\U00012345') 342 self.assertEqual(s.getvalue(), b'123\xf0\x92\x8d\x85') 343 c.write('\uac00\u00ac') 344 self.assertEqual(s.getvalue(), 345 b'123\xf0\x92\x8d\x85' 346 b'\xea\xb0\x80\xc2\xac') 347 348 def test_streamwriter_strwrite(self): 349 s = io.BytesIO() 350 wr = codecs.getwriter('gb18030')(s) 351 wr.write('abcd') 352 self.assertEqual(s.getvalue(), b'abcd') 353 354class Test_ISO2022(unittest.TestCase): 355 def test_g2(self): 356 iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille' 357 uni = ':hu4:unit\xe9 de famille' 358 self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni) 359 360 def test_iso2022_jp_g0(self): 361 self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2')) 362 for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'): 363 e = '\u3406'.encode(encoding) 364 self.assertFalse(any(x > 0x80 for x in e)) 365 366 def test_bug1572832(self): 367 for x in range(0x10000, 0x110000): 368 # Any ISO 2022 codec will cause the segfault 369 chr(x).encode('iso_2022_jp', 'ignore') 370 371class TestStateful(unittest.TestCase): 372 text = '\u4E16\u4E16' 373 encoding = 'iso-2022-jp' 374 expected = b'\x1b$B@$@$' 375 reset = b'\x1b(B' 376 expected_reset = expected + reset 377 378 def test_encode(self): 379 self.assertEqual(self.text.encode(self.encoding), self.expected_reset) 380 381 def test_incrementalencoder(self): 382 encoder = codecs.getincrementalencoder(self.encoding)() 383 output = b''.join( 384 encoder.encode(char) 385 for char in self.text) 386 self.assertEqual(output, self.expected) 387 self.assertEqual(encoder.encode('', final=True), self.reset) 388 self.assertEqual(encoder.encode('', final=True), b'') 389 390 def test_incrementalencoder_final(self): 391 encoder = codecs.getincrementalencoder(self.encoding)() 392 last_index = len(self.text) - 1 393 output = b''.join( 394 encoder.encode(char, index == last_index) 395 for index, char in enumerate(self.text)) 396 self.assertEqual(output, self.expected_reset) 397 self.assertEqual(encoder.encode('', final=True), b'') 398 399class TestHZStateful(TestStateful): 400 text = '\u804a\u804a' 401 encoding = 'hz' 402 expected = b'~{ADAD' 403 reset = b'~}' 404 expected_reset = expected + reset 405 406 407if __name__ == "__main__": 408 unittest.main() 409