1# MQTT layer unit tests 2# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com> 3# 4# Type the following command to launch start the tests: 5# $ test/run_tests -P "load_contrib('mqtt')" -t test/contrib/mqtt.uts 6 7+ Syntax check 8= Import the MQTT layer 9from scapy.contrib.mqtt import * 10 11 12+ MQTT protocol test 13 14= MQTTPublish, packet instantiation 15p = MQTT()/MQTTPublish(topic='test1',value='test2') 16assert p.type == 3 17assert p.topic == b'test1' 18assert p.value == b'test2' 19assert p.len == None 20assert p.length == None 21 22= Fixed header and MQTTPublish, packet dissection 23s = b'0\n\x00\x04testtest' 24publish = MQTT(s) 25assert publish.type == 3 26assert publish.QOS == 0 27assert publish.DUP == 0 28assert publish.RETAIN == 0 29assert publish.len == 10 30assert publish[MQTTPublish].length == 4 31assert publish[MQTTPublish].topic == b'test' 32assert publish[MQTTPublish].value == b'test' 33 34= MQTTPublish 35 36topicC = "testtopic/command" 37 38p1 = MQTT( 39 QOS=1 40 ) / MQTTPublish( 41 topic=topicC, 42 msgid=1234, 43 value="msg1" 44 ) 45p2 = MQTT( 46 QOS=1 47 ) / MQTTPublish( 48 topic=topicC, 49 msgid=1235, 50 value="msg2" 51 ) 52 53p = MQTT(raw(p1 / p2)) 54assert p[1].msgid == 1234 55 56= MQTTConnect, packet instantiation 57c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid') 58assert c.type == 1 59assert c.clientId == b'newid' 60assert c.clientIdlen == 5 61 62= MQTTConnect, packet dissection 63s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali' 64connect = MQTT(s) 65assert connect.length == 6 66assert connect.protoname == b'MQIsdp' 67assert connect.protolevel == 3 68assert connect.usernameflag == 0 69assert connect.passwordflag == 0 70assert connect.willretainflag == 0 71assert connect.willQOSflag == 0 72assert connect.willflag == 0 73assert connect.cleansess == 1 74assert connect.reserved == 0 75assert connect.klive == 60 76assert connect.clientIdlen == 17 77assert connect.clientId == b'mosqpub/1440-kali' 78 79= MQTTDisconnect 80mr = raw(MQTT()/MQTTDisconnect()) 81dc= MQTT(mr) 82assert dc.type == 14 83 84=MQTTConnack, packet instantiation 85ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0) 86assert ck.type == 2 87assert ck.sessPresentFlag == 1 88assert ck.retcode == 0 89 90= MQTTConnack, packet dissection 91s = b' \x02\x00\x00' 92connack = MQTT(s) 93assert connack.sessPresentFlag == 0 94assert connack.retcode == 0 95 96 97= MQTTSubscribe, packet instantiation 98sb = MQTT()/MQTTSubscribe(msgid=1, topics=[MQTTTopicQOS(topic='newtopic', QOS=1, length=0)]) 99assert sb.type == 8 100assert sb.msgid == 1 101assert sb.topics[0].topic == b'newtopic' 102assert sb.topics[0].length == 0 103assert sb[MQTTSubscribe][MQTTTopicQOS].QOS == 1 104 105= MQTTSubscribe, packet dissection 106s = b'\x82\t\x00\x01\x00\x04test\x01' 107subscribe = MQTT(s) 108assert subscribe.msgid == 1 109assert subscribe.topics[0].length == 4 110assert subscribe.topics[0].topic == b'test' 111assert subscribe.topics[0].QOS == 1 112 113 114= MQTTSuback, packet instantiation 115sk = MQTT()/MQTTSuback(msgid=1, retcodes=[0]) 116assert sk.type == 9 117assert sk.msgid == 1 118assert sk.retcodes == [0] 119 120= MQTTSuback, packet dissection 121s = b'\x90\x03\x00\x01\x00' 122suback = MQTT(s) 123assert suback.msgid == 1 124assert suback.retcodes == [0] 125 126s = b'\x90\x03\x00\x01\x00\x01' 127suback = MQTT(s) 128assert suback.msgid == 1 129assert suback.retcodes == [0, 1] 130 131= MQTTUnsubscribe, packet instantiation 132unsb = MQTT()/MQTTUnsubscribe(msgid=1, topics=[MQTTTopic(topic='newtopic',length=0)]) 133assert unsb.type == 10 134assert unsb.msgid == 1 135assert unsb.topics[0].topic == b'newtopic' 136assert unsb.topics[0].length == 0 137 138= MQTTUnsubscribe, packet dissection 139u = b'\xA2\x09\x00\x01\x00\x03\x61\x2F\x62' 140unsubscribe = MQTT(u) 141assert unsubscribe.msgid == 1 142assert unsubscribe.topics[0].length == 3 143assert unsubscribe.topics[0].topic == b'a/b' 144 145= MQTTUnsuback, packet instantiation 146unsk = MQTT()/MQTTUnsuback(msgid=1) 147assert unsk.type == 11 148assert unsk.msgid == 1 149 150= MQTTUnsuback, packet dissection 151u = b'\xb0\x02\x00\x01' 152unsuback = MQTT(u) 153assert unsuback.type == 11 154assert unsuback.msgid == 1 155 156= MQTTPubrec, packet instantiation 157pc = MQTT()/MQTTPubrec(msgid=1) 158assert pc.type == 5 159assert pc.msgid == 1 160 161= MQTTPubrec packet dissection 162s = b'P\x02\x00\x01' 163pubrec = MQTT(s) 164assert pubrec.msgid == 1 165 166= MQTTPublish, long value 167p = MQTT()/MQTTPublish(topic='test1',value='a'*200) 168assert bytes(p) 169assert p.type == 3 170assert p.topic == b'test1' 171assert p.value == b'a'*200 172assert p.len == None 173assert p.length == None 174 175= MQTT without payload 176p = MQTT() 177assert bytes(p) == b'\x10\x00' 178 179= MQTT RandVariableFieldLen 180assert type(MQTT().fieldtype['len'].randval()) == RandVariableFieldLen 181assert type(MQTT().fieldtype['len'].randval() + 0) == int 182 183= MQTTUnsubscribe 184u = MQTT(b'\xA2\x0C\x00\x01\x00\x03\x61\x2F\x62\x00\x03\x63\x2F\x64') 185assert MQTTUnsubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d" 186 187= MQTTSubscribe 188u = MQTT(b'\x82\x10\x00\x01\x00\x03\x61\x2F\x62\x02\x00\x03\x63\x2F\x64\x00') 189assert MQTTSubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d" 190