1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList 20from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferStream 21from acts.controllers.monsoon_lib.sampling.engine.assembly_line import DevNullBufferStream 22from acts.controllers.monsoon_lib.sampling.engine.assembly_line import IndexedBuffer 23 24 25class Transformer(object): 26 """An object that represents how to transform a given buffer into a result. 27 28 Attributes: 29 output_stream: The stream to output data to upon transformation. 30 Defaults to a DevNullBufferStream. 31 """ 32 33 def __init__(self): 34 self.output_stream = DevNullBufferStream(None) 35 36 def set_output_stream(self, output_stream): 37 """Sets the Transformer's output stream to the given output stream.""" 38 self.output_stream = output_stream 39 40 def transform(self, input_stream): 41 """Transforms input_stream data and passes it to self.output_stream. 42 43 Args: 44 input_stream: The BufferStream of input data this transformer should 45 transform. Note that the type of data stored within BufferStream 46 is not guaranteed to be in the format expected, much like STDIN 47 is not guaranteed to be the format a process expects. However, 48 for performance, users should expect the data to be properly 49 formatted anyway. 50 """ 51 input_stream.initialize() 52 self.output_stream.initialize() 53 class_name = self.__class__.__qualname__ 54 try: 55 logging.debug('%s transformer beginning.', class_name) 56 self.on_begin() 57 logging.debug('%s transformation started.', class_name) 58 self._transform(input_stream) 59 except Exception: 60 # TODO(markdr): Get multi-process error reporting to play nicer. 61 logging.exception('%s ran into an exception.', class_name) 62 raise 63 finally: 64 logging.debug('%s transformation ended.', class_name) 65 self.on_end() 66 logging.debug('%s finished.', class_name) 67 68 def _transform_buffer(self, buffer): 69 """Transforms a given buffer. 70 71 The implementation can either: 72 73 1) Return the transformed buffer. Can be either in-place or a new 74 buffer. 75 76 2) Return a BufferList: a list of transformed buffers. This is useful 77 for grouping data together for faster operations. 78 79 Args: 80 buffer: The buffer to transform 81 82 Returns: 83 either a buffer or a BufferList. See detailed documentation. 84 """ 85 raise NotImplementedError() 86 87 def _on_end_of_stream(self, input_stream): 88 """To be called when the input stream has sent the end of stream signal. 89 90 This is particularly useful for flushing any stored memory into the 91 output stream. 92 93 Args: 94 input_stream: the stream that was closed. 95 """ 96 # By default, this function closes the output stream. 97 self.output_stream.end_stream() 98 99 def _transform(self, input_stream): 100 """Should call _transform_buffer within this function.""" 101 raise NotImplementedError() 102 103 def on_begin(self): 104 """A function called before the transform loop begins.""" 105 pass 106 107 def on_end(self): 108 """A function called after the transform loop has ended.""" 109 pass 110 111 112class SourceTransformer(Transformer): 113 """The base class for generating data in an AssemblyLine. 114 115 Note that any Transformer will be able to generate data, but this class is 116 a generic way to send data. 117 118 Attributes: 119 _buffer_size: The buffer size for each IndexedBuffer sent over the 120 output stream. 121 """ 122 123 def __init__(self): 124 super().__init__() 125 # Defaulted to 64, which is small enough to be passed within the .6ms 126 # window, but large enough so that it does not spam the queue. 127 self._buffer_size = 64 128 129 def _transform(self, _): 130 """Generates data and sends it to the output stream.""" 131 buffer_index = 0 132 while True: 133 indexed_buffer = IndexedBuffer(buffer_index, self._buffer_size) 134 buffer = self._transform_buffer(indexed_buffer.buffer) 135 if buffer is BufferStream.END: 136 break 137 indexed_buffer.buffer = buffer 138 self.output_stream.add_indexed_buffer(indexed_buffer) 139 buffer_index += 1 140 141 self.output_stream.end_stream() 142 143 def _transform_buffer(self, buffer): 144 """Fills the passed-in buffer with data.""" 145 raise NotImplementedError() 146 147 148class SequentialTransformer(Transformer): 149 """A transformer that receives input in sequential order. 150 151 Attributes: 152 _next_index: The index of the next IndexedBuffer that should be read. 153 """ 154 155 def __init__(self): 156 super().__init__() 157 self._next_index = 0 158 159 def _transform(self, input_stream): 160 while True: 161 indexed_buffer = input_stream.remove_indexed_buffer() 162 if indexed_buffer is BufferStream.END: 163 break 164 buffer_or_buffers = self._transform_buffer(indexed_buffer.buffer) 165 if buffer_or_buffers is not None: 166 self._send_buffers(buffer_or_buffers) 167 168 self._on_end_of_stream(input_stream) 169 170 def _send_buffers(self, buffer_or_buffer_list): 171 """Sends buffers over to the output_stream. 172 173 Args: 174 buffer_or_buffer_list: A BufferList or buffer object. Note that if 175 buffer is None, it is effectively an end-of-stream signal. 176 """ 177 if not isinstance(buffer_or_buffer_list, BufferList): 178 # Assume a single buffer was returned 179 buffer_or_buffer_list = BufferList([buffer_or_buffer_list]) 180 181 buffer_list = buffer_or_buffer_list 182 for buffer in buffer_list: 183 new_buffer = IndexedBuffer(self._next_index, buffer) 184 self.output_stream.add_indexed_buffer(new_buffer) 185 self._next_index += 1 186 187 def _transform_buffer(self, buffer): 188 raise NotImplementedError() 189 190 191class ParallelTransformer(Transformer): 192 """A Transformer that is capable of running in parallel. 193 194 Buffers received may be unordered. For ordered input, use 195 SequentialTransformer. 196 """ 197 198 def _transform(self, input_stream): 199 while True: 200 indexed_buffer = input_stream.remove_indexed_buffer() 201 if indexed_buffer is None: 202 break 203 buffer = self._transform_buffer(indexed_buffer.buffer) 204 indexed_buffer.buffer = buffer 205 self.output_stream.add_indexed_buffer(indexed_buffer) 206 207 self._on_end_of_stream(input_stream) 208 209 def _transform_buffer(self, buffer): 210 """Transforms a given buffer. 211 212 Note that ParallelTransformers can NOT return a BufferList. This is a 213 limitation with the current indexing system. If the input buffer is 214 replaced with multiple buffers, later transformers will not know what 215 the proper order of buffers is. 216 217 Args: 218 buffer: The buffer to transform 219 220 Returns: 221 either None or a buffer. See detailed documentation. 222 """ 223 raise NotImplementedError() 224