# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import threading
import traceback

try:
  import Queue  # Python 2 module name.
except ImportError:
  import queue as Queue  # Python 3 renamed the module; keep the old alias.


class ThreadedWorkQueue(object):
  """Runs posted callbacks on the Run() thread and on worker threads.

  Two queues are maintained:
    * main-thread tasks, which are only executed by the thread that called
      Run();
    * any-thread tasks, which are executed by worker threads, or by the
      Run() thread itself when num_threads == 1.

  Run() keeps executing tasks until a task (or another thread) calls Stop().
  """

  def __init__(self, num_threads):
    """Creates an idle work queue.

    Args:
      num_threads: Number of worker threads Run() starts. The value 1 selects
          the single-threaded mode, where no worker thread is created.
    """
    self._num_threads = num_threads

    self._main_thread_tasks = None
    self._any_thread_tasks = None

    self._running = False
    self._stop = False
    self._stop_result = None

    self.Reset()

  @property
  def is_running(self):
    """True while a call to Run() is in progress."""
    return self._running

  def Run(self):
    """Executes posted tasks until Stop() is called.

    Returns:
      The stop_result that was passed to Stop().

    Raises:
      Exception: If the queue is already running.
    """
    if self.is_running:
      raise Exception('Already running')

    self._running = True
    self._stop = False
    self._stop_result = None

    try:
      if self._num_threads == 1:
        self._RunSingleThreaded()
      else:
        self._RunMultiThreaded()
      # The return value is captured here, before the finally clause below
      # clears _stop_result.
      return self._stop_result
    finally:
      # Also reached when an exception (e.g. KeyboardInterrupt) escapes the
      # run loop. Setting _stop tells worker threads that are still alive to
      # exit their loops; restoring the remaining state keeps the queue
      # reusable instead of leaving it stuck in the "running" state.
      self._stop = True
      # Tasks that were still pending are dropped.
      self._main_thread_tasks = Queue.Queue()
      self._any_thread_tasks = Queue.Queue()
      self._stop_result = None
      self._running = False

  def Stop(self, stop_result=None):
    """Asks the current Run() call to return.

    Args:
      stop_result: Value that Run() will return.

    Returns:
      True if this call requested the stop, False if a stop had already been
      requested (in which case stop_result is ignored).

    Raises:
      Exception: If the queue is not running.
    """
    if not self.is_running:
      raise Exception('Not running')

    if self._stop:
      return False
    self._stop_result = stop_result
    self._stop = True
    return True

  def Reset(self):
    """Discards all pending tasks. Must not be called while running."""
    assert not self.is_running
    self._main_thread_tasks = Queue.Queue()
    self._any_thread_tasks = Queue.Queue()

  def PostMainThreadTask(self, cb, *args, **kwargs):
    """Queues cb(*args, **kwargs) to be run by the thread that calls Run().

    The return value of cb is discarded.
    """
    def RunTask():
      cb(*args, **kwargs)
    self._main_thread_tasks.put(RunTask)

  def PostAnyThreadTask(self, cb, *args, **kwargs):
    """Queues cb(*args, **kwargs) on the any-thread queue.

    The task is run by a worker thread, or by the Run() thread when the queue
    was created with num_threads == 1. The return value of cb is discarded.
    """
    def RunTask():
      cb(*args, **kwargs)
    self._any_thread_tasks.put(RunTask)

  def _TryToRunOneTask(self, queue, block=False):
    """Runs at most one task taken from |queue|.

    Args:
      queue: The Queue.Queue to take a task from.
      block: If True, wait up to 0.1 seconds for a task to show up; otherwise
          return immediately when the queue is empty.
    """
    try:
      # With block=False the timeout is ignored and Queue.Empty is raised
      # right away when nothing is available, so this call can never wait
      # indefinitely.
      task = queue.get(block, 0.1)
    except Queue.Empty:
      return

    try:
      task()
    except Exception:
      # A failing task is reported but must not take down the run loop.
      # KeyboardInterrupt is not an Exception subclass, so it still
      # propagates to the caller with its original traceback.
      traceback.print_exc()
    finally:
      queue.task_done()

  def _RunSingleThreaded(self):
    """Alternates between both queues on the calling thread until stopped."""
    while not self._stop:
      self._TryToRunOneTask(self._any_thread_tasks)
      self._TryToRunOneTask(self._main_thread_tasks)

  def _RunMultiThreaded(self):
    """Starts the workers, runs main-thread tasks here, then joins workers."""
    threads = []
    for _ in range(self._num_threads):
      t = threading.Thread(target=self._ThreadMain)
      # Daemon threads do not keep the process alive if Run() is abandoned.
      t.daemon = True
      t.start()
      threads.append(t)

    # Non-blocking poll of the main-thread queue until a stop is requested.
    while not self._stop:
      self._TryToRunOneTask(self._main_thread_tasks)

    # Workers notice _stop after their current task or their next get()
    # timeout, so each join returns shortly after the stop request.
    for t in threads:
      t.join()

  def _ThreadMain(self):
    """Worker thread body: runs any-thread tasks until a stop is requested."""
    while not self._stop:
      self._TryToRunOneTask(self._any_thread_tasks, block=True)