• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# test_multibytecodec.py
3#   Unit test for multibytecodec itself
4#
5
6from test import support
7from test.support import TESTFN
8import unittest, io, codecs, sys
9import _multibytecodec
10
# Names of all CJK codecs exercised by the per-encoding loops in this file.
# Each group is preceded by the name of the _codecs_* module it belongs to.
ALL_CJKENCODINGS = [
    # _codecs_cn
    'gb2312',
    'gbk',
    'gb18030',
    'hz',
    # _codecs_hk
    'big5hkscs',
    # _codecs_jp
    'cp932',
    'shift_jis',
    'euc_jp',
    'euc_jisx0213',
    'shift_jisx0213',
    'euc_jis_2004',
    'shift_jis_2004',
    # _codecs_kr
    'cp949',
    'euc_kr',
    'johab',
    # _codecs_tw
    'big5',
    'cp950',
    # _codecs_iso2022
    'iso2022_jp',
    'iso2022_jp_1',
    'iso2022_jp_2',
    'iso2022_jp_2004',
    'iso2022_jp_3',
    'iso2022_jp_ext',
    'iso2022_kr',
]
27
class Test_MultibyteCodec(unittest.TestCase):
    # Checks that apply to every codec in ALL_CJKENCODINGS, plus a few
    # regression tests.  Per-encoding loops use subTest() so that a failure
    # names the offending codec and the remaining codecs are still checked.

    def test_nullcoding(self):
        # Empty input must map to empty output in both directions.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(b''.decode(enc), '')
                self.assertEqual(str(b'', enc), '')
                self.assertEqual(''.encode(enc), b'')

    def test_str_decode(self):
        # Plain ASCII text must encode to the identical ASCII bytes.
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual('abcd'.encode(enc), b'abcd')

    def test_errorcallback_longindex(self):
        # An error handler that asks to resume at sys.maxsize+1 must make
        # the decoder raise IndexError.
        dec = codecs.getdecoder('euc-kr')

        def myreplace(exc):
            return ('', sys.maxsize+1)

        codecs.register_error('test.cjktest', myreplace)
        self.assertRaises(IndexError, dec,
                          b'apple\x92ham\x93spam', 'test.cjktest')

    def test_errorcallback_custom_ignore(self):
        # Issue #23215: MemoryError with custom error handlers and multibyte codecs
        data = 100 * "\udc00"
        codecs.register_error("test.ignore", codecs.ignore_errors)
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertEqual(data.encode(enc, "test.ignore"), b'')

    def test_codingspec(self):
        # Executes a source consisting only of a coding declaration for
        # each codec.
        # NOTE(review): the source is handed to exec() as str, so the coding
        # cookie is presumably not used to decode anything, and nothing
        # visible here creates TESTFN -- confirm whether bytes/file input
        # was intended before relying on this test.
        try:
            for enc in ALL_CJKENCODINGS:
                with self.subTest(encoding=enc):
                    code = '# coding: {}\n'.format(enc)
                    exec(code)
        finally:
            support.unlink(TESTFN)

    def test_init_segfault(self):
        # bug #3305: this used to segfault
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamReader, None)
        self.assertRaises(AttributeError,
                          _multibytecodec.MultibyteStreamWriter, None)

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                self.assertRaises(TypeError, codecs.getdecoder(enc), "")
73
class Test_IncrementalEncoder(unittest.TestCase):
    # Exercises incremental encoders obtained from
    # codecs.getincrementalencoder(): buffering of pending input, reset(),
    # and the getstate()/setstate() round trip.

    def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
                         b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
                         b'\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('', True), b'')
        self.assertEqual(encoder.encode('', False), b'')
        self.assertIsNone(encoder.reset())

    def test_stateful(self):
        # jisx0213 encoder is stateful for a few code points. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        # A lone U+00E6 is held back: it may still combine with U+0300.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        # With final=True nothing can follow, so U+00E6 is emitted alone.
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        # An empty final call flushes the pending U+00E6; a second one has
        # nothing left to flush.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')

    def test_stateful_keep_buffer(self):
        # A failed encode() call must leave the pending U+00E6 in place.
        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6'), b'')
        with self.assertRaises(UnicodeEncodeError):
            encoder.encode('\u0123')
        self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4')
        with self.assertRaises(UnicodeEncodeError):
            encoder.encode('\u0123')
        self.assertIsNone(encoder.reset())
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        with self.assertRaises(UnicodeEncodeError):
            encoder.encode('\u0123')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')

    def test_state_methods_with_buffer_state(self):
        # euc_jis_2004 stores state as a buffer of pending bytes
        encoder = codecs.getincrementalencoder('euc_jis_2004')()

        # Restoring the initial state gives the same result for the same
        # input.
        initial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        encoder.setstate(initial_state)
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')

        # Restoring a state captured while U+00E6 was pending lets the
        # second half be replayed.
        self.assertEqual(encoder.encode('\u00e6'), b'')
        partial_state = encoder.getstate()
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        encoder.setstate(partial_state)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')

    def test_state_methods_with_non_buffer_state(self):
        # iso2022_jp stores state without using a buffer
        encoder = codecs.getincrementalencoder('iso2022_jp')()

        self.assertEqual(encoder.encode('z'), b'z')
        en_state = encoder.getstate()

        # Three escape bytes precede the two bytes for U+3042.
        self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
        jp_state = encoder.getstate()
        # Going back to 'z' is preceded by another three-byte escape.
        self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')

        # After restoring jp_state no escape is emitted before U+3042.
        encoder.setstate(jp_state)
        self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')

        # After restoring en_state no escape is emitted before 'z'.
        encoder.setstate(en_state)
        self.assertEqual(encoder.encode('z'), b'z')

    def test_getstate_returns_expected_value(self):
        # Note: getstate is implemented such that these state values
        # are expected to be the same across all builds of Python,
        # regardless of x32/64 bit, endianness and compiler.

        # euc_jis_2004 stores state as a buffer of pending bytes
        buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
        self.assertEqual(buffer_state_encoder.getstate(), 0)
        buffer_state_encoder.encode('\u00e6')
        self.assertEqual(buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x02"
                             b"\xc3\xa6"
                             b"\x00\x00\x00\x00\x00\x00\x00\x00",
                             'little'))
        buffer_state_encoder.encode('\u0300')
        self.assertEqual(buffer_state_encoder.getstate(), 0)

        # iso2022_jp stores state without using a buffer
        non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\x42\x42\x00\x00\x00\x00\x00\x00",
                             'little'))
        non_buffer_state_encoder.encode('\u3042')
        self.assertEqual(non_buffer_state_encoder.getstate(),
                         int.from_bytes(
                             b"\x00"
                             b"\xc2\x42\x00\x00\x00\x00\x00\x00",
                             'little'))

    def test_setstate_validates_input_size(self):
        # A state whose first byte is 9 must be rejected with UnicodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        pending_size_nine = int.from_bytes(
            b"\x09"
            b"\x00\x00\x00\x00\x00\x00\x00\x00"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        with self.assertRaises(UnicodeError):
            encoder.setstate(pending_size_nine)

    def test_setstate_validates_input_bytes(self):
        # A state carrying the single byte 0xff as pending data must be
        # rejected with UnicodeDecodeError.
        encoder = codecs.getincrementalencoder('euc_jp')()
        invalid_utf8 = int.from_bytes(
            b"\x01"
            b"\xff"
            b"\x00\x00\x00\x00\x00\x00\x00\x00",
            'little')
        with self.assertRaises(UnicodeDecodeError):
            encoder.setstate(invalid_utf8)

    def test_issue5640(self):
        # After 'backslashreplace' has handled an unencodable character,
        # the next call must still encode normally.
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode('\xff'), b'\\xff')
        self.assertEqual(encoder.encode('\n'), b'\n')
206
class Test_IncrementalDecoder(unittest.TestCase):
    # Exercises incremental decoders obtained from
    # codecs.getincrementaldecoder(): buffering of incomplete byte
    # sequences, reset(), and the getstate()/setstate() round trip.

    def test_dbcs(self):
        # cp949 decoder is simple with only 1 or 2 bytes sequences.
        decoder = codecs.getincrementaldecoder('cp949')()
        # The first chunk has an odd number of bytes: only two characters
        # come out, and the trailing byte is completed by the next chunk.
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
                         '\ud30c\uc774')
        self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
                         '\uc36c \ub9c8\uc744')
        self.assertEqual(decoder.decode(b''), '')

    def test_dbcs_keep_buffer(self):
        # A failed final decode() must leave the pending byte in place so
        # that a later call can still complete the character.
        decoder = codecs.getincrementaldecoder('cp949')()
        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

        self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'\xcc\xbd', True)
        self.assertEqual(decoder.decode(b'\xcc'), '\uc774')

    def test_iso2022(self):
        # Escape sequences and two-byte characters may be split across
        # decode() calls at any position.
        decoder = codecs.getincrementaldecoder('iso2022-jp')()
        ESC = b'\x1b'
        self.assertEqual(decoder.decode(ESC + b'('), '')
        self.assertEqual(decoder.decode(b'B', True), '')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
        self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
        self.assertEqual(decoder.decode(b'$', True), '\u4e16')
        self.assertIsNone(decoder.reset())
        # After reset() the same bytes decode as plain ASCII again.
        self.assertEqual(decoder.decode(b'@$'), '@$')
        self.assertEqual(decoder.decode(ESC + b'$'), '')
        # A final call with an unfinished escape sequence is an error, but
        # the sequence can still be completed afterwards.
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', True)
        self.assertEqual(decoder.decode(b'B@$'), '\u4e16')

    def test_decode_unicode(self):
        # Trying to decode a unicode string should raise a TypeError
        for enc in ALL_CJKENCODINGS:
            with self.subTest(encoding=enc):
                decoder = codecs.getincrementaldecoder(enc)()
                self.assertRaises(TypeError, decoder.decode, "")

    def test_state_methods(self):
        decoder = codecs.getincrementaldecoder('euc_jp')()

        # Decode a complete input sequence
        self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
        pending1, _ = decoder.getstate()
        self.assertEqual(pending1, b'')

        # Decode first half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa4'), '')
        pending2, flags2 = decoder.getstate()
        self.assertEqual(pending2, b'\xa4')

        # Decode second half of a partial input sequence
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending3, _ = decoder.getstate()
        self.assertEqual(pending3, b'')

        # Jump back and decode second half of partial input sequence again
        decoder.setstate((pending2, flags2))
        self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
        pending4, _ = decoder.getstate()
        self.assertEqual(pending4, b'')

        # Ensure state values are preserved correctly
        decoder.setstate((b'abc', 123456789))
        self.assertEqual(decoder.getstate(), (b'abc', 123456789))

    def test_setstate_validates_input(self):
        # setstate() must reject a non-tuple, wrongly typed tuple members,
        # and a nine-byte pending buffer.
        decoder = codecs.getincrementaldecoder('euc_jp')()
        self.assertRaises(TypeError, decoder.setstate, 123)
        self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
        self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
        self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0))
284
class Test_StreamReader(unittest.TestCase):
    def test_bug1728403(self):
        # Reading a cp949 stream whose entire content is the single byte
        # 0xa1 must raise UnicodeDecodeError.
        # Registered before the file is created so that TESTFN is removed
        # however the test ends.
        self.addCleanup(support.unlink, TESTFN)
        with open(TESTFN, 'wb') as f:
            f.write(b'\xa1')
        with codecs.open(TESTFN, encoding='cp949') as f:
            self.assertRaises(UnicodeDecodeError, f.read, 2)
300
class Test_StreamWriter(unittest.TestCase):
    # Each test writes text through a codecs stream writer wrapped around
    # an in-memory byte buffer and checks what has accumulated in it.

    def test_gb18030(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        # (text to write, full buffer content expected after the write)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'123\x907\x959'),
            ('\uac00\u00ac', b'123\x907\x959\x827\xcf5\x810\x851'),
        )
        for text, written_so_far in steps:
            writer.write(text)
            self.assertEqual(sink.getvalue(), written_so_far)

    def test_utf_8(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('utf-8')(sink)
        # (text to write, full buffer content expected after the write)
        steps = (
            ('123', b'123'),
            ('\U00012345', b'123\xf0\x92\x8d\x85'),
            ('\uac00\u00ac', b'123\xf0\x92\x8d\x85'
                             b'\xea\xb0\x80\xc2\xac'),
        )
        for text, written_so_far in steps:
            writer.write(text)
            self.assertEqual(sink.getvalue(), written_so_far)

    def test_streamwriter_strwrite(self):
        sink = io.BytesIO()
        writer = codecs.getwriter('gb18030')(sink)
        writer.write('abcd')
        content = sink.getvalue()
        self.assertEqual(content, b'abcd')
330
class Test_ISO2022(unittest.TestCase):
    # Regression tests specific to the ISO 2022 family of codecs.

    def test_g2(self):
        # Bytes that include ESC . A and ESC N sequences must decode to
        # text containing U+00E9.
        iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
        uni = ':hu4:unit\xe9 de famille'
        self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)

    def test_iso2022_jp_g0(self):
        # The encoders must not emit the byte 0x0e, and every output byte
        # must fit in 7 bits.
        self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
        for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
            with self.subTest(encoding=encoding):
                e = '\u3406'.encode(encoding)
                # 0x7f is the largest 7-bit value; comparing against 0x80
                # would let the byte 0x80 itself slip through.
                self.assertFalse(any(x > 0x7f for x in e))

    def test_bug1572832(self):
        # Only checks that encoding every code point from U+10000 to
        # U+10FFFF completes; the results are discarded.
        for x in range(0x10000, 0x110000):
            # Any ISO 2022 codec will cause the segfault
            chr(x).encode('iso_2022_jp', 'ignore')
347
class TestStateful(unittest.TestCase):
    # Checks that a stateful encoding emits its reset sequence at the end
    # of the output.  The five class attributes below parameterize the
    # tests; a subclass can override them to cover another encoding.
    encoding = 'iso-2022-jp'
    text = '\u4E16\u4E16'
    reset = b'\x1b(B'
    expected = b'\x1b$B@$@$'
    expected_reset = expected + reset

    def test_encode(self):
        # One-shot str.encode() appends the reset sequence by itself.
        encoded = self.text.encode(self.encoding)
        self.assertEqual(encoded, self.expected_reset)

    def test_incrementalencoder(self):
        # Without final=True no reset is produced; the first empty final
        # call emits it and a second one emits nothing more.
        incremental = codecs.getincrementalencoder(self.encoding)()
        pieces = [incremental.encode(ch) for ch in self.text]
        self.assertEqual(b''.join(pieces), self.expected)
        flushed = incremental.encode('', final=True)
        self.assertEqual(flushed, self.reset)
        flushed_again = incremental.encode('', final=True)
        self.assertEqual(flushed_again, b'')

    def test_incrementalencoder_final(self):
        # Passing final=True together with the last character already
        # yields the reset sequence, leaving nothing for a later flush.
        incremental = codecs.getincrementalencoder(self.encoding)()
        final_position = len(self.text) - 1
        pieces = []
        for position, ch in enumerate(self.text):
            is_last = position == final_position
            pieces.append(incremental.encode(ch, is_last))
        self.assertEqual(b''.join(pieces), self.expected_reset)
        leftover = incremental.encode('', final=True)
        self.assertEqual(leftover, b'')
375
class TestHZStateful(TestStateful):
    # Runs the inherited TestStateful checks against the 'hz' encoding.
    encoding = 'hz'
    text = '\u804a\u804a'
    reset = b'~}'
    expected = b'~{ADAD'
    # Recomputed here: the inherited value was built from the base class's
    # own 'expected' and 'reset'.
    expected_reset = expected + reset
382
def test_main():
    """Pass this module's name to support.run_unittest()."""
    this_module = __name__
    support.run_unittest(this_module)
385
# When the file is executed directly as a script, run test_main().
if __name__ == "__main__":
    test_main()
388