1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from datetime import datetime, timedelta 18import logging 19from queue import SimpleQueue, Empty 20 21from mobly import asserts 22 23from google.protobuf import text_format 24 25 26class EventAsserts(object): 27 """ 28 A class that handles various asserts with respect to a gRPC unary stream 29 30 This class must be created before an event happens as events in a 31 EventCallbackStream is not sticky and will be lost if you don't subscribe 32 to them before generating those events. 33 34 When asserting on sequential events, a single EventAsserts object is enough 35 36 When asserting on simultaneous events, you would need multiple EventAsserts 37 objects as each EventAsserts object owns a separate queue that is actively 38 being popped as asserted events happen 39 """ 40 DEFAULT_TIMEOUT_SECONDS = 3 41 42 def __init__(self, event_callback_stream): 43 if event_callback_stream is None: 44 raise ValueError("event_callback_stream cannot be None") 45 self.event_callback_stream = event_callback_stream 46 self.event_queue = SimpleQueue() 47 self.callback = lambda event: self.event_queue.put(event) 48 self.event_callback_stream.register_callback(self.callback) 49 50 def __del__(self): 51 self.event_callback_stream.unregister_callback(self.callback) 52 53 def remaining_time_delta(self, end_time): 54 remaining = end_time - datetime.now() 55 if remaining < timedelta(milliseconds=0): 56 remaining = timedelta(milliseconds=0) 57 return remaining 58 59 def assert_none(self, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): 60 """ 61 Assert no event happens within timeout period 62 63 :param timeout: a timedelta object 64 :return: 65 """ 66 logging.debug("assert_none %fs" % (timeout.total_seconds())) 67 try: 68 event = self.event_queue.get(timeout=timeout.total_seconds()) 69 asserts.assert_true( 70 event is None, 71 msg=("Expected None, but got %s" % text_format.MessageToString( 72 event, as_one_line=True))) 73 except Empty: 74 return 75 76 def assert_none_matching( 77 self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): 78 """ 79 Assert no events where match_fn(event) is True happen within timeout 80 period 81 82 :param match_fn: return True/False on match_fn(event) 83 :param timeout: a timedelta object 84 :return: 85 """ 86 logging.debug("assert_none_matching %fs" % (timeout.total_seconds())) 87 event = None 88 end_time = datetime.now() + timeout 89 while event is None and datetime.now() < end_time: 90 remaining = self.remaining_time_delta(end_time) 91 logging.debug("Waiting for event (%fs remaining)" % 92 (remaining.total_seconds())) 93 try: 94 current_event = self.event_queue.get( 95 timeout=remaining.total_seconds()) 96 if match_fn(current_event): 97 event = current_event 98 except Empty: 99 continue 100 logging.debug("Done waiting for an event") 101 if event is None: 102 return # Avoid an assert in MessageToString(None, ...) 103 asserts.assert_true( 104 event is None, 105 msg=("Expected None matching, but got %s" % 106 text_format.MessageToString(event, as_one_line=True))) 107 108 def assert_event_occurs(self, 109 match_fn, 110 at_least_times=1, 111 timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): 112 """ 113 Assert at least |at_least_times| instances of events happen where 114 match_fn(event) returns True within timeout period 115 116 :param match_fn: returns True/False on match_fn(event) 117 :param timeout: a timedelta object 118 :param at_least_times: how many times at least a matching event should 119 happen 120 :return: 121 """ 122 logging.debug("assert_event_occurs %d %fs" % (at_least_times, 123 timeout.total_seconds())) 124 event_list = [] 125 end_time = datetime.now() + timeout 126 while len(event_list) < at_least_times and datetime.now() < end_time: 127 remaining = self.remaining_time_delta(end_time) 128 logging.debug("Waiting for event (%fs remaining)" % 129 (remaining.total_seconds())) 130 try: 131 current_event = self.event_queue.get( 132 timeout=remaining.total_seconds()) 133 if match_fn(current_event): 134 event_list.append(current_event) 135 except Empty: 136 continue 137 logging.debug("Done waiting for event") 138 asserts.assert_true( 139 len(event_list) >= at_least_times, 140 msg=("Expected at least %d events, but got %d" % (at_least_times, 141 len(event_list)))) 142 143 def assert_event_occurs_at_most( 144 self, 145 match_fn, 146 at_most_times, 147 timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): 148 """ 149 Assert at most |at_most_times| instances of events happen where 150 match_fn(event) returns True within timeout period 151 152 :param match_fn: returns True/False on match_fn(event) 153 :param at_most_times: how many times at most a matching event should 154 happen 155 :param timeout:a timedelta object 156 :return: 157 """ 158 logging.debug("assert_event_occurs_at_most") 159 event_list = [] 160 end_time = datetime.now() + timeout 161 while len(event_list) <= at_most_times and datetime.now() < end_time: 162 remaining = self.remaining_time_delta(end_time) 163 logging.debug("Waiting for event iteration (%fs remaining)" % 164 (remaining.total_seconds())) 165 try: 166 current_event = self.event_queue.get( 167 timeout=remaining.total_seconds()) 168 if match_fn(current_event): 169 event_list.append(current_event) 170 except Empty: 171 continue 172 logging.debug("Done waiting, got %d events" % len(event_list)) 173 asserts.assert_true( 174 len(event_list) <= at_most_times, 175 msg=("Expected at most %d events, but got %d" % (at_most_times, 176 len(event_list)))) 177