#!/usr/bin/python
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Extracts measured video frame rates from test result XML files.

Each file named on the command line is scanned for VideoEncoderDecoderTest
and VideoDecoderPerfTest results; the collected rates are printed to stdout
as a <MediaCodecs> XML document.
"""

import argparse
import itertools
import math
import re
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict, namedtuple


def _warn(message):
    """Writes one diagnostic line to stderr (valid on Python 2 and 3)."""
    sys.stderr.write('%s\n' % (message,))


def createLookup(values, key):
    """Creates a lookup table for a collection of values based on keys.

    Arguments:
      values: a collection of arbitrary values. Must be iterable.
      key: a function of one argument that returns the key for a value.

    Returns:
      A dict mapping keys (as generated by the key argument) to lists of
      values. All values in the lists have the same key, and are in the order
      they appeared in the collection.
    """
    lookup = defaultdict(list)
    for v in values:
        lookup[key(v)].append(v)
    return lookup


def _intify(value):
    """Returns a value converted to int if possible, else the original value."""
    try:
        return int(value)
    except ValueError:
        return value


class Size(namedtuple('Size', ['width', 'height'])):
    """A namedtuple with width and height fields."""

    def __str__(self):
        return '%dx%d' % (self.width, self.height)


class _VideoResultBase(object):
    """Helper methods for results. Not for use by applications.

    Attributes:
      codec: The name of the codec (string) or None
      size: Size representing the video size or None
      mime: The mime-type of the codec (string) or None
      rates: The measured achievable frame rates
      is_decoder: True iff codec is a decoder.
    """

    # One "key=value" item; the value is either a brace-enclosed dict or a
    # bare token that contains no space, comma or brace.
    MESSAGE_PATTERN = r'(?P<key>\w+)=(?P<value>\{[^}]*\}|[^ ,{}]+)'

    def __init__(self, is_decoder):
        self.codec = None
        self.mime = None
        self.size = None
        self._rates_from_failure = []
        self._rates_from_message = []
        self.is_decoder = is_decoder

    def _inited(self):
        """Returns true iff codec, mime and size were all set."""
        return None not in (self.codec, self.mime, self.size)

    def __len__(self):
        # Don't report any result if codec name, mime type or size is unclear.
        # Callers rely on this for truthiness ("if result:").
        if not self._inited():
            return 0
        return len(self.rates)

    @property
    def rates(self):
        """Rates parsed from the failure log if any, else from messages."""
        return self._rates_from_failure or self._rates_from_message

    def _parseDict(self, value):
        """Parses a MediaFormat from its string representation sans brackets."""
        pairs = re.findall(
            r'([^ =]+)=([^ [=]+(?:|\[[^\]]+\]))(?:, |$)', value)
        return dict((k, _intify(v)) for k, v in pairs)

    def _cleanFormat(self, format):
        """Removes internal fields from a parsed MediaFormat (in place)."""
        format.pop('what', None)
        format.pop('image-data', None)

    def _parsePartialResult(self, message_match):
        """Parses a partial test result conforming to the message pattern.

        Arguments:
          message_match: a match object produced from MESSAGE_PATTERN.

        Returns:
          A tuple of string key and int, string or dict value, where dict has
          string keys mapping to int or string values.
        """
        key, value = message_match.group('key', 'value')
        if value.startswith('{'):
            value = self._parseDict(value[1:-1])
            if key.endswith('Format'):
                self._cleanFormat(value)
        else:
            value = _intify(value)
        return key, value

    def _parseValuesFromBracket(self, line):
        """Returns the values enclosed in brackets without the brackets.

        Parses a line matching the pattern "<tag>: [<values>]." and returns
        <values>.

        Raises:
          ValueError: if the line does not match the pattern.
        """
        match = re.match(r'^[^:]+: *\[(?P<values>.*)\]\.$', line)
        if match is None:
            raise ValueError('line does not match "tag: [value]": %s' % line)
        return match.group('values')

    def _parseRawData(self, line):
        """Parses the raw data line for video performance tests.

        A line that cannot be parsed is reported on stderr and yields nothing.

        Yields:
          Dict objects corresponding to parsed results, mapping string keys to
          int, string or dict values.
        """
        try:
            values = self._parseValuesFromBracket(line)
        except ValueError:
            _warn('could not parse line %s' % repr(line))
            return

        result = {}
        for m in re.finditer(self.MESSAGE_PATTERN + r'(?P<sep>,? +|$)', values):
            key, value = self._parsePartialResult(m)
            result[key] = value
            # A single space separates items of the same result; any other
            # separator (comma, several spaces, end of input) closes it.
            if m.group('sep') != ' ':
                yield result
                result = {}

    def _tryParseMeasuredFrameRate(self, line):
        """Parses a line starting with 'Measured frame rate:'."""
        if line.startswith('Measured frame rate: '):
            try:
                values = self._parseValuesFromBracket(line)
                values = re.split(r' *, *', values)
                self._rates_from_failure = list(map(float, values))
            except ValueError:
                _warn('could not parse line %s' % repr(line))

    def parse(self, test):
        """Parses the ValueArray and FailedScene lines of a test result.

        Arguments:
          test: An ElementTree element.
        """
        failure = test.find('FailedScene')
        if failure is not None:
            trace = failure.find('StackTrace')
            if trace is not None:
                # An empty <StackTrace/> has text None; treat it as no lines.
                for line in re.split(r'[\r\n]+', trace.text or ''):
                    self._parseFailureLine(line)
        details = test.find('Details')
        if details is not None:
            for array in details.iter('ValueArray'):
                message = array.get('message')
                # A ValueArray without a message attribute carries nothing
                # that the subclasses look at.
                if message:
                    self._parseMessage(message, array)

    def _parseFailureLine(self, line):
        """Handles one line of the failure stack trace (subclass hook)."""
        raise NotImplementedError

    def _parseMessage(self, message, array):
        """Handles one ValueArray message (subclass hook)."""
        raise NotImplementedError

    def getData(self):
        """Gets the parsed test result data.

        Yields:
          Result objects containing at least codec, size, mime and rates
          attributes.
        """
        yield self


class VideoEncoderDecoderTestResult(_VideoResultBase):
    """Represents a result from a VideoEncoderDecoderTest performance case."""

    # If a VideoEncoderDecoderTest succeeds, it provides the results in the
    # message of a ValueArray. If it fails, it provides the results in the
    # failure using raw data. (For now it also includes some data in the
    # ValueArrays even if it fails, which we ignore.)

    def __init__(self, unused_m):
        super(VideoEncoderDecoderTestResult, self).__init__(is_decoder=False)

    def _parseFailureLine(self, line):
        """Handles parsing a line from the failure log."""
        self._tryParseMeasuredFrameRate(line)

    def _parseMessage(self, message, array):
        """Handles parsing a message from ValueArrays."""
        if message.startswith('codec='):
            result = dict(
                self._parsePartialResult(m)
                for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
            if 'EncInputFormat' in result:
                self.codec = result['codec']
                fmt = result['EncInputFormat']
                self.size = Size(fmt['width'], fmt['height'])
                self.mime = result['EncOutputFormat']['mime']
                self._rates_from_message.append(1000000. / result['min'])


class VideoDecoderPerfTestResult(_VideoResultBase):
    """Represents a result from a VideoDecoderPerfTest performance case."""

    # If a VideoDecoderPerfTest succeeds, it provides the results in the
    # message of a ValueArray. If it fails, it provides the results in the
    # failure only using raw data.

    def __init__(self, unused_m):
        super(VideoDecoderPerfTestResult, self).__init__(is_decoder=True)

    def _parseFailureLine(self, line):
        """Handles parsing a line from the failure log."""
        self._tryParseMeasuredFrameRate(line)
        # If the test failed, we can only get the codec/size/mime from the
        # raw data.
        if line.startswith('Raw data: '):
            for result in self._parseRawData(line):
                fmt = result['DecOutputFormat']
                self.size = Size(fmt['width'], fmt['height'])
                self.codec = result['codec']
                self.mime = result['mime']

    def _parseMessage(self, message, array):
        """Handles parsing a message from ValueArrays."""
        if message.startswith('codec='):
            result = dict(
                self._parsePartialResult(m)
                for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
            if result.get('decodeto') == 'surface':
                self.codec = result['codec']
                fmt = result['DecOutputFormat']
                self.size = Size(fmt['width'], fmt['height'])
                self.mime = result['mime']
                self._rates_from_message.append(1000000. / result['min'])


class Results(object):
    """Container that keeps all test results."""

    VIDEO_ENCODER_DECODER_TEST_REGEX = re.compile(
        r'test(.*)(\d{4})x(\d{4})(Goog|Other)$')

    VIDEO_DECODER_PERF_TEST_REGEX = re.compile(
        r'test(VP[89]|H26[34]|MPEG4|HEVC)(\d+)x(\d+)(.*)$')

    TestCaseSpec = namedtuple(
        'TestCaseSpec', 'package path class_ regex result_class')

    # NOTE(review): the literal XML markup emitted by dumpXml was missing from
    # the reviewed copy of this file (every "<...>" had been stripped from the
    # string literals). The header below and the element markup in dumpXml
    # were restored to the MediaCodecs performance layout -- confirm against a
    # known-good output file.
    _XML_HEADER = (
        '<?xml version="1.0" encoding="utf-8" ?>',
        '<!-- Copyright 2015 The Android Open Source Project',
        '',
        '     Licensed under the Apache License, Version 2.0 (the "License");',
        '     you may not use this file except in compliance with the License.',
        '     You may obtain a copy of the License at',
        '',
        '          http://www.apache.org/licenses/LICENSE-2.0',
        '',
        '     Unless required by applicable law or agreed to in writing, software',
        '     distributed under the License is distributed on an "AS IS" BASIS,',
        '     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.',
        '     See the License for the specific language governing permissions and',
        '     limitations under the License.',
        '-->',
        '',
        '<MediaCodecs>',
    )

    def __init__(self):
        self._results = []  # result objects, in import order
        self._device = None

    def _getTestCases(self):
        """Returns the specs of the test cases whose results are collected."""
        return [
            self.TestCaseSpec(
                package='CtsDeviceVideoPerf',
                path='TestSuite/TestSuite/TestSuite/TestSuite/TestCase',
                class_='VideoEncoderDecoderTest',
                regex=self.VIDEO_ENCODER_DECODER_TEST_REGEX,
                result_class=VideoEncoderDecoderTestResult),
            self.TestCaseSpec(
                package='CtsMediaTestCases',
                path='TestSuite/TestSuite/TestSuite/TestCase',
                class_='VideoDecoderPerfTest',
                regex=self.VIDEO_DECODER_PERF_TEST_REGEX,
                result_class=VideoDecoderPerfTestResult),
        ]

    def _verifyDeviceInfo(self, device):
        """Records the device name, rejecting a mix of different devices.

        Raises:
          AssertionError: if a different device was already recorded.
        """
        # Raised explicitly (not via "assert") so the check survives python -O.
        if self._device not in (None, device):
            raise AssertionError('expected %s device' % self._device)
        self._device = device

    def importXml(self, xml):
        """Collects results from a parsed result document.

        Arguments:
          xml: an ElementTree (or root element) of a test result file.

        Raises:
          ValueError: if the document has no DeviceInfo/BuildInfo element.
          AssertionError: if the document is for a different device than the
            ones imported before.
        """
        build_info = xml.find('DeviceInfo/BuildInfo')
        if build_info is None:
            raise ValueError('missing DeviceInfo/BuildInfo element')
        self._verifyDeviceInfo(build_info.get('buildName'))

        packages = createLookup(self._getTestCases(), lambda tc: tc.package)
        for pkg in xml.iter('TestPackage'):
            specs_in_package = packages.get(pkg.get('name'))
            if not specs_in_package:
                continue
            paths = createLookup(specs_in_package, lambda tc: tc.path)
            for path, specs_in_path in paths.items():
                classes = createLookup(specs_in_path, lambda tc: tc.class_)
                for case in pkg.iterfind(path):
                    specs_in_class = classes.get(case.get('name'))
                    if not specs_in_class:
                        continue
                    for test in case.iter('Test'):
                        test_name = test.get('name') or ''
                        for spec in specs_in_class:
                            m = spec.regex.match(test_name)
                            if m:
                                result = spec.result_class(m)
                                result.parse(test)
                                self._results.append(result)

    def importFile(self, path):
        """Parses the XML file at path and imports its results.

        Raises:
          ValueError: if the file is not well-formed XML (see also importXml).
        """
        _warn('Importing "%s"...' % path)
        try:
            return self.importXml(ET.parse(path))
        except ET.ParseError:
            raise ValueError('not a valid XML file')

    def getData(self):
        """Yields the data of every imported result, in import order."""
        for result in self._results:
            for data in result.getData():
                yield data

    def dumpXml(self, results):
        """Renders results as the lines of a <MediaCodecs> XML document.

        Arguments:
          results: iterable of result objects with codec, mime, size, rates
            and is_decoder attributes; codec must be a string.

        Yields:
          The output document, one line (without newline) at a time. Encoders
          are listed before decoders.
        """
        for line in self._XML_HEADER:
            yield line

        last_section = None
        Comp = namedtuple('Comp', 'is_decoder google mime name')
        by_comp = createLookup(
            results,
            lambda e: Comp(is_decoder=e.is_decoder,
                           google='.google.' in e.codec,
                           mime=e.mime,
                           name=e.codec))
        for comp in sorted(by_comp):
            section = 'Decoders' if comp.is_decoder else 'Encoders'
            if section != last_section:
                if last_section:
                    yield '    </%s>' % last_section
                yield '    <%s>' % section
                last_section = section
            yield '        <MediaCodec name="%s" type="%s" update="true">' % (
                comp.name, comp.mime)
            by_size = createLookup(by_comp[comp], lambda e: e.size)
            for size in sorted(by_size):
                values = list(
                    itertools.chain(*(e.rates for e in by_size[size])))
                if not values:
                    continue  # nothing was measured for this size
                min_, max_ = min(values), max(values)
                # Geometric mean of the extremes, reported as a
                # single-value range.
                med_ = int(math.sqrt(min_ * max_))
                yield ('            <Limit name="measured-frame-rate-%s" '
                       'range="%d-%d" />' % (size, med_, med_))
            yield '        </MediaCodec>'
        if last_section:
            yield '    </%s>' % last_section
        yield '</MediaCodecs>'


class Main(object):
    """Executor of this utility."""

    def __init__(self):
        self._result = Results()
        self._parser = argparse.ArgumentParser('get_achievable_framerates')
        self._parser.add_argument('result_xml', nargs='+')

    def _parseArgs(self):
        self._args = self._parser.parse_args()

    def _importXml(self, xml):
        self._result.importFile(xml)

    def _report(self):
        # Results with an empty length (incomplete or without rates) are
        # left out of the report.
        for line in self._result.dumpXml(
                r for r in self._result.getData() if r):
            print(line)

    def run(self):
        """Imports every file given on the command line, then reports."""
        self._parseArgs()
        try:
            for xml in self._args.result_xml:
                try:
                    self._importXml(xml)
                except (ValueError, IOError, AssertionError) as e:
                    _warn(e)
                    # Share the abort path with a user interrupt: stop
                    # without reporting partial data.
                    raise KeyboardInterrupt
            self._report()
        except KeyboardInterrupt:
            _warn('Interrupted.')


if __name__ == '__main__':
    Main().run()