• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18
19from acts import asserts
20from acts.test_decorators import test_tracker_info
21from acts_contrib.test_utils.wifi.aware import aware_const as aconsts
22from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
23from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
24
25KEY_ID = "id"
26KEY_TX_OK_COUNT = "tx_ok_count"
27KEY_TX_FAIL_COUNT = "tx_fail_count"
28KEY_RX_COUNT = "rx_count"
29
30
31class MessagesStressTest(AwareBaseTest):
32    """Set of stress tests for Wi-Fi Aware L2 (layer 2) message exchanges."""
33    # Number of the message queue depth per Uid from framework
34    MESSAGE_QUEUE_DEPTH_PER_UID = 50
35
36    # Number of iterations in the stress test (number of messages)
37    # Should be larger than MESSAGE_QUEUE_DEPTH_PER_UID
38    NUM_ITERATIONS = 200
39
40    # Number of message to send per round to avoid exceed message queue depth limit
41    # Should be less than or equal to 1/2 of MESSAGE_QUEUE_DEPTH_PER_UID
42    NUM_PER_ROUND = 20
43    NUM_ROUNDS = 5
44
45    # Maximum permitted percentage of messages which fail to be transmitted
46    # correctly
47    MAX_TX_FAILURE_PERCENTAGE = 2
48
49    # Maximum permitted percentage of messages which are received more than once
50    # (indicating, most likely, that the ACK wasn't received and the message was
51    # retransmitted)
52    MAX_DUPLICATE_RX_PERCENTAGE = 2
53
54    SERVICE_NAME = "GoogleTestServiceXY"
55
56    def init_info(self, msg, id, messages_by_msg, messages_by_id):
57        """Initialize the message data structures.
58
59    Args:
60      msg: message text
61      id: message id
62      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
63      messages_by_id: {id -> text}
64    """
65        messages_by_msg[msg] = {}
66        messages_by_msg[msg][KEY_ID] = id
67        messages_by_msg[msg][KEY_TX_OK_COUNT] = 0
68        messages_by_msg[msg][KEY_TX_FAIL_COUNT] = 0
69        messages_by_msg[msg][KEY_RX_COUNT] = 0
70        messages_by_id[id] = msg
71
72    def wait_for_tx_events(self, dut, num_msgs, messages_by_msg,
73                           messages_by_id):
74        """Wait for messages to be transmitted and update data structures.
75
76    Args:
77      dut: device under test
78      num_msgs: number of expected message tx
79      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
80      messages_by_id: {id -> text}
81    """
82        num_ok_tx_confirmations = 0
83        num_fail_tx_confirmations = 0
84        num_unexpected_ids = 0
85        tx_events_regex = "%s|%s" % (aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED,
86                                     aconsts.SESSION_CB_ON_MESSAGE_SENT)
87        while num_ok_tx_confirmations + num_fail_tx_confirmations < num_msgs:
88            try:
89                events = dut.ed.pop_events(tx_events_regex,
90                                           autils.EVENT_TIMEOUT)
91                for event in events:
92                    if (event["name"] != aconsts.SESSION_CB_ON_MESSAGE_SENT
93                            and event["name"] !=
94                            aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED):
95                        asserts.fail("Unexpected event: %s" % event)
96                    is_tx_ok = event[
97                        "name"] == aconsts.SESSION_CB_ON_MESSAGE_SENT
98
99                    id = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_ID]
100                    if id in messages_by_id:
101                        msg = messages_by_id[id]
102                        if is_tx_ok:
103                            messages_by_msg[msg][
104                                KEY_TX_OK_COUNT] = messages_by_msg[msg][KEY_TX_OK_COUNT] + 1
105                            if messages_by_msg[msg][KEY_TX_OK_COUNT] == 1:
106                                num_ok_tx_confirmations = num_ok_tx_confirmations + 1
107                        else:
108                            messages_by_msg[msg][KEY_TX_FAIL_COUNT] = (
109                                messages_by_msg[msg][KEY_TX_FAIL_COUNT] + 1)
110                            if messages_by_msg[msg][KEY_TX_FAIL_COUNT] == 1:
111                                num_fail_tx_confirmations = num_fail_tx_confirmations + 1
112                    else:
113                        self.log.warning(
114                            "Tx confirmation of unknown message ID received: %s",
115                            event)
116                        num_unexpected_ids = num_unexpected_ids + 1
117            except queue.Empty:
118                self.log.warning(
119                    "[%s] Timed out waiting for any MESSAGE_SEND* event - "
120                    "assuming the rest are not coming", dut.pretty_name)
121                break
122
123        return (num_ok_tx_confirmations, num_fail_tx_confirmations,
124                num_unexpected_ids)
125
126    def wait_for_rx_events(self, dut, num_msgs, messages_by_msg):
127        """Wait for messages to be received and update data structures
128
129    Args:
130      dut: device under test
131      num_msgs: number of expected messages to receive
132      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
133    """
134        num_rx_msgs = 0
135        while num_rx_msgs < num_msgs:
136            try:
137                event = dut.ed.pop_event(
138                    aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
139                    autils.EVENT_TIMEOUT)
140                msg = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING]
141                if msg not in messages_by_msg:
142                    messages_by_msg[msg] = {}
143                    messages_by_msg[msg][KEY_ID] = -1
144                    messages_by_msg[msg][KEY_TX_OK_COUNT] = 0
145                    messages_by_msg[msg][KEY_TX_FAIL_COUNT] = 0
146                    messages_by_msg[msg][KEY_RX_COUNT] = 1
147
148                messages_by_msg[msg][
149                    KEY_RX_COUNT] = messages_by_msg[msg][KEY_RX_COUNT] + 1
150                if messages_by_msg[msg][KEY_RX_COUNT] == 1:
151                    num_rx_msgs = num_rx_msgs + 1
152            except queue.Empty:
153                self.log.warning(
154                    "[%s] Timed out waiting for ON_MESSAGE_RECEIVED event - "
155                    "assuming the rest are not coming", dut.pretty_name)
156                break
157
158    def analyze_results(self, results, messages_by_msg):
159        """Analyze the results of the stress message test and add to the results
160    dictionary
161
162    Args:
163      results: result dictionary into which to add data
164      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
165    """
166        results["raw_data"] = messages_by_msg
167        results["tx_count_success"] = 0
168        results["tx_count_duplicate_success"] = 0
169        results["tx_count_fail"] = 0
170        results["tx_count_duplicate_fail"] = 0
171        results["tx_count_neither"] = 0
172        results["tx_count_tx_ok_but_no_rx"] = 0
173        results["rx_count"] = 0
174        results["rx_count_duplicate"] = 0
175        results["rx_count_no_ok_tx_indication"] = 0
176        results["rx_count_fail_tx_indication"] = 0
177        results["rx_count_no_tx_message"] = 0
178
179        for msg, data in messages_by_msg.items():
180            if data[KEY_TX_OK_COUNT] > 0:
181                results["tx_count_success"] = results["tx_count_success"] + 1
182            if data[KEY_TX_OK_COUNT] > 1:
183                results["tx_count_duplicate_success"] += 1
184            if data[KEY_TX_FAIL_COUNT] > 0:
185                results["tx_count_fail"] += 1
186            if data[KEY_TX_FAIL_COUNT] > 1:
187                results["tx_count_duplicate_fail"] += 1
188            if (data[KEY_TX_OK_COUNT] == 0 and data[KEY_TX_FAIL_COUNT] == 0
189                    and data[KEY_ID] != -1):
190                results["tx_count_neither"] += 1
191            if data[KEY_TX_OK_COUNT] > 0 and data[KEY_RX_COUNT] == 0:
192                results["tx_count_tx_ok_but_no_rx"] += 1
193            if data[KEY_RX_COUNT] > 0:
194                results["rx_count"] += 1
195            if data[KEY_RX_COUNT] > 1:
196                results["rx_count_duplicate"] += 1
197            if data[KEY_RX_COUNT] > 0 and data[KEY_TX_OK_COUNT] == 0:
198                results["rx_count_no_ok_tx_indication"] += 1
199            if data[KEY_RX_COUNT] > 0 and data[KEY_TX_FAIL_COUNT] > 0:
200                results["rx_count_fail_tx_indication"] += 1
201            if data[KEY_RX_COUNT] > 0 and data[KEY_ID] == -1:
202                results["rx_count_no_tx_message"] += 1
203
204    #######################################################################
205
206    @test_tracker_info(uuid="e88c060f-4ca7-41c1-935a-d3d62878ec0b")
207    def test_stress_message_no_throttling(self):
208        """Stress test for bi-directional message transmission and reception no throttling"""
209        p_dut = self.android_devices[0]
210        s_dut = self.android_devices[1]
211
212        # Start up a discovery session
213        discovery_data = autils.create_discovery_pair(
214            p_dut,
215            s_dut,
216            p_config=autils.create_discovery_config(
217                self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
218            s_config=autils.create_discovery_config(
219                self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
220            device_startup_offset=self.device_startup_offset,
221            msg_id=self.get_next_msg_id())
222        p_id = discovery_data[0]
223        s_id = discovery_data[1]
224        p_disc_id = discovery_data[2]
225        s_disc_id = discovery_data[3]
226        peer_id_on_sub = discovery_data[4]
227        peer_id_on_pub = discovery_data[5]
228
229        # Store information on Tx & Rx messages
230        messages_by_msg = {}  # keyed by message text
231        # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
232        messages_by_id = {}  # keyed by message ID {id -> text}
233        iterations = 0
234        p_tx_ok_count_total = 0
235        p_tx_fail_count_total = 0
236        p_tx_unknown_id_total = 0
237        s_tx_ok_count_total = 0
238        s_tx_fail_count_total = 0
239        s_tx_unknown_id_total = 0
240
241        # First round will fill up the message queue
242        num_of_messages_this_round = self.MESSAGE_QUEUE_DEPTH_PER_UID
243
244        # send messages (one in each direction) in rounds to avoid exceed the queue limit
245        for j in range(self.NUM_ROUNDS):
246            for k in range(num_of_messages_this_round):
247                msg_p2s = "Message Publisher -> Subscriber #%d" % iterations
248                next_msg_id = self.get_next_msg_id()
249                self.init_info(msg_p2s, next_msg_id, messages_by_msg,
250                               messages_by_id)
251                p_dut.droid.wifiAwareSendMessage(p_disc_id, peer_id_on_pub,
252                                                 next_msg_id, msg_p2s, 0)
253
254                msg_s2p = "Message Subscriber -> Publisher #%d" % iterations
255                next_msg_id = self.get_next_msg_id()
256                self.init_info(msg_s2p, next_msg_id, messages_by_msg,
257                               messages_by_id)
258                s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub,
259                                                 next_msg_id, msg_s2p, 0)
260                iterations += 1
261
262            # wait for message tx confirmation
263            (p_tx_ok_count, p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
264                p_dut, self.NUM_PER_ROUND, messages_by_msg, messages_by_id)
265            p_tx_ok_count_total += p_tx_ok_count
266            p_tx_fail_count_total += p_tx_fail_count
267            p_tx_unknown_id_total += p_tx_unknown_id
268            (s_tx_ok_count, s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
269                s_dut, self.NUM_PER_ROUND, messages_by_msg, messages_by_id)
270            s_tx_ok_count_total += s_tx_ok_count
271            s_tx_fail_count_total += s_tx_fail_count
272            s_tx_unknown_id_total += s_tx_unknown_id
273
274            num_of_messages_this_round = self.NUM_PER_ROUND
275
276        # wait for the rest message tx confirmation
277        p_tx_total = p_tx_ok_count_total + p_tx_fail_count_total + p_tx_unknown_id_total
278        s_tx_total = s_tx_ok_count_total + s_tx_fail_count_total + s_tx_unknown_id_total
279        (p_tx_ok_count, p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
280            p_dut, iterations - p_tx_total, messages_by_msg, messages_by_id)
281        (s_tx_ok_count, s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
282            s_dut, iterations - s_tx_total, messages_by_msg, messages_by_id)
283        p_tx_ok_count_total += p_tx_ok_count
284        p_tx_fail_count_total += p_tx_fail_count
285        p_tx_unknown_id_total += p_tx_unknown_id
286        s_tx_ok_count_total += s_tx_ok_count
287        s_tx_fail_count_total += s_tx_fail_count
288        s_tx_unknown_id_total += s_tx_unknown_id
289        self.log.info(
290            "Transmission done: pub=%d, sub=%d transmitted successfully",
291            p_tx_ok_count_total, s_tx_ok_count_total)
292
293        # wait for message rx confirmation (giving it the total number of messages
294        # transmitted rather than just those transmitted correctly since sometimes
295        # the Tx doesn't get that information correctly. I.e. a message the Tx
296        # thought was not transmitted correctly is actually received - missing ACK?
297        # bug?)
298        self.wait_for_rx_events(p_dut, iterations, messages_by_msg)
299        self.wait_for_rx_events(s_dut, iterations, messages_by_msg)
300
301        # analyze results
302        results = {}
303        results["tx_count"] = 2 * iterations
304        results["tx_unknown_ids"] = p_tx_unknown_id_total + s_tx_unknown_id_total
305        self.analyze_results(results, messages_by_msg)
306
307        # clear errors
308        asserts.assert_equal(results["tx_unknown_ids"], 0,
309                             "Message ID corruption", results)
310        asserts.assert_equal(results["tx_count_neither"], 0,
311                             "Tx message with no success or fail indication",
312                             results)
313        asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
314                             "Duplicate Tx fail messages", results)
315        asserts.assert_equal(results["tx_count_duplicate_success"], 0,
316                             "Duplicate Tx success messages", results)
317        asserts.assert_equal(results["rx_count_no_tx_message"], 0,
318                             "Rx message which wasn't sent - message corruption?", results)
319        asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
320                             "Tx got ACK but Rx didn't get message", results)
321
322        # possibly ok - but flag since most frequently a bug
323        asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
324                             "Message received but Tx didn't get ACK", results)
325        asserts.assert_equal(results["rx_count_fail_tx_indication"], 0,
326                             "Message received but Tx didn't get ACK", results)
327
328        # permissible failures based on thresholds
329        asserts.assert_true(
330            results["tx_count_fail"] <=
331            (self.MAX_TX_FAILURE_PERCENTAGE * iterations * 2 / 100),
332            "Number of Tx failures exceeds threshold", extras=results)
333        asserts.assert_true(
334            results["rx_count_duplicate"] <=
335            (self.MAX_DUPLICATE_RX_PERCENTAGE * iterations * 2 / 100),
336            "Number of duplicate Rx exceeds threshold", extras=results)
337
338        asserts.explicit_pass("test_stress_message_no_throttling done", extras=results)
339
340    @test_tracker_info(uuid="546b0c6f-3071-4330-8e23-842ecbd07018")
341    def test_stress_message_throttling(self):
342        """Stress test for bi-directional message transmission and reception with throttling"""
343        p_dut = self.android_devices[0]
344        s_dut = self.android_devices[1]
345
346        # Start up a discovery session
347        discovery_data = autils.create_discovery_pair(
348            p_dut,
349            s_dut,
350            p_config=autils.create_discovery_config(
351                self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
352            s_config=autils.create_discovery_config(
353                self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
354            device_startup_offset=self.device_startup_offset,
355            msg_id=self.get_next_msg_id())
356        p_id = discovery_data[0]
357        s_id = discovery_data[1]
358        p_disc_id = discovery_data[2]
359        s_disc_id = discovery_data[3]
360        peer_id_on_sub = discovery_data[4]
361        peer_id_on_pub = discovery_data[5]
362
363        # Store information on Tx & Rx messages
364        messages_by_msg = {}  # keyed by message text
365        # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
366        messages_by_id = {}  # keyed by message ID {id -> text}
367
368        # send all messages at once (one in each direction)
369        for i in range(self.NUM_ITERATIONS):
370            msg_p2s = "Message Publisher -> Subscriber #%d" % i
371            next_msg_id = self.get_next_msg_id()
372            self.init_info(msg_p2s, next_msg_id, messages_by_msg,
373                           messages_by_id)
374            p_dut.droid.wifiAwareSendMessage(p_disc_id, peer_id_on_pub,
375                                             next_msg_id, msg_p2s, 0)
376
377            msg_s2p = "Message Subscriber -> Publisher #%d" % i
378            next_msg_id = self.get_next_msg_id()
379            self.init_info(msg_s2p, next_msg_id, messages_by_msg,
380                           messages_by_id)
381            s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub,
382                                             next_msg_id, msg_s2p, 0)
383
384        # wait for message tx confirmation
385        (p_tx_ok_count,
386         p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
387             p_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
388        (s_tx_ok_count,
389         s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
390             s_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
391        self.log.info(
392            "Transmission done: pub=%d, sub=%d transmitted successfully",
393            p_tx_ok_count, s_tx_ok_count)
394
395        # wait for message rx confirmation (giving it the total number of messages
396        # transmitted rather than just those transmitted correctly since sometimes
397        # the Tx doesn't get that information correctly. I.e. a message the Tx
398        # thought was not transmitted correctly is actually received - missing ACK?
399        # bug?)
400        self.wait_for_rx_events(p_dut, self.NUM_ITERATIONS, messages_by_msg)
401        self.wait_for_rx_events(s_dut, self.NUM_ITERATIONS, messages_by_msg)
402
403        # analyze results
404        results = {}
405        results["tx_count"] = 2 * self.NUM_ITERATIONS
406        results["tx_unknown_ids"] = p_tx_unknown_id + s_tx_unknown_id
407        self.analyze_results(results, messages_by_msg)
408
409        # clear errors
410        asserts.assert_equal(results["tx_unknown_ids"], 0,
411                             "Message ID corruption", results)
412        asserts.assert_equal(results["tx_count_neither"], 0,
413                             "Tx message with no success or fail indication",
414                             results)
415        asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
416                             "Duplicate Tx fail messages", results)
417        asserts.assert_equal(results["tx_count_duplicate_success"], 0,
418                             "Duplicate Tx success messages", results)
419        asserts.assert_equal(
420            results["rx_count_no_tx_message"], 0,
421            "Rx message which wasn't sent - message corruption?", results)
422        asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
423                             "Tx got ACK but Rx didn't get message", results)
424
425        # possibly ok - but flag since most frequently a bug
426        asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
427                             "Message received but Tx didn't get ACK", results)
428        asserts.assert_equal(results["rx_count_fail_tx_indication"], 0,
429                             "Message received but Tx didn't get ACK", results)
430
431        # permissible failures based on thresholds
432        asserts.assert_true(
433            results["rx_count_duplicate"] <=
434            (self.MAX_DUPLICATE_RX_PERCENTAGE * results["tx_count_success"] / 100),
435            "Number of duplicate Rx exceeds threshold", extras=results)
436
437        # check working status message queue limit per UID
438        asserts.assert_true(
439            results["tx_count_success"] >= self.MESSAGE_QUEUE_DEPTH_PER_UID * 2,
440            "Number of messages did not reach uid message queue limit", extras=results)
441        asserts.assert_true(
442            results["tx_count_success"] < self.NUM_ITERATIONS * 2,
443            "Seems uid message queue limit is not working, Tx all message", extras=results)
444
445        asserts.explicit_pass("test_stress_message_throttling done", extras=results)
446