• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# MQTT layer unit tests
2# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com>
3#
4# Type the following command to launch the tests:
5# $ test/run_tests -P "load_contrib('mqtt')" -t test/contrib/mqtt.uts
6
7+ Syntax check
8= Import the MQTT layer
9from scapy.contrib.mqtt import *
10
11
12+ MQTT protocol test
13
14= MQTTPublish, packet instantiation
# Build a PUBLISH packet purely from keyword arguments. The fixed-header type
# is never passed explicitly; stacking MQTTPublish is expected to make it
# read back as 3.
p = MQTT()/MQTTPublish(topic='test1', value='test2')
assert p.type == 3
# The str arguments are expected to read back as bytes.
assert p.topic == b'test1'
assert p.value == b'test2'
# Neither length was supplied, so both fields must still be unset. Use an
# identity test: `== None` can be satisfied by any object with a permissive
# __eq__, `is None` cannot.
assert p.len is None
assert p.length is None
21
22= Fixed header and MQTTPublish, packet dissection
# Dissect a raw PUBLISH frame and check both the fixed header and the
# MQTTPublish layer. The capture is spelled piecewise; the pieces concatenate
# to the same ten-byte-payload frame.
s = (
    b'0'          # fixed-header byte (0x30): type 3, no flag bits set
    b'\n'         # remaining length: 10
    b'\x00\x04'   # topic length: 4
    b'test'       # topic
    b'test'       # value
)
publish = MQTT(s)
assert publish.type == 3
assert (publish.QOS, publish.DUP, publish.RETAIN) == (0, 0, 0)
assert publish.len == 10
publish_layer = publish[MQTTPublish]
assert publish_layer.length == 4
assert publish_layer.topic == b'test'
assert publish_layer.value == b'test'
33
34= MQTTPublish
35
# Two QOS 1 PUBLISH packets on the same topic are stacked, serialized and
# dissected again; the message id read back at layer index 1 must be the one
# given to the first packet.
topicC = "testtopic/command"
p1 = MQTT(QOS=1) / MQTTPublish(topic=topicC, msgid=1234, value="msg1")
p2 = MQTT(QOS=1) / MQTTPublish(topic=topicC, msgid=1235, value="msg2")
stacked_bytes = raw(p1 / p2)
p = MQTT(stacked_bytes)
assert p[1].msgid == 1234
55
56= MQTTConnect, packet instantiation
# Build a CONNECT packet with an explicit client id and a matching explicit
# length. Stacking MQTTConnect is expected to make the fixed-header type
# read back as 1.
connect_body = MQTTConnect(clientIdlen=5, clientId='newid')
c = MQTT() / connect_body
assert c.type == 1
assert (c.clientId, c.clientIdlen) == (b'newid', 5)
61
62= MQTTConnect, packet dissection
# Dissect a captured CONNECT frame. The capture is spelled piecewise so each
# group of bytes sits next to the field the assertions below read it as.
s = (
    b'\x10\x1f'                    # fixed header
    b'\x00\x06MQIsdp'              # protocol name length (6) + name
    b'\x03'                        # protocol level: 3
    b'\x02'                        # connect flags
    b'\x00<'                       # keep-alive: 60
    b'\x00\x11mosqpub/1440-kali'   # client id length (17) + client id
)
connect = MQTT(s)
assert connect.length == 6
assert connect.protoname == b'MQIsdp'
assert connect.protolevel == 3
# Of the connect flags, only clean-session is expected to be set.
assert connect.cleansess == 1
assert (
    connect.usernameflag,
    connect.passwordflag,
    connect.willretainflag,
    connect.willQOSflag,
    connect.willflag,
    connect.reserved,
) == (0, 0, 0, 0, 0, 0)
assert connect.klive == 60
assert connect.clientIdlen == 17
assert connect.clientId == b'mosqpub/1440-kali'
78
79= MQTTDisconnect
# Round-trip: serialize a DISCONNECT packet, dissect the resulting bytes and
# check that the type read back from the fixed header is 14.
disconnect_pkt = MQTT() / MQTTDisconnect()
mr = raw(disconnect_pkt)
dc = MQTT(mr)
assert dc.type == 14
83
84= MQTTConnack, packet instantiation
# Build a CONNACK packet with the session-present flag set and a zero return
# code. Stacking MQTTConnack is expected to make the type read back as 2.
ck = MQTT() / MQTTConnack(sessPresentFlag=1, retcode=0)
assert ck.type == 2
assert (ck.sessPresentFlag, ck.retcode) == (1, 0)
89
90= MQTTConnack, packet dissection
# Dissect a CONNACK frame. The leading b' ' is the fixed-header byte 0x20;
# both bytes after the length byte are zero, so both decoded fields must be 0.
s = b' \x02\x00\x00'
connack = MQTT(s)
assert (connack.sessPresentFlag, connack.retcode) == (0, 0)
95
96
97= MQTTSubscribe, packet instantiation
# Build a SUBSCRIBE packet carrying one topic/QOS entry. The entry is given an
# explicit length of 0, and the assertions expect that value to be kept as
# given on the instantiated packet.
topic_entry = MQTTTopicQOS(topic='newtopic', QOS=1, length=0)
sb = MQTT() / MQTTSubscribe(msgid=1, topics=[topic_entry])
assert sb.type == 8
assert sb.msgid == 1
first_entry = sb.topics[0]
assert first_entry.topic == b'newtopic'
assert first_entry.length == 0
# The entry must also be reachable by layer lookup through MQTTSubscribe.
assert sb[MQTTSubscribe][MQTTTopicQOS].QOS == 1
104
105= MQTTSubscribe, packet dissection
# Dissect a SUBSCRIBE frame holding a single topic/QOS entry.
s = (
    b'\x82\t'          # fixed header; remaining length 9
    b'\x00\x01'        # message id: 1
    b'\x00\x04test'    # topic length (4) + topic
    b'\x01'            # requested QOS: 1
)
subscribe = MQTT(s)
assert subscribe.msgid == 1
first_entry = subscribe.topics[0]
assert first_entry.length == 4
assert first_entry.topic == b'test'
assert first_entry.QOS == 1
112
113
114= MQTTSuback, packet instantiation
# Build a SUBACK packet with one return code. Stacking MQTTSuback is expected
# to make the fixed-header type read back as 9.
sk = MQTT() / MQTTSuback(msgid=1, retcodes=[0])
assert sk.type == 9
assert (sk.msgid, sk.retcodes) == (1, [0])
119
120= MQTTSuback, packet dissection
# Dissect a SUBACK frame carrying a single return code.
# Remaining length 3 = 2-byte message id + 1 return code.
s = b'\x90\x03\x00\x01\x00'
suback = MQTT(s)
assert suback.msgid == 1
assert suback.retcodes == [0]
# Same frame with a second return code appended. The remaining-length byte
# counts every byte that follows it (2-byte message id + 2 return codes), so
# it must be 4 here; 3 would describe the one-code frame above and leave the
# last byte outside the declared packet.
s = b'\x90\x04\x00\x01\x00\x01'
suback = MQTT(s)
assert suback.msgid == 1
assert suback.retcodes == [0, 1]
130
131= MQTTUnsubscribe, packet instantiation
# Build an UNSUBSCRIBE packet carrying one topic entry. The entry is given an
# explicit length of 0, and the assertions expect that value to be kept as
# given on the instantiated packet.
topic_filter = MQTTTopic(topic='newtopic', length=0)
unsb = MQTT() / MQTTUnsubscribe(msgid=1, topics=[topic_filter])
assert unsb.type == 10
assert unsb.msgid == 1
first_filter = unsb.topics[0]
assert first_filter.topic == b'newtopic'
assert first_filter.length == 0
137
138= MQTTUnsubscribe, packet dissection
# Dissect an UNSUBSCRIBE frame with one topic filter, b'a/b'.
# The remaining-length byte counts every byte that follows it:
# 2-byte message id + 2-byte topic length + 3 topic bytes = 7. A larger value
# would declare bytes that are not present in the frame.
u = b'\xA2\x07\x00\x01\x00\x03\x61\x2F\x62'
unsubscribe = MQTT(u)
assert unsubscribe.msgid == 1
assert unsubscribe.topics[0].length == 3
assert unsubscribe.topics[0].topic == b'a/b'
144
145= MQTTUnsuback, packet instantiation
# Build an UNSUBACK packet. Stacking MQTTUnsuback is expected to make the
# fixed-header type read back as 11.
unsk = MQTT() / MQTTUnsuback(msgid=1)
assert (unsk.type, unsk.msgid) == (11, 1)
149
150= MQTTUnsuback, packet dissection
# Dissect an UNSUBACK frame: fixed-header byte 0xb0, a length byte of 2 and a
# two-byte message id.
u = b'\xb0\x02\x00\x01'
unsuback = MQTT(u)
assert (unsuback.type, unsuback.msgid) == (11, 1)
155
156= MQTTPubrec, packet instantiation
# Build a PUBREC packet. Stacking MQTTPubrec is expected to make the
# fixed-header type read back as 5.
pc = MQTT() / MQTTPubrec(msgid=1)
assert (pc.type, pc.msgid) == (5, 1)
160
161= MQTTPubrec packet dissection
# Dissect a PUBREC frame and check the message id decoded from its last two
# bytes.
s = (
    b'P'          # fixed-header byte (0x50)
    b'\x02'       # length byte: 2
    b'\x00\x01'   # message id: 1
)
pubrec = MQTT(s)
assert pubrec.msgid == 1
165
166= MQTTPublish, long value
# Build a PUBLISH packet with a 200-byte value. Presumably this exercises a
# remaining length too large for a single length byte (TODO confirm); the
# test itself only requires that the build succeeds and yields non-empty
# bytes.
p = MQTT()/MQTTPublish(topic='test1', value='a'*200)
assert bytes(p)
assert p.type == 3
assert p.topic == b'test1'
assert p.value == b'a'*200
# Building above must not have filled in the length fields on the packet
# object. Use an identity test: `== None` can be satisfied by any object with
# a permissive __eq__, `is None` cannot.
assert p.len is None
assert p.length is None
174
175= MQTT without payload
# A bare MQTT layer with no payload must serialize to exactly two bytes:
# 0x10 followed by a zero byte.
p = MQTT()
expected_bytes = b'\x10\x00'
assert bytes(p) == expected_bytes
178
179= MQTT RandVariableFieldLen
# The random value generator of the MQTT 'len' field must be exactly a
# RandVariableFieldLen, and adding an int to it must produce exactly an int.
# Exact type checks are written with `is`: type objects are compared by
# identity, which is what `==` was relying on here.
assert type(MQTT().fieldtype['len'].randval()) is RandVariableFieldLen
assert type(MQTT().fieldtype['len'].randval() + 0) is int
182
183= MQTTUnsubscribe
# Dissect an UNSUBSCRIBE frame holding two topic filters (b'a/b', b'c/d') and
# check that both are split out, the second one intact. The checks are
# separate asserts so a failure points at the condition that broke.
u = MQTT(b'\xA2\x0C\x00\x01\x00\x03\x61\x2F\x62\x00\x03\x63\x2F\x64')
assert MQTTUnsubscribe in u
assert len(u.topics) == 2
assert u.topics[1].topic == b"c/d"
186
187= MQTTSubscribe
# Dissect a SUBSCRIBE frame holding two topic/QOS entries (b'a/b' with QOS 2,
# b'c/d' with QOS 0) and check that both are split out, the second one intact.
# The remaining-length byte counts every byte that follows it:
# 2-byte message id + 2 * (2-byte topic length + 3 topic bytes + 1 QOS byte)
# = 14 (0x0E). A larger value would declare bytes that are not present.
u = MQTT(b'\x82\x0E\x00\x01\x00\x03\x61\x2F\x62\x02\x00\x03\x63\x2F\x64\x00')
# Separate asserts so a failure points at the condition that broke.
assert MQTTSubscribe in u
assert len(u.topics) == 2
assert u.topics[1].topic == b"c/d"
190