1# Copyright 2022 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""pw_console test mode functions.""" 15 16import asyncio 17import time 18import re 19import random 20import logging 21from threading import Thread 22from typing import Dict, List, Tuple 23 24FAKE_DEVICE_LOGGER_NAME = 'pw_console_fake_device' 25 26_ROOT_LOG = logging.getLogger('') 27_FAKE_DEVICE_LOG = logging.getLogger(FAKE_DEVICE_LOGGER_NAME) 28 29 30def start_fake_logger(lines, log_thread_entry, log_thread_loop): 31 fake_log_messages = prepare_fake_logs(lines) 32 33 test_log_thread = Thread(target=log_thread_entry, args=(), daemon=True) 34 test_log_thread.start() 35 36 background_log_task = asyncio.run_coroutine_threadsafe( 37 # This function will be executed in a separate thread. 38 log_forever(fake_log_messages), 39 # Using this asyncio event loop. 40 log_thread_loop, 41 ) # type: ignore 42 return background_log_task 43 44 45def prepare_fake_logs(lines) -> List[Tuple[str, Dict]]: 46 fake_logs: List[Tuple[str, Dict]] = [] 47 key_regex = re.compile(r':kbd:`(?P<key>[^`]+)`') 48 for line in lines: 49 if not line: 50 continue 51 52 keyboard_key = '' 53 search = key_regex.search(line) 54 if search: 55 keyboard_key = search.group(1) 56 57 fake_logs.append((line, {'keys': keyboard_key})) 58 return fake_logs 59 60 61async def log_forever(fake_log_messages: List[Tuple[str, Dict]]): 62 """Test mode async log generator coroutine that runs forever.""" 63 _ROOT_LOG.info('Fake log device connected.') 64 start_time = time.time() 65 message_count = 0 66 67 # Fake module column names. 68 module_names = ['APP', 'RADIO', 'BAT', 'USB', 'CPU'] 69 while True: 70 if message_count > 32 or message_count < 2: 71 await asyncio.sleep(0.1) 72 fake_log = random.choice(fake_log_messages) 73 74 module_name = module_names[message_count % len(module_names)] 75 _FAKE_DEVICE_LOG.info( 76 fake_log[0], 77 extra=dict( 78 extra_metadata_fields=dict( 79 module=module_name, 80 file='fake_app.cc', 81 timestamp=time.time() - start_time, 82 **fake_log[1], 83 ) 84 ), 85 ) 86 message_count += 1 87