% Regression tests for TFTP

# More information at http://www.secdev.org/projects/UTscapy/

+ TFTP coverage tests

= Test answers

# Which TFTP packets are accepted as answers to which requests.
assert TFTP_DATA(block=1).answers(TFTP_RRQ())
assert not TFTP_WRQ().answers(TFTP_RRQ())
assert not TFTP_RRQ().answers(TFTP_WRQ())
assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1))
assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1))
assert TFTP_ACK(block=0).answers(TFTP_RRQ())
assert not TFTP_ACK().answers(TFTP_ACK())
assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK())
assert TFTP_OACK().answers(TFTP_WRQ())

+ TFTP Automatons
~ linux

= Utilities
~ linux

from scapy.automaton import select_objects

# NOTE: compound statements below deliberately contain no blank lines and are
# each followed by one, so they can be fed statement by statement to the
# test interpreter.
class MockTFTPSocket(object):
    """Fake socket that replays the canned packets listed in ``packets``.
    Subclasses override ``packets`` (and ``send`` when they need to capture
    what the automaton emits)."""
    packets = []
    def recv(self, n=None):
        # Hand the queued packets out in order; ``n`` is ignored.
        pkt = self.packets.pop(0)
        return pkt
    def send(self, *args, **kargs):
        # Outgoing traffic is discarded by default.
        pass
    def close(self):
        pass
    @classmethod
    def select(cls, inputs, remain):
        """Return the mock sockets found in ``inputs`` while the first one
        still has packets queued; otherwise drop the mock sockets from
        ``inputs`` and defer to ``select_objects``."""
        test = [s for s in inputs if isinstance(s, cls)]
        if test:
            if len(test[0].packets):
                return test
            else:
                inputs = [s for s in inputs if not isinstance(s, cls)]
        return select_objects(inputs, remain)

= TFTP_read() automaton
~ linux

# The peer sends one full 512-byte block followed by a short final block.
class MockReadSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket, debug=5)
res = tftp_read.run()
assert res == (b"P" * 512 + b"<3")

= TFTP_read() automaton error
~ linux

# The peer answers the read request with a TFTP error.
class MockReadSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket)

try:
    tftp_read.run()
    # run() is expected to raise; reaching this line is a failure.
    assert False
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)

= TFTP_write() automaton
~ linux

# Accumulates the Raw payload of every packet the automaton sends.
data_received = b""

class MockWriteSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1)]
    def send(self, *args, **kargs):
        global data_received
        if len(args) and Raw in args[0]:
            data_received += args[0][Raw].load

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

tftp_write.run()
assert data_received == (b"P" * 767 + b"Scapy <3")

= TFTP_write() automaton error
~ linux

# The peer answers the write request with a TFTP error.
class MockWriteSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

try:
    tftp_write.run()
    # run() is expected to raise; reaching this line is a failure.
    assert False
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)

= TFTP_WRQ_server() automaton
~ linux

# A client writes "scapy.txt" in two blocks: 512 bytes, then a short one.
class MockWRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)

assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3"))

= TFTP_WRQ_server() automaton with options
~ linux

# Same transfer, but the write request carries a blksize=100 option and the
# first data block is 100 bytes long.
class MockWRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)

assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3"))

= TFTP_RRQ_server() automaton
~ linux

import os
import tempfile

sent_data = "P" * 512 + "<3"

# mkstemp() creates the file atomically (mktemp() only picks a name, which is
# deprecated and racy).
fd, filename = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as fdesc:
    fdesc.write(sent_data)

# Derive the served directory and the requested name from the real path
# instead of assuming the temporary directory is the 5-character "/tmp/".
tmp_dir, tmp_name = os.path.split(filename)

# Accumulates the decoded payload of every TFTP_DATA packet the server sends.
received_data = ""

class MockRRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=tmp_name) / TFTP_Options(),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2)]
    def send(self, *args, **kargs):
        global received_data
        if len(args):
            pkt = args[0]
            if TFTP_DATA in pkt:
                received_data += pkt[Raw].load.decode("utf-8")

# os.path.join(tmp_dir, "") keeps the trailing separator that the previous
# hard-coded dir="/tmp/" had.
tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir=os.path.join(tmp_dir, ""), serve_one=True,
                           ll=MockRRQSocket,
                           recvsock=MockRRQSocket)

try:
    tftp_rrq.run()
finally:
    # Remove the fixture file even when the automaton fails.
    os.unlink(filename)

assert received_data == sent_data