• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3.4
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18
19import acts.base_test as base_test
20import acts.controllers.android_device as android_device
21import acts.test_utils.wifi.wifi_test_utils as wutils
22
23from acts import asserts
24
25ON_IDENTITY_CHANGED = "WifiNanOnIdentityChanged"
26ON_MATCH = "WifiNanSessionOnMatch"
27ON_MESSAGE_RX = "WifiNanSessionOnMessageReceived"
28ON_MESSAGE_TX_FAIL = "WifiNanSessionOnMessageSendFail"
29ON_MESSAGE_TX_OK = "WifiNanSessionOnMessageSendSuccess"
30
31class WifiNanManagerTest(base_test.BaseTestClass):
32    msg_id = 10
33
34    def __init__(self, controllers):
35        base_test.BaseTestClass.__init__(self, controllers)
36        self.publisher = self.android_devices[0]
37        self.subscriber = self.android_devices[1]
38        self.tests = (
39            "test_nan_base_test",
40            )
41
42    def setup_class(self):
43        required_params = (
44            "config_request1",
45            "config_request2",
46            "publish_data",
47            "publish_settings",
48            "subscribe_data",
49            "subscribe_settings"
50        )
51        self.unpack_userparams(required_params)
52
53    def setup_test(self):
54        assert wutils.wifi_toggle_state(self.publisher, True)
55        assert wutils.wifi_toggle_state(self.subscriber, True)
56
57    def teardown_class(self):
58        assert wutils.wifi_toggle_state(self.publisher, False)
59        assert wutils.wifi_toggle_state(self.subscriber, False)
60
61    def reliable_tx(self, device, peer, msg):
62        num_tries = 0
63        max_num_tries = 10
64        events_regex = '%s|%s' % (ON_MESSAGE_TX_FAIL, ON_MESSAGE_TX_OK)
65        self.msg_id = self.msg_id + 1
66
67        while True:
68            try:
69                num_tries += 1
70                device.droid.wifiNanSendMessage(peer, msg, self.msg_id)
71                events = device.ed.pop_events(events_regex, 30)
72                for event in events:
73                    self.log.info('%s: %s' % (event['name'], event['data']))
74                    if event['data']['messageId'] != self.msg_id:
75                        continue
76                    if event['name'] == ON_MESSAGE_TX_OK:
77                        return True
78                    if num_tries == max_num_tries:
79                        self.log.info("Max number of retries reached")
80                        return False
81            except queue.Empty:
82                self.log.info('Timed out while waiting for %s' % events_regex)
83                return False
84
85    def test_nan_base_test(self):
86        """Perform NAN configuration, discovery, and message exchange.
87
88        Configuration: 2 devices, one acting as Publisher (P) and the
89        other as Subscriber (S)
90
91        Logical steps:
92          * P & S configure NAN
93          * P & S wait for NAN configuration confirmation
94          * P starts publishing
95          * S starts subscribing
96          * S waits for a match (discovery) notification
97          * S sends a message to P, confirming that sent successfully
98          * P waits for a message and confirms that received (uncorrupted)
99          * P sends a message to S, confirming that sent successfully
100          * S waits for a message and confirms that received (uncorrupted)
101        """
102        self.publisher.droid.wifiNanEnable(self.config_request1)
103        self.subscriber.droid.wifiNanEnable(self.config_request2)
104
105        sub2pub_msg = "How are you doing?"
106        pub2sub_msg = "Doing ok - thanks!"
107
108        try:
109            event = self.publisher.ed.pop_event(ON_IDENTITY_CHANGED, 30)
110            self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
111        except queue.Empty:
112            asserts.fail('Timed out while waiting for %s on Publisher' %
113                      ON_IDENTITY_CHANGED)
114        self.log.debug(event)
115
116        try:
117            event = self.subscriber.ed.pop_event(ON_IDENTITY_CHANGED, 30)
118            self.log.info('%s: %s' % (ON_IDENTITY_CHANGED, event['data']))
119        except queue.Empty:
120            asserts.fail('Timed out while waiting for %s on Subscriber' %
121                      ON_IDENTITY_CHANGED)
122        self.log.debug(event)
123
124        self.publisher.droid.wifiNanPublish(self.publish_data,
125                                            self.publish_settings, 0)
126        self.subscriber.droid.wifiNanSubscribe(self.subscribe_data,
127                                               self.subscribe_settings, 0)
128
129        try:
130            event = self.subscriber.ed.pop_event(ON_MATCH, 30)
131            self.log.info('%s: %s' % (ON_MATCH, event['data']))
132        except queue.Empty:
133            asserts.fail('Timed out while waiting for %s on Subscriber' % ON_MATCH)
134        self.log.debug(event)
135
136        asserts.assert_true(self.reliable_tx(self.subscriber,
137                                          event['data']['peerId'],
138                                          sub2pub_msg),
139                         "Failed to transmit from subscriber")
140
141        try:
142            event = self.publisher.ed.pop_event(ON_MESSAGE_RX, 10)
143            self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
144            asserts.assert_true(event['data']['messageAsString'] == sub2pub_msg,
145                             "Subscriber -> publisher message corrupted")
146        except queue.Empty:
147            asserts.fail('Timed out while waiting for %s on publisher' %
148                      ON_MESSAGE_RX)
149
150        asserts.assert_true(self.reliable_tx(self.publisher,
151                                          event['data']['peerId'],
152                                          pub2sub_msg),
153                         "Failed to transmit from publisher")
154
155        try:
156            event = self.subscriber.ed.pop_event(ON_MESSAGE_RX, 10)
157            self.log.info('%s: %s' % (ON_MESSAGE_RX, event['data']))
158            asserts.assert_true(event['data']['messageAsString'] == pub2sub_msg,
159                             "Publisher -> subscriber message corrupted")
160        except queue.Empty:
161            asserts.fail('Timed out while waiting for %s on subscriber' %
162                      ON_MESSAGE_RX)
163