• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Tests for lock_util."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import random
22import time
23
24from absl.testing import parameterized
25
26from tensorflow.python.platform import test
27from tensorflow.python.util import lock_util
28
29
class GroupLockTest(test.TestCase, parameterized.TestCase):
  """Multi-threaded checks of `lock_util.GroupLock`."""

  @parameterized.parameters(1, 2, 3, 5, 10)
  def testGroups(self, num_groups):
    """Runs 10 threads through the lock and inspects the member counts.

    Each thread enters the group `thread_id % num_groups`. While inside, it
    asserts that its own group's member count is positive and that every other
    group's member count is zero. After all threads are joined, the test
    asserts that every thread reached the end of its critical section.

    Args:
      num_groups: Number of groups passed to the `GroupLock` constructor.
    """
    num_threads = 10
    group_lock = lock_util.GroupLock(num_groups)
    done_ids = set()

    def worker(tid):
      # Random pauses before and inside the critical section vary the order
      # in which the threads reach the lock from run to run.
      time.sleep(random.random() * 0.1)
      my_group = tid % num_groups
      with group_lock.group(my_group):
        time.sleep(random.random() * 0.1)
        # The current thread is itself a member, so the count must be > 0.
        self.assertGreater(group_lock._group_member_counts[my_group], 0)
        for other_group, members in enumerate(
            group_lock._group_member_counts):
          if other_group == my_group:
            continue
          self.assertEqual(0, members)
        done_ids.add(tid)

    workers = [
        self.checkedThread(target=worker, args=(tid,))
        for tid in range(num_threads)
    ]

    # Start every thread before joining any, so they contend concurrently.
    for thread in workers:
      thread.start()
    for thread in workers:
      thread.join()

    self.assertEqual(set(range(num_threads)), done_ids)
60
61
if __name__ == "__main__":
  # Run the tests in this module through tensorflow.python.platform.test
  # when the file is executed as a script.
  test.main()
64