#!/usr/bin/python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest

# Keys of the per-message bookkeeping record stored in messages_by_msg.
KEY_ID = "id"
KEY_TX_OK_COUNT = "tx_ok_count"
KEY_TX_FAIL_COUNT = "tx_fail_count"
KEY_RX_COUNT = "rx_count"


class MessagesStressTest(AwareBaseTest):
    """Set of stress tests for Wi-Fi Aware L2 (layer 2) message exchanges."""

    # Number of iterations in the stress test (number of messages)
    NUM_ITERATIONS = 100

    # Maximum permitted percentage of messages which fail to be transmitted
    # correctly
    MAX_TX_FAILURE_PERCENTAGE = 2

    # Maximum permitted percentage of messages which are received more than
    # once (indicating, most likely, that the ACK wasn't received and the
    # message was retransmitted)
    MAX_DUPLICATE_RX_PERCENTAGE = 2

    SERVICE_NAME = "GoogleTestServiceXY"

    def __init__(self, controllers):
        AwareBaseTest.__init__(self, controllers)

    def init_info(self, msg, id, messages_by_msg, messages_by_id):
        """Initialize the message data structures for a message about to be
        sent.

        Both dictionaries are updated in place; any existing entry for the
        same text or ID is overwritten.

        Args:
          msg: message text
          id: message id
          messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
          messages_by_id: {id -> text}
        """
        messages_by_msg[msg] = {
            KEY_ID: id,
            KEY_TX_OK_COUNT: 0,
            KEY_TX_FAIL_COUNT: 0,
            KEY_RX_COUNT: 0,
        }
        messages_by_id[id] = msg

    def wait_for_tx_events(self, dut, num_msgs, messages_by_msg,
                           messages_by_id):
        """Wait for messages to be transmitted and update data structures.

        Pops send-succeeded / send-failed events from the device until
        num_msgs distinct confirmations have been seen, or until no event
        arrives within autils.EVENT_TIMEOUT (in which case a warning is
        logged and the wait is abandoned).

        Args:
          dut: device under test
          num_msgs: number of expected message tx
          messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
          messages_by_id: {id -> text}

        Returns:
          Tuple (num_ok_tx_confirmations, num_fail_tx_confirmations,
          num_unexpected_ids). The first two count each message at most once
          (on its first success / first failure indication); the third counts
          events whose message ID is not in messages_by_id.
        """
        num_ok_tx_confirmations = 0
        num_fail_tx_confirmations = 0
        num_unexpected_ids = 0
        tx_events_regex = "%s|%s" % (aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED,
                                     aconsts.SESSION_CB_ON_MESSAGE_SENT)
        while num_ok_tx_confirmations + num_fail_tx_confirmations < num_msgs:
            try:
                events = dut.ed.pop_events(tx_events_regex,
                                           autils.EVENT_TIMEOUT)
            except queue.Empty:
                self.log.warning(
                    "[%s] Timed out waiting for any MESSAGE_SEND* event - "
                    "assuming the rest are not coming", dut.pretty_name)
                break

            for event in events:
                event_name = event["name"]
                is_tx_ok = event_name == aconsts.SESSION_CB_ON_MESSAGE_SENT
                # The regex should only match the two callbacks; guard
                # against anything else slipping through.
                if (not is_tx_ok and event_name !=
                        aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED):
                    asserts.fail("Unexpected event: %s" % event)

                msg_id = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_ID]
                if msg_id not in messages_by_id:
                    self.log.warning(
                        "Tx confirmation of unknown message ID received: %s",
                        event)
                    num_unexpected_ids += 1
                    continue

                info = messages_by_msg[messages_by_id[msg_id]]
                if is_tx_ok:
                    info[KEY_TX_OK_COUNT] += 1
                    # Only the first indication counts as a confirmation;
                    # repeats are tracked in the record as duplicates.
                    if info[KEY_TX_OK_COUNT] == 1:
                        num_ok_tx_confirmations += 1
                else:
                    info[KEY_TX_FAIL_COUNT] += 1
                    if info[KEY_TX_FAIL_COUNT] == 1:
                        num_fail_tx_confirmations += 1

        return (num_ok_tx_confirmations, num_fail_tx_confirmations,
                num_unexpected_ids)

    def wait_for_rx_events(self, dut, num_msgs, messages_by_msg):
        """Wait for messages to be received and update data structures.

        Pops message-received events from the device until num_msgs distinct
        expected messages have been seen, or until no event arrives within
        autils.EVENT_TIMEOUT (a warning is logged and the wait is abandoned).

        A message whose text is not already in messages_by_msg is recorded
        with an ID of -1. Its receive count starts at zero so that a single
        receipt is recorded as one receipt and not as a duplicate. Such
        messages do not count toward num_msgs.

        Args:
          dut: device under test
          num_msgs: number of expected messages to receive
          messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        """
        num_rx_msgs = 0
        while num_rx_msgs < num_msgs:
            try:
                event = dut.ed.pop_event(
                    aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
                    autils.EVENT_TIMEOUT)
            except queue.Empty:
                self.log.warning(
                    "[%s] Timed out waiting for ON_MESSAGE_RECEIVED event - "
                    "assuming the rest are not coming", dut.pretty_name)
                break

            msg = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING]
            info = messages_by_msg.get(msg)
            was_expected = info is not None
            if not was_expected:
                # Message text that was never sent: track it with the -1
                # sentinel ID. The count starts at 0 because the increment
                # below accounts for this receipt.
                info = {
                    KEY_ID: -1,
                    KEY_TX_OK_COUNT: 0,
                    KEY_TX_FAIL_COUNT: 0,
                    KEY_RX_COUNT: 0,
                }
                messages_by_msg[msg] = info

            info[KEY_RX_COUNT] += 1
            # Only the first receipt of an expected message advances the
            # loop; duplicates and unknown messages do not.
            if was_expected and info[KEY_RX_COUNT] == 1:
                num_rx_msgs += 1

    def analyze_results(self, results, messages_by_msg):
        """Analyze the results of the stress message test and add to the
        results dictionary.

        Every counter counts messages (distinct texts), not events. A record
        with an ID of -1 denotes a message that was received but never sent.

        Args:
          results: result dictionary into which to add data
          messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        """
        results["raw_data"] = messages_by_msg
        for counter in ("tx_count_success",
                        "tx_count_duplicate_success",
                        "tx_count_fail",
                        "tx_count_duplicate_fail",
                        "tx_count_neither",
                        "tx_count_tx_ok_but_no_rx",
                        "rx_count",
                        "rx_count_duplicate",
                        "rx_count_no_ok_tx_indication",
                        "rx_count_fail_tx_indication",
                        "rx_count_no_tx_message"):
            results[counter] = 0

        for data in messages_by_msg.values():
            tx_ok = data[KEY_TX_OK_COUNT]
            tx_fail = data[KEY_TX_FAIL_COUNT]
            rx = data[KEY_RX_COUNT]
            was_sent = data[KEY_ID] != -1

            if tx_ok > 0:
                results["tx_count_success"] += 1
            if tx_ok > 1:
                results["tx_count_duplicate_success"] += 1
            if tx_fail > 0:
                results["tx_count_fail"] += 1
            if tx_fail > 1:
                results["tx_count_duplicate_fail"] += 1
            if tx_ok == 0 and tx_fail == 0 and was_sent:
                results["tx_count_neither"] += 1
            if tx_ok > 0 and rx == 0:
                results["tx_count_tx_ok_but_no_rx"] += 1
            if rx > 0:
                results["rx_count"] += 1
            if rx > 1:
                results["rx_count_duplicate"] += 1
            if rx > 0 and tx_ok == 0:
                results["rx_count_no_ok_tx_indication"] += 1
            if rx > 0 and tx_fail > 0:
                results["rx_count_fail_tx_indication"] += 1
            if rx > 0 and not was_sent:
                results["rx_count_no_tx_message"] += 1

    #######################################################################

    @test_tracker_info(uuid="e88c060f-4ca7-41c1-935a-d3d62878ec0b")
    def test_stress_message(self):
        """Stress test for bi-directional message transmission and reception.

        Sets up a publish/subscribe discovery session between two devices,
        queues NUM_ITERATIONS messages in each direction, collects the Tx and
        Rx indications on both devices and asserts on the aggregate results.
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]

        # Start up a discovery session
        discovery_data = autils.create_discovery_pair(
            p_dut,
            s_dut,
            p_config=autils.create_discovery_config(
                self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
            s_config=autils.create_discovery_config(
                self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
            device_startup_offset=self.device_startup_offset,
            msg_id=self.get_next_msg_id())
        # Elements 0 and 1 of discovery_data are not needed by this test.
        p_disc_id = discovery_data[2]
        s_disc_id = discovery_data[3]
        peer_id_on_sub = discovery_data[4]
        peer_id_on_pub = discovery_data[5]

        # Store information on Tx & Rx messages
        messages_by_msg = {}  # keyed by message text
        # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        messages_by_id = {}  # keyed by message ID {id -> text}

        # send all messages at once (one in each direction)
        for i in range(self.NUM_ITERATIONS):
            msg_p2s = "Message Publisher -> Subscriber #%d" % i
            next_msg_id = self.get_next_msg_id()
            self.init_info(msg_p2s, next_msg_id, messages_by_msg,
                           messages_by_id)
            p_dut.droid.wifiAwareSendMessage(p_disc_id, peer_id_on_pub,
                                             next_msg_id, msg_p2s, 0)

            msg_s2p = "Message Subscriber -> Publisher #%d" % i
            next_msg_id = self.get_next_msg_id()
            self.init_info(msg_s2p, next_msg_id, messages_by_msg,
                           messages_by_id)
            s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub,
                                             next_msg_id, msg_s2p, 0)

        # wait for message tx confirmation
        (p_tx_ok_count, p_tx_fail_count,
         p_tx_unknown_id) = self.wait_for_tx_events(
             p_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
        (s_tx_ok_count, s_tx_fail_count,
         s_tx_unknown_id) = self.wait_for_tx_events(
             s_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
        self.log.info(
            "Transmission done: pub=%d, sub=%d transmitted successfully",
            p_tx_ok_count, s_tx_ok_count)

        # wait for message rx confirmation (giving it the total number of
        # messages transmitted rather than just those transmitted correctly
        # since sometimes the Tx doesn't get that information correctly. I.e.
        # a message the Tx thought was not transmitted correctly is actually
        # received - missing ACK? bug?)
        self.wait_for_rx_events(p_dut, self.NUM_ITERATIONS, messages_by_msg)
        self.wait_for_rx_events(s_dut, self.NUM_ITERATIONS, messages_by_msg)

        # analyze results
        results = {}
        results["tx_count"] = 2 * self.NUM_ITERATIONS
        results["tx_unknown_ids"] = p_tx_unknown_id + s_tx_unknown_id
        self.analyze_results(results, messages_by_msg)

        # clear errors
        asserts.assert_equal(results["tx_unknown_ids"], 0,
                             "Message ID corruption", results)
        asserts.assert_equal(results["tx_count_neither"], 0,
                             "Tx message with no success or fail indication",
                             results)
        asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
                             "Duplicate Tx fail messages", results)
        asserts.assert_equal(results["tx_count_duplicate_success"], 0,
                             "Duplicate Tx success messages", results)
        asserts.assert_equal(
            results["rx_count_no_tx_message"], 0,
            "Rx message which wasn't sent - message corruption?", results)
        asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
                             "Tx got ACK but Rx didn't get message", results)

        # possibly ok - but flag since most frequently a bug
        asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
                             "Message received but Tx didn't get ACK", results)
        asserts.assert_equal(results["rx_count_fail_tx_indication"], 0,
                             "Message received but Tx didn't get ACK", results)

        # permissible failures based on thresholds
        asserts.assert_true(
            results["tx_count_fail"] <=
            (self.MAX_TX_FAILURE_PERCENTAGE * self.NUM_ITERATIONS / 100),
            "Number of Tx failures exceeds threshold",
            extras=results)
        asserts.assert_true(
            results["rx_count_duplicate"] <=
            (self.MAX_DUPLICATE_RX_PERCENTAGE * self.NUM_ITERATIONS / 100),
            "Number of duplicate Rx exceeds threshold",
            extras=results)

        asserts.explicit_pass("test_stress_message done", extras=results)