######################## % Pipetool related tests ######################## + Basic tests = Test default test case s = PeriodicSource("hello", 1, name="src") d1 = Drain(name="d1") c = ConsoleSink(name="c") tf = TransformDrain(lambda x: "Got %s" % x) s > d1 > c d1 > tf try: t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) tf > t except (IOError, OSError): pass p = PipeEngine(s) p.start() time.sleep(3) s.msg = [] p.stop() = Test add_pipe s = AutoSource() p = PipeEngine(s) p.add(Pipe()) assert len(p.active_pipes) == 2 x = p.spawn_Pipe() assert len(p.active_pipes) == 3 assert isinstance(x, Pipe) = Test exhausted source s = AutoSource() s._gen_data("hello") s.is_exhausted = True d1 = Drain(name="d1") c = ConsoleSink(name="c") s > d1 > c p = PipeEngine(s) p.start() p.wait_and_stop() = Test add_pipe on running instance p = PipeEngine() p.start() s = CLIFeeder() d1 = Drain(name="d1") c = QueueSink(name="c") s > d1 > c p.add(s) s.send("hello") s.send("hi") assert c.q.get(timeout=5) == "hello" assert c.q.get(timeout=5) == "hi" p.stop() = Test Operators s = AutoSource() p = PipeEngine(s) assert p == p a = AutoSource() b = AutoSource() a >> b assert len(a.high_sinks) == 1 assert len(a.high_sources) == 0 assert len(b.high_sinks) == 0 assert len(b.high_sources) == 1 a b a = Sink() b = AutoSource() a << b assert len(a.high_sinks) == 0 assert len(a.high_sources) == 1 assert len(b.high_sinks) == 1 assert len(b.high_sources) == 0 a b a = Sink() b = Sink() a % b assert len(a.sinks) == 1 assert len(a.sources) == 1 assert len(b.sinks) == 1 assert len(b.sources) == 1 a = Sink() b = Sink() a//b assert len(a.high_sinks) == 1 assert len(a.high_sources) == 1 assert len(b.high_sinks) == 1 assert len(b.high_sources) == 1 a = AutoSource() b = Sink() a^b assert len(b.trigger_sources) == 1 assert len(a.trigger_sinks) == 1 = Test doc s = AutoSource() p = PipeEngine(s) p.list_pipes() p.list_pipes_detailed() = Test RawConsoleSink with CLIFeeder p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True r, w = os.pipe() d1 = Drain(name="d1") c = RawConsoleSink(name="c") c._write_pipe = w s > d1 > c p.add(s) p.start() assert os.read(r, 20) == b"hello\n" p.wait_and_stop() = Test QueueSink with CLIFeeder p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True d1 = Drain(name="d1") c = QueueSink(name="c") s > d1 > c p.add(s) p.start() p.wait_and_stop() assert c.recv() == "hello" assert c.recv(block=False) is None = Test UpDrain test_val = None class TestSink(Sink): def high_push(self, msg): global test_val test_val = msg p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True d1 = UpDrain(name="d1") c = TestSink(name="c") s > d1 d1 >> c p.add(s) p.start() p.wait_and_stop() assert test_val == "hello" = Test DownDrain test_val = None class TestSink(Sink): def push(self, msg): global test_val test_val = msg p = PipeEngine() s = CLIHighFeeder() s.send("hello") s.is_exhausted = True d1 = DownDrain(name="d1") c = TestSink(name="c") s >> d1 d1 > c p.add(s) p.start() p.wait_and_stop() assert test_val == "hello" = Test PeriodicSource exhaustion s = PeriodicSource("", 1) s.msg = [] p = PipeEngine(s) p.start() p.wait_and_stop() + Advanced ScapyPipes pipetools tests = Test SniffSource from unittest import mock fd = ObjectPipe("sniffsource") fd.write("test") @mock.patch("scapy.scapypipes.conf.L2listen") def _test(l2listen): l2listen.return_value=Bunch(close=lambda *args: None, fileno=lambda: fd.fileno(), recv=lambda *args: Raw("data")) p = PipeEngine() s = SniffSource() assert s.s is None d1 = Drain(name="d1") c = QueueSink(name="c") s > d1 > c p.add(s) p.start() x = c.q.get(2) assert bytes(x) == b"data" assert s.s is not None p.stop() try: _test() finally: fd.close() = Test SniffSource with socket fd = ObjectPipe("sniffsource_socket") fd.write("test") class FakeSocket(object): def __init__(self): self.times = 0 def recv(self, x=None): if self.times > 2: return self.times += 1 return Raw(b'hello') def fileno(self): return fd.fileno() try: p = PipeEngine() s = SniffSource(socket=FakeSocket()) assert s.s is not None d = Drain() c = QueueSink() p.add(s > d > c) p.start() msg = c.q.get(timeout=1) p.stop() assert raw(msg) == b'hello' finally: fd.close() = Test SniffSource with invalid args try: s = SniffSource(iface='eth0', socket='not a socket') except ValueError: pass else: # expected ValueError assert False = Test exhausted AutoSource and SniffSource from unittest import mock from scapy.error import Scapy_Exception def _fail(): raise Scapy_Exception() a = AutoSource() a._send = mock.MagicMock(side_effect=_fail) a.send("x") try: a.deliver() except: pass s = SniffSource() s.s = mock.MagicMock() s.s.recv = mock.MagicMock(side_effect=_fail) try: s.deliver() except: pass = Test WiresharkSink ~ wiresharksink q = ObjectPipe("wiresharksink") pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() from unittest import mock with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: sink = WiresharkSink() sink.start() sink.push(pkt) q.recv() q.recv() assert raw(pkt) in q.recv() popen.assert_called_once_with( [conf.prog.wireshark, '-Slki', '-'], stdin=subprocess.PIPE, stdout=None, stderr=None) = Test WiresharkSink with linktype ~ wiresharksink linktype = scapy.data.DLT_EN3MB q = ObjectPipe("wiresharksink_linktype") pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() from unittest import mock with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: sink = WiresharkSink(linktype=linktype) sink.start() sink.push(pkt) chb(linktype) in q.recv() q.recv() assert raw(pkt) in q.recv() = Test WiresharkSink with args ~ wiresharksink linktype = scapy.data.DLT_EN3MB q = ObjectPipe("wiresharksink_args") pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() from unittest import mock with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: sink = WiresharkSink(args=['-c', '1']) sink.start() sink.push(pkt) popen.assert_called_once_with( [conf.prog.wireshark, '-Slki', '-', '-c', '1'], stdin=subprocess.PIPE, stdout=None, stderr=None) = Test RdpcapSource and WrpcapSink dname = get_temp_dir() req = Ether(b'E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') rpy = Ether(b'\x8c\xf8\x13C5P\xdcS`\xeb\x80H\x08\x00E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') wrpcap(os.path.join(dname, "t.pcap"), [req, rpy]) p = PipeEngine() s = RdpcapSource(os.path.join(dname, "t.pcap")) d1 = Drain(name="d1") c = WrpcapSink(os.path.join(dname, "t2.pcap.gz"), name="c", gz=1) s > d1 > c p.add(s) p.start() p.wait_and_stop() results = rdpcap(os.path.join(dname, "t2.pcap.gz")) assert raw(results[0]) == raw(req) assert raw(results[1]) == raw(rpy) os.unlink(os.path.join(dname, "t.pcap")) os.unlink(os.path.join(dname, "t2.pcap.gz")) = Test InjectSink and Inject3Sink ~ needs_root from unittest import mock a = IP(dst="192.168.0.1")/ICMP() msgs = [] class FakeSocket(object): def __init__(self, *arg, **karg): pass def close(self): pass def send(self, msg): global msgs msgs.append(msg) @mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) @mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) def _inject_sink(i3): s = CLIFeeder() s.send(a) s.is_exhausted = True d1 = Drain(name="d1") c = Inject3Sink() if i3 else InjectSink() s > d1 > c p = PipeEngine(s) p.start() p.wait_and_stop() _inject_sink(False) # InjectSink _inject_sink(True) # Inject3Sink assert msgs == [a,a] = TriggerDrain and TriggeredValve with CLIFeeder s = CLIFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredValve() c = QueueSink() s > d1 > d2 > c d1 ^ d2 p = PipeEngine(s) p.start() s.send("hello") s.send("trigger") s.send("hello2") s.send("trigger") s.send("hello3") assert c.q.get(timeout=5) == "hello" assert c.q.get(timeout=5) == "trigger" assert c.q.get(timeout=5) == "hello3" p.stop() = TriggerDrain and TriggeredValve with CLIHighFeeder s = CLIHighFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredValve() c = QueueSink() s >> d1 d1 >> d2 d2 >> c d1 ^ d2 p = PipeEngine(s) p.start() s.send("hello") s.send("trigger") s.send("hello2") s.send("trigger") s.send("hello3") assert c.q.get(timeout=5) == "hello" assert c.q.get(timeout=5) == "trigger" assert c.q.get(timeout=5) == "hello3" p.stop() = TriggerDrain and TriggeredQueueingValve with CLIFeeder s = CLIFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredValve() c = QueueSink() s > d1 > d2 > c d1 ^ d2 p = PipeEngine(s) p.start() s.send("hello") s.send("trigger") s.send("hello2") s.send("trigger") s.send("hello3") assert c.q.get(timeout=5) == "hello" assert c.q.get(timeout=5) == "trigger" assert c.q.get(timeout=5) == "hello3" p.stop() = TriggerDrain and TriggeredSwitch with CLIFeeder on high channel s = CLIFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredSwitch() c = QueueSink() s > d1 > d2 d2 >> c d1 ^ d2 p = PipeEngine(s) p.start() s.send("hello") s.send("trigger") s.send("hello2") s.send("trigger") s.send("hello3") assert c.q.get(timeout=5) == "trigger" assert c.q.get(timeout=5) == "hello2" p.stop() = TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel s = CLIHighFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredSwitch() c = QueueSink() s >> d1 d1 >> d2 d2 > c d1 ^ d2 p = PipeEngine(s) p.start() s.send("hello") s.send("trigger") s.send("hello2") s.send("trigger") s.send("hello3") assert c.q.get(timeout=5) == "hello" assert c.q.get(timeout=5) == "trigger" assert c.q.get(timeout=5) == "hello3" p.stop() = TriggerDrain and TriggeredMessage s = CLIFeeder() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredMessage("hello") c = QueueSink() s > d1 > d2 > c d1 ^ d2 p = PipeEngine(s) p.start() s.send("trigger") r = [c.q.get(timeout=5), c.q.get(timeout=5)] assert "hello" in r assert "trigger" in r p.stop() = TriggerDrain and TriggeredQueueingValve on low channel p = PipeEngine() s = CLIFeeder() r, w = os.pipe() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredQueueingValve() c = QueueSink(name="c") s > d1 > d2 > c d1 ^ d2 p.add(s) p.start() s.send("trigger") s.send("hello") s.send("trigger") assert c.q.get(timeout=3) == "trigger" assert c.q.get(timeout=3) in ['hello', 'trigger'] assert c.q.get(timeout=3) in ['hello', 'trigger'] assert d2.q.qsize() == 0 p.stop() = TriggerDrain and TriggeredQueueingValve on high channel p = PipeEngine() s = CLIHighFeeder() r, w = os.pipe() d1 = TriggerDrain(lambda x:x=="trigger") d2 = TriggeredQueueingValve() c = QueueSink(name="c") s >> d1 >> d2 >> c d1 ^ d2 p.add(s) p.start() s.send("trigger") s.send("hello") s.send("trigger") assert c.q.get(timeout=3) == "trigger" assert c.q.get(timeout=3) == "hello" assert d2.q.qsize() == 0 p.stop() = UDPDrain p = PipeEngine() s = CLIFeeder() s2 = CLIHighFeeder() d1 = UDPDrain() c = QueueSink() s > d1 > c s2 >> d1 >> c p.add(s) p.add(s2) p.start() pkt = DNS() s.send(IP(src="127.0.0.1")/UDP()/DNS()) s2.send(pkt) res = [c.q.get(timeout=2), c.q.get(timeout=2)] assert raw(pkt) in res res.remove(raw(pkt)) assert DNS in res[0] and res[0][UDP].sport == 1234 p.stop() = FDSourceSink on a ObjectPipe object obj = ObjectPipe("fdsourcesink") obj.send("hello") s = FDSourceSink(obj) d = Drain() c = QueueSink() s > d > c s.push("data") s.deliver() assert c.q.get(timeout=1) == "hello" = UDPClientPipe and UDPServerPipe ~ networking needs_root p = PipeEngine() s = CLIFeeder() srv = UDPServerPipe(name="srv", port=10000) cli = UDPClientPipe(name="cli", addr="127.0.0.1", port=10000) c = QueueSink(name="c") s > cli srv > c p.add(s, c) p.start() s.send(b"hello") p.start() assert c.recv() == b"hello" p.stop() srv.stop() = TCPConnectPipe networking test ~ networking needs_root p = PipeEngine() s = CLIFeeder() d1 = TCPConnectPipe(addr="www.google.com", port=80) c = QueueSink() s > d1 > c p.add(s) p.start() from scapy.layers.http import HTTPRequest, HTTP s.send(bytes(HTTP()/HTTPRequest(Host="www.google.com"))) result = c.q.get(timeout=10) p.stop() result assert result.startswith(b"HTTP/1.1 200 OK") or result.startswith(b"HTTP/1.1 302 Found")