• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import codecs
2import contextlib
3import io
4import locale
5import sys
6import unittest
7import encodings
8from unittest import mock
9
10from test import support
11from test.support import os_helper
12from test.support import warnings_helper
13
# Optional C test-helper extension modules: not every build compiles
# them, so fall back to None and let tests skip on their absence.
try:
    import _testcapi
except ImportError:
    _testcapi = None
try:
    import _testinternalcapi
except ImportError:
    _testinternalcapi = None

# ctypes is also optional on some platforms/builds.  Without it we cannot
# determine sizeof(wchar_t), so -1 serves as a "unknown" sentinel.
try:
    import ctypes
except ImportError:
    ctypes = None
    SIZEOF_WCHAR_T = -1
else:
    SIZEOF_WCHAR_T = ctypes.sizeof(ctypes.c_wchar)
30
def coding_checker(self, coder):
    """Return a helper asserting that *coder* consumes its whole input.

    The returned callable verifies that ``coder(input)`` produces the
    pair ``(expect, len(input))``, using *self*'s assertEqual.
    """
    def check(input, expect):
        actual = coder(input)
        self.assertEqual(actual, (expect, len(input)))
    return check
35
# On small versions of Windows like Windows IoT or Windows Nano Server not all codepages are present
def is_code_page_present(cp):
    """Return a true-ish BOOL if Windows code page *cp* is installed.

    Queries the Win32 ``GetCPInfoExW`` API through ctypes; only
    meaningful on Windows, where ``WinDLL`` exists.
    """
    from ctypes import POINTER, WINFUNCTYPE, WinDLL
    from ctypes.wintypes import BOOL, BYTE, WCHAR, UINT, DWORD

    MAX_LEADBYTES = 12  # 5 ranges, 2 bytes ea., 0 term.
    MAX_DEFAULTCHAR = 2 # single or double byte
    MAX_PATH = 260
    # ctypes mirror of the Win32 CPINFOEXW structure; field order and
    # array sizes must match the native layout exactly.
    class CPINFOEXW(ctypes.Structure):
        _fields_ = [("MaxCharSize", UINT),
                    ("DefaultChar", BYTE*MAX_DEFAULTCHAR),
                    ("LeadByte", BYTE*MAX_LEADBYTES),
                    ("UnicodeDefaultChar", WCHAR),
                    ("CodePage", UINT),
                    ("CodePageName", WCHAR*MAX_PATH)]

    prototype = WINFUNCTYPE(BOOL, UINT, DWORD, POINTER(CPINFOEXW))
    GetCPInfoEx = prototype(("GetCPInfoExW", WinDLL("kernel32")))
    info = CPINFOEXW()
    # Nonzero iff Windows could retrieve info for the code page, i.e.
    # the code page is present on this installation.
    return GetCPInfoEx(cp, 0, info)
56
class Queue(object):
    """
    A byte FIFO: data written at one end can be read back from the
    other, either a fixed number of bytes at a time or all at once.
    """
    def __init__(self, buffer):
        self._buffer = buffer

    def write(self, chars):
        self._buffer += chars

    def read(self, size=-1):
        if size < 0:
            # Drain everything, leaving an empty buffer of the same type.
            data, self._buffer = self._buffer, self._buffer[:0]
            return data
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data
76
77
class MixInCheckStateHandling:
    """Mixin exercising getstate()/setstate() of incremental codecs."""

    def check_state_handling_decode(self, encoding, u, s):
        """Split *s* at every byte position and verify decoder state round-trips."""
        for split in range(len(s) + 1):
            decoder = codecs.getincrementaldecoder(encoding)()
            head = decoder.decode(s[:split])
            state = decoder.getstate()
            self.assertIsInstance(state[1], int)
            # Documented invariant of IncrementalDecoder.getstate(): when
            # the integer flag is zero, re-feeding the buffered bytes into
            # a reset decoder must emit nothing and reproduce the state.
            if not state[1]:
                # reset decoder to the default state without anything buffered
                decoder.setstate((state[0][:0], 0))
                # Feeding the previous input may not produce any output
                self.assertTrue(not decoder.decode(state[0]))
                # The decoder must return to the same state
                self.assertEqual(state, decoder.getstate())
            # A brand-new decoder primed with the captured state has to
            # finish the input exactly where the first one stopped.
            decoder = codecs.getincrementaldecoder(encoding)()
            decoder.setstate(state)
            tail = decoder.decode(s[split:], True)
            self.assertEqual(u, head + tail)

    def check_state_handling_encode(self, encoding, u, s):
        """Split *u* at every position and verify encoder state round-trips."""
        for split in range(len(u) + 1):
            encoder = codecs.getincrementalencoder(encoding)()
            head = encoder.encode(u[:split])
            state = encoder.getstate()
            encoder = codecs.getincrementalencoder(encoding)()
            encoder.setstate(state)
            tail = encoder.encode(u[split:], True)
            self.assertEqual(s, head + tail)
110
111
class ReadTest(MixInCheckStateHandling):
    """Reusable StreamReader / incremental-decoder tests.

    Concrete subclasses supply ``encoding`` (and usually
    ``ill_formed_sequence``) as class attributes; the tests here cover
    byte-at-a-time decoding, readline() semantics, mixed read calls and
    lone-surrogate error handling for that encoding.
    """

    def check_partial(self, input, partialresults):
        # get a StreamReader for the encoding and feed the bytestring version
        # of input to the reader byte by byte. Read everything available from
        # the StreamReader and check that the results equal the appropriate
        # entries from partialresults.
        q = Queue(b"")
        r = codecs.getreader(self.encoding)(q)
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            q.write(bytes([c]))
            result += r.read()
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(r.read(), "")
        self.assertEqual(r.bytebuffer, b"")

        # do the check again, this time using an incremental decoder
        d = codecs.getincrementaldecoder(self.encoding)()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # Check whether the reset method works properly
        d.reset()
        result = ""
        for (c, partialresult) in zip(input.encode(self.encoding), partialresults, strict=True):
            result += d.decode(bytes([c]))
            self.assertEqual(result, partialresult)
        # check that there's nothing left in the buffers
        self.assertEqual(d.decode(b"", True), "")
        self.assertEqual(d.buffer, b"")

        # check iterdecode()
        encoded = input.encode(self.encoding)
        self.assertEqual(
            input,
            "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
        )

    def test_readline(self):
        """readline() must handle all recognized line endings, with and
        without keepends, for short and multi-chunk lines."""
        def getreader(input):
            stream = io.BytesIO(input.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        def readalllines(input, keepends=True, size=None):
            reader = getreader(input)
            lines = []
            while True:
                line = reader.readline(size=size, keepends=keepends)
                if not line:
                    break
                lines.append(line)
            return "|".join(lines)

        s = "foo\nbar\r\nbaz\rspam\u2028eggs"
        sexpected = "foo\n|bar\r\n|baz\r|spam\u2028|eggs"
        sexpectednoends = "foo|bar|baz|spam|eggs"
        self.assertEqual(readalllines(s, True), sexpected)
        self.assertEqual(readalllines(s, False), sexpectednoends)
        self.assertEqual(readalllines(s, True, 10), sexpected)
        self.assertEqual(readalllines(s, False, 10), sexpectednoends)

        lineends = ("\n", "\r\n", "\r", "\u2028")
        # Test long lines (multiple calls to read() in readline())
        vw = []
        vwo = []
        for (i, lineend) in enumerate(lineends):
            vw.append((i*200+200)*"\u3042" + lineend)
            vwo.append((i*200+200)*"\u3042")
        self.assertEqual(readalllines("".join(vw), True), "|".join(vw))
        self.assertEqual(readalllines("".join(vw), False), "|".join(vwo))

        # Test lines where the first read might end with \r, so the
        # reader has to look ahead whether this is a lone \r or a \r\n
        for size in range(80):
            for lineend in lineends:
                s = 10*(size*"a" + lineend + "xxx\n")
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=True),
                        size*"a" + lineend,
                    )
                    self.assertEqual(
                        reader.readline(keepends=True),
                        "xxx\n",
                    )
                reader = getreader(s)
                for i in range(10):
                    self.assertEqual(
                        reader.readline(keepends=False),
                        size*"a",
                    )
                    self.assertEqual(
                        reader.readline(keepends=False),
                        "xxx",
                    )

    def test_mixed_readline_and_read(self):
        """Mixing readline(), read(), read(n) and readlines() on one
        reader must never lose or duplicate data."""
        lines = ["Humpty Dumpty sat on a wall,\n",
                 "Humpty Dumpty had a great fall.\r\n",
                 "All the king's horses and all the king's men\r",
                 "Couldn't put Humpty together again."]
        data = ''.join(lines)
        def getreader():
            stream = io.BytesIO(data.encode(self.encoding))
            return codecs.getreader(self.encoding)(stream)

        # Issue #8260: Test readline() followed by read()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(), ''.join(lines[1:]))
        self.assertEqual(f.read(), '')

        # Issue #32110: Test readline() followed by read(n)
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.read(1), lines[1][0])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[len(lines[0]) + 1:][:100])

        # Issue #16636: Test readline() followed by readlines()
        f = getreader()
        self.assertEqual(f.readline(), lines[0])
        self.assertEqual(f.readlines(), lines[1:])
        self.assertEqual(f.read(), '')

        # Test read(n) followed by read()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(), data[5:])
        self.assertEqual(f.read(), '')

        # Issue #32110: Test read(n) followed by read(n)
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.read(1), data[5])
        self.assertEqual(f.read(0), '')
        self.assertEqual(f.read(100), data[6:106])

        # Issue #12446: Test read(n) followed by readlines()
        f = getreader()
        self.assertEqual(f.read(size=40, chars=5), data[:5])
        self.assertEqual(f.readlines(), [lines[0][5:]] + lines[1:])
        self.assertEqual(f.read(), '')

    def test_bug1175396(self):
        """Iterating a StreamReader over a large CRLF-terminated text must
        reproduce every input line exactly."""
        s = [
            '<%!--===================================================\r\n',
            '    BLOG index page: show recent articles,\r\n',
            '    today\'s articles, or articles of a specific date.\r\n',
            '========================================================--%>\r\n',
            '<%@inputencoding="ISO-8859-1"%>\r\n',
            '<%@pagetemplate=TEMPLATE.y%>\r\n',
            '<%@import=import frog.util, frog%>\r\n',
            '<%@import=import frog.objects%>\r\n',
            '<%@import=from frog.storageerrors import StorageError%>\r\n',
            '<%\r\n',
            '\r\n',
            'import logging\r\n',
            'log=logging.getLogger("Snakelets.logger")\r\n',
            '\r\n',
            '\r\n',
            'user=self.SessionCtx.user\r\n',
            'storageEngine=self.SessionCtx.storageEngine\r\n',
            '\r\n',
            '\r\n',
            'def readArticlesFromDate(date, count=None):\r\n',
            '    entryids=storageEngine.listBlogEntries(date)\r\n',
            '    entryids.reverse() # descending\r\n',
            '    if count:\r\n',
            '        entryids=entryids[:count]\r\n',
            '    try:\r\n',
            '        return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n',
            '    except StorageError,x:\r\n',
            '        log.error("Error loading articles: "+str(x))\r\n',
            '        self.abort("cannot load articles")\r\n',
            '\r\n',
            'showdate=None\r\n',
            '\r\n',
            'arg=self.Request.getArg()\r\n',
            'if arg=="today":\r\n',
            '    #-------------------- TODAY\'S ARTICLES\r\n',
            '    self.write("<h2>Today\'s articles</h2>")\r\n',
            '    showdate = frog.util.isodatestr() \r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'elif arg=="active":\r\n',
            '    #-------------------- ACTIVE ARTICLES redirect\r\n',
            '    self.Yredirect("active.y")\r\n',
            'elif arg=="login":\r\n',
            '    #-------------------- LOGIN PAGE redirect\r\n',
            '    self.Yredirect("login.y")\r\n',
            'elif arg=="date":\r\n',
            '    #-------------------- ARTICLES OF A SPECIFIC DATE\r\n',
            '    showdate = self.Request.getParameter("date")\r\n',
            '    self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n',
            '    entries = readArticlesFromDate(showdate)\r\n',
            'else:\r\n',
            '    #-------------------- RECENT ARTICLES\r\n',
            '    self.write("<h2>Recent articles</h2>")\r\n',
            '    dates=storageEngine.listBlogEntryDates()\r\n',
            '    if dates:\r\n',
            '        entries=[]\r\n',
            '        SHOWAMOUNT=10\r\n',
            '        for showdate in dates:\r\n',
            '            entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n',
            '            if len(entries)>=SHOWAMOUNT:\r\n',
            '                break\r\n',
            '                \r\n',
        ]
        stream = io.BytesIO("".join(s).encode(self.encoding))
        reader = codecs.getreader(self.encoding)(stream)
        for (i, line) in enumerate(reader):
            self.assertEqual(line, s[i])

    def test_readlinequeue(self):
        """Interleave writes and readline() calls through a shared Queue,
        including line endings split across writes."""
        q = Queue(b"")
        writer = codecs.getwriter(self.encoding)(q)
        reader = codecs.getreader(self.encoding)(q)

        # No lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=False), "foo")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=False), "")
        self.assertEqual(reader.readline(keepends=False), "bar")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=False), "baz")
        self.assertEqual(reader.readline(keepends=False), "")

        # Lineends
        writer.write("foo\r")
        self.assertEqual(reader.readline(keepends=True), "foo\r")
        writer.write("\nbar\r")
        self.assertEqual(reader.readline(keepends=True), "\n")
        self.assertEqual(reader.readline(keepends=True), "bar\r")
        writer.write("baz")
        self.assertEqual(reader.readline(keepends=True), "baz")
        self.assertEqual(reader.readline(keepends=True), "")
        writer.write("foo\r\n")
        self.assertEqual(reader.readline(keepends=True), "foo\r\n")

    def test_bug1098990_a(self):
        """readline() must cope with a very long line between short ones."""
        s1 = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\r\n"
        s2 = "offending line: ladfj askldfj klasdj fskla dfzaskdj fasklfj laskd fjasklfzzzzaa%whereisthis!!!\r\n"
        s3 = "next line.\r\n"

        s = (s1+s2+s3).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), "")

    def test_bug1098990_b(self):
        """readline() must return each CRLF-terminated line exactly once."""
        s1 = "aaaaaaaaaaaaaaaaaaaaaaaa\r\n"
        s2 = "bbbbbbbbbbbbbbbbbbbbbbbb\r\n"
        s3 = "stillokay:bbbbxx\r\n"
        s4 = "broken!!!!badbad\r\n"
        s5 = "againokay.\r\n"

        s = (s1+s2+s3+s4+s5).encode(self.encoding)
        stream = io.BytesIO(s)
        reader = codecs.getreader(self.encoding)(stream)
        self.assertEqual(reader.readline(), s1)
        self.assertEqual(reader.readline(), s2)
        self.assertEqual(reader.readline(), s3)
        self.assertEqual(reader.readline(), s4)
        self.assertEqual(reader.readline(), s5)
        self.assertEqual(reader.readline(), "")

    # What decoding ill_formed_sequence with errors="replace" yields.
    ill_formed_sequence_replace = "\ufffd"

    def test_lone_surrogates(self):
        """Lone surrogates must fail strict coding and obey every error
        handler, both encoding and decoding."""
        self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding)
        self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "namereplace"),
                         "[\\udc80]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"),
                         "[&#56448;]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
                         "[?]".encode(self.encoding))

        # sequential surrogate characters
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
                         "[]".encode(self.encoding))
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
                         "[??]".encode(self.encoding))

        bom = "".encode(self.encoding)
        for before, after in [("\U00010fff", "A"), ("[", "]"),
                              ("A", "\U00010fff")]:
            before_sequence = before.encode(self.encoding)[len(bom):]
            after_sequence = after.encode(self.encoding)[len(bom):]
            test_string = before + "\uDC80" + after
            test_sequence = (bom + before_sequence +
                             self.ill_formed_sequence + after_sequence)
            self.assertRaises(UnicodeDecodeError, test_sequence.decode,
                              self.encoding)
            self.assertEqual(test_string.encode(self.encoding,
                                                "surrogatepass"),
                             test_sequence)
            self.assertEqual(test_sequence.decode(self.encoding,
                                                  "surrogatepass"),
                             test_string)
            self.assertEqual(test_sequence.decode(self.encoding, "ignore"),
                             before + after)
            self.assertEqual(test_sequence.decode(self.encoding, "replace"),
                             before + self.ill_formed_sequence_replace + after)
            backslashreplace = ''.join('\\x%02x' % b
                                       for b in self.ill_formed_sequence)
            self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
                             before + backslashreplace + after)

    def test_incremental_surrogatepass(self):
        # Test incremental decoder for surrogatepass handler:
        # see issue #24214
        # High surrogate
        data = '\uD901'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:], True), '\uD901')
        # Low surrogate
        data = '\uDC02'.encode(self.encoding, 'surrogatepass')
        for i in range(1, len(data)):
            dec = codecs.getincrementaldecoder(self.encoding)('surrogatepass')
            self.assertEqual(dec.decode(data[:i]), '')
            self.assertEqual(dec.decode(data[i:]), '\uDC02')
450
451
class UTF32Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-driven "utf-32" codec (either byte order)."""

    encoding = "utf-32"
    # Encoded lone surrogate U+DC80 in this machine's native byte order.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc\x00\x00"
    else:
        ill_formed_sequence = b"\x00\x00\xdc\x80"

    # "spamspam" with a single leading BOM, little- and big-endian.
    spamle = (b'\xff\xfe\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
              b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
    spambe = (b'\x00\x00\xfe\xff'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
              b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')

    def test_only_one_bom(self):
        """The writer must emit exactly one BOM however often it writes."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """Input starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(4*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(8*b"\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        """Byte-at-a-time decoding: nothing until the 4-byte BOM, then a
        character after every further four bytes."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read
                "", # third byte of BOM read
                "", # fourth byte of BOM read => byteorder known
                "",
                "",
                "",
                "\x00",
                "\x00",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """'replace' and 'ignore' handlers on a truncated final unit."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_32_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_32_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        """A lone byte cannot form a UTF-32 unit in strict mode."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        """getstate()/setstate() round-trips for both byte orders."""
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        encoded_le = b'\xff\xfe\x00\x00' + b'\x00\x00\x01\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_le)[0])
        encoded_be = b'\x00\x00\xfe\xff' + b'\x00\x01\x00\x00' * 1024
        self.assertEqual('\U00010000' * 1024,
                         codecs.utf_32_decode(encoded_be)[0])
546
547
class UTF32LETest(ReadTest, unittest.TestCase):
    """Tests for the explicit little-endian UTF-32 codec (no BOM)."""

    encoding = "utf-32-le"
    ill_formed_sequence = b"\x80\xdc\x00\x00"

    def test_partial(self):
        # UTF-32-LE uses a fixed four bytes per code point and no BOM, so
        # after i bytes the decoder has emitted exactly the first i // 4
        # characters of the input.
        text = "\x00\xff\u0100\uffff\U00010000"
        self.check_partial(
            text,
            [text[:i // 4] for i in range(1, 4 * len(text) + 1)],
        )

    def test_simple(self):
        # A non-BMP character is one little-endian 32-bit unit.
        self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")

    def test_errors(self):
        # A truncated (single-byte) unit must fail in strict mode.
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        payload = b'\x00\x00\x01\x00' * 1024
        decoded, _ = codecs.utf_32_le_decode(payload)
        self.assertEqual('\U00010000' * 1024, decoded)
592
593
class UTF32BETest(ReadTest, unittest.TestCase):
    """Tests for the explicit big-endian UTF-32 codec (no BOM)."""

    encoding = "utf-32-be"
    ill_formed_sequence = b"\x00\x00\xdc\x80"

    def test_partial(self):
        # UTF-32-BE uses a fixed four bytes per code point and no BOM, so
        # after i bytes the decoder has emitted exactly the first i // 4
        # characters of the input.
        text = "\x00\xff\u0100\uffff\U00010000"
        self.check_partial(
            text,
            [text[:i // 4] for i in range(1, 4 * len(text) + 1)],
        )

    def test_simple(self):
        # A non-BMP character is one big-endian 32-bit unit.
        self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")

    def test_errors(self):
        # A truncated (single-byte) unit must fail in strict mode.
        self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
                          b"\xff", "strict", True)

    def test_issue8941(self):
        # Issue #8941: insufficient result allocation when decoding into
        # surrogate pairs on UCS-2 builds.
        payload = b'\x00\x01\x00\x00' * 1024
        decoded, _ = codecs.utf_32_be_decode(payload)
        self.assertEqual('\U00010000' * 1024, decoded)
638
639
class UTF16Test(ReadTest, unittest.TestCase):
    """Tests for the BOM-driven "utf-16" codec (either byte order)."""

    encoding = "utf-16"
    # Encoded lone surrogate U+DC80 in this machine's native byte order.
    if sys.byteorder == 'little':
        ill_formed_sequence = b"\x80\xdc"
    else:
        ill_formed_sequence = b"\xdc\x80"

    # "spamspam" with a single leading BOM, little- and big-endian.
    spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
    spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'

    def test_only_one_bom(self):
        """The writer must emit exactly one BOM however often it writes."""
        _,_,reader,writer = codecs.lookup(self.encoding)
        # encode some stream
        s = io.BytesIO()
        f = writer(s)
        f.write("spam")
        f.write("spam")
        d = s.getvalue()
        # check whether there is exactly one BOM in it
        self.assertTrue(d == self.spamle or d == self.spambe)
        # try to read it back
        s = io.BytesIO(d)
        f = reader(s)
        self.assertEqual(f.read(), "spamspam")

    def test_badbom(self):
        """Input starting with an invalid BOM must raise UnicodeError."""
        s = io.BytesIO(b"\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

        s = io.BytesIO(b"\xff\xff\xff\xff")
        f = codecs.getreader(self.encoding)(s)
        self.assertRaises(UnicodeError, f.read)

    def test_partial(self):
        """Byte-at-a-time decoding: nothing until the 2-byte BOM; BMP chars
        complete every two bytes, the surrogate pair after four."""
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "", # first byte of BOM read
                "", # second byte of BOM read => byteorder known
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_handlers(self):
        """'replace' and 'ignore' handlers on a truncated final unit."""
        self.assertEqual(('\ufffd', 1),
                         codecs.utf_16_decode(b'\x01', 'replace', True))
        self.assertEqual(('', 1),
                         codecs.utf_16_decode(b'\x01', 'ignore', True))

    def test_errors(self):
        """A lone byte cannot form a UTF-16 unit in strict mode."""
        self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode,
                          b"\xff", "strict", True)

    def test_decoder_state(self):
        """getstate()/setstate() round-trips for both byte orders."""
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spamle)
        self.check_state_handling_decode(self.encoding,
                                         "spamspam", self.spambe)

    def test_bug691291(self):
        # If encoding is not None, then
        # files are always opened in binary mode, even if no binary mode was
        # specified.  This means that no automatic conversion of '\n' is done
        # on reading and writing.
        s1 = 'Hello\r\nworld\r\n'

        s = s1.encode(self.encoding)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(s)
        with codecs.open(os_helper.TESTFN, 'r',
                         encoding=self.encoding) as reader:
            self.assertEqual(reader.read(), s1)

    def test_invalid_modes(self):
        """codecs.open() must reject universal-newline and text modes."""
        for mode in ('U', 'rU', 'r+U'):
            with self.assertRaises(ValueError) as cm:
                codecs.open(os_helper.TESTFN, mode, encoding=self.encoding)
            self.assertIn('invalid mode', str(cm.exception))

        for mode in ('rt', 'wt', 'at', 'r+t'):
            with self.assertRaises(ValueError) as cm:
                codecs.open(os_helper.TESTFN, mode, encoding=self.encoding)
            self.assertIn("can't have text and binary mode at once",
                          str(cm.exception))
738
class UTF16LETest(ReadTest, unittest.TestCase):
    """Tests for the explicit little-endian UTF-16 codec (no BOM)."""

    encoding = "utf-16-le"
    ill_formed_sequence = b"\x80\xdc"

    def test_partial(self):
        # Derive the expected byte-at-a-time outputs from each character's
        # encoded width: a BMP character appears once its second byte has
        # arrived, a surrogate pair only after all four bytes.
        text = "\x00\xff\u0100\uffff\U00010000"
        expected = []
        decoded = ""
        for ch in text:
            width = len(ch.encode(self.encoding))
            expected.extend([decoded] * (width - 1))
            decoded += ch
            expected.append(decoded)
        self.check_partial(text, expected)

    def test_errors(self):
        tests = [
            (b'\xff', '\ufffd'),
            (b'A\x00Z', 'A\ufffd'),
            (b'A\x00B\x00C\x00D\x00Z', 'ABCD\ufffd'),
            (b'\x00\xd8', '\ufffd'),
            (b'\x00\xd8A', '\ufffd'),
            (b'\x00\xd8A\x00', '\ufffdA'),
            (b'\x00\xdcA\x00', '\ufffdA'),
        ]
        for raw, expected in tests:
            # Strict decoding must reject the malformed input...
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode,
                              raw, 'strict', True)
            # ...while 'replace' substitutes U+FFFD for the bad unit(s).
            self.assertEqual(raw.decode('utf-16le', 'replace'), expected)

    def test_nonbmp(self):
        # U+10203 round-trips through a little-endian surrogate pair.
        surrogate_pair = b'\x00\xd8\x03\xde'
        self.assertEqual("\U00010203".encode(self.encoding), surrogate_pair)
        self.assertEqual(surrogate_pair.decode(self.encoding), "\U00010203")
782
class UTF16BETest(ReadTest, unittest.TestCase):
    # Big-endian UTF-16 without BOM handling.
    encoding = "utf-16-be"
    # A lone low surrogate (U+DC80) in BE byte order -- ill-formed by
    # definition; consumed by the shared ReadTest machinery.
    ill_formed_sequence = b"\xdc\x80"

    def test_partial(self):
        # Decoder output after each additional input byte: code units
        # appear only once complete (four bytes for the non-BMP char).
        self.check_partial(
            "\x00\xff\u0100\uffff\U00010000",
            [
                "",
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u0100",
                "\x00\xff\u0100",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff",
                "\x00\xff\u0100\uffff\U00010000",
            ]
        )

    def test_errors(self):
        # (raw bytes, expected output with errors='replace') pairs; each
        # raw input is truncated or contains an unpaired surrogate.
        tests = [
            (b'\xff', '\ufffd'),
            (b'\x00A\xff', 'A\ufffd'),
            (b'\x00A\x00B\x00C\x00DZ', 'ABCD\ufffd'),
            (b'\xd8\x00', '\ufffd'),
            (b'\xd8\x00\xdc', '\ufffd'),
            (b'\xd8\x00\x00A', '\ufffdA'),
            (b'\xdc\x00\x00A', '\ufffdA'),
        ]
        for raw, expected in tests:
            # 'strict' must raise; 'replace' must substitute U+FFFD.
            self.assertRaises(UnicodeDecodeError, codecs.utf_16_be_decode,
                              raw, 'strict', True)
            self.assertEqual(raw.decode('utf-16be', 'replace'), expected)

    def test_nonbmp(self):
        # U+10203 round-trips through the surrogate pair D800/DE03,
        # serialized high-byte-first.
        self.assertEqual("\U00010203".encode(self.encoding),
                         b'\xd8\x00\xde\x03')
        self.assertEqual(b'\xd8\x00\xde\x03'.decode(self.encoding),
                         "\U00010203")
826
class UTF8Test(ReadTest, unittest.TestCase):
    encoding = "utf-8"
    # UTF-8-style encoding of a lone low surrogate (U+DC80): ill-formed.
    ill_formed_sequence = b"\xed\xb2\x80"
    # Under errors='replace' each of its three bytes becomes U+FFFD.
    ill_formed_sequence_replace = "\ufffd" * 3
    # Plain UTF-8 writes no signature; UTF8SigTest overrides this.
    BOM = b''

    def test_partial(self):
        # Decoder output after each additional input byte: a character
        # appears only once its complete multi-byte sequence arrived.
        self.check_partial(
            "\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "\x00",
                "\x00",
                "\x00\xff",
                "\x00\xff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff",
                "\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_decoder_state(self):
        # Round-trip the incremental decoder's getstate()/setstate()
        # (helper defined on ReadTest).
        u = "\x00\x7f\x80\xff\u0100\u07ff\u0800\uffff\U0010ffff"
        self.check_state_handling_decode(self.encoding,
                                         u, u.encode(self.encoding))

    def test_decode_error(self):
        # How each error handler treats the invalid bytes 0x80/0xFF.
        for data, error_handler, expected in (
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ):
            with self.subTest(data=data, error_handler=error_handler,
                              expected=expected):
                self.assertEqual(data.decode(self.encoding, error_handler),
                                 expected)

    def test_lone_surrogates(self):
        super().test_lone_surrogates()
        # not sure if this is making sense for
        # UTF-16 and UTF-32
        self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
                         self.BOM + b'[\x80]')

        # surrogateescape can only reproduce low surrogates (those that
        # map back to bytes 0x80-0xFF); the error must cover exactly the
        # unencodable high-surrogate pair.
        with self.assertRaises(UnicodeEncodeError) as cm:
            "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
        exc = cm.exception
        self.assertEqual(exc.object[exc.start:exc.end], '\uD800\uDFFF')

    def test_surrogatepass_handler(self):
        # surrogatepass encodes/decodes lone surrogates as their
        # (ill-formed) three-byte sequences.
        self.assertEqual("abc\ud800def".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"abc\xed\xa0\x80def")
        self.assertEqual("\U00010fff\uD800".encode(self.encoding, "surrogatepass"),
                         self.BOM + b"\xf0\x90\xbf\xbf\xed\xa0\x80")
        self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "surrogatepass"),
                         self.BOM + b'[\xed\xa0\x80\xed\xb2\x80]')

        self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
                         "abc\ud800def")
        self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
                         "\U00010fff\uD800")

        self.assertTrue(codecs.lookup_error("surrogatepass"))
        # Truncated surrogate sequences still fail even with
        # surrogatepass.
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
        with self.assertRaises(UnicodeDecodeError):
            b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")

    def test_incremental_errors(self):
        # Test that the incremental decoder can fail with final=False.
        # See issue #24214
        cases = [b'\x80', b'\xBF', b'\xC0', b'\xC1', b'\xF5', b'\xF6', b'\xFF']
        for prefix in (b'\xC2', b'\xDF', b'\xE0', b'\xE0\xA0', b'\xEF',
                       b'\xEF\xBF', b'\xF0', b'\xF0\x90', b'\xF0\x90\x80',
                       b'\xF4', b'\xF4\x8F', b'\xF4\x8F\xBF'):
            # A valid sequence start followed by an impossible
            # continuation byte can be rejected immediately.
            for suffix in b'\x7F', b'\xC0':
                cases.append(prefix + suffix)
        cases.extend((b'\xE0\x80', b'\xE0\x9F', b'\xED\xA0\x80',
                      b'\xED\xBF\xBF', b'\xF0\x80', b'\xF0\x8F', b'\xF4\x90'))

        for data in cases:
            with self.subTest(data=data):
                dec = codecs.getincrementaldecoder(self.encoding)()
                self.assertRaises(UnicodeDecodeError, dec.decode, data)
919
920
class UTF7Test(ReadTest, unittest.TestCase):
    # UTF-7 (RFC 2152): ASCII-compatible encoding in which non-direct
    # characters travel in modified-base64 runs shifted in by '+' and
    # shifted out by '-'.
    encoding = "utf-7"

    def test_ascii(self):
        # Set D (directly encoded characters)
        set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                 'abcdefghijklmnopqrstuvwxyz'
                 '0123456789'
                 '\'(),-./:?')
        self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii'))
        self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d)
        # Set O (optional direct characters)
        set_o = ' !"#$%&*;<=>@[]^_`{|}'
        self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii'))
        self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o)
        # '+' is the shift-in marker and must itself be written as '+-'
        self.assertEqual('a+b'.encode(self.encoding), b'a+-b')
        self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b')
        # White spaces
        ws = ' \t\n\r'
        self.assertEqual(ws.encode(self.encoding), ws.encode('ascii'))
        self.assertEqual(ws.encode('ascii').decode(self.encoding), ws)
        # Other ASCII characters must be carried in a base64 run
        other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) -
                                     set(set_d + set_o + '+' + ws)))
        self.assertEqual(other_ascii.encode(self.encoding),
                         b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU'
                         b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-')

    def test_partial(self):
        # Decoder output after each additional input byte; a base64 run
        # only yields characters once enough bits have accumulated.
        self.check_partial(
            'a+-b\x00c\x80d\u0100e\U00010000f',
            [
                'a',
                'a',
                'a+',
                'a+-',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b',
                'a+-b\x00',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c',
                'a+-b\x00c\x80',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d',
                'a+-b\x00c\x80d\u0100',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e',
                'a+-b\x00c\x80d\u0100e\U00010000',
                'a+-b\x00c\x80d\u0100e\U00010000f',
            ]
        )

    def test_errors(self):
        # (raw bytes, expected output with errors='replace') pairs
        # covering stray high bytes and malformed base64 runs.
        tests = [
            (b'\xffb', '\ufffdb'),
            (b'a\xffb', 'a\ufffdb'),
            (b'a\xff\xffb', 'a\ufffd\ufffdb'),
            (b'a+IK', 'a\ufffd'),
            (b'a+IK-b', 'a\ufffdb'),
            (b'a+IK,b', 'a\ufffdb'),
            (b'a+IKx', 'a\u20ac\ufffd'),
            (b'a+IKx-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr', 'a\u20ac\ufffd'),
            (b'a+IKwgr-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgr,', 'a\u20ac\ufffd'),
            (b'a+IKwgr,-b', 'a\u20ac\ufffd-b'),
            (b'a+IKwgrB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrB-b', 'a\u20ac\u20ac\ufffdb'),
            (b'a+/,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+//,+IKw-b', 'a\ufffd\u20acb'),
            (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'),
            (b'a+IKw-b\xff', 'a\u20acb\ufffd'),
            (b'a+IKw\xffb', 'a\u20ac\ufffdb'),
            (b'a+@b', 'a\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                # 'strict' must raise; 'replace' must emit U+FFFD.
                self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode,
                                raw, 'strict', True)
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)

    def test_nonbmp(self):
        # Non-BMP characters travel as surrogate pairs inside the base64
        # run; the terminating '-' is optional at end of input.
        self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-')
        self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0')
        self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0')
        self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-')
        self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0')
        self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding),
                         b'+IKwgrNgB3KA-')
        self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')
        self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding),
                         '\u20ac\u20ac\U000104A0')

    def test_lone_surrogates(self):
        # An unpaired surrogate inside a base64 run decodes to itself
        # only when the run ends cleanly; otherwise it is replaced.
        tests = [
            (b'a+2AE-b', 'a\ud801b'),
            (b'a+2AE\xffb', 'a\ufffdb'),
            (b'a+2AE', 'a\ufffd'),
            (b'a+2AEA-b', 'a\ufffdb'),
            (b'a+2AH-b', 'a\ufffdb'),
            (b'a+IKzYAQ-b', 'a\u20ac\ud801b'),
            (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'),
            (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'),
            (b'a+IKzYAd-b', 'a\u20ac\ufffdb'),
            (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'),
            (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'),
            (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'),
            (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'),
        ]
        for raw, expected in tests:
            with self.subTest(raw=raw):
                self.assertEqual(raw.decode('utf-7', 'replace'), expected)
1053
1054
class UTF16ExTest(unittest.TestCase):
    """Tests for the low-level codecs.utf_16_ex_decode() helper."""

    def test_errors(self):
        # A lone byte is a truncated code unit: strict decoding fails.
        with self.assertRaises(UnicodeDecodeError):
            codecs.utf_16_ex_decode(b"\xff", "strict", 0, True)

    def test_bad_args(self):
        # The function requires at least the data argument.
        with self.assertRaises(TypeError):
            codecs.utf_16_ex_decode()
1062
class ReadBufferTest(unittest.TestCase):
    """Tests for codecs.readbuffer_encode(), which copies out raw bytes."""

    def test_array(self):
        # Any object exposing the buffer protocol is accepted.
        import array
        buf = array.array("b", b"spam")
        self.assertEqual(codecs.readbuffer_encode(buf), (b"spam", 4))

    def test_empty(self):
        # An empty string produces an empty result of length zero.
        self.assertEqual(codecs.readbuffer_encode(""), (b"", 0))

    def test_bad_args(self):
        # A missing argument and a non-buffer argument both raise.
        self.assertRaises(TypeError, codecs.readbuffer_encode)
        self.assertRaises(TypeError, codecs.readbuffer_encode, 42)
1078
class UTF8SigTest(UTF8Test, unittest.TestCase):
    # Reruns every UTF8Test case with the BOM-signature variant.
    encoding = "utf-8-sig"
    BOM = codecs.BOM_UTF8

    def test_partial(self):
        # A leading BOM is swallowed; a second BOM decodes as U+FEFF.
        self.check_partial(
            "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            [
                "",
                "",
                "", # First BOM has been read and skipped
                "",
                "",
                "\ufeff", # Second BOM has been read and emitted
                "\ufeff\x00", # "\x00" read and emitted
                "\ufeff\x00", # First byte of encoded "\xff" read
                "\ufeff\x00\xff", # Second byte of encoded "\xff" read
                "\ufeff\x00\xff", # First byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff", # Second byte of encoded "\u07ff" read
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff",
                "\ufeff\x00\xff\u07ff\u0800\uffff\U00010000",
            ]
        )

    def test_bug1601501(self):
        # SF bug #1601501: check that the codec works with a buffer
        self.assertEqual(str(b"\xef\xbb\xbf", "utf-8-sig"), "")

    def test_bom(self):
        # The incremental decoder must strip a leading BOM.
        d = codecs.getincrementaldecoder("utf-8-sig")()
        s = "spam"
        self.assertEqual(d.decode(s.encode("utf-8-sig")), s)

    def test_stream_bom(self):
        # Reading a BOM-prefixed stream in chunks of various sizes must
        # always reproduce the text without the BOM.
        unistring = "ABC\u00A1\u2200XYZ"
        bytestring = codecs.BOM_UTF8 + b"ABC\xC2\xA1\xE2\x88\x80XYZ"

        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + \
                        [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while 1:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()

                if not data:
                    break
                ostream.write(data)

            got = ostream.getvalue()
            self.assertEqual(got, unistring)

    def test_stream_bare(self):
        # Same as test_stream_bom but with no BOM in the input: the
        # codec must not require a signature.
        unistring = "ABC\u00A1\u2200XYZ"
        bytestring = b"ABC\xC2\xA1\xE2\x88\x80XYZ"

        reader = codecs.getreader("utf-8-sig")
        for sizehint in [None] + list(range(1, 11)) + \
                        [64, 128, 256, 512, 1024]:
            istream = reader(io.BytesIO(bytestring))
            ostream = io.StringIO()
            while 1:
                if sizehint is not None:
                    data = istream.read(sizehint)
                else:
                    data = istream.read()

                if not data:
                    break
                ostream.write(data)

            got = ostream.getvalue()
            self.assertEqual(got, unistring)
1163
1164
class EscapeDecodeTest(unittest.TestCase):
    # Tests for codecs.escape_decode(), which resolves Python
    # string-literal style backslash escapes in bytes input.

    def test_empty(self):
        self.assertEqual(codecs.escape_decode(b""), (b"", 0))
        self.assertEqual(codecs.escape_decode(bytearray()), (b"", 0))

    def test_raw(self):
        # Every byte except the backslash passes through unchanged.
        decode = codecs.escape_decode
        for b in range(256):
            b = bytes([b])
            if b != b'\\':
                self.assertEqual(decode(b + b'0'), (b + b'0', 2))

    def test_escape(self):
        # check(input, expect) asserts decode(input) == (expect, len(input)).
        decode = codecs.escape_decode
        check = coding_checker(self, decode)
        check(b"[\\\n]", b"[]")
        check(br'[\"]', b'["]')
        check(br"[\']", b"[']")
        check(br"[\\]", b"[\\]")
        check(br"[\a]", b"[\x07]")
        check(br"[\b]", b"[\x08]")
        check(br"[\t]", b"[\x09]")
        check(br"[\n]", b"[\x0a]")
        check(br"[\v]", b"[\x0b]")
        check(br"[\f]", b"[\x0c]")
        check(br"[\r]", b"[\x0d]")
        # Octal escapes consume at most three digits.
        check(br"[\7]", b"[\x07]")
        check(br"[\78]", b"[\x078]")
        check(br"[\41]", b"[!]")
        check(br"[\418]", b"[!8]")
        check(br"[\101]", b"[A]")
        check(br"[\1010]", b"[A0]")
        # Hex escapes consume exactly two digits.
        check(br"[\x41]", b"[A]")
        check(br"[\x410]", b"[A0]")

    def test_warnings(self):
        # Unrecognized escapes keep the backslash but must emit a
        # DeprecationWarning naming the offending sequence.
        decode = codecs.escape_decode
        check = coding_checker(self, decode)
        for i in range(97, 123):
            b = bytes([i])
            if b not in b'abfnrtvx':
                with self.assertWarnsRegex(DeprecationWarning,
                        r"invalid escape sequence '\\%c'" % i):
                    check(b"\\" + b, b"\\" + b)
            with self.assertWarnsRegex(DeprecationWarning,
                    r"invalid escape sequence '\\%c'" % (i-32)):
                check(b"\\" + b.upper(), b"\\" + b.upper())
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\8'"):
            check(br"\8", b"\\8")
        with self.assertWarns(DeprecationWarning):
            check(br"\9", b"\\9")
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\\xfa'") as cm:
            check(b"\\\xfa", b"\\\xfa")
        # Octal values above \377 wrap modulo 256 and warn.
        for i in range(0o400, 0o1000):
            with self.assertWarnsRegex(DeprecationWarning,
                    r"invalid octal escape sequence '\\%o'" % i):
                check(rb'\%o' % i, bytes([i & 0o377]))

        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\z'"):
            self.assertEqual(decode(br'\x\z', 'ignore'), (b'\\z', 4))
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid octal escape sequence '\\501'"):
            self.assertEqual(decode(br'\x\501', 'ignore'), (b'A', 6))

    def test_errors(self):
        # \x with fewer than two hex digits raises under 'strict', is
        # dropped under 'ignore' and becomes '?' under 'replace'.
        decode = codecs.escape_decode
        self.assertRaises(ValueError, decode, br"\x")
        self.assertRaises(ValueError, decode, br"[\x]")
        self.assertEqual(decode(br"[\x]\x", "ignore"), (b"[]", 6))
        self.assertEqual(decode(br"[\x]\x", "replace"), (b"[?]?", 6))
        self.assertRaises(ValueError, decode, br"\x0")
        self.assertRaises(ValueError, decode, br"[\x0]")
        self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
        self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
1242
1243
# From RFC 3492, section 7.1: (Unicode input, expected punycode output)
# pairs; labels (A)-(S) match the sample strings in the RFC.
punycode_testcases = [
    # (A) Arabic (Egyptian):
    ("\u0644\u064A\u0647\u0645\u0627\u0628\u062A\u0643\u0644"
     "\u0645\u0648\u0634\u0639\u0631\u0628\u064A\u061F",
     b"egbpdaj6bu4bxfgehfvwxn"),
    # (B) Chinese (simplified):
    ("\u4ED6\u4EEC\u4E3A\u4EC0\u4E48\u4E0D\u8BF4\u4E2D\u6587",
     b"ihqwcrb4cv8a8dqg056pqjye"),
    # (C) Chinese (traditional):
    ("\u4ED6\u5011\u7232\u4EC0\u9EBD\u4E0D\u8AAA\u4E2D\u6587",
     b"ihqwctvzc91f659drss3x8bo0yb"),
    # (D) Czech: Pro<ccaron>prost<ecaron>nemluv<iacute><ccaron>esky
    ("\u0050\u0072\u006F\u010D\u0070\u0072\u006F\u0073\u0074"
     "\u011B\u006E\u0065\u006D\u006C\u0075\u0076\u00ED\u010D"
     "\u0065\u0073\u006B\u0079",
     b"Proprostnemluvesky-uyb24dma41a"),
    # (E) Hebrew:
    ("\u05DC\u05DE\u05D4\u05D4\u05DD\u05E4\u05E9\u05D5\u05D8"
     "\u05DC\u05D0\u05DE\u05D3\u05D1\u05E8\u05D9\u05DD\u05E2"
     "\u05D1\u05E8\u05D9\u05EA",
     b"4dbcagdahymbxekheh6e0a7fei0b"),
    # (F) Hindi (Devanagari):
    ("\u092F\u0939\u0932\u094B\u0917\u0939\u093F\u0928\u094D"
     "\u0926\u0940\u0915\u094D\u092F\u094B\u0902\u0928\u0939"
     "\u0940\u0902\u092C\u094B\u0932\u0938\u0915\u0924\u0947"
     "\u0939\u0948\u0902",
     b"i1baa7eci9glrd9b2ae1bj0hfcgg6iyaf8o0a1dig0cd"),

    # (G) Japanese (kanji and hiragana):
    ("\u306A\u305C\u307F\u3093\u306A\u65E5\u672C\u8A9E\u3092"
     "\u8A71\u3057\u3066\u304F\u308C\u306A\u3044\u306E\u304B",
     b"n8jok5ay5dzabd5bym9f0cm5685rrjetr6pdxa"),

    # (H) Korean (Hangul syllables):
    ("\uC138\uACC4\uC758\uBAA8\uB4E0\uC0AC\uB78C\uB4E4\uC774"
     "\uD55C\uAD6D\uC5B4\uB97C\uC774\uD574\uD55C\uB2E4\uBA74"
     "\uC5BC\uB9C8\uB098\uC88B\uC744\uAE4C",
     b"989aomsvi5e83db1d2a355cv1e0vak1dwrv93d5xbh15a0dt30a5j"
     b"psd879ccm6fea98c"),

    # (I) Russian (Cyrillic):
    ("\u043F\u043E\u0447\u0435\u043C\u0443\u0436\u0435\u043E"
     "\u043D\u0438\u043D\u0435\u0433\u043E\u0432\u043E\u0440"
     "\u044F\u0442\u043F\u043E\u0440\u0443\u0441\u0441\u043A"
     "\u0438",
     b"b1abfaaepdrnnbgefbaDotcwatmq2g4l"),

    # (J) Spanish: Porqu<eacute>nopuedensimplementehablarenEspa<ntilde>ol
    ("\u0050\u006F\u0072\u0071\u0075\u00E9\u006E\u006F\u0070"
     "\u0075\u0065\u0064\u0065\u006E\u0073\u0069\u006D\u0070"
     "\u006C\u0065\u006D\u0065\u006E\u0074\u0065\u0068\u0061"
     "\u0062\u006C\u0061\u0072\u0065\u006E\u0045\u0073\u0070"
     "\u0061\u00F1\u006F\u006C",
     b"PorqunopuedensimplementehablarenEspaol-fmd56a"),

    # (K) Vietnamese:
    #  T<adotbelow>isaoh<odotbelow>kh<ocirc>ngth<ecirchookabove>ch\
    #   <ihookabove>n<oacute>iti<ecircacute>ngVi<ecircdotbelow>t
    ("\u0054\u1EA1\u0069\u0073\u0061\u006F\u0068\u1ECD\u006B"
     "\u0068\u00F4\u006E\u0067\u0074\u0068\u1EC3\u0063\u0068"
     "\u1EC9\u006E\u00F3\u0069\u0074\u0069\u1EBF\u006E\u0067"
     "\u0056\u0069\u1EC7\u0074",
     b"TisaohkhngthchnitingVit-kjcr8268qyxafd2f1b9g"),

    # (L) 3<nen>B<gumi><kinpachi><sensei>
    ("\u0033\u5E74\u0042\u7D44\u91D1\u516B\u5148\u751F",
     b"3B-ww4c5e180e575a65lsy2b"),

    # (M) <amuro><namie>-with-SUPER-MONKEYS
    ("\u5B89\u5BA4\u5948\u7F8E\u6075\u002D\u0077\u0069\u0074"
     "\u0068\u002D\u0053\u0055\u0050\u0045\u0052\u002D\u004D"
     "\u004F\u004E\u004B\u0045\u0059\u0053",
     b"-with-SUPER-MONKEYS-pc58ag80a8qai00g7n9n"),

    # (N) Hello-Another-Way-<sorezore><no><basho>
    ("\u0048\u0065\u006C\u006C\u006F\u002D\u0041\u006E\u006F"
     "\u0074\u0068\u0065\u0072\u002D\u0057\u0061\u0079\u002D"
     "\u305D\u308C\u305E\u308C\u306E\u5834\u6240",
     b"Hello-Another-Way--fc4qua05auwb3674vfr0b"),

    # (O) <hitotsu><yane><no><shita>2
    ("\u3072\u3068\u3064\u5C4B\u6839\u306E\u4E0B\u0032",
     b"2-u9tlzr9756bt3uc0v"),

    # (P) Maji<de>Koi<suru>5<byou><mae>
    ("\u004D\u0061\u006A\u0069\u3067\u004B\u006F\u0069\u3059"
     "\u308B\u0035\u79D2\u524D",
     b"MajiKoi5-783gue6qz075azm5e"),

    # (Q) <pafii>de<runba>
    ("\u30D1\u30D5\u30A3\u30FC\u0064\u0065\u30EB\u30F3\u30D0",
     b"de-jg4avhby1noc0d"),

    # (R) <sono><supiido><de>
    ("\u305D\u306E\u30B9\u30D4\u30FC\u30C9\u3067",
     b"d9juau41awczczp"),

    # (S) -> $1.00 <-
    ("\u002D\u003E\u0020\u0024\u0031\u002E\u0030\u0030\u0020"
     "\u003C\u002D",
     b"-> $1.00 <--")
    ]
1347
# Sanity-check the table above: every entry must be a (unicode, punycode)
# pair.  A malformed vector is a bug in this file, so fail loudly at
# import time instead of merely printing it (the old behavior, which was
# easy to miss in test output).
for _testcase in punycode_testcases:
    if len(_testcase) != 2:
        raise ValueError("malformed punycode test case: %r" % (_testcase,))
1351
1352
1353class PunycodeTest(unittest.TestCase):
1354    def test_encode(self):
1355        for uni, puny in punycode_testcases:
1356            # Need to convert both strings to lower case, since
1357            # some of the extended encodings use upper case, but our
1358            # code produces only lower case. Converting just puny to
1359            # lower is also insufficient, since some of the input characters
1360            # are upper case.
1361            self.assertEqual(
1362                str(uni.encode("punycode"), "ascii").lower(),
1363                str(puny, "ascii").lower()
1364            )
1365
1366    def test_decode(self):
1367        for uni, puny in punycode_testcases:
1368            self.assertEqual(uni, puny.decode("punycode"))
1369            puny = puny.decode("ascii").encode("ascii")
1370            self.assertEqual(uni, puny.decode("punycode"))
1371
1372    def test_decode_invalid(self):
1373        testcases = [
1374            (b"xn--w&", "strict", UnicodeError()),
1375            (b"xn--w&", "ignore", "xn-"),
1376        ]
1377        for puny, errors, expected in testcases:
1378            with self.subTest(puny=puny, errors=errors):
1379                if isinstance(expected, Exception):
1380                    self.assertRaises(UnicodeError, puny.decode, "punycode", errors)
1381                else:
1382                    self.assertEqual(puny.decode("punycode", errors), expected)
1383
1384
# Nameprep (RFC 3491) vectors, taken from
# http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
# Each entry is (UTF-8 encoded input, UTF-8 encoded expected output);
# an expected output of None means nameprep must raise, and a
# (None, None) entry is a skipped case kept to preserve the numbering.
nameprep_tests = [
    # 3.1 Map to nothing.
    (b'foo\xc2\xad\xcd\x8f\xe1\xa0\x86\xe1\xa0\x8bbar'
     b'\xe2\x80\x8b\xe2\x81\xa0baz\xef\xb8\x80\xef\xb8\x88\xef'
     b'\xb8\x8f\xef\xbb\xbf',
     b'foobarbaz'),
    # 3.2 Case folding ASCII U+0043 U+0041 U+0046 U+0045.
    (b'CAFE',
     b'cafe'),
    # 3.3 Case folding 8bit U+00DF (german sharp s).
    # The original test case is bogus; it says \xc3\xdf
    (b'\xc3\x9f',
     b'ss'),
    # 3.4 Case folding U+0130 (turkish capital I with dot).
    (b'\xc4\xb0',
     b'i\xcc\x87'),
    # 3.5 Case folding multibyte U+0143 U+037A.
    (b'\xc5\x83\xcd\xba',
     b'\xc5\x84 \xce\xb9'),
    # 3.6 Case folding U+2121 U+33C6 U+1D7BB.
    # XXX: skip this as it fails in UCS-2 mode
    #('\xe2\x84\xa1\xe3\x8f\x86\xf0\x9d\x9e\xbb',
    # 'telc\xe2\x88\x95kg\xcf\x83'),
    (None, None),
    # 3.7 Normalization of U+006a U+030c U+00A0 U+00AA.
    (b'j\xcc\x8c\xc2\xa0\xc2\xaa',
     b'\xc7\xb0 a'),
    # 3.8 Case folding U+1FB7 and normalization.
    (b'\xe1\xbe\xb7',
     b'\xe1\xbe\xb6\xce\xb9'),
    # 3.9 Self-reverting case folding U+01F0 and normalization.
    # The original test case is bogus, it says `\xc7\xf0'
    (b'\xc7\xb0',
     b'\xc7\xb0'),
    # 3.10 Self-reverting case folding U+0390 and normalization.
    (b'\xce\x90',
     b'\xce\x90'),
    # 3.11 Self-reverting case folding U+03B0 and normalization.
    (b'\xce\xb0',
     b'\xce\xb0'),
    # 3.12 Self-reverting case folding U+1E96 and normalization.
    (b'\xe1\xba\x96',
     b'\xe1\xba\x96'),
    # 3.13 Self-reverting case folding U+1F56 and normalization.
    (b'\xe1\xbd\x96',
     b'\xe1\xbd\x96'),
    # 3.14 ASCII space character U+0020.
    (b' ',
     b' '),
    # 3.15 Non-ASCII 8bit space character U+00A0.
    (b'\xc2\xa0',
     b' '),
    # 3.16 Non-ASCII multibyte space character U+1680.
    (b'\xe1\x9a\x80',
     None),
    # 3.17 Non-ASCII multibyte space character U+2000.
    (b'\xe2\x80\x80',
     b' '),
    # 3.18 Zero Width Space U+200b.
    (b'\xe2\x80\x8b',
     b''),
    # 3.19 Non-ASCII multibyte space character U+3000.
    (b'\xe3\x80\x80',
     b' '),
    # 3.20 ASCII control characters U+0010 U+007F.
    (b'\x10\x7f',
     b'\x10\x7f'),
    # 3.21 Non-ASCII 8bit control character U+0085.
    (b'\xc2\x85',
     None),
    # 3.22 Non-ASCII multibyte control character U+180E.
    (b'\xe1\xa0\x8e',
     None),
    # 3.23 Zero Width No-Break Space U+FEFF.
    (b'\xef\xbb\xbf',
     b''),
    # 3.24 Non-ASCII control character U+1D175.
    (b'\xf0\x9d\x85\xb5',
     None),
    # 3.25 Plane 0 private use character U+F123.
    (b'\xef\x84\xa3',
     None),
    # 3.26 Plane 15 private use character U+F1234.
    (b'\xf3\xb1\x88\xb4',
     None),
    # 3.27 Plane 16 private use character U+10F234.
    (b'\xf4\x8f\x88\xb4',
     None),
    # 3.28 Non-character code point U+8FFFE.
    (b'\xf2\x8f\xbf\xbe',
     None),
    # 3.29 Non-character code point U+10FFFF.
    (b'\xf4\x8f\xbf\xbf',
     None),
    # 3.30 Surrogate code U+DF42.
    (b'\xed\xbd\x82',
     None),
    # 3.31 Non-plain text character U+FFFD.
    (b'\xef\xbf\xbd',
     None),
    # 3.32 Ideographic description character U+2FF5.
    (b'\xe2\xbf\xb5',
     None),
    # 3.33 Display property character U+0341.
    (b'\xcd\x81',
     b'\xcc\x81'),
    # 3.34 Left-to-right mark U+200E.
    (b'\xe2\x80\x8e',
     None),
    # 3.35 Deprecated U+202A.
    (b'\xe2\x80\xaa',
     None),
    # 3.36 Language tagging character U+E0001.
    (b'\xf3\xa0\x80\x81',
     None),
    # 3.37 Language tagging character U+E0042.
    (b'\xf3\xa0\x81\x82',
     None),
    # 3.38 Bidi: RandALCat character U+05BE and LCat characters.
    (b'foo\xd6\xbebar',
     None),
    # 3.39 Bidi: RandALCat character U+FD50 and LCat characters.
    (b'foo\xef\xb5\x90bar',
     None),
    # 3.40 Bidi: RandALCat character U+FB38 and LCat characters.
    (b'foo\xef\xb9\xb6bar',
     b'foo \xd9\x8ebar'),
    # 3.41 Bidi: RandALCat without trailing RandALCat U+0627 U+0031.
    (b'\xd8\xa71',
     None),
    # 3.42 Bidi: RandALCat character U+0627 U+0031 U+0628.
    (b'\xd8\xa71\xd8\xa8',
     b'\xd8\xa71\xd8\xa8'),
    # 3.43 Unassigned code point U+E0002.
    # Skip this test as we allow unassigned
    #(b'\xf3\xa0\x80\x82',
    # None),
    (None, None),
    # 3.44 Larger test (shrinking).
    # Original test case reads \xc3\xdf
    (b'X\xc2\xad\xc3\x9f\xc4\xb0\xe2\x84\xa1j\xcc\x8c\xc2\xa0\xc2'
     b'\xaa\xce\xb0\xe2\x80\x80',
     b'xssi\xcc\x87tel\xc7\xb0 a\xce\xb0 '),
    # 3.45 Larger test (expanding).
    # Original test case reads \xc3\x9f
    (b'X\xc3\x9f\xe3\x8c\x96\xc4\xb0\xe2\x84\xa1\xe2\x92\x9f\xe3\x8c'
     b'\x80',
     b'xss\xe3\x82\xad\xe3\x83\xad\xe3\x83\xa1\xe3\x83\xbc\xe3'
     b'\x83\x88\xe3\x83\xabi\xcc\x87tel\x28d\x29\xe3\x82'
     b'\xa2\xe3\x83\x91\xe3\x83\xbc\xe3\x83\x88')
    ]
1537
1538
class NameprepTest(unittest.TestCase):
    def test_nameprep(self):
        """Run the nameprep test vectors (RFC 3454 style) from nameprep_tests.

        Each vector is a (raw, prepped) pair of UTF-8 byte strings; a None
        raw entry marks a skipped vector, a None prepped entry marks input
        that must be rejected.
        """
        from encodings.idna import nameprep
        for pos, (orig, prepped) in enumerate(nameprep_tests):
            if orig is None:
                # Skipped vector (e.g. unassigned code points we allow).
                continue
            # The Unicode strings are given in UTF-8; "surrogatepass" is
            # needed because some vectors contain lone surrogates.
            orig = str(orig, "utf-8", "surrogatepass")
            # subTest keeps the vector number in the failure report and
            # preserves the real traceback (the old code wrapped failures
            # in support.TestFailed and stopped at the first one).
            with self.subTest(test="3.%d" % (pos + 1)):
                if prepped is None:
                    # Input contains prohibited characters
                    self.assertRaises(UnicodeError, nameprep, orig)
                else:
                    prepped = str(prepped, "utf-8", "surrogatepass")
                    self.assertEqual(nameprep(orig), prepped,
                                     "Test 3.%d" % (pos + 1))
1557
1558
class IDNACodecTest(unittest.TestCase):
    """Tests for the "idna" codec: bulk, stream and incremental APIs."""

    def test_builtin_decode(self):
        for raw, expected in (
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org", "pyth\xf6n.org"),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ):
            self.assertEqual(str(raw, "idna"), expected)

    def test_builtin_encode(self):
        for text, expected in (
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org", b"xn--pythn-mua.org"),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ):
            self.assertEqual(text.encode("idna"), expected)

    def test_builtin_decode_length_limit(self):
        # Overlong labels are rejected with a "too long" UnicodeError.
        for padding in (1100, 70):
            with self.assertRaisesRegex(UnicodeError, "too long"):
                (b"xn--016c" + b"a" * padding).decode("idna")

    def test_stream(self):
        reader = codecs.getreader("idna")(io.BytesIO(b"abc"))
        reader.read(3)
        self.assertEqual(reader.read(), "")

    def test_incremental_decode(self):
        # iterdecode() over one-byte chunks must agree with bulk decoding.
        for raw, expected in (
            (b"python.org", "python.org"),
            (b"python.org.", "python.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
            (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
        ):
            chunks = (bytes([byte]) for byte in raw)
            self.assertEqual("".join(codecs.iterdecode(chunks, "idna")),
                             expected)

        # The incremental decoder buffers until a label is complete.
        decoder = codecs.getincrementaldecoder("idna")()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg"), "")
        self.assertEqual(decoder.decode(b"", True), "org")

        # reset() must discard any buffered input.
        decoder.reset()
        self.assertEqual(decoder.decode(b"xn--xam"), "")
        self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
        self.assertEqual(decoder.decode(b"rg."), "org.")
        self.assertEqual(decoder.decode(b"", True), "")

    def test_incremental_encode(self):
        # iterencode() must agree with bulk encoding.
        for text, expected in (
            ("python.org", b"python.org"),
            ("python.org.", b"python.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
            ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
        ):
            self.assertEqual(b"".join(codecs.iterencode(text, "idna")),
                             expected)

        # The incremental encoder buffers until a label is complete.
        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        # reset() must discard any buffered input.
        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")

    def test_errors(self):
        """Only supports "strict" error handler"""
        "python.org".encode("idna", "strict")
        b"python.org".decode("idna", "strict")
        for errors in ("ignore", "replace", "backslashreplace",
                "surrogateescape"):
            self.assertRaises(Exception, "python.org".encode, "idna", errors)
            self.assertRaises(Exception,
                b"python.org".decode, "idna", errors)
1650
1651
1652class CodecsModuleTest(unittest.TestCase):
1653
1654    def test_decode(self):
1655        self.assertEqual(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'),
1656                         '\xe4\xf6\xfc')
1657        self.assertRaises(TypeError, codecs.decode)
1658        self.assertEqual(codecs.decode(b'abc'), 'abc')
1659        self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii')
1660
1661        # test keywords
1662        self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'),
1663                         '\xe4\xf6\xfc')
1664        self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'),
1665                         '[]')
1666
1667    def test_encode(self):
1668        self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'),
1669                         b'\xe4\xf6\xfc')
1670        self.assertRaises(TypeError, codecs.encode)
1671        self.assertRaises(LookupError, codecs.encode, "foo", "__spam__")
1672        self.assertEqual(codecs.encode('abc'), b'abc')
1673        self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii')
1674
1675        # test keywords
1676        self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'),
1677                         b'\xe4\xf6\xfc')
1678        self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'),
1679                         b'[]')
1680
1681    def test_register(self):
1682        self.assertRaises(TypeError, codecs.register)
1683        self.assertRaises(TypeError, codecs.register, 42)
1684
1685    def test_unregister(self):
1686        name = "nonexistent_codec_name"
1687        search_function = mock.Mock()
1688        codecs.register(search_function)
1689        self.assertRaises(TypeError, codecs.lookup, name)
1690        search_function.assert_called_with(name)
1691        search_function.reset_mock()
1692
1693        codecs.unregister(search_function)
1694        self.assertRaises(LookupError, codecs.lookup, name)
1695        search_function.assert_not_called()
1696
1697    def test_lookup(self):
1698        self.assertRaises(TypeError, codecs.lookup)
1699        self.assertRaises(LookupError, codecs.lookup, "__spam__")
1700        self.assertRaises(LookupError, codecs.lookup, " ")
1701
1702    def test_getencoder(self):
1703        self.assertRaises(TypeError, codecs.getencoder)
1704        self.assertRaises(LookupError, codecs.getencoder, "__spam__")
1705
1706    def test_getdecoder(self):
1707        self.assertRaises(TypeError, codecs.getdecoder)
1708        self.assertRaises(LookupError, codecs.getdecoder, "__spam__")
1709
1710    def test_getreader(self):
1711        self.assertRaises(TypeError, codecs.getreader)
1712        self.assertRaises(LookupError, codecs.getreader, "__spam__")
1713
1714    def test_getwriter(self):
1715        self.assertRaises(TypeError, codecs.getwriter)
1716        self.assertRaises(LookupError, codecs.getwriter, "__spam__")
1717
1718    def test_lookup_issue1813(self):
1719        # Issue #1813: under Turkish locales, lookup of some codecs failed
1720        # because 'I' is lowercased as "ı" (dotless i)
1721        oldlocale = locale.setlocale(locale.LC_CTYPE)
1722        self.addCleanup(locale.setlocale, locale.LC_CTYPE, oldlocale)
1723        try:
1724            locale.setlocale(locale.LC_CTYPE, 'tr_TR')
1725        except locale.Error:
1726            # Unsupported locale on this system
1727            self.skipTest('test needs Turkish locale')
1728        c = codecs.lookup('ASCII')
1729        self.assertEqual(c.name, 'ascii')
1730
1731    def test_all(self):
1732        api = (
1733            "encode", "decode",
1734            "register", "CodecInfo", "Codec", "IncrementalEncoder",
1735            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
1736            "getencoder", "getdecoder", "getincrementalencoder",
1737            "getincrementaldecoder", "getreader", "getwriter",
1738            "register_error", "lookup_error",
1739            "strict_errors", "replace_errors", "ignore_errors",
1740            "xmlcharrefreplace_errors", "backslashreplace_errors",
1741            "namereplace_errors",
1742            "open", "EncodedFile",
1743            "iterencode", "iterdecode",
1744            "BOM", "BOM_BE", "BOM_LE",
1745            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
1746            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
1747            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
1748            "StreamReaderWriter", "StreamRecoder",
1749        )
1750        self.assertCountEqual(api, codecs.__all__)
1751        for api in codecs.__all__:
1752            getattr(codecs, api)
1753
1754    def test_open(self):
1755        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1756        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
1757            with self.subTest(mode), \
1758                    codecs.open(os_helper.TESTFN, mode, 'ascii') as file:
1759                self.assertIsInstance(file, codecs.StreamReaderWriter)
1760
1761    def test_undefined(self):
1762        self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined')
1763        self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined')
1764        self.assertRaises(UnicodeError, codecs.encode, '', 'undefined')
1765        self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined')
1766        for errors in ('strict', 'ignore', 'replace', 'backslashreplace'):
1767            self.assertRaises(UnicodeError,
1768                codecs.encode, 'abc', 'undefined', errors)
1769            self.assertRaises(UnicodeError,
1770                codecs.decode, b'abc', 'undefined', errors)
1771
1772    def test_file_closes_if_lookup_error_raised(self):
1773        mock_open = mock.mock_open()
1774        with mock.patch('builtins.open', mock_open) as file:
1775            with self.assertRaises(LookupError):
1776                codecs.open(os_helper.TESTFN, 'wt', 'invalid-encoding')
1777
1778            file().close.assert_called()
1779
1780
class StreamReaderTest(unittest.TestCase):
    """Tests for codecs stream readers."""

    def setUp(self):
        # UTF-8 bytes for two characters separated by a newline.
        self.reader = codecs.getreader('utf-8')
        self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')

    def test_readlines(self):
        stream_reader = self.reader(self.stream)
        self.assertEqual(stream_reader.readlines(), ['\ud55c\n', '\uae00'])
1790
1791
class EncodedFileTest(unittest.TestCase):
    """Tests for codecs.EncodedFile transcoding on read and write."""

    def test_basic(self):
        # Reading transcodes the underlying bytes between the two encodings.
        source = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        wrapper = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
        self.assertEqual(wrapper.read(), b'\\\xd5\n\x00\x00\xae')

        # Writing transcodes in the other direction before hitting the file.
        sink = io.BytesIO()
        wrapper = codecs.EncodedFile(sink, 'utf-8', 'latin-1')
        wrapper.write(b'\xc3\xbc')
        self.assertEqual(sink.getvalue(), b'\xfc')
1803
# Encodings exercised by BasicUnicodeTest below: each must round-trip
# plain ASCII text through the stateless, stream and incremental APIs.
all_unicode_encodings = [
    "ascii",
    "big5",
    "big5hkscs",
    "charmap",
    "cp037",
    "cp1006",
    "cp1026",
    "cp1125",
    "cp1140",
    "cp1250",
    "cp1251",
    "cp1252",
    "cp1253",
    "cp1254",
    "cp1255",
    "cp1256",
    "cp1257",
    "cp1258",
    "cp424",
    "cp437",
    "cp500",
    "cp720",
    "cp737",
    "cp775",
    "cp850",
    "cp852",
    "cp855",
    "cp856",
    "cp857",
    "cp858",
    "cp860",
    "cp861",
    "cp862",
    "cp863",
    "cp864",
    "cp865",
    "cp866",
    "cp869",
    "cp874",
    "cp875",
    "cp932",
    "cp949",
    "cp950",
    "euc_jis_2004",
    "euc_jisx0213",
    "euc_jp",
    "euc_kr",
    "gb18030",
    "gb2312",
    "gbk",
    "hp_roman8",
    "hz",
    "idna",
    "iso2022_jp",
    "iso2022_jp_1",
    "iso2022_jp_2",
    "iso2022_jp_2004",
    "iso2022_jp_3",
    "iso2022_jp_ext",
    "iso2022_kr",
    "iso8859_1",
    "iso8859_10",
    "iso8859_11",
    "iso8859_13",
    "iso8859_14",
    "iso8859_15",
    "iso8859_16",
    "iso8859_2",
    "iso8859_3",
    "iso8859_4",
    "iso8859_5",
    "iso8859_6",
    "iso8859_7",
    "iso8859_8",
    "iso8859_9",
    "johab",
    "koi8_r",
    "koi8_t",
    "koi8_u",
    "kz1048",
    "latin_1",
    "mac_cyrillic",
    "mac_greek",
    "mac_iceland",
    "mac_latin2",
    "mac_roman",
    "mac_turkish",
    "palmos",
    "ptcp154",
    "punycode",
    "raw_unicode_escape",
    "shift_jis",
    "shift_jis_2004",
    "shift_jisx0213",
    "tis_620",
    "unicode_escape",
    "utf_16",
    "utf_16_be",
    "utf_16_le",
    "utf_7",
    "utf_8",
]

# Windows-only codecs are appended only when this build provides them.
if hasattr(codecs, "mbcs_encode"):
    all_unicode_encodings.append("mbcs")
if hasattr(codecs, "oem_encode"):
    all_unicode_encodings.append("oem")

# The following encoding is not tested, because it's not supposed
# to work:
#    "undefined"

# The following encodings don't work in stateful mode
broken_unicode_with_stateful = [
    "punycode",
]
1921
1922
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
    """Round-trip every codec in all_unicode_encodings through the
    stateless, stream, incremental and iterator codec APIs."""

    def test_basics(self):
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            # The canonical codec name should match the list entry
            # (modulo '-' vs '_' spelling).
            name = codecs.lookup(encoding).name
            if encoding.endswith("_codec"):
                name += "_codec"
            elif encoding == "latin_1":
                name = "latin_1"
            # Skip the mbcs alias on Windows
            if name != "mbcs":
                self.assertEqual(encoding.replace("_", "-"),
                                 name.replace("_", "-"))

            # Stateless encoder/decoder round trip; the reported size must
            # cover the whole input.
            (b, size) = codecs.getencoder(encoding)(s)
            self.assertEqual(size, len(s), "encoding=%r" % encoding)
            (chars, size) = codecs.getdecoder(encoding)(b)
            self.assertEqual(chars, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check stream reader/writer
                # Write one character at a time, then feed the encoded
                # result back one byte at a time.
                q = Queue(b"")
                writer = codecs.getwriter(encoding)(q)
                encodedresult = b""
                for c in s:
                    writer.write(c)
                    chunk = q.read()
                    self.assertTrue(type(chunk) is bytes, type(chunk))
                    encodedresult += chunk
                q = Queue(b"")
                reader = codecs.getreader(encoding)(q)
                decodedresult = ""
                for c in encodedresult:
                    q.write(bytes([c]))
                    decodedresult += reader.read()
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder and iterencode()/iterdecode()
                try:
                    encoder = codecs.getincrementalencoder(encoding)()
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check incremental decoder/encoder
                    # Feed single characters/bytes; the final=True calls
                    # flush any buffered state.
                    encodedresult = b""
                    for c in s:
                        encodedresult += encoder.encode(c)
                    encodedresult += encoder.encode("", True)
                    decoder = codecs.getincrementaldecoder(encoding)()
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += decoder.decode(bytes([c]))
                    decodedresult += decoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                    # check iterencode()/iterdecode()
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode(s, encoding), encoding))
                    self.assertEqual(result, s, "encoding=%r" % encoding)

                    # check iterencode()/iterdecode() with empty string
                    result = "".join(codecs.iterdecode(
                            codecs.iterencode("", encoding), encoding))
                    self.assertEqual(result, "")

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        encoder = codecs.getincrementalencoder(encoding)("ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(encoder.encode(c) for c in s)
                        decoder = codecs.getincrementaldecoder(encoding)("ignore")
                        decodedresult = "".join(decoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    @support.cpython_only
    @unittest.skipIf(_testcapi is None, 'need _testcapi module')
    def test_basics_capi(self):
        # Same incremental round trip as test_basics, but the coders are
        # obtained through the C API wrappers in _testcapi.
        s = "abc123"  # all codecs should be able to encode these
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                # check incremental decoder/encoder (fetched via the C API)
                try:
                    cencoder = _testcapi.codec_incrementalencoder(encoding)
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    # check C API
                    encodedresult = b""
                    for c in s:
                        encodedresult += cencoder.encode(c)
                    encodedresult += cencoder.encode("", True)
                    cdecoder = _testcapi.codec_incrementaldecoder(encoding)
                    decodedresult = ""
                    for c in encodedresult:
                        decodedresult += cdecoder.decode(bytes([c]))
                    decodedresult += cdecoder.decode(b"", True)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)

                if encoding not in ("idna", "mbcs"):
                    # check incremental decoder/encoder with errors argument
                    try:
                        cencoder = _testcapi.codec_incrementalencoder(encoding, "ignore")
                    except LookupError:  # no IncrementalEncoder
                        pass
                    else:
                        encodedresult = b"".join(cencoder.encode(c) for c in s)
                        cdecoder = _testcapi.codec_incrementaldecoder(encoding, "ignore")
                        decodedresult = "".join(cdecoder.decode(bytes([c]))
                                                for c in encodedresult)
                        self.assertEqual(decodedresult, s,
                                         "encoding=%r" % encoding)

    def test_seek(self):
        # all codecs should be able to encode these
        s = "%s\n%s\n" % (100*"abc123", 100*"def456")
        for encoding in all_unicode_encodings:
            if encoding == "idna": # FIXME: See SF bug #1163178
                continue
            if encoding in broken_unicode_with_stateful:
                continue
            reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
            for t in range(5):
                # Test that calling seek resets the internal codec state and buffers
                reader.seek(0, 0)
                data = reader.read()
                self.assertEqual(s, data)

    def test_bad_decode_args(self):
        # Calling a decoder with no arguments, or with a non-bytes
        # argument, must raise TypeError.
        for encoding in all_unicode_encodings:
            decoder = codecs.getdecoder(encoding)
            self.assertRaises(TypeError, decoder)
            # idna and punycode accept str input, so skip the int check.
            if encoding not in ("idna", "punycode"):
                self.assertRaises(TypeError, decoder, 42)

    def test_bad_encode_args(self):
        # Calling an encoder with no arguments must raise TypeError.
        for encoding in all_unicode_encodings:
            encoder = codecs.getencoder(encoding)
            self.assertRaises(TypeError, encoder)

    def test_encoding_map_type_initialized(self):
        from encodings import cp1140
        # This used to crash, we are only verifying there's no crash.
        table_type = type(cp1140.encoding_table)
        self.assertEqual(table_type, table_type)

    def test_decoder_state(self):
        # Check that getstate() and setstate() handle the state properly
        u = "abc123"
        for encoding in all_unicode_encodings:
            if encoding not in broken_unicode_with_stateful:
                self.check_state_handling_decode(encoding, u, u.encode(encoding))
                self.check_state_handling_encode(encoding, u, u.encode(encoding))
2083
2084
class CharmapTest(unittest.TestCase):
    """Tests for codecs.charmap_decode with string, int->str and int->int
    decoding maps, including every error handler."""

    def test_decode_with_string_map(self):
        decode = codecs.charmap_decode
        # Valid maps: each byte indexes a character of the map string.
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "abc"),
                         ("abc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict", "\U0010FFFFbc"),
                         ("\U0010FFFFbc", 3))

        # A map that is too short, or maps to U+FFFE, fails under "strict".
        bad_maps = ("ab", "ab\ufffe")
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # Non-strict handlers substitute for (or drop) the unmapped byte.
        for errors, tail in (("replace", "\ufffd"),
                             ("backslashreplace", "\\x02"),
                             ("ignore", "")):
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 ("ab" + tail, 3))

        # An empty map with "ignore" swallows every byte.
        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", ""),
                         ("", len(allbytes)))

    def test_decode_with_int2str_map(self):
        decode = codecs.charmap_decode
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'a', 1: 'b', 2: 'c'}),
                         ("abc", 3))
        # Entries may expand to multiple characters...
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'Aa', 1: 'Bb', 2: 'Cc'}),
                         ("AaBbCc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: '\U0010FFFF', 1: 'b', 2: 'c'}),
                         ("\U0010FFFFbc", 3))
        # ...or to the empty string, which drops the byte.
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 'a', 1: 'b', 2: ''}),
                         ("ab", 3))

        # Missing, None and U+FFFE entries are all decoding errors
        # (U+FFFE: Issue #14850).
        bad_maps = ({0: 'a', 1: 'b'},
                    {0: 'a', 1: 'b', 2: None},
                    {0: 'a', 1: 'b', 2: '\ufffe'})
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # Non-strict handlers substitute for (or drop) the unmapped byte.
        for errors, tail in (("replace", "\ufffd"),
                             ("backslashreplace", "\\x02"),
                             ("ignore", "")):
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 ("ab" + tail, 3))

        # An empty map with "ignore" swallows every byte.
        allbytes = bytes(range(256))
        self.assertEqual(decode(allbytes, "ignore", {}),
                         ("", len(allbytes)))

        # Integer targets must lie in range(0x110000).
        for bogus in (-2, 999999999):
            self.assertRaisesRegex(TypeError,
                "character mapping must be in range\\(0x110000\\)",
                decode, b"\x00\x01\x02", "strict",
                {0: "A", 1: 'Bb', 2: bogus})

    def test_decode_with_int2int_map(self):
        decode = codecs.charmap_decode
        a = ord('a')
        b = ord('b')
        c = ord('c')

        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: a, 1: b, 2: c}),
                         ("abc", 3))

        # Issue #15379: targets up to sys.maxunicode are accepted...
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: 0x10FFFF, 1: b, 2: c}),
                         ("\U0010FFFFbc", 3))
        self.assertEqual(decode(b"\x00\x01\x02", "strict",
                                {0: sys.maxunicode, 1: b, 2: c}),
                         (chr(sys.maxunicode) + "bc", 3))
        # ...but not beyond.
        self.assertRaises(TypeError,
                          decode, b"\x00\x01\x02", "strict",
                          {0: sys.maxunicode + 1, 1: b, 2: c})

        # Missing and U+FFFE entries are decoding errors under "strict".
        bad_maps = ({0: a, 1: b},
                    {0: a, 1: b, 2: 0xFFFE})
        for bad_map in bad_maps:
            self.assertRaises(UnicodeDecodeError,
                              decode, b"\x00\x01\x02", "strict", bad_map)

        # Non-strict handlers substitute for (or drop) the unmapped byte.
        for errors, tail in (("replace", "\ufffd"),
                             ("backslashreplace", "\\x02"),
                             ("ignore", "")):
            for bad_map in bad_maps:
                self.assertEqual(decode(b"\x00\x01\x02", errors, bad_map),
                                 ("ab" + tail, 3))
2331
2332
class WithStmtTest(unittest.TestCase):
    """EncodedFile and StreamReaderWriter work as context managers."""

    def test_encodedfile(self):
        raw = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(raw, "latin-1", "utf-8") as wrapped:
            self.assertEqual(wrapped.read(), b"\xfc")
        # Leaving the with-block closes the underlying stream too.
        self.assertTrue(raw.closed)

    def test_streamreaderwriter(self):
        raw = io.BytesIO(b"\xc3\xbc")
        codec_info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(raw, codec_info.streamreader,
                                       codec_info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
2346
2347
class TypesTest(unittest.TestCase):
    """Input-type checks for the low-level stateless codec functions."""

    def test_decode_unicode(self):
        # The stateless decoders operate on bytes-like input only;
        # passing a str must raise TypeError.
        decoder_names = [
            "utf_7_decode",
            "utf_8_decode",
            "utf_16_le_decode",
            "utf_16_be_decode",
            "utf_16_ex_decode",
            "utf_32_decode",
            "utf_32_le_decode",
            "utf_32_be_decode",
            "utf_32_ex_decode",
            "latin_1_decode",
            "ascii_decode",
            "charmap_decode",
            # mbcs_decode only exists on Windows builds; getattr below
            # skips it elsewhere.
            "mbcs_decode",
        ]
        for name in decoder_names:
            decoder = getattr(codecs, name, None)
            if decoder is not None:
                self.assertRaises(TypeError, decoder, "xxx")

    def test_unicode_escape(self):
        # Escape-decoding a unicode string is supported and gives the same
        # result as decoding the equivalent ASCII bytes string.
        for decode in (codecs.unicode_escape_decode,
                       codecs.raw_unicode_escape_decode):
            self.assertEqual(decode(r"\u1234"), ("\u1234", 6))
            self.assertEqual(decode(br"\u1234"), ("\u1234", 6))

            # Out-of-range \U escapes fail under 'strict' and obey the
            # 'replace'/'backslashreplace' error handlers.
            self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
            self.assertEqual(decode(r"\U00110000", "replace"),
                             ("\ufffd", 10))
            self.assertEqual(decode(r"\U00110000", "backslashreplace"),
                             (r"\x5c\x55\x30\x30\x31\x31\x30\x30\x30\x30", 10))
2387
2388
class UnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the 'unicode-escape' codec (ReadTest adds stream tests)."""
    encoding = "unicode-escape"

    # Mask out the inherited lone-surrogates test; presumably not
    # applicable to this codec (ReadTest is defined earlier in this file).
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input maps to empty output with 0 units consumed.
        self.assertEqual(codecs.unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Printable ASCII (except the backslash) passes through unescaped.
        encode = codecs.unicode_escape_encode
        for b in range(32, 127):
            if b != b'\\'[0]:
                self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Any byte other than the backslash decodes to itself.
        decode = codecs.unicode_escape_decode
        for b in range(256):
            if b != b'\\'[0]:
                self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # Control characters and non-ASCII code points are escaped.
        encode = codecs.unicode_escape_encode
        check = coding_checker(self, encode)
        check('\t', br'\t')
        check('\n', br'\n')
        check('\r', br'\r')
        check('\\', br'\\')
        for b in range(32):
            if chr(b) not in '\t\n\r':
                check(chr(b), ('\\x%02x' % b).encode())
        for b in range(127, 256):
            check(chr(b), ('\\x%02x' % b).encode())
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # All recognized escape forms: simple, octal, hex, \u and \U.
        decode = codecs.unicode_escape_decode
        check = coding_checker(self, decode)
        check(b"[\\\n]", "[]")  # escaped newline is dropped entirely
        check(br'[\"]', '["]')
        check(br"[\']", "[']")
        check(br"[\\]", r"[\]")
        check(br"[\a]", "[\x07]")
        check(br"[\b]", "[\x08]")
        check(br"[\t]", "[\x09]")
        check(br"[\n]", "[\x0a]")
        check(br"[\v]", "[\x0b]")
        check(br"[\f]", "[\x0c]")
        check(br"[\r]", "[\x0d]")
        # Octal escapes consume at most three octal digits.
        check(br"[\7]", "[\x07]")
        check(br"[\78]", "[\x078]")
        check(br"[\41]", "[!]")
        check(br"[\418]", "[!8]")
        check(br"[\101]", "[A]")
        check(br"[\1010]", "[A0]")
        # Hex escapes consume exactly two hex digits.
        check(br"[\x41]", "[A]")
        check(br"[\x410]", "[A0]")
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")

    def test_decode_warnings(self):
        # Unrecognized escape sequences decode to themselves but emit a
        # DeprecationWarning naming the offending sequence.
        decode = codecs.unicode_escape_decode
        check = coding_checker(self, decode)
        for i in range(97, 123):
            b = bytes([i])
            if b not in b'abfnrtuvx':
                with self.assertWarnsRegex(DeprecationWarning,
                        r"invalid escape sequence '\\%c'" % i):
                    check(b"\\" + b, "\\" + chr(i))
            if b.upper() not in b'UN':
                with self.assertWarnsRegex(DeprecationWarning,
                        r"invalid escape sequence '\\%c'" % (i-32)):
                    check(b"\\" + b.upper(), "\\" + chr(i-32))
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\8'"):
            check(br"\8", "\\8")
        with self.assertWarns(DeprecationWarning):
            check(br"\9", "\\9")
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\\xfa'") as cm:
            check(b"\\\xfa", "\\\xfa")
        # Octal escapes above \377 still decode but warn as invalid.
        for i in range(0o400, 0o1000):
            with self.assertWarnsRegex(DeprecationWarning,
                    r"invalid octal escape sequence '\\%o'" % i):
                check(rb'\%o' % i, chr(i))

        # Warnings are emitted even when errors='ignore' swallows the
        # surrounding decode error.
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid escape sequence '\\z'"):
            self.assertEqual(decode(br'\x\z', 'ignore'), ('\\z', 4))
        with self.assertWarnsRegex(DeprecationWarning,
                r"invalid octal escape sequence '\\501'"):
            self.assertEqual(decode(br'\x\501', 'ignore'), ('\u0141', 6))

    def test_decode_errors(self):
        # Truncated \x, \u and \U escapes raise UnicodeDecodeError under
        # 'strict'; 'ignore' drops them and 'replace' yields U+FFFD.
        decode = codecs.unicode_escape_decode
        for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # \U escapes above the Unicode range are also decoding errors.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Expected incremental-decoder output as the encoded form is fed
        # in one byte at a time (one list entry per byte fed); see
        # ReadTest.check_partial (defined earlier in this file) for the
        # exact protocol -- TODO confirm against that definition.
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '',
                '',
                '',
                '\x00',
                '\x00',
                '\x00\t',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2537
class RawUnicodeEscapeTest(ReadTest, unittest.TestCase):
    """Tests for the 'raw-unicode-escape' codec (only \\u and \\U are special)."""
    encoding = "raw-unicode-escape"

    # Mask out the inherited lone-surrogates test; presumably not
    # applicable to this codec (ReadTest is defined earlier in this file).
    test_lone_surrogates = None

    def test_empty(self):
        # Empty input maps to empty output with 0 units consumed.
        self.assertEqual(codecs.raw_unicode_escape_encode(""), (b"", 0))
        self.assertEqual(codecs.raw_unicode_escape_decode(b""), ("", 0))

    def test_raw_encode(self):
        # Latin-1 range characters are emitted as raw bytes, unescaped.
        encode = codecs.raw_unicode_escape_encode
        for b in range(256):
            self.assertEqual(encode(chr(b)), (bytes([b]), 1))

    def test_raw_decode(self):
        # Every byte decodes to itself, including the backslash when not
        # followed by 'u' or 'U'.
        decode = codecs.raw_unicode_escape_decode
        for b in range(256):
            self.assertEqual(decode(bytes([b]) + b'0'), (chr(b) + '0', 2))

    def test_escape_encode(self):
        # Backslash sequences other than \u and \U are left untouched.
        encode = codecs.raw_unicode_escape_encode
        check = coding_checker(self, encode)
        for b in range(256):
            if b not in b'uU':
                check('\\' + chr(b), b'\\' + bytes([b]))
        check('\u20ac', br'\u20ac')
        check('\U0001d120', br'\U0001d120')

    def test_escape_decode(self):
        # Only \u and \U escapes are interpreted on decoding.
        decode = codecs.raw_unicode_escape_decode
        check = coding_checker(self, decode)
        for b in range(256):
            if b not in b'uU':
                check(b'\\' + bytes([b]), '\\' + chr(b))
        check(br"\u20ac", "\u20ac")
        check(br"\U0001d120", "\U0001d120")

    def test_decode_errors(self):
        # Truncated \u/\U escapes raise UnicodeDecodeError under 'strict';
        # 'ignore' drops them and 'replace' substitutes U+FFFD.
        decode = codecs.raw_unicode_escape_decode
        for c, d in (b'u', 4), (b'U', 4):
            for i in range(d):
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"\\" + c + b"0"*i)
                self.assertRaises(UnicodeDecodeError, decode,
                                  b"[\\" + c + b"0"*i + b"]")
                data = b"[\\" + c + b"0"*i + b"]\\" + c + b"0"*i
                self.assertEqual(decode(data, "ignore"), ("[]", len(data)))
                self.assertEqual(decode(data, "replace"),
                                 ("[\ufffd]\ufffd", len(data)))
        # \U escapes above the Unicode range are also decoding errors.
        self.assertRaises(UnicodeDecodeError, decode, br"\U00110000")
        self.assertEqual(decode(br"\U00110000", "ignore"), ("", 10))
        self.assertEqual(decode(br"\U00110000", "replace"), ("\ufffd", 10))

    def test_partial(self):
        # Expected incremental-decoder output as the encoded form is fed
        # in one byte at a time (one list entry per byte fed); see
        # ReadTest.check_partial (defined earlier in this file) for the
        # exact protocol -- TODO confirm against that definition.
        self.check_partial(
            "\x00\t\n\r\\\xff\uffff\U00010000",
            [
                '\x00',
                '\x00\t',
                '\x00\t\n',
                '\x00\t\n\r',
                '\x00\t\n\r',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff',
                '\x00\t\n\r\\\xff\uffff\U00010000',
            ]
        )
2619
2620
class EscapeEncodeTest(unittest.TestCase):
    """Tests for the bytes->bytes codecs.escape_encode() function."""

    def test_escape_encode(self):
        cases = [
            (b'', (b'', 0)),
            (b'foobar', (b'foobar', 6)),
            (b'spam\0eggs', (b'spam\\x00eggs', 9)),
            (b'a\'b', (b"a\\'b", 3)),
            (b'b\\c', (b'b\\\\c', 3)),
            (b'c\nd', (b'c\\nd', 3)),
            (b'd\re', (b'd\\re', 3)),
            (b'f\x7fg', (b'f\\x7fg', 3)),
        ]
        for raw, expected in cases:
            with self.subTest(data=raw):
                # The length in the result tuple is that of the *input*.
                self.assertEqual(codecs.escape_encode(raw), expected)
        # Only exact bytes objects are accepted; str and bytearray fail.
        self.assertRaises(TypeError, codecs.escape_encode, 'spam')
        self.assertRaises(TypeError, codecs.escape_encode, bytearray(b'spam'))
2639
2640
class SurrogateEscapeTest(unittest.TestCase):
    """The surrogateescape error handler must round-trip undecodable bytes."""

    def _roundtrip(self, encoding, raw, text):
        # Decoding maps each bad byte to a low surrogate; encoding the
        # surrogates back must restore the original bytes exactly.
        self.assertEqual(raw.decode(encoding, "surrogateescape"), text)
        self.assertEqual(text.encode(encoding, "surrogateescape"), raw)

    def test_utf8(self):
        # A stray non-UTF-8 byte...
        self._roundtrip("utf-8", b"foo\x80bar", "foo\udc80bar")
        # ...and a surrogate smuggled in as (invalid) UTF-8.
        self._roundtrip("utf-8", b"\xed\xb0\x80", "\udced\udcb0\udc80")

    def test_ascii(self):
        # Any byte >= 0x80 is an error for ASCII.
        self._roundtrip("ascii", b"foo\x80bar", "foo\udc80bar")

    def test_charmap(self):
        # \xa5 has no mapping in iso-8859-3.
        self._roundtrip("iso-8859-3", b"foo\xa5bar", "foo\udca5bar")

    def test_latin1(self):
        # Issue #6373: encoding lone surrogates with latin-1.
        self.assertEqual(
            "\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1",
                                                    "surrogateescape"),
            b"\xe4\xeb\xef\xf6\xfc")
2673
2674
class BomTest(unittest.TestCase):
    def test_seek0(self):
        """Check BOM handling by codecs streams around seek(0)."""
        payload = "1234567890"
        encodings_to_test = ("utf-16",
                             "utf-16-le",
                             "utf-16-be",
                             "utf-32",
                             "utf-32-le",
                             "utf-32-be")
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        for enc in encodings_to_test:
            # The BOM must be emitted only once, however many writes occur.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=enc) as f:
                f.write(payload)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # After seek(0), the next write must emit the BOM again.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=enc) as f:
                f.write(payload[0])
                self.assertNotEqual(f.tell(), 0)
                f.seek(0)
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # Same seek(0) check going through the underlying StreamWriter.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=enc) as f:
                f.writer.write(payload[0])
                self.assertNotEqual(f.writer.tell(), 0)
                f.writer.seek(0)
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload)

            # Seeking anywhere but the start must NOT re-emit the BOM.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=enc) as f:
                f.write(payload)
                f.seek(f.tell())
                f.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)

            # StreamWriter variant of the non-zero-seek check.
            with codecs.open(os_helper.TESTFN, 'w+', encoding=enc) as f:
                f.writer.write(payload)
                f.writer.seek(f.writer.tell())
                f.writer.write(payload)
                f.seek(0)
                self.assertEqual(f.read(), payload * 2)
2730
2731
# bytes->bytes transform codecs that are always available; the optional
# zlib/bz2 codecs are appended below when their modules import cleanly.
bytes_transform_encodings = [
    "base64_codec",
    "uu_codec",
    "quopri_codec",
    "hex_codec",
]

# Maps each canonical transform codec name to the aliases that must
# resolve to it (exercised by TransformCodecTest.test_aliases).
transform_aliases = {
    "base64_codec": ["base64", "base_64"],
    "uu_codec": ["uu"],
    "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"],
    "hex_codec": ["hex"],
    "rot_13": ["rot13"],
}

# zlib is an optional build dependency.  NOTE: the name must stay bound
# (possibly to None) because it is also used below in a skipUnless
# decorator on TransformCodecTest.
try:
    import zlib
except ImportError:
    zlib = None
else:
    bytes_transform_encodings.append("zlib_codec")
    transform_aliases["zlib_codec"] = ["zip", "zlib"]
# bz2 is likewise optional; nothing else references the module itself.
try:
    import bz2
except ImportError:
    pass
else:
    bytes_transform_encodings.append("bz2_codec")
    transform_aliases["bz2_codec"] = ["bz2"]
2761
2762
class TransformCodecTest(unittest.TestCase):
    """Exercise the bytes<->bytes and str<->str transform codecs."""

    def test_basics(self):
        # Round-trip all 256 byte values through the generic interface.
        payload = bytes(range(256))
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                encoded, consumed = codecs.getencoder(codec_name)(payload)
                self.assertEqual(consumed, len(payload))
                decoded, consumed = codecs.getdecoder(codec_name)(encoded)
                self.assertEqual(consumed, len(encoded))
                self.assertEqual(decoded, payload)

    def test_read(self):
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                encoded = codecs.encode(b"\x80", codec_name)
                stream = codecs.getreader(codec_name)(io.BytesIO(encoded))
                self.assertEqual(stream.read(), b"\x80")

    def test_readline(self):
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                encoded = codecs.encode(b"\x80", codec_name)
                stream = codecs.getreader(codec_name)(io.BytesIO(encoded))
                self.assertEqual(stream.readline(), b"\x80")

    def test_buffer_api_usage(self):
        # All transform codecs must accept memoryview input for both
        # encoding and decoding, and round-trip correctly.
        original = b"12345\x80"
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                encoded = codecs.encode(original, codec_name)
                self.assertEqual(
                    codecs.encode(memoryview(original), codec_name), encoded)
                decoded = codecs.decode(encoded, codec_name)
                self.assertEqual(decoded, original)
                self.assertEqual(
                    codecs.decode(memoryview(encoded), codec_name), decoded)

    def test_text_to_binary_denylists_binary_transforms(self):
        # str.encode() must refuse binary->binary codecs with a clear error.
        bad_input = "bad input type"
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                msg = (r"{!r} is not a text encoding; "
                       r"use codecs.encode\(\) to handle arbitrary codecs"
                       ).format(codec_name)
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.encode(codec_name)
                self.assertIsNone(failure.exception.__cause__)

    def test_text_to_binary_denylists_text_transforms(self):
        # str.encode() must likewise refuse str->str codecs such as rot_13.
        msg = (r"^'rot_13' is not a text encoding; "
               r"use codecs.encode\(\) to handle arbitrary codecs")
        with self.assertRaisesRegex(LookupError, msg):
            "just an example message".encode("rot_13")

    def test_binary_to_text_denylists_binary_transforms(self):
        # bytes.decode()/bytearray.decode() must refuse binary->binary
        # codecs with a clear error message.
        data = b"encode first to ensure we meet any format restrictions"
        for codec_name in bytes_transform_encodings:
            with self.subTest(encoding=codec_name):
                encoded = codecs.encode(data, codec_name)
                msg = (r"{!r} is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs"
                       ).format(codec_name)
                with self.assertRaisesRegex(LookupError, msg):
                    encoded.decode(codec_name)
                with self.assertRaisesRegex(LookupError, msg):
                    bytearray(encoded).decode(codec_name)

    def test_binary_to_text_denylists_text_transforms(self):
        # str->str codecs must give a good error for binary input too.
        for bad_input in (b"immutable", bytearray(b"mutable")):
            with self.subTest(bad_input=bad_input):
                msg = (r"^'rot_13' is not a text encoding; "
                       r"use codecs.decode\(\) to handle arbitrary codecs")
                with self.assertRaisesRegex(LookupError, msg) as failure:
                    bad_input.decode("rot_13")
                self.assertIsNone(failure.exception.__cause__)

    @unittest.skipUnless(zlib, "Requires zlib support")
    def test_custom_zlib_error_is_wrapped(self):
        # Malformed zlib input: the error is wrapped with the codec name
        # and chained via __cause__.
        msg = "^decoding with 'zlib_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "zlib_codec")
        self.assertIsInstance(failure.exception.__cause__,
                              type(failure.exception))

    def test_custom_hex_error_is_wrapped(self):
        # Same wrapping behavior for malformed hex input.
        msg = "^decoding with 'hex_codec' codec failed"
        with self.assertRaisesRegex(Exception, msg) as failure:
            codecs.decode(b"hello", "hex_codec")
        self.assertIsInstance(failure.exception.__cause__,
                              type(failure.exception))

    # Unfortunately, the bz2 module throws OSError, which the codec
    # machinery currently can't wrap :(

    def test_aliases(self):
        # Ensure codec aliases from http://bugs.python.org/issue7475 work:
        # every alias must resolve to the same codec as its canonical name.
        for codec_name, aliases in transform_aliases.items():
            canonical = codecs.lookup(codec_name).name
            for alias in aliases:
                with self.subTest(alias=alias):
                    self.assertEqual(codecs.lookup(alias).name, canonical)

    def test_quopri_stateless(self):
        # Encoding uses quotetabs=True, so tabs/spaces become =09/=20...
        encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
        self.assertEqual(encoded, b"space=20tab=09eol=20\n")
        # ...but decoding still accepts unescaped tabs and spaces.
        plain = b"space tab eol\n"
        self.assertEqual(codecs.decode(plain, "quopri-codec"), plain)

    def test_uu_invalid(self):
        # uu data without a "begin" line is rejected.
        self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
2894
2895
2896# The codec system tries to wrap exceptions in order to ensure the error
2897# mentions the operation being performed and the codec involved. We
2898# currently *only* want this to happen for relatively stateless
2899# exceptions, where the only significant information they contain is their
2900# type and a single str argument.
2901
2902# Use a local codec registry to avoid appearing to leak objects when
2903# registering multiple search functions
_TEST_CODECS = {}

def _get_test_codec(codec_name):
    """Search function returning the locally registered CodecInfo, if any."""
    try:
        return _TEST_CODECS[codec_name]
    except KeyError:
        # codecs.register() protocol: unknown names yield None so the
        # lookup machinery moves on to the next search function.
        return None
2908
2909
class ExceptionChainingTest(unittest.TestCase):
    """Check how the codec machinery wraps exceptions raised inside codecs.

    Exceptions reconstructible from their type and a single str argument
    are re-raised as a new exception naming the codec and operation, with
    the original chained via __cause__; anything carrying extra state
    (overridden __init__/__new__, instance attributes, multiple args)
    must propagate unchanged.
    """

    def setUp(self):
        # Unique name so lookups only ever hit the codec registered here.
        self.codec_name = 'exception_chaining_test'
        codecs.register(_get_test_codec)
        self.addCleanup(codecs.unregister, _get_test_codec)

        # We store the object to raise on the instance because of a bad
        # interaction between the codec caching (which means we can't
        # recreate the codec entry) and regrtest refleak hunting (which
        # runs the same test instance multiple times). This means we
        # need to ensure the codecs call back in to the instance to find
        # out which exception to raise rather than binding them in a
        # closure to an object that may change on the next run
        self.obj_to_raise = RuntimeError

    def tearDown(self):
        # Drop the codec entry published via set_codec().
        _TEST_CODECS.pop(self.codec_name, None)
        # Issue #22166: Also pop from caches to avoid appearance of ref leaks
        encodings._cache.pop(self.codec_name, None)

    def set_codec(self, encode, decode):
        # Publish the given encode/decode pair under self.codec_name so
        # the search function registered in setUp() can find them.
        codec_info = codecs.CodecInfo(encode, decode,
                                      name=self.codec_name)
        _TEST_CODECS[self.codec_name] = codec_info

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        # Assert that the raised exception mentions the operation and
        # codec, chains the original via __cause__, and that the chained
        # original kept its traceback.
        full_msg = r"{} with {!r} codec failed \({}: {}\)".format(
                  operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught
        self.assertIsInstance(caught.exception.__cause__, exc_type)
        self.assertIsNotNone(caught.exception.__cause__.__traceback__)

    def raise_obj(self, *args, **kwds):
        # Helper to dynamically change the object raised by a test codec
        raise self.obj_to_raise

    def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError):
        # Wrapping must occur for all four encode/decode entry points.
        self.obj_to_raise = obj_to_raise
        self.set_codec(self.raise_obj, self.raise_obj)
        with self.assertWrapped("encoding", exc_type, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", exc_type, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", exc_type, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        # Raising a bare exception *type* still gets wrapped.
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    def test_raise_grandchild_subclass_exact_size(self):
        # __slots__ = () keeps the subclass the same size as its parent,
        # so instances still count as "stateless" and get wrapped.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            __slots__ = ()
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def test_raise_subclass_with_weakref_support(self):
        # An ordinary subclass (with __dict__/__weakref__) is wrapped too.
        msg = "This should be wrapped"
        class MyRuntimeError(RuntimeError):
            pass
        self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError)

    def check_not_wrapped(self, obj_to_raise, msg):
        # Exceptions carrying extra state must propagate unchanged from
        # all four encode/decode entry points.
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        self.set_codec(raise_obj, raise_obj)
        with self.assertRaisesRegex(RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        # A custom __init__ signature means the exception can't be
        # safely reconstructed, so it must not be wrapped.
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        # Same reasoning for a custom __new__.
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        # Extra instance state would be lost by wrapping, so don't wrap.
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, "^{}$".format(msg))

    def test_non_str_arg_is_not_wrapped(self):
        # A non-str constructor argument also disables wrapping.
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        # More than one constructor argument also disables wrapping.
        msg_re = r"^\('a', 'b', 'c'\)$"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re)

    # http://bugs.python.org/issue19609
    def test_codec_lookup_failure_not_wrapped(self):
        msg = "^unknown encoding: {}$".format(self.codec_name)
        # The initial codec lookup should not be wrapped
        with self.assertRaisesRegex(LookupError, msg):
            "str input".encode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertRaisesRegex(LookupError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_unflagged_non_text_codec_handling(self):
        # The stdlib non-text codecs are now marked so they're
        # pre-emptively skipped by the text model related methods
        # However, third party codecs won't be flagged, so we still make
        # sure the case where an inappropriate output type is produced is
        # handled appropriately
        def encode_to_str(*args, **kwds):
            return "not bytes!", 0
        def decode_to_bytes(*args, **kwds):
            return b"not str!", 0
        self.set_codec(encode_to_str, decode_to_bytes)
        # No input or output type checks on the codecs module functions
        encoded = codecs.encode(None, self.codec_name)
        self.assertEqual(encoded, "not bytes!")
        decoded = codecs.decode(None, self.codec_name)
        self.assertEqual(decoded, b"not str!")
        # Text model methods should complain
        fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; "
               r"use codecs.encode\(\) to encode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            "str_input".encode(self.codec_name)
        fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; "
               r"use codecs.decode\(\) to decode to arbitrary types$")
        msg = fmt.format(self.codec_name)
        with self.assertRaisesRegex(TypeError, msg):
            b"bytes input".decode(self.codec_name)
3058
3059
3060
@unittest.skipUnless(sys.platform == 'win32',
                     'code pages are specific to Windows')
class CodePageTest(unittest.TestCase):
    """Tests for the Windows code page codecs exposed as
    codecs.code_page_encode() and codecs.code_page_decode().
    """
    # Windows code page number for UTF-8.
    CP_UTF8 = 65001

    def test_invalid_code_page(self):
        # A negative code page fails argument validation (ValueError);
        # code page 123 is well-formed but does not exist, so the call
        # fails at the Win32 level (OSError).
        self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a')
        self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a')
        self.assertRaises(OSError, codecs.code_page_encode, 123, 'a')
        self.assertRaises(OSError, codecs.code_page_decode, 123, b'a')

    def test_code_page_name(self):
        # The code page name (e.g. "cp932", "CP_UTF8") must be embedded
        # in the message of Unicode errors raised by these codecs.
        self.assertRaisesRegex(UnicodeEncodeError, 'cp932',
            codecs.code_page_encode, 932, '\xff')
        self.assertRaisesRegex(UnicodeDecodeError, 'cp932',
            codecs.code_page_decode, 932, b'\x81\x00', 'strict', True)
        self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8',
            codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True)

    def check_decode(self, cp, tests):
        """Run (raw, errors, expected) decode cases against code page *cp*.

        An expected value of None means the decode must raise
        UnicodeDecodeError.
        """
        for raw, errors, expected in tests:
            if expected is not None:
                try:
                    decoded = codecs.code_page_decode(cp, raw, errors, True)
                except UnicodeDecodeError as err:
                    self.fail('Unable to decode %a from "cp%s" with '
                              'errors=%r: %s' % (raw, cp, errors, err))
                self.assertEqual(decoded[0], expected,
                    '%a.decode("cp%s", %r)=%a != %a'
                    % (raw, cp, errors, decoded[0], expected))
                # assert 0 <= decoded[1] <= len(raw)
                self.assertGreaterEqual(decoded[1], 0)
                self.assertLessEqual(decoded[1], len(raw))
            else:
                self.assertRaises(UnicodeDecodeError,
                    codecs.code_page_decode, cp, raw, errors, True)

    def check_encode(self, cp, tests):
        """Run (text, errors, expected) encode cases against code page *cp*.

        An expected value of None means the encode must raise
        UnicodeEncodeError.
        """
        for text, errors, expected in tests:
            if expected is not None:
                try:
                    encoded = codecs.code_page_encode(cp, text, errors)
                except UnicodeEncodeError as err:
                    self.fail('Unable to encode %a to "cp%s" with '
                              'errors=%r: %s' % (text, cp, errors, err))
                self.assertEqual(encoded[0], expected,
                    '%a.encode("cp%s", %r)=%a != %a'
                    % (text, cp, errors, encoded[0], expected))
                self.assertEqual(encoded[1], len(text))
            else:
                self.assertRaises(UnicodeEncodeError,
                    codecs.code_page_encode, cp, text, errors)

    def test_cp932(self):
        # cp932: a double-byte code page (see the two-byte sequences in
        # the expected values below).
        self.check_encode(932, (
            ('abc', 'strict', b'abc'),
            ('\uff44\u9a3e', 'strict', b'\x82\x84\xe9\x80'),
            # test error handlers
            ('\xff', 'strict', None),
            ('[\xff]', 'ignore', b'[]'),
            ('[\xff]', 'replace', b'[y]'),
            ('[\u20ac]', 'replace', b'[?]'),
            ('[\xff]', 'backslashreplace', b'[\\xff]'),
            ('[\xff]', 'namereplace',
             b'[\\N{LATIN SMALL LETTER Y WITH DIAERESIS}]'),
            ('[\xff]', 'xmlcharrefreplace', b'[&#255;]'),
            ('\udcff', 'strict', None),
            ('[\udcff]', 'surrogateescape', b'[\xff]'),
            ('[\udcff]', 'surrogatepass', None),
        ))
        self.check_decode(932, (
            (b'abc', 'strict', 'abc'),
            (b'\x82\x84\xe9\x80', 'strict', '\uff44\u9a3e'),
            # invalid bytes
            (b'[\xff]', 'strict', None),
            (b'[\xff]', 'ignore', '[]'),
            (b'[\xff]', 'replace', '[\ufffd]'),
            (b'[\xff]', 'backslashreplace', '[\\xff]'),
            (b'[\xff]', 'surrogateescape', '[\udcff]'),
            (b'[\xff]', 'surrogatepass', None),
            (b'\x81\x00abc', 'strict', None),
            (b'\x81\x00abc', 'ignore', '\x00abc'),
            (b'\x81\x00abc', 'replace', '\ufffd\x00abc'),
            (b'\x81\x00abc', 'backslashreplace', '\\x81\x00abc'),
        ))

    def test_cp1252(self):
        # cp1252: a single-byte code page.
        self.check_encode(1252, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'\xe9\x80'),
            ('\xff', 'strict', b'\xff'),
            # test error handlers
            ('\u0141', 'strict', None),
            ('\u0141', 'ignore', b''),
            ('\u0141', 'replace', b'L'),
            ('\udc98', 'surrogateescape', b'\x98'),
            ('\udc98', 'surrogatepass', None),
        ))
        self.check_decode(1252, (
            (b'abc', 'strict', 'abc'),
            (b'\xe9\x80', 'strict', '\xe9\u20ac'),
            (b'\xff', 'strict', '\xff'),
        ))

    def test_cp_utf7(self):
        # Code page 65000 is Windows' UTF-7.
        cp = 65000
        self.check_encode(cp, (
            ('abc', 'strict', b'abc'),
            ('\xe9\u20ac', 'strict',  b'+AOkgrA-'),
            ('\U0010ffff', 'strict',  b'+2//f/w-'),
            ('\udc80', 'strict', b'+3IA-'),
            ('\ufffd', 'strict', b'+//0-'),
        ))
        self.check_decode(cp, (
            (b'abc', 'strict', 'abc'),
            (b'+AOkgrA-', 'strict', '\xe9\u20ac'),
            (b'+2//f/w-', 'strict', '\U0010ffff'),
            (b'+3IA-', 'strict', '\udc80'),
            (b'+//0-', 'strict', '\ufffd'),
            # invalid bytes
            (b'[+/]', 'strict', '[]'),
            (b'[\xff]', 'strict', '[\xff]'),
        ))

    def test_multibyte_encoding(self):
        # Error handlers applied to invalid bytes adjacent to valid
        # multi-byte sequences.
        self.check_decode(932, (
            (b'\x84\xe9\x80', 'ignore', '\u9a3e'),
            (b'\x84\xe9\x80', 'replace', '\ufffd\u9a3e'),
        ))
        self.check_decode(self.CP_UTF8, (
            (b'\xff\xf4\x8f\xbf\xbf', 'ignore', '\U0010ffff'),
            (b'\xff\xf4\x8f\xbf\xbf', 'replace', '\ufffd\U0010ffff'),
        ))
        self.check_encode(self.CP_UTF8, (
            ('[\U0010ffff\uDC80]', 'ignore', b'[\xf4\x8f\xbf\xbf]'),
            ('[\U0010ffff\uDC80]', 'replace', b'[\xf4\x8f\xbf\xbf?]'),
        ))

    def test_code_page_decode_flags(self):
        # Issue #36312: For some code pages (e.g. UTF-7) flags for
        # MultiByteToWideChar() must be set to 0.
        if support.verbose:
            sys.stdout.write('\n')
        for cp in (50220, 50221, 50222, 50225, 50227, 50229,
                   *range(57002, 57011+1), 65000):
            # On small versions of Windows like Windows IoT
            # not all codepages are present.
            # A missing codepage causes an OSError exception
            # so check for the codepage before decoding
            if is_code_page_present(cp):
                self.assertEqual(codecs.code_page_decode(cp, b'abc'), ('abc', 3), f'cp{cp}')
            else:
                if support.verbose:
                    print(f"  skipping cp={cp}")
        # cp42 maps ASCII bytes into the Unicode private use area
        # (U+F0xx), per the expected output below.
        self.assertEqual(codecs.code_page_decode(42, b'abc'),
                         ('\uf061\uf062\uf063', 3))

    def test_incremental(self):
        # With final=False a trailing incomplete multi-byte sequence is
        # held back (fewer bytes reported consumed) instead of raising.
        decoded = codecs.code_page_decode(932, b'\x82', 'strict', False)
        self.assertEqual(decoded, ('', 0))

        decoded = codecs.code_page_decode(932,
                                          b'\xe9\x80\xe9', 'strict',
                                          False)
        self.assertEqual(decoded, ('\u9a3e', 2))

        decoded = codecs.code_page_decode(932,
                                          b'\xe9\x80\xe9\x80', 'strict',
                                          False)
        self.assertEqual(decoded, ('\u9a3e\u9a3e', 4))

        decoded = codecs.code_page_decode(932,
                                          b'abc', 'strict',
                                          False)
        self.assertEqual(decoded, ('abc', 3))

    def test_mbcs_alias(self):
        # Check that looking up our 'default' codepage will return
        # mbcs when we don't have a more specific one available
        code_page = 99_999
        name = f'cp{code_page}'
        with mock.patch('_winapi.GetACP', return_value=code_page):
            try:
                codec = codecs.lookup(name)
                self.assertEqual(codec.name, 'mbcs')
            finally:
                # NOTE(review): unregister() is passed the codec *name*
                # rather than a search function; presumably this relies
                # on unregister() also clearing the internal codec
                # lookup cache -- confirm against the codecs module.
                codecs.unregister(name)

    @support.bigmemtest(size=2**31, memuse=7, dry_run=False)
    def test_large_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain undecodable bytes before and after
        # the INT_MAX limit.
        encoded = (b'01234567' * ((size//8)-1) +
                   b'\x85\x86\xea\xeb\xec\xef\xfc\xfd\xfe\xff')
        self.assertEqual(len(encoded), size+2)
        decoded = codecs.code_page_decode(932, encoded, 'surrogateescape', True)
        self.assertEqual(decoded[1], len(encoded))
        # Free the big input buffer before inspecting the result, to
        # keep peak memory use down.
        del encoded
        self.assertEqual(len(decoded[0]), decoded[1])
        self.assertEqual(decoded[0][:10], '0123456701')
        self.assertEqual(decoded[0][-20:],
                         '6701234567'
                         '\udc85\udc86\udcea\udceb\udcec'
                         '\udcef\udcfc\udcfd\udcfe\udcff')

    @support.bigmemtest(size=2**31, memuse=6, dry_run=False)
    def test_large_utf8_input(self, size):
        # Test input longer than INT_MAX.
        # Input should contain a decodable multi-byte character
        # surrounding INT_MAX
        encoded = (b'0123456\xed\x84\x80' * (size//8))
        self.assertEqual(len(encoded), size // 8 * 10)
        decoded = codecs.code_page_decode(65001, encoded, 'ignore', True)
        self.assertEqual(decoded[1], len(encoded))
        # Free the big input buffer before inspecting the result.
        del encoded
        self.assertEqual(len(decoded[0]), size)
        self.assertEqual(decoded[0][:10], '0123456\ud10001')
        self.assertEqual(decoded[0][-11:], '56\ud1000123456\ud100')
3280
3281
class ASCIITest(unittest.TestCase):
    """Exercise the built-in 'ascii' codec and its error handlers."""

    def test_encode(self):
        # Plain ASCII text encodes byte-for-byte.
        self.assertEqual('abc123'.encode('ascii'), b'abc123')

    def test_encode_error(self):
        # Non-ASCII characters require an error handler.
        cases = [
            ('[\x80\xff\u20ac]', 'ignore', b'[]'),
            ('[\x80\xff\u20ac]', 'replace', b'[???]'),
            ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
            ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
             b'[\\x80\\xff\\u20ac\\U000abcde]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for text, handler, want in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=want):
                self.assertEqual(text.encode('ascii', handler), want)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # the first character can be decoded, but not the second
            '\udc80\xff'.encode('ascii', 'surrogateescape')

    def test_decode(self):
        self.assertEqual(b'abc'.decode('ascii'), 'abc')

    def test_decode_error(self):
        # Bytes >= 0x80 require an error handler.
        cases = [
            (b'[\x80\xff]', 'ignore', '[]'),
            (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
            (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
            (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
        ]
        for raw, handler, want in cases:
            with self.subTest(data=raw, error_handler=handler,
                              expected=want):
                self.assertEqual(raw.decode('ascii', handler), want)
3319
3320
class Latin1Test(unittest.TestCase):
    """Exercise the built-in 'latin1' (ISO 8859-1) codec."""

    def test_encode(self):
        # Every code point below U+0100 maps to a single byte.
        for text, want in (
            ('abc', b'abc'),
            ('\x80\xe9\xff', b'\x80\xe9\xff'),
        ):
            with self.subTest(data=text, expected=want):
                self.assertEqual(text.encode('latin1'), want)

    def test_encode_errors(self):
        # Code points above U+00FF need an error handler.
        cases = [
            ('[\u20ac\udc80]', 'ignore', b'[]'),
            ('[\u20ac\udc80]', 'replace', b'[??]'),
            ('[\u20ac\U000abcde]', 'backslashreplace',
             b'[\\u20ac\\U000abcde]'),
            ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
            ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
        ]
        for text, handler, want in cases:
            with self.subTest(data=text, error_handler=handler,
                              expected=want):
                self.assertEqual(text.encode('latin1', handler), want)

    def test_encode_surrogateescape_error(self):
        with self.assertRaises(UnicodeEncodeError):
            # the first character can be decoded, but not the second
            '\udc80\u20ac'.encode('latin1', 'surrogateescape')

    def test_decode(self):
        # Decoding is total: every byte value is a valid Latin-1 char.
        for raw, want in (
            (b'abc', 'abc'),
            (b'[\x80\xff]', '[\x80\xff]'),
        ):
            with self.subTest(data=raw, expected=want):
                self.assertEqual(raw.decode('latin1'), want)
3356
3357
class StreamRecoderTest(unittest.TestCase):
    """Tests for codecs.StreamRecoder and codecs.EncodedFile."""

    def test_writelines(self):
        backing = io.BytesIO()
        ascii_info = codecs.lookup('ascii')
        recoder = codecs.StreamRecoder(
            backing, ascii_info.encode, ascii_info.decode,
            encodings.ascii.StreamReader, encodings.ascii.StreamWriter)
        recoder.writelines([b'a', b'b'])
        self.assertEqual(backing.getvalue(), b'ab')

    def test_write(self):
        backing = io.BytesIO()
        latin1_info = codecs.lookup('latin1')
        # Recode from Latin-1 to utf-8.
        recoder = codecs.StreamRecoder(
            backing, latin1_info.encode, latin1_info.decode,
            encodings.utf_8.StreamReader, encodings.utf_8.StreamWriter)
        text = 'àñé'
        recoder.write(text.encode('latin1'))
        self.assertEqual(backing.getvalue(), text.encode('utf-8'))

    def test_seeking_read(self):
        backing = io.BytesIO('line1\nline2\nline3\n'.encode('utf-16-le'))
        encoded_file = codecs.EncodedFile(backing, 'utf-8', 'utf-16-le')

        self.assertEqual(encoded_file.readline(), b'line1\n')
        # Rewinding must restart reading from the first line.
        encoded_file.seek(0)
        for want in (b'line1\n', b'line2\n', b'line3\n', b''):
            self.assertEqual(encoded_file.readline(), want)

    def test_seeking_write(self):
        backing = io.BytesIO('123456789\n'.encode('utf-16-le'))
        encoded_file = codecs.EncodedFile(backing, 'utf-8', 'utf-16-le')

        # Test that seek() only resets its internal buffer when offset
        # and whence are zero.
        encoded_file.seek(2)
        encoded_file.write(b'\nabc\n')
        self.assertEqual(encoded_file.readline(), b'789\n')
        encoded_file.seek(0)
        for want in (b'1\n', b'abc\n', b'789\n'):
            self.assertEqual(encoded_file.readline(), want)
3402
3403
@unittest.skipIf(_testinternalcapi is None, 'need _testinternalcapi module')
class LocaleCodecTest(unittest.TestCase):
    """
    Test indirectly _Py_DecodeUTF8Ex() and _Py_EncodeUTF8Ex().
    """
    # Reference codec: whatever Python chose as the filesystem encoding.
    ENCODING = sys.getfilesystemencoding()
    # Sample texts covering pure ASCII, Latin-1 range, wide code points
    # and lone surrogates.
    STRINGS = ("ascii", "ulatin1:\xa7\xe9",
               "u255:\xff",
               "UCS:\xe9\u20ac\U0010ffff",
               "surrogates:\uDC80\uDCFF")
    # Raw byte samples decoded directly by check_decode_strings().
    BYTES_STRINGS = (b"blatin1:\xa7\xe9", b"b255:\xff")
    SURROGATES = "\uDC80\uDCFF"

    def encode(self, text, errors="strict"):
        """Encode *text* via the _testinternalcapi.EncodeLocaleEx helper."""
        return _testinternalcapi.EncodeLocaleEx(text, 0, errors)

    def check_encode_strings(self, errors):
        # For every sample, the C-level encoder must agree with
        # str.encode(); where str.encode() fails, the helper surfaces
        # the encode error as a RuntimeError with a formatted message.
        for text in self.STRINGS:
            with self.subTest(text=text):
                try:
                    expected = text.encode(self.ENCODING, errors)
                except UnicodeEncodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.encode(text, errors)
                    errmsg = str(cm.exception)
                    self.assertRegex(errmsg, r"encode error: pos=[0-9]+, reason=")
                else:
                    encoded = self.encode(text, errors)
                    self.assertEqual(encoded, expected)

    def test_encode_strict(self):
        self.check_encode_strings("strict")

    def test_encode_surrogateescape(self):
        self.check_encode_strings("surrogateescape")

    def test_encode_surrogatepass(self):
        # Not every locale encoder supports surrogatepass; skip when the
        # helper reports the handler as unsupported.
        try:
            self.encode('', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} encoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_encode_strings("surrogatepass")

    def test_encode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.encode('', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')

    def decode(self, encoded, errors="strict"):
        """Decode *encoded* via the _testinternalcapi.DecodeLocaleEx helper."""
        return _testinternalcapi.DecodeLocaleEx(encoded, 0, errors)

    def check_decode_strings(self, errors):
        # Build the pool of byte strings to decode: the fixed raw
        # samples plus every sample text encoded with the reference
        # codec.
        is_utf8 = (self.ENCODING == "utf-8")
        if is_utf8:
            # surrogateescape lets the 'surrogates' sample encode on
            # UTF-8 (strict would raise on the lone surrogates).
            encode_errors = 'surrogateescape'
        else:
            encode_errors = 'strict'

        strings = list(self.BYTES_STRINGS)
        for text in self.STRINGS:
            try:
                encoded = text.encode(self.ENCODING, encode_errors)
                if encoded not in strings:
                    strings.append(encoded)
            except UnicodeEncodeError:
                encoded = None

            if is_utf8:
                # Also cover the surrogatepass encoding when it differs
                # from the surrogateescape form.
                encoded2 = text.encode(self.ENCODING, 'surrogatepass')
                if encoded2 != encoded:
                    strings.append(encoded2)

        # The C-level decoder must agree with bytes.decode(); where
        # bytes.decode() fails, the helper surfaces a RuntimeError.
        for encoded in strings:
            with self.subTest(encoded=encoded):
                try:
                    expected = encoded.decode(self.ENCODING, errors)
                except UnicodeDecodeError:
                    with self.assertRaises(RuntimeError) as cm:
                        self.decode(encoded, errors)
                    errmsg = str(cm.exception)
                    self.assertTrue(errmsg.startswith("decode error: "), errmsg)
                else:
                    decoded = self.decode(encoded, errors)
                    self.assertEqual(decoded, expected)

    def test_decode_strict(self):
        self.check_decode_strings("strict")

    def test_decode_surrogateescape(self):
        self.check_decode_strings("surrogateescape")

    def test_decode_surrogatepass(self):
        # Mirror of test_encode_surrogatepass for the decode direction.
        try:
            self.decode(b'', 'surrogatepass')
        except ValueError as exc:
            if str(exc) == 'unsupported error handler':
                self.skipTest(f"{self.ENCODING!r} decoder doesn't support "
                              f"surrogatepass error handler")
            else:
                raise

        self.check_decode_strings("surrogatepass")

    def test_decode_unsupported_error_handler(self):
        with self.assertRaises(ValueError) as cm:
            self.decode(b'', 'backslashreplace')
        self.assertEqual(str(cm.exception), 'unsupported error handler')
3516
3517
class Rot13Test(unittest.TestCase):
    """Test the educational ROT-13 codec."""

    def test_encode(self):
        self.assertEqual(codecs.encode("Caesar liked ciphers", 'rot-13'),
                         'Pnrfne yvxrq pvcuref')

    def test_decode(self):
        self.assertEqual(codecs.decode('Rg gh, Oehgr?', 'rot-13'),
                         'Et tu, Brute?')

    def test_incremental_encode(self):
        incremental_encoder = codecs.getincrementalencoder('rot-13')()
        self.assertEqual(incremental_encoder.encode('ABBA nag Cheryl Baker'),
                         'NOON ant Purely Onxre')

    def test_incremental_decode(self):
        incremental_decoder = codecs.getincrementaldecoder('rot-13')()
        self.assertEqual(incremental_decoder.decode('terra Ares envy tha'),
                         'green Nerf rail gun')
3537
3538
class Rot13UtilTest(unittest.TestCase):
    """Test the ROT-13 codec via the rot13() helper function,
    i.e. as if the user had run:
    $ echo "Hello World" | python -m encodings.rot_13
    """
    def test_rot13_func(self):
        source = io.StringIO('Gb or, be abg gb or, gung vf gur dhrfgvba')
        sink = io.StringIO()
        encodings.rot_13.rot13(source, sink)
        self.assertEqual(
            sink.getvalue(),
            'To be, or not to be, that is the question')
3553
3554
3555class CodecNameNormalizationTest(unittest.TestCase):
3556    """Test codec name normalization"""
3557    def test_codecs_lookup(self):
3558        FOUND = (1, 2, 3, 4)
3559        NOT_FOUND = (None, None, None, None)
3560        def search_function(encoding):
3561            if encoding == "aaa_8":
3562                return FOUND
3563            else:
3564                return NOT_FOUND
3565
3566        codecs.register(search_function)
3567        self.addCleanup(codecs.unregister, search_function)
3568        self.assertEqual(FOUND, codecs.lookup('aaa_8'))
3569        self.assertEqual(FOUND, codecs.lookup('AAA-8'))
3570        self.assertEqual(FOUND, codecs.lookup('AAA---8'))
3571        self.assertEqual(FOUND, codecs.lookup('AAA   8'))
3572        self.assertEqual(FOUND, codecs.lookup('aaa\xe9\u20ac-8'))
3573        self.assertEqual(NOT_FOUND, codecs.lookup('AAA.8'))
3574        self.assertEqual(NOT_FOUND, codecs.lookup('AAA...8'))
3575        self.assertEqual(NOT_FOUND, codecs.lookup('BBB-8'))
3576        self.assertEqual(NOT_FOUND, codecs.lookup('BBB.8'))
3577        self.assertEqual(NOT_FOUND, codecs.lookup('a\xe9\u20ac-8'))
3578
3579    def test_encodings_normalize_encoding(self):
3580        # encodings.normalize_encoding() ignores non-ASCII characters.
3581        normalize = encodings.normalize_encoding
3582        self.assertEqual(normalize('utf_8'), 'utf_8')
3583        self.assertEqual(normalize('utf\xE9\u20AC\U0010ffff-8'), 'utf_8')
3584        self.assertEqual(normalize('utf   8'), 'utf_8')
3585        # encodings.normalize_encoding() doesn't convert
3586        # characters to lower case.
3587        self.assertEqual(normalize('UTF 8'), 'UTF_8')
3588        self.assertEqual(normalize('utf.8'), 'utf.8')
3589        self.assertEqual(normalize('utf...8'), 'utf...8')
3590
3591
if __name__ == "__main__":
    # Allow running this test module directly (discovers and runs all
    # TestCase classes defined above).
    unittest.main()
3594