1% Regression tests for TFTP 2 3# More information at http://www.secdev.org/projects/UTscapy/ 4 5+ TFTP coverage tests 6 7= Test answers 8 9assert TFTP_DATA(block=1).answers(TFTP_RRQ()) 10assert not TFTP_WRQ().answers(TFTP_RRQ()) 11assert not TFTP_RRQ().answers(TFTP_WRQ()) 12assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1)) 13assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1)) 14assert TFTP_ACK(block=0).answers(TFTP_RRQ()) 15assert not TFTP_ACK().answers(TFTP_ACK()) 16assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK()) 17assert TFTP_OACK().answers(TFTP_WRQ()) 18 19+ TFTP Automatons 20~ linux 21 22= Utilities 23~ linux 24 25from scapy.automaton import select_objects 26 27class MockTFTPSocket(object): 28 packets = [] 29 def recv(self, n=None): 30 pkt = self.packets.pop(0) 31 return pkt 32 def send(self, *args, **kargs): 33 pass 34 def close(self): 35 pass 36 @classmethod 37 def select(classname, inputs, remain): 38 test = [s for s in inputs if isinstance(s, classname)] 39 if test: 40 if len(test[0].packets): 41 return test 42 else: 43 inputs = [s for s in inputs if not isinstance(s, classname)] 44 return select_objects(inputs, remain) 45 46 47= TFTP_read() automaton 48~ linux 49 50class MockReadSocket(MockTFTPSocket): 51 packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512), 52 IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"] 53 54tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807, 55 ll=MockReadSocket, 56 recvsock=MockReadSocket, debug=5) 57 58res = tftp_read.run() 59assert res == (b"P" * 512 + b"<3") 60 61= TFTP_read() automaton error 62~ linux 63 64class MockReadSocket(MockTFTPSocket): 65 packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")] 66 67tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807, 68 ll=MockReadSocket, 69 recvsock=MockReadSocket) 70 71try: 72 tftp_read.run() 73 assert False 74except Automaton.ErrorState as e: 75 assert "Reached ERROR" in str(e) 76 assert "ERROR Access violation" in str(e) 77 78 79= TFTP_write() automaton 80~ linux 81 82data_received = b"" 83 84class MockWriteSocket(MockTFTPSocket): 85 packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0), 86 IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1) ] 87 def send(self, *args, **kargs): 88 if len(args) and Raw in args[0]: 89 global data_received 90 data_received += args[0][Raw].load 91 92tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807, 93 ll=MockWriteSocket, 94 recvsock=MockWriteSocket) 95 96tftp_write.run() 97assert data_received == (b"P" * 767 + b"Scapy <3") 98 99= TFTP_write() automaton error 100~ linux 101 102class MockWriteSocket(MockTFTPSocket): 103 packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")] 104 105tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807, 106 ll=MockWriteSocket, 107 recvsock=MockWriteSocket) 108 109try: 110 tftp_write.run() 111 assert False 112except Automaton.ErrorState as e: 113 assert "Reached ERROR" in str(e) 114 assert "ERROR Access violation" in str(e) 115 116 117= TFTP_WRQ_server() automaton 118~ linux 119 120class MockWRQSocket(MockTFTPSocket): 121 packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"), 122 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512), 123 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"] 124 125tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807, 126 ll=MockWRQSocket, 127 recvsock=MockWRQSocket) 128assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3")) 129 130= TFTP_WRQ_server() automaton with options 131~ linux 132 133class MockWRQSocket(MockTFTPSocket): 134 packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]), 135 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100), 136 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"] 137 138tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807, 139 ll=MockWRQSocket, 140 recvsock=MockWRQSocket) 141assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3")) 142 143= TFTP_RRQ_server() automaton 144~ linux 145 146sent_data = "P" * 512 + "<3" 147import tempfile 148filename = tempfile.mktemp(suffix=".txt") 149fdesc = open(filename, "w") 150fdesc.write(sent_data) 151fdesc.close() 152 153received_data = "" 154 155class MockRRQSocket(MockTFTPSocket): 156 packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]), 157 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=filename[5:]) / TFTP_Options(), 158 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1), 159 IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2) ] 160 def send(self, *args, **kargs): 161 if len(args): 162 pkt = args[0] 163 if TFTP_DATA in pkt: 164 global received_data 165 received_data += pkt[Raw].load.decode("utf-8") 166 167tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir="/tmp/", serve_one=True, 168 ll=MockRRQSocket, 169 recvsock=MockRRQSocket) 170tftp_rrq.run() 171assert received_data == sent_data 172 173import os 174os.unlink(filename) 175