#!/usr/bin/python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi.aware import aware_const as aconsts
from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest

# Keys of the per-message bookkeeping dictionaries stored in messages_by_msg.
KEY_ID = "id"
KEY_TX_OK_COUNT = "tx_ok_count"
KEY_TX_FAIL_COUNT = "tx_fail_count"
KEY_RX_COUNT = "rx_count"


class MessagesStressTest(AwareBaseTest):
    """Set of stress tests for Wi-Fi Aware L2 (layer 2) message exchanges."""
    # Depth of the per-UID message queue in the framework
    MESSAGE_QUEUE_DEPTH_PER_UID = 50

    # Number of iterations in the stress test (number of messages)
    # Should be larger than MESSAGE_QUEUE_DEPTH_PER_UID
    NUM_ITERATIONS = 200

    # Number of messages to send per round to avoid exceeding the message
    # queue depth limit.
    # Should be less than or equal to 1/2 of MESSAGE_QUEUE_DEPTH_PER_UID
    NUM_PER_ROUND = 20
    NUM_ROUNDS = 5

    # Maximum permitted percentage of messages which fail to be transmitted
    # correctly
    MAX_TX_FAILURE_PERCENTAGE = 2

    # Maximum permitted percentage of messages which are received more than
    # once (indicating, most likely, that the ACK wasn't received and the
    # message was retransmitted)
    MAX_DUPLICATE_RX_PERCENTAGE = 2

    SERVICE_NAME = "GoogleTestServiceXY"

    def init_info(self, msg, id, messages_by_msg, messages_by_id):
        """Initialize the message data structures.

        Registers a new message with all of its counters at zero. Both
        dictionaries are updated in place.

        Args:
            msg: message text
            id: message id
            messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
            messages_by_id: {id -> text}
        """
        messages_by_msg[msg] = {
            KEY_ID: id,
            KEY_TX_OK_COUNT: 0,
            KEY_TX_FAIL_COUNT: 0,
            KEY_RX_COUNT: 0,
        }
        messages_by_id[id] = msg

    def wait_for_tx_events(self, dut, num_msgs, messages_by_msg,
                           messages_by_id):
        """Wait for messages to be transmitted and update data structures.

        Pops MESSAGE_SENT / MESSAGE_SEND_FAILED events from the device until
        `num_msgs` distinct messages have been confirmed (successfully or not)
        or until waiting for events times out.

        Args:
            dut: device under test
            num_msgs: number of expected message tx
            messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
            messages_by_id: {id -> text}

        Returns:
            Tuple (num_ok_tx_confirmations, num_fail_tx_confirmations,
            num_unexpected_ids). A message contributes to the first two
            counters only on its first confirmation of that kind; repeats are
            only recorded in the per-message counters.
        """
        num_ok_tx_confirmations = 0
        num_fail_tx_confirmations = 0
        num_unexpected_ids = 0
        tx_events_regex = "%s|%s" % (aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED,
                                     aconsts.SESSION_CB_ON_MESSAGE_SENT)
        while num_ok_tx_confirmations + num_fail_tx_confirmations < num_msgs:
            try:
                events = dut.ed.pop_events(tx_events_regex,
                                           autils.EVENT_TIMEOUT)
            except queue.Empty:
                self.log.warning(
                    "[%s] Timed out waiting for any MESSAGE_SEND* event - "
                    "assuming the rest are not coming", dut.pretty_name)
                break

            for event in events:
                name = event["name"]
                # The pattern is a regex, so double-check that the event is
                # exactly one of the two expected callbacks.
                if (name != aconsts.SESSION_CB_ON_MESSAGE_SENT and
                        name != aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED):
                    asserts.fail("Unexpected event: %s" % event)
                is_tx_ok = name == aconsts.SESSION_CB_ON_MESSAGE_SENT

                msg_id = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_ID]
                if msg_id not in messages_by_id:
                    self.log.warning(
                        "Tx confirmation of unknown message ID received: %s",
                        event)
                    num_unexpected_ids += 1
                    continue

                info = messages_by_msg[messages_by_id[msg_id]]
                if is_tx_ok:
                    info[KEY_TX_OK_COUNT] += 1
                    if info[KEY_TX_OK_COUNT] == 1:
                        num_ok_tx_confirmations += 1
                else:
                    info[KEY_TX_FAIL_COUNT] += 1
                    if info[KEY_TX_FAIL_COUNT] == 1:
                        num_fail_tx_confirmations += 1

        return (num_ok_tx_confirmations, num_fail_tx_confirmations,
                num_unexpected_ids)

    def wait_for_rx_events(self, dut, num_msgs, messages_by_msg):
        """Wait for messages to be received and update data structures

        Pops MESSAGE_RECEIVED events from the device until `num_msgs` distinct
        known messages have been received or until waiting for an event times
        out.

        A received message whose text was never registered through init_info
        is added to `messages_by_msg` with id -1 so that analyze_results can
        report it. Such a message does not count towards `num_msgs`.

        Args:
            dut: device under test
            num_msgs: number of expected messages to receive
            messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        """
        num_rx_msgs = 0
        while num_rx_msgs < num_msgs:
            try:
                event = dut.ed.pop_event(
                    aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
                    autils.EVENT_TIMEOUT)
            except queue.Empty:
                self.log.warning(
                    "[%s] Timed out waiting for ON_MESSAGE_RECEIVED event - "
                    "assuming the rest are not coming", dut.pretty_name)
                break

            msg = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING]
            if msg not in messages_by_msg:
                # Start the rx counter at 0: the increment below records this
                # reception. (Starting at 1 would make the very first
                # reception look like a duplicate.)
                messages_by_msg[msg] = {
                    KEY_ID: -1,
                    KEY_TX_OK_COUNT: 0,
                    KEY_TX_FAIL_COUNT: 0,
                    KEY_RX_COUNT: 0,
                }

            info = messages_by_msg[msg]
            info[KEY_RX_COUNT] += 1
            # Only the first reception of a message we actually sent counts
            # towards the expected total.
            if info[KEY_ID] != -1 and info[KEY_RX_COUNT] == 1:
                num_rx_msgs += 1

    def analyze_results(self, results, messages_by_msg):
        """Analyze the results of the stress message test and add to the results
        dictionary

        Every counter is a number of distinct messages (not of events)
        matching the condition described by its key.

        Args:
            results: result dictionary into which to add data
            messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        """
        results["raw_data"] = messages_by_msg
        for key in ("tx_count_success", "tx_count_duplicate_success",
                    "tx_count_fail", "tx_count_duplicate_fail",
                    "tx_count_neither", "tx_count_tx_ok_but_no_rx",
                    "rx_count", "rx_count_duplicate",
                    "rx_count_no_ok_tx_indication",
                    "rx_count_fail_tx_indication", "rx_count_no_tx_message"):
            results[key] = 0

        for data in messages_by_msg.values():
            tx_ok = data[KEY_TX_OK_COUNT]
            tx_fail = data[KEY_TX_FAIL_COUNT]
            rx = data[KEY_RX_COUNT]
            # id == -1 marks a message which was received but never sent.
            was_sent = data[KEY_ID] != -1

            if tx_ok > 0:
                results["tx_count_success"] += 1
            if tx_ok > 1:
                results["tx_count_duplicate_success"] += 1
            if tx_fail > 0:
                results["tx_count_fail"] += 1
            if tx_fail > 1:
                results["tx_count_duplicate_fail"] += 1
            if tx_ok == 0 and tx_fail == 0 and was_sent:
                results["tx_count_neither"] += 1
            if tx_ok > 0 and rx == 0:
                results["tx_count_tx_ok_but_no_rx"] += 1
            if rx > 0:
                results["rx_count"] += 1
                if tx_ok == 0:
                    results["rx_count_no_ok_tx_indication"] += 1
                if tx_fail > 0:
                    results["rx_count_fail_tx_indication"] += 1
                if not was_sent:
                    results["rx_count_no_tx_message"] += 1
            if rx > 1:
                results["rx_count_duplicate"] += 1

    def _start_discovery_session(self):
        """Start a publish/subscribe discovery session on the first two devices.

        Returns:
            Tuple (p_dut, s_dut, p_disc_id, s_disc_id, peer_id_on_sub,
            peer_id_on_pub): the publisher and subscriber devices followed by
            elements 2 to 5 of the autils.create_discovery_pair() result.
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]
        discovery_data = autils.create_discovery_pair(
            p_dut,
            s_dut,
            p_config=autils.create_discovery_config(
                self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
            s_config=autils.create_discovery_config(
                self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
            device_startup_offset=self.device_startup_offset,
            msg_id=self.get_next_msg_id())
        return (p_dut, s_dut, discovery_data[2], discovery_data[3],
                discovery_data[4], discovery_data[5])

    def _send_message(self, dut, disc_id, peer_id, msg, messages_by_msg,
                      messages_by_id):
        """Register a message in the bookkeeping structures and send it.

        Args:
            dut: device sending the message
            disc_id: discovery session id on the sending device
            peer_id: id of the peer as seen by the sending device
            msg: message text
            messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
            messages_by_id: {id -> text}
        """
        msg_id = self.get_next_msg_id()
        self.init_info(msg, msg_id, messages_by_msg, messages_by_id)
        # NOTE(review): the trailing 0 is presumably the retry count - confirm
        # against the wifiAwareSendMessage API.
        dut.droid.wifiAwareSendMessage(disc_id, peer_id, msg_id, msg, 0)

    @staticmethod
    def _add_counts(totals, counts):
        """Return the element-wise sum of two equally sized count tuples."""
        return tuple(total + count for total, count in zip(totals, counts))

    def _assert_no_clear_errors(self, results):
        """Assert on the result counters which must be zero in every test.

        Args:
            results: result dictionary filled in by analyze_results (plus the
                "tx_unknown_ids" entry added by the test)
        """
        # clear errors
        asserts.assert_equal(results["tx_unknown_ids"], 0,
                             "Message ID corruption", results)
        asserts.assert_equal(results["tx_count_neither"], 0,
                             "Tx message with no success or fail indication",
                             results)
        asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
                             "Duplicate Tx fail messages", results)
        asserts.assert_equal(results["tx_count_duplicate_success"], 0,
                             "Duplicate Tx success messages", results)
        asserts.assert_equal(
            results["rx_count_no_tx_message"], 0,
            "Rx message which wasn't sent - message corruption?", results)
        asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
                             "Tx got ACK but Rx didn't get message", results)

        # possibly ok - but flag since most frequently a bug
        asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
                             "Message received but Tx didn't get ACK", results)
        asserts.assert_equal(
            results["rx_count_fail_tx_indication"], 0,
            "Message received but Tx got a failure indication", results)

    #######################################################################

    @test_tracker_info(uuid="e88c060f-4ca7-41c1-935a-d3d62878ec0b")
    def test_stress_message_no_throttling(self):
        """Stress test for bi-directional message transmission and reception no throttling"""
        (p_dut, s_dut, p_disc_id, s_disc_id, peer_id_on_sub,
         peer_id_on_pub) = self._start_discovery_session()

        # Store information on Tx & Rx messages
        messages_by_msg = {}  # keyed by message text
        # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        messages_by_id = {}  # keyed by message ID {id -> text}
        iterations = 0
        # Running (ok, fail, unknown id) tx confirmation totals per device
        p_tx_totals = (0, 0, 0)
        s_tx_totals = (0, 0, 0)

        # First round will fill up the message queue
        num_of_messages_this_round = self.MESSAGE_QUEUE_DEPTH_PER_UID

        # send messages (one in each direction) in rounds to avoid exceeding
        # the queue limit
        for _ in range(self.NUM_ROUNDS):
            for _ in range(num_of_messages_this_round):
                self._send_message(
                    p_dut, p_disc_id, peer_id_on_pub,
                    "Message Publisher -> Subscriber #%d" % iterations,
                    messages_by_msg, messages_by_id)
                self._send_message(
                    s_dut, s_disc_id, peer_id_on_sub,
                    "Message Subscriber -> Publisher #%d" % iterations,
                    messages_by_msg, messages_by_id)
                iterations += 1

            # wait for message tx confirmation
            p_tx_totals = self._add_counts(
                p_tx_totals,
                self.wait_for_tx_events(p_dut, self.NUM_PER_ROUND,
                                        messages_by_msg, messages_by_id))
            s_tx_totals = self._add_counts(
                s_tx_totals,
                self.wait_for_tx_events(s_dut, self.NUM_PER_ROUND,
                                        messages_by_msg, messages_by_id))

            num_of_messages_this_round = self.NUM_PER_ROUND

        # wait for the rest of the message tx confirmations
        p_tx_remaining = iterations - sum(p_tx_totals)
        s_tx_remaining = iterations - sum(s_tx_totals)
        p_tx_totals = self._add_counts(
            p_tx_totals,
            self.wait_for_tx_events(p_dut, p_tx_remaining, messages_by_msg,
                                    messages_by_id))
        s_tx_totals = self._add_counts(
            s_tx_totals,
            self.wait_for_tx_events(s_dut, s_tx_remaining, messages_by_msg,
                                    messages_by_id))
        self.log.info(
            "Transmission done: pub=%d, sub=%d transmitted successfully",
            p_tx_totals[0], s_tx_totals[0])

        # wait for message rx confirmation (giving it the total number of
        # messages transmitted rather than just those transmitted correctly
        # since sometimes the Tx doesn't get that information correctly. I.e.
        # a message the Tx thought was not transmitted correctly is actually
        # received - missing ACK? bug?)
        self.wait_for_rx_events(p_dut, iterations, messages_by_msg)
        self.wait_for_rx_events(s_dut, iterations, messages_by_msg)

        # analyze results
        results = {}
        results["tx_count"] = 2 * iterations
        results["tx_unknown_ids"] = p_tx_totals[2] + s_tx_totals[2]
        self.analyze_results(results, messages_by_msg)

        self._assert_no_clear_errors(results)

        # permissible failures based on thresholds
        asserts.assert_true(
            results["tx_count_fail"] <=
            (self.MAX_TX_FAILURE_PERCENTAGE * iterations * 2 / 100),
            "Number of Tx failures exceeds threshold", extras=results)
        asserts.assert_true(
            results["rx_count_duplicate"] <=
            (self.MAX_DUPLICATE_RX_PERCENTAGE * iterations * 2 / 100),
            "Number of duplicate Rx exceeds threshold", extras=results)

        asserts.explicit_pass("test_stress_message_no_throttling done",
                              extras=results)

    @test_tracker_info(uuid="546b0c6f-3071-4330-8e23-842ecbd07018")
    def test_stress_message_throttling(self):
        """Stress test for bi-directional message transmission and reception with throttling"""
        (p_dut, s_dut, p_disc_id, s_disc_id, peer_id_on_sub,
         peer_id_on_pub) = self._start_discovery_session()

        # Store information on Tx & Rx messages
        messages_by_msg = {}  # keyed by message text
        # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
        messages_by_id = {}  # keyed by message ID {id -> text}

        # send all messages at once (one in each direction)
        for i in range(self.NUM_ITERATIONS):
            self._send_message(p_dut, p_disc_id, peer_id_on_pub,
                               "Message Publisher -> Subscriber #%d" % i,
                               messages_by_msg, messages_by_id)
            self._send_message(s_dut, s_disc_id, peer_id_on_sub,
                               "Message Subscriber -> Publisher #%d" % i,
                               messages_by_msg, messages_by_id)

        # wait for message tx confirmation
        (p_tx_ok_count, _, p_tx_unknown_id) = self.wait_for_tx_events(
            p_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
        (s_tx_ok_count, _, s_tx_unknown_id) = self.wait_for_tx_events(
            s_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
        self.log.info(
            "Transmission done: pub=%d, sub=%d transmitted successfully",
            p_tx_ok_count, s_tx_ok_count)

        # wait for message rx confirmation (giving it the total number of
        # messages transmitted rather than just those transmitted correctly
        # since sometimes the Tx doesn't get that information correctly. I.e.
        # a message the Tx thought was not transmitted correctly is actually
        # received - missing ACK? bug?)
        self.wait_for_rx_events(p_dut, self.NUM_ITERATIONS, messages_by_msg)
        self.wait_for_rx_events(s_dut, self.NUM_ITERATIONS, messages_by_msg)

        # analyze results
        results = {}
        results["tx_count"] = 2 * self.NUM_ITERATIONS
        results["tx_unknown_ids"] = p_tx_unknown_id + s_tx_unknown_id
        self.analyze_results(results, messages_by_msg)

        self._assert_no_clear_errors(results)

        # permissible failures based on thresholds
        asserts.assert_true(
            results["rx_count_duplicate"] <=
            (self.MAX_DUPLICATE_RX_PERCENTAGE * results["tx_count_success"] /
             100),
            "Number of duplicate Rx exceeds threshold", extras=results)

        # check working status message queue limit per UID
        asserts.assert_true(
            results["tx_count_success"] >=
            self.MESSAGE_QUEUE_DEPTH_PER_UID * 2,
            "Number of messages did not reach uid message queue limit",
            extras=results)
        asserts.assert_true(
            results["tx_count_success"] < self.NUM_ITERATIONS * 2,
            "Seems uid message queue limit is not working, Tx all message",
            extras=results)

        asserts.explicit_pass("test_stress_message_throttling done",
                              extras=results)