# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import threading import select import socket import subprocess import sys import unittest from lansim import host from lansim import simulator from lansim import tuntap def raise_exception(): """Raises an exception.""" raise Exception('Something bad.') class InfoTCPServer(threading.Thread): """A TCP server running on a separated thread. This simple TCP server thread listen for connections for every new connection it sends the address information of the connected client. """ def __init__(self, host, port): """Creates the TCP server on the host:port address. @param host: The IP address in plain text. @param port: The TCP port number where the server listens on.""" threading.Thread.__init__(self) self._port = port self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sock.bind((host, port)) self._sock.listen(1) self._must_exit = False def run(self): while not self._must_exit: # Check the must_exit flag every second. rlist, wlist, xlist = select.select([self._sock], [], [], 1.) if self._sock in rlist: conn, (addr, port) = self._sock.accept() # Send back the client address, port and our port conn.send('%s %d %d' % (addr, port, self._port)) conn.close() self._sock.close() def stop(self): """Signal the termination of the running thread.""" self._must_exit = True def GetInfoTCP(host, port): """Connects to a InfoTCPServer on host:port and reads all the information. @param host: The host where the InfoTCPServer is running. @param port: The port where the InfoTCPServer is running. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) data = sock.recv(1024) sock.close() return data class SimulatorTest(unittest.TestCase): """Unit tests for the Simulator class.""" def setUp(self): """Creates a Simulator under test over a TAP device.""" self._tap = tuntap.TunTap(tuntap.IFF_TAP, name="faketap") # According to RFC 3927 (Dynamic Configuration of IPv4 Link-Local # Addresses), a host can pseudorandomly assign an IPv4 address on the # 169.254/16 network to communicate with other devices on the same link # on absence of a DHCP server and other source of network configuration. # The tests on this class explicitly specify the interface to use, so # they can run in parallel even when there are more than one interface # with the same IPv4 address. A TUN/TAP interface with an IPv4 address # on this range shouldn't collide with any useful service running on a # different (physical) interface. self._tap.set_addr('169.254.11.11') self._tap.up() self._sim = simulator.Simulator(self._tap) def tearDown(self): """Stops and destroy the interface.""" self._tap.down() def testTimeout(self): """Tests that the Simulator can start and run for a short time.""" # Run for at most 100ms and finish the test. This implies that the # stop() method works. self._sim.run(timeout=0.1) def testRemoveTimeout(self): """Tests that the Simulator can remove unfired timeout calls.""" # Schedule the callback far in time, run the simulator for a short time # and remove it. self._sim.add_timeout(60, raise_exception) self._sim.run(timeout=0.1) self.assertTrue(self._sim.remove_timeout(raise_exception)) self.assertFalse(self._sim.remove_timeout(raise_exception)) def testUntil(self): """Tests that the Simulator can start run until a condition is met.""" tasks_done = [] # After 0.2 seconds we add a task to tasks_done that should break the # loop. If it doesn't, the a second value will be added making the test # fail. self._sim.add_timeout(0.2, lambda: tasks_done.append('good task')) self._sim.add_timeout(4.0, lambda: tasks_done.append('bad task')) self._sim.run(timeout=5.0, until=lambda: tasks_done) self.assertEqual(len(tasks_done), 1) def testHost(self): """Tests that the Simulator can add rules from the SimpleHost.""" # The IP and MAC addresses simulated are unknown to the rest of the # system as they only live on this interface. Again, any IP on the # network 169.254/16 should not cause any problem with other services # running on this host. host.SimpleHost(self._sim, '12:34:56:78:90:AB', '169.254.11.22') self._sim.run(timeout=0.1) class SimulatorThreadTest(unittest.TestCase): """Unit tests for the SimulatorThread class.""" def setUp(self): """Creates a SimulatorThread under test over a TAP device.""" self._tap = tuntap.TunTap(tuntap.IFF_TAP, name="faketap") # See note about IP addresses on SimulatorTest.setUp(). self._ip_addr = '169.254.11.11' self._tap.set_addr(self._ip_addr) self._tap.up() # 20 seconds timeout for unittest completion (they should run in about # 2 seconds each). self._sim = simulator.SimulatorThread(self._tap, timeout=20) def tearDown(self): """Stops and destroy the thread.""" self._sim.stop() # stop() is idempotent. self._sim.join() self._tap.down() if self._sim.error: sys.stderr.write('SimulatorThread exception: %r' % self._sim.error) sys.stderr.write(self._sim.traceback) raise self._sim.error def testError(self): """Exceptions raised on the thread appear on the exc_info member.""" self._sim.add_timeout(0.1, raise_exception) self._sim.start() self._sim.join() self.assertEqual(self._sim.error.message, 'Something bad.') # Clean the error before tearDown() self._sim.error = None def testARPPing(self): """Test that the simulator properly handles a ARP request/response.""" host.SimpleHost(self._sim, '12:34:56:78:90:22', '169.254.11.22') host.SimpleHost(self._sim, '12:34:56:78:90:33', '169.254.11.33') host.SimpleHost(self._sim, '12:34:56:78:90:44', '169.254.11.33') self._sim.start() # arping and wait for one second for the responses. out = subprocess.check_output( ['arping', '-I', self._tap.name, '169.254.11.22', '-c', '1', '-w', '1']) resp = [line for line in out.splitlines() if 'Unicast reply' in line] self.assertEqual(len(resp), 1) self.assertTrue(resp[0].startswith( 'Unicast reply from 169.254.11.22 [12:34:56:78:90:22]')) out = subprocess.check_output( ['arping', '-I', self._tap.name, '169.254.11.33', '-c', '1', '-w', '1']) resp = [line for line in out.splitlines() if 'Unicast reply' in line] self.assertEqual(len(resp), 2) resp.sort() self.assertTrue(resp[0].startswith( 'Unicast reply from 169.254.11.33 [12:34:56:78:90:33]')) self.assertTrue(resp[1].startswith( 'Unicast reply from 169.254.11.33 [12:34:56:78:90:44]')) def testTCPForward(self): """Host can forward TCP traffic back to the kernel network stack.""" h = host.SimpleHost(self._sim, '12:34:56:78:90:22', '169.254.11.22') # Launch two TCP servers on the network interface end. srv1 = InfoTCPServer(self._ip_addr, 1080) srv1.start() srv2 = InfoTCPServer(self._ip_addr, 1081) srv2.start() # Map those two ports to a given IP address on the fake network. h.tcp_forward(80, self._ip_addr, 1080) h.tcp_forward(81, self._ip_addr, 1081) # Start the simulation. self._sim.start() try: srv1data = GetInfoTCP('169.254.11.22', 80) srv2data = GetInfoTCP('169.254.11.22', 81) finally: srv1.stop() srv2.stop() srv1.join() srv2.join() # First connection is seen from the .11.22:1024 client. self.assertEqual(srv1data, '169.254.11.22 1024 1080') # Second connection is seen from the .11.22:1024 client because is made # to a different port. self.assertEqual(srv2data, '169.254.11.22 1024 1081') def testWaitForCondition(self): """Main thread can wait until a condition is met on the simulator.""" self._sim.start() # Wait for an always False condition. condition = lambda: False ret = self._sim.wait_for_condition(condition, timeout=1.5) self.assertFalse(ret) # Wait for a trivially True condition. condition = lambda: True ret = self._sim.wait_for_condition(condition, timeout=10.) self.assertTrue(ret) # Without timeout. ret = self._sim.wait_for_condition(condition, timeout=None) self.assertTrue(ret) # Wait for a condition that takes 3 calls to meet. var = [] condition = lambda: var if len(var) == 3 else var.append(None) ret = self._sim.wait_for_condition(condition, timeout=10.) self.assertEqual(len(ret), 3) if __name__ == '__main__': unittest.main()