# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import traceback

from . import base
from ..local import pool


# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job, process_context):
  """Run `job` with `process_context` and return whatever `job.run` returns."""
  return job.run(process_context)


def create_process_context(result_reduction):
  """Wrap `result_reduction` in a ProcessContext.

  ExecutionProc.run hands this function to the pool as `process_context_fn`.
  """
  return ProcessContext(result_reduction)


# Pairs the id of an executed test with the result of processing its output.
JobResult = collections.namedtuple('JobResult', ['id', 'result'])
# Context handed to every Job.run call; carries only the result reduction.
ProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])


class Job(object):
  """One unit of work: a command plus the processor for its output."""

  def __init__(self, test_id, cmd, outproc, keep_output):
    """Store the pieces needed to run one test.

    Args:
      test_id: identifier echoed back in the JobResult.
      cmd: object whose `execute()` produces the output.
      outproc: object whose `process(output, reduction)` builds the result.
      keep_output: when truthy, no reduction is passed to `outproc`.
    """
    self.test_id = test_id
    self.cmd = cmd
    self.outproc = outproc
    self.keep_output = keep_output

  def run(self, process_ctx):
    """Execute the command and return a JobResult for it.

    Args:
      process_ctx: ProcessContext supplying `result_reduction`.

    Returns:
      JobResult(test_id, result) where `result` comes from the output
      processor.
    """
    output = self.cmd.execute()
    # A test that keeps its output opts out of the reduction.
    reduction = None if self.keep_output else process_ctx.result_reduction
    result = self.outproc.process(output, reduction)
    return JobResult(self.test_id, result)


class ExecutionProc(base.TestProc):
  """Last processor in the chain. Instead of passing tests further it creates
  commands and output processors, executes them in multiple worker processes and
  sends results to the previous processor.
  """

  def __init__(self, jobs, outproc_factory=None):
    """Create the pool and the table of in-flight tests.

    Args:
      jobs: forwarded as the first argument of `pool.Pool` (presumably the
        number of worker processes - confirm against pool.Pool).
      outproc_factory: optional callable taking a test and returning its
        output processor. Defaults to reading `test.output_proc`.
    """
    super(ExecutionProc, self).__init__()
    self._pool = pool.Pool(jobs, notify_fun=self.notify_previous)
    self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
    # Maps test id -> (test, cmd) for every job queued but not yet reported.
    self._tests = {}

  def connect_to(self, next_proc):
    """Always fail: nothing can follow this processor.

    Raises:
      AssertionError: unconditionally.
    """
    # Raised explicitly (not via `assert`) so the guard survives `python -O`.
    raise AssertionError('ExecutionProc cannot be connected to anything')

  def run(self):
    """Consume pool results until the pool's iterator is exhausted."""
    # The generator starts empty; jobs are queued through next_test().
    it = self._pool.imap_unordered(
        fn=run_job,
        gen=[],
        process_context_fn=create_process_context,
        process_context_args=[self._prev_requirement],
    )
    for pool_result in it:
      self._unpack_result(pool_result)

  def next_test(self, test):
    """Queue `test` for execution.

    Returns:
      False if this processor is stopped (the test is not queued), True
      otherwise.
    """
    if self.is_stopped:
      return False

    test_id = test.procid
    cmd = test.get_command()
    # Remember test and command so they can be matched to the job's result.
    self._tests[test_id] = test, cmd

    outproc = self._outproc_factory(test)
    self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])

    return True

  def result_for(self, test, result):
    """Always fail: there is no later processor to send results back.

    Raises:
      AssertionError: unconditionally.
    """
    # Raised explicitly (not via `assert`) so the guard survives `python -O`.
    raise AssertionError('ExecutionProc cannot receive results')

  def stop(self):
    """Stop this processor and abort the pool."""
    super(ExecutionProc, self).stop()
    self._pool.abort()

  def _unpack_result(self, pool_result):
    """Handle one item from the pool: a heartbeat or a finished job."""
    if pool_result.heartbeat:
      self.heartbeat()
      return

    test_id, result = pool_result.value

    # JobResult carries only id and result; the command is reattached from
    # the in-flight table, and the entry is dropped in the same step.
    test, result.cmd = self._tests.pop(test_id)
    self._send_result(test, result)