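# Tests for the redirection of sys.stdout and sys.stderr to the Android log
# (TestAndroidOutput), and for the rate limit applied to log writes
# (TestAndroidRateLimit).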
1import io
2import platform
3import queue
4import re
5import subprocess
6import sys
7import unittest
8from _android_support import TextLogStream
9from array import array
10from contextlib import ExitStack, contextmanager
11from threading import Thread
12from test.support import LOOPBACK_TIMEOUT
13from time import time
14from unittest.mock import patch
15
16
# Everything below depends on Android-only facilities, so skip the whole
# module on any other platform.
if sys.platform != "android":
    raise unittest.SkipTest("Android-specific")

# Android API level of the device running the tests. Used below to select
# expectations which differ between Android versions.
api_level = platform.android_ver().api_level

# One entry per standard stream which is redirected to the log:
# (name, level, fileno)
#   name:   attribute name of the stream in the sys module
#   level:  single-character priority shown by logcat for that stream
#   fileno: value the stream's fileno() method is expected to return
STREAM_INFO = [("stdout", "I", 1), ("stderr", "W", 2)]
24
25
26# Test redirection of stdout and stderr to the Android log.
27@unittest.skipIf(
28    api_level < 23 and platform.machine() == "aarch64",
29    "SELinux blocks reading logs on older ARM64 emulators"
30)
31class TestAndroidOutput(unittest.TestCase):
32    maxDiff = None
33
34    def setUp(self):
35        self.logcat_process = subprocess.Popen(
36            ["logcat", "-v", "tag"], stdout=subprocess.PIPE,
37            errors="backslashreplace"
38        )
39        self.logcat_queue = queue.Queue()
40
41        def logcat_thread():
42            for line in self.logcat_process.stdout:
43                self.logcat_queue.put(line.rstrip("\n"))
44            self.logcat_process.stdout.close()
45        self.logcat_thread = Thread(target=logcat_thread)
46        self.logcat_thread.start()
47
48        from ctypes import CDLL, c_char_p, c_int
49        android_log_write = getattr(CDLL("liblog.so"), "__android_log_write")
50        android_log_write.argtypes = (c_int, c_char_p, c_char_p)
51        ANDROID_LOG_INFO = 4
52
53        # Separate tests using a marker line with a different tag.
54        tag, message = "python.test", f"{self.id()} {time()}"
55        android_log_write(
56            ANDROID_LOG_INFO, tag.encode("UTF-8"), message.encode("UTF-8"))
57        self.assert_log("I", tag, message, skip=True, timeout=5)
58
59    def assert_logs(self, level, tag, expected, **kwargs):
60        for line in expected:
61            self.assert_log(level, tag, line, **kwargs)
62
63    def assert_log(self, level, tag, expected, *, skip=False, timeout=0.5):
64        deadline = time() + timeout
65        while True:
66            try:
67                line = self.logcat_queue.get(timeout=(deadline - time()))
68            except queue.Empty:
69                self.fail(f"line not found: {expected!r}")
70            if match := re.fullmatch(fr"(.)/{tag}: (.*)", line):
71                try:
72                    self.assertEqual(level, match[1])
73                    self.assertEqual(expected, match[2])
74                    break
75                except AssertionError:
76                    if not skip:
77                        raise
78
79    def tearDown(self):
80        self.logcat_process.terminate()
81        self.logcat_process.wait(LOOPBACK_TIMEOUT)
82        self.logcat_thread.join(LOOPBACK_TIMEOUT)
83
84    @contextmanager
85    def unbuffered(self, stream):
86        stream.reconfigure(write_through=True)
87        try:
88            yield
89        finally:
90            stream.reconfigure(write_through=False)
91
92    # In --verbose3 mode, sys.stdout and sys.stderr are captured, so we can't
93    # test them directly. Detect this mode and use some temporary streams with
94    # the same properties.
95    def stream_context(self, stream_name, level):
96        # https://developer.android.com/ndk/reference/group/logging
97        prio = {"I": 4, "W": 5}[level]
98
99        stack = ExitStack()
100        stack.enter_context(self.subTest(stream_name))
101        stream = getattr(sys, stream_name)
102        native_stream = getattr(sys, f"__{stream_name}__")
103        if isinstance(stream, io.StringIO):
104            stack.enter_context(
105                patch(
106                    f"sys.{stream_name}",
107                    TextLogStream(
108                        prio, f"python.{stream_name}", native_stream.fileno(),
109                        errors="backslashreplace"
110                    ),
111                )
112            )
113        return stack
114
    def test_str(self):
        # Check the text layer of each redirected stream: first its
        # attributes, then how strings passed to write() end up as log lines.
        for stream_name, level, fileno in STREAM_INFO:
            with self.stream_context(stream_name, level):
                stream = getattr(sys, stream_name)
                tag = f"python.{stream_name}"
                self.assertEqual(f"<TextLogStream '{tag}'>", repr(stream))

                self.assertIs(stream.writable(), True)
                self.assertIs(stream.readable(), False)
                self.assertEqual(stream.fileno(), fileno)
                self.assertEqual("UTF-8", stream.encoding)
                self.assertEqual("backslashreplace", stream.errors)
                self.assertIs(stream.line_buffering, True)
                self.assertIs(stream.write_through, False)

                # Write `s` to the stream, check the return value of write()
                # (`write_len`, which defaults to len(s)), then check that
                # `lines` (which defaults to [s]) are logged in that order.
                def write(s, lines=None, *, write_len=None):
                    if write_len is None:
                        write_len = len(s)
                    self.assertEqual(write_len, stream.write(s))
                    if lines is None:
                        lines = [s]
                    self.assert_logs(level, tag, lines)

                # Single-line messages. These contain no newline, so they are
                # written with write_through enabled to get them logged
                # immediately.
                with self.unbuffered(stream):
                    write("", [])

                    write("a")
                    write("Hello")
                    write("Hello world")
                    write(" ")
                    write("  ")

                    # Non-ASCII text
                    write("ol\u00e9")  # Spanish
                    write("\u4e2d\u6587")  # Chinese

                    # Non-BMP emoji
                    write("\U0001f600")

                    # Non-encodable surrogates
                    write("\ud800\udc00", [r"\ud800\udc00"])

                    # Code used by surrogateescape (which isn't enabled here)
                    write("\udc80", [r"\udc80"])

                    # Null characters are logged using "modified UTF-8".
                    write("\u0000", [r"\xc0\x80"])
                    write("a\u0000", [r"a\xc0\x80"])
                    write("\u0000b", [r"\xc0\x80b"])
                    write("a\u0000b", [r"a\xc0\x80b"])

                # Multi-line messages. Avoid identical consecutive lines, as
                # they may activate "chatty" filtering and break the tests.
                #
                # As the expectations below show, text after the last newline
                # of a write is not logged until a later write supplies a
                # newline, so each call depends on what the previous call left
                # behind.
                write("\nx", [""])
                write("\na\n", ["x", "a"])
                write("\n", [""])
                write("b\n", ["b"])
                write("c\n\n", ["c", ""])
                write("d\ne", ["d"])
                write("xx", [])
                write("f\n\ng", ["exxf", ""])
                write("\n", ["g"])

                # Since this is a line-based logging system, line buffering
                # cannot be turned off, i.e. a newline always causes a flush.
                stream.reconfigure(line_buffering=False)
                self.assertIs(stream.line_buffering, True)

                # However, buffering can be turned off completely if you want a
                # flush after every write.
                with self.unbuffered(stream):
                    write("\nx", ["", "x"])
                    write("\na\n", ["", "a"])
                    write("\n", [""])
                    write("b\n", ["b"])
                    write("c\n\n", ["c", ""])
                    write("d\ne", ["d", "e"])
                    write("xx", ["xx"])
                    write("f\n\ng", ["f", "", "g"])
                    write("\n", [""])

                # "\r\n" should be translated into "\n".
                write("hello\r\n", ["hello"])
                write("hello\r\nworld\r\n", ["hello", "world"])
                write("\r\n", [""])

                # Non-standard line separators should be preserved.
                write("before form feed\x0cafter form feed\n",
                      ["before form feed\x0cafter form feed"])
                write("before line separator\u2028after line separator\n",
                      ["before line separator\u2028after line separator"])

                # String subclasses are accepted, but they should be converted
                # to a standard str without calling any of their methods.
                class CustomStr(str):
                    def splitlines(self, *args, **kwargs):
                        raise AssertionError()

                    def __len__(self):
                        raise AssertionError()

                    def __str__(self):
                        raise AssertionError()

                # write_len is given explicitly because the helper's default
                # would call CustomStr.__len__, which raises.
                write(CustomStr("custom\n"), ["custom"], write_len=7)

                # Non-string classes are not accepted.
                for obj in [b"", b"hello", None, 42]:
                    with self.subTest(obj=obj):
                        with self.assertRaisesRegex(
                            TypeError,
                            fr"write\(\) argument must be str, not "
                            fr"{type(obj).__name__}"
                        ):
                            stream.write(obj)

                # Manual flushing is supported.
                write("hello", [])
                stream.flush()
                self.assert_log(level, tag, "hello")
                write("hello", [])
                write("world", [])
                stream.flush()
                self.assert_log(level, tag, "helloworld")

                # Long lines are split into blocks of 1000 characters
                # (MAX_CHARS_PER_WRITE in _android_support.py), but
                # TextIOWrapper should then join them back together as much as
                # possible without exceeding 4000 UTF-8 bytes
                # (MAX_BYTES_PER_WRITE).
                #
                # ASCII (1 byte per character)
                write(("foobar" * 700) + "\n",  # 4200 bytes in
                      [("foobar" * 666) + "foob",  # 4000 bytes out
                       "ar" + ("foobar" * 33)])  # 200 bytes out

                # "Full-width" digits 0-9 (3 bytes per character)
                s = "\uff10\uff11\uff12\uff13\uff14\uff15\uff16\uff17\uff18\uff19"
                write((s * 150) + "\n",  # 4500 bytes in
                      [s * 100,  # 3000 bytes out
                       s * 50])  # 1500 bytes out

                # Several writes without a newline: nothing is logged until
                # the pending text would exceed the limit described above.
                s = "0123456789"
                write(s * 200, [])  # 2000 bytes in
                write(s * 150, [])  # 1500 bytes in
                write(s * 51, [s * 350])  # 510 bytes in, 3500 bytes out
                write("\n", [s * 51])  # 0 bytes in, 510 bytes out
263
    def test_bytes(self):
        # Check the binary layer of each redirected stream (its `buffer`
        # attribute): first its attributes, then how bytes passed to write()
        # end up as log lines.
        for stream_name, level, fileno in STREAM_INFO:
            with self.stream_context(stream_name, level):
                stream = getattr(sys, stream_name).buffer
                tag = f"python.{stream_name}"
                self.assertEqual(f"<BinaryLogStream '{tag}'>", repr(stream))
                self.assertIs(stream.writable(), True)
                self.assertIs(stream.readable(), False)
                self.assertEqual(stream.fileno(), fileno)

                # Write `b` to the stream, check the return value of write()
                # (`write_len`, which defaults to len(b)), then check that
                # `lines` (which defaults to [b.decode()]) are logged in that
                # order.
                def write(b, lines=None, *, write_len=None):
                    if write_len is None:
                        write_len = len(b)
                    self.assertEqual(write_len, stream.write(b))
                    if lines is None:
                        lines = [b.decode()]
                    self.assert_logs(level, tag, lines)

                # Single-line messages.
                write(b"", [])

                write(b"a")
                write(b"Hello")
                write(b"Hello world")
                write(b" ")
                write(b"  ")

                # Non-ASCII text
                write(b"ol\xc3\xa9")  # Spanish
                write(b"\xe4\xb8\xad\xe6\x96\x87")  # Chinese

                # Non-BMP emoji
                write(b"\xf0\x9f\x98\x80")

                # Null bytes are logged using "modified UTF-8".
                write(b"\x00", [r"\xc0\x80"])
                write(b"a\x00", [r"a\xc0\x80"])
                write(b"\x00b", [r"\xc0\x80b"])
                write(b"a\x00b", [r"a\xc0\x80b"])

                # Invalid UTF-8
                write(b"\xff", [r"\xff"])
                write(b"a\xff", [r"a\xff"])
                write(b"\xffb", [r"\xffb"])
                write(b"a\xffb", [r"a\xffb"])

                # Log entries containing newlines are shown differently by
                # `logcat -v tag`, `logcat -v long`, and Android Studio. We
                # currently use `logcat -v tag`, which shows each line as if it
                # was a separate log entry, but strips a single trailing
                # newline.
                #
                # On newer versions of Android, all three of the above tools (or
                # maybe Logcat itself) will also strip any number of leading
                # newlines.
                write(b"\nx", ["", "x"] if api_level < 30 else ["x"])
                write(b"\na\n", ["", "a"] if api_level < 30 else ["a"])
                write(b"\n", [""])
                write(b"b\n", ["b"])
                write(b"c\n\n", ["c", ""])
                write(b"d\ne", ["d", "e"])
                write(b"xx", ["xx"])
                write(b"f\n\ng", ["f", "", "g"])
                write(b"\n", [""])

                # "\r\n" should be translated into "\n".
                write(b"hello\r\n", ["hello"])
                write(b"hello\r\nworld\r\n", ["hello", "world"])
                write(b"\r\n", [""])

                # Other bytes-like objects are accepted.
                write(bytearray(b"bytearray"))

                mv = memoryview(b"memoryview")
                write(mv, ["memoryview"])  # Continuous
                write(mv[::2], ["mmrve"])  # Discontinuous

                write(
                    # Android only supports little-endian architectures, so the
                    # bytes representation is as follows:
                    array("H", [
                        0,      # 00 00
                        1,      # 01 00
                        65534,  # FE FF
                        65535,  # FF FF
                    ]),

                    # After encoding null bytes with modified UTF-8, the only
                    # valid UTF-8 sequence is \x01. All other bytes are handled
                    # by backslashreplace.
                    ["\\xc0\\x80\\xc0\\x80"
                     "\x01\\xc0\\x80"
                     "\\xfe\\xff"
                     "\\xff\\xff"],

                    # The return value is the size in bytes (4 items of 2
                    # bytes each), whereas len() of the array would be 4.
                    write_len=8,
                )

                # Non-bytes-like classes are not accepted.
                for obj in ["", "hello", None, 42]:
                    with self.subTest(obj=obj):
                        with self.assertRaisesRegex(
                            TypeError,
                            fr"write\(\) argument must be bytes-like, not "
                            fr"{type(obj).__name__}"
                        ):
                            stream.write(obj)
370
371
class TestAndroidRateLimit(unittest.TestCase):
    def test_rate_limit(self):
        # Check that writes to a TextLogStream are accepted immediately while
        # the token bucket has capacity, are slowed to the configured rate
        # once it is empty, and speed up again after it refills.

        # https://developer.android.com/ndk/reference/group/logging
        ANDROID_LOG_DEBUG = 3

        # https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39
        PER_MESSAGE_OVERHEAD = 28

        # A dedicated tag, rather than stdout or stderr, keeps this traffic
        # from flooding the test script output.
        tag = "python.rate_limit"
        stream = TextLogStream(ANDROID_LOG_DEBUG, tag)

        # Build a message template which, once the line number is filled in,
        # consumes 1 KB of the logcat buffer.
        prefix = "Line {:03d} "
        padding = (
            1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(prefix.format(0))
        )
        template = prefix + "." * padding + "\n"

        # The passage of time is mocked, so that the result doesn't depend on
        # the performance of the test device.
        fake_now = time()

        def fake_sleep(duration):
            nonlocal fake_now
            fake_now += duration

        def fake_time():
            # Every reading moves the clock forward by a small amount, which
            # avoids a division by zero when a rate is calculated.
            fake_sleep(0.0001)
            return fake_now

        # See _android_support.py. The default values of these parameters work
        # well across a wide range of devices, but smaller values give a quick
        # and reliable test that doesn't flood the log too much.
        MAX_KB_PER_SECOND = 100
        BUCKET_KB = 10
        limiter_patches = patch.multiple(
            "_android_support",
            MAX_BYTES_PER_SECOND=MAX_KB_PER_SECOND * 1024,
            BUCKET_SIZE=BUCKET_KB * 1024,
            sleep=fake_sleep,
            time=fake_time,
        )
        with limiter_patches:
            # Make sure the token bucket is full before measuring anything.
            stream.write("Initial message to reset _prev_write_time")
            fake_sleep(BUCKET_KB / MAX_KB_PER_SECOND)
            next_line = 0

            def write_bucketful():
                # Write BUCKET_KB messages, and return the rate at which they
                # were accepted, in KB per second.
                nonlocal next_line
                began = fake_time()
                for number in range(next_line, next_line + BUCKET_KB):
                    stream.write(template.format(number))
                next_line += BUCKET_KB
                return BUCKET_KB / (fake_time() - began)

            # "Minimal delay" means at least twice the rate limit. The factor
            # of 2 is not arbitrary: it verifies that the bucket can be
            # emptied within two bucketfuls, which the steps below rely on.
            full_speed = MAX_KB_PER_SECOND * 2

            # With a full bucket, the first bucketful meets that threshold.
            self.assertGreater(write_bucketful(), full_speed)

            # A second bucketful empties the token bucket completely.
            write_bucketful()

            # With an empty bucket, writes proceed at the rate limit.
            self.assertAlmostEqual(
                write_bucketful(),
                MAX_KB_PER_SECOND,
                delta=MAX_KB_PER_SECOND * 0.1,
            )

            # After waiting long enough for the bucket to refill, writes are
            # back to full speed.
            fake_sleep(BUCKET_KB / MAX_KB_PER_SECOND)
            self.assertGreater(write_bucketful(), full_speed)
449