• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import codecs
2import contextlib
3import io
4import locale
5import sys
6import unittest
7import encodings
8from unittest import mock
9
10from test import support
11from test.support import os_helper
12from test.support import warnings_helper
13
14try:
15    import _testcapi
16except ImportError:
17    _testcapi = None
18
19try:
20    import ctypes
21except ImportError:
22    ctypes = None
23    SIZEOF_WCHAR_T = -1
24else:
25    SIZEOF_WCHAR_T = ctypes.sizeof(ctypes.c_wchar)
26
def coding_checker(self, coder):
    """Build a one-shot assertion helper for a stateless codec function.

    *self* is a test case providing assertEqual(); *coder* is an
    encode/decode function returning a ``(result, consumed)`` pair.
    The returned ``check(input, expect)`` asserts that *coder* maps
    *input* to *expect* while consuming the whole input.
    """
    def check(input, expect):
        got = coder(input)
        self.assertEqual(got, (expect, len(input)))
    return check
31
32# On small versions of Windows like Windows IoT or Windows Nano Server not all codepages are present
def is_code_page_present(cp):
    """Return whether Windows code page *cp* is installed.

    Only meaningful on Windows with ctypes available: calls
    GetCPInfoExW() from kernel32, whose BOOL result is falsy for an
    unknown or absent code page.
    """
    from ctypes import POINTER, WINFUNCTYPE, WinDLL
    from ctypes.wintypes import BOOL, UINT, BYTE, WCHAR, DWORD

    MAX_LEADBYTES = 12  # 5 ranges, 2 bytes ea., 0 term.
    MAX_DEFAULTCHAR = 2 # single or double byte
    MAX_PATH = 260
    # Mirror of the Win32 CPINFOEXW structure; field order and types
    # must match the ABI exactly.
    class CPINFOEXW(ctypes.Structure):
        _fields_ = [("MaxCharSize", UINT),
                    ("DefaultChar", BYTE*MAX_DEFAULTCHAR),
                    ("LeadByte", BYTE*MAX_LEADBYTES),
                    ("UnicodeDefaultChar", WCHAR),
                    ("CodePage", UINT),
                    ("CodePageName", WCHAR*MAX_PATH)]

    prototype = WINFUNCTYPE(BOOL, UINT, DWORD, POINTER(CPINFOEXW))
    GetCPInfoEx = prototype(("GetCPInfoExW", WinDLL("kernel32")))
    info = CPINFOEXW()
    return GetCPInfoEx(cp, 0, info)
52
class Queue(object):
    """Simple FIFO over a bytes-like buffer.

    write() appends at one end; read() consumes from the other.  A
    negative *size* drains everything that is buffered.
    """
    def __init__(self, buffer):
        self._buffer = buffer

    def write(self, chars):
        self._buffer += chars

    def read(self, size=-1):
        if size < 0:
            # Drain the whole buffer; slicing to [:0] keeps its type.
            data, self._buffer = self._buffer, self._buffer[:0]
        else:
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data
72
73
class MixInCheckStateHandling:
    """Mixin verifying that incremental codec state survives a
    getstate()/setstate() round trip across a fresh codec object."""

    def check_state_handling_decode(self, encoding, u, s):
        """Split *s* at every byte position; moving the decoder state to a
        brand-new decoder at the split must still decode to *u*."""
        for split in range(len(s) + 1):
            decoder = codecs.getincrementaldecoder(encoding)()
            head = decoder.decode(s[:split])
            state = decoder.getstate()
            self.assertIsInstance(state[1], int)
            # Documented invariant of IncrementalDecoder.getstate(): with a
            # zero flag, re-feeding the buffered bytes from the default
            # state yields no output and returns to the very same state.
            if not state[1]:
                decoder.setstate((state[0][:0], 0))
                self.assertTrue(not decoder.decode(state[0]))
                self.assertEqual(state, decoder.getstate())
            # Resume decoding on a new decoder primed with the saved state.
            decoder = codecs.getincrementaldecoder(encoding)()
            decoder.setstate(state)
            tail = decoder.decode(s[split:], True)
            self.assertEqual(u, head + tail)

    def check_state_handling_encode(self, encoding, u, s):
        """Split *u* at every position; moving the encoder state to a
        brand-new encoder at the split must still encode to *s*."""
        for split in range(len(u) + 1):
            encoder = codecs.getincrementalencoder(encoding)()
            head = encoder.encode(u[:split])
            state = encoder.getstate()
            encoder = codecs.getincrementalencoder(encoding)()
            encoder.setstate(state)
            tail = encoder.encode(u[split:], True)
            self.assertEqual(s, head + tail)
106
107
class ReadTest(MixInCheckStateHandling):
    """Shared tests that drive a codec byte-by-byte through stream
    readers, incremental decoders and iterdecode().

    Concrete subclasses must define ``encoding`` and, for the surrogate
    tests, ``ill_formed_sequence`` (the encoded form of a lone
    surrogate); ``ill_formed_sequence_replace`` may be overridden when
    the "replace" handler yields more than one U+FFFD.
    """

    def check_partial(self, input, partialresults):
        # get a StreamReader for the encoding and feed the bytestring version
        # of input to the reader byte by byte. Read everything available from
        # the StreamReader and check that the results equal the appropriate
        # entries from partialresults.
        q = Queue(b"")
        r = codecs.getreader(self.encoding)(q)
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            q.write(bytes([c]))
            result += r.read()
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(r.read(), "")
        self.assertEqual(r.bytebuffer, b"")

        # do the check again, this time using an incremental decoder
        d = codecs.getincrementaldecoder(self.encoding)()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # Check whether the reset method works properly
        d.reset()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # check iterdecode()
        encoded = input.encode(self.encoding)
        self.assertEqual(
            input,
            "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
        )

    def test_readline(self):
        # readline() must honor every line-end style (\n, \r\n, \r and
        # U+2028 LINE SEPARATOR), with and without keepends.
        def getreader(input):
            stream = io.BytesIO(input.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        def readalllines(input, keepends=True, size=None):
            reader = getreader(input)
            lines = []
            while True:
                line = reader.readline(size=size, keepends=keepends)
                if not line:
                    break
                lines.append(line)
            return "|".join(lines)

        s = "foo\nbar\r\nbaz\rspam\u2028eggs"
        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
        sexpectednoends = "foo|bar|baz|spam|eggs"
        self.assertEqual(readalllines(s, True), sexpected)
        self.assertEqual(readalllines(s, False), sexpectednoends)
        self.assertEqual(readalllines(s, True, 10), sexpected)
        self.assertEqual(readalllines(s, False, 10), sexpectednoends)

        lineends = ("\n", "\r\n", "\r", "\u2028")
        # Test long lines (multiple calls to read() in readline())
        vw = []
        vwo = []
        for (i, lineend) in enumerate(lineends):
            vw.append((i*200+200)*"\u3042" + lineend)
            vwo.append((i*200+200)*"\u3042")
        self.assertEqual(readalllines("".join(vw), True), "|".join(vw))
        self.assertEqual(readalllines("".join(vw), False), "|".join(vwo))

        # Test lines where the first read might end with \r, so the
        # reader has to look ahead whether this is a lone \r or a \r\n
        for size in range(80):
            for lineend in lineends:
                s = 10*(size*"a" + lineend + "xxx\n")
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=True),
                        size*"a" + lineend,
                    )
                    self.assertEqual(
                        reader.readline(keepends=True),
                        "xxx\n",
                    )
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=False),
                        size*"a",
                    )
                    self.assertEqual(
                        reader.readline(keepends=False),
                        "xxx",
                    )

    def test_mixed_readline_and_read(self):
        # Interleaving readline()/read()/readlines() must not lose or
        # duplicate buffered characters.
        lines = ["Humpty Dumpty sat on a wall,\n",
                 "Humpty Dumpty had a great fall.\r\n",
                 "All the king's horses and all the king's men\r",
                 "Couldn't put Humpty together again."]
        data = ''.join(lines)
        def getreader():
            stream = io.BytesIO(data.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        # Issue #8260: Test readline() followed by read()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(), ''.join(lines[1:]))
        self.assertEqual(f.read(), '')

        # Issue #32110: Test readline() followed by read(n)
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(1), lines[1][0])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[len(lines[0]) + 1:][:100])

        # Issue #16636: Test readline() followed by readlines()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.readlines(), lines[1:])
        self.assertEqual(f.read(), '')

        # Test read(n) followed by read()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(), data[5:])
        self.assertEqual(f.read(), '')

        # Issue #32110: Test read(n) followed by read(n)
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(1), data[5])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[6:106])

        # Issue #12446: Test read(n) followed by readlines()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.readlines(), [lines[0][5:]] + lines[1:])
        self.assertEqual(f.read(), '')

    def test_bug1175396(self):
        # Regression test for bug #1175396: iterating a StreamReader
        # over a long \r\n-terminated document must reproduce the
        # original lines exactly.
        s = [
            '<%!--===================================================\r\n',
            '    BLOG index page: show recent articles,\r\n',
            '    today\'s articles, or articles of a specific date.\r\n',
            '========================================================--%>\r\n',
            '<%@inputencoding="ISO-8859-1"%>\r\n',
            '<%@pagetemplate=TEMPLATE.y%>\r\n',
            '<%@import=import frog.util, frog%>\r\n',
            '<%@import=import frog.objects%>\r\n',
            '<%@import=from frog.storageerrors import StorageError%>\r\n',
            '<%\r\n',
            '\r\n',
            'import logging\r\n',
            'log=logging.getLogger("Snakelets.logger")\r\n',
            '\r\n',
            '\r\n',
            'user=self.SessionCtx.user\r\n',
            'storageEngine=self.SessionCtx.storageEngine\r\n',
            '\r\n',
            '\r\n',
            'def readArticlesFromDate(date, count=None):\r\n',
            '    entryids=storageEngine.listBlogEntries(date)\r\n',
            '    entryids.reverse() # descending\r\n',
            '    if count:\r\n',
            '        entryids=entryids[:count]\r\n',
            '    try:\r\n',
            '        return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n',
            '    except StorageError,x:\r\n',
            '        log.error("Error loading articles: "+str(x))\r\n',
            '        self.abort("cannot load articles")\r\n',
            '\r\n',
            'showdate=None\r\n',
            '\r\n',
            'arg=self.Request.getArg()\r\n',
            'if arg=="today":\r\n',
            '    #-------------------- TODAY\'S ARTICLES\r\n',
            '    self.write("<h2>Today\'s articles</h2>")\r\n',
            '    showdate = frog.util.isodatestr() \r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'elif arg=="active":\r\n',
            '    #-------------------- ACTIVE ARTICLES redirect\r\n',
            '    self.Yredirect("active.y")\r\n',
            'elif arg=="login":\r\n',
            '    #-------------------- LOGIN PAGE redirect\r\n',
            '    self.Yredirect("login.y")\r\n',
            'elif arg=="date":\r\n',
            '    #-------------------- ARTICLES OF A SPECIFIC DATE\r\n',
            '    showdate = self.Request.getParameter("date")\r\n',
            '    self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'else:\r\n',
            '    #-------------------- RECENT ARTICLES\r\n',
            '    self.write("<h2>Recent articles</h2>")\r\n',
            '    dates=storageEngine.listBlogEntryDates()\r\n',
            '    if dates:\r\n',
            '        entries=[]\r\n',
            '        SHOWAMOUNT=10\r\n',
            '        for showdate in dates:\r\n',
            '            entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n',
            '            if len(entries)>=SHOWAMOUNT:\r\n',
            '                break\r\n',
            '                \r\n',
        ]
        stream = io.BytesIO("".join(s).encode(self.encoding))
        reader = codecs.getreader(self.encoding)(stream)
        for (i, line) in enumerate(reader):
            self.assertEqual(line, s[i])

    def test_readlinequeue(self):
        # readline() across incremental writes: a trailing \r must be
        # recognized as a line end even before the writer continues.
        q = Queue(b"")
        writer = codecs.getwriter(self.encoding)(q)
        reader = codecs.getreader(self.encoding)(q)

        # No lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=False), "foo")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=False), "")
        self.assertEqual(reader.readline(keepends=False), "bar")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=False), "baz")
        self.assertEqual(reader.readline(keepends=False), "")

        # Lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=True), "foo\r")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=True), "\n")
        self.assertEqual(reader.readline(keepends=True), "bar\r")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=True), "baz")
        self.assertEqual(reader.readline(keepends=True), "")
        writer.write("foo\r\n")
        self.assertEqual(reader.readline(keepends=True), "foo\r\n")

    def test_bug1098990_a(self):
        # Regression test for bug #1098990: readline() on long lines.
        s1 = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\r\n"
        s2 = "offending line: ladfj askldfj klasdj fskla dfzaskdj fasklfj laskd fjasklfzzzzaa%whereisthis!!!\r\n"
        s3 = "next line.\r\n"

        s = (s1+s2+s3).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), "")

    def test_bug1098990_b(self):
        # Second regression case for bug #1098990.
        s1 = "aaaaaaaaaaaaaaaaaaaaaaaa\r\n"
        s2 = "bbbbbbbbbbbbbbbbbbbbbbbb\r\n"
        s3 = "stillokay:bbbbxx\r\n"
        s4 = "broken!!!!badbad\r\n"
        s5 = "againokay.\r\n"

        s = (s1+s2+s3+s4+s5).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), s4)
        self.assertEqual(reader.readline(), s5)
        self.assertEqual(reader.readline(), "")

    # Default "replace" output for one ill-formed sequence; subclasses
    # override when the codec emits several replacement characters.
    ill_formed_sequence_replace = "\ufffd"

    def test_lone_surrogates(self):
        # Lone surrogates must be rejected on encode, and each error
        # handler must produce its documented substitute.
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding)
        self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "namereplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"),
                         "[&#56448;]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
                         "[?]".encode(self.encoding))

        # sequential surrogate characters
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
                         "[??]".encode(self.encoding))

        bom = "".encode(self.encoding)
        for before, after in [("\U00010fff", "A"), ("[", "]"),
                              ("A", "\U00010fff")]:
            before_sequence = before.encode(self.encoding)[len(bom):]
            after_sequence = after.encode(self.encoding)[len(bom):]
            test_string = before + "\uDC80" + after
            test_sequence = (bom + before_sequence +
                             self.ill_formed_sequence + after_sequence)
            self.assertRaises(UnicodeDecodeError, test_sequence.decode,
                              self.encoding)
            self.assertEqual(test_string.encode(self.encoding,
                                                "surrogatepass"),
                             test_sequence)
            self.assertEqual(test_sequence.decode(self.encoding,
                                                  "surrogatepass"),
                             test_string)
            self.assertEqual(test_sequence.decode(self.encoding, "ignore"),
                             before + after)
            self.assertEqual(test_sequence.decode(self.encoding, "replace"),
                             before + self.ill_formed_sequence_replace + after)
            backslashreplace = ''.join('\\x%02x' % b
                                       for b in self.ill_formed_sequence)
            self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
                             before + backslashreplace + after)

    def test_incremental_surrogatepass(self):
        # Test incremental decoder for surrogatepass handler:
        # see issue #24214
        # High surrogate
        data = '\uD901'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:], True), '\uD901')
        # Low surrogate
        data = '\uDC02'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:]), '\uDC02')
446
447
class UTF32Test(ReadTest, unittest.TestCase):
    """ReadTest checks for the BOM-aware "utf-32" codec."""

    encoding = "utf-32"
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc\x00\x00"
    else:
        ill_formed_sequence = b"\x00\x00\xdc\x80"

    # "spamspam" encoded once with a little-endian BOM and once with a
    # big-endian BOM.
    spamle = (b'\xff\xfe\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
    spambe = (b'\x00\x00\xfe\xff'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')

    def test_only_one_bom(self):
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        # A corrupt BOM must raise UnicodeError on read.
        s = io.BytesIO(4*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(8*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read
                "", # third byte of BOM read
                "", # fourth byte of BOM read => byteorder known
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_32_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_32_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded_le = b'\xff\xfe\x00\x00' + b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_le)[0])
        encoded_be = b'\x00\x00\xfe\xff' + b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_be)[0])
542
543
class UTF32LETest(ReadTest, unittest.TestCase):
    """ReadTest checks for the fixed-byte-order "utf-32-le" codec."""

    encoding = "utf-32-le"
    ill_formed_sequence = b"\x80\xdc\x00\x00"

    def test_partial(self):
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_le_decode(encoded)[0])
588
589
class UTF32BETest(ReadTest, unittest.TestCase):
    """ReadTest checks for the fixed-byte-order "utf-32-be" codec."""

    encoding = "utf-32-be"
    ill_formed_sequence = b"\x00\x00\xdc\x80"

    def test_partial(self):
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_be_decode(encoded)[0])
634
635
class UTF16Test(ReadTest, unittest.TestCase):
    """ReadTest checks for the BOM-aware "utf-16" codec."""

    encoding = "utf-16"
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc"
    else:
        ill_formed_sequence = b"\xdc\x80"

    # "spamspam" encoded once with a little-endian BOM and once with a
    # big-endian BOM.
    spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
    spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'

    def test_only_one_bom(self):
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        # A corrupt BOM must raise UnicodeError on read.
        s = io.BytesIO(b"\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(b"\xff\xff\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read => byteorder known
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_16_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_16_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_bug691291(self):
        # Files are always opened in binary mode, even if no binary mode was
        # specified.  This means that no automatic conversion of '\n' is done
        # on reading and writing.
        s1 = 'Hello\r\nworld\r\n'

        s = s1.encode(self.encoding)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(s)
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            reader = codecs.open(os_helper.TESTFN, 'U', encoding=self.encoding)
        with reader:
            self.assertEqual(reader.read(), s1)
721
class UTF16LETest(ReadTest, unittest.TestCase):
    """ReadTest checks for the fixed-byte-order "utf-16-le" codec."""

    encoding = "utf-16-le"
    ill_formed_sequence = b"\x80\xdc"

    def test_partial(self):
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        # Each raw input must fail strict decoding and yield the given
        # result under the "replace" handler.
        tests = [
            (b'\xff', '\ufffd'),
            (b'A\x00Z', 'A\ufffd'),
            (b'A\x00B\x00C\x00D\x00Z', 'ABCD\ufffd'),
            (b'\x00\xd8', '\ufffd'),
            (b'\x00\xd8A', '\ufffd'),
            (b'\x00\xd8A\x00', '\ufffdA'),
            (b'\x00\xdcA\x00', '\ufffdA'),
        ]
        for raw, expected in tests:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode,
                              raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16le', 'replace'), expected)

    def test_nonbmp(self):
        self.assertEqual("\U00010203".encode(self.encoding),
                         b'\x00\xd8\x03\xde')
        self.assertEqual(b'\x00\xd8\x03\xde'.decode(self.encoding),
                         "\U00010203")
765
class UTF16BETest(ReadTest, unittest.TestCase):
    encoding = "utf-16-be"
    ill_formed_sequence = b"\xdc\x80"

    def test_partial(self):
        """Feeding bytes one at a time must emit each character only once
        its final byte (or second surrogate) has arrived."""
        expected_states = [
            "",
            "\x00",
            "\x00",
            "\x00\xff",
            "\x00\xff",
            "\x00\xff\u0100",
            "\x00\xff\u0100",
            "\x00\xff\u0100\uffff",
            "\x00\xff\u0100\uffff",
            "\x00\xff\u0100\uffff",
            "\x00\xff\u0100\uffff",
            "\x00\xff\u0100\uffff\U00010000",
        ]
        self.check_partial("\x00\xff\u0100\uffff\U00010000", expected_states)

    def test_errors(self):
        """Truncated data and unpaired surrogates raise under 'strict' and
        become U+FFFD under 'replace'."""
        cases = (
            (b'\xff', '\ufffd'),
            (b'\x00A\xff', 'A\ufffd'),
            (b'\x00A\x00B\x00C\x00DZ', 'ABCD\ufffd'),
            (b'\xd8\x00', '\ufffd'),
            (b'\xd8\x00\xdc', '\ufffd'),
            (b'\xd8\x00\x00A', '\ufffdA'),
            (b'\xdc\x00\x00A', '\ufffdA'),
        )
        for data, substituted in cases:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_be_decode,
                              data, 'strict', True)
            self.assertEqual(data.decode('utf-16be', 'replace'), substituted)

    def test_nonbmp(self):
        """A non-BMP character round-trips through a surrogate pair."""
        encoded = b'\xd8\x00\xde\x03'
        self.assertEqual("\U00010203".encode(self.encoding), encoded)
        self.assertEqual(encoded.decode(self.encoding), "\U00010203")
809
class UTF8Test(ReadTest, unittest.TestCase):
    encoding = "utf-8"
    ill_formed_sequence = b"\xed\xb2\x80"
    ill_formed_sequence_replace = "\ufffd" * 3
    # UTF-8 proper has no byte order mark; the utf-8-sig subclass overrides this.
    BOM = b''

    def test_partial(self):
        """Multi-byte sequences emit a character only after their last byte."""
        states = [
            "\x00",
            "\x00",
            "\x00\xff",
            "\x00\xff",
            "\x00\xff\u07ff",
            "\x00\xff\u07ff",
            "\x00\xff\u07ff",
            "\x00\xff\u07ff\u0800",
            "\x00\xff\u07ff\u0800",
            "\x00\xff\u07ff\u0800",
            "\x00\xff\u07ff\u0800\uffff",
            "\x00\xff\u07ff\u0800\uffff",
            "\x00\xff\u07ff\u0800\uffff",
            "\x00\xff\u07ff\u0800\uffff",
            "\x00\xff\u07ff\u0800\uffff\U00010000",
        ]
        self.check_partial("\x00\xff\u07ff\u0800\uffff\U00010000", states)

    def test_decoder_state(self):
        """Decoder state must survive a getstate()/setstate() round-trip."""
        sample = "\x00\x7f\x80\xff\u0100\u07ff\u0800\uffff\U0010ffff"
        self.check_state_handling_decode(self.encoding,
                                         sample, sample.encode(self.encoding))

    def test_decode_error(self):
        """Each error handler substitutes invalid bytes in its own way."""
        cases = (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        )
        for data, handler, want in cases:
            with self.subTest(data=data, error_handler=handler, expected=want):
                self.assertEqual(data.decode(self.encoding, handler), want)

    def test_lone_surrogates(self):
        super().test_lone_surrogates()
        # not sure if this is making sense for
        # UTF-16 and UTF-32
        self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
                         self.BOM + b'[\x80]')

        # The error must report the span of surrogates it could not escape.
        with self.assertRaises(UnicodeEncodeError) as cm:
            "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
        exc = cm.exception
        self.assertEqual(exc.object[exc.start:exc.end], '\uD800\uDFFF')

    def test_surrogatepass_handler(self):
        """surrogatepass encodes lone surrogates as three-byte sequences and
        decodes them back, while truncated sequences still fail."""
        encode_cases = (
            ("abc\ud800def", b"abc\xed\xa0\x80def"),
            ("\U00010fff\uD800", b"\xf0\x90\xbf\xbf\xed\xa0\x80"),
            ("[\uD800\uDC80]", b'[\xed\xa0\x80\xed\xb2\x80]'),
        )
        for text, payload in encode_cases:
            self.assertEqual(text.encode(self.encoding, "surrogatepass"),
                             self.BOM + payload)

        self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
                         "\U00010fff\uD800")

        self.assertTrue(codecs.lookup_error("surrogatepass"))
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")

    def test_incremental_errors(self):
        # Test that the incremental decoder can fail with final=False.
        # See issue #24214
        cases = [b'\x80', b'\xBF', b'\xC0', b'\xC1', b'\xF5', b'\xF6', b'\xFF']
        truncated_starts = (b'\xC2', b'\xDF', b'\xE0', b'\xE0\xA0', b'\xEF',
                            b'\xEF\xBF', b'\xF0', b'\xF0\x90', b'\xF0\x90\x80',
                            b'\xF4', b'\xF4\x8F', b'\xF4\x8F\xBF')
        cases += [start + follow
                  for start in truncated_starts
                  for follow in (b'\x7F', b'\xC0')]
        cases += [b'\xE0\x80', b'\xE0\x9F', b'\xED\xA0\x80',
                  b'\xED\xBF\xBF', b'\xF0\x80', b'\xF0\x8F', b'\xF4\x90']

        for data in cases:
            with self.subTest(data=data):
                dec = codecs.getincrementaldecoder(self.encoding)()
                self.assertRaises(UnicodeDecodeError, dec.decode, data)
902
903
class UTF7Test(ReadTest, unittest.TestCase):
    """Tests for the UTF-7 codec (RFC 2152)."""
    encoding = "utf-7"

    def test_ascii(self):
        """Characters in the direct sets round-trip unescaped; everything
        else is encoded as a modified-base64 run."""
        # Set D (directly encoded characters)
        set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 'abcdefghijklmnopqrstuvwxyz'
                 '0123456789'
                 '\'(),-./:?')
        self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii'))
        self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d)
        # Set O (optional direct characters)
        set_o = ' !"#$%&*;<=>@[]^_`{|}'
        self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii'))
        self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o)
        # + is the shift-in marker and is itself escaped as "+-"
        self.assertEqual('a+b'.encode(self.encoding), b'a+-b')
        self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b')
        # White spaces
        ws = ' \t\n\r'
        self.assertEqual(ws.encode(self.encoding), ws.encode('ascii'))
        self.assertEqual(ws.encode('ascii').decode(self.encoding), ws)
        # Other ASCII characters
        other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) -
                                     set(set_d + set_o + '+' + ws)))
        self.assertEqual(other_ascii.encode(self.encoding),
                         b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU'
                         b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-')

    def test_partial(self):
        """Feeding bytes one at a time: a base64 run emits nothing until
        enough bytes have arrived to decode a complete character."""
        self.check_partial(
            'a+-b\x00c\x80d\u0100e\U00010000f',
            [
                'a',
                'a',
                'a+',
                'a+-',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b\x00',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c\x80',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d\u0100',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e\U00010000',
                'a+-b\x00c\x80d\u0100e\U00010000f',
            ]
        )

    def test_errors(self):
        """Ill-formed shift sequences raise under 'strict' and are replaced
        by U+FFFD under 'replace'."""
        tests = [
            (b'\xffb', '\ufffdb'),
            (b'a\xffb', 'a\ufffdb'),
            (b'a\xff\xffb', 'a\ufffd\ufffdb'),
            (b'a+IK', 'a\ufffd'),
            (b'a+IK-b', 'a\ufffdb'),
            (b'a+IK,b', 'a\ufffdb'),
            (b'a+IKx', 'a\u20ac\ufffd'),
            (b'a+IKx-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr', 'a\u20ac\ufffd'),
            (b'a+IKwgr-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr,', 'a\u20ac\ufffd'),
            (b'a+IKwgr,-b', 'a\u20ac\ufffd-b'),
            (b'a+IKwgrB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrB-b', 'a\u20ac\u20ac\ufffdb'),
            (b'a+/,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+//,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+IKw-b\xff', 'a\u20acb\ufffd'),
            (b'a+IKw\xffb', 'a\u20ac\ufffdb'),
            (b'a+@b', 'a\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode,
                                raw, 'strict', True)
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)

    def test_nonbmp(self):
        """Non-BMP characters are encoded as a surrogate pair inside one
        base64 run; the terminating '-' is optional on decode."""
        self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0')
        self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0')
        self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-')
        self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding),
                         b'+IKwgrNgB3KA-')
        self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')
        self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')

    def test_lone_surrogates(self):
        """A high surrogate without its mate decodes to U+FFFD under
        'replace' unless a valid pair completes it."""
        tests = [
            (b'a+2AE-b', 'a\ud801b'),
            (b'a+2AE\xffb', 'a\ufffdb'),
            (b'a+2AE', 'a\ufffd'),
            (b'a+2AEA-b', 'a\ufffdb'),
            (b'a+2AH-b', 'a\ufffdb'),
            (b'a+IKzYAQ-b', 'a\u20ac\ud801b'),
            (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'),
            (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'),
            (b'a+IKzYAd-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'),
            (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'),
            (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)
1036
1037
class UTF16ExTest(unittest.TestCase):
    """Tests for the low-level codecs.utf_16_ex_decode entry point."""

    def test_errors(self):
        """A single stray byte cannot form a UTF-16 code unit."""
        self.assertRaises(UnicodeDecodeError,
                          codecs.utf_16_ex_decode, b"\xff", "strict", 0, True)

    def test_bad_args(self):
        """Calling without any arguments must raise TypeError."""
        self.assertRaises(TypeError, codecs.utf_16_ex_decode)
1045
class ReadBufferTest(unittest.TestCase):
    """Tests for the legacy codecs.readbuffer_encode helper."""

    def test_array(self):
        """Buffer-like objects such as array.array are accepted."""
        import array
        buf = array.array("b", b"spam")
        self.assertEqual(codecs.readbuffer_encode(buf), (b"spam", 4))

    def test_empty(self):
        """An empty string maps to empty bytes and a length of zero."""
        self.assertEqual(codecs.readbuffer_encode(""), (b"", 0))

    def test_bad_args(self):
        """Missing or non-buffer arguments raise TypeError."""
        self.assertRaises(TypeError, codecs.readbuffer_encode)
        self.assertRaises(TypeError, codecs.readbuffer_encode, 42)
1061
class UTF8SigTest(UTF8Test, unittest.TestCase):
    encoding = "utf-8-sig"
    # Unlike plain UTF-8, this codec emits/strips a leading BOM.
    BOM = codecs.BOM_UTF8

    def test_partial(self):
        """The first (encoded) BOM is swallowed; a second one is real data."""
        self.check_partial(
            "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "",
                "",
                "", # First BOM has been read and skipped
                "",
                "",
                "\ufeff", # Second BOM has been read and emitted
                "\ufeff\x00", # "\x00" read and emitted
                "\ufeff\x00", # First byte of encoded "\xff" read
                "\ufeff\x00\xff", # Second byte of encoded "\xff" read
                "\ufeff\x00\xff", # First byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff", # Second byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_bug1601501(self):
        # SF bug #1601501: check that the codec works with a buffer
        self.assertEqual(str(b"\xef\xbb\xbf", "utf-8-sig"), "")

    def test_bom(self):
        """The incremental decoder strips a leading BOM."""
        d = codecs.getincrementaldecoder("utf-8-sig")()
        s = "spam"
        self.assertEqual(d.decode(s.encode("utf-8-sig")), s)

    def _check_stream_decode(self, bytestring, unistring):
        # Shared driver for the stream tests (previously duplicated in
        # test_stream_bom and test_stream_bare): decode *bytestring*
        # through a stream reader with many different chunk sizes and
        # verify the accumulated output equals *unistring*.
        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + \
                        [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while True:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()

                if not data:
                    break
                ostream.write(data)

            self.assertEqual(ostream.getvalue(), unistring)

    def test_stream_bom(self):
        """A BOM at the start of the stream is stripped from the output."""
        self._check_stream_decode(
            codecs.BOM_UTF8 + b"ABC\xC2\xA1\xE2\x88\x80XYZ",
            "ABC\u00A1\u2200XYZ")

    def test_stream_bare(self):
        """Input without a BOM decodes unchanged."""
        self._check_stream_decode(
            b"ABC\xC2\xA1\xE2\x88\x80XYZ",
            "ABC\u00A1\u2200XYZ")
1146
1147
1148class EscapeDecodeTest(unittest.TestCase):
1149    def test_empty(self):
1150        self.assertEqual(codecs.escape_decode(b""), (b"", 0))
1151        self.assertEqual(codecs.escape_decode(bytearray()), (b"", 0))
1152
1153    def test_raw(self):
1154        decode = codecs.escape_decode
1155        for b in range(256):
1156            b = bytes([b])
1157            if b != b'\\':
1158                self.assertEqual(decode(b + b'0'), (b + b'0', 2))
1159
1160    def test_escape(self):
1161        decode = codecs.escape_decode
1162        check = coding_checker(self, decode)
1163        check(b"[\\\n]", b"[]")
1164        check(br'[\"]', b'["]')
1165        check(br"[\']", b"[']")
1166        check(br"[\\]", b"[\\]")
1167        check(br"[\a]", b"[\x07]")
1168        check(br"[\b]", b"[\x08]")
1169        check(br"[\t]", b"[\x09]")
1170        check(br"[\n]", b"[\x0a]")
1171        check(br"[\v]", b"[\x0b]")
1172        check(br"[\f]", b"[\x0c]")
1173        check(br"[\r]", b"[\x0d]")
1174        check(br"[\7]", b"[\x07]")
1175        check(br"[\78]", b"[\x078]")
1176        check(br"[\41]", b"[!]")
1177        check(br"[\418]", b"[!8]")
1178        check(br"[\101]", b"[A]")
1179        check(br"[\1010]", b"[A0]")
1180        check(br"[\501]", b"[A]")
1181        check(br"[\x41]", b"[A]")
1182        check(br"[\x410]", b"[A0]")
1183        for i in range(97, 123):
1184            b = bytes([i])
1185            if b not in b'abfnrtvx':
1186                with self.assertWarns(DeprecationWarning):
1187                    check(b"\\" + b, b"\\" + b)
1188            with self.assertWarns(DeprecationWarning):
1189                check(b"\\" + b.upper(), b"\\" + b.upper())
1190        with self.assertWarns(DeprecationWarning):
1191            check(br"\8", b"\\8")
1192        with self.assertWarns(DeprecationWarning):
1193            check(br"\9", b"\\9")
1194        with self.assertWarns(DeprecationWarning):
1195            check(b"\\\xfa", b"\\\xfa")
1196
1197    def test_errors(self):
1198        decode = codecs.escape_decode
1199        self.assertRaises(ValueError, decode, br"\x")
1200        self.assertRaises(ValueError, decode, br"[\x]")
1201        self.assertEqual(decode(br"[\x]\x", "ignore"), (b"[]", 6))
1202        self.assertEqual(decode(br"[\x]\x", "replace"), (b"[?]?", 6))
1203        self.assertRaises(ValueError, decode, br"\x0")
1204        self.assertRaises(ValueError, decode, br"[\x0]")
1205        self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
1206        self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
1207
1208
# From RFC 3492, section 7.1 (sample strings).  Each entry pairs a Unicode
# string with its expected Punycode encoding; mixed-case encodings are
# compared case-insensitively by PunycodeTest below.
punycode_testcases = [
    # A Arabic (Egyptian):
    ("\u0644\u064A\u0647\u0645\u0627\u0628\u062A\u0643\u0644"
     "\u0645\u0648\u0634\u0639\u0631\u0628\u064A\u061F",
     b"egbpdaj6bu4bxfgehfvwxn"),
    # B Chinese (simplified):
    ("\u4ED6\u4EEC\u4E3A\u4EC0\u4E48\u4E0D\u8BF4\u4E2D\u6587",
     b"ihqwcrb4cv8a8dqg056pqjye"),
    # C Chinese (traditional):
    ("\u4ED6\u5011\u7232\u4EC0\u9EBD\u4E0D\u8AAA\u4E2D\u6587",
     b"ihqwctvzc91f659drss3x8bo0yb"),
    # D Czech: Pro<ccaron>prost<ecaron>nemluv<iacute><ccaron>esky
    ("\u0050\u0072\u006F\u010D\u0070\u0072\u006F\u0073\u0074"
     "\u011B\u006E\u0065\u006D\u006C\u0075\u0076\u00ED\u010D"
     "\u0065\u0073\u006B\u0079",
     b"Proprostnemluvesky-uyb24dma41a"),
    # E Hebrew:
    ("\u05DC\u05DE\u05D4\u05D4\u05DD\u05E4\u05E9\u05D5\u05D8"
     "\u05DC\u05D0\u05DE\u05D3\u05D1\u05E8\u05D9\u05DD\u05E2"
     "\u05D1\u05E8\u05D9\u05EA",
     b"4dbcagdahymbxekheh6e0a7fei0b"),
    # F Hindi (Devanagari):
    ("\u092F\u0939\u0932\u094B\u0917\u0939\u093F\u0928\u094D"
     "\u0926\u0940\u0915\u094D\u092F\u094B\u0902\u0928\u0939"
     "\u0940\u0902\u092C\u094B\u0932\u0938\u0915\u0924\u0947"
     "\u0939\u0948\u0902",
     b"i1baa7eci9glrd9b2ae1bj0hfcgg6iyaf8o0a1dig0cd"),

    # (G) Japanese (kanji and hiragana):
    ("\u306A\u305C\u307F\u3093\u306A\u65E5\u672C\u8A9E\u3092"
     "\u8A71\u3057\u3066\u304F\u308C\u306A\u3044\u306E\u304B",
     b"n8jok5ay5dzabd5bym9f0cm5685rrjetr6pdxa"),

    # (H) Korean (Hangul syllables):
    ("\uC138\uACC4\uC758\uBAA8\uB4E0\uC0AC\uB78C\uB4E4\uC774"
     "\uD55C\uAD6D\uC5B4\uB97C\uC774\uD574\uD55C\uB2E4\uBA74"
     "\uC5BC\uB9C8\uB098\uC88B\uC744\uAE4C",
     b"989aomsvi5e83db1d2a355cv1e0vak1dwrv93d5xbh15a0dt30a5j"
     b"psd879ccm6fea98c"),

    # (I) Russian (Cyrillic):
    ("\u043F\u043E\u0447\u0435\u043C\u0443\u0436\u0435\u043E"
     "\u043D\u0438\u043D\u0435\u0433\u043E\u0432\u043E\u0440"
     "\u044F\u0442\u043F\u043E\u0440\u0443\u0441\u0441\u043A"
     "\u0438",
     b"b1abfaaepdrnnbgefbaDotcwatmq2g4l"),

    # (J) Spanish: Porqu<eacute>nopuedensimplementehablarenEspa<ntilde>ol
    ("\u0050\u006F\u0072\u0071\u0075\u00E9\u006E\u006F\u0070"
     "\u0075\u0065\u0064\u0065\u006E\u0073\u0069\u006D\u0070"
     "\u006C\u0065\u006D\u0065\u006E\u0074\u0065\u0068\u0061"
     "\u0062\u006C\u0061\u0072\u0065\u006E\u0045\u0073\u0070"
     "\u0061\u00F1\u006F\u006C",
     b"PorqunopuedensimplementehablarenEspaol-fmd56a"),

    # (K) Vietnamese:
    #  T<adotbelow>isaoh<odotbelow>kh<ocirc>ngth<ecirchookabove>ch\
    #   <ihookabove>n<oacute>iti<ecircacute>ngVi<ecircdotbelow>t
    ("\u0054\u1EA1\u0069\u0073\u0061\u006F\u0068\u1ECD\u006B"
     "\u0068\u00F4\u006E\u0067\u0074\u0068\u1EC3\u0063\u0068"
     "\u1EC9\u006E\u00F3\u0069\u0074\u0069\u1EBF\u006E\u0067"
     "\u0056\u0069\u1EC7\u0074",
     b"TisaohkhngthchnitingVit-kjcr8268qyxafd2f1b9g"),

    # (L) 3<nen>B<gumi><kinpachi><sensei>
    ("\u0033\u5E74\u0042\u7D44\u91D1\u516B\u5148\u751F",
     b"3B-ww4c5e180e575a65lsy2b"),

    # (M) <amuro><namie>-with-SUPER-MONKEYS
    ("\u5B89\u5BA4\u5948\u7F8E\u6075\u002D\u0077\u0069\u0074"
     "\u0068\u002D\u0053\u0055\u0050\u0045\u0052\u002D\u004D"
     "\u004F\u004E\u004B\u0045\u0059\u0053",
     b"-with-SUPER-MONKEYS-pc58ag80a8qai00g7n9n"),

    # (N) Hello-Another-Way-<sorezore><no><basho>
    ("\u0048\u0065\u006C\u006C\u006F\u002D\u0041\u006E\u006F"
     "\u0074\u0068\u0065\u0072\u002D\u0057\u0061\u0079\u002D"
     "\u305D\u308C\u305E\u308C\u306E\u5834\u6240",
     b"Hello-Another-Way--fc4qua05auwb3674vfr0b"),

    # (O) <hitotsu><yane><no><shita>2
    ("\u3072\u3068\u3064\u5C4B\u6839\u306E\u4E0B\u0032",
     b"2-u9tlzr9756bt3uc0v"),

    # (P) Maji<de>Koi<suru>5<byou><mae>
    ("\u004D\u0061\u006A\u0069\u3067\u004B\u006F\u0069\u3059"
     "\u308B\u0035\u79D2\u524D",
     b"MajiKoi5-783gue6qz075azm5e"),

     # (Q) <pafii>de<runba>
    ("\u30D1\u30D5\u30A3\u30FC\u0064\u0065\u30EB\u30F3\u30D0",
     b"de-jg4avhby1noc0d"),

    # (R) <sono><supiido><de>
    ("\u305D\u306E\u30B9\u30D4\u30FC\u30C9\u3067",
     b"d9juau41awczczp"),

    # (S) -> $1.00 <-
    ("\u002D\u003E\u0020\u0024\u0031\u002E\u0030\u0030\u0020"
     "\u003C\u002D",
     b"-> $1.00 <--")
    ]

# Import-time sanity check: every entry must be a (unicode, punycode) pair.
# Malformed entries are printed so a bad edit to the table is noticed.
for i in punycode_testcases:
    if len(i)!=2:
        print(repr(i))
1316
1317
1318class PunycodeTest(unittest.TestCase):
1319    def test_encode(self):
1320        for uni, puny in punycode_testcases:
1321            # Need to convert both strings to lower case, since
1322            # some of the extended encodings use upper case, but our
1323            # code produces only lower case. Converting just puny to
1324            # lower is also insufficient, since some of the input characters
1325            # are upper case.
1326            self.assertEqual(
1327                str(uni.encode("punycode"), "ascii").lower(),
1328                str(puny, "ascii").lower()
1329            )
1330
1331    def test_decode(self):
1332        for uni, puny in punycode_testcases:
1333            self.assertEqual(uni, puny.decode("punycode"))
1334            puny = puny.decode("ascii").encode("ascii")
1335            self.assertEqual(uni, puny.decode("punycode"))
1336
1337    def test_decode_invalid(self):
1338        testcases = [
1339            (b"xn--w&", "strict", UnicodeError()),
1340            (b"xn--w&", "ignore", "xn-"),
1341        ]
1342        for puny, errors, expected in testcases:
1343            with self.subTest(puny=puny, errors=errors):
1344                if isinstance(expected, Exception):
1345                    self.assertRaises(UnicodeError, puny.decode, "punycode", errors)
1346                else:
1347                    self.assertEqual(puny.decode("punycode", errors), expected)
1348
1349
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
# Each entry is (input, expected-output), both given as UTF-8 bytes.  An
# expected output of None means nameprep() must reject the input; an entry
# of (None, None) marks a vector that this test suite skips (see inline
# comments for the reason).
nameprep_tests = [
    # 3.1 Map to nothing.
    (b'foo\xc2\xad\xcd\x8f\xe1\xa0\x86\xe1\xa0\x8bbar'
     b'\xe2\x80\x8b\xe2\x81\xa0baz\xef\xb8\x80\xef\xb8\x88\xef'
     b'\xb8\x8f\xef\xbb\xbf',
     b'foobarbaz'),
    # 3.2 Case folding ASCII U+0043 U+0041 U+0046 U+0045.
    (b'CAFE',
     b'cafe'),
    # 3.3 Case folding 8bit U+00DF (german sharp s).
    # The original test case is bogus; it says \xc3\xdf
    (b'\xc3\x9f',
     b'ss'),
    # 3.4 Case folding U+0130 (turkish capital I with dot).
    (b'\xc4\xb0',
     b'i\xcc\x87'),
    # 3.5 Case folding multibyte U+0143 U+037A.
    (b'\xc5\x83\xcd\xba',
     b'\xc5\x84 \xce\xb9'),
    # 3.6 Case folding U+2121 U+33C6 U+1D7BB.
    # XXX: skip this as it fails in UCS-2 mode
    #('\xe2\x84\xa1\xe3\x8f\x86\xf0\x9d\x9e\xbb',
    # 'telc\xe2\x88\x95kg\xcf\x83'),
    (None, None),
    # 3.7 Normalization of U+006a U+030c U+00A0 U+00AA.
    (b'j\xcc\x8c\xc2\xa0\xc2\xaa',
     b'\xc7\xb0 a'),
    # 3.8 Case folding U+1FB7 and normalization.
    (b'\xe1\xbe\xb7',
     b'\xe1\xbe\xb6\xce\xb9'),
    # 3.9 Self-reverting case folding U+01F0 and normalization.
    # The original test case is bogus, it says `\xc7\xf0'
    (b'\xc7\xb0',
     b'\xc7\xb0'),
    # 3.10 Self-reverting case folding U+0390 and normalization.
    (b'\xce\x90',
     b'\xce\x90'),
    # 3.11 Self-reverting case folding U+03B0 and normalization.
    (b'\xce\xb0',
     b'\xce\xb0'),
    # 3.12 Self-reverting case folding U+1E96 and normalization.
    (b'\xe1\xba\x96',
     b'\xe1\xba\x96'),
    # 3.13 Self-reverting case folding U+1F56 and normalization.
    (b'\xe1\xbd\x96',
     b'\xe1\xbd\x96'),
    # 3.14 ASCII space character U+0020.
    (b' ',
     b' '),
    # 3.15 Non-ASCII 8bit space character U+00A0.
    (b'\xc2\xa0',
     b' '),
    # 3.16 Non-ASCII multibyte space character U+1680.
    (b'\xe1\x9a\x80',
     None),
    # 3.17 Non-ASCII multibyte space character U+2000.
    (b'\xe2\x80\x80',
     b' '),
    # 3.18 Zero Width Space U+200b.
    (b'\xe2\x80\x8b',
     b''),
    # 3.19 Non-ASCII multibyte space character U+3000.
    (b'\xe3\x80\x80',
     b' '),
    # 3.20 ASCII control characters U+0010 U+007F.
    (b'\x10\x7f',
     b'\x10\x7f'),
    # 3.21 Non-ASCII 8bit control character U+0085.
    (b'\xc2\x85',
     None),
    # 3.22 Non-ASCII multibyte control character U+180E.
    (b'\xe1\xa0\x8e',
     None),
    # 3.23 Zero Width No-Break Space U+FEFF.
    (b'\xef\xbb\xbf',
     b''),
    # 3.24 Non-ASCII control character U+1D175.
    (b'\xf0\x9d\x85\xb5',
     None),
    # 3.25 Plane 0 private use character U+F123.
    (b'\xef\x84\xa3',
     None),
    # 3.26 Plane 15 private use character U+F1234.
    (b'\xf3\xb1\x88\xb4',
     None),
    # 3.27 Plane 16 private use character U+10F234.
    (b'\xf4\x8f\x88\xb4',
     None),
    # 3.28 Non-character code point U+8FFFE.
    (b'\xf2\x8f\xbf\xbe',
     None),
    # 3.29 Non-character code point U+10FFFF.
    (b'\xf4\x8f\xbf\xbf',
     None),
    # 3.30 Surrogate code U+DF42.
    (b'\xed\xbd\x82',
     None),
    # 3.31 Non-plain text character U+FFFD.
    (b'\xef\xbf\xbd',
     None),
    # 3.32 Ideographic description character U+2FF5.
    (b'\xe2\xbf\xb5',
     None),
    # 3.33 Display property character U+0341.
    (b'\xcd\x81',
     b'\xcc\x81'),
    # 3.34 Left-to-right mark U+200E.
    (b'\xe2\x80\x8e',
     None),
    # 3.35 Deprecated U+202A.
    (b'\xe2\x80\xaa',
     None),
    # 3.36 Language tagging character U+E0001.
    (b'\xf3\xa0\x80\x81',
     None),
    # 3.37 Language tagging character U+E0042.
    (b'\xf3\xa0\x81\x82',
     None),
    # 3.38 Bidi: RandALCat character U+05BE and LCat characters.
    (b'foo\xd6\xbebar',
     None),
    # 3.39 Bidi: RandALCat character U+FD50 and LCat characters.
    (b'foo\xef\xb5\x90bar',
     None),
    # 3.40 Bidi: RandALCat character U+FB38 and LCat characters.
    (b'foo\xef\xb9\xb6bar',
     b'foo \xd9\x8ebar'),
    # 3.41 Bidi: RandALCat without trailing RandALCat U+0627 U+0031.
    (b'\xd8\xa71',
     None),
    # 3.42 Bidi: RandALCat character U+0627 U+0031 U+0628.
    (b'\xd8\xa71\xd8\xa8',
     b'\xd8\xa71\xd8\xa8'),
    # 3.43 Unassigned code point U+E0002.
    # Skip this test as we allow unassigned
    #(b'\xf3\xa0\x80\x82',
    # None),
    (None, None),
    # 3.44 Larger test (shrinking).
    # Original test case reads \xc3\xdf
    (b'X\xc2\xad\xc3\x9f\xc4\xb0\xe2\x84\xa1j\xcc\x8c\xc2\xa0\xc2'
     b'\xaa\xce\xb0\xe2\x80\x80',
     b'xssi\xcc\x87tel\xc7\xb0 a\xce\xb0 '),
    # 3.45 Larger test (expanding).
    # Original test case reads \xc3\x9f
    (b'X\xc3\x9f\xe3\x8c\x96\xc4\xb0\xe2\x84\xa1\xe2\x92\x9f\xe3\x8c'
     b'\x80',
     b'xss\xe3\x82\xad\xe3\x83\xad\xe3\x83\xa1\xe3\x83\xbc\xe3'
     b'\x83\x88\xe3\x83\xabi\xcc\x87tel\x28d\x29\xe3\x82'
     b'\xa2\xe3\x83\x91\xe3\x83\xbc\xe3\x83\x88')
    ]
1502
1503
class NameprepTest(unittest.TestCase):
    def test_nameprep(self):
        """Run the libidn nameprep vectors through encodings.idna.nameprep."""
        from encodings.idna import nameprep
        for index, (source, prepared) in enumerate(nameprep_tests):
            if source is None:
                # Skipped vector (see comments in the table).
                continue
            # The Unicode strings are given in UTF-8; surrogatepass is needed
            # because some vectors contain lone surrogates.
            source = str(source, "utf-8", "surrogatepass")
            if prepped is None if False else prepared is None:
                # Input contains prohibited characters and must be rejected.
                self.assertRaises(UnicodeError, nameprep, source)
            else:
                prepared = str(prepared, "utf-8", "surrogatepass")
                try:
                    self.assertEqual(nameprep(source), prepared)
                except Exception as e:
                    raise support.TestFailed("Test 3.%d: %s" % (index+1, str(e)))
1522
1523
class IDNACodecTest(unittest.TestCase):
    """Exercise the "idna" codec through the builtin, stream, iterator
    and incremental interfaces."""

    def test_builtin_decode(self):
        cases = [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org", "pyth\xf6n.org"),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]
        for raw, expected in cases:
            self.assertEqual(str(raw, "idna"), expected)

    def test_builtin_encode(self):
        cases = [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org", b"xn--pythn-mua.org"),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]
        for text, expected in cases:
            self.assertEqual(text.encode("idna"), expected)

    def test_stream(self):
        reader = codecs.getreader("idna")(io.BytesIO(b"abc"))
        reader.read(3)
        # The whole stream was consumed; a further read yields nothing.
        self.assertEqual(reader.read(), "")

    def test_incremental_decode(self):
        # Feed input one byte at a time through iterdecode().
        cases = [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]
        for raw, expected in cases:
            joined = "".join(codecs.iterdecode((bytes([c]) for c in raw),
                                               "idna"))
            self.assertEqual(joined, expected)

        # A label is only emitted once its trailing dot (or end of input)
        # has been seen.
        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam", ), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")

    def test_incremental_encode(self):
        cases = [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]
        for text, expected in cases:
            self.assertEqual(b"".join(codecs.iterencode(text, "idna")),
                             expected)

        # Like decoding, a label is only encoded once its dot is seen.
        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")

    def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for handler in ("ignore", "replace", "backslashreplace",
                        "surrogateescape"):
            self.assertRaises(Exception,
                              "python.org".encode, "idna", handler)
            self.assertRaises(Exception,
                              b"python.org".decode, "idna", handler)
1609
1610
1611class CodecsModuleTest(unittest.TestCase):
1612
1613    def test_decode(self):
1614        self.assertEqual(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'),
1615                         '\xe4\xf6\xfc')
1616        self.assertRaises(TypeError, codecs.decode)
1617        self.assertEqual(codecs.decode(b'abc'), 'abc')
1618        self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii')
1619
1620        # test keywords
1621        self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'),
1622                         '\xe4\xf6\xfc')
1623        self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'),
1624                         '[]')
1625
1626    def test_encode(self):
1627        self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'),
1628                         b'\xe4\xf6\xfc')
1629        self.assertRaises(TypeError, codecs.encode)
1630        self.assertRaises(LookupError, codecs.encode, "foo", "__spam__")
1631        self.assertEqual(codecs.encode('abc'), b'abc')
1632        self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii')
1633
1634        # test keywords
1635        self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'),
1636                         b'\xe4\xf6\xfc')
1637        self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'),
1638                         b'[]')
1639
1640    def test_register(self):
1641        self.assertRaises(TypeError, codecs.register)
1642        self.assertRaises(TypeError, codecs.register, 42)
1643
1644    def test_unregister(self):
1645        name = "nonexistent_codec_name"
1646        search_function = mock.Mock()
1647        codecs.register(search_function)
1648        self.assertRaises(TypeError, codecs.lookup, name)
1649        search_function.assert_called_with(name)
1650        search_function.reset_mock()
1651
1652        codecs.unregister(search_function)
1653        self.assertRaises(LookupError, codecs.lookup, name)
1654        search_function.assert_not_called()
1655
1656    def test_lookup(self):
1657        self.assertRaises(TypeError, codecs.lookup)
1658        self.assertRaises(LookupError, codecs.lookup, "__spam__")
1659        self.assertRaises(LookupError, codecs.lookup, " ")
1660
1661    def test_getencoder(self):
1662        self.assertRaises(TypeError, codecs.getencoder)
1663        self.assertRaises(LookupError, codecs.getencoder, "__spam__")
1664
1665    def test_getdecoder(self):
1666        self.assertRaises(TypeError, codecs.getdecoder)
1667        self.assertRaises(LookupError, codecs.getdecoder, "__spam__")
1668
1669    def test_getreader(self):
1670        self.assertRaises(TypeError, codecs.getreader)
1671        self.assertRaises(LookupError, codecs.getreader, "__spam__")
1672
1673    def test_getwriter(self):
1674        self.assertRaises(TypeError, codecs.getwriter)
1675        self.assertRaises(LookupError, codecs.getwriter, "__spam__")
1676
1677    def test_lookup_issue1813(self):
1678        # Issue #1813: under Turkish locales, lookup of some codecs failed
1679        # because 'I' is lowercased as "ı" (dotless i)
1680        oldlocale = locale.setlocale(locale.LC_CTYPE)
1681        self.addCleanup(locale.setlocale, locale.LC_CTYPE, oldlocale)
1682        try:
1683            locale.setlocale(locale.LC_CTYPE, 'tr_TR')
1684        except locale.Error:
1685            # Unsupported locale on this system
1686            self.skipTest('test needs Turkish locale')
1687        c = codecs.lookup('ASCII')
1688        self.assertEqual(c.name, 'ascii')
1689
1690    def test_all(self):
1691        api = (
1692            "encode", "decode",
1693            "register", "CodecInfo", "Codec", "IncrementalEncoder",
1694            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
1695            "getencoder", "getdecoder", "getincrementalencoder",
1696            "getincrementaldecoder", "getreader", "getwriter",
1697            "register_error", "lookup_error",
1698            "strict_errors", "replace_errors", "ignore_errors",
1699            "xmlcharrefreplace_errors", "backslashreplace_errors",
1700            "namereplace_errors",
1701            "open", "EncodedFile",
1702            "iterencode", "iterdecode",
1703            "BOM", "BOM_BE", "BOM_LE",
1704            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
1705            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
1706            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
1707            "StreamReaderWriter", "StreamRecoder",
1708        )
1709        self.assertCountEqual(api, codecs.__all__)
1710        for api in codecs.__all__:
1711            getattr(codecs, api)
1712
1713    def test_open(self):
1714        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1715        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
1716            with self.subTest(mode), \
1717                    codecs.open(os_helper.TESTFN, mode, 'ascii') as file:
1718                self.assertIsInstance(file, codecs.StreamReaderWriter)
1719
1720    def test_undefined(self):
1721        self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined')
1722        self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined')
1723        self.assertRaises(UnicodeError, codecs.encode, '', 'undefined')
1724        self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined')
1725        for errors in ('strict', 'ignore', 'replace', 'backslashreplace'):
1726            self.assertRaises(UnicodeError,
1727                codecs.encode, 'abc', 'undefined', errors)
1728            self.assertRaises(UnicodeError,
1729                codecs.decode, b'abc', 'undefined', errors)
1730
1731    def test_file_closes_if_lookup_error_raised(self):
1732        mock_open = mock.mock_open()
1733        with mock.patch('builtins.open', mock_open) as file:
1734            with self.assertRaises(LookupError):
1735                codecs.open(os_helper.TESTFN, 'wt', 'invalid-encoding')
1736
1737            file().close.assert_called()
1738
1739
class StreamReaderTest(unittest.TestCase):
    """Sanity check for codecs stream readers over a BytesIO source."""

    def setUp(self):
        self.reader = codecs.getreader('utf-8')
        self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')

    def test_readlines(self):
        utf8_reader = self.reader(self.stream)
        lines = utf8_reader.readlines()
        self.assertEqual(lines, ['\ud55c\n', '\uae00'])
1749
1750
class EncodedFileTest(unittest.TestCase):
    """Check that codecs.EncodedFile transcodes on both read and write."""

    def test_basic(self):
        # Reading transcodes the stream's utf-8 bytes into utf-16-le.
        source = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        wrapper = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
        self.assertEqual(wrapper.read(), b'\\\xd5\n\x00\x00\xae')

        # Writing transcodes utf-8 input bytes into latin-1 on the stream.
        sink = io.BytesIO()
        wrapper = codecs.EncodedFile(sink, 'utf-8', 'latin-1')
        wrapper.write(b'\xc3\xbc')
        self.assertEqual(sink.getvalue(), b'\xfc')
1762
# Every unicode codec that the generic round-trip tests below are expected
# to handle.
all_unicode_encodings = [
    "ascii",
    "big5",
    "big5hkscs",
    "charmap",
    "cp037",
    "cp1006",
    "cp1026",
    "cp1125",
    "cp1140",
    "cp1250",
    "cp1251",
    "cp1252",
    "cp1253",
    "cp1254",
    "cp1255",
    "cp1256",
    "cp1257",
    "cp1258",
    "cp424",
    "cp437",
    "cp500",
    "cp720",
    "cp737",
    "cp775",
    "cp850",
    "cp852",
    "cp855",
    "cp856",
    "cp857",
    "cp858",
    "cp860",
    "cp861",
    "cp862",
    "cp863",
    "cp864",
    "cp865",
    "cp866",
    "cp869",
    "cp874",
    "cp875",
    "cp932",
    "cp949",
    "cp950",
    "euc_jis_2004",
    "euc_jisx0213",
    "euc_jp",
    "euc_kr",
    "gb18030",
    "gb2312",
    "gbk",
    "hp_roman8",
    "hz",
    "idna",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "iso8859_1",
    "iso8859_10",
    "iso8859_11",
    "iso8859_13",
    "iso8859_14",
    "iso8859_15",
    "iso8859_16",
    "iso8859_2",
    "iso8859_3",
    "iso8859_4",
    "iso8859_5",
    "iso8859_6",
    "iso8859_7",
    "iso8859_8",
    "iso8859_9",
    "johab",
    "koi8_r",
    "koi8_t",
    "koi8_u",
    "kz1048",
    "latin_1",
    "mac_cyrillic",
    "mac_greek",
    "mac_iceland",
    "mac_latin2",
    "mac_roman",
    "mac_turkish",
    "palmos",
    "ptcp154",
    "punycode",
    "raw_unicode_escape",
    "shift_jis",
    "shift_jis_2004",
    "shift_jisx0213",
    "tis_620",
    "unicode_escape",
    "utf_16",
    "utf_16_be",
    "utf_16_le",
    "utf_7",
    "utf_8",
]

# These codecs are only present on interpreters that provide them
# (typically Windows builds), hence the hasattr() guards.
if hasattr(codecs, "mbcs_encode"):
    all_unicode_encodings.append("mbcs")
if hasattr(codecs, "oem_encode"):
    all_unicode_encodings.append("oem")

# The following encoding is not tested, because it's not supposed
# to work:
#    "undefined"

# The following encodings don't work in stateful mode
broken_unicode_with_stateful = [
    "punycode",
]
1880
1881
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
    """Generic round-trip checks applied to every codec listed in
    all_unicode_encodings: stateless, stream, incremental and C-API
    interfaces must all reproduce the original string."""

    def test_basics(self):
        """Round-trip a simple ASCII string through every codec interface."""
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            name = codecs.lookup(encoding).name
            # Compensate for the canonicalization applied by lookup() so
            # the normalized names can be compared directly below.
            if encoding.endswith("_codec"):
                name += "_codec"
            elif encoding == "latin_1":
                name = "latin_1"
            self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-"))

            # Stateless encoder/decoder round-trip.
            (b, size) = codecs.getencoder(encoding)(s)
            self.assertEqual(size, len(s), "encoding=%r" % encoding)
            (chars, size) = codecs.getdecoder(encoding)(b)
            self.assertEqual(chars, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check stream reader/writer
                # Write one character at a time, then decode the result
                # one byte at a time, to exercise internal buffering.
                q = Queue(b"")
                writer = codecs.getwriter(encoding)(q)
                encodedresult = b""
                for c in s:
                    writer.write(c)
                    chunk = q.read()
                    self.assertTrue(type(chunk) is bytes, type(chunk))
                    encodedresult += chunk
                q = Queue(b"")
                reader = codecs.getreader(encoding)(q)
                decodedresult = ""
                for c in encodedresult:
                    q.write(bytes([c]))
                    decodedresult += reader.read()
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder and iterencode()/iterdecode()
                try:
                    encoder = codecs.getincrementalencoder(encoding)()
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check incremental decoder/encoder
                    encodedresult = b""
                    for c in s:
                        encodedresult += encoder.encode(c)
                    encodedresult += encoder.encode("", True)
                    decoder = codecs.getincrementaldecoder(encoding)()
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += decoder.decode(bytes([c]))
                    decodedresult += decoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                    # check iterencode()/iterdecode()
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode(s, encoding), encoding))
                    self.assertEqual(result, s, "encoding=%r" % encoding)

                    # check iterencode()/iterdecode() with empty string
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode("", encoding), encoding))
                    self.assertEqual(result, "")

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        encoder = codecs.getincrementalencoder(encoding)("ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(encoder.encode(c) for c in s)
                        decoder = codecs.getincrementaldecoder(encoding)("ignore")
                        decodedresult = "".join(decoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    @support.cpython_only
    def test_basics_capi(self):
        """Same incremental round-trip, with codec objects obtained through
        the C API (_testcapi helpers) instead of the codecs module."""
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = _testcapi.codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = _testcapi.codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = _testcapi.codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = _testcapi.codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    def test_seek(self):
        """seek() on a stream reader must reset codec state and buffers."""
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)

    def test_bad_decode_args(self):
        """Decoders reject missing arguments; most also reject an int."""
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)

    def test_bad_encode_args(self):
        """Encoders reject a call with no arguments."""
        for encoding in all_unicode_encodings:
            encoder = codecs.getencoder(encoding)
            self.assertRaises(TypeError, encoder)

    def test_encoding_map_type_initialized(self):
        """Importing a charmap codec must not crash (regression check)."""
        from encodings import cp1140
        # This used to crash, we are only verifying there's no crash.
        table_type = type(cp1140.encoding_table)
        self.assertEqual(table_type, table_type)

    def test_decoder_state(self):
        # Check that getstate() and setstate() handle the state properly
        u = "abc123"
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                self.check_state_handling_decode(encoding, u, u.encode(encoding))
                self.check_state_handling_encode(encoding, u, u.encode(encoding))
2038
2039
class CharmapTest(unittest.TestCase):
    """Tests for codecs.charmap_decode with its three mapping flavours:
    a str indexed by byte value, an int->str dict, and an int->int dict.
    The literal cases pin down regressions from Issue #14850 (U+FFFE in
    maps) and Issue #15379 (non-BMP code points in int maps)."""

    def test_decode_with_string_map(self):
        """Decode with a str map: position in the string maps the byte."""
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict", "abc"),
            ("abc", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict", "\U0010FFFFbc"),
            ("\U0010FFFFbc", 3)
        )

        # A byte past the end of the map, or mapped to U+FFFE, is an
        # error under the "strict" handler.
        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict", "ab"
        )

        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict", "ab\ufffe"
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace", "ab"),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace", "ab\ufffe"),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace", "ab"),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace", "ab\ufffe"),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore", "ab"),
            ("ab", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore", "ab\ufffe"),
            ("ab", 3)
        )

        allbytes = bytes(range(256))
        self.assertEqual(
            codecs.charmap_decode(allbytes, "ignore", ""),
            ("", len(allbytes))
        )

    def test_decode_with_int2str_map(self):
        """Decode with a dict mapping byte values to replacement strings."""
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: 'a', 1: 'b', 2: 'c'}),
            ("abc", 3)
        )

        # A single byte may expand to a multi-character string.
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: 'Aa', 1: 'Bb', 2: 'Cc'}),
            ("AaBbCc", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: '\U0010FFFF', 1: 'b', 2: 'c'}),
            ("\U0010FFFFbc", 3)
        )

        # Mapping to the empty string drops the byte without error.
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: 'a', 1: 'b', 2: ''}),
            ("ab", 3)
        )

        # Missing keys, None values and U+FFFE are errors in strict mode.
        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: 'a', 1: 'b'}
        )

        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: 'a', 1: 'b', 2: None}
        )

        # Issue #14850
        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: 'a', 1: 'b', 2: '\ufffe'}
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace",
                                  {0: 'a', 1: 'b'}),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace",
                                  {0: 'a', 1: 'b', 2: None}),
            ("ab\ufffd", 3)
        )

        # Issue #14850
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace",
                                  {0: 'a', 1: 'b', 2: '\ufffe'}),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace",
                                  {0: 'a', 1: 'b'}),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace",
                                  {0: 'a', 1: 'b', 2: None}),
            ("ab\\x02", 3)
        )

        # Issue #14850
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace",
                                  {0: 'a', 1: 'b', 2: '\ufffe'}),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore",
                                  {0: 'a', 1: 'b'}),
            ("ab", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore",
                                  {0: 'a', 1: 'b', 2: None}),
            ("ab", 3)
        )

        # Issue #14850
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore",
                                  {0: 'a', 1: 'b', 2: '\ufffe'}),
            ("ab", 3)
        )

        allbytes = bytes(range(256))
        self.assertEqual(
            codecs.charmap_decode(allbytes, "ignore", {}),
            ("", len(allbytes))
        )

        # Out-of-range int values in the map raise TypeError, not a
        # decoding error.
        self.assertRaisesRegex(TypeError,
            "character mapping must be in range\\(0x110000\\)",
            codecs.charmap_decode,
            b"\x00\x01\x02", "strict", {0: "A", 1: 'Bb', 2: -2}
        )

        self.assertRaisesRegex(TypeError,
            "character mapping must be in range\\(0x110000\\)",
            codecs.charmap_decode,
            b"\x00\x01\x02", "strict", {0: "A", 1: 'Bb', 2: 999999999}
        )

    def test_decode_with_int2int_map(self):
        """Decode with a dict mapping byte values to code points (ints)."""
        a = ord('a')
        b = ord('b')
        c = ord('c')

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: a, 1: b, 2: c}),
            ("abc", 3)
        )

        # Issue #15379
        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: 0x10FFFF, 1: b, 2: c}),
            ("\U0010FFFFbc", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "strict",
                                  {0: sys.maxunicode, 1: b, 2: c}),
            (chr(sys.maxunicode) + "bc", 3)
        )

        # Values above sys.maxunicode are a TypeError, not a decode error.
        self.assertRaises(TypeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: sys.maxunicode + 1, 1: b, 2: c}
        )

        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: a, 1: b},
        )

        self.assertRaises(UnicodeDecodeError,
            codecs.charmap_decode, b"\x00\x01\x02", "strict",
                                   {0: a, 1: b, 2: 0xFFFE},
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace",
                                  {0: a, 1: b}),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "replace",
                                  {0: a, 1: b, 2: 0xFFFE}),
            ("ab\ufffd", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace",
                                  {0: a, 1: b}),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "backslashreplace",
                                  {0: a, 1: b, 2: 0xFFFE}),
            ("ab\\x02", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore",
                                  {0: a, 1: b}),
            ("ab", 3)
        )

        self.assertEqual(
            codecs.charmap_decode(b"\x00\x01\x02", "ignore",
                                  {0: a, 1: b, 2: 0xFFFE}),
            ("ab", 3)
        )
2286
2287
class WithStmtTest(unittest.TestCase):
    """The codec file wrappers must work as context managers."""

    def test_encodedfile(self):
        raw = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(raw, "latin-1", "utf-8") as wrapper:
            self.assertEqual(wrapper.read(), b"\xfc")
        # Leaving the with-block closes the underlying stream as well.
        self.assertTrue(raw.closed)

    def test_streamreaderwriter(self):
        raw = io.BytesIO(b"\xc3\xbc")
        codec_info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(raw, codec_info.streamreader,
                                       codec_info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
2301
2302
class TypesTest(unittest.TestCase):
    """Input-type acceptance checks for the low-level codec functions."""

    def test_decode_unicode(self):
        # With few exceptions, the low-level decoders reject str input.
        names = [
            "utf_7_decode",
            "utf_8_decode",
            "utf_16_le_decode",
            "utf_16_be_decode",
            "utf_16_ex_decode",
            "utf_32_decode",
            "utf_32_le_decode",
            "utf_32_be_decode",
            "utf_32_ex_decode",
            "latin_1_decode",
            "ascii_decode",
            "charmap_decode",
        ]
        if hasattr(codecs, "mbcs_decode"):
            names.append("mbcs_decode")
        for name in names:
            self.assertRaises(TypeError, getattr(codecs, name), "xxx")

    def test_unicode_escape(self):
        # Escape-decoding a str is supported and must match the result of
        # decoding the equivalent ASCII bytes string.
        for decode in (codecs.unicode_escape_decode,
                       codecs.raw_unicode_escape_decode):
            self.assertEqual(decode(r"\u1234"), ("\u1234", 6))
            self.assertEqual(decode(br"\u1234"), ("\u1234", 6))
            # Out-of-range \U escapes honour the error handler.
            self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
            self.assertEqual(decode(r"\U00110000", "replace"),
                             ("\ufffd", 10))
            self.assertEqual(decode(r"\U00110000", "backslashreplace"),
                             (r"\x5c\x55\x30\x30\x31\x31\x30\x30\x30\x30", 10))
2342
2343
class UnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the "unicode-escape" codec, which round-trips Python
    string-literal escape sequences (\\x, \\u, \\U, \\t, octal, ...)."""

    encoding = "unicode-escape"

    # Disable the inherited ReadTest lone-surrogate test: this codec can
    # represent lone surrogates directly, so that test does not apply.
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input encodes/decodes to empty output, consuming 0 items.
        self.assertEqual(codecs.unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Printable ASCII (except the backslash itself) encodes unchanged.
        encode = codecs.unicode_escape_encode
        for b in range(32, 127):
            if b != b'\\'[0]:
                self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Any single byte other than the backslash decodes to itself.
        decode = codecs.unicode_escape_decode
        for b in range(256):
            if b != b'\\'[0]:
                self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # Control characters and non-ASCII characters get escaped forms;
        # \t, \n, \r and the backslash use their short escapes.
        encode = codecs.unicode_escape_encode
        check = coding_checker(self, encode)
        check('\t', br'\t')
        check('\n', br'\n')
        check('\r', br'\r')
        check('\\', br'\\')
        for b in range(32):
            if chr(b) not in '\t\n\r':
                check(chr(b), ('\\x%02x' % b).encode())
        for b in range(127, 256):
            check(chr(b), ('\\x%02x' % b).encode())
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # All standard Python string-literal escapes are interpreted.
        # Unrecognized alphabetic escapes are passed through unchanged but
        # emit a DeprecationWarning.
        decode = codecs.unicode_escape_decode
        check = coding_checker(self, decode)
        check(b"[\\\n]", "[]")
        check(br'[\"]', '["]')
        check(br"[\']", "[']")
        check(br"[\\]", r"[\]")
        check(br"[\a]", "[\x07]")
        check(br"[\b]", "[\x08]")
        check(br"[\t]", "[\x09]")
        check(br"[\n]", "[\x0a]")
        check(br"[\v]", "[\x0b]")
        check(br"[\f]", "[\x0c]")
        check(br"[\r]", "[\x0d]")
        # Octal escapes consume at most three digits.
        check(br"[\7]", "[\x07]")
        check(br"[\78]", "[\x078]")
        check(br"[\41]", "[!]")
        check(br"[\418]", "[!8]")
        check(br"[\101]", "[A]")
        check(br"[\1010]", "[A0]")
        # \x escapes consume exactly two hex digits.
        check(br"[\x41]", "[A]")
        check(br"[\x410]", "[A0]")
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")
        for i in range(97, 123):
            b = bytes([i])
            if b not in b'abfnrtuvx':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b, "\\" + chr(i))
            if b.upper() not in b'UN':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b.upper(), "\\" + chr(i-32))
        # \8 and \9 are not octal digits; also deprecated pass-through.
        with self.assertWarns(DeprecationWarning):
            check(br"\8", "\\8")
        with self.assertWarns(DeprecationWarning):
            check(br"\9", "\\9")
        with self.assertWarns(DeprecationWarning):
            check(b"\\\xfa", "\\\xfa")

    def test_decode_errors(self):
        # Truncated \x/\u/\U escapes must raise UnicodeDecodeError under
        # "strict", and be droppable/replaceable under other handlers.
        decode = codecs.unicode_escape_decode
        for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # Out-of-range code point in a complete \U escape.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Incremental decoding: the k-th entry is the expected output after
        # feeding the first k bytes of the encoded input.  A partially fed
        # escape sequence produces no output until it is complete.
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '',
                '',
                '',
                '\x00',
                '\x00',
                '\x00\t',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2473
class RawUnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the "raw-unicode-escape" codec, which interprets only
    \\u and \\U escapes and leaves every other byte untouched."""

    encoding = "raw-unicode-escape"

    # Disable the inherited ReadTest lone-surrogate test: this codec can
    # represent lone surrogates directly, so that test does not apply.
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input encodes/decodes to empty output, consuming 0 items.
        self.assertEqual(codecs.raw_unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.raw_unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Every Latin-1 character encodes to its single raw byte.
        encode = codecs.raw_unicode_escape_encode
        for b in range(256):
            self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Every single byte decodes to the same code point.
        decode = codecs.raw_unicode_escape_decode
        for b in range(256):
            self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # Backslash sequences other than \u/\U are left as-is; only
        # non-Latin-1 characters produce \u or \U escapes.
        encode = codecs.raw_unicode_escape_encode
        check = coding_checker(self, encode)
        for b in range(256):
            if b not in b'uU':
                check('\\' + chr(b), b'\\' + bytes([b]))
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # Only \u and \U are interpreted as escapes when decoding.
        decode = codecs.raw_unicode_escape_decode
        check = coding_checker(self, decode)
        for b in range(256):
            if b not in b'uU':
                check(b'\\' + bytes([b]), '\\' + chr(b))
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")

    def test_decode_errors(self):
        # Truncated \u/\U escapes must raise UnicodeDecodeError under
        # "strict", and be droppable/replaceable under other handlers.
        decode = codecs.raw_unicode_escape_decode
        for c, d in (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # Out-of-range code point in a complete \U escape.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Incremental decoding: the k-th entry is the expected output after
        # feeding the first k bytes of the encoded input.  A partially fed
        # \u/\U escape produces no output until it is complete.
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '\x00',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2555
2556
class EscapeEncodeTest(unittest.TestCase):
    """Tests for the bytes-to-bytes codecs.escape_encode() helper."""

    def test_escape_encode(self):
        """Known inputs map to their escaped forms; only bytes objects
        are accepted (str and bytearray raise TypeError)."""
        cases = (
            (b'', (b'', 0)),
            (b'foobar', (b'foobar', 6)),
            (b'spam\0eggs', (b'spam\\x00eggs', 9)),
            (b'a\'b', (b"a\\'b", 3)),
            (b'b\\c', (b'b\\\\c', 3)),
            (b'c\nd', (b'c\\nd', 3)),
            (b'd\re', (b'd\\re', 3)),
            (b'f\x7fg', (b'f\\x7fg', 3)),
        )
        for raw, expected in cases:
            with self.subTest(data=raw):
                self.assertEqual(codecs.escape_encode(raw), expected)
        for bad_input in ('spam', bytearray(b'spam')):
            self.assertRaises(TypeError, codecs.escape_encode, bad_input)
2575
2576
class SurrogateEscapeTest(unittest.TestCase):
    """The "surrogateescape" error handler (PEP 383): undecodable bytes
    round-trip through str as lone surrogates U+DC80..U+DCFF."""

    def _roundtrip(self, encoding, raw, text):
        # Assert raw decodes to text and text encodes back to raw.
        self.assertEqual(raw.decode(encoding, "surrogateescape"), text)
        self.assertEqual(text.encode(encoding, "surrogateescape"), raw)

    def test_utf8(self):
        # A bad byte becomes a lone low surrogate.
        self._roundtrip("utf-8", b"foo\x80bar", "foo\udc80bar")
        # An (invalid) UTF-8-encoded surrogate round-trips byte by byte.
        self._roundtrip("utf-8", b"\xed\xb0\x80", "\udced\udcb0\udc80")

    def test_ascii(self):
        # A non-ASCII byte becomes a lone low surrogate.
        self._roundtrip("ascii", b"foo\x80bar", "foo\udc80bar")

    def test_charmap(self):
        # \xa5 is unmapped in iso-8859-3.
        self._roundtrip("iso-8859-3", b"foo\xa5bar", "foo\udca5bar")

    def test_latin1(self):
        # Issue6373: encoding escaped surrogates back to latin-1.
        self.assertEqual(
            "\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1",
                                                    "surrogateescape"),
            b"\xe4\xeb\xef\xf6\xfc")
2609
2610
class BomTest(unittest.TestCase):
    """BOM handling of the UTF-16/32 stream writers: the BOM must be
    written exactly once, and re-written only after seek(0)."""

    def test_seek0(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        for encoding in tests:
            # Check if the BOM is written only once
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data)
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)

            # Check that the BOM is written after a seek(0)
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data[0])
                self.assertNotEqual(f.tell(), 0)
                f.seek(0)
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data)

            # (StreamWriter) Check that the BOM is written after a seek(0)
            # f.writer is the underlying StreamWriter of the
            # StreamReaderWriter returned by codecs.open().
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(data[0])
                self.assertNotEqual(f.writer.tell(), 0)
                f.writer.seek(0)
                f.writer.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data)

            # Check that the BOM is not written after a seek() at a position
            # different than the start
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(data)
                f.seek(f.tell())
                f.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)

            # (StreamWriter) Check that the BOM is not written after a seek()
            # at a position different than the start
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(data)
                f.writer.seek(f.writer.tell())
                f.writer.write(data)
                f.seek(0)
                self.assertEqual(f.read(), data * 2)
2666
2667
# Binary-to-binary transform codecs exercised by TransformCodecTest below.
bytes_transform_encodings = [
    "base64_codec",
    "uu_codec",
    "quopri_codec",
    "hex_codec",
]

# Alternative names that must resolve to the same codec as the key
# (see http://bugs.python.org/issue7475).
transform_aliases = {
    "base64_codec": ["base64", "base_64"],
    "uu_codec": ["uu"],
    "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"],
    "hex_codec": ["hex"],
    "rot_13": ["rot13"],
}

# zlib and bz2 are optional build dependencies: only register their
# codecs for testing when the modules are importable.
try:
    import zlib
except ImportError:
    zlib = None
else:
    bytes_transform_encodings.append("zlib_codec")
    transform_aliases["zlib_codec"] = ["zip", "zlib"]
try:
    import bz2
except ImportError:
    pass
else:
    bytes_transform_encodings.append("bz2_codec")
    transform_aliases["bz2_codec"] = ["bz2"]
2697
2698
class TransformCodecTest(unittest.TestCase):
    """Tests for the bytes-to-bytes transform codecs (base64, uu, quopri,
    hex, and optionally zlib/bz2): round-tripping, stream access, buffer
    protocol support, and the error messages produced when they are used
    through the text-only str/bytes methods."""

    def test_basics(self):
        # Round-trip every byte value through the generic codec interface.
        binput = bytes(range(256))
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                # generic codecs interface
                (o, size) = codecs.getencoder(encoding)(binput)
                self.assertEqual(size, len(binput))
                (i, size) = codecs.getdecoder(encoding)(o)
                self.assertEqual(size, len(o))
                self.assertEqual(i, binput)

    def test_read(self):
        # StreamReader.read() must yield the decoded bytes.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.read()
                self.assertEqual(sout, b"\x80")

    def test_readline(self):
        # StreamReader.readline() works even though the output is bytes.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.readline()
                self.assertEqual(sout, b"\x80")

    def test_buffer_api_usage(self):
        # We check all the transform codecs accept memoryview input
        # for encoding and decoding
        # and also that they roundtrip correctly
        original = b"12345\x80"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                data = original
                view = memoryview(data)
                data = codecs.encode(data, encoding)
                view_encoded = codecs.encode(view, encoding)
                self.assertEqual(view_encoded, data)
                view = memoryview(data)
                data = codecs.decode(data, encoding)
                self.assertEqual(data, original)
                view_decoded = codecs.decode(view, encoding)
                self.assertEqual(view_decoded, data)

    def test_text_to_binary_denylists_binary_transforms(self):
        # Check binary -> binary codecs give a good error for str input
        bad_input = "bad input type"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.encode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.encode(encoding)
                # The lookup failure should not be chained from anything.
                self.assertIsNone(failure.exception.__cause__)

    def test_text_to_binary_denylists_text_transforms(self):
        # Check str.encode gives a good error message for str -> str codecs
        msg = (r"^'rot_13' is not a text encoding; "
               r"use codecs.encode\(\) to handle arbitrary codecs")
        with self.assertRaisesRegex(LookupError, msg):
            "just an example message".encode("rot_13")

    def test_binary_to_text_denylists_binary_transforms(self):
        # Check bytes.decode and bytearray.decode give a good error
        # message for binary -> binary codecs
        data = b"encode first to ensure we meet any format restrictions"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                encoded_data = codecs.encode(data, encoding)
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    encoded_data.decode(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    bytearray(encoded_data).decode(encoding)

    def test_binary_to_text_denylists_text_transforms(self):
        # Check str -> str codec gives a good error for binary input
        for bad_input in (b"immutable", bytearray(b"mutable")):
            with self.subTest(bad_input=bad_input):
                msg = (r"^'rot_13' is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.decode("rot_13")
                self.assertIsNone(failure.exception.__cause__)

    @unittest.skipUnless(zlib, "Requires zlib support")
    def test_custom_zlib_error_is_wrapped(self):
        # Check zlib codec gives a good error for malformed input
        msg = "^decoding with 'zlib_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "zlib_codec")
        # The wrapped exception keeps the original exception type as cause.
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    def test_custom_hex_error_is_wrapped(self):
        # Check hex codec gives a good error for malformed input
        msg = "^decoding with 'hex_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "hex_codec")
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    # Unfortunately, the bz2 module throws OSError, which the codec
    # machinery currently can't wrap :(

    # Ensure codec aliases from http://bugs.python.org/issue7475 work
    def test_aliases(self):
        for codec_name, aliases in transform_aliases.items():
            expected_name = codecs.lookup(codec_name).name
            for alias in aliases:
                with self.subTest(alias=alias):
                    info = codecs.lookup(alias)
                    self.assertEqual(info.name, expected_name)

    def test_quopri_stateless(self):
        # Should encode with quotetabs=True
        encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
        self.assertEqual(encoded, b"space=20tab=09eol=20\n")
        # But should still support unescaped tabs and spaces
        unescaped = b"space tab eol\n"
        self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped)

    def test_uu_invalid(self):
        # Missing "begin" line
        self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
2830
2831
2832# The codec system tries to wrap exceptions in order to ensure the error
2833# mentions the operation being performed and the codec involved. We
2834# currently *only* want this to happen for relatively stateless
2835# exceptions, where the only significant information they contain is their
2836# type and a single str argument.
2837
2838# Use a local codec registry to avoid appearing to leak objects when
2839# registering multiple search functions
_TEST_CODECS = {}

def _get_test_codec(codec_name):
    """Codec search function backed by the local _TEST_CODECS registry.

    Returns the registered CodecInfo for *codec_name*, or None when the
    name is unknown (so the codec machinery tries the next search
    function).
    """
    try:
        return _TEST_CODECS[codec_name]
    except KeyError:
        return None
2844
2845
class ExceptionChainingTest(unittest.TestCase):
    """Check when the codec machinery wraps exceptions raised by a codec.

    Only "stateless" exceptions (default __init__/__new__, no extra
    instance attributes, a single str argument) get wrapped with a
    message naming the codec and operation; anything else propagates
    unchanged.
    """

    def setUp(self):
        self.codec_name = 'exception_chaining_test'
        codecs.register(_get_test_codec)
        self.addCleanup(codecs.unregister, _get_test_codec)

        # We store the object to raise on the instance because of a bad
        # interaction between the codec caching (which means we can't
        # recreate the codec entry) and regrtest refleak hunting (which
        # runs the same test instance multiple times). This means we
        # need to ensure the codecs call back in to the instance to find
        # out which exception to raise rather than binding them in a
        # closure to an object that may change on the next run
        self.obj_to_raise = RuntimeError

    def tearDown(self):
        _TEST_CODECS.pop(self.codec_name, None)
        # Issue #22166: Also pop from caches to avoid appearance of ref leaks
        encodings._cache.pop(self.codec_name, None)

    def set_codec(self, encode, decode):
        # Register the given encode/decode callables under our test name.
        codec_info = codecs.CodecInfo(encode, decode,
                                      name=self.codec_name)
        _TEST_CODECS[self.codec_name] = codec_info

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        # Assert the block raises exc_type whose message mentions the
        # operation and codec, chained (__cause__) from the original.
        full_msg = r"{} with {!r} codec failed \({}: {}\)".format(
                  operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught
        self.assertIsInstance(caught.exception.__cause__, exc_type)
        self.assertIsNotNone(caught.exception.__cause__.__traceback__)

    def raise_obj(self, *args, **kwds):
        # Helper to dynamically change the object raised by a test codec
        raise self.obj_to_raise

    def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError):
        # Verify wrapping happens for all four encode/decode entry points.
        self.obj_to_raise = obj_to_raise
        self.set_codec(self.raise_obj, self.raise_obj)
        with self.assertWrapped("encoding", exc_type, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", exc_type, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    def test_raise_grandchild_subclass_exact_size(self):
        # Subclass with no __dict__/__weakref__ is still wrapped.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            __slots__ = ()
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def test_raise_subclass_with_weakref_support(self):
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            pass
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def check_not_wrapped(self, obj_to_raise, msg):
        # Verify the exception propagates unchanged (no codec wrapping).
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        self.set_codec(raise_obj, raise_obj)
        with self.assertRaisesRegex(RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        # An extra attribute means state that wrapping would lose.
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, "^{}$".format(msg))

    def test_non_str_arg_is_not_wrapped(self):
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        msg_re = r"^\('a', 'b', 'c'\)$"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re)

    # http://bugs.python.org/issue19609
    def test_codec_lookup_failure_not_wrapped(self):
        msg = "^unknown encoding: {}$".format(self.codec_name)
        # The initial codec lookup should not be wrapped
        with self.assertRaisesRegex(LookupError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_unflagged_non_text_codec_handling(self):
        # The stdlib non-text codecs are now marked so they're
        # pre-emptively skipped by the text model related methods
        # However, third party codecs won't be flagged, so we still make
        # sure the case where an inappropriate output type is produced is
        # handled appropriately
        def encode_to_str(*args, **kwds):
            return "not bytes!", 0
        def decode_to_bytes(*args, **kwds):
            return b"not str!", 0
        self.set_codec(encode_to_str, decode_to_bytes)
        # No input or output type checks on the codecs module functions
        encoded = codecs.encode(None, self.codec_name)
        self.assertEqual(encoded, "not bytes!")
        decoded = codecs.decode(None, self.codec_name)
        self.assertEqual(decoded, b"not str!")
        # Text model methods should complain
        fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; "
               r"use codecs.encode\(\) to encode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            "str_input".encode(self.codec_name)
        fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; "
               r"use codecs.decode\(\) to decode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            b"bytes input".decode(self.codec_name)
2994
2995
2996
2997@unittest.skipUnless(sys.platform == 'win32',
2998                     'code pages are specific to Windows')
2999class CodePageTest(unittest.TestCase):
3000    CP_UTF8 = 65001
3001
3002    def test_invalid_code_page(self):
3003        self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
3004        self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
3005        self.assertRaises(OSError, codecs.code_page_encode, 123, 'a')
3006        self.assertRaises(OSError, codecs.code_page_decode, 123, b'a')
3007
3008    def test_code_page_name(self):
3009        self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
3010            codecs.code_page_encode, 932, '\xff')
3011        self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
3012            codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
3013        self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
3014            codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)
3015
3016    def check_decode(self, cp, tests):
3017        for raw, errors, expected in tests:
3018            if expected is not None:
3019                try:
3020                    decoded = codecs.code_page_decode(cp, raw, errors, True)
3021                except UnicodeDecodeError as err:
3022                    self.fail('Unable to decode %a from "cp%s" with '
3023                              'errors=%r: %s' % (raw, cp, errors, err))
3024                self.assertEqual(decoded[0], expected,
3025                    '%a.decode("cp%s", %r)=%a != %a'
3026                    % (raw, cp, errors, decoded[0], expected))
3027                # assert 0 <= decoded[1] <= len(raw)
3028                self.assertGreaterEqual(decoded[1], 0)
3029                self.assertLessEqual(decoded[1], len(raw))
3030            else:
3031                self.assertRaises(UnicodeDecodeError,
3032                    codecs.code_page_decode, cp, raw, errors, True)
3033
3034    def check_encode(self, cp, tests):
3035        for text, errors, expected in tests:
3036            if expected is not None:
3037                try:
3038                    encoded = codecs.code_page_encode(cp, text, errors)
3039                except UnicodeEncodeError as err:
3040                    self.fail('Unable to encode %a to "cp%s" with '
3041                              'errors=%r: %s' % (text, cp, errors, err))
3042                self.assertEqual(encoded[0], expected,
3043                    '%a.encode("cp%s", %r)=%a != %a'
3044                    % (text, cp, errors, encoded[0], expected))
3045                self.assertEqual(encoded[1], len(text))
3046            else:
3047                self.assertRaises(UnicodeEncodeError,
3048                    codecs.code_page_encode, cp, text, errors)
3049
    def test_cp932(self):
        # Windows code page 932 (Japanese, Shift-JIS-like): check plain
        # ASCII, a two-byte sequence, and every supported error handler
        # on both the encode and decode sides.
        self.check_encode(932, (
            ('abc', 'strict', b'abc'),
            ('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
            # test error handlers
            ('\xff', 'strict', None),
            ('[\xff]', 'ignore', b'[]'),
            ('[\xff]', 'replace', b'[y]'),
            ('[\u20ac]', 'replace', b'[?]'),
            ('[\xff]', 'backslashreplace', b'[\\xff]'),
            ('[\xff]', 'namereplace',
             b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
            ('[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
            # lone surrogates are rejected unless surrogateescape is used
            ('\udcff', 'strict', None),
            ('[\udcff]', 'surrogateescape', b'[\xff]'),
            ('[\udcff]', 'surrogatepass', None),
        ))
        self.check_decode(932, (
            (b'abc', 'strict', 'abc'),
            (b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'backslashreplace', '[\\xff]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xff]', 'surrogatepass', None),
            # b'\x81\x00' is treated as an invalid sequence: only the
            # \x81 byte is replaced/ignored, \x00 passes through
            (b'\x81\x00abc', 'strict', None),
            (b'\x81\x00abc', 'ignore', '\x00abc'),
            (b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
            (b'\x81\x00abc', 'backslashreplace', '\\x81\x00abc'),
        ))
3082
    def test_cp1252(self):
        # Windows code page 1252 (Western European): single-byte codec,
        # so \xff round-trips but characters outside the page need an
        # error handler.
        self.check_encode(1252, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xe9\x80'),
            ('\xff', 'strict', b'\xff'),
            # test error handlers
            ('\u0141', 'strict', None),
            ('\u0141', 'ignore', b''),
            ('\u0141', 'replace', b'L'),
            ('\udc98', 'surrogateescape', b'\x98'),
            ('\udc98', 'surrogatepass', None),
        ))
        self.check_decode(1252, (
            (b'abc', 'strict', 'abc'),
            (b'\xe9\x80', 'strict', '\xe9\u20ac'),
            (b'\xff', 'strict', '\xff'),
        ))
3100
    def test_cp_utf7(self):
        # Code page 65000 is Microsoft's UTF-7.  Note it encodes lone
        # surrogates (b'+3IA-') and, per the cases below, an unterminated
        # shift sequence like b'+/' decodes to nothing even with 'strict'.
        cp = 65000
        self.check_encode(cp, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'+AOkgrA-'),
            ('\U0010ffff', 'strict',  b'+2//f/w-'),
            ('\udc80', 'strict', b'+3IA-'),
            ('\ufffd', 'strict', b'+//0-'),
        ))
        self.check_decode(cp, (
            (b'abc', 'strict', 'abc'),
            (b'+AOkgrA-', 'strict', '\xe9\u20ac'),
            (b'+2//f/w-', 'strict', '\U0010ffff'),
            (b'+3IA-', 'strict', '\udc80'),
            (b'+//0-', 'strict', '\ufffd'),
            # invalid bytes
            (b'[+/]', 'strict', '[]'),
            (b'[\xff]', 'strict', '[\xff]'),
        ))
3120
    def test_multibyte_encoding(self):
        # Error handlers must cope with errors inside multi-byte data:
        # a cp932 input starting mid-sequence, an invalid UTF-8 start
        # byte followed by a valid 4-byte sequence, and encoding a lone
        # surrogate next to a supplementary character.
        self.check_decode(932, (
            (b'\x84\xe9\x80', 'ignore', '\u9a3e'),
            (b'\x84\xe9\x80', 'replace', '\ufffd\u9a3e'),
        ))
        self.check_decode(self.CP_UTF8, (
            (b'\xff\xf4\x8f\xbf\xbf', 'ignore', '\U0010ffff'),
            (b'\xff\xf4\x8f\xbf\xbf', 'replace', '\ufffd\U0010ffff'),
        ))
        self.check_encode(self.CP_UTF8, (
            ('[\U0010ffff\uDC80]', 'ignore', b'[\xf4\x8f\xbf\xbf]'),
            ('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
        ))
3134
3135    def test_code_page_decode_flags(self):
3136        # Issue #36312: For some code pages (e.g. UTF-7) flags for
3137        # MultiByteToWideChar() must be set to 0.
3138        if support.verbose:
3139            sys.stdout.write('\n')
3140        for cp in (50220, 50221, 50222, 50225, 50227, 50229,
3141                   *range(57002, 57011+1), 65000):
3142            # On small versions of Windows like Windows IoT
3143            # not all codepages are present.
3144            # A missing codepage causes an OSError exception
3145            # so check for the codepage before decoding
3146            if is_code_page_present(cp):
3147                self.assertEqual(codecs.code_page_decode(cp, b'abc'), ('abc', 3), f'cp{cp}')
3148            else:
3149                if support.verbose:
3150                    print(f"  skipping cp={cp}")
3151        self.assertEqual(codecs.code_page_decode(42, b'abc'),
3152                         ('\uf061\uf062\uf063', 3))
3153
3154    def test_incremental(self):
3155        decoded = codecs.code_page_decode(932, b'\x82', 'strict', False)
3156        self.assertEqual(decoded, ('', 0))
3157
3158        decoded = codecs.code_page_decode(932,
3159                                          b'\xe9\x80\xe9', 'strict',
3160                                          False)
3161        self.assertEqual(decoded, ('\u9a3e', 2))
3162
3163        decoded = codecs.code_page_decode(932,
3164                                          b'\xe9\x80\xe9\x80', 'strict',
3165                                          False)
3166        self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))
3167
3168        decoded = codecs.code_page_decode(932,
3169                                          b'abc', 'strict',
3170                                          False)
3171        self.assertEqual(decoded, ('abc', 3))
3172
3173    def test_mbcs_alias(self):
3174        # Check that looking up our 'default' codepage will return
3175        # mbcs when we don't have a more specific one available
3176        with mock.patch('_winapi.GetACP', return_value=123):
3177            codec = codecs.lookup('cp123')
3178            self.assertEqual(codec.name, 'mbcs')
3179
    @support.bigmemtest(size=2**31, memuse=7, dry_run=False)
    def test_large_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain undecodable bytes before and after
        # the INT_MAX limit.
        # 2 GiB of b'01234567' repeats, ending in 10 undecodable bytes.
        encoded = (b'01234567' * ((size//8)-1) +
                   b'\x85\x86\xea\xeb\xec\xef\xfc\xfd\xfe\xff')
        self.assertEqual(len(encoded), size+2)
        decoded = codecs.code_page_decode(932, encoded, 'surrogateescape', True)
        self.assertEqual(decoded[1], len(encoded))
        # Free the input before checking, to keep peak memory down.
        del encoded
        self.assertEqual(len(decoded[0]), decoded[1])
        self.assertEqual(decoded[0][:10], '0123456701')
        # The trailing bad bytes must come out as escape surrogates.
        self.assertEqual(decoded[0][-20:],
                         '6701234567'
                         '\udc85\udc86\udcea\udceb\udcec'
                         '\udcef\udcfc\udcfd\udcfe\udcff')
3197
    @support.bigmemtest(size=2**31, memuse=6, dry_run=False)
    def test_large_utf8_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain a decodable multi-byte character
        # surrounding INT_MAX
        # Each 10-byte chunk is 7 ASCII bytes plus one 3-byte UTF-8
        # character (U+D100), so a chunk straddles the INT_MAX boundary.
        encoded = (b'0123456\xed\x84\x80' * (size//8))
        self.assertEqual(len(encoded), size // 8 * 10)
        decoded = codecs.code_page_decode(65001, encoded, 'ignore', True)
        self.assertEqual(decoded[1], len(encoded))
        # Free the input before checking, to keep peak memory down.
        del encoded
        self.assertEqual(len(decoded[0]), size)
        self.assertEqual(decoded[0][:10], '0123456\ud10001')
        self.assertEqual(decoded[0][-11:], '56\ud1000123456\ud100')
3211
3212
class ASCIITest(unittest.TestCase):
    """Tests for the built-in 'ascii' codec."""

    def test_encode(self):
        # Plain ASCII text round-trips unchanged.
        self.assertEqual('abc123'.encode('ascii'), b'abc123')

    def test_encode_error(self):
        # Each error handler has its own representation of the
        # unencodable characters.
        cases = (
            ('[\x80\xff\u20ac]', 'ignore', b'[]'),
            ('[\x80\xff\u20ac]', 'replace', b'[???]'),
            ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
            ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
             b'[\\x80\\xff\\u20ac\\U000abcde]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        )
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                result = data.encode('ascii', error_handler)
                self.assertEqual(result, expected)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # the first character can be decoded, but not the second
            '\udc80\xff'.encode('ascii', 'surrogateescape')

    def test_decode(self):
        self.assertEqual(b'abc'.decode('ascii'), 'abc')

    def test_decode_error(self):
        # Bytes >= 0x80 are invalid ASCII; each handler deals with them
        # differently.
        cases = (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        )
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                result = data.decode('ascii', error_handler)
                self.assertEqual(result, expected)
3250
3251
class Latin1Test(unittest.TestCase):
    """Tests for the built-in 'latin1' codec."""

    def test_encode(self):
        # Code points below U+0100 map one-to-one onto bytes.
        cases = (
            ('abc', b'abc'),
            ('\x80\xe9\xff', b'\x80\xe9\xff'),
        )
        for data, expected in cases:
            with self.subTest(data=data, expected=expected):
                self.assertEqual(data.encode('latin1'), expected)

    def test_encode_errors(self):
        # Characters above U+00FF need an error handler.
        cases = (
            ('[\u20ac\udc80]', 'ignore', b'[]'),
            ('[\u20ac\udc80]', 'replace', b'[??]'),
            ('[\u20ac\U000abcde]', 'backslashreplace',
             b'[\\u20ac\\U000abcde]'),
            ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        )
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                result = data.encode('latin1', error_handler)
                self.assertEqual(result, expected)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # the first character can be decoded, but not the second
            '\udc80\u20ac'.encode('latin1', 'surrogateescape')

    def test_decode(self):
        # Decoding latin1 never fails: every byte maps to a code point.
        cases = (
            (b'abc', 'abc'),
            (b'[\x80\xff]', '[\x80\xff]'),
        )
        for data, expected in cases:
            with self.subTest(data=data, expected=expected):
                self.assertEqual(data.decode('latin1'), expected)
3287
3288
class StreamRecoderTest(unittest.TestCase):
    """Tests for codecs.StreamRecoder and codecs.EncodedFile."""

    def test_writelines(self):
        buffer = io.BytesIO()
        ascii_codec = codecs.lookup('ascii')
        recoder = codecs.StreamRecoder(
            buffer, ascii_codec.encode, ascii_codec.decode,
            encodings.ascii.StreamReader, encodings.ascii.StreamWriter)
        recoder.writelines([b'a', b'b'])
        self.assertEqual(buffer.getvalue(), b'ab')

    def test_write(self):
        buffer = io.BytesIO()
        latin1 = codecs.lookup('latin1')
        # Recode from Latin-1 to utf-8.
        recoder = codecs.StreamRecoder(
            buffer, latin1.encode, latin1.decode,
            encodings.utf_8.StreamReader, encodings.utf_8.StreamWriter)
        text = 'àñé'
        recoder.write(text.encode('latin1'))
        self.assertEqual(buffer.getvalue(), text.encode('utf-8'))

    def test_seeking_read(self):
        raw = io.BytesIO('line1\nline2\nline3\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(raw, 'utf-8', 'utf-16-le')

        self.assertEqual(ef.readline(), b'line1\n')
        ef.seek(0)
        # After rewinding, all lines are re-read from the start.
        for expected in (b'line1\n', b'line2\n', b'line3\n', b''):
            self.assertEqual(ef.readline(), expected)

    def test_seeking_write(self):
        raw = io.BytesIO('123456789\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(raw, 'utf-8', 'utf-16-le')

        # Test that seek() only resets its internal buffer when offset
        # and whence are zero.
        ef.seek(2)
        ef.write(b'\nabc\n')
        self.assertEqual(ef.readline(), b'789\n')
        ef.seek(0)
        for expected in (b'1\n', b'abc\n', b'789\n'):
            self.assertEqual(ef.readline(), expected)
3333
3334
@unittest.skipIf(_testcapi is None, 'need _testcapi module')
class LocaleCodecTest(unittest.TestCase):
    """
    Test indirectly _Py_DecodeUTF8Ex() and _Py_EncodeUTF8Ex().
    """
    # Expected results are computed against the platform's filesystem
    # encoding, so they vary between systems.
    ENCODING = sys.getfilesystemencoding()
    # Sample texts: pure ASCII, Latin-1 range, U+00FF, astral plane,
    # and lone surrogates.
    STRINGS = ("ascii", "ulatin1:\xa7\xe9",
               "u255:\xff",
               "UCS:\xe9\u20ac\U0010ffff",
               "surrogates:\uDC80\uDCFF")
    # Byte inputs that may not be valid in the filesystem encoding.
    BYTES_STRINGS = (b"blatin1:\xa7\xe9", b"b255:\xff")
    SURROGATES = "\uDC80\uDCFF"

    def encode(self, text, errors="strict"):
        """Encode *text* through _testcapi.EncodeLocaleEx()."""
        return _testcapi.EncodeLocaleEx(text, 0, errors)

    def check_encode_strings(self, errors):
        # Compare the C-level encoder against str.encode() for every
        # sample; where Python raises UnicodeEncodeError, the C API is
        # expected to raise RuntimeError with a matching message.
        for text in self.STRINGS:
            with self.subTest(text=text):
                try:
                    expected = text.encode(self.ENCODING, errors)
                except UnicodeEncodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.encode(text, errors)
                    errmsg = str(cm.exception)
                    self.assertRegex(errmsg, r"encode error: pos=[0-9]+, reason=")
                else:
                    encoded = self.encode(text, errors)
                    self.assertEqual(encoded, expected)

    def test_encode_strict(self):
        self.check_encode_strings("strict")

    def test_encode_surrogateescape(self):
        self.check_encode_strings("surrogateescape")

    def test_encode_surrogatepass(self):
        # Not every locale encoder supports surrogatepass; probe first
        # and skip rather than fail on those platforms.
        try:
            self.encode('', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} encoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_encode_strings("surrogatepass")

    def test_encode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.encode('', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')

    def decode(self, encoded, errors="strict"):
        """Decode *encoded* through _testcapi.DecodeLocaleEx()."""
        return _testcapi.DecodeLocaleEx(encoded, 0, errors)

    def check_decode_strings(self, errors):
        # On UTF-8 filesystems, encode the samples with surrogateescape
        # (and also surrogatepass) so even the surrogate samples yield
        # byte strings to decode; elsewhere use strict and skip samples
        # that cannot be encoded at all.
        is_utf8 = (self.ENCODING == "utf-8")
        if is_utf8:
            encode_errors = 'surrogateescape'
        else:
            encode_errors = 'strict'

        strings = list(self.BYTES_STRINGS)
        for text in self.STRINGS:
            try:
                encoded = text.encode(self.ENCODING, encode_errors)
                if encoded not in strings:
                    strings.append(encoded)
            except UnicodeEncodeError:
                encoded = None

            if is_utf8:
                encoded2 = text.encode(self.ENCODING, 'surrogatepass')
                if encoded2 != encoded:
                    strings.append(encoded2)

        # Compare the C-level decoder against bytes.decode() for every
        # collected byte string; UnicodeDecodeError on the Python side
        # must surface as RuntimeError from the C API.
        for encoded in strings:
            with self.subTest(encoded=encoded):
                try:
                    expected = encoded.decode(self.ENCODING, errors)
                except UnicodeDecodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.decode(encoded, errors)
                    errmsg = str(cm.exception)
                    self.assertTrue(errmsg.startswith("decode error: "), errmsg)
                else:
                    decoded = self.decode(encoded, errors)
                    self.assertEqual(decoded, expected)

    def test_decode_strict(self):
        self.check_decode_strings("strict")

    def test_decode_surrogateescape(self):
        self.check_decode_strings("surrogateescape")

    def test_decode_surrogatepass(self):
        # Not every locale decoder supports surrogatepass; probe first
        # and skip rather than fail on those platforms.
        try:
            self.decode(b'', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} decoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_decode_strings("surrogatepass")

    def test_decode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.decode(b'', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')
3447
3448
class Rot13Test(unittest.TestCase):
    """Test the educational ROT-13 codec."""

    def test_encode(self):
        # One-shot encoding via codecs.encode().
        self.assertEqual(codecs.encode("Caesar liked ciphers", 'rot-13'),
                         'Pnrfne yvxrq pvcuref')

    def test_decode(self):
        # ROT-13 is its own inverse, so decoding applies the same rotation.
        self.assertEqual(codecs.decode('Rg gh, Oehgr?', 'rot-13'),
                         'Et tu, Brute?')

    def test_incremental_encode(self):
        encoder = codecs.getincrementalencoder('rot-13')()
        self.assertEqual(encoder.encode('ABBA nag Cheryl Baker'),
                         'NOON ant Purely Onxre')

    def test_incremental_decode(self):
        decoder = codecs.getincrementaldecoder('rot-13')()
        self.assertEqual(decoder.decode('terra Ares envy tha'),
                         'green Nerf rail gun')
3468
3469
class Rot13UtilTest(unittest.TestCase):
    """Test the ROT-13 codec via rot13 function,
    i.e. the user has done something like:
    $ echo "Hello World" | python -m encodings.rot_13
    """
    def test_rot13_func(self):
        source = io.StringIO('Gb or, be abg gb or, gung vf gur dhrfgvba')
        sink = io.StringIO()
        # rot13() reads from source and writes the rotated text to sink.
        encodings.rot_13.rot13(source, sink)
        sink.seek(0)
        self.assertEqual(
            sink.read(),
            'To be, or not to be, that is the question')
3484
3485
3486class CodecNameNormalizationTest(unittest.TestCase):
3487    """Test codec name normalization"""
3488    def test_codecs_lookup(self):
3489        FOUND = (1, 2, 3, 4)
3490        NOT_FOUND = (None, None, None, None)
3491        def search_function(encoding):
3492            if encoding == "aaa_8":
3493                return FOUND
3494            else:
3495                return NOT_FOUND
3496
3497        codecs.register(search_function)
3498        self.addCleanup(codecs.unregister, search_function)
3499        self.assertEqual(FOUND, codecs.lookup('aaa_8'))
3500        self.assertEqual(FOUND, codecs.lookup('AAA-8'))
3501        self.assertEqual(FOUND, codecs.lookup('AAA---8'))
3502        self.assertEqual(FOUND, codecs.lookup('AAA   8'))
3503        self.assertEqual(FOUND, codecs.lookup('aaa\xe9\u20ac-8'))
3504        self.assertEqual(NOT_FOUND, codecs.lookup('AAA.8'))
3505        self.assertEqual(NOT_FOUND, codecs.lookup('AAA...8'))
3506        self.assertEqual(NOT_FOUND, codecs.lookup('BBB-8'))
3507        self.assertEqual(NOT_FOUND, codecs.lookup('BBB.8'))
3508        self.assertEqual(NOT_FOUND, codecs.lookup('a\xe9\u20ac-8'))
3509
3510    def test_encodings_normalize_encoding(self):
3511        # encodings.normalize_encoding() ignores non-ASCII characters.
3512        normalize = encodings.normalize_encoding
3513        self.assertEqual(normalize('utf_8'), 'utf_8')
3514        self.assertEqual(normalize('utf\xE9\u20AC\U0010ffff-8'), 'utf_8')
3515        self.assertEqual(normalize('utf   8'), 'utf_8')
3516        # encodings.normalize_encoding() doesn't convert
3517        # characters to lower case.
3518        self.assertEqual(normalize('UTF 8'), 'UTF_8')
3519        self.assertEqual(normalize('utf.8'), 'utf.8')
3520        self.assertEqual(normalize('utf...8'), 'utf...8')
3521
3522
# Allow running this test module directly with the default unittest runner.
if __name__ == "__main__":
    unittest.main()
3525