1import io 2import platform 3import queue 4import re 5import subprocess 6import sys 7import unittest 8from _android_support import TextLogStream 9from array import array 10from contextlib import ExitStack, contextmanager 11from threading import Thread 12from test.support import LOOPBACK_TIMEOUT 13from time import time 14from unittest.mock import patch 15 16 17if sys.platform != "android": 18 raise unittest.SkipTest("Android-specific") 19 20api_level = platform.android_ver().api_level 21 22# (name, level, fileno) 23STREAM_INFO = [("stdout", "I", 1), ("stderr", "W", 2)] 24 25 26# Test redirection of stdout and stderr to the Android log. 27@unittest.skipIf( 28 api_level < 23 and platform.machine() == "aarch64", 29 "SELinux blocks reading logs on older ARM64 emulators" 30) 31class TestAndroidOutput(unittest.TestCase): 32 maxDiff = None 33 34 def setUp(self): 35 self.logcat_process = subprocess.Popen( 36 ["logcat", "-v", "tag"], stdout=subprocess.PIPE, 37 errors="backslashreplace" 38 ) 39 self.logcat_queue = queue.Queue() 40 41 def logcat_thread(): 42 for line in self.logcat_process.stdout: 43 self.logcat_queue.put(line.rstrip("\n")) 44 self.logcat_process.stdout.close() 45 self.logcat_thread = Thread(target=logcat_thread) 46 self.logcat_thread.start() 47 48 from ctypes import CDLL, c_char_p, c_int 49 android_log_write = getattr(CDLL("liblog.so"), "__android_log_write") 50 android_log_write.argtypes = (c_int, c_char_p, c_char_p) 51 ANDROID_LOG_INFO = 4 52 53 # Separate tests using a marker line with a different tag. 54 tag, message = "python.test", f"{self.id()} {time()}" 55 android_log_write( 56 ANDROID_LOG_INFO, tag.encode("UTF-8"), message.encode("UTF-8")) 57 self.assert_log("I", tag, message, skip=True, timeout=5) 58 59 def assert_logs(self, level, tag, expected, **kwargs): 60 for line in expected: 61 self.assert_log(level, tag, line, **kwargs) 62 63 def assert_log(self, level, tag, expected, *, skip=False, timeout=0.5): 64 deadline = time() + timeout 65 while True: 66 try: 67 line = self.logcat_queue.get(timeout=(deadline - time())) 68 except queue.Empty: 69 self.fail(f"line not found: {expected!r}") 70 if match := re.fullmatch(fr"(.)/{tag}: (.*)", line): 71 try: 72 self.assertEqual(level, match[1]) 73 self.assertEqual(expected, match[2]) 74 break 75 except AssertionError: 76 if not skip: 77 raise 78 79 def tearDown(self): 80 self.logcat_process.terminate() 81 self.logcat_process.wait(LOOPBACK_TIMEOUT) 82 self.logcat_thread.join(LOOPBACK_TIMEOUT) 83 84 @contextmanager 85 def unbuffered(self, stream): 86 stream.reconfigure(write_through=True) 87 try: 88 yield 89 finally: 90 stream.reconfigure(write_through=False) 91 92 # In --verbose3 mode, sys.stdout and sys.stderr are captured, so we can't 93 # test them directly. Detect this mode and use some temporary streams with 94 # the same properties. 95 def stream_context(self, stream_name, level): 96 # https://developer.android.com/ndk/reference/group/logging 97 prio = {"I": 4, "W": 5}[level] 98 99 stack = ExitStack() 100 stack.enter_context(self.subTest(stream_name)) 101 stream = getattr(sys, stream_name) 102 native_stream = getattr(sys, f"__{stream_name}__") 103 if isinstance(stream, io.StringIO): 104 stack.enter_context( 105 patch( 106 f"sys.{stream_name}", 107 TextLogStream( 108 prio, f"python.{stream_name}", native_stream.fileno(), 109 errors="backslashreplace" 110 ), 111 ) 112 ) 113 return stack 114 115 def test_str(self): 116 for stream_name, level, fileno in STREAM_INFO: 117 with self.stream_context(stream_name, level): 118 stream = getattr(sys, stream_name) 119 tag = f"python.{stream_name}" 120 self.assertEqual(f"<TextLogStream '{tag}'>", repr(stream)) 121 122 self.assertIs(stream.writable(), True) 123 self.assertIs(stream.readable(), False) 124 self.assertEqual(stream.fileno(), fileno) 125 self.assertEqual("UTF-8", stream.encoding) 126 self.assertEqual("backslashreplace", stream.errors) 127 self.assertIs(stream.line_buffering, True) 128 self.assertIs(stream.write_through, False) 129 130 def write(s, lines=None, *, write_len=None): 131 if write_len is None: 132 write_len = len(s) 133 self.assertEqual(write_len, stream.write(s)) 134 if lines is None: 135 lines = [s] 136 self.assert_logs(level, tag, lines) 137 138 # Single-line messages, 139 with self.unbuffered(stream): 140 write("", []) 141 142 write("a") 143 write("Hello") 144 write("Hello world") 145 write(" ") 146 write(" ") 147 148 # Non-ASCII text 149 write("ol\u00e9") # Spanish 150 write("\u4e2d\u6587") # Chinese 151 152 # Non-BMP emoji 153 write("\U0001f600") 154 155 # Non-encodable surrogates 156 write("\ud800\udc00", [r"\ud800\udc00"]) 157 158 # Code used by surrogateescape (which isn't enabled here) 159 write("\udc80", [r"\udc80"]) 160 161 # Null characters are logged using "modified UTF-8". 162 write("\u0000", [r"\xc0\x80"]) 163 write("a\u0000", [r"a\xc0\x80"]) 164 write("\u0000b", [r"\xc0\x80b"]) 165 write("a\u0000b", [r"a\xc0\x80b"]) 166 167 # Multi-line messages. Avoid identical consecutive lines, as 168 # they may activate "chatty" filtering and break the tests. 169 write("\nx", [""]) 170 write("\na\n", ["x", "a"]) 171 write("\n", [""]) 172 write("b\n", ["b"]) 173 write("c\n\n", ["c", ""]) 174 write("d\ne", ["d"]) 175 write("xx", []) 176 write("f\n\ng", ["exxf", ""]) 177 write("\n", ["g"]) 178 179 # Since this is a line-based logging system, line buffering 180 # cannot be turned off, i.e. a newline always causes a flush. 181 stream.reconfigure(line_buffering=False) 182 self.assertIs(stream.line_buffering, True) 183 184 # However, buffering can be turned off completely if you want a 185 # flush after every write. 186 with self.unbuffered(stream): 187 write("\nx", ["", "x"]) 188 write("\na\n", ["", "a"]) 189 write("\n", [""]) 190 write("b\n", ["b"]) 191 write("c\n\n", ["c", ""]) 192 write("d\ne", ["d", "e"]) 193 write("xx", ["xx"]) 194 write("f\n\ng", ["f", "", "g"]) 195 write("\n", [""]) 196 197 # "\r\n" should be translated into "\n". 198 write("hello\r\n", ["hello"]) 199 write("hello\r\nworld\r\n", ["hello", "world"]) 200 write("\r\n", [""]) 201 202 # Non-standard line separators should be preserved. 203 write("before form feed\x0cafter form feed\n", 204 ["before form feed\x0cafter form feed"]) 205 write("before line separator\u2028after line separator\n", 206 ["before line separator\u2028after line separator"]) 207 208 # String subclasses are accepted, but they should be converted 209 # to a standard str without calling any of their methods. 210 class CustomStr(str): 211 def splitlines(self, *args, **kwargs): 212 raise AssertionError() 213 214 def __len__(self): 215 raise AssertionError() 216 217 def __str__(self): 218 raise AssertionError() 219 220 write(CustomStr("custom\n"), ["custom"], write_len=7) 221 222 # Non-string classes are not accepted. 223 for obj in [b"", b"hello", None, 42]: 224 with self.subTest(obj=obj): 225 with self.assertRaisesRegex( 226 TypeError, 227 fr"write\(\) argument must be str, not " 228 fr"{type(obj).__name__}" 229 ): 230 stream.write(obj) 231 232 # Manual flushing is supported. 233 write("hello", []) 234 stream.flush() 235 self.assert_log(level, tag, "hello") 236 write("hello", []) 237 write("world", []) 238 stream.flush() 239 self.assert_log(level, tag, "helloworld") 240 241 # Long lines are split into blocks of 1000 characters 242 # (MAX_CHARS_PER_WRITE in _android_support.py), but 243 # TextIOWrapper should then join them back together as much as 244 # possible without exceeding 4000 UTF-8 bytes 245 # (MAX_BYTES_PER_WRITE). 246 # 247 # ASCII (1 byte per character) 248 write(("foobar" * 700) + "\n", # 4200 bytes in 249 [("foobar" * 666) + "foob", # 4000 bytes out 250 "ar" + ("foobar" * 33)]) # 200 bytes out 251 252 # "Full-width" digits 0-9 (3 bytes per character) 253 s = "\uff10\uff11\uff12\uff13\uff14\uff15\uff16\uff17\uff18\uff19" 254 write((s * 150) + "\n", # 4500 bytes in 255 [s * 100, # 3000 bytes out 256 s * 50]) # 1500 bytes out 257 258 s = "0123456789" 259 write(s * 200, []) # 2000 bytes in 260 write(s * 150, []) # 1500 bytes in 261 write(s * 51, [s * 350]) # 510 bytes in, 3500 bytes out 262 write("\n", [s * 51]) # 0 bytes in, 510 bytes out 263 264 def test_bytes(self): 265 for stream_name, level, fileno in STREAM_INFO: 266 with self.stream_context(stream_name, level): 267 stream = getattr(sys, stream_name).buffer 268 tag = f"python.{stream_name}" 269 self.assertEqual(f"<BinaryLogStream '{tag}'>", repr(stream)) 270 self.assertIs(stream.writable(), True) 271 self.assertIs(stream.readable(), False) 272 self.assertEqual(stream.fileno(), fileno) 273 274 def write(b, lines=None, *, write_len=None): 275 if write_len is None: 276 write_len = len(b) 277 self.assertEqual(write_len, stream.write(b)) 278 if lines is None: 279 lines = [b.decode()] 280 self.assert_logs(level, tag, lines) 281 282 # Single-line messages, 283 write(b"", []) 284 285 write(b"a") 286 write(b"Hello") 287 write(b"Hello world") 288 write(b" ") 289 write(b" ") 290 291 # Non-ASCII text 292 write(b"ol\xc3\xa9") # Spanish 293 write(b"\xe4\xb8\xad\xe6\x96\x87") # Chinese 294 295 # Non-BMP emoji 296 write(b"\xf0\x9f\x98\x80") 297 298 # Null bytes are logged using "modified UTF-8". 299 write(b"\x00", [r"\xc0\x80"]) 300 write(b"a\x00", [r"a\xc0\x80"]) 301 write(b"\x00b", [r"\xc0\x80b"]) 302 write(b"a\x00b", [r"a\xc0\x80b"]) 303 304 # Invalid UTF-8 305 write(b"\xff", [r"\xff"]) 306 write(b"a\xff", [r"a\xff"]) 307 write(b"\xffb", [r"\xffb"]) 308 write(b"a\xffb", [r"a\xffb"]) 309 310 # Log entries containing newlines are shown differently by 311 # `logcat -v tag`, `logcat -v long`, and Android Studio. We 312 # currently use `logcat -v tag`, which shows each line as if it 313 # was a separate log entry, but strips a single trailing 314 # newline. 315 # 316 # On newer versions of Android, all three of the above tools (or 317 # maybe Logcat itself) will also strip any number of leading 318 # newlines. 319 write(b"\nx", ["", "x"] if api_level < 30 else ["x"]) 320 write(b"\na\n", ["", "a"] if api_level < 30 else ["a"]) 321 write(b"\n", [""]) 322 write(b"b\n", ["b"]) 323 write(b"c\n\n", ["c", ""]) 324 write(b"d\ne", ["d", "e"]) 325 write(b"xx", ["xx"]) 326 write(b"f\n\ng", ["f", "", "g"]) 327 write(b"\n", [""]) 328 329 # "\r\n" should be translated into "\n". 330 write(b"hello\r\n", ["hello"]) 331 write(b"hello\r\nworld\r\n", ["hello", "world"]) 332 write(b"\r\n", [""]) 333 334 # Other bytes-like objects are accepted. 335 write(bytearray(b"bytearray")) 336 337 mv = memoryview(b"memoryview") 338 write(mv, ["memoryview"]) # Continuous 339 write(mv[::2], ["mmrve"]) # Discontinuous 340 341 write( 342 # Android only supports little-endian architectures, so the 343 # bytes representation is as follows: 344 array("H", [ 345 0, # 00 00 346 1, # 01 00 347 65534, # FE FF 348 65535, # FF FF 349 ]), 350 351 # After encoding null bytes with modified UTF-8, the only 352 # valid UTF-8 sequence is \x01. All other bytes are handled 353 # by backslashreplace. 354 ["\\xc0\\x80\\xc0\\x80" 355 "\x01\\xc0\\x80" 356 "\\xfe\\xff" 357 "\\xff\\xff"], 358 write_len=8, 359 ) 360 361 # Non-bytes-like classes are not accepted. 362 for obj in ["", "hello", None, 42]: 363 with self.subTest(obj=obj): 364 with self.assertRaisesRegex( 365 TypeError, 366 fr"write\(\) argument must be bytes-like, not " 367 fr"{type(obj).__name__}" 368 ): 369 stream.write(obj) 370 371 372class TestAndroidRateLimit(unittest.TestCase): 373 def test_rate_limit(self): 374 # https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39 375 PER_MESSAGE_OVERHEAD = 28 376 377 # https://developer.android.com/ndk/reference/group/logging 378 ANDROID_LOG_DEBUG = 3 379 380 # To avoid flooding the test script output, use a different tag rather 381 # than stdout or stderr. 382 tag = "python.rate_limit" 383 stream = TextLogStream(ANDROID_LOG_DEBUG, tag) 384 385 # Make a test message which consumes 1 KB of the logcat buffer. 386 message = "Line {:03d} " 387 message += "." * ( 388 1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(message.format(0)) 389 ) + "\n" 390 391 # To avoid depending on the performance of the test device, we mock the 392 # passage of time. 393 mock_now = time() 394 395 def mock_time(): 396 # Avoid division by zero by simulating a small delay. 397 mock_sleep(0.0001) 398 return mock_now 399 400 def mock_sleep(duration): 401 nonlocal mock_now 402 mock_now += duration 403 404 # See _android_support.py. The default values of these parameters work 405 # well across a wide range of devices, but we'll use smaller values to 406 # ensure a quick and reliable test that doesn't flood the log too much. 407 MAX_KB_PER_SECOND = 100 408 BUCKET_KB = 10 409 with ( 410 patch("_android_support.MAX_BYTES_PER_SECOND", MAX_KB_PER_SECOND * 1024), 411 patch("_android_support.BUCKET_SIZE", BUCKET_KB * 1024), 412 patch("_android_support.sleep", mock_sleep), 413 patch("_android_support.time", mock_time), 414 ): 415 # Make sure the token bucket is full. 416 stream.write("Initial message to reset _prev_write_time") 417 mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND) 418 line_num = 0 419 420 # Write BUCKET_KB messages, and return the rate at which they were 421 # accepted in KB per second. 422 def write_bucketful(): 423 nonlocal line_num 424 start = mock_time() 425 max_line_num = line_num + BUCKET_KB 426 while line_num < max_line_num: 427 stream.write(message.format(line_num)) 428 line_num += 1 429 return BUCKET_KB / (mock_time() - start) 430 431 # The first bucketful should be written with minimal delay. The 432 # factor of 2 here is not arbitrary: it verifies that the system can 433 # write fast enough to empty the bucket within two bucketfuls, which 434 # the next part of the test depends on. 435 self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) 436 437 # Write another bucketful to empty the token bucket completely. 438 write_bucketful() 439 440 # The next bucketful should be written at the rate limit. 441 self.assertAlmostEqual( 442 write_bucketful(), MAX_KB_PER_SECOND, 443 delta=MAX_KB_PER_SECOND * 0.1 444 ) 445 446 # Once the token bucket refills, we should go back to full speed. 447 mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND) 448 self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) 449