# Lint as: python2, python3 # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This module provides an object to record the output of command-line program. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import fcntl import logging import os import pty import re import subprocess import threading import time class OutputRecorderError(Exception): """An exception class for output_recorder module.""" pass class OutputRecorder(object): """A class used to record the output of command line program. A thread is dedicated to performing non-blocking reading of the command outpt in this class. Other possible approaches include 1. using gobject.io_add_watch() to register a callback and reading the output when available, or 2. using select.select() with a short timeout, and reading the output if available. However, the above two approaches are not very reliable. Hence, this approach using non-blocking reading is adopted. To prevent the block buffering of the command output, a pseudo terminal is created through pty.openpty(). This forces the line output. This class saves the output in self.contents so that it is easy to perform regular expression search(). The output is also saved in a file. """ DEFAULT_OPEN_MODE = 'a' START_DELAY_SECS = 1 # Delay after starting recording. STOP_DELAY_SECS = 1 # Delay before stopping recording. POLLING_DELAY_SECS = 0.1 # Delay before next polling. TMP_FILE = '/tmp/output_recorder.dat' def __init__(self, cmd, open_mode=DEFAULT_OPEN_MODE, start_delay_secs=START_DELAY_SECS, stop_delay_secs=STOP_DELAY_SECS, save_file=TMP_FILE): """Construction of output recorder. @param cmd: the command of which the output is to record. @param open_mode: the open mode for writing output to save_file. Could be either 'w' or 'a'. @param stop_delay_secs: the delay time before stopping the cmd. @param save_file: the file to save the output. """ self.cmd = cmd self.open_mode = open_mode self.start_delay_secs = start_delay_secs self.stop_delay_secs = stop_delay_secs self.save_file = save_file self.contents = [] # Create a thread dedicated to record the output. self._recording_thread = None self._stop_recording_thread_event = threading.Event() # Use pseudo terminal to prevent buffering of the program output. self._main, self._node = pty.openpty() self._output = os.fdopen(self._main) # Set non-blocking flag. fcntl.fcntl(self._output, fcntl.F_SETFL, os.O_NONBLOCK) def record(self): """Record the output of the cmd.""" logging.info('Recording output of "%s".', self.cmd) try: self._recorder = subprocess.Popen( self.cmd, stdout=self._node, stderr=self._node) except: raise OutputRecorderError('Failed to run "%s"' % self.cmd) with open(self.save_file, self.open_mode) as output_f: output_f.write(os.linesep + '*' * 80 + os.linesep) while True: try: # Perform non-blocking read. line = self._output.readline() except: # Set empty string if nothing to read. line = '' if line: output_f.write(line) output_f.flush() # The output, e.g. the output of btmon, may contain some # special unicode such that we would like to escape. # In this way, regular expression search could be conducted # properly. self.contents.append(line.encode('unicode-escape')) elif self._stop_recording_thread_event.is_set(): self._stop_recording_thread_event.clear() break else: # Sleep a while if nothing to read yet. time.sleep(self.POLLING_DELAY_SECS) def start(self): """Start the recording thread.""" logging.info('Start recording thread.') self.clear_contents() self._recording_thread = threading.Thread(target=self.record) self._recording_thread.start() time.sleep(self.start_delay_secs) def stop(self): """Stop the recording thread.""" logging.info('Stop recording thread.') time.sleep(self.stop_delay_secs) self._stop_recording_thread_event.set() self._recording_thread.join() # Kill the process. self._recorder.terminate() self._recorder.kill() def clear_contents(self): """Clear the contents.""" self.contents = [] def get_contents(self, search_str='', start_str=''): """Get the (filtered) contents. @param search_str: only lines with search_str would be kept. @param start_str: all lines before the occurrence of start_str would be filtered. @returns: the (filtered) contents. """ search_pattern = re.compile(search_str) if search_str else None start_pattern = re.compile(start_str) if start_str else None # Just returns the original contents if no filtered conditions are # specified. if not search_pattern and not start_pattern: return self.contents contents = [] start_flag = not bool(start_pattern) for line in self.contents: if start_flag: if search_pattern.search(line): contents.append(line.strip()) elif start_pattern.search(line): start_flag = True contents.append(line.strip()) return contents def find(self, pattern_str, flags=re.I): """Find a pattern string in the contents. Note that the pattern_str is considered as an arbitrary literal string that might contain re meta-characters, e.g., '(' or ')'. Hence, re.escape() is applied before using re.compile. @param pattern_str: the pattern string to search. @param flags: the flags of the pattern expression behavior. @returns: True if found. False otherwise. """ pattern = re.compile(re.escape(pattern_str), flags) for line in self.contents: result = pattern.search(line) if result: return True return False if __name__ == '__main__': # A demo using btmon tool to monitor bluetoohd activity. cmd = 'btmon' recorder = OutputRecorder(cmd) if True: recorder.start() # Perform some bluetooth activities here in another terminal. time.sleep(recorder.stop_delay_secs) recorder.stop() for line in recorder.get_contents(): print(line)