• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9from multiprocessing import Queue, queues
10from six.moves import range
11
12
class QueueBarrierTimeout(Exception):
    """Raised when waiting on a QueueBarrier times out.

    Both QueueBarrier.main_barrier() and QueueBarrier.node_barrier() raise
    this when a timed read from their queue expires without receiving
    anything.
    """
15
16
class QueueBarrier(object):
    """A simple, reusable barrier for synchronizing processes.

    The design relies on there being a single "main" process and |n|
    different "node" processes, which keeps the implementation simple. Given
    this hierarchy, the main and the nodes can also exchange a token while
    passing through the barrier: each node hands one token to the main, and
    the main hands the same token to every node.

    The so called "main" shall call main_barrier() while each "node" shall
    call the node_barrier() method.

    Two multiprocessing queues are used internally: nodes announce their
    arrival on one of them and the main releases the nodes through the other.

    If the same group of |n| nodes and the same main are participating in the
    barrier, it is totally safe to reuse the barrier several times with the
    same group of processes.
    """


    def __init__(self, n):
        """Sets up a barrier for |n| node processes plus one main.

        @param n: The number of node processes."""
        self.n_ = n
        # Carries node -> main messages (one per arriving node).
        self.queue_main_ = Queue()
        # Carries main -> node messages (one per released node).
        self.queue_node_ = Queue()


    def main_barrier(self, token=None, timeout=None):
        """Blocks the main until all |n| nodes have arrived, then frees them.

        @param token: A value handed to every node when it is released.
        @param timeout: How long, in seconds, to wait for a node's arrival.
                The value is passed to each individual queue read, so it
                bounds every single wait rather than the total time spent
                here. A None value will block forever.

        @returns The list of tokens received from the nodes, in the order
                they were read from the queue.

        @raises QueueBarrierTimeout: If a read from the arrival queue times
                out. Tokens already read at that point are discarded and no
                node is released.
        """
        arrivals = self.queue_main_
        try:
            # Exactly one arrival message is expected from each node.
            tokens = [arrivals.get(timeout=timeout) for _ in range(self.n_)]
        except queues.Empty:
            # Some node did not show up before the timeout expired.
            raise QueueBarrierTimeout()

        # Everybody is here: post one release message per blocked node.
        release = self.queue_node_.put
        for _ in range(self.n_):
            release(token)
        return tokens


    def node_barrier(self, token=None, timeout=None):
        """Blocks a node until the main releases it from the barrier.

        The node first announces its arrival to the main and then waits for
        the release message, which the main only sends once all |n| nodes
        have arrived.

        @param token: A value passed to the main.
        @param timeout: The timeout, in seconds, to wait for the release
                message from the main. A None value will block forever.

        @returns The token the main passed to main_barrier().

        @raises QueueBarrierTimeout: If no release message arrives before the
                timeout expires.
        """
        # Announce the arrival first; this put is not subject to |timeout|.
        self.queue_main_.put(token)
        try:
            released_with = self.queue_node_.get(timeout=timeout)
        except queues.Empty:
            # The main never released us within the allotted time.
            raise QueueBarrierTimeout()
        return released_with
80