1# 2# multibytecodec_support.py 3# Common Unittest Routines for CJK codecs 4# 5 6import codecs 7import os 8import re 9import sys 10import unittest 11from http.client import HTTPException 12from test import support 13from io import BytesIO 14 15class TestBase: 16 encoding = '' # codec name 17 codec = None # codec tuple (with 4 elements) 18 tstring = None # must set. 2 strings to test StreamReader 19 20 codectests = None # must set. codec test tuple 21 roundtriptest = 1 # set if roundtrip is possible with unicode 22 has_iso10646 = 0 # set if this encoding contains whole iso10646 map 23 xmlcharnametest = None # string to test xmlcharrefreplace 24 unmappedunicode = '\udeee' # a unicode code point that is not mapped. 25 26 def setUp(self): 27 if self.codec is None: 28 self.codec = codecs.lookup(self.encoding) 29 self.encode = self.codec.encode 30 self.decode = self.codec.decode 31 self.reader = self.codec.streamreader 32 self.writer = self.codec.streamwriter 33 self.incrementalencoder = self.codec.incrementalencoder 34 self.incrementaldecoder = self.codec.incrementaldecoder 35 36 def test_chunkcoding(self): 37 tstring_lines = [] 38 for b in self.tstring: 39 lines = b.split(b"\n") 40 last = lines.pop() 41 assert last == b"" 42 lines = [line + b"\n" for line in lines] 43 tstring_lines.append(lines) 44 for native, utf8 in zip(*tstring_lines): 45 u = self.decode(native)[0] 46 self.assertEqual(u, utf8.decode('utf-8')) 47 if self.roundtriptest: 48 self.assertEqual(native, self.encode(u)[0]) 49 50 def test_errorhandle(self): 51 for source, scheme, expected in self.codectests: 52 if isinstance(source, bytes): 53 func = self.decode 54 else: 55 func = self.encode 56 if expected: 57 result = func(source, scheme)[0] 58 if func is self.decode: 59 self.assertTrue(type(result) is str, type(result)) 60 self.assertEqual(result, expected, 61 '%a.decode(%r, %r)=%a != %a' 62 % (source, self.encoding, scheme, result, 63 expected)) 64 else: 65 self.assertTrue(type(result) is bytes, type(result)) 66 self.assertEqual(result, expected, 67 '%a.encode(%r, %r)=%a != %a' 68 % (source, self.encoding, scheme, result, 69 expected)) 70 else: 71 self.assertRaises(UnicodeError, func, source, scheme) 72 73 def test_xmlcharrefreplace(self): 74 if self.has_iso10646: 75 self.skipTest('encoding contains full ISO 10646 map') 76 77 s = "\u0b13\u0b23\u0b60 nd eggs" 78 self.assertEqual( 79 self.encode(s, "xmlcharrefreplace")[0], 80 b"ଓଣୠ nd eggs" 81 ) 82 83 def test_customreplace_encode(self): 84 if self.has_iso10646: 85 self.skipTest('encoding contains full ISO 10646 map') 86 87 from html.entities import codepoint2name 88 89 def xmlcharnamereplace(exc): 90 if not isinstance(exc, UnicodeEncodeError): 91 raise TypeError("don't know how to handle %r" % exc) 92 l = [] 93 for c in exc.object[exc.start:exc.end]: 94 if ord(c) in codepoint2name: 95 l.append("&%s;" % codepoint2name[ord(c)]) 96 else: 97 l.append("&#%d;" % ord(c)) 98 return ("".join(l), exc.end) 99 100 codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace) 101 102 if self.xmlcharnametest: 103 sin, sout = self.xmlcharnametest 104 else: 105 sin = "\xab\u211c\xbb = \u2329\u1234\u232a" 106 sout = b"«ℜ» = ⟨ሴ⟩" 107 self.assertEqual(self.encode(sin, 108 "test.xmlcharnamereplace")[0], sout) 109 110 def test_callback_returns_bytes(self): 111 def myreplace(exc): 112 return (b"1234", exc.end) 113 codecs.register_error("test.cjktest", myreplace) 114 enc = self.encode("abc" + self.unmappedunicode + "def", "test.cjktest")[0] 115 self.assertEqual(enc, b"abc1234def") 116 117 def test_callback_wrong_objects(self): 118 def myreplace(exc): 119 return (ret, exc.end) 120 codecs.register_error("test.cjktest", myreplace) 121 122 for ret in ([1, 2, 3], [], None, object()): 123 self.assertRaises(TypeError, self.encode, self.unmappedunicode, 124 'test.cjktest') 125 126 def test_callback_long_index(self): 127 def myreplace(exc): 128 return ('x', int(exc.end)) 129 codecs.register_error("test.cjktest", myreplace) 130 self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 131 'test.cjktest'), (b'abcdxefgh', 9)) 132 133 def myreplace(exc): 134 return ('x', sys.maxsize + 1) 135 codecs.register_error("test.cjktest", myreplace) 136 self.assertRaises(IndexError, self.encode, self.unmappedunicode, 137 'test.cjktest') 138 139 def test_callback_None_index(self): 140 def myreplace(exc): 141 return ('x', None) 142 codecs.register_error("test.cjktest", myreplace) 143 self.assertRaises(TypeError, self.encode, self.unmappedunicode, 144 'test.cjktest') 145 146 def test_callback_backward_index(self): 147 def myreplace(exc): 148 if myreplace.limit > 0: 149 myreplace.limit -= 1 150 return ('REPLACED', 0) 151 else: 152 return ('TERMINAL', exc.end) 153 myreplace.limit = 3 154 codecs.register_error("test.cjktest", myreplace) 155 self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 156 'test.cjktest'), 157 (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9)) 158 159 def test_callback_forward_index(self): 160 def myreplace(exc): 161 return ('REPLACED', exc.end + 2) 162 codecs.register_error("test.cjktest", myreplace) 163 self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh', 164 'test.cjktest'), (b'abcdREPLACEDgh', 9)) 165 166 def test_callback_index_outofbound(self): 167 def myreplace(exc): 168 return ('TERM', 100) 169 codecs.register_error("test.cjktest", myreplace) 170 self.assertRaises(IndexError, self.encode, self.unmappedunicode, 171 'test.cjktest') 172 173 def test_incrementalencoder(self): 174 UTF8Reader = codecs.getreader('utf-8') 175 for sizehint in [None] + list(range(1, 33)) + \ 176 [64, 128, 256, 512, 1024]: 177 istream = UTF8Reader(BytesIO(self.tstring[1])) 178 ostream = BytesIO() 179 encoder = self.incrementalencoder() 180 while 1: 181 if sizehint is not None: 182 data = istream.read(sizehint) 183 else: 184 data = istream.read() 185 186 if not data: 187 break 188 e = encoder.encode(data) 189 ostream.write(e) 190 191 self.assertEqual(ostream.getvalue(), self.tstring[0]) 192 193 def test_incrementaldecoder(self): 194 UTF8Writer = codecs.getwriter('utf-8') 195 for sizehint in [None, -1] + list(range(1, 33)) + \ 196 [64, 128, 256, 512, 1024]: 197 istream = BytesIO(self.tstring[0]) 198 ostream = UTF8Writer(BytesIO()) 199 decoder = self.incrementaldecoder() 200 while 1: 201 data = istream.read(sizehint) 202 if not data: 203 break 204 else: 205 u = decoder.decode(data) 206 ostream.write(u) 207 208 self.assertEqual(ostream.getvalue(), self.tstring[1]) 209 210 def test_incrementalencoder_error_callback(self): 211 inv = self.unmappedunicode 212 213 e = self.incrementalencoder() 214 self.assertRaises(UnicodeEncodeError, e.encode, inv, True) 215 216 e.errors = 'ignore' 217 self.assertEqual(e.encode(inv, True), b'') 218 219 e.reset() 220 def tempreplace(exc): 221 return ('called', exc.end) 222 codecs.register_error('test.incremental_error_callback', tempreplace) 223 e.errors = 'test.incremental_error_callback' 224 self.assertEqual(e.encode(inv, True), b'called') 225 226 # again 227 e.errors = 'ignore' 228 self.assertEqual(e.encode(inv, True), b'') 229 230 def test_streamreader(self): 231 UTF8Writer = codecs.getwriter('utf-8') 232 for name in ["read", "readline", "readlines"]: 233 for sizehint in [None, -1] + list(range(1, 33)) + \ 234 [64, 128, 256, 512, 1024]: 235 istream = self.reader(BytesIO(self.tstring[0])) 236 ostream = UTF8Writer(BytesIO()) 237 func = getattr(istream, name) 238 while 1: 239 data = func(sizehint) 240 if not data: 241 break 242 if name == "readlines": 243 ostream.writelines(data) 244 else: 245 ostream.write(data) 246 247 self.assertEqual(ostream.getvalue(), self.tstring[1]) 248 249 def test_streamwriter(self): 250 readfuncs = ('read', 'readline', 'readlines') 251 UTF8Reader = codecs.getreader('utf-8') 252 for name in readfuncs: 253 for sizehint in [None] + list(range(1, 33)) + \ 254 [64, 128, 256, 512, 1024]: 255 istream = UTF8Reader(BytesIO(self.tstring[1])) 256 ostream = self.writer(BytesIO()) 257 func = getattr(istream, name) 258 while 1: 259 if sizehint is not None: 260 data = func(sizehint) 261 else: 262 data = func() 263 264 if not data: 265 break 266 if name == "readlines": 267 ostream.writelines(data) 268 else: 269 ostream.write(data) 270 271 self.assertEqual(ostream.getvalue(), self.tstring[0]) 272 273 def test_streamwriter_reset_no_pending(self): 274 # Issue #23247: Calling reset() on a fresh StreamWriter instance 275 # (without pending data) must not crash 276 stream = BytesIO() 277 writer = self.writer(stream) 278 writer.reset() 279 280 def test_incrementalencoder_del_segfault(self): 281 e = self.incrementalencoder() 282 with self.assertRaises(AttributeError): 283 del e.errors 284 285 286class TestBase_Mapping(unittest.TestCase): 287 pass_enctest = [] 288 pass_dectest = [] 289 supmaps = [] 290 codectests = [] 291 292 def setUp(self): 293 try: 294 self.open_mapping_file().close() # test it to report the error early 295 except (OSError, HTTPException): 296 self.skipTest("Could not retrieve "+self.mapfileurl) 297 298 def open_mapping_file(self): 299 return support.open_urlresource(self.mapfileurl, encoding="utf-8") 300 301 def test_mapping_file(self): 302 if self.mapfileurl.endswith('.xml'): 303 self._test_mapping_file_ucm() 304 else: 305 self._test_mapping_file_plain() 306 307 def _test_mapping_file_plain(self): 308 def unichrs(s): 309 return ''.join(chr(int(x, 16)) for x in s.split('+')) 310 311 urt_wa = {} 312 313 with self.open_mapping_file() as f: 314 for line in f: 315 if not line: 316 break 317 data = line.split('#')[0].split() 318 if len(data) != 2: 319 continue 320 321 if data[0][:2] != '0x': 322 self.fail(f"Invalid line: {line!r}") 323 csetch = bytes.fromhex(data[0][2:]) 324 if len(csetch) == 1 and 0x80 <= csetch[0]: 325 continue 326 327 unich = unichrs(data[1]) 328 if ord(unich) == 0xfffd or unich in urt_wa: 329 continue 330 urt_wa[unich] = csetch 331 332 self._testpoint(csetch, unich) 333 334 def _test_mapping_file_ucm(self): 335 with self.open_mapping_file() as f: 336 ucmdata = f.read() 337 uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata) 338 for uni, coded in uc: 339 unich = chr(int(uni, 16)) 340 codech = bytes.fromhex(coded) 341 self._testpoint(codech, unich) 342 343 def test_mapping_supplemental(self): 344 for mapping in self.supmaps: 345 self._testpoint(*mapping) 346 347 def _testpoint(self, csetch, unich): 348 if (csetch, unich) not in self.pass_enctest: 349 self.assertEqual(unich.encode(self.encoding), csetch) 350 if (csetch, unich) not in self.pass_dectest: 351 self.assertEqual(str(csetch, self.encoding), unich) 352 353 def test_errorhandle(self): 354 for source, scheme, expected in self.codectests: 355 if isinstance(source, bytes): 356 func = source.decode 357 else: 358 func = source.encode 359 if expected: 360 if isinstance(source, bytes): 361 result = func(self.encoding, scheme) 362 self.assertTrue(type(result) is str, type(result)) 363 self.assertEqual(result, expected, 364 '%a.decode(%r, %r)=%a != %a' 365 % (source, self.encoding, scheme, result, 366 expected)) 367 else: 368 result = func(self.encoding, scheme) 369 self.assertTrue(type(result) is bytes, type(result)) 370 self.assertEqual(result, expected, 371 '%a.encode(%r, %r)=%a != %a' 372 % (source, self.encoding, scheme, result, 373 expected)) 374 else: 375 self.assertRaises(UnicodeError, func, self.encoding, scheme) 376 377def load_teststring(name): 378 dir = os.path.join(os.path.dirname(__file__), 'cjkencodings') 379 with open(os.path.join(dir, name + '.txt'), 'rb') as f: 380 encoded = f.read() 381 with open(os.path.join(dir, name + '-utf8.txt'), 'rb') as f: 382 utf8 = f.read() 383 return encoded, utf8 384