• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
% Regression tests for TestSocket

+ Configuration
~ conf

= Imports
7
8from test.testsocket import TestSocket, cleanup_testsockets
9
= Create Dummy Packet
11
class TestPacket(Packet):
    """Dummy packet with an integer identifier and a byte-string payload."""
    # NOTE: the body is kept free of blank lines, as in the original; UTScapy
    # test code is presumably fed to an interactive-style interpreter in which
    # a blank line would end the class statement -- confirm before reformatting.
    fields_desc = [
        IntField("identifier", 0),
        StrField("data", b""),
    ]
    def answers(self, other):
        """Return True if this packet is the answer to ``other``."""
        # Only packets of exactly the same class can be matched.
        if other.__class__ != self.__class__:
            return False
        ident = self.identifier
        # An answer has an even identifier that is exactly one greater than
        # the identifier of the request it replies to.
        return ident % 2 == 0 and ident == other.identifier + 1
    def hashret(self):
        """Return the packed key shared by a request and its answer."""
        ident = self.identifier
        # Odd identifiers are rounded up to the next even value, so a request
        # with identifier n (odd) and its answer n + 1 produce the same key.
        paired_key = ident + ident % 2
        return struct.pack('I', paired_key)
27
28
= Create Sockets
30
# Two TestSocket endpoints built around TestPacket (sender is created first),
# then linked with pair() -- presumably so that packets sent on one endpoint
# are delivered to the other; see test/testsocket.py to confirm.
sender, receiver = TestSocket(TestPacket), TestSocket(TestPacket)
sender.pair(receiver)
34
+ Basic tests

= Simple ping pong
38
def create_answer(p):
    """Sniffer callback: send the answer to request ``p`` via ``receiver``."""
    # The reply carries the request's identifier plus one and the request's
    # payload with b"_answer" appended. ``receiver`` is looked up as a global
    # at call time, so the socket currently bound to that name is used.
    reply_fields = {
        "identifier": p.identifier + 1,
        "data": p.data + b"_answer",
    }
    receiver.send(TestPacket(**reply_fields))
42
# Number of request/answer exchanges; every count checked below derives
# from this single value instead of repeating 1000 / 2000 literals.
testlen = 1000

# The sniffer listens on the receiver socket and replies to every request
# through create_answer.
t = AsyncSniffer(timeout=50, prn=create_answer, opened_socket=receiver)
t.start()

pks = PacketList()

# try/finally guarantees that the sniffer is stopped and both sockets are
# closed even when an exchange fails part-way through the loop.
try:
    for i in range(1, testlen * 2, 2):
        txp = TestPacket(identifier=i, data=b"hello"*i)
        rxp = sender.sr1(txp, verbose=False, timeout=0.5)
        # Fail at the first missing answer: appending None to pks would only
        # surface later as an unrelated error inside pks.sr().
        assert rxp is not None, "no answer received for identifier %d" % i
        pks.append(txp)
        pks.append(rxp)
finally:
    t.stop(join=True)
    sender.close()
    receiver.close()

# Re-match the recorded requests with their answers.
convs = pks.sr()

assert len(t.results) == testlen
assert len(pks) == 2 * testlen
assert len(convs[0]) == testlen
63
= Simple ping pong with sr with packet generator 500
65
# Number of requests generated from a single TestPacket template.
testlen = 500

# Fresh socket pair for this test.
sender = TestSocket(TestPacket)
receiver = TestSocket(TestPacket)
sender.pair(receiver)

# Reply to every sniffed request through create_answer.
t = AsyncSniffer(opened_socket=receiver, prn=create_answer, timeout=10)
t.start()

# One packet whose identifier field is a range of the odd numbers
# 1, 3, ..., 2 * testlen - 1.
odd_identifiers = range(1, 2 * testlen, 2)
txp = TestPacket(identifier=odd_identifiers, data=b"test1")
rxp = sender.sr(txp, prebuild=True, verbose=False, timeout=10)
t.stop(join=True)

# First element of the sr() result; its length is checked below.
answered = rxp[0]
print(rxp)
print(answered.summary())

sender.close()
receiver.close()

assert len(t.results) == testlen
assert len(answered) == testlen
87
= Simple ping pong with sr with generated packets
89
# Fresh socket pair for this test. The pair is not closed in this test;
# NOTE(review): presumably the campaign's final cleanup_testsockets() call
# is what disposes of it -- confirm against test/testsocket.py.
sender = TestSocket(TestPacket)
receiver = TestSocket(TestPacket)
sender.pair(receiver)

# Reply to every sniffed request through create_answer.
t = AsyncSniffer(opened_socket=receiver, prn=create_answer, timeout=10)
t.start()

# 1000 individually built requests with the odd identifiers 1, 3, ..., 1999.
odd_identifiers = range(1, 2000, 2)
txp = [TestPacket(identifier=n, data=b"hello") for n in odd_identifiers]
rxp = sender.sr(txp, verbose=False, timeout=10)
t.stop(join=True)

print(rxp)
# First element of the sr() result; its length is checked below.
answered = rxp[0]
assert len(t.results) == 1000
assert len(answered) == 1000
104
+ Cleanup

= Delete TestSockets
108
# Call the cleanup helper imported from test.testsocket.
# NOTE(review): presumably this closes every TestSocket that is still open
# (e.g. the pair left open by the previous test) -- confirm in the helper.
cleanup_testsockets()