• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import unittest
6
7import dpkt
8import fake_host
9import socket
10import zeroconf
11
12
13FAKE_HOSTNAME = 'fakehost1'
14
15FAKE_IPADDR = '192.168.11.22'
16
17
18class TestZeroconfDaemon(unittest.TestCase):
19    """Test class for ZeroconfDaemon."""
20
21    def setUp(self):
22        self._host = fake_host.FakeHost(FAKE_IPADDR)
23        self._zero = zeroconf.ZeroconfDaemon(self._host, FAKE_HOSTNAME)
24
25
26    def _query_A(self, name):
27        """Returns the list of A records matching the given name.
28
29        @param name: A domain name.
30        @return a list of dpkt.dns.DNS.RR objects, one for each matching record.
31        """
32        q = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_A)
33        return self._zero._process_A(q)
34
35
36    def testRegisterService(self):
37        """Tests that we get appropriate records after registering a service."""
38        SERVICE_PORT = 9
39        SERVICE_TXT_LIST = ['lies=lies']
40        self._zero.register_service('unique_prefix', '_service_type',
41                                    '_tcp', SERVICE_PORT, SERVICE_TXT_LIST)
42        name = '_service_type._tcp.local'
43        fq_name = 'unique_prefix.' + name
44        # Issue SRV, PTR, and TXT queries
45        q_srv = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_SRV)
46        q_txt = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_TXT)
47        q_ptr = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_PTR)
48        ptr_responses = self._zero._process_PTR(q_ptr)
49        srv_responses = self._zero._process_SRV(q_srv)
50        txt_responses = self._zero._process_TXT(q_txt)
51        self.assertTrue(ptr_responses)
52        self.assertTrue(srv_responses)
53        self.assertTrue(txt_responses)
54        ptr_resp = ptr_responses[0]
55        srv_resp = [resp for resp in srv_responses
56                    if resp.type == dpkt.dns.DNS_SRV][0]
57        txt_resp = txt_responses[0]
58        # Check that basic things are right.
59        self.assertEqual(fq_name, ptr_resp.ptrname)
60        self.assertEqual(FAKE_HOSTNAME + '.' + self._zero.domain,
61                         srv_resp.srvname)
62        self.assertEqual(SERVICE_PORT, srv_resp.port)
63        self.assertEqual(SERVICE_TXT_LIST, txt_resp.text)
64
65
66    def testProperties(self):
67        """Test the initial properties set by the constructor."""
68        self.assertEqual(self._zero.host, self._host)
69        self.assertEqual(self._zero.hostname, FAKE_HOSTNAME)
70        self.assertEqual(self._zero.domain, 'local') # Default domain
71        self.assertEqual(self._zero.full_hostname, FAKE_HOSTNAME + '.local')
72
73
74    def testSocketInit(self):
75        """Test that the constructor listens for mDNS traffic."""
76
77        # Should create an UDP socket and bind it to the mDNS address and port.
78        self.assertEqual(len(self._host._sockets), 1)
79        sock = self._host._sockets[0]
80
81        self.assertEqual(sock._family, socket.AF_INET) # IPv4
82        self.assertEqual(sock._sock_type, socket.SOCK_DGRAM) # UDP
83
84        # Check it is listening for UDP packets on the mDNS address and port.
85        self.assertTrue(sock._bound)
86        self.assertEqual(sock._bind_ip_addr, '224.0.0.251') # mDNS address
87        self.assertEqual(sock._bind_port, 5353) # mDNS port
88        self.assertTrue(callable(sock._bind_recv_callback))
89
90
91    def testRecordsInit(self):
92        """Test the A record of the host is registered."""
93        host_A = self._query_A(self._zero.full_hostname)
94        self.assertGreater(len(host_A), 0)
95
96        record = host_A[0]
97        # Check the hostname and the packed IP address.
98        self.assertEqual(record.name, self._zero.full_hostname)
99        self.assertEqual(record.ip, socket.inet_aton(self._host.ip_addr))
100
101
102    def testDoubleTXTProcessing(self):
103        """Test when more than one TXT record is present in a packet.
104
105        A mDNS packet can include several answer records for several domains and
106        record type. A corner case found on the field presents a mDNS packet
107        with two TXT records for the same domain name on the same packet on its
108        authoritative answers section while the packet itself is a query.
109        """
110        # Build the mDNS packet with two TXT records.
111        domain_name = 'other_host.local'
112        answers = [
113                dpkt.dns.DNS.RR(
114                        type = dpkt.dns.DNS_TXT,
115                        cls = dpkt.dns.DNS_IN,
116                        ttl = 120,
117                        name = domain_name,
118                        text = ['one', 'two']),
119                dpkt.dns.DNS.RR(
120                        type = dpkt.dns.DNS_TXT,
121                        cls = dpkt.dns.DNS_IN,
122                        ttl = 120,
123                        name = domain_name,
124                        text = ['two'])]
125        # The packet is a query packet, with extra answers on the autoritative
126        # section.
127        mdns = dpkt.dns.DNS(
128                op = dpkt.dns.DNS_QUERY, # Standard query
129                rcode = dpkt.dns.DNS_RCODE_NOERR,
130                q = [],
131                an = [],
132                ns = answers)
133
134        # Record the new answers received on the answer_calls list.
135        answer_calls = []
136        self._zero.add_answer_observer(lambda args: answer_calls.extend(args))
137
138        # Send the packet to the registered callback.
139        sock = self._host._sockets[0]
140        cbk = sock._bind_recv_callback
141        cbk(str(mdns), '1234', 5353)
142
143        # Check that the answers callback is called with all the answers in the
144        # received order.
145        self.assertEqual(len(answer_calls), 2)
146        ans1, ans2 = answer_calls # Each ans is a (rrtype, rrname, data)
147        self.assertEqual(ans1[2], ('one', 'two'))
148        self.assertEqual(ans2[2], ('two',))
149
150        # Check that the two records were cached.
151        records = self._zero.cached_results(domain_name, dpkt.dns.DNS_TXT)
152        self.assertEqual(len(records), 2)
153
154
155if __name__ == '__main__':
156    unittest.main()
157