1######################## 2% Pipetool related tests 3######################## 4 5+ Basic tests 6 7= Test default test case 8 9s = PeriodicSource("hello", 1, name="src") 10d1 = Drain(name="d1") 11c = ConsoleSink(name="c") 12tf = TransformDrain(lambda x: "Got %s" % x) 13s > d1 > c 14d1 > tf 15try: 16 t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) 17 tf > t 18except (IOError, OSError): 19 pass 20 21p = PipeEngine(s) 22p.start() 23time.sleep(3) 24s.msg = [] 25p.stop() 26 27= Test add_pipe 28 29s = AutoSource() 30p = PipeEngine(s) 31p.add(Pipe()) 32assert len(p.active_pipes) == 2 33 34x = p.spawn_Pipe() 35assert len(p.active_pipes) == 3 36assert isinstance(x, Pipe) 37 38= Test exhausted source 39 40s = AutoSource() 41s._gen_data("hello") 42s.is_exhausted = True 43d1 = Drain(name="d1") 44c = ConsoleSink(name="c") 45s > d1 > c 46 47p = PipeEngine(s) 48p.start() 49p.wait_and_stop() 50 51= Test add_pipe on running instance 52 53p = PipeEngine() 54p.start() 55 56s = CLIFeeder() 57 58d1 = Drain(name="d1") 59c = QueueSink(name="c") 60s > d1 > c 61 62p.add(s) 63 64s.send("hello") 65s.send("hi") 66 67assert c.q.get(timeout=5) == "hello" 68assert c.q.get(timeout=5) == "hi" 69 70p.stop() 71 72= Test Operators 73 74s = AutoSource() 75p = PipeEngine(s) 76assert p == p 77 78a = AutoSource() 79b = AutoSource() 80a >> b 81assert len(a.high_sinks) == 1 82assert len(a.high_sources) == 0 83assert len(b.high_sinks) == 0 84assert len(b.high_sources) == 1 85a 86b 87 88a = Sink() 89b = AutoSource() 90a << b 91assert len(a.high_sinks) == 0 92assert len(a.high_sources) == 1 93assert len(b.high_sinks) == 1 94assert len(b.high_sources) == 0 95a 96b 97 98a = Sink() 99b = Sink() 100a % b 101assert len(a.sinks) == 1 102assert len(a.sources) == 1 103assert len(b.sinks) == 1 104assert len(b.sources) == 1 105 106a = Sink() 107b = Sink() 108a//b 109assert len(a.high_sinks) == 1 110assert len(a.high_sources) == 1 111assert len(b.high_sinks) == 1 112assert len(b.high_sources) == 1 113 114a = AutoSource() 115b = Sink() 116a^b 117assert len(b.trigger_sources) == 1 118assert len(a.trigger_sinks) == 1 119 120= Test doc 121 122s = AutoSource() 123p = PipeEngine(s) 124p.list_pipes() 125p.list_pipes_detailed() 126 127= Test RawConsoleSink with CLIFeeder 128 129p = PipeEngine() 130 131s = CLIFeeder() 132s.send("hello") 133s.is_exhausted = True 134 135r, w = os.pipe() 136 137d1 = Drain(name="d1") 138c = RawConsoleSink(name="c") 139c._write_pipe = w 140s > d1 > c 141 142p.add(s) 143p.start() 144 145assert os.read(r, 20) == b"hello\n" 146p.wait_and_stop() 147 148= Test QueueSink with CLIFeeder 149 150p = PipeEngine() 151 152s = CLIFeeder() 153s.send("hello") 154s.is_exhausted = True 155 156d1 = Drain(name="d1") 157c = QueueSink(name="c") 158s > d1 > c 159 160p.add(s) 161p.start() 162 163p.wait_and_stop() 164assert c.recv() == "hello" 165assert c.recv(block=False) is None 166 167= Test UpDrain 168 169test_val = None 170 171class TestSink(Sink): 172 def high_push(self, msg): 173 global test_val 174 test_val = msg 175 176p = PipeEngine() 177 178s = CLIFeeder() 179s.send("hello") 180s.is_exhausted = True 181 182d1 = UpDrain(name="d1") 183c = TestSink(name="c") 184s > d1 185d1 >> c 186 187p.add(s) 188p.start() 189 190p.wait_and_stop() 191assert test_val == "hello" 192 193= Test DownDrain 194 195test_val = None 196 197class TestSink(Sink): 198 def push(self, msg): 199 global test_val 200 test_val = msg 201 202p = PipeEngine() 203 204s = CLIHighFeeder() 205s.send("hello") 206s.is_exhausted = True 207 208d1 = DownDrain(name="d1") 209c = TestSink(name="c") 210s >> d1 211d1 > c 212 213p.add(s) 214p.start() 215 216p.wait_and_stop() 217assert test_val == "hello" 218 219= Test PeriodicSource exhaustion 220 221s = PeriodicSource("", 1) 222s.msg = [] 223p = PipeEngine(s) 224p.start() 225p.wait_and_stop() 226 227+ Advanced ScapyPipes pipetools tests 228 229= Test SniffSource 230 231from unittest import mock 232fd = ObjectPipe("sniffsource") 233fd.write("test") 234 235@mock.patch("scapy.scapypipes.conf.L2listen") 236def _test(l2listen): 237 l2listen.return_value=Bunch(close=lambda *args: None, fileno=lambda: fd.fileno(), recv=lambda *args: Raw("data")) 238 p = PipeEngine() 239 s = SniffSource() 240 assert s.s is None 241 d1 = Drain(name="d1") 242 c = QueueSink(name="c") 243 s > d1 > c 244 p.add(s) 245 p.start() 246 x = c.q.get(2) 247 assert bytes(x) == b"data" 248 assert s.s is not None 249 p.stop() 250 251try: 252 _test() 253finally: 254 fd.close() 255 256= Test SniffSource with socket 257 258fd = ObjectPipe("sniffsource_socket") 259fd.write("test") 260 261class FakeSocket(object): 262 def __init__(self): 263 self.times = 0 264 def recv(self, x=None): 265 if self.times > 2: 266 return 267 self.times += 1 268 return Raw(b'hello') 269 def fileno(self): 270 return fd.fileno() 271 272try: 273 p = PipeEngine() 274 s = SniffSource(socket=FakeSocket()) 275 assert s.s is not None 276 d = Drain() 277 c = QueueSink() 278 p.add(s > d > c) 279 p.start() 280 msg = c.q.get(timeout=1) 281 p.stop() 282 assert raw(msg) == b'hello' 283finally: 284 fd.close() 285 286= Test SniffSource with invalid args 287 288try: 289 s = SniffSource(iface='eth0', socket='not a socket') 290except ValueError: 291 pass 292else: 293 # expected ValueError 294 assert False 295 296= Test exhausted AutoSource and SniffSource 297 298from unittest import mock 299from scapy.error import Scapy_Exception 300 301def _fail(): 302 raise Scapy_Exception() 303 304a = AutoSource() 305a._send = mock.MagicMock(side_effect=_fail) 306a.send("x") 307try: 308 a.deliver() 309except: 310 pass 311 312s = SniffSource() 313s.s = mock.MagicMock() 314s.s.recv = mock.MagicMock(side_effect=_fail) 315try: 316 s.deliver() 317except: 318 pass 319 320= Test WiresharkSink 321~ wiresharksink 322 323q = ObjectPipe("wiresharksink") 324pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() 325 326from unittest import mock 327with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: 328 sink = WiresharkSink() 329 sink.start() 330 331sink.push(pkt) 332 333q.recv() 334q.recv() 335assert raw(pkt) in q.recv() 336 337popen.assert_called_once_with( 338 [conf.prog.wireshark, '-Slki', '-'], stdin=subprocess.PIPE, stdout=None, 339 stderr=None) 340 341= Test WiresharkSink with linktype 342~ wiresharksink 343 344linktype = scapy.data.DLT_EN3MB 345q = ObjectPipe("wiresharksink_linktype") 346pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() 347 348from unittest import mock 349with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: 350 sink = WiresharkSink(linktype=linktype) 351 sink.start() 352 353sink.push(pkt) 354 355chb(linktype) in q.recv() 356q.recv() 357assert raw(pkt) in q.recv() 358 359= Test WiresharkSink with args 360~ wiresharksink 361 362linktype = scapy.data.DLT_EN3MB 363q = ObjectPipe("wiresharksink_args") 364pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() 365 366from unittest import mock 367with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: 368 sink = WiresharkSink(args=['-c', '1']) 369 sink.start() 370 371sink.push(pkt) 372 373popen.assert_called_once_with( 374 [conf.prog.wireshark, '-Slki', '-', '-c', '1'], 375 stdin=subprocess.PIPE, stdout=None, stderr=None) 376 377= Test RdpcapSource and WrpcapSink 378 379dname = get_temp_dir() 380 381req = Ether(b'E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') 382rpy = Ether(b'\x8c\xf8\x13C5P\xdcS`\xeb\x80H\x08\x00E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') 383 384wrpcap(os.path.join(dname, "t.pcap"), [req, rpy]) 385 386p = PipeEngine() 387 388s = RdpcapSource(os.path.join(dname, "t.pcap")) 389d1 = Drain(name="d1") 390c = WrpcapSink(os.path.join(dname, "t2.pcap.gz"), name="c", gz=1) 391s > d1 > c 392p.add(s) 393p.start() 394p.wait_and_stop() 395 396results = rdpcap(os.path.join(dname, "t2.pcap.gz")) 397 398assert raw(results[0]) == raw(req) 399assert raw(results[1]) == raw(rpy) 400 401os.unlink(os.path.join(dname, "t.pcap")) 402os.unlink(os.path.join(dname, "t2.pcap.gz")) 403 404= Test InjectSink and Inject3Sink 405~ needs_root 406 407from unittest import mock 408 409a = IP(dst="192.168.0.1")/ICMP() 410msgs = [] 411 412class FakeSocket(object): 413 def __init__(self, *arg, **karg): 414 pass 415 def close(self): 416 pass 417 def send(self, msg): 418 global msgs 419 msgs.append(msg) 420 421@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) 422@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) 423def _inject_sink(i3): 424 s = CLIFeeder() 425 s.send(a) 426 s.is_exhausted = True 427 d1 = Drain(name="d1") 428 c = Inject3Sink() if i3 else InjectSink() 429 s > d1 > c 430 p = PipeEngine(s) 431 p.start() 432 p.wait_and_stop() 433 434_inject_sink(False) # InjectSink 435_inject_sink(True) # Inject3Sink 436 437assert msgs == [a,a] 438 439= TriggerDrain and TriggeredValve with CLIFeeder 440 441s = CLIFeeder() 442d1 = TriggerDrain(lambda x:x=="trigger") 443d2 = TriggeredValve() 444c = QueueSink() 445 446s > d1 > d2 > c 447d1 ^ d2 448 449p = PipeEngine(s) 450p.start() 451 452s.send("hello") 453s.send("trigger") 454s.send("hello2") 455s.send("trigger") 456s.send("hello3") 457 458assert c.q.get(timeout=5) == "hello" 459assert c.q.get(timeout=5) == "trigger" 460assert c.q.get(timeout=5) == "hello3" 461 462p.stop() 463 464= TriggerDrain and TriggeredValve with CLIHighFeeder 465 466s = CLIHighFeeder() 467d1 = TriggerDrain(lambda x:x=="trigger") 468d2 = TriggeredValve() 469c = QueueSink() 470 471s >> d1 472d1 >> d2 473d2 >> c 474d1 ^ d2 475 476p = PipeEngine(s) 477p.start() 478 479s.send("hello") 480s.send("trigger") 481s.send("hello2") 482s.send("trigger") 483s.send("hello3") 484 485assert c.q.get(timeout=5) == "hello" 486assert c.q.get(timeout=5) == "trigger" 487assert c.q.get(timeout=5) == "hello3" 488 489p.stop() 490 491= TriggerDrain and TriggeredQueueingValve with CLIFeeder 492 493s = CLIFeeder() 494d1 = TriggerDrain(lambda x:x=="trigger") 495d2 = TriggeredValve() 496c = QueueSink() 497 498s > d1 > d2 > c 499d1 ^ d2 500 501p = PipeEngine(s) 502p.start() 503 504s.send("hello") 505s.send("trigger") 506s.send("hello2") 507s.send("trigger") 508s.send("hello3") 509 510assert c.q.get(timeout=5) == "hello" 511assert c.q.get(timeout=5) == "trigger" 512assert c.q.get(timeout=5) == "hello3" 513 514p.stop() 515 516= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel 517 518s = CLIFeeder() 519d1 = TriggerDrain(lambda x:x=="trigger") 520d2 = TriggeredSwitch() 521c = QueueSink() 522 523s > d1 > d2 524d2 >> c 525d1 ^ d2 526 527p = PipeEngine(s) 528p.start() 529 530s.send("hello") 531s.send("trigger") 532s.send("hello2") 533s.send("trigger") 534s.send("hello3") 535 536assert c.q.get(timeout=5) == "trigger" 537assert c.q.get(timeout=5) == "hello2" 538 539p.stop() 540 541= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel 542 543s = CLIHighFeeder() 544d1 = TriggerDrain(lambda x:x=="trigger") 545d2 = TriggeredSwitch() 546c = QueueSink() 547 548s >> d1 549d1 >> d2 550d2 > c 551d1 ^ d2 552 553p = PipeEngine(s) 554p.start() 555 556s.send("hello") 557s.send("trigger") 558s.send("hello2") 559s.send("trigger") 560s.send("hello3") 561 562assert c.q.get(timeout=5) == "hello" 563assert c.q.get(timeout=5) == "trigger" 564assert c.q.get(timeout=5) == "hello3" 565 566p.stop() 567 568= TriggerDrain and TriggeredMessage 569 570s = CLIFeeder() 571d1 = TriggerDrain(lambda x:x=="trigger") 572d2 = TriggeredMessage("hello") 573c = QueueSink() 574 575s > d1 > d2 > c 576d1 ^ d2 577 578p = PipeEngine(s) 579p.start() 580 581s.send("trigger") 582 583r = [c.q.get(timeout=5), c.q.get(timeout=5)] 584assert "hello" in r 585assert "trigger" in r 586 587p.stop() 588 589= TriggerDrain and TriggeredQueueingValve on low channel 590 591p = PipeEngine() 592 593s = CLIFeeder() 594r, w = os.pipe() 595 596d1 = TriggerDrain(lambda x:x=="trigger") 597d2 = TriggeredQueueingValve() 598c = QueueSink(name="c") 599s > d1 > d2 > c 600d1 ^ d2 601 602p.add(s) 603p.start() 604 605s.send("trigger") 606s.send("hello") 607s.send("trigger") 608assert c.q.get(timeout=3) == "trigger" 609assert c.q.get(timeout=3) in ['hello', 'trigger'] 610assert c.q.get(timeout=3) in ['hello', 'trigger'] 611assert d2.q.qsize() == 0 612 613p.stop() 614 615= TriggerDrain and TriggeredQueueingValve on high channel 616 617p = PipeEngine() 618 619s = CLIHighFeeder() 620r, w = os.pipe() 621 622d1 = TriggerDrain(lambda x:x=="trigger") 623d2 = TriggeredQueueingValve() 624c = QueueSink(name="c") 625s >> d1 >> d2 >> c 626d1 ^ d2 627 628p.add(s) 629p.start() 630 631s.send("trigger") 632s.send("hello") 633s.send("trigger") 634assert c.q.get(timeout=3) == "trigger" 635assert c.q.get(timeout=3) == "hello" 636assert d2.q.qsize() == 0 637 638p.stop() 639 640= UDPDrain 641 642p = PipeEngine() 643 644s = CLIFeeder() 645s2 = CLIHighFeeder() 646d1 = UDPDrain() 647c = QueueSink() 648 649s > d1 > c 650s2 >> d1 >> c 651 652p.add(s) 653p.add(s2) 654p.start() 655 656pkt = DNS() 657 658s.send(IP(src="127.0.0.1")/UDP()/DNS()) 659s2.send(pkt) 660 661res = [c.q.get(timeout=2), c.q.get(timeout=2)] 662assert raw(pkt) in res 663res.remove(raw(pkt)) 664assert DNS in res[0] and res[0][UDP].sport == 1234 665 666p.stop() 667 668= FDSourceSink on a ObjectPipe object 669 670obj = ObjectPipe("fdsourcesink") 671obj.send("hello") 672 673s = FDSourceSink(obj) 674d = Drain() 675c = QueueSink() 676s > d > c 677 678s.push("data") 679s.deliver() 680assert c.q.get(timeout=1) == "hello" 681 682= UDPClientPipe and UDPServerPipe 683~ networking needs_root 684 685p = PipeEngine() 686 687s = CLIFeeder() 688srv = UDPServerPipe(name="srv", port=10000) 689cli = UDPClientPipe(name="cli", addr="127.0.0.1", port=10000) 690c = QueueSink(name="c") 691 692s > cli 693srv > c 694 695p.add(s, c) 696p.start() 697 698s.send(b"hello") 699p.start() 700assert c.recv() == b"hello" 701p.stop() 702srv.stop() 703 704= TCPConnectPipe networking test 705~ networking needs_root 706 707p = PipeEngine() 708 709s = CLIFeeder() 710d1 = TCPConnectPipe(addr="www.google.com", port=80) 711c = QueueSink() 712 713s > d1 > c 714 715p.add(s) 716p.start() 717 718from scapy.layers.http import HTTPRequest, HTTP 719s.send(bytes(HTTP()/HTTPRequest(Host="www.google.com"))) 720result = c.q.get(timeout=10) 721p.stop() 722 723result 724assert result.startswith(b"HTTP/1.1 200 OK") or result.startswith(b"HTTP/1.1 302 Found") 725 726