• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4import threading
5import traceback
6import Queue
7
8
class ThreadedWorkQueue(object):
  """Runs posted callbacks on the calling thread and on worker threads.

  Two kinds of tasks can be posted:

    * "main thread" tasks, which are only ever executed by the thread that
      called Run();
    * "any thread" tasks, which are executed by one of the worker threads
      started by Run() (or by the calling thread when num_threads == 1).

  Run() blocks, executing tasks, until some task (or another thread) calls
  Stop(). Tasks still queued at that point are discarded.
  """

  # Seconds a blocking dequeue waits before giving up. Keeping this bounded
  # lets every loop re-check the stop flag regularly.
  _POLL_INTERVAL_SECONDS = 0.1

  def __init__(self, num_threads):
    """Creates an idle work queue.

    Args:
      num_threads: Number of worker threads Run() starts. The value 1 is
          special: no threads are started and every task runs on the thread
          that calls Run().
    """
    self._num_threads = num_threads

    self._main_thread_tasks = None
    self._any_thread_tasks = None

    self._running = False
    self._stop = False
    self._stop_result = None

    self.Reset()

  @property
  def is_running(self):
    """True while a call to Run() is in progress."""
    return self._running

  def Run(self):
    """Executes posted tasks until Stop() is called.

    Returns:
      The stop_result that was passed to Stop().

    Raises:
      Exception: If the queue is already running.
      KeyboardInterrupt: If a task raised it; the queue is left in a clean,
          non-running state so it can be Reset() or Run() again.
    """
    if self.is_running:
      raise Exception('Already running')

    self._running = True
    self._stop = False
    self._stop_result = None

    try:
      if self._num_threads == 1:
        self._RunSingleThreaded()
      else:
        self._RunMultiThreaded()
      result = self._stop_result
    finally:
      # Runs on the exception path too: without it a propagating
      # KeyboardInterrupt would leave the queue permanently "running".
      self._stop_result = None
      self._running = False
      # Drop whatever was still queued when the run ended.
      self.Reset()

    return result

  def Stop(self, stop_result=None):
    """Asks a running queue to stop.

    Args:
      stop_result: Value that Run() will return.

    Returns:
      True if this call initiated the stop, False if a stop had already been
      requested (in which case stop_result is ignored).

    Raises:
      Exception: If the queue is not running.
    """
    if not self.is_running:
      raise Exception('Not running')

    if self._stop:
      return False
    self._stop_result = stop_result
    self._stop = True
    return True

  def Reset(self):
    """Discards all pending tasks. Must not be called while running."""
    assert not self.is_running
    self._main_thread_tasks = Queue.Queue()
    self._any_thread_tasks = Queue.Queue()

  def PostMainThreadTask(self, cb, *args, **kwargs):
    """Queues cb(*args, **kwargs) to be executed by the thread in Run()."""
    def RunTask():
      cb(*args, **kwargs)
    self._main_thread_tasks.put(RunTask)

  def PostAnyThreadTask(self, cb, *args, **kwargs):
    """Queues cb(*args, **kwargs) to be executed by a worker thread.

    When the queue was created with num_threads == 1 the task runs on the
    thread that called Run() instead.
    """
    def RunTask():
      cb(*args, **kwargs)
    self._any_thread_tasks.put(RunTask)

  def _TryToRunOneTask(self, queue, block=False):
    """Dequeues and runs at most one task from |queue|.

    Args:
      queue: The Queue.Queue to take a task from.
      block: If True, wait up to _POLL_INTERVAL_SECONDS for a task to show
          up; otherwise return immediately when the queue is empty.

    Returns:
      True if a task was dequeued and executed, False if none was available.
    """
    try:
      if block:
        task = queue.get(True, self._POLL_INTERVAL_SECONDS)
      else:
        # get_nowait() instead of empty()+get(): the latter can block forever
        # if another consumer takes the item between the two calls.
        task = queue.get_nowait()
    except Queue.Empty:
      return False

    try:
      task()
    except Exception:
      # A failing task must not take the whole queue down; report and move
      # on. KeyboardInterrupt and SystemExit are not Exception subclasses, so
      # they propagate to the caller with their original traceback.
      traceback.print_exc()
    finally:
      queue.task_done()
    return True

  def _RunSingleThreaded(self):
    """Runs both kinds of tasks on the calling thread until stopped."""
    while not self._stop:
      ran_any = self._TryToRunOneTask(self._any_thread_tasks)
      ran_main = self._TryToRunOneTask(self._main_thread_tasks)
      if not (ran_any or ran_main):
        # Nothing to do right now: wait briefly for work instead of spinning
        # at full CPU. The bounded wait keeps the stop flag responsive.
        self._TryToRunOneTask(self._main_thread_tasks, block=True)

  def _RunMultiThreaded(self):
    """Starts the workers and runs main-thread tasks until stopped."""
    threads = []
    try:
      for _ in range(self._num_threads):
        t = threading.Thread(target=self._ThreadMain)
        t.daemon = True
        t.start()
        threads.append(t)

      while not self._stop:
        # Bounded blocking wait (same as the workers) rather than a busy
        # loop that would compete with the workers for the interpreter.
        self._TryToRunOneTask(self._main_thread_tasks, block=True)
    finally:
      # Also reached when an exception (e.g. KeyboardInterrupt) escapes:
      # make sure the workers are told to wind down. They are daemon threads
      # and are deliberately not joined on that path so the exception
      # propagates without waiting for in-flight tasks.
      self._stop = True

    for t in threads:
      t.join()

  def _ThreadMain(self):
    """Worker thread body: runs any-thread tasks until stopped."""
    while not self._stop:
      self._TryToRunOneTask(self._any_thread_tasks, block=True)
122