• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for grpc.framework.foundation.logging_pool."""
15
16import threading
17import unittest
18
19from grpc.framework.foundation import logging_pool
20
# Size argument handed to logging_pool.pool() by every test in this module
# (presumably the pool's worker-thread limit -- confirm against logging_pool).
_POOL_SIZE = 16
22
23
24class _CallableObject(object):
25
26    def __init__(self):
27        self._lock = threading.Lock()
28        self._passed_values = []
29
30    def __call__(self, value):
31        with self._lock:
32            self._passed_values.append(value)
33
34    def passed_values(self):
35        with self._lock:
36            return tuple(self._passed_values)
37
38
class LoggingPoolTest(unittest.TestCase):
    """Exercises pools created by logging_pool.pool().

    Covers explicit shutdown, use as a context manager, running a plain
    function, surfacing a raised exception, and running a callable object.
    """

    def testUpAndDown(self):
        # A pool can be created and then shut down explicitly...
        explicit_pool = logging_pool.pool(_POOL_SIZE)
        explicit_pool.shutdown(wait=True)

        # ...and can also be driven through the context-manager protocol.
        with logging_pool.pool(_POOL_SIZE) as managed_pool:
            self.assertIsNotNone(managed_pool)

    def testTaskExecuted(self):
        # The submitted task appends a marker; a non-empty list afterwards
        # shows that the task actually ran.
        markers = []

        def add_marker():
            markers.append(object())

        with logging_pool.pool(_POOL_SIZE) as executor:
            completed = executor.submit(add_marker)
            completed.result()

        self.assertTrue(markers)

    def testException(self):
        # A task that raises must expose an exception through its future.
        with logging_pool.pool(_POOL_SIZE) as executor:
            failing = executor.submit(lambda: 1 / 0)
            error = failing.exception()

        self.assertIsNotNone(error)

    def testCallableObjectExecuted(self):
        # Submit a callable instance (not a function) plus one argument and
        # check that it was invoked with exactly that argument.
        sentinel = object()
        recorder = _CallableObject()
        with logging_pool.pool(_POOL_SIZE) as executor:
            pending = executor.submit(recorder, sentinel)
        # The future is inspected only after the with-block has been left.
        self.assertIsNone(pending.result())
        expected = (sentinel,)
        self.assertSequenceEqual(expected, recorder.passed_values())
70
71
# Allow running this module directly as a script; verbosity=2 asks unittest
# for per-test output.
if __name__ == '__main__':
    unittest.main(verbosity=2)
74