• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for TFTP
2
3# More information at http://www.secdev.org/projects/UTscapy/
4
5+ TFTP coverage tests
6
7= Test answers
8
9assert TFTP_DATA(block=1).answers(TFTP_RRQ())
10assert not TFTP_WRQ().answers(TFTP_RRQ())
11assert not TFTP_RRQ().answers(TFTP_WRQ())
12assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1))
13assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1))
14assert TFTP_ACK(block=0).answers(TFTP_RRQ())
15assert not TFTP_ACK().answers(TFTP_ACK())
16assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK())
17assert TFTP_OACK().answers(TFTP_WRQ())
18
19+ TFTP Automatons
20~ linux
21
22= Utilities
23~ linux
24
25from scapy.automaton import select_objects
26
27class MockTFTPSocket(object):
28    packets = []
29    def recv(self, n=None):
30        pkt = self.packets.pop(0)
31        return pkt
32    def send(self, *args, **kargs):
33        pass
34    def close(self):
35        pass
36    @classmethod
37    def select(classname, inputs, remain):
38        test = [s for s in inputs if isinstance(s, classname)]
39        if test:
40            if len(test[0].packets):
41                return test
42            else:
43                inputs = [s for s in inputs if not isinstance(s, classname)]
44        return select_objects(inputs, remain)
45
46
47= TFTP_read() automaton
48~ linux
49
50class MockReadSocket(MockTFTPSocket):
51    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
52               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"]
53
54tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
55                      ll=MockReadSocket,
56                      recvsock=MockReadSocket, debug=5)
57
58res = tftp_read.run()
59assert res == (b"P" * 512 + b"<3")
60
61= TFTP_read() automaton error
62~ linux
63
64class MockReadSocket(MockTFTPSocket):
65    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]
66
67tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
68                      ll=MockReadSocket,
69                      recvsock=MockReadSocket)
70
71try:
72    tftp_read.run()
73    assert False
74except Automaton.ErrorState as e:
75    assert "Reached ERROR" in str(e)
76    assert "ERROR Access violation" in str(e)
77
78
79= TFTP_write() automaton
80~ linux
81
82data_received = b""
83
84class MockWriteSocket(MockTFTPSocket):
85    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
86               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1) ]
87    def send(self, *args, **kargs):
88        if len(args) and Raw in args[0]:
89            global data_received
90            data_received += args[0][Raw].load
91
92tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
93                        ll=MockWriteSocket,
94                        recvsock=MockWriteSocket)
95
96tftp_write.run()
97assert data_received == (b"P" * 767 + b"Scapy <3")
98
99= TFTP_write() automaton error
100~ linux
101
102class MockWriteSocket(MockTFTPSocket):
103    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]
104
105tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
106                        ll=MockWriteSocket,
107                        recvsock=MockWriteSocket)
108
109try:
110    tftp_write.run()
111    assert False
112except Automaton.ErrorState as e:
113    assert "Reached ERROR" in str(e)
114    assert "ERROR Access violation" in str(e)
115
116
117= TFTP_WRQ_server() automaton
118~ linux
119
120class MockWRQSocket(MockTFTPSocket):
121    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
122               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
123               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]
124
125tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
126                           ll=MockWRQSocket,
127                           recvsock=MockWRQSocket)
128assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3"))
129
130= TFTP_WRQ_server() automaton with options
131~ linux
132
133class MockWRQSocket(MockTFTPSocket):
134    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
135               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
136               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]
137
138tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
139                           ll=MockWRQSocket,
140                           recvsock=MockWRQSocket)
141assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3"))
142
143= TFTP_RRQ_server() automaton
144~ linux
145
146sent_data = "P" * 512 + "<3"
147import tempfile
148filename = tempfile.mktemp(suffix=".txt")
149fdesc = open(filename, "w")
150fdesc.write(sent_data)
151fdesc.close()
152
153received_data = ""
154
155class MockRRQSocket(MockTFTPSocket):
156    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
157               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=filename[5:]) / TFTP_Options(),
158               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
159               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2) ]
160    def send(self, *args, **kargs):
161        if len(args):
162            pkt = args[0]
163            if TFTP_DATA in pkt:
164                global received_data
165                received_data += pkt[Raw].load.decode("utf-8")
166
167tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir="/tmp/", serve_one=True,
168                           ll=MockRRQSocket,
169                           recvsock=MockRRQSocket)
170tftp_rrq.run()
171assert received_data == sent_data
172
173import os
174os.unlink(filename)
175