• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1########################
2% Pipetool related tests
3########################
4
5+ Basic tests
6
7= Test default test case
8
9s = PeriodicSource("hello", 1, name="src")
10d1 = Drain(name="d1")
11c = ConsoleSink(name="c")
12tf = TransformDrain(lambda x: "Got %s" % x)
13s > d1 > c
14d1 > tf
15try:
16  t = TermSink(name="PipeToolsPeriodicTest", keepterm=False)
17  tf > t
18except (IOError, OSError):
19  pass
20
21p = PipeEngine(s)
22p.start()
23time.sleep(3)
24s.msg = []
25p.stop()
26
27= Test add_pipe
28
29s = AutoSource()
30p = PipeEngine(s)
31p.add(Pipe())
32assert len(p.active_pipes) == 2
33
34x = p.spawn_Pipe()
35assert len(p.active_pipes) == 3
36assert isinstance(x, Pipe)
37
38= Test exhausted source
39
40s = AutoSource()
41s._gen_data("hello")
42s.is_exhausted = True
43d1 = Drain(name="d1")
44c = ConsoleSink(name="c")
45s > d1 > c
46
47p = PipeEngine(s)
48p.start()
49p.wait_and_stop()
50
51= Test add_pipe on running instance
52
53p = PipeEngine()
54p.start()
55
56s = CLIFeeder()
57
58d1 = Drain(name="d1")
59c = QueueSink(name="c")
60s > d1 > c
61
62p.add(s)
63
64s.send("hello")
65s.send("hi")
66
67assert c.q.get(timeout=5) == "hello"
68assert c.q.get(timeout=5) == "hi"
69
70p.stop()
71
72= Test Operators
73
74s = AutoSource()
75p = PipeEngine(s)
76assert p == p
77
78a = AutoSource()
79b = AutoSource()
80a >> b
81assert len(a.high_sinks) == 1
82assert len(a.high_sources) == 0
83assert len(b.high_sinks) == 0
84assert len(b.high_sources) == 1
85a
86b
87
88a = Sink()
89b = AutoSource()
90a << b
91assert len(a.high_sinks) == 0
92assert len(a.high_sources) == 1
93assert len(b.high_sinks) == 1
94assert len(b.high_sources) == 0
95a
96b
97
98a = Sink()
99b = Sink()
100a % b
101assert len(a.sinks) == 1
102assert len(a.sources) == 1
103assert len(b.sinks) == 1
104assert len(b.sources) == 1
105
106a = Sink()
107b = Sink()
108a//b
109assert len(a.high_sinks) == 1
110assert len(a.high_sources) == 1
111assert len(b.high_sinks) == 1
112assert len(b.high_sources) == 1
113
114a = AutoSource()
115b = Sink()
116a^b
117assert len(b.trigger_sources) == 1
118assert len(a.trigger_sinks) == 1
119
120= Test doc
121
122s = AutoSource()
123p = PipeEngine(s)
124p.list_pipes()
125p.list_pipes_detailed()
126
127= Test RawConsoleSink with CLIFeeder
128
129p = PipeEngine()
130
131s = CLIFeeder()
132s.send("hello")
133s.is_exhausted = True
134
135r, w = os.pipe()
136
137d1 = Drain(name="d1")
138c = RawConsoleSink(name="c")
139c._write_pipe = w
140s > d1 > c
141
142p.add(s)
143p.start()
144
145assert os.read(r, 20) == b"hello\n"
146p.wait_and_stop()
147
148= Test QueueSink with CLIFeeder
149
150p = PipeEngine()
151
152s = CLIFeeder()
153s.send("hello")
154s.is_exhausted = True
155
156d1 = Drain(name="d1")
157c = QueueSink(name="c")
158s > d1 > c
159
160p.add(s)
161p.start()
162
163p.wait_and_stop()
164assert c.recv() == "hello"
165assert c.recv(block=False) is None
166
167= Test UpDrain
168
169test_val = None
170
171class TestSink(Sink):
172    def high_push(self, msg):
173        global test_val
174        test_val = msg
175
176p = PipeEngine()
177
178s = CLIFeeder()
179s.send("hello")
180s.is_exhausted = True
181
182d1 = UpDrain(name="d1")
183c = TestSink(name="c")
184s > d1
185d1 >> c
186
187p.add(s)
188p.start()
189
190p.wait_and_stop()
191assert test_val == "hello"
192
193= Test DownDrain
194
195test_val = None
196
197class TestSink(Sink):
198    def push(self, msg):
199        global test_val
200        test_val = msg
201
202p = PipeEngine()
203
204s = CLIHighFeeder()
205s.send("hello")
206s.is_exhausted = True
207
208d1 = DownDrain(name="d1")
209c = TestSink(name="c")
210s >> d1
211d1 > c
212
213p.add(s)
214p.start()
215
216p.wait_and_stop()
217assert test_val == "hello"
218
219= Test PeriodicSource exhaustion
220
221s = PeriodicSource("", 1)
222s.msg = []
223p = PipeEngine(s)
224p.start()
225p.wait_and_stop()
226
227+ Advanced ScapyPipes pipetools tests
228
229= Test SniffSource
230
231from unittest import mock
232fd = ObjectPipe("sniffsource")
233fd.write("test")
234
235@mock.patch("scapy.scapypipes.conf.L2listen")
236def _test(l2listen):
237    l2listen.return_value=Bunch(close=lambda *args: None, fileno=lambda: fd.fileno(), recv=lambda *args: Raw("data"))
238    p = PipeEngine()
239    s = SniffSource()
240    assert s.s is None
241    d1 = Drain(name="d1")
242    c = QueueSink(name="c")
243    s > d1 > c
244    p.add(s)
245    p.start()
246    x = c.q.get(2)
247    assert bytes(x) == b"data"
248    assert s.s is not None
249    p.stop()
250
251try:
252    _test()
253finally:
254    fd.close()
255
256= Test SniffSource with socket
257
258fd = ObjectPipe("sniffsource_socket")
259fd.write("test")
260
261class FakeSocket(object):
262    def __init__(self):
263        self.times = 0
264    def recv(self, x=None):
265        if self.times > 2:
266            return
267        self.times += 1
268        return Raw(b'hello')
269    def fileno(self):
270        return fd.fileno()
271
272try:
273    p = PipeEngine()
274    s = SniffSource(socket=FakeSocket())
275    assert s.s is not None
276    d = Drain()
277    c = QueueSink()
278    p.add(s > d > c)
279    p.start()
280    msg = c.q.get(timeout=1)
281    p.stop()
282    assert raw(msg) == b'hello'
283finally:
284    fd.close()
285
286= Test SniffSource with invalid args
287
288try:
289    s = SniffSource(iface='eth0', socket='not a socket')
290except ValueError:
291    pass
292else:
293    # expected ValueError
294    assert False
295
296= Test exhausted AutoSource and SniffSource
297
298from unittest import mock
299from scapy.error import Scapy_Exception
300
301def _fail():
302    raise Scapy_Exception()
303
304a = AutoSource()
305a._send = mock.MagicMock(side_effect=_fail)
306a.send("x")
307try:
308    a.deliver()
309except:
310    pass
311
312s = SniffSource()
313s.s = mock.MagicMock()
314s.s.recv = mock.MagicMock(side_effect=_fail)
315try:
316    s.deliver()
317except:
318    pass
319
320= Test WiresharkSink
321~ wiresharksink
322
323q = ObjectPipe("wiresharksink")
324pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
325
326from unittest import mock
327with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
328    sink = WiresharkSink()
329    sink.start()
330
331sink.push(pkt)
332
333q.recv()
334q.recv()
335assert raw(pkt) in q.recv()
336
337popen.assert_called_once_with(
338    [conf.prog.wireshark, '-Slki', '-'], stdin=subprocess.PIPE, stdout=None,
339    stderr=None)
340
341= Test WiresharkSink with linktype
342~ wiresharksink
343
344linktype = scapy.data.DLT_EN3MB
345q = ObjectPipe("wiresharksink_linktype")
346pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
347
348from unittest import mock
349with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
350    sink = WiresharkSink(linktype=linktype)
351    sink.start()
352
353sink.push(pkt)
354
355chb(linktype) in q.recv()
356q.recv()
357assert raw(pkt) in q.recv()
358
359= Test WiresharkSink with args
360~ wiresharksink
361
362linktype = scapy.data.DLT_EN3MB
363q = ObjectPipe("wiresharksink_args")
364pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
365
366from unittest import mock
367with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
368    sink = WiresharkSink(args=['-c', '1'])
369    sink.start()
370
371sink.push(pkt)
372
373popen.assert_called_once_with(
374    [conf.prog.wireshark, '-Slki', '-', '-c', '1'],
375    stdin=subprocess.PIPE, stdout=None, stderr=None)
376
377= Test RdpcapSource and WrpcapSink
378
379dname = get_temp_dir()
380
381req = Ether(b'E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
382rpy = Ether(b'\x8c\xf8\x13C5P\xdcS`\xeb\x80H\x08\x00E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
383
384wrpcap(os.path.join(dname, "t.pcap"), [req, rpy])
385
386p = PipeEngine()
387
388s = RdpcapSource(os.path.join(dname, "t.pcap"))
389d1 = Drain(name="d1")
390c = WrpcapSink(os.path.join(dname, "t2.pcap.gz"), name="c", gz=1)
391s > d1 > c
392p.add(s)
393p.start()
394p.wait_and_stop()
395
396results = rdpcap(os.path.join(dname, "t2.pcap.gz"))
397
398assert raw(results[0]) == raw(req)
399assert raw(results[1]) == raw(rpy)
400
401os.unlink(os.path.join(dname, "t.pcap"))
402os.unlink(os.path.join(dname, "t2.pcap.gz"))
403
404= Test InjectSink and Inject3Sink
405~ needs_root
406
407from unittest import mock
408
409a = IP(dst="192.168.0.1")/ICMP()
410msgs = []
411
412class FakeSocket(object):
413    def __init__(self, *arg, **karg):
414        pass
415    def close(self):
416        pass
417    def send(self, msg):
418        global msgs
419        msgs.append(msg)
420
421@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
422@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
423def _inject_sink(i3):
424    s = CLIFeeder()
425    s.send(a)
426    s.is_exhausted = True
427    d1 = Drain(name="d1")
428    c = Inject3Sink() if i3 else InjectSink()
429    s > d1 > c
430    p = PipeEngine(s)
431    p.start()
432    p.wait_and_stop()
433
434_inject_sink(False) # InjectSink
435_inject_sink(True) # Inject3Sink
436
437assert msgs == [a,a]
438
439= TriggerDrain and TriggeredValve with CLIFeeder
440
441s = CLIFeeder()
442d1 = TriggerDrain(lambda x:x=="trigger")
443d2 = TriggeredValve()
444c = QueueSink()
445
446s > d1 > d2 > c
447d1 ^ d2
448
449p = PipeEngine(s)
450p.start()
451
452s.send("hello")
453s.send("trigger")
454s.send("hello2")
455s.send("trigger")
456s.send("hello3")
457
458assert c.q.get(timeout=5) == "hello"
459assert c.q.get(timeout=5) == "trigger"
460assert c.q.get(timeout=5) == "hello3"
461
462p.stop()
463
464= TriggerDrain and TriggeredValve with CLIHighFeeder
465
466s = CLIHighFeeder()
467d1 = TriggerDrain(lambda x:x=="trigger")
468d2 = TriggeredValve()
469c = QueueSink()
470
471s >> d1
472d1 >> d2
473d2 >> c
474d1 ^ d2
475
476p = PipeEngine(s)
477p.start()
478
479s.send("hello")
480s.send("trigger")
481s.send("hello2")
482s.send("trigger")
483s.send("hello3")
484
485assert c.q.get(timeout=5) == "hello"
486assert c.q.get(timeout=5) == "trigger"
487assert c.q.get(timeout=5) == "hello3"
488
489p.stop()
490
491= TriggerDrain and TriggeredQueueingValve with CLIFeeder
492
493s = CLIFeeder()
494d1 = TriggerDrain(lambda x:x=="trigger")
495d2 = TriggeredValve()
496c = QueueSink()
497
498s > d1 > d2 > c
499d1 ^ d2
500
501p = PipeEngine(s)
502p.start()
503
504s.send("hello")
505s.send("trigger")
506s.send("hello2")
507s.send("trigger")
508s.send("hello3")
509
510assert c.q.get(timeout=5) == "hello"
511assert c.q.get(timeout=5) == "trigger"
512assert c.q.get(timeout=5) == "hello3"
513
514p.stop()
515
516= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel
517
518s = CLIFeeder()
519d1 = TriggerDrain(lambda x:x=="trigger")
520d2 = TriggeredSwitch()
521c = QueueSink()
522
523s > d1 > d2
524d2 >> c
525d1 ^ d2
526
527p = PipeEngine(s)
528p.start()
529
530s.send("hello")
531s.send("trigger")
532s.send("hello2")
533s.send("trigger")
534s.send("hello3")
535
536assert c.q.get(timeout=5) == "trigger"
537assert c.q.get(timeout=5) == "hello2"
538
539p.stop()
540
541= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel
542
543s = CLIHighFeeder()
544d1 = TriggerDrain(lambda x:x=="trigger")
545d2 = TriggeredSwitch()
546c = QueueSink()
547
548s >> d1
549d1 >> d2
550d2 > c
551d1 ^ d2
552
553p = PipeEngine(s)
554p.start()
555
556s.send("hello")
557s.send("trigger")
558s.send("hello2")
559s.send("trigger")
560s.send("hello3")
561
562assert c.q.get(timeout=5) == "hello"
563assert c.q.get(timeout=5) == "trigger"
564assert c.q.get(timeout=5) == "hello3"
565
566p.stop()
567
568= TriggerDrain and TriggeredMessage
569
570s = CLIFeeder()
571d1 = TriggerDrain(lambda x:x=="trigger")
572d2 = TriggeredMessage("hello")
573c = QueueSink()
574
575s > d1 > d2 > c
576d1 ^ d2
577
578p = PipeEngine(s)
579p.start()
580
581s.send("trigger")
582
583r = [c.q.get(timeout=5), c.q.get(timeout=5)]
584assert "hello" in r
585assert "trigger" in r
586
587p.stop()
588
589= TriggerDrain and TriggeredQueueingValve on low channel
590
591p = PipeEngine()
592
593s = CLIFeeder()
594r, w = os.pipe()
595
596d1 = TriggerDrain(lambda x:x=="trigger")
597d2 = TriggeredQueueingValve()
598c = QueueSink(name="c")
599s > d1 > d2 > c
600d1 ^ d2
601
602p.add(s)
603p.start()
604
605s.send("trigger")
606s.send("hello")
607s.send("trigger")
608assert c.q.get(timeout=3) == "trigger"
609assert c.q.get(timeout=3) in ['hello', 'trigger']
610assert c.q.get(timeout=3) in ['hello', 'trigger']
611assert d2.q.qsize() == 0
612
613p.stop()
614
615= TriggerDrain and TriggeredQueueingValve on high channel
616
617p = PipeEngine()
618
619s = CLIHighFeeder()
620r, w = os.pipe()
621
622d1 = TriggerDrain(lambda x:x=="trigger")
623d2 = TriggeredQueueingValve()
624c = QueueSink(name="c")
625s >> d1 >> d2 >> c
626d1 ^ d2
627
628p.add(s)
629p.start()
630
631s.send("trigger")
632s.send("hello")
633s.send("trigger")
634assert c.q.get(timeout=3) == "trigger"
635assert c.q.get(timeout=3) == "hello"
636assert d2.q.qsize() == 0
637
638p.stop()
639
640= UDPDrain
641
642p = PipeEngine()
643
644s = CLIFeeder()
645s2 = CLIHighFeeder()
646d1 = UDPDrain()
647c = QueueSink()
648
649s > d1 > c
650s2 >> d1 >> c
651
652p.add(s)
653p.add(s2)
654p.start()
655
656pkt = DNS()
657
658s.send(IP(src="127.0.0.1")/UDP()/DNS())
659s2.send(pkt)
660
661res = [c.q.get(timeout=2), c.q.get(timeout=2)]
662assert raw(pkt) in res
663res.remove(raw(pkt))
664assert DNS in res[0] and res[0][UDP].sport == 1234
665
666p.stop()
667
668= FDSourceSink on a ObjectPipe object
669
670obj = ObjectPipe("fdsourcesink")
671obj.send("hello")
672
673s = FDSourceSink(obj)
674d = Drain()
675c = QueueSink()
676s > d > c
677
678s.push("data")
679s.deliver()
680assert c.q.get(timeout=1) == "hello"
681
682= UDPClientPipe and UDPServerPipe
683~ networking needs_root
684
685p = PipeEngine()
686
687s = CLIFeeder()
688srv = UDPServerPipe(name="srv", port=10000)
689cli = UDPClientPipe(name="cli", addr="127.0.0.1", port=10000)
690c = QueueSink(name="c")
691
692s > cli
693srv > c
694
695p.add(s, c)
696p.start()
697
698s.send(b"hello")
699p.start()
700assert c.recv() == b"hello"
701p.stop()
702srv.stop()
703
704= TCPConnectPipe networking test
705~ networking needs_root
706
707p = PipeEngine()
708
709s = CLIFeeder()
710d1 = TCPConnectPipe(addr="www.google.com", port=80)
711c = QueueSink()
712
713s > d1 > c
714
715p.add(s)
716p.start()
717
718from scapy.layers.http import HTTPRequest, HTTP
719s.send(bytes(HTTP()/HTTPRequest(Host="www.google.com")))
720result = c.q.get(timeout=10)
721p.stop()
722
723result
724assert result.startswith(b"HTTP/1.1 200 OK") or result.startswith(b"HTTP/1.1 302 Found")
725
726