#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
from concurrent.futures import ThreadPoolExecutor
import multiprocessing


class AssemblyLine(object):
    """A class for passing data through a chain of threads or processes,
    assembly-line style.

    Attributes:
        nodes: A list of AssemblyLine.Nodes that pass data from one node to the
            next.
    """

    class Node(object):
        """A Node in an AssemblyLine.

        Each node is composed of the following:

              input_stream                       output_stream
            ==============> [ transformer ] ===============>

        Attributes:
            transformer: The Transformer that takes input from the input
                stream, transforms the data, and sends it to the output stream.
            input_stream: The stream of data to be taken in as input to this
                transformer. This stream is the stream to be registered as the
                previous node's output stream.

        Properties:
            output_stream: The stream of data to be passed to the next node.
                Reads and writes are forwarded to transformer.output_stream.
        """

        def __init__(self, transformer=None, input_stream=None):
            """Creates a Node.

            Args:
                transformer: The transformer that does this node's work.
                input_stream: The stream the transformer reads from.
            """
            self.transformer = transformer
            self.input_stream = input_stream

        @property
        def output_stream(self):
            """The output stream stored on this node's transformer."""
            return self.transformer.output_stream

        @output_stream.setter
        def output_stream(self, value):
            self.transformer.output_stream = value

    def __init__(self, nodes):
        """Initializes an AssemblyLine class.

        Args:
            nodes: A list of AssemblyLine.Node objects.
        """
        self.nodes = nodes

    def run(self):
        """Runs the AssemblyLine, passing the data between each work node.

        Raises:
            NotImplementedError: always; subclasses provide the scheduling.
        """
        raise NotImplementedError()


class ProcessAssemblyLine(AssemblyLine):
    """An AssemblyLine that uses processes to schedule work on nodes."""

    def run(self):
        """Runs the AssemblyLine within a process pool.

        One worker process is created per node. This call blocks until every
        node's transform call has finished.
        """
        if not self.nodes:
            # If self.nodes is empty, it will create a multiprocessing.Pool of
            # 0 nodes, which raises a ValueError.
            return

        process_pool = multiprocessing.Pool(processes=len(self.nodes))
        for node in self.nodes:
            # NOTE: the AsyncResult is discarded, so an exception raised
            # inside a worker is not re-raised here.
            process_pool.apply_async(node.transformer.transform,
                                     [node.input_stream])
        process_pool.close()
        process_pool.join()


class ThreadAssemblyLine(AssemblyLine):
    """An AssemblyLine that uses threading to schedule work on nodes."""

    def run(self):
        """Runs the AssemblyLine within a thread pool.

        One worker thread is created per node. This call blocks until every
        node's transform call has finished.
        """
        if not self.nodes:
            # ThreadPoolExecutor raises a ValueError when max_workers is 0.
            # Mirror ProcessAssemblyLine and treat an empty line as a no-op.
            return

        with ThreadPoolExecutor(max_workers=len(self.nodes)) as thread_pool:
            for node in self.nodes:
                # NOTE: the Future is discarded, so an exception raised
                # inside a worker is not re-raised here.
                thread_pool.submit(node.transformer.transform,
                                   node.input_stream)


class AssemblyLineBuilder(object):
    """An abstract class that builds an AssemblyLine object.

    Typical usage chains the calls:

        builder.source(transformer).into(other_transformer).build()

    Attributes:
        nodes: The list of AssemblyLine.Node objects added so far.
        _assembly_line_generator: The callable that creates the AssemblyLine.
            Should be in the form of:

                Args:
                    A list of AssemblyLine.Node objects.

                Returns:
                    An AssemblyLine object.

        _queue_generator: The callable that creates new queues to be used for
            BufferStreams. Should be in the form of:

                Args:
                    None.

                Returns:
                    A Queue object.
    """

    def __init__(self, queue_generator, assembly_line_generator):
        """Creates an AssemblyLineBuilder.

        Args:
            queue_generator: A callable of type lambda: Queue().
            assembly_line_generator: A callable of type
                lambda list<AssemblyLine.Node>: AssemblyLine.
        """
        super().__init__()
        self._assembly_line_generator = assembly_line_generator
        self._queue_generator = queue_generator

        self.nodes = []
        self._built = False

    @property
    def built(self):
        """Whether build() has already been called on this builder."""
        return self._built

    def __generate_queue(self):
        """Returns a new Queue object for passing information between nodes."""
        return self._queue_generator()

    @property
    def queue_generator(self):
        """Returns the callable used for generating queues."""
        return self._queue_generator

    def source(self, transformer, input_stream=None):
        """Adds a SourceTransformer to the AssemblyLine.

        Must be the first function call on the AssemblyLineBuilder.

        Args:
            transformer: The SourceTransformer that generates data for the
                AssemblyLine to process.
            input_stream: The input stream to use, if necessary. Defaults to
                a DevNullBufferStream.

        Returns:
            This builder, to allow call chaining.

        Raises:
            ValueError if source is not the first transformer to be added to
                the AssemblyLine, or the AssemblyLine has been built.
        """
        # A built builder always has at least one node (build() refuses to
        # run without nodes), so this check also covers the "built" case.
        if self.nodes:
            raise ValueError('AssemblyLines can only have a single source.')
        if input_stream is None:
            input_stream = DevNullBufferStream()
        self.nodes.append(AssemblyLine.Node(transformer, input_stream))
        return self

    def into(self, transformer):
        """Adds the given transformer next in the AssemblyLine.

        A new BufferStream is created and registered both as the previous
        node's output stream and as the new node's input stream.

        Args:
            transformer: The transformer next in the AssemblyLine.

        Returns:
            This builder, to allow call chaining.

        Raises:
            ValueError if no source node is set, or the AssemblyLine has been
                built.
        """
        if not self.nodes:
            raise ValueError('The source transformer must be set first.')
        if self.built:
            raise ValueError('Cannot add additional nodes after the '
                             'AssemblyLine has been built.')
        stream = BufferStream(self.__generate_queue())
        self.nodes[-1].transformer.set_output_stream(stream)
        self.nodes.append(AssemblyLine.Node(transformer, stream))
        return self

    def build(self, output_stream=None):
        """Builds the AssemblyLine object.

        Note that after this function is called this AssemblyLineBuilder cannot
        be used again, as it is already marked as built.

        Args:
            output_stream: The stream the last node writes to. Defaults to a
                DevNullBufferStream.

        Returns:
            The object returned by the assembly line generator when called
            with the list of nodes.

        Raises:
            ValueError if the AssemblyLine has already been built, or if no
                nodes have been added.
        """
        if self.built:
            raise ValueError('The AssemblyLine is already built.')
        if not self.nodes:
            raise ValueError('Cannot create an empty assembly line.')
        self._built = True
        if output_stream is None:
            output_stream = DevNullBufferStream()
        self.nodes[-1].output_stream = output_stream
        return self._assembly_line_generator(self.nodes)


class ThreadAssemblyLineBuilder(AssemblyLineBuilder):
    """An AssemblyLineBuilder for generating ThreadAssemblyLines."""

    def __init__(self, queue_generator=queue.Queue):
        """Creates a ThreadAssemblyLineBuilder.

        Args:
            queue_generator: A callable returning a new queue. Defaults to
                queue.Queue.
        """
        super().__init__(queue_generator, ThreadAssemblyLine)


class ProcessAssemblyLineBuilder(AssemblyLineBuilder):
    """An AssemblyLineBuilder for ProcessAssemblyLines.

    Attributes:
        manager: The multiprocessing.Manager used for having queues communicate
            with one another over multiple processes.
    """

    def __init__(self):
        """Creates a ProcessAssemblyLineBuilder backed by a new Manager."""
        self.manager = multiprocessing.Manager()
        super().__init__(self.manager.Queue, ProcessAssemblyLine)


class IndexedBuffer(object):
    """A buffer indexed with the order it was generated in."""

    def __init__(self, index, size_or_buffer):
        """Creates an IndexedBuffer.

        Args:
            index: The integer index associated with the buffer.
            size_or_buffer:
                either:
                    An integer specifying the number of slots in the buffer OR
                    A list to be used as a buffer.
        """
        self.index = index
        if isinstance(size_or_buffer, int):
            # A size was given: allocate that many empty slots.
            self.buffer = [None] * size_or_buffer
        else:
            # A buffer was given: use it directly (no copy is made).
            self.buffer = size_or_buffer


class BufferList(list):
    """A list of Buffers.

    This type is useful for differentiating when a buffer has been returned
    from a transformer, vs when a list of buffers has been returned from a
    transformer.
    """


class BufferStream(object):
    """An object that acts as a stream between two transformers."""

    # The object passed to the buffer queue to signal the end-of-stream.
    END = None

    def __init__(self, buffer_queue):
        """Creates a new BufferStream.

        Args:
            buffer_queue: A Queue object used to pass data along the
                BufferStream.
        """
        self._buffer_queue = buffer_queue

    def initialize(self):
        """Initializes the stream.

        When running BufferStreams through multiprocessing, initialize must
        only be called on the process using the BufferStream.
        """
        # Here we need to make any call to the stream to initialize it. This
        # makes read and write times for the first buffer faster, preventing
        # the data at the beginning from being dropped.
        self._buffer_queue.qsize()

    def end_stream(self):
        """Closes the stream.

        By convention, a None object is used, mirroring file reads returning
        an empty string when the end of file is reached.
        """
        self._buffer_queue.put(self.END, block=False)

    def add_indexed_buffer(self, buffer):
        """Adds the given buffer to the buffer stream.

        The put is non-blocking.
        """
        self._buffer_queue.put(buffer, block=False)

    def remove_indexed_buffer(self):
        """Removes an indexed buffer from the array.

        This operation blocks until data is received.

        Returns:
            an IndexedBuffer, or the END marker once end_stream has been
            called and all earlier buffers have been removed.
        """
        return self._buffer_queue.get()


class DevNullBufferStream(BufferStream):
    """A BufferStream that is always empty."""

    def __init__(self, *_):
        """Creates the stream; any positional arguments are ignored."""
        super().__init__(None)

    def initialize(self):
        """Does nothing. Nothing to initialize."""
        pass

    def end_stream(self):
        """Does nothing. The stream always returns end-of-stream when read."""
        pass

    def add_indexed_buffer(self, buffer):
        """Imitating /dev/null, nothing will be written to the stream."""
        pass

    def remove_indexed_buffer(self):
        """Always returns the end-of-stream marker."""
        return self.END