• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import codecs
2import contextlib
3import io
4import locale
5import sys
6import unittest
7import encodings
8from unittest import mock
9
10from test import support
11from test.support import os_helper
12from test.support import warnings_helper
13
14try:
15    import _testcapi
16except ImportError:
17    _testcapi = None
18
19try:
20    import ctypes
21except ImportError:
22    ctypes = None
23    SIZEOF_WCHAR_T = -1
24else:
25    SIZEOF_WCHAR_T = ctypes.sizeof(ctypes.c_wchar)
26
def coding_checker(self, coder):
    """Return a checker bound to *coder*.

    The returned function asserts that ``coder(data)`` produces the
    expected output and reports having consumed the whole input.
    """
    def check(data, expected):
        self.assertEqual(coder(data), (expected, len(data)))
    return check
31
# On small versions of Windows like Windows IoT or Windows Nano Server not all
# codepages are present.
def is_code_page_present(cp):
    """Return a truthy value if Windows code page *cp* is installed.

    Queries kernel32's GetCPInfoExW through ctypes; only meaningful on
    Windows (requires the module-level ``ctypes`` import to have succeeded).
    """
    from ctypes import POINTER, WINFUNCTYPE, WinDLL
    from ctypes.wintypes import BOOL, UINT, BYTE, WCHAR, DWORD

    MAX_DEFAULTCHAR = 2   # single or double byte
    MAX_LEADBYTES = 12    # 5 ranges, 2 bytes ea., 0 term.
    MAX_PATH = 260

    # Mirrors the Win32 CPINFOEXW structure.
    class CPINFOEXW(ctypes.Structure):
        _fields_ = [("MaxCharSize", UINT),
                    ("DefaultChar", BYTE * MAX_DEFAULTCHAR),
                    ("LeadByte", BYTE * MAX_LEADBYTES),
                    ("UnicodeDefaultChar", WCHAR),
                    ("CodePage", UINT),
                    ("CodePageName", WCHAR * MAX_PATH)]

    prototype = WINFUNCTYPE(BOOL, UINT, DWORD, POINTER(CPINFOEXW))
    GetCPInfoEx = prototype(("GetCPInfoExW", WinDLL("kernel32")))
    info = CPINFOEXW()
    # Nonzero return means the code page is present.
    return GetCPInfoEx(cp, 0, info)
52
class Queue(object):
    """
    queue: write bytes at one end, read bytes from the other end
    """
    def __init__(self, buffer):
        # *buffer* provides both the initial contents and the bytes-like
        # type used for storage (e.g. b"" for a bytes queue).
        self._buffer = buffer

    def write(self, chars):
        self._buffer += chars

    def read(self, size=-1):
        if size >= 0:
            # Hand out at most *size* items from the front of the queue.
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        else:
            # Negative size: drain everything, leaving an empty buffer of
            # the same type.
            data, self._buffer = self._buffer, self._buffer[:0]
        return data
72
73
class MixInCheckStateHandling:
    """Mixin verifying incremental codec getstate()/setstate() round-trips."""

    def check_state_handling_decode(self, encoding, u, s):
        """Check that decoding *s* to *u* survives a state snapshot/restore
        at every possible split point of the input bytes."""
        for i in range(len(s)+1):
            d = codecs.getincrementaldecoder(encoding)()
            part1 = d.decode(s[:i])
            state = d.getstate()
            # getstate() returns (buffered_bytes, extra_int_state).
            self.assertIsInstance(state[1], int)
            # Check that the condition stated in the documentation for
            # IncrementalDecoder.getstate() holds
            if not state[1]:
                # reset decoder to the default state without anything buffered
                d.setstate((state[0][:0], 0))
                # Feeding the previous input may not produce any output
                self.assertTrue(not d.decode(state[0]))
                # The decoder must return to the same state
                self.assertEqual(state, d.getstate())
            # Create a new decoder and set it to the state
            # we extracted from the old one
            d = codecs.getincrementaldecoder(encoding)()
            d.setstate(state)
            # Finish decoding; the two halves must reassemble *u* exactly.
            part2 = d.decode(s[i:], True)
            self.assertEqual(u, part1+part2)

    def check_state_handling_encode(self, encoding, u, s):
        """Check that encoding *u* to *s* survives a state snapshot/restore
        at every possible split point of the input string."""
        for i in range(len(u)+1):
            d = codecs.getincrementalencoder(encoding)()
            part1 = d.encode(u[:i])
            state = d.getstate()
            # A fresh encoder primed with the saved state must produce the
            # remainder of the expected bytes.
            d = codecs.getincrementalencoder(encoding)()
            d.setstate(state)
            part2 = d.encode(u[i:], True)
            self.assertEqual(s, part1+part2)
106
107
class ReadTest(MixInCheckStateHandling):
    # Shared test machinery for stream/incremental decoding.  Concrete
    # subclasses must define:
    #   encoding            -- the codec name under test
    #   ill_formed_sequence -- bytes that are invalid in that encoding
    # and may override ill_formed_sequence_replace (see below).

    def check_partial(self, input, partialresults):
        """Feed the encoded form of *input* one byte at a time and check
        the decoded prefix after each byte against *partialresults*
        (one expected string per encoded byte)."""
        # get a StreamReader for the encoding and feed the bytestring version
        # of input to the reader byte by byte. Read everything available from
        # the StreamReader and check that the results equal the appropriate
        # entries from partialresults.
        q = Queue(b"")
        r = codecs.getreader(self.encoding)(q)
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            q.write(bytes([c]))
            result += r.read()
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(r.read(), "")
        self.assertEqual(r.bytebuffer, b"")

        # do the check again, this time using an incremental decoder
        d = codecs.getincrementaldecoder(self.encoding)()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # Check whether the reset method works properly
        d.reset()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # check iterdecode()
        encoded = input.encode(self.encoding)
        self.assertEqual(
            input,
            "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
        )

    def test_readline(self):
        """Exercise StreamReader.readline() across all line-end styles,
        with/without keepends and with a size limit."""
        def getreader(input):
            stream = io.BytesIO(input.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        def readalllines(input, keepends=True, size=None):
            # Join the lines with "|" so missing/extra line breaks show up.
            reader = getreader(input)
            lines = []
            while True:
                line = reader.readline(size=size, keepends=keepends)
                if not line:
                    break
                lines.append(line)
            return "|".join(lines)

        s = "foo\nbar\r\nbaz\rspam\u2028eggs"
        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
        sexpectednoends = "foo|bar|baz|spam|eggs"
        self.assertEqual(readalllines(s, True), sexpected)
        self.assertEqual(readalllines(s, False), sexpectednoends)
        self.assertEqual(readalllines(s, True, 10), sexpected)
        self.assertEqual(readalllines(s, False, 10), sexpectednoends)

        lineends = ("\n", "\r\n", "\r", "\u2028")
        # Test long lines (multiple calls to read() in readline())
        vw = []
        vwo = []
        for (i, lineend) in enumerate(lineends):
            vw.append((i*200+200)*"\u3042" + lineend)
            vwo.append((i*200+200)*"\u3042")
        self.assertEqual(readalllines("".join(vw), True), "|".join(vw))
        self.assertEqual(readalllines("".join(vw), False), "|".join(vwo))

        # Test lines where the first read might end with \r, so the
        # reader has to look ahead whether this is a lone \r or a \r\n
        for size in range(80):
            for lineend in lineends:
                s = 10*(size*"a" + lineend + "xxx\n")
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=True),
                        size*"a" + lineend,
                    )
                    self.assertEqual(
                        reader.readline(keepends=True),
                        "xxx\n",
                    )
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=False),
                        size*"a",
                    )
                    self.assertEqual(
                        reader.readline(keepends=False),
                        "xxx",
                    )

    def test_mixed_readline_and_read(self):
        """Interleave readline(), read(), read(n) and readlines() and make
        sure the reader's internal buffers stay consistent."""
        lines = ["Humpty Dumpty sat on a wall,\n",
                 "Humpty Dumpty had a great fall.\r\n",
                 "All the king's horses and all the king's men\r",
                 "Couldn't put Humpty together again."]
        data = ''.join(lines)
        def getreader():
            stream = io.BytesIO(data.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        # Issue #8260: Test readline() followed by read()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(), ''.join(lines[1:]))
        self.assertEqual(f.read(), '')

        # Issue #32110: Test readline() followed by read(n)
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(1), lines[1][0])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[len(lines[0]) + 1:][:100])

        # Issue #16636: Test readline() followed by readlines()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.readlines(), lines[1:])
        self.assertEqual(f.read(), '')

        # Test read(n) followed by read()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(), data[5:])
        self.assertEqual(f.read(), '')

        # Issue #32110: Test read(n) followed by read(n)
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(1), data[5])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[6:106])

        # Issue #12446: Test read(n) followed by readlines()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.readlines(), [lines[0][5:]] + lines[1:])
        self.assertEqual(f.read(), '')

    def test_bug1175396(self):
        """Regression test: iterating a StreamReader over realistic
        \\r\\n-terminated template source must yield the original lines."""
        s = [
            '<%!--===================================================\r\n',
            '    BLOG index page: show recent articles,\r\n',
            '    today\'s articles, or articles of a specific date.\r\n',
            '========================================================--%>\r\n',
            '<%@inputencoding="ISO-8859-1"%>\r\n',
            '<%@pagetemplate=TEMPLATE.y%>\r\n',
            '<%@import=import frog.util, frog%>\r\n',
            '<%@import=import frog.objects%>\r\n',
            '<%@import=from frog.storageerrors import StorageError%>\r\n',
            '<%\r\n',
            '\r\n',
            'import logging\r\n',
            'log=logging.getLogger("Snakelets.logger")\r\n',
            '\r\n',
            '\r\n',
            'user=self.SessionCtx.user\r\n',
            'storageEngine=self.SessionCtx.storageEngine\r\n',
            '\r\n',
            '\r\n',
            'def readArticlesFromDate(date, count=None):\r\n',
            '    entryids=storageEngine.listBlogEntries(date)\r\n',
            '    entryids.reverse() # descending\r\n',
            '    if count:\r\n',
            '        entryids=entryids[:count]\r\n',
            '    try:\r\n',
            '        return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n',
            '    except StorageError,x:\r\n',
            '        log.error("Error loading articles: "+str(x))\r\n',
            '        self.abort("cannot load articles")\r\n',
            '\r\n',
            'showdate=None\r\n',
            '\r\n',
            'arg=self.Request.getArg()\r\n',
            'if arg=="today":\r\n',
            '    #-------------------- TODAY\'S ARTICLES\r\n',
            '    self.write("<h2>Today\'s articles</h2>")\r\n',
            '    showdate = frog.util.isodatestr() \r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'elif arg=="active":\r\n',
            '    #-------------------- ACTIVE ARTICLES redirect\r\n',
            '    self.Yredirect("active.y")\r\n',
            'elif arg=="login":\r\n',
            '    #-------------------- LOGIN PAGE redirect\r\n',
            '    self.Yredirect("login.y")\r\n',
            'elif arg=="date":\r\n',
            '    #-------------------- ARTICLES OF A SPECIFIC DATE\r\n',
            '    showdate = self.Request.getParameter("date")\r\n',
            '    self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'else:\r\n',
            '    #-------------------- RECENT ARTICLES\r\n',
            '    self.write("<h2>Recent articles</h2>")\r\n',
            '    dates=storageEngine.listBlogEntryDates()\r\n',
            '    if dates:\r\n',
            '        entries=[]\r\n',
            '        SHOWAMOUNT=10\r\n',
            '        for showdate in dates:\r\n',
            '            entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n',
            '            if len(entries)>=SHOWAMOUNT:\r\n',
            '                break\r\n',
            '                \r\n',
        ]
        stream = io.BytesIO("".join(s).encode(self.encoding))
        reader = codecs.getreader(self.encoding)(stream)
        for (i, line) in enumerate(reader):
            self.assertEqual(line, s[i])

    def test_readlinequeue(self):
        """Check readline() on a reader whose underlying stream grows
        between calls (lone \\r followed later by \\n)."""
        q = Queue(b"")
        writer = codecs.getwriter(self.encoding)(q)
        reader = codecs.getreader(self.encoding)(q)

        # No lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=False), "foo")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=False), "")
        self.assertEqual(reader.readline(keepends=False), "bar")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=False), "baz")
        self.assertEqual(reader.readline(keepends=False), "")

        # Lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=True), "foo\r")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=True), "\n")
        self.assertEqual(reader.readline(keepends=True), "bar\r")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=True), "baz")
        self.assertEqual(reader.readline(keepends=True), "")
        writer.write("foo\r\n")
        self.assertEqual(reader.readline(keepends=True), "foo\r\n")

    def test_bug1098990_a(self):
        """Regression test: readline() across a long line that straddles
        the reader's internal chunk boundary."""
        s1 = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\r\n"
        s2 = "offending line: ladfj askldfj klasdj fskla dfzaskdj fasklfj laskd fjasklfzzzzaa%whereisthis!!!\r\n"
        s3 = "next line.\r\n"

        s = (s1+s2+s3).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), "")

    def test_bug1098990_b(self):
        """Companion regression test to test_bug1098990_a with several
        lines around the chunk boundary."""
        s1 = "aaaaaaaaaaaaaaaaaaaaaaaa\r\n"
        s2 = "bbbbbbbbbbbbbbbbbbbbbbbb\r\n"
        s3 = "stillokay:bbbbxx\r\n"
        s4 = "broken!!!!badbad\r\n"
        s5 = "againokay.\r\n"

        s = (s1+s2+s3+s4+s5).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), s4)
        self.assertEqual(reader.readline(), s5)
        self.assertEqual(reader.readline(), "")

    # What "replace" decoding of ill_formed_sequence should yield; UTF-8
    # overrides this because it replaces per byte.
    ill_formed_sequence_replace = "\ufffd"

    def test_lone_surrogates(self):
        """Lone surrogates must fail to encode strictly and be handled by
        every error handler on both the encode and decode side."""
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding)
        self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "namereplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"),
                         "[&#56448;]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
                         "[?]".encode(self.encoding))

        # sequential surrogate characters
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
                         "[??]".encode(self.encoding))

        # Encoding "" isolates the codec's BOM (empty for BOM-less codecs).
        bom = "".encode(self.encoding)
        for before, after in [("\U00010fff", "A"), ("[", "]"),
                              ("A", "\U00010fff")]:
            before_sequence = before.encode(self.encoding)[len(bom):]
            after_sequence = after.encode(self.encoding)[len(bom):]
            test_string = before + "\uDC80" + after
            test_sequence = (bom + before_sequence +
                             self.ill_formed_sequence + after_sequence)
            self.assertRaises(UnicodeDecodeError, test_sequence.decode,
                              self.encoding)
            self.assertEqual(test_string.encode(self.encoding,
                                                "surrogatepass"),
                             test_sequence)
            self.assertEqual(test_sequence.decode(self.encoding,
                                                  "surrogatepass"),
                             test_string)
            self.assertEqual(test_sequence.decode(self.encoding, "ignore"),
                             before + after)
            self.assertEqual(test_sequence.decode(self.encoding, "replace"),
                             before + self.ill_formed_sequence_replace + after)
            backslashreplace = ''.join('\\x%02x' % b
                                       for b in self.ill_formed_sequence)
            self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
                             before + backslashreplace + after)

    def test_incremental_surrogatepass(self):
        # Test incremental decoder for surrogatepass handler:
        # see issue #24214
        # High surrogate
        data = '\uD901'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:], True), '\uD901')
        # Low surrogate
        data = '\uDC02'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:]), '\uDC02')
446
447
class UTF32Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-aware "utf-32" codec."""
    encoding = "utf-32"
    # Encoded lone low surrogate (U+DC80) in the platform's native order.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc\x00\x00"
    else:
        ill_formed_sequence = b"\x00\x00\xdc\x80"

    # "spamspam" with a single leading LE/BE BOM respectively.
    spamle = (b'\xff\xfe\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
    spambe = (b'\x00\x00\xfe\xff'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')

    def test_only_one_bom(self):
        """The writer must emit the BOM once, not before every write."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """A stream starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(4*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(8*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        # One expected prefix per encoded byte (4 BOM bytes + 4 per char).
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read
                "", # third byte of BOM read
                "", # fourth byte of BOM read => byteorder known
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """Truncated input with 'replace'/'ignore' handlers at end of data."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_32_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_32_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded_le = b'\xff\xfe\x00\x00' + b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_le)[0])
        encoded_be = b'\x00\x00\xfe\xff' + b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_be)[0])
542
543
class UTF32LETest(ReadTest, unittest.TestCase):
    """Tests for the BOM-less little-endian "utf-32-le" codec."""
    encoding = "utf-32-le"
    # Lone low surrogate U+DC80 in little-endian order.
    ill_formed_sequence = b"\x80\xdc\x00\x00"

    def test_partial(self):
        # No BOM: exactly 4 encoded bytes per character.
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_le_decode(encoded)[0])
588
589
class UTF32BETest(ReadTest, unittest.TestCase):
    """Tests for the BOM-less big-endian "utf-32-be" codec."""
    encoding = "utf-32-be"
    # Lone low surrogate U+DC80 in big-endian order.
    ill_formed_sequence = b"\x00\x00\xdc\x80"

    def test_partial(self):
        # No BOM: exactly 4 encoded bytes per character.
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_simple(self):
        self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded = b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_be_decode(encoded)[0])
634
635
class UTF16Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-aware "utf-16" codec."""
    encoding = "utf-16"
    # Encoded lone low surrogate (U+DC80) in the platform's native order.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc"
    else:
        ill_formed_sequence = b"\xdc\x80"

    # "spamspam" with a single leading LE/BE BOM respectively.
    spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
    spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'

    def test_only_one_bom(self):
        """The writer must emit the BOM once, not before every write."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """A stream starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(b"\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(b"\xff\xff\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        # One expected prefix per encoded byte (2 BOM bytes + 2 per BMP
        # char + 4 for the surrogate pair).
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read => byteorder known
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """Truncated input with 'replace'/'ignore' handlers at end of data."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_16_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_16_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_bug691291(self):
        # Files are always opened in binary mode, even if no binary mode was
        # specified.  This means that no automatic conversion of '\n' is done
        # on reading and writing.
        s1 = 'Hello\r\nworld\r\n'

        s = s1.encode(self.encoding)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(s)
        # codecs.open() is deprecated; the warning is expected here.
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            reader = codecs.open(os_helper.TESTFN, 'U', encoding=self.encoding)
        with reader:
            self.assertEqual(reader.read(), s1)
721
class UTF16LETest(ReadTest, unittest.TestCase):
    """Tests for the BOM-less little-endian "utf-16-le" codec."""
    encoding = "utf-16-le"
    # Lone low surrogate U+DC80 in little-endian order.
    ill_formed_sequence = b"\x80\xdc"

    def test_partial(self):
        # No BOM: 2 encoded bytes per BMP char, 4 for the surrogate pair.
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        # (raw bytes, expected 'replace' output) pairs; each must also raise
        # under 'strict'.
        tests = [
            (b'\xff', '\ufffd'),
            (b'A\x00Z', 'A\ufffd'),
            (b'A\x00B\x00C\x00D\x00Z', 'ABCD\ufffd'),
            (b'\x00\xd8', '\ufffd'),
            (b'\x00\xd8A', '\ufffd'),
            (b'\x00\xd8A\x00', '\ufffdA'),
            (b'\x00\xdcA\x00', '\ufffdA'),
        ]
        for raw, expected in tests:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode,
                              raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16le', 'replace'), expected)

    def test_nonbmp(self):
        """Characters outside the BMP round-trip via surrogate pairs."""
        self.assertEqual("\U00010203".encode(self.encoding),
                         b'\x00\xd8\x03\xde')
        self.assertEqual(b'\x00\xd8\x03\xde'.decode(self.encoding),
                         "\U00010203")
765
class UTF16BETest(ReadTest, unittest.TestCase):
    """Exercise the big-endian UTF-16 codec."""
    encoding = "utf-16-be"
    # An unpaired low surrogate (U+DC80) in big-endian byte order.
    ill_formed_sequence = b"\xdc\x80"

    def test_partial(self):
        # Expected incremental-decoder output after each input byte: a code
        # unit only appears once both of its bytes have arrived, and a
        # surrogate pair only once all four bytes have.
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        """Truncated or ill-formed input: strict decoding raises, while the
        'replace' handler substitutes U+FFFD."""
        cases = [
            (b'\xff', '\ufffd'),
            (b'\x00A\xff', 'A\ufffd'),
            (b'\x00A\x00B\x00C\x00DZ', 'ABCD\ufffd'),
            (b'\xd8\x00', '\ufffd'),
            (b'\xd8\x00\xdc', '\ufffd'),
            (b'\xd8\x00\x00A', '\ufffdA'),
            (b'\xdc\x00\x00A', '\ufffdA'),
        ]
        for raw, want in cases:
            with self.assertRaises(UnicodeDecodeError):
                codecs.utf_16_be_decode(raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16be', 'replace'), want)

    def test_nonbmp(self):
        # A supplementary-plane character round-trips as a surrogate pair.
        pair = b'\xd8\x00\xde\x03'
        self.assertEqual("\U00010203".encode(self.encoding), pair)
        self.assertEqual(pair.decode(self.encoding), "\U00010203")
809
class UTF8Test(ReadTest, unittest.TestCase):
    # Codec exercised by the inherited ReadTest machinery.
    encoding = "utf-8"
    # UTF-8-style encoding of the lone surrogate U+DC80 (ill-formed input).
    ill_formed_sequence = b"\xed\xb2\x80"
    # Under 'replace', each byte of the sequence above becomes one U+FFFD.
    ill_formed_sequence_replace = "\ufffd" * 3
    # Plain UTF-8 writes no signature; UTF8SigTest overrides this with the BOM.
    BOM = b''

    def test_partial(self):
        """Incremental decoding: a character is only emitted once the last
        byte of its (1- to 4-byte) sequence has been fed in."""
        self.check_partial(
            "\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_decoder_state(self):
        """Decoder state must survive getstate()/setstate() at every split
        point (checked by the inherited ReadTest helper)."""
        u = "\x00\x7f\x80\xff\u0100\u07ff\u0800\uffff\U0010ffff"
        self.check_state_handling_decode(self.encoding,
                                         u, u.encode(self.encoding))

    def test_decode_error(self):
        """Each standard error handler reacts to the same invalid bytes."""
        for data, error_handler, expected in (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ):
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.decode(self.encoding, error_handler),
                                 expected)

    def test_lone_surrogates(self):
        """surrogateescape must round-trip escaped bytes, but refuse
        surrogates that did not come from decoding."""
        super().test_lone_surrogates()
        # not sure if this is making sense for
        # UTF-16 and UTF-32
        self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
                         self.BOM + b'[\x80]')

        with self.assertRaises(UnicodeEncodeError) as cm:
            "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
        exc = cm.exception
        self.assertEqual(exc.object[exc.start:exc.end], '\uD800\uDFFF')

    def test_surrogatepass_handler(self):
        """surrogatepass encodes/decodes lone surrogates, yet still rejects
        truncated surrogate byte sequences."""
        self.assertEqual("abc\ud800def".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"abc\xed\xa0\x80def")
        self.assertEqual("\U00010fff\uD800".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"\xf0\x90\xbf\xbf\xed\xa0\x80")
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "surrogatepass"),
                         self.BOM + b'[\xed\xa0\x80\xed\xb2\x80]')

        self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
                         "\U00010fff\uD800")

        self.assertTrue(codecs.lookup_error("surrogatepass"))
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")

    def test_incremental_errors(self):
        # Test that the incremental decoder can fail with final=False.
        # See issue #24214
        cases = [b'\x80', b'\xBF', b'\xC0', b'\xC1', b'\xF5', b'\xF6', b'\xFF']
        # Valid prefixes followed by a byte that can never continue them.
        for prefix in (b'\xC2', b'\xDF', b'\xE0', b'\xE0\xA0', b'\xEF',
                       b'\xEF\xBF', b'\xF0', b'\xF0\x90', b'\xF0\x90\x80',
                       b'\xF4', b'\xF4\x8F', b'\xF4\x8F\xBF'):
            for suffix in b'\x7F', b'\xC0':
                cases.append(prefix + suffix)
        # Prefixes that are invalid on their own: overlong forms, encoded
        # surrogates, and values above U+10FFFF.
        cases.extend((b'\xE0\x80', b'\xE0\x9F', b'\xED\xA0\x80',
                      b'\xED\xBF\xBF', b'\xF0\x80', b'\xF0\x8F', b'\xF4\x90'))

        for data in cases:
            with self.subTest(data=data):
                dec = codecs.getincrementaldecoder(self.encoding)()
                self.assertRaises(UnicodeDecodeError, dec.decode, data)
902
903
class UTF7Test(ReadTest, unittest.TestCase):
    # Codec exercised by the inherited ReadTest machinery.
    encoding = "utf-7"

    def test_ascii(self):
        """Characters RFC 2152 allows to appear literally must round-trip
        unchanged; all other ASCII goes through a base64 ('+...-') run."""
        # Set D (directly encoded characters)
        set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 'abcdefghijklmnopqrstuvwxyz'
                 '0123456789'
                 '\'(),-./:?')
        self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii'))
        self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d)
        # Set O (optional direct characters)
        set_o = ' !"#$%&*;<=>@[]^_`{|}'
        self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii'))
        self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o)
        # '+' is the shift-in character and must be escaped as '+-'
        self.assertEqual('a+b'.encode(self.encoding), b'a+-b')
        self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b')
        # White spaces
        ws = ' \t\n\r'
        self.assertEqual(ws.encode(self.encoding), ws.encode('ascii'))
        self.assertEqual(ws.encode('ascii').decode(self.encoding), ws)
        # Other ASCII characters
        other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) -
                                     set(set_d + set_o + '+' + ws)))
        self.assertEqual(other_ascii.encode(self.encoding),
                         b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU'
                         b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-')

    def test_partial(self):
        """Incremental decoding: characters inside a base64 run are only
        emitted once enough sextets have arrived to form whole code units."""
        self.check_partial(
            'a+-b\x00c\x80d\u0100e\U00010000f',
            [
                'a',
                'a',
                'a+',
                'a+-',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b\x00',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c\x80',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d\u0100',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e\U00010000',
                'a+-b\x00c\x80d\u0100e\U00010000f',
            ]
        )

    def test_errors(self):
        """Malformed base64 runs: strict decoding raises UnicodeDecodeError,
        'replace' substitutes U+FFFD for the offending part."""
        tests = [
            (b'\xffb', '\ufffdb'),
            (b'a\xffb', 'a\ufffdb'),
            (b'a\xff\xffb', 'a\ufffd\ufffdb'),
            (b'a+IK', 'a\ufffd'),
            (b'a+IK-b', 'a\ufffdb'),
            (b'a+IK,b', 'a\ufffdb'),
            (b'a+IKx', 'a\u20ac\ufffd'),
            (b'a+IKx-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr', 'a\u20ac\ufffd'),
            (b'a+IKwgr-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr,', 'a\u20ac\ufffd'),
            (b'a+IKwgr,-b', 'a\u20ac\ufffd-b'),
            (b'a+IKwgrB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrB-b', 'a\u20ac\u20ac\ufffdb'),
            (b'a+/,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+//,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+IKw-b\xff', 'a\u20acb\ufffd'),
            (b'a+IKw\xffb', 'a\u20ac\ufffdb'),
            (b'a+@b', 'a\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode,
                                raw, 'strict', True)
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)

    def test_nonbmp(self):
        """Non-BMP characters travel as surrogate pairs, with or without the
        optional terminating '-'."""
        self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0')
        self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0')
        self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-')
        self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding),
                         b'+IKwgrNgB3KA-')
        self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')
        self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')

    def test_lone_surrogates(self):
        """An unpaired high surrogate decodes as-is when the run ends
        cleanly, but is replaced when the run is malformed or truncated."""
        tests = [
            (b'a+2AE-b', 'a\ud801b'),
            (b'a+2AE\xffb', 'a\ufffdb'),
            (b'a+2AE', 'a\ufffd'),
            (b'a+2AEA-b', 'a\ufffdb'),
            (b'a+2AH-b', 'a\ufffdb'),
            (b'a+IKzYAQ-b', 'a\u20ac\ud801b'),
            (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'),
            (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'),
            (b'a+IKzYAd-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'),
            (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'),
            (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)
1036
1037
class UTF16ExTest(unittest.TestCase):
    """Tests for the low-level codecs.utf_16_ex_decode helper."""

    def test_errors(self):
        # A lone trailing byte cannot be decoded in strict mode.
        with self.assertRaises(UnicodeDecodeError):
            codecs.utf_16_ex_decode(b"\xff", "strict", 0, True)

    def test_bad_args(self):
        # The function requires at least the data argument.
        with self.assertRaises(TypeError):
            codecs.utf_16_ex_decode()
1045
class ReadBufferTest(unittest.TestCase):
    """Tests for codecs.readbuffer_encode."""

    def test_array(self):
        # Any object exposing the buffer protocol is accepted.
        import array
        buf = array.array("b", b"spam")
        self.assertEqual(codecs.readbuffer_encode(buf), (b"spam", 4))

    def test_empty(self):
        # The empty string maps to empty bytes and a zero length.
        self.assertEqual(codecs.readbuffer_encode(""), (b"", 0))

    def test_bad_args(self):
        # Missing or non-buffer arguments raise TypeError.
        with self.assertRaises(TypeError):
            codecs.readbuffer_encode()
        with self.assertRaises(TypeError):
            codecs.readbuffer_encode(42)
1061
class UTF8SigTest(UTF8Test, unittest.TestCase):
    """Tests for utf-8-sig: UTF-8 with a leading BOM signature.

    Inherits all UTF8Test checks; the BOM attribute makes the inherited
    encode expectations include the signature.
    """
    encoding = "utf-8-sig"
    BOM = codecs.BOM_UTF8

    def test_partial(self):
        """Incremental decoding: the first (encoded) BOM is consumed as the
        signature; any later U+FEFF is real data and must be emitted."""
        self.check_partial(
            "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "",
                "",
                "", # First BOM has been read and skipped
                "",
                "",
                "\ufeff", # Second BOM has been read and emitted
                "\ufeff\x00", # "\x00" read and emitted
                "\ufeff\x00", # First byte of encoded "\xff" read
                "\ufeff\x00\xff", # Second byte of encoded "\xff" read
                "\ufeff\x00\xff", # First byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff", # Second byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_bug1601501(self):
        # SF bug #1601501: check that the codec works with a buffer
        self.assertEqual(str(b"\xef\xbb\xbf", "utf-8-sig"), "")

    def test_bom(self):
        """The incremental decoder must strip the signature too."""
        d = codecs.getincrementaldecoder("utf-8-sig")()
        s = "spam"
        self.assertEqual(d.decode(s.encode("utf-8-sig")), s)

    def _check_stream_decode(self, bytestring, expected):
        """Decode *bytestring* through a utf-8-sig StreamReader using many
        different read sizes and check each result against *expected*.

        Shared by test_stream_bom and test_stream_bare, which previously
        duplicated this loop verbatim.
        """
        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + \
                        [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while True:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()

                if not data:
                    break
                ostream.write(data)

            self.assertEqual(ostream.getvalue(), expected)

    def test_stream_bom(self):
        # A leading BOM is consumed and never appears in the output.
        unistring = "ABC\u00A1\u2200XYZ"
        bytestring = codecs.BOM_UTF8 + b"ABC\xC2\xA1\xE2\x88\x80XYZ"
        self._check_stream_decode(bytestring, unistring)

    def test_stream_bare(self):
        # Input without a BOM decodes exactly like plain UTF-8.
        unistring = "ABC\u00A1\u2200XYZ"
        bytestring = b"ABC\xC2\xA1\xE2\x88\x80XYZ"
        self._check_stream_decode(bytestring, unistring)
1146
1147
1148class EscapeDecodeTest(unittest.TestCase):
1149    def test_empty(self):
1150        self.assertEqual(codecs.escape_decode(b""), (b"", 0))
1151        self.assertEqual(codecs.escape_decode(bytearray()), (b"", 0))
1152
1153    def test_raw(self):
1154        decode = codecs.escape_decode
1155        for b in range(256):
1156            b = bytes([b])
1157            if b != b'\\':
1158                self.assertEqual(decode(b + b'0'), (b + b'0', 2))
1159
1160    def test_escape(self):
1161        decode = codecs.escape_decode
1162        check = coding_checker(self, decode)
1163        check(b"[\\\n]", b"[]")
1164        check(br'[\"]', b'["]')
1165        check(br"[\']", b"[']")
1166        check(br"[\\]", b"[\\]")
1167        check(br"[\a]", b"[\x07]")
1168        check(br"[\b]", b"[\x08]")
1169        check(br"[\t]", b"[\x09]")
1170        check(br"[\n]", b"[\x0a]")
1171        check(br"[\v]", b"[\x0b]")
1172        check(br"[\f]", b"[\x0c]")
1173        check(br"[\r]", b"[\x0d]")
1174        check(br"[\7]", b"[\x07]")
1175        check(br"[\78]", b"[\x078]")
1176        check(br"[\41]", b"[!]")
1177        check(br"[\418]", b"[!8]")
1178        check(br"[\101]", b"[A]")
1179        check(br"[\1010]", b"[A0]")
1180        check(br"[\501]", b"[A]")
1181        check(br"[\x41]", b"[A]")
1182        check(br"[\x410]", b"[A0]")
1183        for i in range(97, 123):
1184            b = bytes([i])
1185            if b not in b'abfnrtvx':
1186                with self.assertWarns(DeprecationWarning):
1187                    check(b"\\" + b, b"\\" + b)
1188            with self.assertWarns(DeprecationWarning):
1189                check(b"\\" + b.upper(), b"\\" + b.upper())
1190        with self.assertWarns(DeprecationWarning):
1191            check(br"\8", b"\\8")
1192        with self.assertWarns(DeprecationWarning):
1193            check(br"\9", b"\\9")
1194        with self.assertWarns(DeprecationWarning):
1195            check(b"\\\xfa", b"\\\xfa")
1196
1197    def test_errors(self):
1198        decode = codecs.escape_decode
1199        self.assertRaises(ValueError, decode, br"\x")
1200        self.assertRaises(ValueError, decode, br"[\x]")
1201        self.assertEqual(decode(br"[\x]\x", "ignore"), (b"[]", 6))
1202        self.assertEqual(decode(br"[\x]\x", "replace"), (b"[?]?", 6))
1203        self.assertRaises(ValueError, decode, br"\x0")
1204        self.assertRaises(ValueError, decode, br"[\x0]")
1205        self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
1206        self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
1207
1208
# From RFC 3492
# Each entry is a (unicode_string, punycode_bytes) pair taken from the
# RFC's section 7.1 sample strings.
punycode_testcases = [
    # A Arabic (Egyptian):
    ("\u0644\u064A\u0647\u0645\u0627\u0628\u062A\u0643\u0644"
     "\u0645\u0648\u0634\u0639\u0631\u0628\u064A\u061F",
     b"egbpdaj6bu4bxfgehfvwxn"),
    # B Chinese (simplified):
    ("\u4ED6\u4EEC\u4E3A\u4EC0\u4E48\u4E0D\u8BF4\u4E2D\u6587",
     b"ihqwcrb4cv8a8dqg056pqjye"),
    # C Chinese (traditional):
    ("\u4ED6\u5011\u7232\u4EC0\u9EBD\u4E0D\u8AAA\u4E2D\u6587",
     b"ihqwctvzc91f659drss3x8bo0yb"),
    # D Czech: Pro<ccaron>prost<ecaron>nemluv<iacute><ccaron>esky
    ("\u0050\u0072\u006F\u010D\u0070\u0072\u006F\u0073\u0074"
     "\u011B\u006E\u0065\u006D\u006C\u0075\u0076\u00ED\u010D"
     "\u0065\u0073\u006B\u0079",
     b"Proprostnemluvesky-uyb24dma41a"),
    # E Hebrew:
    ("\u05DC\u05DE\u05D4\u05D4\u05DD\u05E4\u05E9\u05D5\u05D8"
     "\u05DC\u05D0\u05DE\u05D3\u05D1\u05E8\u05D9\u05DD\u05E2"
     "\u05D1\u05E8\u05D9\u05EA",
     b"4dbcagdahymbxekheh6e0a7fei0b"),
    # F Hindi (Devanagari):
    ("\u092F\u0939\u0932\u094B\u0917\u0939\u093F\u0928\u094D"
     "\u0926\u0940\u0915\u094D\u092F\u094B\u0902\u0928\u0939"
     "\u0940\u0902\u092C\u094B\u0932\u0938\u0915\u0924\u0947"
     "\u0939\u0948\u0902",
     b"i1baa7eci9glrd9b2ae1bj0hfcgg6iyaf8o0a1dig0cd"),

    #(G) Japanese (kanji and hiragana):
    ("\u306A\u305C\u307F\u3093\u306A\u65E5\u672C\u8A9E\u3092"
     "\u8A71\u3057\u3066\u304F\u308C\u306A\u3044\u306E\u304B",
     b"n8jok5ay5dzabd5bym9f0cm5685rrjetr6pdxa"),

    # (H) Korean (Hangul syllables):
    ("\uC138\uACC4\uC758\uBAA8\uB4E0\uC0AC\uB78C\uB4E4\uC774"
     "\uD55C\uAD6D\uC5B4\uB97C\uC774\uD574\uD55C\uB2E4\uBA74"
     "\uC5BC\uB9C8\uB098\uC88B\uC744\uAE4C",
     b"989aomsvi5e83db1d2a355cv1e0vak1dwrv93d5xbh15a0dt30a5j"
     b"psd879ccm6fea98c"),

    # (I) Russian (Cyrillic):
    ("\u043F\u043E\u0447\u0435\u043C\u0443\u0436\u0435\u043E"
     "\u043D\u0438\u043D\u0435\u0433\u043E\u0432\u043E\u0440"
     "\u044F\u0442\u043F\u043E\u0440\u0443\u0441\u0441\u043A"
     "\u0438",
     b"b1abfaaepdrnnbgefbaDotcwatmq2g4l"),

    # (J) Spanish: Porqu<eacute>nopuedensimplementehablarenEspa<ntilde>ol
    ("\u0050\u006F\u0072\u0071\u0075\u00E9\u006E\u006F\u0070"
     "\u0075\u0065\u0064\u0065\u006E\u0073\u0069\u006D\u0070"
     "\u006C\u0065\u006D\u0065\u006E\u0074\u0065\u0068\u0061"
     "\u0062\u006C\u0061\u0072\u0065\u006E\u0045\u0073\u0070"
     "\u0061\u00F1\u006F\u006C",
     b"PorqunopuedensimplementehablarenEspaol-fmd56a"),

    # (K) Vietnamese:
    #  T<adotbelow>isaoh<odotbelow>kh<ocirc>ngth<ecirchookabove>ch\
    #   <ihookabove>n<oacute>iti<ecircacute>ngVi<ecircdotbelow>t
    ("\u0054\u1EA1\u0069\u0073\u0061\u006F\u0068\u1ECD\u006B"
     "\u0068\u00F4\u006E\u0067\u0074\u0068\u1EC3\u0063\u0068"
     "\u1EC9\u006E\u00F3\u0069\u0074\u0069\u1EBF\u006E\u0067"
     "\u0056\u0069\u1EC7\u0074",
     b"TisaohkhngthchnitingVit-kjcr8268qyxafd2f1b9g"),

    #(L) 3<nen>B<gumi><kinpachi><sensei>
    ("\u0033\u5E74\u0042\u7D44\u91D1\u516B\u5148\u751F",
     b"3B-ww4c5e180e575a65lsy2b"),

    # (M) <amuro><namie>-with-SUPER-MONKEYS
    ("\u5B89\u5BA4\u5948\u7F8E\u6075\u002D\u0077\u0069\u0074"
     "\u0068\u002D\u0053\u0055\u0050\u0045\u0052\u002D\u004D"
     "\u004F\u004E\u004B\u0045\u0059\u0053",
     b"-with-SUPER-MONKEYS-pc58ag80a8qai00g7n9n"),

    # (N) Hello-Another-Way-<sorezore><no><basho>
    ("\u0048\u0065\u006C\u006C\u006F\u002D\u0041\u006E\u006F"
     "\u0074\u0068\u0065\u0072\u002D\u0057\u0061\u0079\u002D"
     "\u305D\u308C\u305E\u308C\u306E\u5834\u6240",
     b"Hello-Another-Way--fc4qua05auwb3674vfr0b"),

    # (O) <hitotsu><yane><no><shita>2
    ("\u3072\u3068\u3064\u5C4B\u6839\u306E\u4E0B\u0032",
     b"2-u9tlzr9756bt3uc0v"),

    # (P) Maji<de>Koi<suru>5<byou><mae>
    ("\u004D\u0061\u006A\u0069\u3067\u004B\u006F\u0069\u3059"
     "\u308B\u0035\u79D2\u524D",
     b"MajiKoi5-783gue6qz075azm5e"),

     # (Q) <pafii>de<runba>
    ("\u30D1\u30D5\u30A3\u30FC\u0064\u0065\u30EB\u30F3\u30D0",
     b"de-jg4avhby1noc0d"),

    # (R) <sono><supiido><de>
    ("\u305D\u306E\u30B9\u30D4\u30FC\u30C9\u3067",
     b"d9juau41awczczp"),

    # (S) -> $1.00 <-
    ("\u002D\u003E\u0020\u0024\u0031\u002E\u0030\u0030\u0020"
     "\u003C\u002D",
     b"-> $1.00 <--")
    ]
1312
# Sanity check on the table above: every entry must be a 2-tuple; anything
# else is printed so a malformed vector is easy to spot.
for testcase in punycode_testcases:
    if len(testcase) != 2:
        print(repr(testcase))
1316
1317
1318class PunycodeTest(unittest.TestCase):
1319    def test_encode(self):
1320        for uni, puny in punycode_testcases:
1321            # Need to convert both strings to lower case, since
1322            # some of the extended encodings use upper case, but our
1323            # code produces only lower case. Converting just puny to
1324            # lower is also insufficient, since some of the input characters
1325            # are upper case.
1326            self.assertEqual(
1327                str(uni.encode("punycode"), "ascii").lower(),
1328                str(puny, "ascii").lower()
1329            )
1330
1331    def test_decode(self):
1332        for uni, puny in punycode_testcases:
1333            self.assertEqual(uni, puny.decode("punycode"))
1334            puny = puny.decode("ascii").encode("ascii")
1335            self.assertEqual(uni, puny.decode("punycode"))
1336
1337    def test_decode_invalid(self):
1338        testcases = [
1339            (b"xn--w&", "strict", UnicodeError()),
1340            (b"xn--w&", "ignore", "xn-"),
1341        ]
1342        for puny, errors, expected in testcases:
1343            with self.subTest(puny=puny, errors=errors):
1344                if isinstance(expected, Exception):
1345                    self.assertRaises(UnicodeError, puny.decode, "punycode", errors)
1346                else:
1347                    self.assertEqual(puny.decode("punycode", errors), expected)
1348
1349
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
# Each entry is an (input_utf8, expected_utf8) pair; an input of None marks a
# vector that is skipped here, and an expected value of None means nameprep()
# must reject the input with UnicodeError.
nameprep_tests = [
    # 3.1 Map to nothing.
    (b'foo\xc2\xad\xcd\x8f\xe1\xa0\x86\xe1\xa0\x8bbar'
     b'\xe2\x80\x8b\xe2\x81\xa0baz\xef\xb8\x80\xef\xb8\x88\xef'
     b'\xb8\x8f\xef\xbb\xbf',
     b'foobarbaz'),
    # 3.2 Case folding ASCII U+0043 U+0041 U+0046 U+0045.
    (b'CAFE',
     b'cafe'),
    # 3.3 Case folding 8bit U+00DF (german sharp s).
    # The original test case is bogus; it says \xc3\xdf
    (b'\xc3\x9f',
     b'ss'),
    # 3.4 Case folding U+0130 (turkish capital I with dot).
    (b'\xc4\xb0',
     b'i\xcc\x87'),
    # 3.5 Case folding multibyte U+0143 U+037A.
    (b'\xc5\x83\xcd\xba',
     b'\xc5\x84 \xce\xb9'),
    # 3.6 Case folding U+2121 U+33C6 U+1D7BB.
    # XXX: skip this as it fails in UCS-2 mode
    #('\xe2\x84\xa1\xe3\x8f\x86\xf0\x9d\x9e\xbb',
    # 'telc\xe2\x88\x95kg\xcf\x83'),
    (None, None),
    # 3.7 Normalization of U+006a U+030c U+00A0 U+00AA.
    (b'j\xcc\x8c\xc2\xa0\xc2\xaa',
     b'\xc7\xb0 a'),
    # 3.8 Case folding U+1FB7 and normalization.
    (b'\xe1\xbe\xb7',
     b'\xe1\xbe\xb6\xce\xb9'),
    # 3.9 Self-reverting case folding U+01F0 and normalization.
    # The original test case is bogus, it says `\xc7\xf0'
    (b'\xc7\xb0',
     b'\xc7\xb0'),
    # 3.10 Self-reverting case folding U+0390 and normalization.
    (b'\xce\x90',
     b'\xce\x90'),
    # 3.11 Self-reverting case folding U+03B0 and normalization.
    (b'\xce\xb0',
     b'\xce\xb0'),
    # 3.12 Self-reverting case folding U+1E96 and normalization.
    (b'\xe1\xba\x96',
     b'\xe1\xba\x96'),
    # 3.13 Self-reverting case folding U+1F56 and normalization.
    (b'\xe1\xbd\x96',
     b'\xe1\xbd\x96'),
    # 3.14 ASCII space character U+0020.
    (b' ',
     b' '),
    # 3.15 Non-ASCII 8bit space character U+00A0.
    (b'\xc2\xa0',
     b' '),
    # 3.16 Non-ASCII multibyte space character U+1680.
    (b'\xe1\x9a\x80',
     None),
    # 3.17 Non-ASCII multibyte space character U+2000.
    (b'\xe2\x80\x80',
     b' '),
    # 3.18 Zero Width Space U+200b.
    (b'\xe2\x80\x8b',
     b''),
    # 3.19 Non-ASCII multibyte space character U+3000.
    (b'\xe3\x80\x80',
     b' '),
    # 3.20 ASCII control characters U+0010 U+007F.
    (b'\x10\x7f',
     b'\x10\x7f'),
    # 3.21 Non-ASCII 8bit control character U+0085.
    (b'\xc2\x85',
     None),
    # 3.22 Non-ASCII multibyte control character U+180E.
    (b'\xe1\xa0\x8e',
     None),
    # 3.23 Zero Width No-Break Space U+FEFF.
    (b'\xef\xbb\xbf',
     b''),
    # 3.24 Non-ASCII control character U+1D175.
    (b'\xf0\x9d\x85\xb5',
     None),
    # 3.25 Plane 0 private use character U+F123.
    (b'\xef\x84\xa3',
     None),
    # 3.26 Plane 15 private use character U+F1234.
    (b'\xf3\xb1\x88\xb4',
     None),
    # 3.27 Plane 16 private use character U+10F234.
    (b'\xf4\x8f\x88\xb4',
     None),
    # 3.28 Non-character code point U+8FFFE.
    (b'\xf2\x8f\xbf\xbe',
     None),
    # 3.29 Non-character code point U+10FFFF.
    (b'\xf4\x8f\xbf\xbf',
     None),
    # 3.30 Surrogate code U+DF42.
    (b'\xed\xbd\x82',
     None),
    # 3.31 Non-plain text character U+FFFD.
    (b'\xef\xbf\xbd',
     None),
    # 3.32 Ideographic description character U+2FF5.
    (b'\xe2\xbf\xb5',
     None),
    # 3.33 Display property character U+0341.
    (b'\xcd\x81',
     b'\xcc\x81'),
    # 3.34 Left-to-right mark U+200E.
    (b'\xe2\x80\x8e',
     None),
    # 3.35 Deprecated U+202A.
    (b'\xe2\x80\xaa',
     None),
    # 3.36 Language tagging character U+E0001.
    (b'\xf3\xa0\x80\x81',
     None),
    # 3.37 Language tagging character U+E0042.
    (b'\xf3\xa0\x81\x82',
     None),
    # 3.38 Bidi: RandALCat character U+05BE and LCat characters.
    (b'foo\xd6\xbebar',
     None),
    # 3.39 Bidi: RandALCat character U+FD50 and LCat characters.
    (b'foo\xef\xb5\x90bar',
     None),
    # 3.40 Bidi: RandALCat character U+FB38 and LCat characters.
    (b'foo\xef\xb9\xb6bar',
     b'foo \xd9\x8ebar'),
    # 3.41 Bidi: RandALCat without trailing RandALCat U+0627 U+0031.
    (b'\xd8\xa71',
     None),
    # 3.42 Bidi: RandALCat character U+0627 U+0031 U+0628.
    (b'\xd8\xa71\xd8\xa8',
     b'\xd8\xa71\xd8\xa8'),
    # 3.43 Unassigned code point U+E0002.
    # Skip this test as we allow unassigned
    #(b'\xf3\xa0\x80\x82',
    # None),
    (None, None),
    # 3.44 Larger test (shrinking).
    # Original test case reads \xc3\xdf
    (b'X\xc2\xad\xc3\x9f\xc4\xb0\xe2\x84\xa1j\xcc\x8c\xc2\xa0\xc2'
     b'\xaa\xce\xb0\xe2\x80\x80',
     b'xssi\xcc\x87tel\xc7\xb0 a\xce\xb0 '),
    # 3.45 Larger test (expanding).
    # Original test case reads \xc3\x9f
    (b'X\xc3\x9f\xe3\x8c\x96\xc4\xb0\xe2\x84\xa1\xe2\x92\x9f\xe3\x8c'
     b'\x80',
     b'xss\xe3\x82\xad\xe3\x83\xad\xe3\x83\xa1\xe3\x83\xbc\xe3'
     b'\x83\x88\xe3\x83\xabi\xcc\x87tel\x28d\x29\xe3\x82'
     b'\xa2\xe3\x83\x91\xe3\x83\xbc\xe3\x83\x88')
    ]
1502
1503
class NameprepTest(unittest.TestCase):
    def test_nameprep(self):
        """Check encodings.idna.nameprep() against the libidn IDN vectors.

        Vectors with orig=None are skipped; vectors whose prepped form is
        None contain prohibited characters and must raise UnicodeError.
        """
        from encodings.idna import nameprep
        for pos, (orig, prepped) in enumerate(nameprep_tests):
            if orig is None:
                # Skipped (see the comments in nameprep_tests)
                continue
            # The Unicode strings are given in UTF-8; surrogatepass keeps
            # the lone-surrogate vector (test 3.30) decodable.
            orig = str(orig, "utf-8", "surrogatepass")
            # subTest labels each failure with its section number in the
            # test-vector document and keeps the original traceback, unlike
            # the old try/except re-raise as support.TestFailed, which
            # discarded the chained exception and aborted at the first
            # failing vector.
            with self.subTest(test="3.%d" % (pos + 1)):
                if prepped is None:
                    # Input contains prohibited characters
                    self.assertRaises(UnicodeError, nameprep, orig)
                else:
                    prepped = str(prepped, "utf-8", "surrogatepass")
                    self.assertEqual(nameprep(orig), prepped)
1522
1523
class IDNACodecTest(unittest.TestCase):
    """Tests for the built-in "idna" codec."""

    def test_builtin_decode(self):
        for raw, expected in [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org", "pyth\xf6n.org"),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]:
            self.assertEqual(str(raw, "idna"), expected)

    def test_builtin_encode(self):
        for text, expected in [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org", b"xn--pythn-mua.org"),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]:
            self.assertEqual(text.encode("idna"), expected)

    def test_builtin_decode_length_limit(self):
        # Overlong punycode labels must be rejected rather than expanded.
        for padding in (1100, 70):
            with self.assertRaisesRegex(UnicodeError, "too long"):
                (b"xn--016c" + b"a" * padding).decode("idna")

    def test_stream(self):
        reader = codecs.getreader("idna")(io.BytesIO(b"abc"))
        reader.read(3)
        self.assertEqual(reader.read(), "")

    def test_incremental_decode(self):
        # Feeding one byte at a time through iterdecode() must give the
        # same result as decoding the whole input at once.
        for raw, expected in [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]:
            pieces = (bytes([byte]) for byte in raw)
            self.assertEqual("".join(codecs.iterdecode(pieces, "idna")),
                             expected)

        # A label is only emitted once its trailing dot (or end of
        # input) has been seen.
        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")

    def test_incremental_encode(self):
        for text, expected in [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]:
            self.assertEqual(b"".join(codecs.iterencode(text, "idna")),
                             expected)

        # Labels are encoded lazily, once their trailing dot is seen.
        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")

    def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for errors in ("ignore", "replace", "backslashreplace",
                       "surrogateescape"):
            self.assertRaises(Exception, "python.org".encode, "idna", errors)
            self.assertRaises(Exception,
                              b"python.org".decode, "idna", errors)
1615
1616
class CodecsModuleTest(unittest.TestCase):
    """Tests for the module-level helper functions of the codecs module."""

    def test_decode(self):
        self.assertEqual(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'),
                         '\xe4\xf6\xfc')
        self.assertRaises(TypeError, codecs.decode)
        # Without an explicit encoding, plain ASCII bytes round-trip.
        self.assertEqual(codecs.decode(b'abc'), 'abc')
        self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii')

        # test keywords
        self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'),
                         '\xe4\xf6\xfc')
        self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'),
                         '[]')

    def test_encode(self):
        self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'),
                         b'\xe4\xf6\xfc')
        self.assertRaises(TypeError, codecs.encode)
        self.assertRaises(LookupError, codecs.encode, "foo", "__spam__")
        # Without an explicit encoding, plain ASCII text round-trips.
        self.assertEqual(codecs.encode('abc'), b'abc')
        self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii')

        # test keywords
        self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'),
                         b'\xe4\xf6\xfc')
        self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'),
                         b'[]')

    def test_register(self):
        self.assertRaises(TypeError, codecs.register)
        self.assertRaises(TypeError, codecs.register, 42)

    def test_unregister(self):
        name = "nonexistent_codec_name"
        search_function = mock.Mock()
        codecs.register(search_function)
        # The registered mock is consulted; its non-CodecInfo return value
        # makes lookup() fail with TypeError rather than LookupError.
        self.assertRaises(TypeError, codecs.lookup, name)
        search_function.assert_called_with(name)
        search_function.reset_mock()

        codecs.unregister(search_function)
        # After unregistering, the search function must not be called again.
        self.assertRaises(LookupError, codecs.lookup, name)
        search_function.assert_not_called()

    def test_lookup(self):
        self.assertRaises(TypeError, codecs.lookup)
        self.assertRaises(LookupError, codecs.lookup, "__spam__")
        self.assertRaises(LookupError, codecs.lookup, " ")

    def test_getencoder(self):
        self.assertRaises(TypeError, codecs.getencoder)
        self.assertRaises(LookupError, codecs.getencoder, "__spam__")

    def test_getdecoder(self):
        self.assertRaises(TypeError, codecs.getdecoder)
        self.assertRaises(LookupError, codecs.getdecoder, "__spam__")

    def test_getreader(self):
        self.assertRaises(TypeError, codecs.getreader)
        self.assertRaises(LookupError, codecs.getreader, "__spam__")

    def test_getwriter(self):
        self.assertRaises(TypeError, codecs.getwriter)
        self.assertRaises(LookupError, codecs.getwriter, "__spam__")

    def test_lookup_issue1813(self):
        # Issue #1813: under Turkish locales, lookup of some codecs failed
        # because 'I' is lowercased as "ı" (dotless i)
        oldlocale = locale.setlocale(locale.LC_CTYPE)
        self.addCleanup(locale.setlocale, locale.LC_CTYPE, oldlocale)
        try:
            locale.setlocale(locale.LC_CTYPE, 'tr_TR')
        except locale.Error:
            # Unsupported locale on this system
            self.skipTest('test needs Turkish locale')
        c = codecs.lookup('ASCII')
        self.assertEqual(c.name, 'ascii')

    def test_all(self):
        # codecs.__all__ must list exactly this public API, and every
        # listed name must actually exist on the module.
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "namereplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertCountEqual(api, codecs.__all__)
        for api in codecs.__all__:
            getattr(codecs, api)

    def test_open(self):
        # codecs.open() must wrap the file in a StreamReaderWriter for
        # every open mode.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
            with self.subTest(mode), \
                    codecs.open(os_helper.TESTFN, mode, 'ascii') as file:
                self.assertIsInstance(file, codecs.StreamReaderWriter)

    def test_undefined(self):
        # The "undefined" codec rejects every input, including the empty
        # string, regardless of the error handler.
        self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined')
        self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined')
        self.assertRaises(UnicodeError, codecs.encode, '', 'undefined')
        self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined')
        for errors in ('strict', 'ignore', 'replace', 'backslashreplace'):
            self.assertRaises(UnicodeError,
                codecs.encode, 'abc', 'undefined', errors)
            self.assertRaises(UnicodeError,
                codecs.decode, b'abc', 'undefined', errors)

    def test_file_closes_if_lookup_error_raised(self):
        # The underlying file must not be leaked when the encoding lookup
        # fails after the file has already been opened.
        mock_open = mock.mock_open()
        with mock.patch('builtins.open', mock_open) as file:
            with self.assertRaises(LookupError):
                codecs.open(os_helper.TESTFN, 'wt', 'invalid-encoding')

            file().close.assert_called()
1744
1745
class StreamReaderTest(unittest.TestCase):
    """Basic StreamReader behaviour over a UTF-8 byte stream."""

    def setUp(self):
        self.reader = codecs.getreader('utf-8')
        self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')

    def test_readlines(self):
        stream_reader = self.reader(self.stream)
        self.assertEqual(stream_reader.readlines(), ['\ud55c\n', '\uae00'])
1755
1756
class EncodedFileTest(unittest.TestCase):

    def test_basic(self):
        """EncodedFile transcodes between file encoding and data encoding."""
        # Reading: underlying UTF-8 bytes come back re-encoded as UTF-16-LE.
        underlying = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        wrapped = codecs.EncodedFile(underlying, 'utf-16-le', 'utf-8')
        self.assertEqual(wrapped.read(), b'\\\xd5\n\x00\x00\xae')

        # Writing: UTF-8 input is stored as Latin-1 in the underlying file.
        underlying = io.BytesIO()
        wrapped = codecs.EncodedFile(underlying, 'utf-8', 'latin-1')
        wrapped.write(b'\xc3\xbc')
        self.assertEqual(underlying.getvalue(), b'\xfc')
1768
# Every Unicode codec exercised by the generic round-trip tests below
# (BasicUnicodeTest and friends).
all_unicode_encodings = [
    "ascii",
    "big5",
    "big5hkscs",
    "charmap",
    "cp037",
    "cp1006",
    "cp1026",
    "cp1125",
    "cp1140",
    "cp1250",
    "cp1251",
    "cp1252",
    "cp1253",
    "cp1254",
    "cp1255",
    "cp1256",
    "cp1257",
    "cp1258",
    "cp424",
    "cp437",
    "cp500",
    "cp720",
    "cp737",
    "cp775",
    "cp850",
    "cp852",
    "cp855",
    "cp856",
    "cp857",
    "cp858",
    "cp860",
    "cp861",
    "cp862",
    "cp863",
    "cp864",
    "cp865",
    "cp866",
    "cp869",
    "cp874",
    "cp875",
    "cp932",
    "cp949",
    "cp950",
    "euc_jis_2004",
    "euc_jisx0213",
    "euc_jp",
    "euc_kr",
    "gb18030",
    "gb2312",
    "gbk",
    "hp_roman8",
    "hz",
    "idna",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "iso8859_1",
    "iso8859_10",
    "iso8859_11",
    "iso8859_13",
    "iso8859_14",
    "iso8859_15",
    "iso8859_16",
    "iso8859_2",
    "iso8859_3",
    "iso8859_4",
    "iso8859_5",
    "iso8859_6",
    "iso8859_7",
    "iso8859_8",
    "iso8859_9",
    "johab",
    "koi8_r",
    "koi8_t",
    "koi8_u",
    "kz1048",
    "latin_1",
    "mac_cyrillic",
    "mac_greek",
    "mac_iceland",
    "mac_latin2",
    "mac_roman",
    "mac_turkish",
    "palmos",
    "ptcp154",
    "punycode",
    "raw_unicode_escape",
    "shift_jis",
    "shift_jis_2004",
    "shift_jisx0213",
    "tis_620",
    "unicode_escape",
    "utf_16",
    "utf_16_be",
    "utf_16_le",
    "utf_7",
    "utf_8",
]

# The Windows-only codecs exist only when the codecs module exposes their
# encode functions, so add them conditionally.
if hasattr(codecs, "mbcs_encode"):
    all_unicode_encodings.append("mbcs")
if hasattr(codecs, "oem_encode"):
    all_unicode_encodings.append("oem")

# The following encoding is not tested, because it's not supposed
# to work:
#    "undefined"

# The following encodings don't work in stateful mode
broken_unicode_with_stateful = [
    "punycode",
]
1886
1887
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
    """Generic round-trip checks run against every codec listed in
    all_unicode_encodings."""

    def test_basics(self):
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            name = codecs.lookup(encoding).name
            if encoding.endswith("_codec"):
                name += "_codec"
            elif encoding == "latin_1":
                name = "latin_1"
            # Codec name and lookup name must agree up to -/_ spelling.
            self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-"))

            (b, size) = codecs.getencoder(encoding)(s)
            self.assertEqual(size, len(s), "encoding=%r" % encoding)
            (chars, size) = codecs.getdecoder(encoding)(b)
            self.assertEqual(chars, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check stream reader/writer
                q = Queue(b"")
                writer = codecs.getwriter(encoding)(q)
                encodedresult = b""
                # Write one character at a time to exercise the writer's
                # internal buffering.
                for c in s:
                    writer.write(c)
                    chunk = q.read()
                    self.assertTrue(type(chunk) is bytes, type(chunk))
                    encodedresult += chunk
                q = Queue(b"")
                reader = codecs.getreader(encoding)(q)
                decodedresult = ""
                # Feed the encoded bytes back one byte at a time.
                for c in encodedresult:
                    q.write(bytes([c]))
                    decodedresult += reader.read()
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder and iterencode()/iterdecode()
                try:
                    encoder = codecs.getincrementalencoder(encoding)()
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check incremental decoder/encoder
                    encodedresult = b""
                    for c in s:
                        encodedresult += encoder.encode(c)
                    encodedresult += encoder.encode("", True)
                    decoder = codecs.getincrementaldecoder(encoding)()
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += decoder.decode(bytes([c]))
                    decodedresult += decoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                    # check iterencode()/iterdecode()
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode(s, encoding), encoding))
                    self.assertEqual(result, s, "encoding=%r" % encoding)

                    # check iterencode()/iterdecode() with empty string
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode("", encoding), encoding))
                    self.assertEqual(result, "")

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        encoder = codecs.getincrementalencoder(encoding)("ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(encoder.encode(c) for c in s)
                        decoder = codecs.getincrementaldecoder(encoding)("ignore")
                        decodedresult = "".join(decoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    @support.cpython_only
    def test_basics_capi(self):
        # Same incremental round-trip as test_basics, but with codec
        # objects obtained through the C API (_testcapi).
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = _testcapi.codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = _testcapi.codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = _testcapi.codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = _testcapi.codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)

    def test_bad_decode_args(self):
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            # idna and punycode accept str input, so 42 may not be a
            # TypeError for them.
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)

    def test_bad_encode_args(self):
        for encoding in all_unicode_encodings:
            encoder = codecs.getencoder(encoding)
            self.assertRaises(TypeError, encoder)

    def test_encoding_map_type_initialized(self):
        from encodings import cp1140
        # This used to crash, we are only verifying there's no crash.
        table_type = type(cp1140.encoding_table)
        self.assertEqual(table_type, table_type)

    def test_decoder_state(self):
        # Check that getstate() and setstate() handle the state properly
        u = "abc123"
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                self.check_state_handling_decode(encoding, u, u.encode(encoding))
                self.check_state_handling_encode(encoding, u, u.encode(encoding))
2044
2045
class CharmapTest(unittest.TestCase):
    """codecs.charmap_decode with the three mapping flavours: a string
    table, an int -> str dict, and an int -> int dict."""

    def test_decode_with_string_map(self):
        decode = codecs.charmap_decode
        data = b"\x00\x01\x02"

        # Complete string tables decode successfully under "strict".
        for table, expected in [
            ("abc", "abc"),
            ("\U0010FFFFbc", "\U0010FFFFbc"),
        ]:
            self.assertEqual(decode(data, "strict", table), (expected, 3))

        # A short table, or one mapping to U+FFFE, fails under "strict".
        for table in ("ab", "ab\ufffe"):
            self.assertRaises(UnicodeDecodeError,
                              decode, data, "strict", table)

        # Non-strict handlers repair the unmapped byte instead.
        for errors, expected in [
            ("replace", "ab\ufffd"),
            ("backslashreplace", "ab\\x02"),
            ("ignore", "ab"),
        ]:
            for table in ("ab", "ab\ufffe"):
                self.assertEqual(decode(data, errors, table), (expected, 3))

        # "ignore" with an empty table consumes everything, emits nothing.
        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", ""),
                         ("", len(allbytes)))

    def test_decode_with_int2str_map(self):
        decode = codecs.charmap_decode
        data = b"\x00\x01\x02"

        for table, expected in [
            ({0: 'a', 1: 'b', 2: 'c'}, "abc"),
            # Multi-character replacement strings are allowed.
            ({0: 'Aa', 1: 'Bb', 2: 'Cc'}, "AaBbCc"),
            ({0: '\U0010FFFF', 1: 'b', 2: 'c'}, "\U0010FFFFbc"),
            # An empty replacement simply drops the byte.
            ({0: 'a', 1: 'b', 2: ''}, "ab"),
        ]:
            self.assertEqual(decode(data, "strict", table), (expected, 3))

        # Missing, None and U+FFFE entries (issue #14850) are all
        # decoding errors under "strict".
        bad_tables = [
            {0: 'a', 1: 'b'},
            {0: 'a', 1: 'b', 2: None},
            {0: 'a', 1: 'b', 2: '\ufffe'},
        ]
        for table in bad_tables:
            self.assertRaises(UnicodeDecodeError,
                              decode, data, "strict", table)

        # Non-strict handlers repair the same failures.
        for errors, expected in [
            ("replace", "ab\ufffd"),
            ("backslashreplace", "ab\\x02"),
            ("ignore", "ab"),
        ]:
            for table in bad_tables:
                self.assertEqual(decode(data, errors, table), (expected, 3))

        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", {}),
                         ("", len(allbytes)))

        # Integer targets outside range(0x110000) make the mapping itself
        # invalid, independent of the error handler.
        for bad_target in (-2, 999999999):
            self.assertRaisesRegex(TypeError,
                "character mapping must be in range\\(0x110000\\)",
                decode, data, "strict", {0: "A", 1: 'Bb', 2: bad_target})

    def test_decode_with_int2int_map(self):
        decode = codecs.charmap_decode
        data = b"\x00\x01\x02"
        a, b, c = ord('a'), ord('b'), ord('c')

        # Issue #15379: any valid code point may appear as a target.
        for table, expected in [
            ({0: a, 1: b, 2: c}, "abc"),
            ({0: 0x10FFFF, 1: b, 2: c}, "\U0010FFFFbc"),
            ({0: sys.maxunicode, 1: b, 2: c}, chr(sys.maxunicode) + "bc"),
        ]:
            self.assertEqual(decode(data, "strict", table), (expected, 3))

        # Beyond the code-point range the mapping itself is invalid.
        self.assertRaises(TypeError, decode, data, "strict",
                          {0: sys.maxunicode + 1, 1: b, 2: c})

        # Missing and U+FFFE entries fail under "strict"...
        bad_tables = [
            {0: a, 1: b},
            {0: a, 1: b, 2: 0xFFFE},
        ]
        for table in bad_tables:
            self.assertRaises(UnicodeDecodeError,
                              decode, data, "strict", table)

        # ...and are repaired by the non-strict handlers.
        for errors, expected in [
            ("replace", "ab\ufffd"),
            ("backslashreplace", "ab\\x02"),
            ("ignore", "ab"),
        ]:
            for table in bad_tables:
                self.assertEqual(decode(data, errors, table), (expected, 3))
2292
2293
class WithStmtTest(unittest.TestCase):
    """Codec wrapper objects work as context managers."""

    def test_encodedfile(self):
        underlying = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(underlying, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
        # Leaving the with-block closes the wrapped stream as well.
        self.assertTrue(underlying.closed)

    def test_streamreaderwriter(self):
        underlying = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(underlying, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
2307
2308
class TypesTest(unittest.TestCase):
    """Input-type acceptance checks for the low-level codec functions."""

    def test_decode_unicode(self):
        # Most decoders don't accept unicode input
        decoders = [
            codecs.utf_7_decode,
            codecs.utf_8_decode,
            codecs.utf_16_le_decode,
            codecs.utf_16_be_decode,
            codecs.utf_16_ex_decode,
            codecs.utf_32_decode,
            codecs.utf_32_le_decode,
            codecs.utf_32_be_decode,
            codecs.utf_32_ex_decode,
            codecs.latin_1_decode,
            codecs.ascii_decode,
            codecs.charmap_decode,
        ]
        if hasattr(codecs, "mbcs_decode"):
            decoders.append(codecs.mbcs_decode)
        for decode in decoders:
            self.assertRaises(TypeError, decode, "xxx")

    def test_unicode_escape(self):
        # Escape-decoding a unicode string is supported and gives the same
        # result as decoding the equivalent ASCII bytes string.
        for decode in (codecs.unicode_escape_decode,
                       codecs.raw_unicode_escape_decode):
            self.assertEqual(decode(r"\u1234"), ("\u1234", 6))
            self.assertEqual(decode(br"\u1234"), ("\u1234", 6))

            # Out-of-range code points honour the error handler.
            self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
            self.assertEqual(decode(r"\U00110000", "replace"),
                             ("\ufffd", 10))
            self.assertEqual(
                decode(r"\U00110000", "backslashreplace"),
                (r"\x5c\x55\x30\x30\x31\x31\x30\x30\x30\x30", 10))
2348
2349
class UnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the "unicode-escape" text codec."""

    encoding = "unicode-escape"

    # NOTE(review): setting this to None disables the inherited
    # ReadTest.test_lone_surrogates -- presumably that check does not apply
    # to this codec; confirm against ReadTest's definition.
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input round-trips and reports zero consumed items.
        self.assertEqual(codecs.unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Printable ASCII (except the backslash) encodes to itself.
        encode = codecs.unicode_escape_encode
        for b in range(32, 127):
            if b != b'\\'[0]:
                self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Any single byte other than the backslash decodes to itself.
        decode = codecs.unicode_escape_decode
        for b in range(256):
            if b != b'\\'[0]:
                self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # \t, \n, \r and the backslash use their short mnemonic escapes;
        # other control characters and non-ASCII become \xXX, \uXXXX, or
        # \UXXXXXXXX escapes.
        encode = codecs.unicode_escape_encode
        check = coding_checker(self, encode)
        check('\t', br'\t')
        check('\n', br'\n')
        check('\r', br'\r')
        check('\\', br'\\')
        for b in range(32):
            if chr(b) not in '\t\n\r':
                check(chr(b), ('\\x%02x' % b).encode())
        for b in range(127, 256):
            check(chr(b), ('\\x%02x' % b).encode())
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # All documented escape forms: mnemonics, octal (up to 3 digits),
        # \xXX, \uXXXX and \UXXXXXXXX.
        decode = codecs.unicode_escape_decode
        check = coding_checker(self, decode)
        check(b"[\\\n]", "[]")  # a backslash-newline pair is swallowed
        check(br'[\"]', '["]')
        check(br"[\']", "[']")
        check(br"[\\]", r"[\]")
        check(br"[\a]", "[\x07]")
        check(br"[\b]", "[\x08]")
        check(br"[\t]", "[\x09]")
        check(br"[\n]", "[\x0a]")
        check(br"[\v]", "[\x0b]")
        check(br"[\f]", "[\x0c]")
        check(br"[\r]", "[\x0d]")
        check(br"[\7]", "[\x07]")
        check(br"[\78]", "[\x078]")  # '8' is not octal; escape ends early
        check(br"[\41]", "[!]")
        check(br"[\418]", "[!8]")
        check(br"[\101]", "[A]")
        check(br"[\1010]", "[A0]")  # octal escapes stop at three digits
        check(br"[\x41]", "[A]")
        check(br"[\x410]", "[A0]")  # \x escapes stop at two hex digits
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")
        # Unrecognized escapes pass through unchanged but emit a
        # DeprecationWarning.
        for i in range(97, 123):
            b = bytes([i])
            if b not in b'abfnrtuvx':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b, "\\" + chr(i))
            if b.upper() not in b'UN':
                with self.assertWarns(DeprecationWarning):
                    check(b"\\" + b.upper(), "\\" + chr(i-32))
        with self.assertWarns(DeprecationWarning):
            check(br"\8", "\\8")
        with self.assertWarns(DeprecationWarning):
            check(br"\9", "\\9")
        with self.assertWarns(DeprecationWarning):
            check(b"\\\xfa", "\\\xfa")

    def test_decode_errors(self):
        # Truncated \x, \u and \U escapes raise; the "ignore" and "replace"
        # handlers must still consume the whole input.
        decode = codecs.unicode_escape_decode
        for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # \U escapes above the Unicode range (> U+10FFFF) are also errors.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Feed the encoded form one byte at a time; each list entry is the
        # text available after that byte (escaped characters only appear
        # once their escape sequence is complete).
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '',
                '',
                '',
                '\x00',
                '\x00',
                '\x00\t',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2479
class RawUnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the "raw-unicode-escape" codec, which only recognizes
    \\uXXXX and \\UXXXXXXXX escapes and passes all other bytes through."""

    encoding = "raw-unicode-escape"

    # NOTE(review): setting this to None disables the inherited
    # ReadTest.test_lone_surrogates -- confirm against ReadTest's definition.
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input round-trips and reports zero consumed items.
        self.assertEqual(codecs.raw_unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.raw_unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Every Latin-1 character encodes to its own byte, unescaped.
        encode = codecs.raw_unicode_escape_encode
        for b in range(256):
            self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Every single byte decodes to itself (no mnemonic escapes here).
        decode = codecs.raw_unicode_escape_decode
        for b in range(256):
            self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # Backslash sequences are left alone except for \u and \U, which
        # are produced for non-Latin-1 characters.
        encode = codecs.raw_unicode_escape_encode
        check = coding_checker(self, encode)
        for b in range(256):
            if b not in b'uU':
                check('\\' + chr(b), b'\\' + bytes([b]))
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # Only \u and \U escapes are interpreted; any other backslash
        # sequence decodes literally.
        decode = codecs.raw_unicode_escape_decode
        check = coding_checker(self, decode)
        for b in range(256):
            if b not in b'uU':
                check(b'\\' + bytes([b]), '\\' + chr(b))
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")

    def test_decode_errors(self):
        # Truncated \u and \U escapes raise; "ignore"/"replace" handlers
        # must still consume the whole input.
        decode = codecs.raw_unicode_escape_decode
        for c, d in (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # \U escapes above the Unicode range (> U+10FFFF) are also errors.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Feed the encoded form one byte at a time; each list entry is the
        # text available after that byte (escapes only materialize once
        # complete).
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '\x00',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2561
2562
class EscapeEncodeTest(unittest.TestCase):
    """Tests for the bytes-to-bytes codecs.escape_encode() function."""

    def test_escape_encode(self):
        # (input, (expected output, consumed length)) pairs; the reported
        # length is always the length of the *input*, not the output.
        cases = (
            (b'', (b'', 0)),
            (b'foobar', (b'foobar', 6)),
            (b'spam\0eggs', (b'spam\\x00eggs', 9)),
            (b'a\'b', (b"a\\'b", 3)),
            (b'b\\c', (b'b\\\\c', 3)),
            (b'c\nd', (b'c\\nd', 3)),
            (b'd\re', (b'd\\re', 3)),
            (b'f\x7fg', (b'f\\x7fg', 3)),
        )
        for data, expected in cases:
            with self.subTest(data=data):
                self.assertEqual(codecs.escape_encode(data), expected)
        # Only exact bytes objects are accepted: str and bytearray raise.
        for bad_input in ('spam', bytearray(b'spam')):
            self.assertRaises(TypeError, codecs.escape_encode, bad_input)
2581
2582
class SurrogateEscapeTest(unittest.TestCase):
    """Round-trip tests for the "surrogateescape" error handler, which
    smuggles undecodable bytes through as lone low surrogates."""

    def test_utf8(self):
        # A stray byte becomes U+DC80 and encodes back to the same byte.
        raw, text = b"foo\x80bar", "foo\udc80bar"
        self.assertEqual(raw.decode("utf-8", "surrogateescape"), text)
        self.assertEqual(text.encode("utf-8", "surrogateescape"), raw)
        # A surrogate arriving as (invalid) UTF-8 escapes byte-by-byte.
        raw, text = b"\xed\xb0\x80", "\udced\udcb0\udc80"
        self.assertEqual(raw.decode("utf-8", "surrogateescape"), text)
        self.assertEqual(text.encode("utf-8", "surrogateescape"), raw)

    def test_ascii(self):
        # Same round trip through the pure-ASCII codec.
        raw, text = b"foo\x80bar", "foo\udc80bar"
        self.assertEqual(raw.decode("ascii", "surrogateescape"), text)
        self.assertEqual(text.encode("ascii", "surrogateescape"), raw)

    def test_charmap(self):
        # \xa5 has no mapping in iso-8859-3, so it must be escaped too.
        raw, text = b"foo\xa5bar", "foo\udca5bar"
        self.assertEqual(raw.decode("iso-8859-3", "surrogateescape"), text)
        self.assertEqual(text.encode("iso-8859-3", "surrogateescape"), raw)

    def test_latin1(self):
        # Issue6373: encode-only check for latin-1.
        self.assertEqual(
            "\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1",
                                                    "surrogateescape"),
            b"\xe4\xeb\xef\xf6\xfc")
2615
2616
class BomTest(unittest.TestCase):
    def test_seek0(self):
        """Seeking a codecs stream back to offset 0 must reset BOM handling:
        the BOM is (re)written for writes at position 0 and never repeated
        for writes at any other position."""
        payload = "1234567890"
        bom_encodings = ("utf-16",
                         "utf-16-le",
                         "utf-16-be",
                         "utf-32",
                         "utf-32-le",
                         "utf-32-be")
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        for encoding in bom_encodings:
            # The BOM must be written only once, even across two writes.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(payload)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # After seek(0), a write must emit the BOM again.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(payload[0])
                self.assertNotEqual(f.tell(), 0)
                f.seek(0)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # Same seek(0) check, driven through the StreamWriter directly.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(payload[0])
                self.assertNotEqual(f.writer.tell(), 0)
                f.writer.seek(0)
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # A seek() to a non-zero position must not trigger a new BOM.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.write(payload)
                f.seek(f.tell())
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # Same non-zero-seek check through the StreamWriter.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=encoding) as f:
                f.writer.write(payload)
                f.writer.seek(f.writer.tell())
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
2672
2673
# Bytes-to-bytes transform codecs available on every build; the optional
# compression codecs (zlib, bz2) are appended below when importable.
bytes_transform_encodings = [
    "base64_codec",
    "uu_codec",
    "quopri_codec",
    "hex_codec",
]

# Map of canonical codec name -> known aliases, used by
# TransformCodecTest.test_aliases (see http://bugs.python.org/issue7475).
transform_aliases = {
    "base64_codec": ["base64", "base_64"],
    "uu_codec": ["uu"],
    "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"],
    "hex_codec": ["hex"],
    "rot_13": ["rot13"],
}

# zlib and bz2 are optional build dependencies; only register their codecs
# for testing when the underlying modules are present.
try:
    import zlib
except ImportError:
    zlib = None
else:
    bytes_transform_encodings.append("zlib_codec")
    transform_aliases["zlib_codec"] = ["zip", "zlib"]
try:
    import bz2
except ImportError:
    pass
else:
    bytes_transform_encodings.append("bz2_codec")
    transform_aliases["bz2_codec"] = ["bz2"]
2703
2704
class TransformCodecTest(unittest.TestCase):
    """Tests for the non-text transform codecs (base64_codec, hex_codec,
    rot_13, etc.): round-tripping, buffer support, and the errors raised
    when they are misused through the str/bytes text-codec methods."""

    def test_basics(self):
        # Encode then decode all 256 byte values; the reported sizes must
        # match the respective inputs.
        binput = bytes(range(256))
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                # generic codecs interface
                (o, size) = codecs.getencoder(encoding)(binput)
                self.assertEqual(size, len(binput))
                (i, size) = codecs.getdecoder(encoding)(o)
                self.assertEqual(size, len(o))
                self.assertEqual(i, binput)

    def test_read(self):
        # StreamReader.read() must decode transform output back to the
        # original bytes.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.read()
                self.assertEqual(sout, b"\x80")

    def test_readline(self):
        # readline() must also work even though the decoded data is bytes.
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                sin = codecs.encode(b"\x80", encoding)
                reader = codecs.getreader(encoding)(io.BytesIO(sin))
                sout = reader.readline()
                self.assertEqual(sout, b"\x80")

    def test_buffer_api_usage(self):
        # We check all the transform codecs accept memoryview input
        # for encoding and decoding
        # and also that they roundtrip correctly
        original = b"12345\x80"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                data = original
                view = memoryview(data)
                data = codecs.encode(data, encoding)
                view_encoded = codecs.encode(view, encoding)
                self.assertEqual(view_encoded, data)
                view = memoryview(data)
                data = codecs.decode(data, encoding)
                self.assertEqual(data, original)
                view_decoded = codecs.decode(view, encoding)
                self.assertEqual(view_decoded, data)

    def test_text_to_binary_denylists_binary_transforms(self):
        # Check binary -> binary codecs give a good error for str input
        bad_input = "bad input type"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.encode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.encode(encoding)
                # The lookup error must not be chained from anything.
                self.assertIsNone(failure.exception.__cause__)

    def test_text_to_binary_denylists_text_transforms(self):
        # Check str.encode gives a good error message for str -> str codecs
        msg = (r"^'rot_13' is not a text encoding; "
               r"use codecs.encode\(\) to handle arbitrary codecs")
        with self.assertRaisesRegex(LookupError, msg):
            "just an example message".encode("rot_13")

    def test_binary_to_text_denylists_binary_transforms(self):
        # Check bytes.decode and bytearray.decode give a good error
        # message for binary -> binary codecs
        data = b"encode first to ensure we meet any format restrictions"
        for encoding in bytes_transform_encodings:
            with self.subTest(encoding=encoding):
                encoded_data = codecs.encode(data, encoding)
                fmt = (r"{!r} is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                msg = fmt.format(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    encoded_data.decode(encoding)
                with self.assertRaisesRegex(LookupError, msg):
                    bytearray(encoded_data).decode(encoding)

    def test_binary_to_text_denylists_text_transforms(self):
        # Check str -> str codec gives a good error for binary input
        for bad_input in (b"immutable", bytearray(b"mutable")):
            with self.subTest(bad_input=bad_input):
                msg = (r"^'rot_13' is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.decode("rot_13")
                self.assertIsNone(failure.exception.__cause__)

    @unittest.skipUnless(zlib, "Requires zlib support")
    def test_custom_zlib_error_is_wrapped(self):
        # Check zlib codec gives a good error for malformed input
        msg = "^decoding with 'zlib_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "zlib_codec")
        # The wrapper exception chains from an instance of the same type.
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    def test_custom_hex_error_is_wrapped(self):
        # Check hex codec gives a good error for malformed input
        msg = "^decoding with 'hex_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "hex_codec")
        self.assertIsInstance(failure.exception.__cause__,
                                                type(failure.exception))

    # Unfortunately, the bz2 module throws OSError, which the codec
    # machinery currently can't wrap :(

    # Ensure codec aliases from http://bugs.python.org/issue7475 work
    def test_aliases(self):
        # Every alias must resolve to the same canonical codec name.
        for codec_name, aliases in transform_aliases.items():
            expected_name = codecs.lookup(codec_name).name
            for alias in aliases:
                with self.subTest(alias=alias):
                    info = codecs.lookup(alias)
                    self.assertEqual(info.name, expected_name)

    def test_quopri_stateless(self):
        # Should encode with quotetabs=True
        encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
        self.assertEqual(encoded, b"space=20tab=09eol=20\n")
        # But should still support unescaped tabs and spaces
        unescaped = b"space tab eol\n"
        self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped)

    def test_uu_invalid(self):
        # Missing "begin" line
        self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
2836
2837
2838# The codec system tries to wrap exceptions in order to ensure the error
2839# mentions the operation being performed and the codec involved. We
2840# currently *only* want this to happen for relatively stateless
2841# exceptions, where the only significant information they contain is their
2842# type and a single str argument.
2843
2844# Use a local codec registry to avoid appearing to leak objects when
2845# registering multiple search functions
_TEST_CODECS = {}

def _get_test_codec(codec_name):
    """Codec search function backed by the local _TEST_CODECS registry;
    returns the registered CodecInfo, or None when the name is unknown."""
    try:
        return _TEST_CODECS[codec_name]
    except KeyError:
        return None
2850
2851
class ExceptionChainingTest(unittest.TestCase):
    """Check how the codec machinery wraps exceptions raised inside codecs.

    Per the module comment above: wrapping is only applied to relatively
    stateless exceptions, whose only significant information is their type
    and a single str argument; anything else propagates unchanged.
    """

    def setUp(self):
        # Register the shared search function; the codec itself is injected
        # per-test via set_codec().
        self.codec_name = 'exception_chaining_test'
        codecs.register(_get_test_codec)
        self.addCleanup(codecs.unregister, _get_test_codec)

        # We store the object to raise on the instance because of a bad
        # interaction between the codec caching (which means we can't
        # recreate the codec entry) and regrtest refleak hunting (which
        # runs the same test instance multiple times). This means we
        # need to ensure the codecs call back in to the instance to find
        # out which exception to raise rather than binding them in a
        # closure to an object that may change on the next run
        self.obj_to_raise = RuntimeError

    def tearDown(self):
        _TEST_CODECS.pop(self.codec_name, None)
        # Issue #22166: Also pop from caches to avoid appearance of ref leaks
        encodings._cache.pop(self.codec_name, None)

    def set_codec(self, encode, decode):
        # Install the given encode/decode pair under self.codec_name.
        codec_info = codecs.CodecInfo(encode, decode,
                                      name=self.codec_name)
        _TEST_CODECS[self.codec_name] = codec_info

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        # Assert the body raises exc_type with the wrapping message and
        # that the wrapper chains (with traceback) from the original.
        full_msg = r"{} with {!r} codec failed \({}: {}\)".format(
                  operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught
        self.assertIsInstance(caught.exception.__cause__, exc_type)
        self.assertIsNotNone(caught.exception.__cause__.__traceback__)

    def raise_obj(self, *args, **kwds):
        # Helper to dynamically change the object raised by a test codec
        raise self.obj_to_raise

    def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError):
        # Verify all four encode/decode entry points wrap the exception.
        self.obj_to_raise = obj_to_raise
        self.set_codec(self.raise_obj, self.raise_obj)
        with self.assertWrapped("encoding", exc_type, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", exc_type, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    def test_raise_grandchild_subclass_exact_size(self):
        # A slotted subclass (no instance dict) still counts as stateless.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            __slots__ = ()
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def test_raise_subclass_with_weakref_support(self):
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            pass
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def check_not_wrapped(self, obj_to_raise, msg):
        # Verify all four entry points propagate the exception unchanged.
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        self.set_codec(raise_obj, raise_obj)
        with self.assertRaisesRegex(RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        # Extra instance state means wrapping could lose information.
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, "^{}$".format(msg))

    def test_non_str_arg_is_not_wrapped(self):
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        msg_re = r"^\('a', 'b', 'c'\)$"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re)

    # http://bugs.python.org/issue19609
    def test_codec_lookup_failure_not_wrapped(self):
        msg = "^unknown encoding: {}$".format(self.codec_name)
        # The initial codec lookup should not be wrapped
        with self.assertRaisesRegex(LookupError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_unflagged_non_text_codec_handling(self):
        # The stdlib non-text codecs are now marked so they're
        # pre-emptively skipped by the text model related methods
        # However, third party codecs won't be flagged, so we still make
        # sure the case where an inappropriate output type is produced is
        # handled appropriately
        def encode_to_str(*args, **kwds):
            return "not bytes!", 0
        def decode_to_bytes(*args, **kwds):
            return b"not str!", 0
        self.set_codec(encode_to_str, decode_to_bytes)
        # No input or output type checks on the codecs module functions
        encoded = codecs.encode(None, self.codec_name)
        self.assertEqual(encoded, "not bytes!")
        decoded = codecs.decode(None, self.codec_name)
        self.assertEqual(decoded, b"not str!")
        # Text model methods should complain
        fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; "
               r"use codecs.encode\(\) to encode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            "str_input".encode(self.codec_name)
        fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; "
               r"use codecs.decode\(\) to decode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            b"bytes input".decode(self.codec_name)
3000
3001
3002
3003@unittest.skipUnless(sys.platform == 'win32',
3004                     'code pages are specific to Windows')
3005class CodePageTest(unittest.TestCase):
3006    CP_UTF8 = 65001
3007
3008    def test_invalid_code_page(self):
3009        self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
3010        self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
3011        self.assertRaises(OSError, codecs.code_page_encode, 123, 'a')
3012        self.assertRaises(OSError, codecs.code_page_decode, 123, b'a')
3013
3014    def test_code_page_name(self):
3015        self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
3016            codecs.code_page_encode, 932, '\xff')
3017        self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
3018            codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
3019        self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
3020            codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)
3021
3022    def check_decode(self, cp, tests):
3023        for raw, errors, expected in tests:
3024            if expected is not None:
3025                try:
3026                    decoded = codecs.code_page_decode(cp, raw, errors, True)
3027                except UnicodeDecodeError as err:
3028                    self.fail('Unable to decode %a from "cp%s" with '
3029                              'errors=%r: %s' % (raw, cp, errors, err))
3030                self.assertEqual(decoded[0], expected,
3031                    '%a.decode("cp%s", %r)=%a != %a'
3032                    % (raw, cp, errors, decoded[0], expected))
3033                # assert 0 <= decoded[1] <= len(raw)
3034                self.assertGreaterEqual(decoded[1], 0)
3035                self.assertLessEqual(decoded[1], len(raw))
3036            else:
3037                self.assertRaises(UnicodeDecodeError,
3038                    codecs.code_page_decode, cp, raw, errors, True)
3039
3040    def check_encode(self, cp, tests):
3041        for text, errors, expected in tests:
3042            if expected is not None:
3043                try:
3044                    encoded = codecs.code_page_encode(cp, text, errors)
3045                except UnicodeEncodeError as err:
3046                    self.fail('Unable to encode %a to "cp%s" with '
3047                              'errors=%r: %s' % (text, cp, errors, err))
3048                self.assertEqual(encoded[0], expected,
3049                    '%a.encode("cp%s", %r)=%a != %a'
3050                    % (text, cp, errors, encoded[0], expected))
3051                self.assertEqual(encoded[1], len(text))
3052            else:
3053                self.assertRaises(UnicodeEncodeError,
3054                    codecs.code_page_encode, cp, text, errors)
3055
    def test_cp932(self):
        """Exercise encode/decode for Windows code page 932.

        Each case is (input, error_handler, expected); an expected value
        of None means the conversion must raise (see check_encode /
        check_decode).
        """
        self.check_encode(932, (
            ('abc', 'strict', b'abc'),
            ('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
            # test error handlers
            ('\xff', 'strict', None),
            ('[\xff]', 'ignore', b'[]'),
            ('[\xff]', 'replace', b'[y]'),
            ('[\u20ac]', 'replace', b'[?]'),
            ('[\xff]', 'backslashreplace', b'[\\xff]'),
            ('[\xff]', 'namereplace',
             b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
            ('[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
            ('\udcff', 'strict', None),
            ('[\udcff]', 'surrogateescape', b'[\xff]'),
            ('[\udcff]', 'surrogatepass', None),
        ))
        self.check_decode(932, (
            (b'abc', 'strict', 'abc'),
            (b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'backslashreplace', '[\\xff]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xff]', 'surrogatepass', None),
            # \x81 lead byte followed by an invalid trail byte
            (b'\x81\x00abc', 'strict', None),
            (b'\x81\x00abc', 'ignore', '\x00abc'),
            (b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
            (b'\x81\x00abc', 'backslashreplace', '\\x81\x00abc'),
        ))
3088
    def test_cp1252(self):
        """Exercise encode/decode for Windows code page 1252.

        Note the 'replace' handler yields a best-fit character here
        ('\u0141' -> b'L') rather than b'?'.
        """
        self.check_encode(1252, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xe9\x80'),
            ('\xff', 'strict', b'\xff'),
            # test error handlers
            ('\u0141', 'strict', None),
            ('\u0141', 'ignore', b''),
            ('\u0141', 'replace', b'L'),
            ('\udc98', 'surrogateescape', b'\x98'),
            ('\udc98', 'surrogatepass', None),
        ))
        self.check_decode(1252, (
            (b'abc', 'strict', 'abc'),
            (b'\xe9\x80', 'strict', '\xe9\u20ac'),
            (b'\xff', 'strict', '\xff'),
        ))
3106
    def test_cp_utf7(self):
        """Exercise encode/decode for code page 65000 (UTF-7).

        Lone surrogates round-trip through this code page, and the
        malformed b'[+/]' input decodes to '[]' instead of raising.
        """
        cp = 65000
        self.check_encode(cp, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'+AOkgrA-'),
            ('\U0010ffff', 'strict',  b'+2//f/w-'),
            ('\udc80', 'strict', b'+3IA-'),
            ('\ufffd', 'strict', b'+//0-'),
        ))
        self.check_decode(cp, (
            (b'abc', 'strict', 'abc'),
            (b'+AOkgrA-', 'strict', '\xe9\u20ac'),
            (b'+2//f/w-', 'strict', '\U0010ffff'),
            (b'+3IA-', 'strict', '\udc80'),
            (b'+//0-', 'strict', '\ufffd'),
            # invalid bytes
            (b'[+/]', 'strict', '[]'),
            (b'[\xff]', 'strict', '[\xff]'),
        ))
3126
    def test_multibyte_encoding(self):
        """Error handlers applied around multi-byte sequences.

        A bad leading byte must not prevent the valid multi-byte
        character that follows it from being decoded.
        """
        self.check_decode(932, (
            (b'\x84\xe9\x80', 'ignore', '\u9a3e'),
            (b'\x84\xe9\x80', 'replace', '\ufffd\u9a3e'),
        ))
        self.check_decode(self.CP_UTF8, (
            (b'\xff\xf4\x8f\xbf\xbf', 'ignore', '\U0010ffff'),
            (b'\xff\xf4\x8f\xbf\xbf', 'replace', '\ufffd\U0010ffff'),
        ))
        self.check_encode(self.CP_UTF8, (
            ('[\U0010ffff\uDC80]', 'ignore', b'[\xf4\x8f\xbf\xbf]'),
            ('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
        ))
3140
3141    def test_code_page_decode_flags(self):
3142        # Issue #36312: For some code pages (e.g. UTF-7) flags for
3143        # MultiByteToWideChar() must be set to 0.
3144        if support.verbose:
3145            sys.stdout.write('\n')
3146        for cp in (50220, 50221, 50222, 50225, 50227, 50229,
3147                   *range(57002, 57011+1), 65000):
3148            # On small versions of Windows like Windows IoT
3149            # not all codepages are present.
3150            # A missing codepage causes an OSError exception
3151            # so check for the codepage before decoding
3152            if is_code_page_present(cp):
3153                self.assertEqual(codecs.code_page_decode(cp, b'abc'), ('abc', 3), f'cp{cp}')
3154            else:
3155                if support.verbose:
3156                    print(f"  skipping cp={cp}")
3157        self.assertEqual(codecs.code_page_decode(42, b'abc'),
3158                         ('\uf061\uf062\uf063', 3))
3159
3160    def test_incremental(self):
3161        decoded = codecs.code_page_decode(932, b'\x82', 'strict', False)
3162        self.assertEqual(decoded, ('', 0))
3163
3164        decoded = codecs.code_page_decode(932,
3165                                          b'\xe9\x80\xe9', 'strict',
3166                                          False)
3167        self.assertEqual(decoded, ('\u9a3e', 2))
3168
3169        decoded = codecs.code_page_decode(932,
3170                                          b'\xe9\x80\xe9\x80', 'strict',
3171                                          False)
3172        self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))
3173
3174        decoded = codecs.code_page_decode(932,
3175                                          b'abc', 'strict',
3176                                          False)
3177        self.assertEqual(decoded, ('abc', 3))
3178
3179    def test_mbcs_alias(self):
3180        # Check that looking up our 'default' codepage will return
3181        # mbcs when we don't have a more specific one available
3182        with mock.patch('_winapi.GetACP', return_value=123):
3183            codec = codecs.lookup('cp123')
3184            self.assertEqual(codec.name, 'mbcs')
3185
    @support.bigmemtest(size=2**31, memuse=7, dry_run=False)
    def test_large_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain undecodable bytes before and after
        # the INT_MAX limit.
        encoded = (b'01234567' * ((size//8)-1) +
                   b'\x85\x86\xea\xeb\xec\xef\xfc\xfd\xfe\xff')
        self.assertEqual(len(encoded), size+2)
        decoded = codecs.code_page_decode(932, encoded, 'surrogateescape', True)
        self.assertEqual(decoded[1], len(encoded))
        # Release the huge input before the content checks to keep peak
        # memory down (both objects are > 2 GiB).
        del encoded
        self.assertEqual(len(decoded[0]), decoded[1])
        self.assertEqual(decoded[0][:10], '0123456701')
        # The trailing undecodable bytes come back as surrogateescape
        # code points U+DC85..U+DCFF.
        self.assertEqual(decoded[0][-20:],
                         '6701234567'
                         '\udc85\udc86\udcea\udceb\udcec'
                         '\udcef\udcfc\udcfd\udcfe\udcff')
3203
    @support.bigmemtest(size=2**31, memuse=6, dry_run=False)
    def test_large_utf8_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain a decodable multi-byte character
        # surrounding INT_MAX
        # (b'\xed\x84\x80' is the UTF-8 encoding of U+D100)
        encoded = (b'0123456\xed\x84\x80' * (size//8))
        self.assertEqual(len(encoded), size // 8 * 10)
        decoded = codecs.code_page_decode(65001, encoded, 'ignore', True)
        self.assertEqual(decoded[1], len(encoded))
        # Release the huge input before the content checks to keep peak
        # memory down.
        del encoded
        self.assertEqual(len(decoded[0]), size)
        self.assertEqual(decoded[0][:10], '0123456\ud10001')
        self.assertEqual(decoded[0][-11:], '56\ud1000123456\ud100')
3217
3218
class ASCIITest(unittest.TestCase):
    """Encoding/decoding tests for the 'ascii' codec."""

    def test_encode(self):
        # Pure ASCII text passes through unchanged.
        self.assertEqual('abc123'.encode('ascii'), b'abc123')

    def test_encode_error(self):
        # One subtest per error handler applied to non-ASCII characters.
        cases = (
            ('[\x80\xff\u20ac]', 'ignore', b'[]'),
            ('[\x80\xff\u20ac]', 'replace', b'[???]'),
            ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
            ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
             b'[\\x80\\xff\\u20ac\\U000abcde]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        )
        for text, handler, expected in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=expected):
                self.assertEqual(text.encode('ascii', handler), expected)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # '\udc80' maps back to a byte via surrogateescape,
            # but '\xff' cannot be encoded to ASCII at all
            '\udc80\xff'.encode('ascii', 'surrogateescape')

    def test_decode(self):
        self.assertEqual(b'abc'.decode('ascii'), 'abc')

    def test_decode_error(self):
        # One subtest per error handler applied to non-ASCII bytes.
        cases = (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        )
        for raw, handler, expected in cases:
            with self.subTest(data=raw, error_handler=handler,
                              expected=expected):
                self.assertEqual(raw.decode('ascii', handler), expected)
3256
3257
class Latin1Test(unittest.TestCase):
    """Encoding/decoding tests for the 'latin1' codec."""

    def test_encode(self):
        # Code points U+0000..U+00FF map one-to-one onto bytes.
        cases = (
            ('abc', b'abc'),
            ('\x80\xe9\xff', b'\x80\xe9\xff'),
        )
        for text, expected in cases:
            with self.subTest(data=text, expected=expected):
                self.assertEqual(text.encode('latin1'), expected)

    def test_encode_errors(self):
        # One subtest per error handler applied to unencodable characters.
        cases = (
            ('[\u20ac\udc80]', 'ignore', b'[]'),
            ('[\u20ac\udc80]', 'replace', b'[??]'),
            ('[\u20ac\U000abcde]', 'backslashreplace',
             b'[\\u20ac\\U000abcde]'),
            ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        )
        for text, handler, expected in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=expected):
                self.assertEqual(text.encode('latin1', handler), expected)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # '\udc80' maps back to a byte via surrogateescape,
            # but '\u20ac' has no latin1 mapping
            '\udc80\u20ac'.encode('latin1', 'surrogateescape')

    def test_decode(self):
        # Every byte decodes to the code point with the same value.
        cases = (
            (b'abc', 'abc'),
            (b'[\x80\xff]', '[\x80\xff]'),
        )
        for raw, expected in cases:
            with self.subTest(data=raw, expected=expected):
                self.assertEqual(raw.decode('latin1'), expected)
3293
3294
class StreamRecoderTest(unittest.TestCase):
    """Tests for codecs.StreamRecoder and codecs.EncodedFile."""

    def test_writelines(self):
        backing = io.BytesIO()
        info = codecs.lookup('ascii')
        recoder = codecs.StreamRecoder(backing, info.encode, info.decode,
                                       encodings.ascii.StreamReader,
                                       encodings.ascii.StreamWriter)
        recoder.writelines([b'a', b'b'])
        self.assertEqual(backing.getvalue(), b'ab')

    def test_write(self):
        backing = io.BytesIO()
        info = codecs.lookup('latin1')
        # Recode from Latin-1 to utf-8.
        recoder = codecs.StreamRecoder(backing, info.encode, info.decode,
                                       encodings.utf_8.StreamReader,
                                       encodings.utf_8.StreamWriter)
        text = 'àñé'
        recoder.write(text.encode('latin1'))
        self.assertEqual(backing.getvalue(), text.encode('utf-8'))

    def test_seeking_read(self):
        backing = io.BytesIO('line1\nline2\nline3\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(backing, 'utf-8', 'utf-16-le')
        self.assertEqual(ef.readline(), b'line1\n')
        ef.seek(0)
        for expected in (b'line1\n', b'line2\n', b'line3\n', b''):
            self.assertEqual(ef.readline(), expected)

    def test_seeking_write(self):
        backing = io.BytesIO('123456789\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(backing, 'utf-8', 'utf-16-le')
        # seek() only resets its internal buffer when both offset and
        # whence are zero.
        ef.seek(2)
        ef.write(b'\nabc\n')
        self.assertEqual(ef.readline(), b'789\n')
        ef.seek(0)
        for expected in (b'1\n', b'abc\n', b'789\n'):
            self.assertEqual(ef.readline(), expected)
3339
3340
@unittest.skipIf(_testcapi is None, 'need _testcapi module')
class LocaleCodecTest(unittest.TestCase):
    """
    Test indirectly _Py_DecodeUTF8Ex() and _Py_EncodeUTF8Ex().
    """
    # The codec under test: the interpreter's filesystem encoding.
    ENCODING = sys.getfilesystemencoding()
    # Text samples: pure ASCII, Latin-1 range characters, U+00FF,
    # wide characters up to U+10FFFF, and lone surrogates.
    STRINGS = ("ascii", "ulatin1:\xa7\xe9",
               "u255:\xff",
               "UCS:\xe9\u20ac\U0010ffff",
               "surrogates:\uDC80\uDCFF")
    # Raw byte samples fed directly to the decoder.
    BYTES_STRINGS = (b"blatin1:\xa7\xe9", b"b255:\xff")
    SURROGATES = "\uDC80\uDCFF"

    def encode(self, text, errors="strict"):
        """Encode *text* via the C-level locale codec hook."""
        return _testcapi.EncodeLocaleEx(text, 0, errors)

    def check_encode_strings(self, errors):
        """Compare the C-level encoder against str.encode for each sample.

        Where str.encode raises UnicodeEncodeError, the C-level encoder
        is expected to raise RuntimeError with an "encode error: ..."
        message instead.
        """
        for text in self.STRINGS:
            with self.subTest(text=text):
                try:
                    expected = text.encode(self.ENCODING, errors)
                except UnicodeEncodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.encode(text, errors)
                    errmsg = str(cm.exception)
                    self.assertRegex(errmsg, r"encode error: pos=[0-9]+, reason=")
                else:
                    encoded = self.encode(text, errors)
                    self.assertEqual(encoded, expected)

    def test_encode_strict(self):
        self.check_encode_strings("strict")

    def test_encode_surrogateescape(self):
        self.check_encode_strings("surrogateescape")

    def test_encode_surrogatepass(self):
        # Skip when this encoder rejects the surrogatepass handler.
        try:
            self.encode('', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} encoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_encode_strings("surrogatepass")

    def test_encode_unsupported_error_handler(self):
        # The C-level encoder rejects handlers it does not implement.
        with self.assertRaises(ValueError) as cm:
            self.encode('', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')

    def decode(self, encoded, errors="strict"):
        """Decode *encoded* via the C-level locale codec hook."""
        return _testcapi.DecodeLocaleEx(encoded, 0, errors)

    def check_decode_strings(self, errors):
        """Compare the C-level decoder against bytes.decode for many inputs.

        Inputs are BYTES_STRINGS plus every STRINGS sample that can be
        encoded with the filesystem encoding; on UTF-8, surrogateescape
        and surrogatepass encodings of each sample are included as well.
        """
        is_utf8 = (self.ENCODING == "utf-8")
        if is_utf8:
            encode_errors = 'surrogateescape'
        else:
            encode_errors = 'strict'

        strings = list(self.BYTES_STRINGS)
        for text in self.STRINGS:
            try:
                encoded = text.encode(self.ENCODING, encode_errors)
                if encoded not in strings:
                    strings.append(encoded)
            except UnicodeEncodeError:
                encoded = None

            if is_utf8:
                encoded2 = text.encode(self.ENCODING, 'surrogatepass')
                if encoded2 != encoded:
                    strings.append(encoded2)

        for encoded in strings:
            with self.subTest(encoded=encoded):
                try:
                    expected = encoded.decode(self.ENCODING, errors)
                except UnicodeDecodeError:
                    # bytes.decode failed: the C-level decoder must
                    # report a "decode error: ..." RuntimeError.
                    with self.assertRaises(RuntimeError) as cm:
                        self.decode(encoded, errors)
                    errmsg = str(cm.exception)
                    self.assertTrue(errmsg.startswith("decode error: "), errmsg)
                else:
                    decoded = self.decode(encoded, errors)
                    self.assertEqual(decoded, expected)

    def test_decode_strict(self):
        self.check_decode_strings("strict")

    def test_decode_surrogateescape(self):
        self.check_decode_strings("surrogateescape")

    def test_decode_surrogatepass(self):
        # Mirror of test_encode_surrogatepass for the decoder side.
        try:
            self.decode(b'', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} decoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_decode_strings("surrogatepass")

    def test_decode_unsupported_error_handler(self):
        # The C-level decoder rejects handlers it does not implement.
        with self.assertRaises(ValueError) as cm:
            self.decode(b'', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')
3453
3454
class Rot13Test(unittest.TestCase):
    """Test the educational ROT-13 codec."""

    def test_encode(self):
        self.assertEqual(codecs.encode("Caesar liked ciphers", 'rot-13'),
                         'Pnrfne yvxrq pvcuref')

    def test_decode(self):
        self.assertEqual(codecs.decode('Rg gh, Oehgr?', 'rot-13'),
                         'Et tu, Brute?')

    def test_incremental_encode(self):
        enc = codecs.getincrementalencoder('rot-13')()
        self.assertEqual(enc.encode('ABBA nag Cheryl Baker'),
                         'NOON ant Purely Onxre')

    def test_incremental_decode(self):
        dec = codecs.getincrementaldecoder('rot-13')()
        self.assertEqual(dec.decode('terra Ares envy tha'),
                         'green Nerf rail gun')
3474
3475
class Rot13UtilTest(unittest.TestCase):
    """Test the ROT-13 codec via rot13 function,
    i.e. the user has done something like:
    $ echo "Hello World" | python -m encodings.rot_13
    """
    def test_rot13_func(self):
        source = io.StringIO('Gb or, be abg gb or, gung vf gur dhrfgvba')
        sink = io.StringIO()
        encodings.rot_13.rot13(source, sink)
        # The helper writes the recoded text to the output stream.
        self.assertEqual(sink.getvalue(),
                         'To be, or not to be, that is the question')
3490
3491
3492class CodecNameNormalizationTest(unittest.TestCase):
3493    """Test codec name normalization"""
3494    def test_codecs_lookup(self):
3495        FOUND = (1, 2, 3, 4)
3496        NOT_FOUND = (None, None, None, None)
3497        def search_function(encoding):
3498            if encoding == "aaa_8":
3499                return FOUND
3500            else:
3501                return NOT_FOUND
3502
3503        codecs.register(search_function)
3504        self.addCleanup(codecs.unregister, search_function)
3505        self.assertEqual(FOUND, codecs.lookup('aaa_8'))
3506        self.assertEqual(FOUND, codecs.lookup('AAA-8'))
3507        self.assertEqual(FOUND, codecs.lookup('AAA---8'))
3508        self.assertEqual(FOUND, codecs.lookup('AAA   8'))
3509        self.assertEqual(FOUND, codecs.lookup('aaa\xe9\u20ac-8'))
3510        self.assertEqual(NOT_FOUND, codecs.lookup('AAA.8'))
3511        self.assertEqual(NOT_FOUND, codecs.lookup('AAA...8'))
3512        self.assertEqual(NOT_FOUND, codecs.lookup('BBB-8'))
3513        self.assertEqual(NOT_FOUND, codecs.lookup('BBB.8'))
3514        self.assertEqual(NOT_FOUND, codecs.lookup('a\xe9\u20ac-8'))
3515
3516    def test_encodings_normalize_encoding(self):
3517        # encodings.normalize_encoding() ignores non-ASCII characters.
3518        normalize = encodings.normalize_encoding
3519        self.assertEqual(normalize('utf_8'), 'utf_8')
3520        self.assertEqual(normalize('utf\xE9\u20AC\U0010ffff-8'), 'utf_8')
3521        self.assertEqual(normalize('utf   8'), 'utf_8')
3522        # encodings.normalize_encoding() doesn't convert
3523        # characters to lower case.
3524        self.assertEqual(normalize('UTF 8'), 'UTF_8')
3525        self.assertEqual(normalize('utf.8'), 'utf.8')
3526        self.assertEqual(normalize('utf...8'), 'utf...8')
3527
3528
3529if __name__ == "__main__":
3530    unittest.main()
3531