• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo
18
19import errno
20import os
21import posix
22import random
23import re
24from socket import *  # pylint: disable=wildcard-import
25import threading
26import time
27import unittest
28
29from scapy import all as scapy
30
31import csocket
32import multinetwork_base
33import net_test
34
35
# Whether /proc/net/icmp6 exists; tests that read that file are skipped when
# it does not.
HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

# ICMPv4 echo request / echo reply message types.
ICMP_ECHO, ICMP_ECHOREPLY = 8, 0
# ICMPv6 echo request / echo reply message types.
ICMPV6_ECHO_REQUEST, ICMPV6_ECHO_REPLY = 128, 129
42
43
class PingReplyThread(threading.Thread):
  """Thread that answers echo requests read from a tun file object.

  Each frame read from the tun is parsed as Ethernet. IPv4 and IPv6 echo
  requests with valid checksums are answered with an echo reply, or with a
  time exceeded error if their TTL / hop limit is below MIN_TTL. Anything else
  is silently dropped.
  """

  # Echo requests whose TTL / hop limit is below this value get a time exceeded
  # error instead of an echo reply.
  MIN_TTL = 10
  # Source addresses of the time exceeded errors.
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  # Source addresses of the replies sent for pings to ff02:: destinations.
  NEIGHBOURS = ["fe80::1"]

  def __init__(self, tun, mymac, routermac):
    """Initializes the thread.

    Args:
      tun: File-like object (must provide fileno()) that frames are read from
          and written to.
      mymac: Ethernet destination address used for frames we send.
      routermac: Ethernet source address used for frames we send.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._stopped = False
    self._mymac = mymac
    self._routermac = routermac

  def Stop(self):
    """Asks run() to exit; the flag is checked before each read."""
    self._stopped = True

  def ChecksumValid(self, packet):
    """Returns True if the IP / ICMP / ICMPv6 checksums in packet are correct.

    Note that this deletes the checksum fields from the passed-in packet, so
    scapy recalculates them if the packet is serialized again.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return
      # Different scapy layers name the field either chksum or cksum.
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(str(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends a time exceeded error quoting packet back to its source.

    Args:
      version: 4 or 6. Any other value sends nothing.
      packet: The scapy IP / IPv6 packet that triggered the error.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def IPv4Packet(self, ip):
    """Handles an IPv4 packet, replying to it if it is a valid echo request."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now. Check that the ICMP layer was actually
    # parsed (as IPv6Packet does) so a malformed packet is dropped instead of
    # raising AttributeError and killing the thread.
    if (ip.proto != IPPROTO_ICMP or
        not icmp or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Handles an IPv6 packet, replying to it if it is a valid echo request."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: send one reply from each of our fake neighbours.
      ipv6.dst = ipv6.src
      for src in self.NEIGHBOURS:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    else:
      icmpv6.type = ICMPV6_ECHO_REPLY
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Swaps the source and destination addresses of packet in place."""
    src = packet.src
    packet.src = packet.dst
    packet.dst = src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun."""
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), str(packet))
    except ValueError:
      # Deliberately ignored. NOTE(review): presumably raised by fileno() once
      # the tun has been closed during teardown - confirm.
      pass

  def run(self):
    """Reads frames from the tun and dispatches them until stopped."""
    while not self._stopped:

      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          # Nothing to read right now; try again.
          continue
        else:
          break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)
185
186
class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests for IPv4 and IPv6 ping sockets.

  A PingReplyThread attached to the tun of a randomly chosen network answers
  the pings that the tests send to non-loopback destinations.
  """

  @classmethod
  def setUpClass(cls):
    super(Ping6Test, cls).setUpClass()
    cls.netid = random.choice(cls.NETIDS)
    cls.reply_thread = PingReplyThread(
        cls.tuns[cls.netid],
        cls.MyMacAddress(cls.netid),
        cls.RouterMacAddress(cls.netid))
    cls.SetDefaultNetwork(cls.netid)
    cls.reply_thread.start()

  @classmethod
  def tearDownClass(cls):
    cls.reply_thread.Stop()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives a packet on s and checks it is a valid reply to data.

    Args:
      s: The ping socket to receive the reply on.
      data: The echo request that was sent on the socket.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      # Keep only (address, port), like a real IPv4 socket address.
      src = (src[0].replace("::ffff:", ""), src[1])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertIn(scope_id, self.ifindices.values())
      else:
        self.assertEqual(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))

  def ReadProcNetSocket(self, protocol):
    """Parses /proc/net/<protocol>.

    Args:
      protocol: Name of the file under /proc/net, e.g. "icmp6" or "udp6".

    Returns:
      A list with one [src, dst, state, mem, uid, refcnt, drops] list of
      strings per socket listed in the file.
    """
    # Read file.
    with open("/proc/net/%s" % protocol) as f:
      lines = f.readlines()

    # Possibly check, and strip, header.
    if protocol in ["icmp6", "raw6", "udp6"]:
      self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
    lines = lines[1:]

    # Check contents.
    if protocol.endswith("6"):
      addrlen = 32
    else:
      addrlen = 8
    regexp = re.compile(r" *(\d+): "                    # bucket
                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # srcaddr, port
                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # dstaddr, port
                        "([0-9A-F][0-9A-F]) "           # state
                        "([0-9A-F]{8}:[0-9A-F]{8}) "    # mem
                        "([0-9A-F]{2}:[0-9A-F]{8}) "    # ?
                        "([0-9A-F]{8}) +"               # ?
                        "([0-9]+) +"                    # uid
                        "([0-9]+) +"                    # ?
                        "([0-9]+) +"                    # inode
                        "([0-9]+) +"                    # refcnt
                        "([0-9a-f]+) +"                 # sp
                        "([0-9]+) *$"                   # drops, icmp has spaces
                        % (addrlen, addrlen))
    # Return a list of lists with only source / dest addresses for now.
    out = []
    for line in lines:
      match = regexp.match(line)
      # Fail with the offending line instead of an opaque AttributeError.
      self.assertIsNotNone(
          match, "Unparseable line in /proc/net/%s: %r" % (protocol, line))
      (_, src, dst, state, mem,
       _, _, uid, _, _, refcnt, _, drops) = match.groups()
      out.append([src, dst, state, mem, uid, refcnt, drops])
    return out

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Checks that the last row of /proc/net/<name> matches the arguments.

    The row is also expected to show our own uid, a refcnt of 2 and 0 drops.
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "2", "0"]
    actual = self.ReadProcNetSocket(name)[-1]
    self.assertListEqual(expected, actual)

  def testIPv4SendWithNoConnection(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)

  def testIPv6SendWithNoConnection(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)

  def testIPv4LoopbackPingWithConnect(self):
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + "foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)

  def testIPv6LoopbackPingWithConnect(self):
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4PingUsingSendto(self):
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)

  def testIPv6PingUsingSendto(self):
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testIPv6NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testCrossProtocolCrash(self):
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port))
    # No crash? Good.

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT."""

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)

  def testIPv4Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    net_test.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))

  def testIPv6Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))

  def testIPv4InvalidBind(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      # NOTE(review): any other IOError is silently ignored here as well.
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testIPv6InvalidBind(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, ("ff02::2", 1026))

    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV6_ADDR, 651))
    try:
      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
      s.bind((net_test.IPV6_ADDR, 651))
    except IOError as e:
      # NOTE(review): any other IOError is silently ignored here as well.
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testAfUnspecBind(self):
    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    s4 = net_test.IPv4PingSocket()
    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    sockaddr.family = AF_UNSPEC
    csocket.Bind(s4, sockaddr)
    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())

    # But not if the address is anything else.
    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)

    # This doesn't work for IPv6.
    s6 = net_test.IPv6PingSocket()
    sockaddr = csocket.Sockaddr(("::1", 58997))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)

  def testIPv6ScopedBind(self):
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. Interestingly, Python's getsockname
    # returns "fe80:1%foo", even though it does not understand it.
    expected = self.lladdr + "%" + self.ifname
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname())

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())

  def testBindAffectsIdentifier(self):
    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xf976))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\xf9\x76", s.recv(32768)[4:6])

    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xace))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\x0a\xce", s.recv(32768)[4:6])

  def testLinkLocalAddress(self):
    s = net_test.IPv6PingSocket()
    # Sending to a link-local address with no scope fails with EINVAL.
    self.assertRaisesErrno(errno.EINVAL,
                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    # Sending to link-local address with a scope succeeds. Note that Python
    # doesn't understand the "fe80::1%lo" format, even though it returns it.
    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    # No exceptions? Good.

  def testMappedAddressFails(self):
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                           ("::ffff:192.0.2.1", 55))

  @unittest.skip("skipping: does not work yet")
  def testFlowLabel(self):
    s = net_test.IPv6PingSocket()

    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    # the flow label in the packet is not set.
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.

    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    # that is not registered with the flow manager should return EINVAL...
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    # ... but this doesn't work yet.
    if False:
      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                             (net_test.IPV6_ADDR, 93, 0xdead, 0))

    # After registering the flow label, it gets sent properly, appears in the
    # output packet, and is returned in the response.
    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
                                     net_test.IPV6_FLOWINFO_SEND))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    _, src = s.recvfrom(32768)
    _, _, flowlabel, _ = src
    self.assertEqual(0xdead, flowlabel & 0xfffff)

  def testIPv4Error(self):
    s = net_test.IPv4PingSocket()
    s.setsockopt(SOL_IP, IP_TTL, 2)
    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6Error(self):
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6MulticastPing(self):
    s = net_test.IPv6PingSocket()
    # Send a multicast ping and check we get at least one duplicate.
    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4LargePacket(self):
    s = net_test.IPv4PingSocket()
    data = net_test.IPV4_PING + 20000 * "a"
    s.sendto(data, ("127.0.0.1", 987))
    self.assertValidPingResponse(s, data)

  def testIPv6LargePacket(self):
    s = net_test.IPv6PingSocket()
    s.bind(("::", 0xace))
    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    s.sendto(data, ("::1", 953))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmpSocketsNotInIcmp6(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmp6SocketsNotInIcmp(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))

  def testProcNetIcmp(self):
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testProcNetIcmp6(self):
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)

    # Check the row goes away when the socket is closed.
    s.close()
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

    # Try send, bind and connect to check the addresses and the state.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))

    # Can't bind after sendto, apparently.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)

    # Check receive bytes.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.connect(("ff02::1", 0xdead))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    s.send(net_test.IPV6_PING)
    time.sleep(0.01)  # Give the other thread time to reply.
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0x300)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0)

  def testProcNetUdp6(self):
    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)

  def testProcNetRaw6(self):
    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
733
734
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
737