# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time
from multiprocessing import Queue, queues

try:
    from six.moves import range
except ImportError:
    # six is only used for range(); the builtin range works for the short
    # loops below, so the module stays usable when six is not installed.
    pass


# Prefer a monotonic clock so wall-clock adjustments cannot stretch or shrink
# a timeout; Python 2 has no time.monotonic, so fall back to time.time there.
_now = getattr(time, 'monotonic', time.time)


class QueueBarrierTimeout(Exception):
    """QueueBarrier timeout exception."""


class QueueBarrier(object):
    """This class implements a simple barrier to synchronize processes. The
    barrier relies on the fact that there is a single process "main" and |n|
    different "nodes" to make the implementation simpler. Also, given this
    hierarchy, the nodes and the main can exchange a token while passing
    through the barrier.

    The so called "main" shall call main_barrier() while the "node" shall
    call the node_barrier() method.

    If the same group of |n| nodes and the same main are participating in the
    barrier, it is totally safe to reuse the barrier several times with the same
    group of processes.
    """


    def __init__(self, n):
        """Initializes the barrier with |n| node processes and a main.

        @param n: The number of node processes."""
        self.n_ = n
        # Tokens travelling from the nodes to the main.
        self.queue_main_ = Queue()
        # Tokens travelling from the main to the nodes.
        self.queue_node_ = Queue()


    def main_barrier(self, token=None, timeout=None):
        """Makes the main wait until all the "n" nodes have reached this
        point.

        @param token: A value passed to every node.
        @param timeout: The total time, in seconds, to wait for all the
                nodes. A None value will block forever.

        @raises QueueBarrierTimeout: if fewer than "n" node tokens arrived
                before the timeout expired. Tokens that were already received
                are dropped and no node is released in that case.

        Returns the list of received tokens from the nodes.
        """
        # A single deadline covers the whole wait. Passing |timeout| to every
        # get() call would let the main block for up to n * timeout seconds.
        if timeout is None:
            deadline = None
        else:
            deadline = _now() + timeout

        # Wait for all the nodes.
        result = []
        try:
            for _ in range(self.n_):
                if deadline is None:
                    remaining = None
                else:
                    # Clamp at zero so that an expired deadline turns into a
                    # non-waiting poll rather than a negative timeout.
                    remaining = max(0.0, deadline - _now())
                result.append(self.queue_main_.get(timeout=remaining))
        except queues.Empty:
            # Timeout expired
            raise QueueBarrierTimeout()

        # Release all the blocked nodes.
        for _ in range(self.n_):
            self.queue_node_.put(token)
        return result


    def node_barrier(self, token=None, timeout=None):
        """Makes a node wait until all the "n" nodes and the main have
        reached this point.

        @param token: A value passed to the main.
        @param timeout: The timeout, in seconds, to wait for the main to
                release this node. A None value will block forever.

        @raises QueueBarrierTimeout: if the main did not release this node
                before the timeout expired. The node's token has already been
                queued for the main at that point and is not withdrawn.

        Returns the token sent by the main.
        """
        # Announce this node to the main first, then block until released.
        self.queue_main_.put(token)
        try:
            return self.queue_node_.get(timeout=timeout)
        except queues.Empty:
            # Timeout expired
            raise QueueBarrierTimeout()