• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import codecs
2import contextlib
3import io
4import locale
5import sys
6import unittest
7import encodings
8from unittest import mock
9
10from test import support
11
12try:
13    import _testcapi
14except ImportError as exc:
15    _testcapi = None
16
17try:
18    import ctypes
19except ImportError:
20    ctypes = None
21    SIZEOF_WCHAR_T = -1
22else:
23    SIZEOF_WCHAR_T = ctypes.sizeof(ctypes.c_wchar)
24
def coding_checker(self, coder):
    """Build a closure asserting that *coder* consumes its whole input.

    *self* must provide ``assertEqual`` (typically a TestCase); *coder* is a
    codec function returning a ``(converted, consumed)`` pair.  The returned
    ``check(input, expect)`` asserts the result is ``(expect, len(input))``.
    """
    def check(input, expect):
        converted = coder(input)
        self.assertEqual(converted, (expect, len(input)))
    return check
29
# On small versions of Windows like Windows IoT or Windows Nano Server not all codepages are present
def is_code_page_present(cp):
    """Return a true value if Windows code page *cp* is installed.

    Queries kernel32's ``GetCPInfoExW`` through ctypes; a nonzero (TRUE)
    return means the code page is present.  Only meaningful on Windows.
    """
    # Import everything locally (including Structure): the module-level
    # ``ctypes`` name is set to None when ctypes is unavailable, so this
    # function must not depend on it.  (Also drops the duplicate UINT
    # that was imported twice from ctypes.wintypes.)
    from ctypes import POINTER, WINFUNCTYPE, WinDLL, Structure
    from ctypes.wintypes import BOOL, UINT, BYTE, WCHAR, DWORD

    MAX_LEADBYTES = 12  # 5 ranges, 2 bytes ea., 0 term.
    MAX_DEFAULTCHAR = 2 # single or double byte
    MAX_PATH = 260
    # Mirrors the Win32 CPINFOEXW structure.
    class CPINFOEXW(Structure):
        _fields_ = [("MaxCharSize", UINT),
                    ("DefaultChar", BYTE*MAX_DEFAULTCHAR),
                    ("LeadByte", BYTE*MAX_LEADBYTES),
                    ("UnicodeDefaultChar", WCHAR),
                    ("CodePage", UINT),
                    ("CodePageName", WCHAR*MAX_PATH)]

    prototype = WINFUNCTYPE(BOOL, UINT, DWORD, POINTER(CPINFOEXW))
    GetCPInfoEx = prototype(("GetCPInfoExW", WinDLL("kernel32")))
    info = CPINFOEXW()
    return GetCPInfoEx(cp, 0, info)
50
class Queue(object):
    """
    FIFO queue over a sequence buffer: write items at one end, read
    them back from the other.  The *buffer* passed to the constructor
    determines the element type (bytes or str).
    """
    def __init__(self, buffer):
        self._buffer = buffer

    def write(self, chars):
        """Append *chars* to the tail of the queue."""
        self._buffer += chars

    def read(self, size=-1):
        """Pop and return up to *size* items from the head.

        A negative *size* drains the whole queue.
        """
        if size < 0:
            size = len(self._buffer)
        head, self._buffer = self._buffer[:size], self._buffer[size:]
        return head
70
71
class MixInCheckStateHandling:
    """Mix-in verifying incremental codec getstate()/setstate() hand-off."""

    def check_state_handling_decode(self, encoding, u, s):
        """Split *s* at every index; decoding both halves with a state
        transfer in between must reproduce *u* exactly."""
        make_decoder = codecs.getincrementaldecoder(encoding)
        for split in range(len(s) + 1):
            decoder = make_decoder()
            head = decoder.decode(s[:split])
            state = decoder.getstate()
            self.assertIsInstance(state[1], int)
            # Check that the condition stated in the documentation for
            # IncrementalDecoder.getstate() holds
            if not state[1]:
                # With no extra state flags, resetting to an empty buffer ...
                decoder.setstate((state[0][:0], 0))
                # ... then re-feeding the buffered bytes must yield nothing ...
                self.assertTrue(not decoder.decode(state[0]))
                # ... and must put the decoder back into the same state.
                self.assertEqual(state, decoder.getstate())
            # Resume on a brand-new decoder primed with the captured state.
            decoder = make_decoder()
            decoder.setstate(state)
            tail = decoder.decode(s[split:], True)
            self.assertEqual(u, head + tail)

    def check_state_handling_encode(self, encoding, u, s):
        """Same hand-off check as the decode variant, for encoders."""
        make_encoder = codecs.getincrementalencoder(encoding)
        for split in range(len(u) + 1):
            encoder = make_encoder()
            head = encoder.encode(u[:split])
            state = encoder.getstate()
            encoder = make_encoder()
            encoder.setstate(state)
            tail = encoder.encode(u[split:], True)
            self.assertEqual(s, head + tail)
104
105
class ReadTest(MixInCheckStateHandling):
    # Abstract mix-in: concrete subclasses must define self.encoding and,
    # for the surrogate tests, an ill_formed_sequence bytes attribute.
    def check_partial(self, input, partialresults):
        """Decode *input* one encoded byte at a time and compare each
        intermediate result with the corresponding entry of
        *partialresults* (one entry per encoded byte).  Runs the check
        via StreamReader, an incremental decoder (fresh and after
        reset()), and iterdecode()."""
        # get a StreamReader for the encoding and feed the bytestring version
        # of input to the reader byte by byte. Read everything available from
        # the StreamReader and check that the results equal the appropriate
        # entries from partialresults.
        q = Queue(b"")
        r = codecs.getreader(self.encoding)(q)
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            q.write(bytes([c]))
            result += r.read()
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(r.read(), "")
        self.assertEqual(r.bytebuffer, b"")

        # do the check again, this time using an incremental decoder
        d = codecs.getincrementaldecoder(self.encoding)()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # Check whether the reset method works properly
        d.reset()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # check iterdecode()
        encoded = input.encode(self.encoding)
        self.assertEqual(
            input,
            "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
        )

    def test_readline(self):
        """readline() must honor keepends/size for all four line endings
        exercised below (\\n, \\r\\n, \\r, U+2028)."""
        def getreader(input):
            stream = io.BytesIO(input.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        def readalllines(input, keepends=True, size=None):
            # Join all lines with "|" so dropped/kept line endings show up
            # in a single comparable string.
            reader = getreader(input)
            lines = []
            while True:
                line = reader.readline(size=size, keepends=keepends)
                if not line:
                    break
                lines.append(line)
            return "|".join(lines)

        s = "foo\nbar\r\nbaz\rspam\u2028eggs"
        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
        sexpectednoends = "foo|bar|baz|spam|eggs"
        self.assertEqual(readalllines(s, True), sexpected)
        self.assertEqual(readalllines(s, False), sexpectednoends)
        self.assertEqual(readalllines(s, True, 10), sexpected)
        self.assertEqual(readalllines(s, False, 10), sexpectednoends)

        lineends = ("\n", "\r\n", "\r", "\u2028")
        # Test long lines (multiple calls to read() in readline())
        vw = []
        vwo = []
        for (i, lineend) in enumerate(lineends):
            vw.append((i*200+200)*"\u3042" + lineend)
            vwo.append((i*200+200)*"\u3042")
        self.assertEqual(readalllines("".join(vw), True), "|".join(vw))
        self.assertEqual(readalllines("".join(vw), False), "|".join(vwo))

        # Test lines where the first read might end with \r, so the
        # reader has to look ahead whether this is a lone \r or a \r\n
        for size in range(80):
            for lineend in lineends:
                s = 10*(size*"a" + lineend + "xxx\n")
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=True),
                        size*"a" + lineend,
                    )
                    self.assertEqual(
                        reader.readline(keepends=True),
                        "xxx\n",
                    )
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=False),
                        size*"a",
                    )
                    self.assertEqual(
                        reader.readline(keepends=False),
                        "xxx",
                    )

    def test_mixed_readline_and_read(self):
        """Interleaving readline()/read()/readlines() must not lose or
        duplicate data buffered by the StreamReader."""
        lines = ["Humpty Dumpty sat on a wall,\n",
                 "Humpty Dumpty had a great fall.\r\n",
                 "All the king's horses and all the king's men\r",
                 "Couldn't put Humpty together again."]
        data = ''.join(lines)
        def getreader():
            stream = io.BytesIO(data.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        # Issue #8260: Test readline() followed by read()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(), ''.join(lines[1:]))
        self.assertEqual(f.read(), '')

        # Issue #32110: Test readline() followed by read(n)
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(1), lines[1][0])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[len(lines[0]) + 1:][:100])

        # Issue #16636: Test readline() followed by readlines()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.readlines(), lines[1:])
        self.assertEqual(f.read(), '')

        # Test read(n) followed by read()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(), data[5:])
        self.assertEqual(f.read(), '')

        # Issue #32110: Test read(n) followed by read(n)
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(1), data[5])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[6:106])

        # Issue #12446: Test read(n) followed by readlines()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.readlines(), [lines[0][5:]] + lines[1:])
        self.assertEqual(f.read(), '')

    def test_bug1175396(self):
        """Regression test for bug #1175396: iterating a StreamReader over
        \\r\\n-terminated lines must yield exactly the original lines."""
        s = [
            '<%!--===================================================\r\n',
            '    BLOG index page: show recent articles,\r\n',
            '    today\'s articles, or articles of a specific date.\r\n',
            '========================================================--%>\r\n',
            '<%@inputencoding="ISO-8859-1"%>\r\n',
            '<%@pagetemplate=TEMPLATE.y%>\r\n',
            '<%@import=import frog.util, frog%>\r\n',
            '<%@import=import frog.objects%>\r\n',
            '<%@import=from frog.storageerrors import StorageError%>\r\n',
            '<%\r\n',
            '\r\n',
            'import logging\r\n',
            'log=logging.getLogger("Snakelets.logger")\r\n',
            '\r\n',
            '\r\n',
            'user=self.SessionCtx.user\r\n',
            'storageEngine=self.SessionCtx.storageEngine\r\n',
            '\r\n',
            '\r\n',
            'def readArticlesFromDate(date, count=None):\r\n',
            '    entryids=storageEngine.listBlogEntries(date)\r\n',
            '    entryids.reverse() # descending\r\n',
            '    if count:\r\n',
            '        entryids=entryids[:count]\r\n',
            '    try:\r\n',
            '        return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n',
            '    except StorageError,x:\r\n',
            '        log.error("Error loading articles: "+str(x))\r\n',
            '        self.abort("cannot load articles")\r\n',
            '\r\n',
            'showdate=None\r\n',
            '\r\n',
            'arg=self.Request.getArg()\r\n',
            'if arg=="today":\r\n',
            '    #-------------------- TODAY\'S ARTICLES\r\n',
            '    self.write("<h2>Today\'s articles</h2>")\r\n',
            '    showdate = frog.util.isodatestr() \r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'elif arg=="active":\r\n',
            '    #-------------------- ACTIVE ARTICLES redirect\r\n',
            '    self.Yredirect("active.y")\r\n',
            'elif arg=="login":\r\n',
            '    #-------------------- LOGIN PAGE redirect\r\n',
            '    self.Yredirect("login.y")\r\n',
            'elif arg=="date":\r\n',
            '    #-------------------- ARTICLES OF A SPECIFIC DATE\r\n',
            '    showdate = self.Request.getParameter("date")\r\n',
            '    self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'else:\r\n',
            '    #-------------------- RECENT ARTICLES\r\n',
            '    self.write("<h2>Recent articles</h2>")\r\n',
            '    dates=storageEngine.listBlogEntryDates()\r\n',
            '    if dates:\r\n',
            '        entries=[]\r\n',
            '        SHOWAMOUNT=10\r\n',
            '        for showdate in dates:\r\n',
            '            entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n',
            '            if len(entries)>=SHOWAMOUNT:\r\n',
            '                break\r\n',
            '                \r\n',
        ]
        stream = io.BytesIO("".join(s).encode(self.encoding))
        reader = codecs.getreader(self.encoding)(stream)
        for (i, line) in enumerate(reader):
            self.assertEqual(line, s[i])

    def test_readlinequeue(self):
        """readline() on a growing Queue: a trailing \\r must be returned
        and a following \\n recognized as a separate (empty) line."""
        q = Queue(b"")
        writer = codecs.getwriter(self.encoding)(q)
        reader = codecs.getreader(self.encoding)(q)

        # No lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=False), "foo")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=False), "")
        self.assertEqual(reader.readline(keepends=False), "bar")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=False), "baz")
        self.assertEqual(reader.readline(keepends=False), "")

        # Lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=True), "foo\r")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=True), "\n")
        self.assertEqual(reader.readline(keepends=True), "bar\r")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=True), "baz")
        self.assertEqual(reader.readline(keepends=True), "")
        writer.write("foo\r\n")
        self.assertEqual(reader.readline(keepends=True), "foo\r\n")

    def test_bug1098990_a(self):
        # Regression test: readline() across internal read-chunk boundaries
        # must return complete lines.
        s1 = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\r\n"
        s2 = "offending line: ladfj askldfj klasdj fskla dfzaskdj fasklfj laskd fjasklfzzzzaa%whereisthis!!!\r\n"
        s3 = "next line.\r\n"

        s = (s1+s2+s3).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), "")

    def test_bug1098990_b(self):
        # Companion to test_bug1098990_a with a different line-length mix.
        s1 = "aaaaaaaaaaaaaaaaaaaaaaaa\r\n"
        s2 = "bbbbbbbbbbbbbbbbbbbbbbbb\r\n"
        s3 = "stillokay:bbbbxx\r\n"
        s4 = "broken!!!!badbad\r\n"
        s5 = "againokay.\r\n"

        s = (s1+s2+s3+s4+s5).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), s4)
        self.assertEqual(reader.readline(), s5)
        self.assertEqual(reader.readline(), "")

    # Text expected from decoding ill_formed_sequence with errors="replace";
    # subclasses override when one sequence yields several U+FFFDs.
    ill_formed_sequence_replace = "\ufffd"

    def test_lone_surrogates(self):
        """Encoding lone surrogates must fail in strict mode and obey each
        error handler; decoding ill_formed_sequence likewise."""
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding)
        self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "namereplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"),
                         "[&#56448;]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
                         "[?]".encode(self.encoding))

        # sequential surrogate characters
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
                         "[??]".encode(self.encoding))

        # Encoding "" captures just the BOM (if the codec emits one), so it
        # can be stripped from the per-character sequences below.
        bom = "".encode(self.encoding)
        for before, after in [("\U00010fff", "A"), ("[", "]"),
                              ("A", "\U00010fff")]:
            before_sequence = before.encode(self.encoding)[len(bom):]
            after_sequence = after.encode(self.encoding)[len(bom):]
            test_string = before + "\uDC80" + after
            test_sequence = (bom + before_sequence +
                             self.ill_formed_sequence + after_sequence)
            self.assertRaises(UnicodeDecodeError, test_sequence.decode,
                              self.encoding)
            self.assertEqual(test_string.encode(self.encoding,
                                                "surrogatepass"),
                             test_sequence)
            self.assertEqual(test_sequence.decode(self.encoding,
                                                  "surrogatepass"),
                             test_string)
            self.assertEqual(test_sequence.decode(self.encoding, "ignore"),
                             before + after)
            self.assertEqual(test_sequence.decode(self.encoding, "replace"),
                             before + self.ill_formed_sequence_replace + after)
            backslashreplace = ''.join('\\x%02x' % b
                                       for b in self.ill_formed_sequence)
            self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
                             before + backslashreplace + after)

    def test_incremental_surrogatepass(self):
        # Test incremental decoder for surrogatepass handler:
        # see issue #24214
        # High surrogate
        data = '\uD901'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:], True), '\uD901')
        # Low surrogate
        data = '\uDC02'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:]), '\uDC02')
444
445
class UTF32Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-detecting "utf-32" codec."""
    encoding = "utf-32"
    # An unpaired low surrogate in native byte order (utf-32 encodes in
    # native order after the BOM).
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc\x00\x00"
    else:
        ill_formed_sequence = b"\x00\x00\xdc\x80"

    # "spamspam" encoded with a single leading BOM, little/big endian.
    spamle = (b'\xff\xfe\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
    spambe = (b'\x00\x00\xfe\xff'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')

    def test_only_one_bom(self):
        """A StreamWriter must emit the BOM once, not per write() call."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """A first unit that is not a valid BOM must raise UnicodeError."""
        s = io.BytesIO(4*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(8*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        # Expected output after each input byte: 4 BOM bytes first, then
        # one code point per 4 data bytes.
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read
                "", # third byte of BOM read
                "", # fourth byte of BOM read => byteorder known
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """'replace' and 'ignore' must consume the bad byte and continue."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_32_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_32_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded_le = b'\xff\xfe\x00\x00' + b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_le)[0])
        encoded_be = b'\x00\x00\xfe\xff' + b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_be)[0])
540
541
class UTF32LETest(ReadTest, unittest.TestCase):
    """Little-endian UTF-32 codec tests (fixed byte order, no BOM)."""
    encoding = "utf-32-le"
    # An unpaired low surrogate, encoded little-endian.
    ill_formed_sequence = b"\x80\xdc\x00\x00"

    def test_partial(self):
        # After each input byte the decoder must have produced exactly the
        # longest complete prefix; every code point takes four bytes.
        prefixes = ["",
                    "\x00",
                    "\x00\xff",
                    "\x00\xff\u0100",
                    "\x00\xff\u0100\uffff",
                    "\x00\xff\u0100\uffff\U00010000"]
        expected = ([prefixes[0]] * 3 +
                    [prefixes[1]] * 4 +
                    [prefixes[2]] * 4 +
                    [prefixes[3]] * 4 +
                    [prefixes[4]] * 4 +
                    [prefixes[5]])
        self.check_partial(prefixes[5], expected)

    def test_simple(self):
        """A non-BMP character encodes to four little-endian bytes."""
        self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")

    def test_errors(self):
        """A truncated code unit raises UnicodeDecodeError in strict mode."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: the result buffer was under-allocated when decoding
        # into surrogate pairs on UCS-2 builds.
        data = b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_le_decode(data)[0])
586
587
class UTF32BETest(ReadTest, unittest.TestCase):
    """Big-endian UTF-32 codec tests (fixed byte order, no BOM)."""
    encoding = "utf-32-be"
    # An unpaired low surrogate, encoded big-endian.
    ill_formed_sequence = b"\x00\x00\xdc\x80"

    def test_partial(self):
        # After each input byte the decoder must have produced exactly the
        # longest complete prefix; every code point takes four bytes.
        prefixes = ["",
                    "\x00",
                    "\x00\xff",
                    "\x00\xff\u0100",
                    "\x00\xff\u0100\uffff",
                    "\x00\xff\u0100\uffff\U00010000"]
        expected = ([prefixes[0]] * 3 +
                    [prefixes[1]] * 4 +
                    [prefixes[2]] * 4 +
                    [prefixes[3]] * 4 +
                    [prefixes[4]] * 4 +
                    [prefixes[5]])
        self.check_partial(prefixes[5], expected)

    def test_simple(self):
        """A non-BMP character encodes to four big-endian bytes."""
        self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")

    def test_errors(self):
        """A truncated code unit raises UnicodeDecodeError in strict mode."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: the result buffer was under-allocated when decoding
        # into surrogate pairs on UCS-2 builds.
        data = b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_be_decode(data)[0])
632
633
class UTF16Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-detecting "utf-16" codec."""
    encoding = "utf-16"
    # An unpaired low surrogate in native byte order (utf-16 encodes in
    # native order after the BOM).
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc"
    else:
        ill_formed_sequence = b"\xdc\x80"

    # "spamspam" encoded with a single leading BOM, little/big endian.
    spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
    spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'

    def test_only_one_bom(self):
        """A StreamWriter must emit the BOM once, not per write() call."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """A first unit that is not a valid BOM must raise UnicodeError."""
        s = io.BytesIO(b"\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(b"\xff\xff\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        # Expected output after each input byte: 2 BOM bytes first, then one
        # code point per complete 2-byte unit (4 bytes for the astral char).
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read => byteorder known
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """'replace' and 'ignore' must consume the bad byte and continue."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_16_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_16_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_bug691291(self):
        # Files are always opened in binary mode, even if no binary mode was
        # specified.  This means that no automatic conversion of '\n' is done
        # on reading and writing.
        s1 = 'Hello\r\nworld\r\n'

        s = s1.encode(self.encoding)
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, 'wb') as fp:
            fp.write(s)
        with support.check_warnings(('', DeprecationWarning)):
            reader = codecs.open(support.TESTFN, 'U', encoding=self.encoding)
        with reader:
            self.assertEqual(reader.read(), s1)
719
class UTF16LETest(ReadTest, unittest.TestCase):
    """Little-endian UTF-16 codec tests (fixed byte order, no BOM)."""
    encoding = "utf-16-le"
    # An unpaired low surrogate, encoded little-endian.
    ill_formed_sequence = b"\x80\xdc"

    def test_partial(self):
        # Only complete 2-byte units decode; the final astral character
        # needs its full 4-byte surrogate pair before it appears.
        prefixes = ["",
                    "\x00",
                    "\x00\xff",
                    "\x00\xff\u0100",
                    "\x00\xff\u0100\uffff",
                    "\x00\xff\u0100\uffff\U00010000"]
        expected = ([prefixes[0]] +
                    [prefixes[1]] * 2 +
                    [prefixes[2]] * 2 +
                    [prefixes[3]] * 2 +
                    [prefixes[4]] * 4 +
                    [prefixes[5]])
        self.check_partial(prefixes[5], expected)

    def test_errors(self):
        """Malformed input raises in strict mode and becomes U+FFFD with
        the 'replace' handler."""
        cases = [
            (b'\xff', '\ufffd'),
            (b'A\x00Z', 'A\ufffd'),
            (b'A\x00B\x00C\x00D\x00Z', 'ABCD\ufffd'),
            (b'\x00\xd8', '\ufffd'),
            (b'\x00\xd8A', '\ufffd'),
            (b'\x00\xd8A\x00', '\ufffdA'),
            (b'\x00\xdcA\x00', '\ufffdA'),
        ]
        for malformed, replaced in cases:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode,
                              malformed, 'strict', True)
            self.assertEqual(malformed.decode('utf-16le', 'replace'), replaced)

    def test_nonbmp(self):
        """Astral characters round-trip through a surrogate pair."""
        encoded = b'\x00\xd8\x03\xde'
        self.assertEqual("\U00010203".encode(self.encoding), encoded)
        self.assertEqual(encoded.decode(self.encoding), "\U00010203")
763
class UTF16BETest(ReadTest, unittest.TestCase):
    """Big-endian UTF-16 codec tests (fixed byte order, no BOM)."""
    encoding = "utf-16-be"
    # An unpaired low surrogate, encoded big-endian.
    ill_formed_sequence = b"\xdc\x80"

    def test_partial(self):
        # Only complete 2-byte units decode; the final astral character
        # needs its full 4-byte surrogate pair before it appears.
        prefixes = ["",
                    "\x00",
                    "\x00\xff",
                    "\x00\xff\u0100",
                    "\x00\xff\u0100\uffff",
                    "\x00\xff\u0100\uffff\U00010000"]
        expected = ([prefixes[0]] +
                    [prefixes[1]] * 2 +
                    [prefixes[2]] * 2 +
                    [prefixes[3]] * 2 +
                    [prefixes[4]] * 4 +
                    [prefixes[5]])
        self.check_partial(prefixes[5], expected)

    def test_errors(self):
        """Malformed input raises in strict mode and becomes U+FFFD with
        the 'replace' handler."""
        cases = [
            (b'\xff', '\ufffd'),
            (b'\x00A\xff', 'A\ufffd'),
            (b'\x00A\x00B\x00C\x00DZ', 'ABCD\ufffd'),
            (b'\xd8\x00', '\ufffd'),
            (b'\xd8\x00\xdc', '\ufffd'),
            (b'\xd8\x00\x00A', '\ufffdA'),
            (b'\xdc\x00\x00A', '\ufffdA'),
        ]
        for malformed, replaced in cases:
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_be_decode,
                              malformed, 'strict', True)
            self.assertEqual(malformed.decode('utf-16be', 'replace'), replaced)

    def test_nonbmp(self):
        """Astral characters round-trip through a surrogate pair."""
        encoded = b'\xd8\x00\xde\x03'
        self.assertEqual("\U00010203".encode(self.encoding), encoded)
        self.assertEqual(encoded.decode(self.encoding), "\U00010203")
807
class UTF8Test(ReadTest, unittest.TestCase):
    """Tests for the utf-8 codec.

    Also serves as the base class for UTF8SigTest below, which overrides
    ``encoding`` and ``BOM``.
    """
    encoding = "utf-8"
    # A lone low surrogate (U+DC80) encoded as three UTF-8 code units;
    # this is ill-formed per the Unicode standard.
    ill_formed_sequence = b"\xed\xb2\x80"
    # Each of the three bytes is replaced individually.
    ill_formed_sequence_replace = "\ufffd" * 3
    # No signature for plain utf-8; UTF8SigTest overrides this with BOM_UTF8.
    BOM = b''

    def test_partial(self):
        # Feed the encoded form one byte at a time and check what the
        # incremental decoder has produced after each byte.
        self.check_partial(
            "\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_decoder_state(self):
        # Exercise getstate()/setstate() round-trips at every split point.
        u = "\x00\x7f\x80\xff\u0100\u07ff\u0800\uffff\U0010ffff"
        self.check_state_handling_decode(self.encoding,
                                         u, u.encode(self.encoding))

    def test_decode_error(self):
        # Each error handler's treatment of two undecodable bytes.
        for data, error_handler, expected in (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ):
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.decode(self.encoding, error_handler),
                                 expected)

    def test_lone_surrogates(self):
        super().test_lone_surrogates()
        # not sure if this is making sense for
        # UTF-16 and UTF-32
        self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
                         self.BOM + b'[\x80]')

        # surrogateescape only covers lone surrogates in the U+DC80..U+DCFF
        # range; other surrogates must still raise, and the exception must
        # point at exactly the offending characters.
        with self.assertRaises(UnicodeEncodeError) as cm:
            "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
        exc = cm.exception
        self.assertEqual(exc.object[exc.start:exc.end], '\uD800\uDFFF')

    def test_surrogatepass_handler(self):
        # "surrogatepass" encodes/decodes surrogates as if they were
        # ordinary code points (CESU-8-style three-byte sequences).
        self.assertEqual("abc\ud800def".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"abc\xed\xa0\x80def")
        self.assertEqual("\U00010fff\uD800".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"\xf0\x90\xbf\xbf\xed\xa0\x80")
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "surrogatepass"),
                         self.BOM + b'[\xed\xa0\x80\xed\xb2\x80]')

        self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
                         "\U00010fff\uD800")

        self.assertTrue(codecs.lookup_error("surrogatepass"))
        # Truncated or malformed surrogate sequences still raise.
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")

    def test_incremental_errors(self):
        # Test that the incremental decoder can fail with final=False.
        # See issue #24214
        cases = [b'\x80', b'\xBF', b'\xC0', b'\xC1', b'\xF5', b'\xF6', b'\xFF']
        for prefix in (b'\xC2', b'\xDF', b'\xE0', b'\xE0\xA0', b'\xEF',
                       b'\xEF\xBF', b'\xF0', b'\xF0\x90', b'\xF0\x90\x80',
                       b'\xF4', b'\xF4\x8F', b'\xF4\x8F\xBF'):
            for suffix in b'\x7F', b'\xC0':
                cases.append(prefix + suffix)
        cases.extend((b'\xE0\x80', b'\xE0\x9F', b'\xED\xA0\x80',
                      b'\xED\xBF\xBF', b'\xF0\x80', b'\xF0\x8F', b'\xF4\x90'))

        for data in cases:
            with self.subTest(data=data):
                dec = codecs.getincrementaldecoder(self.encoding)()
                self.assertRaises(UnicodeDecodeError, dec.decode, data)
900
901
class UTF7Test(ReadTest, unittest.TestCase):
    """Tests for the utf-7 codec (RFC 2152)."""
    encoding = "utf-7"

    def test_ascii(self):
        # Characters in the RFC 2152 "direct" sets must pass through
        # unchanged; everything else is base64-encoded between '+' and '-'.
        # Set D (directly encoded characters)
        set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 'abcdefghijklmnopqrstuvwxyz'
                 '0123456789'
                 '\'(),-./:?')
        self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii'))
        self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d)
        # Set O (optional direct characters)
        set_o = ' !"#$%&*;<=>@[]^_`{|}'
        self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii'))
        self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o)
        # + (the shift character itself must be escaped as "+-")
        self.assertEqual('a+b'.encode(self.encoding), b'a+-b')
        self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b')
        # White spaces
        ws = ' \t\n\r'
        self.assertEqual(ws.encode(self.encoding), ws.encode('ascii'))
        self.assertEqual(ws.encode('ascii').decode(self.encoding), ws)
        # Other ASCII characters
        other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) -
                                     set(set_d + set_o + '+' + ws)))
        self.assertEqual(other_ascii.encode(self.encoding),
                         b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU'
                         b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-')

    def test_partial(self):
        # Feed the encoded form one byte at a time; the base64 runs mean
        # several consecutive bytes may produce no output at all.
        self.check_partial(
            'a+-b\x00c\x80d\u0100e\U00010000f',
            [
                'a',
                'a',
                'a+',
                'a+-',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b\x00',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c\x80',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d\u0100',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e\U00010000',
                'a+-b\x00c\x80d\u0100e\U00010000f',
            ]
        )

    def test_errors(self):
        # Malformed shift sequences: strict decoding must raise, and the
        # "replace" handler must substitute U+FFFD as listed.
        tests = [
            (b'\xffb', '\ufffdb'),
            (b'a\xffb', 'a\ufffdb'),
            (b'a\xff\xffb', 'a\ufffd\ufffdb'),
            (b'a+IK', 'a\ufffd'),
            (b'a+IK-b', 'a\ufffdb'),
            (b'a+IK,b', 'a\ufffdb'),
            (b'a+IKx', 'a\u20ac\ufffd'),
            (b'a+IKx-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr', 'a\u20ac\ufffd'),
            (b'a+IKwgr-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr,', 'a\u20ac\ufffd'),
            (b'a+IKwgr,-b', 'a\u20ac\ufffd-b'),
            (b'a+IKwgrB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrB-b', 'a\u20ac\u20ac\ufffdb'),
            (b'a+/,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+//,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+IKw-b\xff', 'a\u20acb\ufffd'),
            (b'a+IKw\xffb', 'a\u20ac\ufffdb'),
            (b'a+@b', 'a\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode,
                                raw, 'strict', True)
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)

    def test_nonbmp(self):
        # Non-BMP characters are encoded as a surrogate pair inside the
        # base64 run; the terminating '-' is optional when decoding.
        self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0')
        self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0')
        self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-')
        self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding),
                         b'+IKwgrNgB3KA-')
        self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')
        self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')

    def test_lone_surrogates(self):
        # A complete lone surrogate decodes as-is; a surrogate cut short
        # by an error or end-of-input becomes U+FFFD under "replace".
        tests = [
            (b'a+2AE-b', 'a\ud801b'),
            (b'a+2AE\xffb', 'a\ufffdb'),
            (b'a+2AE', 'a\ufffd'),
            (b'a+2AEA-b', 'a\ufffdb'),
            (b'a+2AH-b', 'a\ufffdb'),
            (b'a+IKzYAQ-b', 'a\u20ac\ud801b'),
            (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'),
            (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'),
            (b'a+IKzYAd-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'),
            (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'),
            (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)
1034
1035
class UTF16ExTest(unittest.TestCase):
    """Tests for the low-level codecs.utf_16_ex_decode() helper."""

    def test_errors(self):
        # A single odd byte cannot be decoded in strict mode.
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_ex_decode,
                          b"\xff", "strict", 0, True)

    def test_bad_args(self):
        # The data argument is mandatory.
        self.assertRaises(TypeError, codecs.utf_16_ex_decode)
1043
class ReadBufferTest(unittest.TestCase):
    """Tests for codecs.readbuffer_encode()."""

    def test_array(self):
        # Any object exposing the buffer protocol is accepted.
        import array
        buf = array.array("b", b"spam")
        self.assertEqual(codecs.readbuffer_encode(buf), (b"spam", 4))

    def test_empty(self):
        self.assertEqual(codecs.readbuffer_encode(""), (b"", 0))

    def test_bad_args(self):
        # Missing argument and a non-buffer argument both fail.
        self.assertRaises(TypeError, codecs.readbuffer_encode)
        self.assertRaises(TypeError, codecs.readbuffer_encode, 42)
1059
class UTF8SigTest(UTF8Test, unittest.TestCase):
    """Tests for the utf-8-sig codec (UTF-8 with a BOM signature).

    Inherits most tests from UTF8Test; ``BOM`` makes the inherited
    surrogate tests expect the signature prefix on encoded output.
    """
    encoding = "utf-8-sig"
    BOM = codecs.BOM_UTF8

    def test_partial(self):
        # Feed the encoded form one byte at a time; the leading BOM is
        # consumed silently, while a second BOM decodes as U+FEFF.
        self.check_partial(
            "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "",
                "",
                "", # First BOM has been read and skipped
                "",
                "",
                "\ufeff", # Second BOM has been read and emitted
                "\ufeff\x00", # "\x00" read and emitted
                "\ufeff\x00", # First byte of encoded "\xff" read
                "\ufeff\x00\xff", # Second byte of encoded "\xff" read
                "\ufeff\x00\xff", # First byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff", # Second byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_bug1601501(self):
        # SF bug #1601501: check that the codec works with a buffer
        self.assertEqual(str(b"\xef\xbb\xbf", "utf-8-sig"), "")

    def test_bom(self):
        # The BOM produced by encoding must be skipped again when decoding.
        d = codecs.getincrementaldecoder("utf-8-sig")()
        s = "spam"
        self.assertEqual(d.decode(s.encode("utf-8-sig")), s)

    def _check_stream_decode(self, bytestring, expected):
        # Decode *bytestring* through a stream reader with a variety of
        # read sizes and check the concatenated output against *expected*.
        # Shared by test_stream_bom and test_stream_bare, which previously
        # duplicated this loop verbatim.
        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while True:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()
                if not data:
                    break
                ostream.write(data)
            self.assertEqual(ostream.getvalue(), expected)

    def test_stream_bom(self):
        # A leading BOM is consumed and does not appear in the output.
        self._check_stream_decode(
            codecs.BOM_UTF8 + b"ABC\xC2\xA1\xE2\x88\x80XYZ",
            "ABC\u00A1\u2200XYZ")

    def test_stream_bare(self):
        # Input without a BOM decodes unchanged.
        self._check_stream_decode(
            b"ABC\xC2\xA1\xE2\x88\x80XYZ",
            "ABC\u00A1\u2200XYZ")
1144
1145
1146class EscapeDecodeTest(unittest.TestCase):
1147    def test_empty(self):
1148        self.assertEqual(codecs.escape_decode(b""), (b"", 0))
1149        self.assertEqual(codecs.escape_decode(bytearray()), (b"", 0))
1150
1151    def test_raw(self):
1152        decode = codecs.escape_decode
1153        for b in range(256):
1154            b = bytes([b])
1155            if b != b'\\':
1156                self.assertEqual(decode(b + b'0'), (b + b'0', 2))
1157
1158    def test_escape(self):
1159        decode = codecs.escape_decode
1160        check = coding_checker(self, decode)
1161        check(b"[\\\n]", b"[]")
1162        check(br'[\"]', b'["]')
1163        check(br"[\']", b"[']")
1164        check(br"[\\]", b"[\\]")
1165        check(br"[\a]", b"[\x07]")
1166        check(br"[\b]", b"[\x08]")
1167        check(br"[\t]", b"[\x09]")
1168        check(br"[\n]", b"[\x0a]")
1169        check(br"[\v]", b"[\x0b]")
1170        check(br"[\f]", b"[\x0c]")
1171        check(br"[\r]", b"[\x0d]")
1172        check(br"[\7]", b"[\x07]")
1173        check(br"[\78]", b"[\x078]")
1174        check(br"[\41]", b"[!]")
1175        check(br"[\418]", b"[!8]")
1176        check(br"[\101]", b"[A]")
1177        check(br"[\1010]", b"[A0]")
1178        check(br"[\501]", b"[A]")
1179        check(br"[\x41]", b"[A]")
1180        check(br"[\x410]", b"[A0]")
1181        for i in range(97, 123):
1182            b = bytes([i])
1183            if b not in b'abfnrtvx':
1184                with self.assertWarns(DeprecationWarning):
1185                    check(b"\\" + b, b"\\" + b)
1186            with self.assertWarns(DeprecationWarning):
1187                check(b"\\" + b.upper(), b"\\" + b.upper())
1188        with self.assertWarns(DeprecationWarning):
1189            check(br"\8", b"\\8")
1190        with self.assertWarns(DeprecationWarning):
1191            check(br"\9", b"\\9")
1192        with self.assertWarns(DeprecationWarning):
1193            check(b"\\\xfa", b"\\\xfa")
1194
1195    def test_errors(self):
1196        decode = codecs.escape_decode
1197        self.assertRaises(ValueError, decode, br"\x")
1198        self.assertRaises(ValueError, decode, br"[\x]")
1199        self.assertEqual(decode(br"[\x]\x", "ignore"), (b"[]", 6))
1200        self.assertEqual(decode(br"[\x]\x", "replace"), (b"[?]?", 6))
1201        self.assertRaises(ValueError, decode, br"\x0")
1202        self.assertRaises(ValueError, decode, br"[\x0]")
1203        self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
1204        self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
1205
1206
# From RFC 3492
# Each entry is a (unicode, punycode-encoded-bytes) pair taken from the
# sample strings in RFC 3492 section 7.1.
punycode_testcases = [
    # A Arabic (Egyptian):
    ("\u0644\u064A\u0647\u0645\u0627\u0628\u062A\u0643\u0644"
     "\u0645\u0648\u0634\u0639\u0631\u0628\u064A\u061F",
     b"egbpdaj6bu4bxfgehfvwxn"),
    # B Chinese (simplified):
    ("\u4ED6\u4EEC\u4E3A\u4EC0\u4E48\u4E0D\u8BF4\u4E2D\u6587",
     b"ihqwcrb4cv8a8dqg056pqjye"),
    # C Chinese (traditional):
    ("\u4ED6\u5011\u7232\u4EC0\u9EBD\u4E0D\u8AAA\u4E2D\u6587",
     b"ihqwctvzc91f659drss3x8bo0yb"),
    # D Czech: Pro<ccaron>prost<ecaron>nemluv<iacute><ccaron>esky
    ("\u0050\u0072\u006F\u010D\u0070\u0072\u006F\u0073\u0074"
     "\u011B\u006E\u0065\u006D\u006C\u0075\u0076\u00ED\u010D"
     "\u0065\u0073\u006B\u0079",
     b"Proprostnemluvesky-uyb24dma41a"),
    # E Hebrew:
    ("\u05DC\u05DE\u05D4\u05D4\u05DD\u05E4\u05E9\u05D5\u05D8"
     "\u05DC\u05D0\u05DE\u05D3\u05D1\u05E8\u05D9\u05DD\u05E2"
     "\u05D1\u05E8\u05D9\u05EA",
     b"4dbcagdahymbxekheh6e0a7fei0b"),
    # F Hindi (Devanagari):
    ("\u092F\u0939\u0932\u094B\u0917\u0939\u093F\u0928\u094D"
     "\u0926\u0940\u0915\u094D\u092F\u094B\u0902\u0928\u0939"
     "\u0940\u0902\u092C\u094B\u0932\u0938\u0915\u0924\u0947"
     "\u0939\u0948\u0902",
     b"i1baa7eci9glrd9b2ae1bj0hfcgg6iyaf8o0a1dig0cd"),

    #(G) Japanese (kanji and hiragana):
    ("\u306A\u305C\u307F\u3093\u306A\u65E5\u672C\u8A9E\u3092"
     "\u8A71\u3057\u3066\u304F\u308C\u306A\u3044\u306E\u304B",
     b"n8jok5ay5dzabd5bym9f0cm5685rrjetr6pdxa"),

    # (H) Korean (Hangul syllables):
    ("\uC138\uACC4\uC758\uBAA8\uB4E0\uC0AC\uB78C\uB4E4\uC774"
     "\uD55C\uAD6D\uC5B4\uB97C\uC774\uD574\uD55C\uB2E4\uBA74"
     "\uC5BC\uB9C8\uB098\uC88B\uC744\uAE4C",
     b"989aomsvi5e83db1d2a355cv1e0vak1dwrv93d5xbh15a0dt30a5j"
     b"psd879ccm6fea98c"),

    # (I) Russian (Cyrillic):
    ("\u043F\u043E\u0447\u0435\u043C\u0443\u0436\u0435\u043E"
     "\u043D\u0438\u043D\u0435\u0433\u043E\u0432\u043E\u0440"
     "\u044F\u0442\u043F\u043E\u0440\u0443\u0441\u0441\u043A"
     "\u0438",
     b"b1abfaaepdrnnbgefbaDotcwatmq2g4l"),

    # (J) Spanish: Porqu<eacute>nopuedensimplementehablarenEspa<ntilde>ol
    ("\u0050\u006F\u0072\u0071\u0075\u00E9\u006E\u006F\u0070"
     "\u0075\u0065\u0064\u0065\u006E\u0073\u0069\u006D\u0070"
     "\u006C\u0065\u006D\u0065\u006E\u0074\u0065\u0068\u0061"
     "\u0062\u006C\u0061\u0072\u0065\u006E\u0045\u0073\u0070"
     "\u0061\u00F1\u006F\u006C",
     b"PorqunopuedensimplementehablarenEspaol-fmd56a"),

    # (K) Vietnamese:
    #  T<adotbelow>isaoh<odotbelow>kh<ocirc>ngth<ecirchookabove>ch\
    #   <ihookabove>n<oacute>iti<ecircacute>ngVi<ecircdotbelow>t
    ("\u0054\u1EA1\u0069\u0073\u0061\u006F\u0068\u1ECD\u006B"
     "\u0068\u00F4\u006E\u0067\u0074\u0068\u1EC3\u0063\u0068"
     "\u1EC9\u006E\u00F3\u0069\u0074\u0069\u1EBF\u006E\u0067"
     "\u0056\u0069\u1EC7\u0074",
     b"TisaohkhngthchnitingVit-kjcr8268qyxafd2f1b9g"),

    #(L) 3<nen>B<gumi><kinpachi><sensei>
    ("\u0033\u5E74\u0042\u7D44\u91D1\u516B\u5148\u751F",
     b"3B-ww4c5e180e575a65lsy2b"),

    # (M) <amuro><namie>-with-SUPER-MONKEYS
    ("\u5B89\u5BA4\u5948\u7F8E\u6075\u002D\u0077\u0069\u0074"
     "\u0068\u002D\u0053\u0055\u0050\u0045\u0052\u002D\u004D"
     "\u004F\u004E\u004B\u0045\u0059\u0053",
     b"-with-SUPER-MONKEYS-pc58ag80a8qai00g7n9n"),

    # (N) Hello-Another-Way-<sorezore><no><basho>
    ("\u0048\u0065\u006C\u006C\u006F\u002D\u0041\u006E\u006F"
     "\u0074\u0068\u0065\u0072\u002D\u0057\u0061\u0079\u002D"
     "\u305D\u308C\u305E\u308C\u306E\u5834\u6240",
     b"Hello-Another-Way--fc4qua05auwb3674vfr0b"),

    # (O) <hitotsu><yane><no><shita>2
    ("\u3072\u3068\u3064\u5C4B\u6839\u306E\u4E0B\u0032",
     b"2-u9tlzr9756bt3uc0v"),

    # (P) Maji<de>Koi<suru>5<byou><mae>
    ("\u004D\u0061\u006A\u0069\u3067\u004B\u006F\u0069\u3059"
     "\u308B\u0035\u79D2\u524D",
     b"MajiKoi5-783gue6qz075azm5e"),

     # (Q) <pafii>de<runba>
    ("\u30D1\u30D5\u30A3\u30FC\u0064\u0065\u30EB\u30F3\u30D0",
     b"de-jg4avhby1noc0d"),

    # (R) <sono><supiido><de>
    ("\u305D\u306E\u30B9\u30D4\u30FC\u30C9\u3067",
     b"d9juau41awczczp"),

    # (S) -> $1.00 <-
    ("\u002D\u003E\u0020\u0024\u0031\u002E\u0030\u0030\u0020"
     "\u003C\u002D",
     b"-> $1.00 <--")
    ]

# Sanity check: every entry must be a 2-tuple.  The original code merely
# print()ed malformed entries at import time, which is easy to overlook;
# fail loudly instead (no behavior change for well-formed data).
for _testcase in punycode_testcases:
    if len(_testcase) != 2:
        raise AssertionError("malformed punycode test case: %r" % (_testcase,))
1314
1315
1316class PunycodeTest(unittest.TestCase):
1317    def test_encode(self):
1318        for uni, puny in punycode_testcases:
1319            # Need to convert both strings to lower case, since
1320            # some of the extended encodings use upper case, but our
1321            # code produces only lower case. Converting just puny to
1322            # lower is also insufficient, since some of the input characters
1323            # are upper case.
1324            self.assertEqual(
1325                str(uni.encode("punycode"), "ascii").lower(),
1326                str(puny, "ascii").lower()
1327            )
1328
1329    def test_decode(self):
1330        for uni, puny in punycode_testcases:
1331            self.assertEqual(uni, puny.decode("punycode"))
1332            puny = puny.decode("ascii").encode("ascii")
1333            self.assertEqual(uni, puny.decode("punycode"))
1334
1335    def test_decode_invalid(self):
1336        testcases = [
1337            (b"xn--w&", "strict", UnicodeError()),
1338            (b"xn--w&", "ignore", "xn-"),
1339        ]
1340        for puny, errors, expected in testcases:
1341            with self.subTest(puny=puny, errors=errors):
1342                if isinstance(expected, Exception):
1343                    self.assertRaises(UnicodeError, puny.decode, "punycode", errors)
1344                else:
1345                    self.assertEqual(puny.decode("punycode", errors), expected)
1346
1347
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
# Each entry is an (input, expected-output) pair, both UTF-8 encoded.
# An expected output of None means nameprep() must reject the input;
# an entry of (None, None) marks a vector that is skipped entirely.
nameprep_tests = [
    # 3.1 Map to nothing.
    (b'foo\xc2\xad\xcd\x8f\xe1\xa0\x86\xe1\xa0\x8bbar'
     b'\xe2\x80\x8b\xe2\x81\xa0baz\xef\xb8\x80\xef\xb8\x88\xef'
     b'\xb8\x8f\xef\xbb\xbf',
     b'foobarbaz'),
    # 3.2 Case folding ASCII U+0043 U+0041 U+0046 U+0045.
    (b'CAFE',
     b'cafe'),
    # 3.3 Case folding 8bit U+00DF (german sharp s).
    # The original test case is bogus; it says \xc3\xdf
    (b'\xc3\x9f',
     b'ss'),
    # 3.4 Case folding U+0130 (turkish capital I with dot).
    (b'\xc4\xb0',
     b'i\xcc\x87'),
    # 3.5 Case folding multibyte U+0143 U+037A.
    (b'\xc5\x83\xcd\xba',
     b'\xc5\x84 \xce\xb9'),
    # 3.6 Case folding U+2121 U+33C6 U+1D7BB.
    # XXX: skip this as it fails in UCS-2 mode
    #('\xe2\x84\xa1\xe3\x8f\x86\xf0\x9d\x9e\xbb',
    # 'telc\xe2\x88\x95kg\xcf\x83'),
    (None, None),
    # 3.7 Normalization of U+006a U+030c U+00A0 U+00AA.
    (b'j\xcc\x8c\xc2\xa0\xc2\xaa',
     b'\xc7\xb0 a'),
    # 3.8 Case folding U+1FB7 and normalization.
    (b'\xe1\xbe\xb7',
     b'\xe1\xbe\xb6\xce\xb9'),
    # 3.9 Self-reverting case folding U+01F0 and normalization.
    # The original test case is bogus, it says `\xc7\xf0'
    (b'\xc7\xb0',
     b'\xc7\xb0'),
    # 3.10 Self-reverting case folding U+0390 and normalization.
    (b'\xce\x90',
     b'\xce\x90'),
    # 3.11 Self-reverting case folding U+03B0 and normalization.
    (b'\xce\xb0',
     b'\xce\xb0'),
    # 3.12 Self-reverting case folding U+1E96 and normalization.
    (b'\xe1\xba\x96',
     b'\xe1\xba\x96'),
    # 3.13 Self-reverting case folding U+1F56 and normalization.
    (b'\xe1\xbd\x96',
     b'\xe1\xbd\x96'),
    # 3.14 ASCII space character U+0020.
    (b' ',
     b' '),
    # 3.15 Non-ASCII 8bit space character U+00A0.
    (b'\xc2\xa0',
     b' '),
    # 3.16 Non-ASCII multibyte space character U+1680.
    (b'\xe1\x9a\x80',
     None),
    # 3.17 Non-ASCII multibyte space character U+2000.
    (b'\xe2\x80\x80',
     b' '),
    # 3.18 Zero Width Space U+200b.
    (b'\xe2\x80\x8b',
     b''),
    # 3.19 Non-ASCII multibyte space character U+3000.
    (b'\xe3\x80\x80',
     b' '),
    # 3.20 ASCII control characters U+0010 U+007F.
    (b'\x10\x7f',
     b'\x10\x7f'),
    # 3.21 Non-ASCII 8bit control character U+0085.
    (b'\xc2\x85',
     None),
    # 3.22 Non-ASCII multibyte control character U+180E.
    (b'\xe1\xa0\x8e',
     None),
    # 3.23 Zero Width No-Break Space U+FEFF.
    (b'\xef\xbb\xbf',
     b''),
    # 3.24 Non-ASCII control character U+1D175.
    (b'\xf0\x9d\x85\xb5',
     None),
    # 3.25 Plane 0 private use character U+F123.
    (b'\xef\x84\xa3',
     None),
    # 3.26 Plane 15 private use character U+F1234.
    (b'\xf3\xb1\x88\xb4',
     None),
    # 3.27 Plane 16 private use character U+10F234.
    (b'\xf4\x8f\x88\xb4',
     None),
    # 3.28 Non-character code point U+8FFFE.
    (b'\xf2\x8f\xbf\xbe',
     None),
    # 3.29 Non-character code point U+10FFFF.
    (b'\xf4\x8f\xbf\xbf',
     None),
    # 3.30 Surrogate code U+DF42.
    (b'\xed\xbd\x82',
     None),
    # 3.31 Non-plain text character U+FFFD.
    (b'\xef\xbf\xbd',
     None),
    # 3.32 Ideographic description character U+2FF5.
    (b'\xe2\xbf\xb5',
     None),
    # 3.33 Display property character U+0341.
    (b'\xcd\x81',
     b'\xcc\x81'),
    # 3.34 Left-to-right mark U+200E.
    (b'\xe2\x80\x8e',
     None),
    # 3.35 Deprecated U+202A.
    (b'\xe2\x80\xaa',
     None),
    # 3.36 Language tagging character U+E0001.
    (b'\xf3\xa0\x80\x81',
     None),
    # 3.37 Language tagging character U+E0042.
    (b'\xf3\xa0\x81\x82',
     None),
    # 3.38 Bidi: RandALCat character U+05BE and LCat characters.
    (b'foo\xd6\xbebar',
     None),
    # 3.39 Bidi: RandALCat character U+FD50 and LCat characters.
    (b'foo\xef\xb5\x90bar',
     None),
    # 3.40 Bidi: RandALCat character U+FB38 and LCat characters.
    (b'foo\xef\xb9\xb6bar',
     b'foo \xd9\x8ebar'),
    # 3.41 Bidi: RandALCat without trailing RandALCat U+0627 U+0031.
    (b'\xd8\xa71',
     None),
    # 3.42 Bidi: RandALCat character U+0627 U+0031 U+0628.
    (b'\xd8\xa71\xd8\xa8',
     b'\xd8\xa71\xd8\xa8'),
    # 3.43 Unassigned code point U+E0002.
    # Skip this test as we allow unassigned
    #(b'\xf3\xa0\x80\x82',
    # None),
    (None, None),
    # 3.44 Larger test (shrinking).
    # Original test case reads \xc3\xdf
    (b'X\xc2\xad\xc3\x9f\xc4\xb0\xe2\x84\xa1j\xcc\x8c\xc2\xa0\xc2'
     b'\xaa\xce\xb0\xe2\x80\x80',
     b'xssi\xcc\x87tel\xc7\xb0 a\xce\xb0 '),
    # 3.45 Larger test (expanding).
    # Original test case reads \xc3\x9f
    (b'X\xc3\x9f\xe3\x8c\x96\xc4\xb0\xe2\x84\xa1\xe2\x92\x9f\xe3\x8c'
     b'\x80',
     b'xss\xe3\x82\xad\xe3\x83\xad\xe3\x83\xa1\xe3\x83\xbc\xe3'
     b'\x83\x88\xe3\x83\xabi\xcc\x87tel\x28d\x29\xe3\x82'
     b'\xa2\xe3\x83\x91\xe3\x83\xbc\xe3\x83\x88')
    ]
1500
1501
class NameprepTest(unittest.TestCase):
    """Run the libidn nameprep test vectors against encodings.idna.nameprep()."""

    def test_nameprep(self):
        from encodings.idna import nameprep
        for pos, (orig, prepped) in enumerate(nameprep_tests):
            if orig is None:
                # Vector is skipped (see the comments in nameprep_tests).
                continue
            # The vectors are stored UTF-8 encoded; "surrogatepass" is
            # needed because some inputs contain surrogate code points.
            orig = str(orig, "utf-8", "surrogatepass")
            # Use subTest instead of re-wrapping failures in
            # support.TestFailed: that kept the vector number but threw
            # away the assertion traceback, and only covered one branch.
            with self.subTest(vector=pos + 1):
                if prepped is None:
                    # Input contains prohibited characters.
                    self.assertRaises(UnicodeError, nameprep, orig)
                else:
                    prepped = str(prepped, "utf-8", "surrogatepass")
                    self.assertEqual(nameprep(orig), prepped)
1520
1521
class IDNACodecTest(unittest.TestCase):
    """Tests for the "idna" codec: one-shot, stream and incremental."""

    def test_builtin_decode(self):
        # ASCII labels pass through unchanged; ACE ("xn--") labels are
        # converted back to Unicode.  A trailing dot is preserved.
        cases = [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org", "pyth\xf6n.org"),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]
        for raw, text in cases:
            self.assertEqual(str(raw, "idna"), text)

    def test_builtin_encode(self):
        # Inverse of test_builtin_decode: non-ASCII labels gain the
        # "xn--" ACE prefix.
        cases = [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org", b"xn--pythn-mua.org"),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]
        for text, raw in cases:
            self.assertEqual(text.encode("idna"), raw)

    def test_builtin_decode_length_limit(self):
        # Overlong inputs must be rejected instead of being processed.
        for overlong in (b"xn--016c" + b"a" * 1100,
                         b"xn--016c" + b"a" * 70):
            with self.assertRaisesRegex(UnicodeError, "too long"):
                overlong.decode("idna")

    def test_stream(self):
        reader = codecs.getreader("idna")(io.BytesIO(b"abc"))
        reader.read(3)
        self.assertEqual(reader.read(), "")

    def test_incremental_decode(self):
        # Feeding one byte at a time must give the same result as a
        # one-shot decode.
        byte_by_byte = [
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ]
        for raw, text in byte_by_byte:
            pieces = codecs.iterdecode((bytes([c]) for c in raw), "idna")
            self.assertEqual("".join(pieces), text)

        # Output is only produced once a complete label (terminated by
        # ".") has been seen, or at end of input.
        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        # reset() discards any buffered partial label.
        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")

    def test_incremental_encode(self):
        # Feeding one character at a time must give the same result as
        # a one-shot encode.
        char_by_char = [
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ]
        for text, raw in char_by_char:
            pieces = codecs.iterencode(text, "idna")
            self.assertEqual(b"".join(pieces), raw)

        # As with decoding, a label is emitted only once it is complete.
        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")

    def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for errors in ("ignore", "replace", "backslashreplace",
                       "surrogateescape"):
            self.assertRaises(Exception, "python.org".encode, "idna", errors)
            self.assertRaises(Exception,
                              b"python.org".decode, "idna", errors)
1613
1614
1615class CodecsModuleTest(unittest.TestCase):
1616
1617    def test_decode(self):
1618        self.assertEqual(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'),
1619                         '\xe4\xf6\xfc')
1620        self.assertRaises(TypeError, codecs.decode)
1621        self.assertEqual(codecs.decode(b'abc'), 'abc')
1622        self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii')
1623
1624        # test keywords
1625        self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'),
1626                         '\xe4\xf6\xfc')
1627        self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'),
1628                         '[]')
1629
1630    def test_encode(self):
1631        self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'),
1632                         b'\xe4\xf6\xfc')
1633        self.assertRaises(TypeError, codecs.encode)
1634        self.assertRaises(LookupError, codecs.encode, "foo", "__spam__")
1635        self.assertEqual(codecs.encode('abc'), b'abc')
1636        self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii')
1637
1638        # test keywords
1639        self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'),
1640                         b'\xe4\xf6\xfc')
1641        self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'),
1642                         b'[]')
1643
1644    def test_register(self):
1645        self.assertRaises(TypeError, codecs.register)
1646        self.assertRaises(TypeError, codecs.register, 42)
1647
1648    def test_lookup(self):
1649        self.assertRaises(TypeError, codecs.lookup)
1650        self.assertRaises(LookupError, codecs.lookup, "__spam__")
1651        self.assertRaises(LookupError, codecs.lookup, " ")
1652
1653    def test_getencoder(self):
1654        self.assertRaises(TypeError, codecs.getencoder)
1655        self.assertRaises(LookupError, codecs.getencoder, "__spam__")
1656
1657    def test_getdecoder(self):
1658        self.assertRaises(TypeError, codecs.getdecoder)
1659        self.assertRaises(LookupError, codecs.getdecoder, "__spam__")
1660
1661    def test_getreader(self):
1662        self.assertRaises(TypeError, codecs.getreader)
1663        self.assertRaises(LookupError, codecs.getreader, "__spam__")
1664
1665    def test_getwriter(self):
1666        self.assertRaises(TypeError, codecs.getwriter)
1667        self.assertRaises(LookupError, codecs.getwriter, "__spam__")
1668
1669    def test_lookup_issue1813(self):
1670        # Issue #1813: under Turkish locales, lookup of some codecs failed
1671        # because 'I' is lowercased as "ı" (dotless i)
1672        oldlocale = locale.setlocale(locale.LC_CTYPE)
1673        self.addCleanup(locale.setlocale, locale.LC_CTYPE, oldlocale)
1674        try:
1675            locale.setlocale(locale.LC_CTYPE, 'tr_TR')
1676        except locale.Error:
1677            # Unsupported locale on this system
1678            self.skipTest('test needs Turkish locale')
1679        c = codecs.lookup('ASCII')
1680        self.assertEqual(c.name, 'ascii')
1681
1682    def test_all(self):
1683        api = (
1684            "encode", "decode",
1685            "register", "CodecInfo", "Codec", "IncrementalEncoder",
1686            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
1687            "getencoder", "getdecoder", "getincrementalencoder",
1688            "getincrementaldecoder", "getreader", "getwriter",
1689            "register_error", "lookup_error",
1690            "strict_errors", "replace_errors", "ignore_errors",
1691            "xmlcharrefreplace_errors", "backslashreplace_errors",
1692            "namereplace_errors",
1693            "open", "EncodedFile",
1694            "iterencode", "iterdecode",
1695            "BOM", "BOM_BE", "BOM_LE",
1696            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
1697            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
1698            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
1699            "StreamReaderWriter", "StreamRecoder",
1700        )
1701        self.assertCountEqual(api, codecs.__all__)
1702        for api in codecs.__all__:
1703            getattr(codecs, api)
1704
1705    def test_open(self):
1706        self.addCleanup(support.unlink, support.TESTFN)
1707        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
1708            with self.subTest(mode), \
1709                    codecs.open(support.TESTFN, mode, 'ascii') as file:
1710                self.assertIsInstance(file, codecs.StreamReaderWriter)
1711
1712    def test_undefined(self):
1713        self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined')
1714        self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined')
1715        self.assertRaises(UnicodeError, codecs.encode, '', 'undefined')
1716        self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined')
1717        for errors in ('strict', 'ignore', 'replace', 'backslashreplace'):
1718            self.assertRaises(UnicodeError,
1719                codecs.encode, 'abc', 'undefined', errors)
1720            self.assertRaises(UnicodeError,
1721                codecs.decode, b'abc', 'undefined', errors)
1722
1723    def test_file_closes_if_lookup_error_raised(self):
1724        mock_open = mock.mock_open()
1725        with mock.patch('builtins.open', mock_open) as file:
1726            with self.assertRaises(LookupError):
1727                codecs.open(support.TESTFN, 'wt', 'invalid-encoding')
1728
1729            file().close.assert_called()
1730
1731
class StreamReaderTest(unittest.TestCase):
    """Minimal behaviour checks for codecs stream readers."""

    def setUp(self):
        # Two UTF-8 encoded Hangul syllables separated by a newline.
        self.reader = codecs.getreader('utf-8')
        self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')

    def test_readlines(self):
        decoded_lines = self.reader(self.stream).readlines()
        self.assertEqual(decoded_lines, ['\ud55c\n', '\uae00'])
1741
1742
class EncodedFileTest(unittest.TestCase):

    def test_basic(self):
        # Reading transcodes from the file encoding (utf-8) to the data
        # encoding (utf-16-le).
        source = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        ef = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
        self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')

        # Writing transcodes in the opposite direction.
        sink = io.BytesIO()
        ef = codecs.EncodedFile(sink, 'utf-8', 'latin-1')
        ef.write(b'\xc3\xbc')
        self.assertEqual(sink.getvalue(), b'\xfc')
1754
# Every Unicode codec shipped with Python; the generic round-trip tests
# below (BasicUnicodeTest) exercise each of these through the stateless,
# stream and incremental codec interfaces.
all_unicode_encodings = [
    "ascii",
    "big5",
    "big5hkscs",
    "charmap",
    "cp037",
    "cp1006",
    "cp1026",
    "cp1125",
    "cp1140",
    "cp1250",
    "cp1251",
    "cp1252",
    "cp1253",
    "cp1254",
    "cp1255",
    "cp1256",
    "cp1257",
    "cp1258",
    "cp424",
    "cp437",
    "cp500",
    "cp720",
    "cp737",
    "cp775",
    "cp850",
    "cp852",
    "cp855",
    "cp856",
    "cp857",
    "cp858",
    "cp860",
    "cp861",
    "cp862",
    "cp863",
    "cp864",
    "cp865",
    "cp866",
    "cp869",
    "cp874",
    "cp875",
    "cp932",
    "cp949",
    "cp950",
    "euc_jis_2004",
    "euc_jisx0213",
    "euc_jp",
    "euc_kr",
    "gb18030",
    "gb2312",
    "gbk",
    "hp_roman8",
    "hz",
    "idna",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "iso8859_1",
    "iso8859_10",
    "iso8859_11",
    "iso8859_13",
    "iso8859_14",
    "iso8859_15",
    "iso8859_16",
    "iso8859_2",
    "iso8859_3",
    "iso8859_4",
    "iso8859_5",
    "iso8859_6",
    "iso8859_7",
    "iso8859_8",
    "iso8859_9",
    "johab",
    "koi8_r",
    "koi8_t",
    "koi8_u",
    "kz1048",
    "latin_1",
    "mac_cyrillic",
    "mac_greek",
    "mac_iceland",
    "mac_latin2",
    "mac_roman",
    "mac_turkish",
    "palmos",
    "ptcp154",
    "punycode",
    "raw_unicode_escape",
    "shift_jis",
    "shift_jis_2004",
    "shift_jisx0213",
    "tis_620",
    "unicode_escape",
    "utf_16",
    "utf_16_be",
    "utf_16_le",
    "utf_7",
    "utf_8",
]

# "mbcs" and "oem" are Windows-only codecs: only test them when the
# running interpreter actually provides the underlying codec functions.
if hasattr(codecs, "mbcs_encode"):
    all_unicode_encodings.append("mbcs")
if hasattr(codecs, "oem_encode"):
    all_unicode_encodings.append("oem")

# The following encoding is not tested, because it's not supposed
# to work:
#    "undefined"

# The following encodings don't work in stateful mode
broken_unicode_with_stateful = [
    "punycode",
]
1872
1873
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
    """Generic tests applied to every codec in all_unicode_encodings:
    round-trips through the stateless, stream, incremental and C-API
    interfaces, plus seeking, argument checking and state handling."""

    def test_basics(self):
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            # The codec's canonical name must match the lookup key,
            # modulo "-"/"_" spelling.
            name = codecs.lookup(encoding).name
            if encoding.endswith("_codec"):
                name += "_codec"
            elif encoding == "latin_1":
                name = "latin_1"
            self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-"))

            # One-shot encode/decode must round-trip and report the
            # whole input as consumed.
            (b, size) = codecs.getencoder(encoding)(s)
            self.assertEqual(size, len(s), "encoding=%r" % encoding)
            (chars, size) = codecs.getdecoder(encoding)(b)
            self.assertEqual(chars, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check stream reader/writer
                q = Queue(b"")
                writer = codecs.getwriter(encoding)(q)
                encodedresult = b""
                for c in s:
                    # One character per write, to exercise any state the
                    # writer keeps between calls.
                    writer.write(c)
                    chunk = q.read()
                    self.assertTrue(type(chunk) is bytes, type(chunk))
                    encodedresult += chunk
                q = Queue(b"")
                reader = codecs.getreader(encoding)(q)
                decodedresult = ""
                for c in encodedresult:
                    # Feed the encoded bytes back one byte at a time.
                    q.write(bytes([c]))
                    decodedresult += reader.read()
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder and iterencode()/iterdecode()
                try:
                    encoder = codecs.getincrementalencoder(encoding)()
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check incremental decoder/encoder
                    encodedresult = b""
                    for c in s:
                        encodedresult += encoder.encode(c)
                    encodedresult += encoder.encode("", True)
                    decoder = codecs.getincrementaldecoder(encoding)()
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += decoder.decode(bytes([c]))
                    decodedresult += decoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                    # check iterencode()/iterdecode()
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode(s, encoding), encoding))
                    self.assertEqual(result, s, "encoding=%r" % encoding)

                    # check iterencode()/iterdecode() with empty string
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode("", encoding), encoding))
                    self.assertEqual(result, "")

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        encoder = codecs.getincrementalencoder(encoding)("ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(encoder.encode(c) for c in s)
                        decoder = codecs.getincrementaldecoder(encoding)("ignore")
                        decodedresult = "".join(decoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    @support.cpython_only
    def test_basics_capi(self):
        # Same incremental round-trips as test_basics, but with codec
        # objects obtained through the C API (_testcapi helpers).
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = _testcapi.codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = _testcapi.codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = _testcapi.codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = _testcapi.codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)

    def test_bad_decode_args(self):
        # Decoders must raise TypeError when called with no arguments,
        # and (for most codecs) with a non-bytes argument.
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)

    def test_bad_encode_args(self):
        # Encoders must raise TypeError when called with no arguments.
        for encoding in all_unicode_encodings:
            encoder = codecs.getencoder(encoding)
            self.assertRaises(TypeError, encoder)

    def test_encoding_map_type_initialized(self):
        from encodings import cp1140
        # This used to crash, we are only verifying there's no crash.
        table_type = type(cp1140.encoding_table)
        self.assertEqual(table_type, table_type)

    def test_decoder_state(self):
        # Check that getstate() and setstate() handle the state properly
        u = "abc123"
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                self.check_state_handling_decode(encoding, u, u.encode(encoding))
                self.check_state_handling_encode(encoding, u, u.encode(encoding))
2030
2031
class CharmapTest(unittest.TestCase):
    """Tests for codecs.charmap_decode with its three mapping types:
    a string, an int-to-str dict and an int-to-int dict."""

    def test_decode_with_string_map(self):
        decode = codecs.charmap_decode
        # Each input byte indexes into the mapping string.
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "abc"),
                         ("abc", 3))
        # Non-BMP characters in the map are supported.
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "\U0010FFFFbc"),
                         ("\U0010FFFFbc", 3))

        # A byte past the end of the map, or mapped to U+FFFE, is an
        # error under "strict" ...
        for bad_map in ("ab", "ab\ufffe"):
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # ... and is substituted, escaped or dropped by the other
        # error handlers.
        for errors, expected in [("replace", "ab\ufffd"),
                                 ("backslashreplace", "ab\\x02"),
                                 ("ignore", "ab")]:
            for bad_map in ("ab", "ab\ufffe"):
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 (expected, 3))

        # With an empty map and "ignore", everything is dropped but all
        # input is still reported as consumed.
        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", ""),
                         ("", len(allbytes)))

    def test_decode_with_int2str_map(self):
        decode = codecs.charmap_decode
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'a', 1: 'b', 2: 'c'}),
                         ("abc", 3))
        # Multi-character replacement strings are allowed.
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'Aa', 1: 'Bb', 2: 'Cc'}),
                         ("AaBbCc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: '\U0010FFFF', 1: 'b', 2: 'c'}),
                         ("\U0010FFFFbc", 3))
        # Mapping to the empty string deletes the byte.
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'a', 1: 'b', 2: ''}),
                         ("ab", 3))

        # A missing key, a None value and a U+FFFE value (issue #14850)
        # all mean "unmappable": strict raises ...
        bad_maps = [{0: 'a', 1: 'b'},
                    {0: 'a', 1: 'b', 2: None},
                    {0: 'a', 1: 'b', 2: '\ufffe'}]
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # ... and the other handlers substitute, escape or drop.
        for errors, expected in [("replace", "ab\ufffd"),
                                 ("backslashreplace", "ab\\x02"),
                                 ("ignore", "ab")]:
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 (expected, 3))

        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", {}),
                         ("", len(allbytes)))

    def test_decode_with_int2int_map(self):
        decode = codecs.charmap_decode
        a, b, c = ord('a'), ord('b'), ord('c')

        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: a, 1: b, 2: c}),
                         ("abc", 3))

        # Issue #15379: mappings to code points outside the BMP work,
        # right up to sys.maxunicode ...
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 0x10FFFF, 1: b, 2: c}),
                         ("\U0010FFFFbc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: sys.maxunicode, 1: b, 2: c}),
                         (chr(sys.maxunicode) + "bc", 3))
        # ... while values beyond sys.maxunicode are a TypeError.
        self.assertRaises(TypeError,
                          decode, b"\x00\x01\x02", "strict",
                          {0: sys.maxunicode + 1, 1: b, 2: c})

        # Missing keys and mappings to 0xFFFE are unmappable.
        bad_maps = [{0: a, 1: b},
                    {0: a, 1: b, 2: 0xFFFE}]
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)
        for errors, expected in [("replace", "ab\ufffd"),
                                 ("backslashreplace", "ab\\x02"),
                                 ("ignore", "ab")]:
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 (expected, 3))
2266
2267
class WithStmtTest(unittest.TestCase):
    """Both wrapper types must work as context managers."""

    def test_encodedfile(self):
        # Leaving the with-block closes the underlying stream.
        raw = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(raw, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
        self.assertTrue(raw.closed)

    def test_streamreaderwriter(self):
        raw = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        srw = codecs.StreamReaderWriter(raw, info.streamreader,
                                        info.streamwriter, 'strict')
        with srw:
            self.assertEqual(srw.read(), "\xfc")
2281
2282
class TypesTest(unittest.TestCase):
    def test_decode_unicode(self):
        # Most decoders don't accept unicode input
        decoder_names = [
            "utf_7_decode",
            "utf_8_decode",
            "utf_16_le_decode",
            "utf_16_be_decode",
            "utf_16_ex_decode",
            "utf_32_decode",
            "utf_32_le_decode",
            "utf_32_be_decode",
            "utf_32_ex_decode",
            "latin_1_decode",
            "ascii_decode",
            "charmap_decode",
        ]
        if hasattr(codecs, "mbcs_decode"):
            decoder_names.append("mbcs_decode")
        for name in decoder_names:
            self.assertRaises(TypeError, getattr(codecs, name), "xxx")

    def test_unicode_escape(self):
        # Escape-decoding a unicode string is supported and gives the same
        # result as decoding the equivalent ASCII bytes string.
        for decode in (codecs.unicode_escape_decode,
                       codecs.raw_unicode_escape_decode):
            self.assertEqual(decode(r"\u1234"), ("\u1234", 6))
            self.assertEqual(decode(br"\u1234"), ("\u1234", 6))
            # \U00110000 is outside the code point range: "strict"
            # raises while the other handlers substitute or escape.
            self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
            self.assertEqual(decode(r"\U00110000", "replace"),
                             ("\ufffd", 10))
            self.assertEqual(decode(r"\U00110000", "backslashreplace"),
                             (r"\x5c\x55\x30\x30\x31\x31\x30\x30\x30\x30", 10))
2322
2323
2324class UnicodeEscapeTest(unittest.TestCase):
    def test_empty(self):
        # Encoding/decoding the empty string yields empty output and a
        # consumed length of zero.
        self.assertEqual(codecs.unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.unicode_escape_decode(b""), ("", 0))
2328
    def test_raw_encode(self):
        # Printable ASCII characters (other than the backslash itself)
        # are passed through unchanged by the encoder.
        encode = codecs.unicode_escape_encode
        for b in range(32, 127):
            if b != b'\\'[0]:
                self.assertEqual(encode(chr(b)), (bytes([b]), 1))
2334
    def test_raw_decode(self):
        # Any byte other than the backslash decodes to the character of
        # the same ordinal (latin-1 semantics); the trailing '0' shows
        # no escape processing kicks in.
        decode = codecs.unicode_escape_decode
        for b in range(256):
            if b != b'\\'[0]:
                self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))
2340
2341    def test_escape_encode(self):
2342        encode = codecs.unicode_escape_encode
2343        check = coding_checker(self, encode)
2344        check('\t', br'\t')
2345        check('\n', br'\n')
2346        check('\r', br'\r')
2347        check('\\', br'\\')
2348        for b in range(32):
2349            if chr(b) not in '\t\n\r':
2350                check(chr(b), ('\\x%02x' % b).encode())
2351        for b in range(127, 256):
2352            check(chr(b), ('\\x%02x' % b).encode())
2353        check('\u20ac', br'\u20ac')
2354        check('\U0001d120', br'\U0001d120')
2355
2356    def test_escape_decode(self):
2357        decode = codecs.unicode_escape_decode
2358        check = coding_checker(self, decode)
2359        check(b"[\\\n]", "[]")
2360        check(br'[\"]', '["]')
2361        check(br"[\']", "[']")
2362        check(br"[\\]", r"[\]")
2363        check(br"[\a]", "[\x07]")
2364        check(br"[\b]", "[\x08]")
2365        check(br"[\t]", "[\x09]")
2366        check(br"[\n]", "[\x0a]")
2367        check(br"[\v]", "[\x0b]")
2368        check(br"[\f]", "[\x0c]")
2369        check(br"[\r]", "[\x0d]")
2370        check(br"[\7]", "[\x07]")
2371        check(br"[\78]", "[\x078]")
2372        check(br"[\41]", "[!]")
2373        check(br"[\418]", "[!8]")
2374        check(br"[\101]", "[A]")
2375        check(br"[\1010]", "[A0]")
2376        check(br"[\x41]", "[A]")
2377        check(br"[\x410]", "[A0]")
2378        check(br"\u20ac", "\u20ac")
2379        check(br"\U0001d120", "\U0001d120")
2380        for i in range(97, 123):
2381            b = bytes([i])
2382            if b not in b'abfnrtuvx':
2383                with self.assertWarns(DeprecationWarning):
2384                    check(b"\\" + b, "\\" + chr(i))
2385            if b.upper() not in b'UN':
2386                with self.assertWarns(DeprecationWarning):
2387                    check(b"\\" + b.upper(), "\\" + chr(i-32))
2388        with self.assertWarns(DeprecationWarning):
2389            check(br"\8", "\\8")
2390        with self.assertWarns(DeprecationWarning):
2391            check(br"\9", "\\9")
2392        with self.assertWarns(DeprecationWarning):
2393            check(b"\\\xfa", "\\\xfa")
2394
2395    def test_decode_errors(self):
2396        decode = codecs.unicode_escape_decode
2397        for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
2398            for i in range(d):
2399                self.assertRaises(UnicodeDecodeError, decode,
2400                                  b"\\" + c + b"0"*i)
2401                self.assertRaises(UnicodeDecodeError, decode,
2402                                  b"[\\" + c + b"0"*i + b"]")
2403                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
2404                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
2405                self.assertEqual(decode(data, "replace"),
2406                                 ("[\ufffd]\ufffd", len(data)))
2407        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
2408        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
2409        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))
2410
2411
2412class RawUnicodeEscapeTest(unittest.TestCase):
2413    def test_empty(self):
2414        self.assertEqual(codecs.raw_unicode_escape_encode(""), (b"", 0))
2415        self.assertEqual(codecs.raw_unicode_escape_decode(b""), ("", 0))
2416
2417    def test_raw_encode(self):
2418        encode = codecs.raw_unicode_escape_encode
2419        for b in range(256):
2420            self.assertEqual(encode(chr(b)), (bytes([b]), 1))
2421
2422    def test_raw_decode(self):
2423        decode = codecs.raw_unicode_escape_decode
2424        for b in range(256):
2425            self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))
2426
2427    def test_escape_encode(self):
2428        encode = codecs.raw_unicode_escape_encode
2429        check = coding_checker(self, encode)
2430        for b in range(256):
2431            if b not in b'uU':
2432                check('\\' + chr(b), b'\\' + bytes([b]))
2433        check('\u20ac', br'\u20ac')
2434        check('\U0001d120', br'\U0001d120')
2435
2436    def test_escape_decode(self):
2437        decode = codecs.raw_unicode_escape_decode
2438        check = coding_checker(self, decode)
2439        for b in range(256):
2440            if b not in b'uU':
2441                check(b'\\' + bytes([b]), '\\' + chr(b))
2442        check(br"\u20ac", "\u20ac")
2443        check(br"\U0001d120", "\U0001d120")
2444
2445    def test_decode_errors(self):
2446        decode = codecs.raw_unicode_escape_decode
2447        for c, d in (b'u', 4), (b'U', 4):
2448            for i in range(d):
2449                self.assertRaises(UnicodeDecodeError, decode,
2450                                  b"\\" + c + b"0"*i)
2451                self.assertRaises(UnicodeDecodeError, decode,
2452                                  b"[\\" + c + b"0"*i + b"]")
2453                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
2454                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
2455                self.assertEqual(decode(data, "replace"),
2456                                 ("[\ufffd]\ufffd", len(data)))
2457        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
2458        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
2459        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))
2460
2461
class EscapeEncodeTest(unittest.TestCase):

    def test_escape_encode(self):
        # codecs.escape_encode escapes NUL, single quote, backslash,
        # newline, carriage return and non-printable bytes; the returned
        # length is that of the *input*, not the escaped output.
        cases = (
            (b'', (b'', 0)),
            (b'foobar', (b'foobar', 6)),
            (b'spam\0eggs', (b'spam\\x00eggs', 9)),
            (b'a\'b', (b"a\\'b", 3)),
            (b'b\\c', (b'b\\\\c', 3)),
            (b'c\nd', (b'c\\nd', 3)),
            (b'd\re', (b'd\\re', 3)),
            (b'f\x7fg', (b'f\\x7fg', 3)),
        )
        for data, expected in cases:
            with self.subTest(data=data):
                self.assertEqual(codecs.escape_encode(data), expected)
        # Only bytes input is accepted -- neither str nor bytearray.
        for bad_input in ('spam', bytearray(b'spam')):
            self.assertRaises(TypeError, codecs.escape_encode, bad_input)
2480
2481
class SurrogateEscapeTest(unittest.TestCase):
    """surrogateescape: undecodable byte b <-> lone surrogate U+DC00+b."""

    def _check_roundtrip(self, raw, text, encoding):
        # Decode and encode must be exact inverses under surrogateescape.
        self.assertEqual(raw.decode(encoding, "surrogateescape"), text)
        self.assertEqual(text.encode(encoding, "surrogateescape"), raw)

    def test_utf8(self):
        # Bad byte
        self._check_roundtrip(b"foo\x80bar", "foo\udc80bar", "utf-8")
        # bad-utf-8 encoded surrogate escapes byte-by-byte
        self._check_roundtrip(b"\xed\xb0\x80", "\udced\udcb0\udc80", "utf-8")

    def test_ascii(self):
        # bad byte
        self._check_roundtrip(b"foo\x80bar", "foo\udc80bar", "ascii")

    def test_charmap(self):
        # bad byte: \xa5 is unmapped in iso-8859-3
        self._check_roundtrip(b"foo\xa5bar", "foo\udca5bar", "iso-8859-3")

    def test_latin1(self):
        # Issue6373
        self.assertEqual(
            "\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1",
                                                    "surrogateescape"),
            b"\xe4\xeb\xef\xf6\xfc")
2514
2515
class BomTest(unittest.TestCase):
    def test_seek0(self):
        """The BOM must be written exactly once, at file position 0."""
        payload = "1234567890"
        bom_encodings = ("utf-16",
                         "utf-16-le",
                         "utf-16-be",
                         "utf-32",
                         "utf-32-le",
                         "utf-32-be")
        self.addCleanup(support.unlink, support.TESTFN)

        def open_test_file(encoding):
            # Fresh read/write stream, truncating any previous contents.
            return codecs.open(support.TESTFN, 'w+', encoding=encoding)

        for encoding in bom_encodings:
            # Writing twice must emit a single BOM.
            with open_test_file(encoding) as f:
                f.write(payload)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # A seek(0) re-arms BOM writing on the StreamReaderWriter.
            with open_test_file(encoding) as f:
                f.write(payload[0])
                self.assertNotEqual(f.tell(), 0)
                f.seek(0)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # Same check, driving the underlying StreamWriter directly.
            with open_test_file(encoding) as f:
                f.writer.write(payload[0])
                self.assertNotEqual(f.writer.tell(), 0)
                f.writer.seek(0)
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # A seek() to a non-zero position must NOT re-emit the BOM.
            with open_test_file(encoding) as f:
                f.write(payload)
                f.seek(f.tell())
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # ...and likewise on the StreamWriter.
            with open_test_file(encoding) as f:
                f.writer.write(payload)
                f.writer.seek(f.writer.tell())
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
2571
2572
# Binary transform codecs that are unconditionally available; the
# optional compression codecs are appended below when importable.
bytes_transform_encodings = [
    "base64_codec",
    "uu_codec",
    "quopri_codec",
    "hex_codec",
]

# Canonical codec name -> aliases that must resolve to it (issue #7475).
transform_aliases = {
    "base64_codec": ["base64", "base_64"],
    "uu_codec": ["uu"],
    "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"],
    "hex_codec": ["hex"],
    "rot_13": ["rot13"],
}

try:
    import zlib
except ImportError:
    # zlib stays bound (to None) because it is referenced in a
    # skipUnless decorator further down.
    zlib = None
else:
    bytes_transform_encodings.append("zlib_codec")
    transform_aliases["zlib_codec"] = ["zip", "zlib"]
try:
    import bz2
except ImportError:
    # bz2 is simply left unbound when unavailable; nothing below
    # references the name directly.
    pass
else:
    bytes_transform_encodings.append("bz2_codec")
    transform_aliases["bz2_codec"] = ["bz2"]
2602
2603
2604class TransformCodecTest(unittest.TestCase):
2605
2606    def test_basics(self):
2607        binput = bytes(range(256))
2608        for encoding in bytes_transform_encodings:
2609            with self.subTest(encoding=encoding):
2610                # generic codecs interface
2611                (o, size) = codecs.getencoder(encoding)(binput)
2612                self.assertEqual(size, len(binput))
2613                (i, size) = codecs.getdecoder(encoding)(o)
2614                self.assertEqual(size, len(o))
2615                self.assertEqual(i, binput)
2616
2617    def test_read(self):
2618        for encoding in bytes_transform_encodings:
2619            with self.subTest(encoding=encoding):
2620                sin = codecs.encode(b"\x80", encoding)
2621                reader = codecs.getreader(encoding)(io.BytesIO(sin))
2622                sout = reader.read()
2623                self.assertEqual(sout, b"\x80")
2624
2625    def test_readline(self):
2626        for encoding in bytes_transform_encodings:
2627            with self.subTest(encoding=encoding):
2628                sin = codecs.encode(b"\x80", encoding)
2629                reader = codecs.getreader(encoding)(io.BytesIO(sin))
2630                sout = reader.readline()
2631                self.assertEqual(sout, b"\x80")
2632
2633    def test_buffer_api_usage(self):
2634        # We check all the transform codecs accept memoryview input
2635        # for encoding and decoding
2636        # and also that they roundtrip correctly
2637        original = b"12345\x80"
2638        for encoding in bytes_transform_encodings:
2639            with self.subTest(encoding=encoding):
2640                data = original
2641                view = memoryview(data)
2642                data = codecs.encode(data, encoding)
2643                view_encoded = codecs.encode(view, encoding)
2644                self.assertEqual(view_encoded, data)
2645                view = memoryview(data)
2646                data = codecs.decode(data, encoding)
2647                self.assertEqual(data, original)
2648                view_decoded = codecs.decode(view, encoding)
2649                self.assertEqual(view_decoded, data)
2650
2651    def test_text_to_binary_blacklists_binary_transforms(self):
2652        # Check binary -> binary codecs give a good error for str input
2653        bad_input = "bad input type"
2654        for encoding in bytes_transform_encodings:
2655            with self.subTest(encoding=encoding):
2656                fmt = (r"{!r} is not a text encoding; "
2657                       r"use codecs.encode\(\) to handle arbitrary codecs")
2658                msg = fmt.format(encoding)
2659                with self.assertRaisesRegex(LookupError, msg) as failure:
2660                    bad_input.encode(encoding)
2661                self.assertIsNone(failure.exception.__cause__)
2662
2663    def test_text_to_binary_blacklists_text_transforms(self):
2664        # Check str.encode gives a good error message for str -> str codecs
2665        msg = (r"^'rot_13' is not a text encoding; "
2666               r"use codecs.encode\(\) to handle arbitrary codecs")
2667        with self.assertRaisesRegex(LookupError, msg):
2668            "just an example message".encode("rot_13")
2669
2670    def test_binary_to_text_blacklists_binary_transforms(self):
2671        # Check bytes.decode and bytearray.decode give a good error
2672        # message for binary -> binary codecs
2673        data = b"encode first to ensure we meet any format restrictions"
2674        for encoding in bytes_transform_encodings:
2675            with self.subTest(encoding=encoding):
2676                encoded_data = codecs.encode(data, encoding)
2677                fmt = (r"{!r} is not a text encoding; "
2678                       r"use codecs.decode\(\) to handle arbitrary codecs")
2679                msg = fmt.format(encoding)
2680                with self.assertRaisesRegex(LookupError, msg):
2681                    encoded_data.decode(encoding)
2682                with self.assertRaisesRegex(LookupError, msg):
2683                    bytearray(encoded_data).decode(encoding)
2684
2685    def test_binary_to_text_blacklists_text_transforms(self):
2686        # Check str -> str codec gives a good error for binary input
2687        for bad_input in (b"immutable", bytearray(b"mutable")):
2688            with self.subTest(bad_input=bad_input):
2689                msg = (r"^'rot_13' is not a text encoding; "
2690                       r"use codecs.decode\(\) to handle arbitrary codecs")
2691                with self.assertRaisesRegex(LookupError, msg) as failure:
2692                    bad_input.decode("rot_13")
2693                self.assertIsNone(failure.exception.__cause__)
2694
2695    @unittest.skipUnless(zlib, "Requires zlib support")
2696    def test_custom_zlib_error_is_wrapped(self):
2697        # Check zlib codec gives a good error for malformed input
2698        msg = "^decoding with 'zlib_codec' codec failed"
2699        with self.assertRaisesRegex(Exception, msg) as failure:
2700            codecs.decode(b"hello", "zlib_codec")
2701        self.assertIsInstance(failure.exception.__cause__,
2702                                                type(failure.exception))
2703
2704    def test_custom_hex_error_is_wrapped(self):
2705        # Check hex codec gives a good error for malformed input
2706        msg = "^decoding with 'hex_codec' codec failed"
2707        with self.assertRaisesRegex(Exception, msg) as failure:
2708            codecs.decode(b"hello", "hex_codec")
2709        self.assertIsInstance(failure.exception.__cause__,
2710                                                type(failure.exception))
2711
2712    # Unfortunately, the bz2 module throws OSError, which the codec
2713    # machinery currently can't wrap :(
2714
2715    # Ensure codec aliases from http://bugs.python.org/issue7475 work
2716    def test_aliases(self):
2717        for codec_name, aliases in transform_aliases.items():
2718            expected_name = codecs.lookup(codec_name).name
2719            for alias in aliases:
2720                with self.subTest(alias=alias):
2721                    info = codecs.lookup(alias)
2722                    self.assertEqual(info.name, expected_name)
2723
2724    def test_quopri_stateless(self):
2725        # Should encode with quotetabs=True
2726        encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
2727        self.assertEqual(encoded, b"space=20tab=09eol=20\n")
2728        # But should still support unescaped tabs and spaces
2729        unescaped = b"space tab eol\n"
2730        self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped)
2731
2732    def test_uu_invalid(self):
2733        # Missing "begin" line
2734        self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
2735
2736
2737# The codec system tries to wrap exceptions in order to ensure the error
2738# mentions the operation being performed and the codec involved. We
2739# currently *only* want this to happen for relatively stateless
2740# exceptions, where the only significant information they contain is their
2741# type and a single str argument.
2742
# Use a local codec registry to avoid appearing to leak objects when
# registering multiple search functions
_TEST_CODECS = {}

def _get_test_codec(codec_name):
    # Codec search function backed by the local registry above; returns
    # None (i.e. "not found") for anything not explicitly installed.
    return _TEST_CODECS.get(codec_name)
codecs.register(_get_test_codec) # Returns None, not usable as a decorator

try:
    # Issue #22166: Also need to clear the internal cache in CPython
    from _codecs import _forget_codec
except ImportError:
    # Other implementations have no internal codec cache to clear.
    def _forget_codec(codec_name):
        pass
2757
2758
class ExceptionChainingTest(unittest.TestCase):
    """Verify how the codec machinery wraps exceptions raised by codecs.

    "Stateless" exceptions (only a type plus a single str argument) are
    re-raised as a new instance of the same type, with a message naming
    the operation and codec, and the original chained as __cause__.
    Anything carrying extra state must propagate unchanged.
    """

    def setUp(self):
        # There's no way to unregister a codec search function, so we just
        # ensure we render this one fairly harmless after the test
        # case finishes by using the test case repr as the codec name
        # The codecs module normalizes codec names, although this doesn't
        # appear to be formally documented...
        # We also make sure we use a truly unique id for the custom codec
        # to avoid issues with the codec cache when running these tests
        # multiple times (e.g. when hunting for refleaks)
        unique_id = repr(self) + str(id(self))
        self.codec_name = encodings.normalize_encoding(unique_id).lower()

        # We store the object to raise on the instance because of a bad
        # interaction between the codec caching (which means we can't
        # recreate the codec entry) and regrtest refleak hunting (which
        # runs the same test instance multiple times). This means we
        # need to ensure the codecs call back in to the instance to find
        # out which exception to raise rather than binding them in a
        # closure to an object that may change on the next run
        self.obj_to_raise = RuntimeError

    def tearDown(self):
        # Remove this test's codec from the local registry and from the
        # interpreter-level caches so no stale entries survive the test.
        _TEST_CODECS.pop(self.codec_name, None)
        # Issue #22166: Also pop from caches to avoid appearance of ref leaks
        encodings._cache.pop(self.codec_name, None)
        try:
            _forget_codec(self.codec_name)
        except KeyError:
            pass

    def set_codec(self, encode, decode):
        # Install *encode*/*decode* under this test's unique codec name.
        codec_info = codecs.CodecInfo(encode, decode,
                                      name=self.codec_name)
        _TEST_CODECS[self.codec_name] = codec_info

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        # Assert the with-block raises exc_type wrapped by the codec
        # machinery: the message names the operation and codec, and the
        # original exception is chained via __cause__ with a traceback.
        full_msg = r"{} with {!r} codec failed \({}: {}\)".format(
                  operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught
        self.assertIsInstance(caught.exception.__cause__, exc_type)
        self.assertIsNotNone(caught.exception.__cause__.__traceback__)

    def raise_obj(self, *args, **kwds):
        # Helper to dynamically change the object raised by a test codec
        raise self.obj_to_raise

    def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError):
        # All four entry points (str.encode, codecs.encode, bytes.decode,
        # codecs.decode) must wrap the raised object the same way.
        self.obj_to_raise = obj_to_raise
        self.set_codec(self.raise_obj, self.raise_obj)
        with self.assertWrapped("encoding", exc_type, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", exc_type, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    def test_raise_grandchild_subclass_exact_size(self):
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            __slots__ = ()
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def test_raise_subclass_with_weakref_support(self):
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            pass
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def check_not_wrapped(self, obj_to_raise, msg):
        # The raised object must propagate unchanged through all four
        # codec entry points.
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        self.set_codec(raise_obj, raise_obj)
        with self.assertRaisesRegex(RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, "^{}$".format(msg))

    def test_non_str_arg_is_not_wrapped(self):
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        msg_re = r"^\('a', 'b', 'c'\)$"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re)

    # http://bugs.python.org/issue19609
    def test_codec_lookup_failure_not_wrapped(self):
        msg = "^unknown encoding: {}$".format(self.codec_name)
        # The initial codec lookup should not be wrapped
        with self.assertRaisesRegex(LookupError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_unflagged_non_text_codec_handling(self):
        # The stdlib non-text codecs are now marked so they're
        # pre-emptively skipped by the text model related methods
        # However, third party codecs won't be flagged, so we still make
        # sure the case where an inappropriate output type is produced is
        # handled appropriately
        def encode_to_str(*args, **kwds):
            return "not bytes!", 0
        def decode_to_bytes(*args, **kwds):
            return b"not str!", 0
        self.set_codec(encode_to_str, decode_to_bytes)
        # No input or output type checks on the codecs module functions
        encoded = codecs.encode(None, self.codec_name)
        self.assertEqual(encoded, "not bytes!")
        decoded = codecs.decode(None, self.codec_name)
        self.assertEqual(decoded, b"not str!")
        # Text model methods should complain
        fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; "
               r"use codecs.encode\(\) to encode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            "str_input".encode(self.codec_name)
        fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; "
               r"use codecs.decode\(\) to decode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            b"bytes input".decode(self.codec_name)
2918
2919
2920
2921@unittest.skipUnless(sys.platform == 'win32',
2922                     'code pages are specific to Windows')
2923class CodePageTest(unittest.TestCase):
2924    CP_UTF8 = 65001
2925
2926    def test_invalid_code_page(self):
2927        self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
2928        self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
2929        self.assertRaises(OSError, codecs.code_page_encode, 123, 'a')
2930        self.assertRaises(OSError, codecs.code_page_decode, 123, b'a')
2931
2932    def test_code_page_name(self):
2933        self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
2934            codecs.code_page_encode, 932, '\xff')
2935        self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
2936            codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
2937        self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
2938            codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)
2939
2940    def check_decode(self, cp, tests):
2941        for raw, errors, expected in tests:
2942            if expected is not None:
2943                try:
2944                    decoded = codecs.code_page_decode(cp, raw, errors, True)
2945                except UnicodeDecodeError as err:
2946                    self.fail('Unable to decode %a from "cp%s" with '
2947                              'errors=%r: %s' % (raw, cp, errors, err))
2948                self.assertEqual(decoded[0], expected,
2949                    '%a.decode("cp%s", %r)=%a != %a'
2950                    % (raw, cp, errors, decoded[0], expected))
2951                # assert 0 <= decoded[1] <= len(raw)
2952                self.assertGreaterEqual(decoded[1], 0)
2953                self.assertLessEqual(decoded[1], len(raw))
2954            else:
2955                self.assertRaises(UnicodeDecodeError,
2956                    codecs.code_page_decode, cp, raw, errors, True)
2957
2958    def check_encode(self, cp, tests):
2959        for text, errors, expected in tests:
2960            if expected is not None:
2961                try:
2962                    encoded = codecs.code_page_encode(cp, text, errors)
2963                except UnicodeEncodeError as err:
2964                    self.fail('Unable to encode %a to "cp%s" with '
2965                              'errors=%r: %s' % (text, cp, errors, err))
2966                self.assertEqual(encoded[0], expected,
2967                    '%a.encode("cp%s", %r)=%a != %a'
2968                    % (text, cp, errors, encoded[0], expected))
2969                self.assertEqual(encoded[1], len(text))
2970            else:
2971                self.assertRaises(UnicodeEncodeError,
2972                    codecs.code_page_encode, cp, text, errors)
2973
    def test_cp932(self):
        """Shift-JIS (cp932): round-trips plus every error handler."""
        self.check_encode(932, (
            ('abc', 'strict', b'abc'),
            ('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
            # test error handlers
            ('\xff', 'strict', None),
            ('[\xff]', 'ignore', b'[]'),
            ('[\xff]', 'replace', b'[y]'),
            ('[\u20ac]', 'replace', b'[?]'),
            ('[\xff]', 'backslashreplace', b'[\\xff]'),
            ('[\xff]', 'namereplace',
             b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
            ('[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
            ('\udcff', 'strict', None),
            ('[\udcff]', 'surrogateescape', b'[\xff]'),
            ('[\udcff]', 'surrogatepass', None),
        ))
        self.check_decode(932, (
            (b'abc', 'strict', 'abc'),
            (b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'backslashreplace', '[\\xff]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xff]', 'surrogatepass', None),
            (b'\x81\x00abc', 'strict', None),
            (b'\x81\x00abc', 'ignore', '\x00abc'),
            (b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
            (b'\x81\x00abc', 'backslashreplace', '\\x81\x00abc'),
        ))
3006
    def test_cp1252(self):
        """Windows-1252: single-byte code page round-trips and handlers."""
        self.check_encode(1252, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xe9\x80'),
            ('\xff', 'strict', b'\xff'),
            # test error handlers
            ('\u0141', 'strict', None),
            ('\u0141', 'ignore', b''),
            ('\u0141', 'replace', b'L'),
            ('\udc98', 'surrogateescape', b'\x98'),
            ('\udc98', 'surrogatepass', None),
        ))
        self.check_decode(1252, (
            (b'abc', 'strict', 'abc'),
            (b'\xe9\x80', 'strict', '\xe9\u20ac'),
            (b'\xff', 'strict', '\xff'),
        ))
3024
    def test_cp_utf7(self):
        """Code page 65000 (UTF-7), including its lenient error behaviour."""
        cp = 65000
        self.check_encode(cp, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'+AOkgrA-'),
            ('\U0010ffff', 'strict',  b'+2//f/w-'),
            ('\udc80', 'strict', b'+3IA-'),
            ('\ufffd', 'strict', b'+//0-'),
        ))
        self.check_decode(cp, (
            (b'abc', 'strict', 'abc'),
            (b'+AOkgrA-', 'strict', '\xe9\u20ac'),
            (b'+2//f/w-', 'strict', '\U0010ffff'),
            (b'+3IA-', 'strict', '\udc80'),
            (b'+//0-', 'strict', '\ufffd'),
            # invalid bytes
            (b'[+/]', 'strict', '[]'),
            (b'[\xff]', 'strict', '[\xff]'),
        ))
3044
3045    def test_multibyte_encoding(self):
3046        self.check_decode(932, (
3047            (b'\x84\xe9\x80', 'ignore', '\u9a3e'),
3048            (b'\x84\xe9\x80', 'replace', '\ufffd\u9a3e'),
3049        ))
3050        self.check_decode(self.CP_UTF8, (
3051            (b'\xff\xf4\x8f\xbf\xbf', 'ignore', '\U0010ffff'),
3052            (b'\xff\xf4\x8f\xbf\xbf', 'replace', '\ufffd\U0010ffff'),
3053        ))
3054        self.check_encode(self.CP_UTF8, (
3055            ('[\U0010ffff\uDC80]', 'ignore', b'[\xf4\x8f\xbf\xbf]'),
3056            ('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
3057        ))
3058
3059    def test_code_page_decode_flags(self):
3060        # Issue #36312: For some code pages (e.g. UTF-7) flags for
3061        # MultiByteToWideChar() must be set to 0.
3062        if support.verbose:
3063            sys.stdout.write('\n')
3064        for cp in (50220, 50221, 50222, 50225, 50227, 50229,
3065                   *range(57002, 57011+1), 65000):
3066            # On small versions of Windows like Windows IoT
3067            # not all codepages are present.
3068            # A missing codepage causes an OSError exception
3069            # so check for the codepage before decoding
3070            if is_code_page_present(cp):
3071                self.assertEqual(codecs.code_page_decode(cp, b'abc'), ('abc', 3), f'cp{cp}')
3072            else:
3073                if support.verbose:
3074                    print(f"  skipping cp={cp}")
3075        self.assertEqual(codecs.code_page_decode(42, b'abc'),
3076                         ('\uf061\uf062\uf063', 3))
3077
3078    def test_incremental(self):
3079        decoded = codecs.code_page_decode(932, b'\x82', 'strict', False)
3080        self.assertEqual(decoded, ('', 0))
3081
3082        decoded = codecs.code_page_decode(932,
3083                                          b'\xe9\x80\xe9', 'strict',
3084                                          False)
3085        self.assertEqual(decoded, ('\u9a3e', 2))
3086
3087        decoded = codecs.code_page_decode(932,
3088                                          b'\xe9\x80\xe9\x80', 'strict',
3089                                          False)
3090        self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))
3091
3092        decoded = codecs.code_page_decode(932,
3093                                          b'abc', 'strict',
3094                                          False)
3095        self.assertEqual(decoded, ('abc', 3))
3096
3097    def test_mbcs_alias(self):
3098        # Check that looking up our 'default' codepage will return
3099        # mbcs when we don't have a more specific one available
3100        with mock.patch('_winapi.GetACP', return_value=123):
3101            codec = codecs.lookup('cp123')
3102            self.assertEqual(codec.name, 'mbcs')
3103
3104    @support.bigmemtest(size=2**31, memuse=7, dry_run=False)
3105    def test_large_input(self, size):
3106        # Test input longer than INT_MAX.
3107        # Input should contain undecodable bytes before and after
3108        # the INT_MAX limit.
3109        encoded = (b'01234567' * ((size//8)-1) +
3110                   b'\x85\x86\xea\xeb\xec\xef\xfc\xfd\xfe\xff')
3111        self.assertEqual(len(encoded), size+2)
3112        decoded = codecs.code_page_decode(932, encoded, 'surrogateescape', True)
3113        self.assertEqual(decoded[1], len(encoded))
3114        del encoded
3115        self.assertEqual(len(decoded[0]), decoded[1])
3116        self.assertEqual(decoded[0][:10], '0123456701')
3117        self.assertEqual(decoded[0][-20:],
3118                         '6701234567'
3119                         '\udc85\udc86\udcea\udceb\udcec'
3120                         '\udcef\udcfc\udcfd\udcfe\udcff')
3121
3122    @support.bigmemtest(size=2**31, memuse=6, dry_run=False)
3123    def test_large_utf8_input(self, size):
3124        # Test input longer than INT_MAX.
3125        # Input should contain a decodable multi-byte character
3126        # surrounding INT_MAX
3127        encoded = (b'0123456\xed\x84\x80' * (size//8))
3128        self.assertEqual(len(encoded), size // 8 * 10)
3129        decoded = codecs.code_page_decode(65001, encoded, 'ignore', True)
3130        self.assertEqual(decoded[1], len(encoded))
3131        del encoded
3132        self.assertEqual(len(decoded[0]), size)
3133        self.assertEqual(decoded[0][:10], '0123456\ud10001')
3134        self.assertEqual(decoded[0][-11:], '56\ud1000123456\ud100')
3135
3136
class ASCIITest(unittest.TestCase):
    """Tests for the built-in 'ascii' codec."""

    def test_encode(self):
        self.assertEqual('abc123'.encode('ascii'), b'abc123')

    def test_encode_error(self):
        # Every error handler applied to the same unencodable text.
        cases = [
            ('[\x80\xff\u20ac]', 'ignore', b'[]'),
            ('[\x80\xff\u20ac]', 'replace', b'[???]'),
            ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
            ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
             b'[\\x80\\xff\\u20ac\\U000abcde]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.encode('ascii', error_handler),
                                 expected)

    def test_encode_surrogateescape_error(self):
        # '\udc80' can be encoded via surrogateescape, but '\xff' is
        # not ASCII, so the call as a whole must raise.
        with self.assertRaises(UnicodeEncodeError):
            '\udc80\xff'.encode('ascii', 'surrogateescape')

    def test_decode(self):
        self.assertEqual(b'abc'.decode('ascii'), 'abc')

    def test_decode_error(self):
        cases = [
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ]
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.decode('ascii', error_handler),
                                 expected)
3174
3175
class Latin1Test(unittest.TestCase):
    """Tests for the built-in 'latin1' codec."""

    def test_encode(self):
        cases = [
            ('abc', b'abc'),
            ('\x80\xe9\xff', b'\x80\xe9\xff'),
        ]
        for data, expected in cases:
            with self.subTest(data=data, expected=expected):
                self.assertEqual(data.encode('latin1'), expected)

    def test_encode_errors(self):
        cases = [
            ('[\u20ac\udc80]', 'ignore', b'[]'),
            ('[\u20ac\udc80]', 'replace', b'[??]'),
            ('[\u20ac\U000abcde]', 'backslashreplace',
             b'[\\u20ac\\U000abcde]'),
            ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for data, error_handler, expected in cases:
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.encode('latin1', error_handler),
                                 expected)

    def test_encode_surrogateescape_error(self):
        # '\udc80' can be encoded via surrogateescape, but '\u20ac' has
        # no Latin-1 mapping, so the call as a whole must raise.
        with self.assertRaises(UnicodeEncodeError):
            '\udc80\u20ac'.encode('latin1', 'surrogateescape')

    def test_decode(self):
        cases = [
            (b'abc', 'abc'),
            (b'[\x80\xff]', '[\x80\xff]'),
        ]
        for data, expected in cases:
            with self.subTest(data=data, expected=expected):
                self.assertEqual(data.decode('latin1'), expected)
3211
3212
class StreamRecoderTest(unittest.TestCase):
    """Tests for codecs.StreamRecoder and codecs.EncodedFile."""

    def test_writelines(self):
        buf = io.BytesIO()
        ascii_codec = codecs.lookup('ascii')
        recoder = codecs.StreamRecoder(buf,
                                       ascii_codec.encode, ascii_codec.decode,
                                       encodings.ascii.StreamReader,
                                       encodings.ascii.StreamWriter)
        recoder.writelines([b'a', b'b'])
        self.assertEqual(buf.getvalue(), b'ab')

    def test_write(self):
        buf = io.BytesIO()
        latin1 = codecs.lookup('latin1')
        # Recode from Latin-1 to utf-8.
        recoder = codecs.StreamRecoder(buf, latin1.encode, latin1.decode,
                                       encodings.utf_8.StreamReader,
                                       encodings.utf_8.StreamWriter)
        text = 'àñé'
        recoder.write(text.encode('latin1'))
        self.assertEqual(buf.getvalue(), text.encode('utf-8'))

    def test_seeking_read(self):
        raw = io.BytesIO('line1\nline2\nline3\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(raw, 'utf-8', 'utf-16-le')
        self.assertEqual(ef.readline(), b'line1\n')
        ef.seek(0)
        for expected in (b'line1\n', b'line2\n', b'line3\n', b''):
            self.assertEqual(ef.readline(), expected)

    def test_seeking_write(self):
        raw = io.BytesIO('123456789\n'.encode('utf-16-le'))
        ef = codecs.EncodedFile(raw, 'utf-8', 'utf-16-le')
        # seek() resets the internal read buffer only when both offset
        # and whence are zero.
        ef.seek(2)
        ef.write(b'\nabc\n')
        self.assertEqual(ef.readline(), b'789\n')
        ef.seek(0)
        for expected in (b'1\n', b'abc\n', b'789\n'):
            self.assertEqual(ef.readline(), expected)
3257
3258
@unittest.skipIf(_testcapi is None, 'need _testcapi module')
class LocaleCodecTest(unittest.TestCase):
    """
    Test indirectly _Py_DecodeUTF8Ex() and _Py_EncodeUTF8Ex().
    """
    # Codec under test: the filesystem encoding of the running interpreter.
    ENCODING = sys.getfilesystemencoding()
    # Sample texts: pure ASCII, Latin-1 range, U+00FF, a wide UCS mix,
    # and lone surrogate code points.
    STRINGS = ("ascii", "ulatin1:\xa7\xe9",
               "u255:\xff",
               "UCS:\xe9\u20ac\U0010ffff",
               "surrogates:\uDC80\uDCFF")
    BYTES_STRINGS = (b"blatin1:\xa7\xe9", b"b255:\xff")
    SURROGATES = "\uDC80\uDCFF"

    def encode(self, text, errors="strict"):
        """Encode *text* through _testcapi.EncodeLocaleEx with flags=0."""
        return _testcapi.EncodeLocaleEx(text, 0, errors)

    def check_encode_strings(self, errors):
        """For each sample string, the C-level encoder must agree with
        str.encode(); where str.encode() raises UnicodeEncodeError the
        C encoder must fail with a matching RuntimeError message.
        """
        for text in self.STRINGS:
            with self.subTest(text=text):
                try:
                    expected = text.encode(self.ENCODING, errors)
                except UnicodeEncodeError:
                    # str.encode() rejected the text: the C encoder must
                    # reject it too, surfacing a RuntimeError.
                    with self.assertRaises(RuntimeError) as cm:
                        self.encode(text, errors)
                    errmsg = str(cm.exception)
                    self.assertRegex(errmsg, r"encode error: pos=[0-9]+, reason=")
                else:
                    encoded = self.encode(text, errors)
                    self.assertEqual(encoded, expected)

    def test_encode_strict(self):
        self.check_encode_strings("strict")

    def test_encode_surrogateescape(self):
        self.check_encode_strings("surrogateescape")

    def test_encode_surrogatepass(self):
        # Probe with an empty string: a ValueError with this exact
        # message means the encoder lacks surrogatepass support.
        try:
            self.encode('', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} encoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_encode_strings("surrogatepass")

    def test_encode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.encode('', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')

    def decode(self, encoded, errors="strict"):
        """Decode *encoded* through _testcapi.DecodeLocaleEx with flags=0."""
        return _testcapi.DecodeLocaleEx(encoded, 0, errors)

    def check_decode_strings(self, errors):
        """Build a corpus of encoded byte strings, then check that the
        C-level decoder agrees with bytes.decode(); where bytes.decode()
        raises UnicodeDecodeError the C decoder must raise RuntimeError.
        """
        is_utf8 = (self.ENCODING == "utf-8")
        if is_utf8:
            encode_errors = 'surrogateescape'
        else:
            encode_errors = 'strict'

        strings = list(self.BYTES_STRINGS)
        for text in self.STRINGS:
            try:
                encoded = text.encode(self.ENCODING, encode_errors)
                if encoded not in strings:
                    strings.append(encoded)
            except UnicodeEncodeError:
                encoded = None

            if is_utf8:
                # Also cover the surrogatepass form when it differs from
                # the form appended above.
                encoded2 = text.encode(self.ENCODING, 'surrogatepass')
                if encoded2 != encoded:
                    strings.append(encoded2)

        for encoded in strings:
            with self.subTest(encoded=encoded):
                try:
                    expected = encoded.decode(self.ENCODING, errors)
                except UnicodeDecodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.decode(encoded, errors)
                    errmsg = str(cm.exception)
                    self.assertTrue(errmsg.startswith("decode error: "), errmsg)
                else:
                    decoded = self.decode(encoded, errors)
                    self.assertEqual(decoded, expected)

    def test_decode_strict(self):
        self.check_decode_strings("strict")

    def test_decode_surrogateescape(self):
        self.check_decode_strings("surrogateescape")

    def test_decode_surrogatepass(self):
        # Probe with empty bytes: a ValueError with this exact message
        # means the decoder lacks surrogatepass support.
        try:
            self.decode(b'', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} decoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_decode_strings("surrogatepass")

    def test_decode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.decode(b'', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')
3371
3372
class Rot13Test(unittest.TestCase):
    """Test the educational ROT-13 codec."""

    def test_encode(self):
        self.assertEqual(codecs.encode("Caesar liked ciphers", 'rot-13'),
                         'Pnrfne yvxrq pvcuref')

    def test_decode(self):
        self.assertEqual(codecs.decode('Rg gh, Oehgr?', 'rot-13'),
                         'Et tu, Brute?')

    def test_incremental_encode(self):
        enc = codecs.getincrementalencoder('rot-13')()
        self.assertEqual(enc.encode('ABBA nag Cheryl Baker'),
                         'NOON ant Purely Onxre')

    def test_incremental_decode(self):
        dec = codecs.getincrementaldecoder('rot-13')()
        self.assertEqual(dec.decode('terra Ares envy tha'),
                         'green Nerf rail gun')
3392
3393
class Rot13UtilTest(unittest.TestCase):
    """Test the ROT-13 codec via the rot13() helper function,
    i.e. the user has done something like:
    $ echo "Hello World" | python -m encodings.rot_13
    """
    def test_rot13_func(self):
        source = io.StringIO('Gb or, be abg gb or, gung vf gur dhrfgvba')
        sink = io.StringIO()
        encodings.rot_13.rot13(source, sink)
        self.assertEqual(sink.getvalue(),
                         'To be, or not to be, that is the question')
3408
3409
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()
3412