• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# MQTT layer unit tests
2# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com>
3#
4# Type the following command to launch the tests:
5# $ test/run_tests -P "load_contrib('mqtt')" -t scapy/contrib/mqtt.uts
6
7+ Syntax check
8= Import the MQTT layer
9from scapy.contrib.mqtt import *
10
11
12+ MQTT protocol test
13
14= MQTTPublish, packet instantiation
15p = MQTT()/MQTTPublish(topic='test1',value='test2')
16assert(p.type == 3)
17assert(p.topic == b'test1')
18assert(p.value == b'test2')
19assert(p.len == None)
20assert(p.length == None)
21
22= Fixed header and MQTTPublish, packet dissection
23s = b'0\n\x00\x04testtest'
24publish = MQTT(s)
25assert(publish.type == 3)
26assert(publish.QOS == 0)
27assert(publish.DUP == 0)
28assert(publish.RETAIN == 0)
29assert(publish.len == 10)
30assert(publish[MQTTPublish].length == 4)
31assert(publish[MQTTPublish].topic == b'test')
32assert(publish[MQTTPublish].value == b'test')
33
34
35= MQTTConnect, packet instantiation
36c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid')
37assert(c.type == 1)
38assert(c.clientId == b'newid')
39assert(c.clientIdlen == 5)
40
41= MQTTConnect, packet dissection
42s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali'
43connect = MQTT(s)
44assert(connect.length == 6)
45assert(connect.protoname == b'MQIsdp')
46assert(connect.protolevel == 3)
47assert(connect.usernameflag == 0)
48assert(connect.passwordflag == 0)
49assert(connect.willretainflag == 0)
50assert(connect.willQOSflag == 0)
51assert(connect.willflag == 0)
52assert(connect.cleansess == 1)
53assert(connect.reserved == 0)
54assert(connect.klive == 60)
55assert(connect.clientIdlen == 17)
56assert(connect.clientId == b'mosqpub/1440-kali')
57
58
59= MQTTConnack, packet instantiation
60ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0)
61assert(ck.type == 2)
62assert(ck.sessPresentFlag == 1)
63assert(ck.retcode == 0)
64
65= MQTTConnack, packet dissection
66s = b' \x02\x00\x00'
67connack = MQTT(s)
68assert(connack.sessPresentFlag == 0)
69assert(connack.retcode == 0)
70
71
72= MQTTSubscribe, packet instantiation
73sb = MQTT()/MQTTSubscribe(msgid=1,topic='newtopic',QOS=0,length=0)
74assert(sb.type == 8)
75assert(sb.msgid == 1)
76assert(sb.topic == b'newtopic')
77assert(sb.length == 0)
78assert(sb[MQTTSubscribe].QOS == 0)
79
80= MQTTSubscribe, packet dissection
81s = b'\x82\t\x00\x01\x00\x04test\x00'
82subscribe = MQTT(s)
83assert(subscribe.msgid == 1)
84assert(subscribe.length == 4)
85assert(subscribe.topic == b'test')
86assert(subscribe.QOS == 1)
87
88
89= MQTTSuback, packet instantiation
90sk = MQTT()/MQTTSuback(msgid=1, retcode=0)
91assert(sk.type == 9)
92assert(sk.msgid == 1)
93assert(sk.retcode == 0)
94
95= MQTTSuback, packet dissection
96s = b'\x90\x03\x00\x01\x00'
97suback = MQTT(s)
98assert(suback.msgid == 1)
99assert(suback.retcode == 0)
100
101
102= MQTTPubrec, packet instantiation
103pc = MQTT()/MQTTPubrec(msgid=1)
104assert(pc.type == 5)
105assert(pc.msgid == 1)
106
107= MQTTPubrec, packet dissection
108s = b'P\x02\x00\x01'
109pubrec = MQTT(s)
110assert(pubrec.msgid == 1)
111