• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import datetime, timedelta
18import logging
19from queue import SimpleQueue, Empty
20
21from mobly import asserts
22
23from google.protobuf import text_format
24
25
26class EventAsserts(object):
27    """
28    A class that handles various asserts with respect to a gRPC unary stream
29
30    This class must be created before an event happens as events in a
31    EventCallbackStream is not sticky and will be lost if you don't subscribe
32    to them before generating those events.
33
34    When asserting on sequential events, a single EventAsserts object is enough
35
36    When asserting on simultaneous events, you would need multiple EventAsserts
37    objects as each EventAsserts object owns a separate queue that is actively
38    being popped as asserted events happen
39    """
40    DEFAULT_TIMEOUT_SECONDS = 3
41
42    def __init__(self, event_callback_stream):
43        if event_callback_stream is None:
44            raise ValueError("event_callback_stream cannot be None")
45        self.event_callback_stream = event_callback_stream
46        self.event_queue = SimpleQueue()
47        self.callback = lambda event: self.event_queue.put(event)
48        self.event_callback_stream.register_callback(self.callback)
49
50    def __del__(self):
51        self.event_callback_stream.unregister_callback(self.callback)
52
53    def remaining_time_delta(self, end_time):
54        remaining = end_time - datetime.now()
55        if remaining < timedelta(milliseconds=0):
56            remaining = timedelta(milliseconds=0)
57        return remaining
58
59    def assert_none(self, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
60        """
61        Assert no event happens within timeout period
62
63        :param timeout: a timedelta object
64        :return:
65        """
66        logging.debug("assert_none %fs" % (timeout.total_seconds()))
67        try:
68            event = self.event_queue.get(timeout=timeout.total_seconds())
69            asserts.assert_true(
70                event is None,
71                msg=("Expected None, but got %s" % text_format.MessageToString(
72                    event, as_one_line=True)))
73        except Empty:
74            return
75
76    def assert_none_matching(
77            self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
78        """
79        Assert no events where match_fn(event) is True happen within timeout
80        period
81
82        :param match_fn: return True/False on match_fn(event)
83        :param timeout: a timedelta object
84        :return:
85        """
86        logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
87        event = None
88        end_time = datetime.now() + timeout
89        while event is None and datetime.now() < end_time:
90            remaining = self.remaining_time_delta(end_time)
91            logging.debug("Waiting for event (%fs remaining)" %
92                          (remaining.total_seconds()))
93            try:
94                current_event = self.event_queue.get(
95                    timeout=remaining.total_seconds())
96                if match_fn(current_event):
97                    event = current_event
98            except Empty:
99                continue
100        logging.debug("Done waiting for an event")
101        if event is None:
102            return  # Avoid an assert in MessageToString(None, ...)
103        asserts.assert_true(
104            event is None,
105            msg=("Expected None matching, but got %s" %
106                 text_format.MessageToString(event, as_one_line=True)))
107
108    def assert_event_occurs(self,
109                            match_fn,
110                            at_least_times=1,
111                            timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
112        """
113        Assert at least |at_least_times| instances of events happen where
114        match_fn(event) returns True within timeout period
115
116        :param match_fn: returns True/False on match_fn(event)
117        :param timeout: a timedelta object
118        :param at_least_times: how many times at least a matching event should
119                               happen
120        :return:
121        """
122        logging.debug("assert_event_occurs %d %fs" % (at_least_times,
123                                                      timeout.total_seconds()))
124        event_list = []
125        end_time = datetime.now() + timeout
126        while len(event_list) < at_least_times and datetime.now() < end_time:
127            remaining = self.remaining_time_delta(end_time)
128            logging.debug("Waiting for event (%fs remaining)" %
129                          (remaining.total_seconds()))
130            try:
131                current_event = self.event_queue.get(
132                    timeout=remaining.total_seconds())
133                if match_fn(current_event):
134                    event_list.append(current_event)
135            except Empty:
136                continue
137        logging.debug("Done waiting for event")
138        asserts.assert_true(
139            len(event_list) >= at_least_times,
140            msg=("Expected at least %d events, but got %d" % (at_least_times,
141                                                              len(event_list))))
142
143    def assert_event_occurs_at_most(
144            self,
145            match_fn,
146            at_most_times,
147            timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
148        """
149        Assert at most |at_most_times| instances of events happen where
150        match_fn(event) returns True within timeout period
151
152        :param match_fn: returns True/False on match_fn(event)
153        :param at_most_times: how many times at most a matching event should
154                               happen
155        :param timeout:a timedelta object
156        :return:
157        """
158        logging.debug("assert_event_occurs_at_most")
159        event_list = []
160        end_time = datetime.now() + timeout
161        while len(event_list) <= at_most_times and datetime.now() < end_time:
162            remaining = self.remaining_time_delta(end_time)
163            logging.debug("Waiting for event iteration (%fs remaining)" %
164                          (remaining.total_seconds()))
165            try:
166                current_event = self.event_queue.get(
167                    timeout=remaining.total_seconds())
168                if match_fn(current_event):
169                    event_list.append(current_event)
170            except Empty:
171                continue
172        logging.debug("Done waiting, got %d events" % len(event_list))
173        asserts.assert_true(
174            len(event_list) <= at_most_times,
175            msg=("Expected at most %d events, but got %d" % (at_most_times,
176                                                             len(event_list))))
177