# MQTT layer unit tests # Copyright (C) Santiago Hernandez Ramos # # Type the following command to launch start the tests: # $ test/run_tests -P "load_contrib('mqtt')" -t test/contrib/mqtt.uts + Syntax check = Import the MQTT layer from scapy.contrib.mqtt import * + MQTT protocol test = MQTTPublish, packet instantiation p = MQTT()/MQTTPublish(topic='test1',value='test2') assert p.type == 3 assert p.topic == b'test1' assert p.value == b'test2' assert p.len == None assert p.length == None = Fixed header and MQTTPublish, packet dissection s = b'0\n\x00\x04testtest' publish = MQTT(s) assert publish.type == 3 assert publish.QOS == 0 assert publish.DUP == 0 assert publish.RETAIN == 0 assert publish.len == 10 assert publish[MQTTPublish].length == 4 assert publish[MQTTPublish].topic == b'test' assert publish[MQTTPublish].value == b'test' = MQTTPublish topicC = "testtopic/command" p1 = MQTT( QOS=1 ) / MQTTPublish( topic=topicC, msgid=1234, value="msg1" ) p2 = MQTT( QOS=1 ) / MQTTPublish( topic=topicC, msgid=1235, value="msg2" ) p = MQTT(raw(p1 / p2)) assert p[1].msgid == 1234 = MQTTConnect, packet instantiation c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid') assert c.type == 1 assert c.clientId == b'newid' assert c.clientIdlen == 5 = MQTTConnect, packet dissection s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali' connect = MQTT(s) assert connect.length == 6 assert connect.protoname == b'MQIsdp' assert connect.protolevel == 3 assert connect.usernameflag == 0 assert connect.passwordflag == 0 assert connect.willretainflag == 0 assert connect.willQOSflag == 0 assert connect.willflag == 0 assert connect.cleansess == 1 assert connect.reserved == 0 assert connect.klive == 60 assert connect.clientIdlen == 17 assert connect.clientId == b'mosqpub/1440-kali' = MQTTDisconnect mr = raw(MQTT()/MQTTDisconnect()) dc= MQTT(mr) assert dc.type == 14 =MQTTConnack, packet instantiation ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0) assert ck.type == 2 assert ck.sessPresentFlag == 1 assert ck.retcode == 0 = MQTTConnack, packet dissection s = b' \x02\x00\x00' connack = MQTT(s) assert connack.sessPresentFlag == 0 assert connack.retcode == 0 = MQTTSubscribe, packet instantiation sb = MQTT()/MQTTSubscribe(msgid=1, topics=[MQTTTopicQOS(topic='newtopic', QOS=1, length=0)]) assert sb.type == 8 assert sb.msgid == 1 assert sb.topics[0].topic == b'newtopic' assert sb.topics[0].length == 0 assert sb[MQTTSubscribe][MQTTTopicQOS].QOS == 1 = MQTTSubscribe, packet dissection s = b'\x82\t\x00\x01\x00\x04test\x01' subscribe = MQTT(s) assert subscribe.msgid == 1 assert subscribe.topics[0].length == 4 assert subscribe.topics[0].topic == b'test' assert subscribe.topics[0].QOS == 1 = MQTTSuback, packet instantiation sk = MQTT()/MQTTSuback(msgid=1, retcodes=[0]) assert sk.type == 9 assert sk.msgid == 1 assert sk.retcodes == [0] = MQTTSuback, packet dissection s = b'\x90\x03\x00\x01\x00' suback = MQTT(s) assert suback.msgid == 1 assert suback.retcodes == [0] s = b'\x90\x03\x00\x01\x00\x01' suback = MQTT(s) assert suback.msgid == 1 assert suback.retcodes == [0, 1] = MQTTUnsubscribe, packet instantiation unsb = MQTT()/MQTTUnsubscribe(msgid=1, topics=[MQTTTopic(topic='newtopic',length=0)]) assert unsb.type == 10 assert unsb.msgid == 1 assert unsb.topics[0].topic == b'newtopic' assert unsb.topics[0].length == 0 = MQTTUnsubscribe, packet dissection u = b'\xA2\x09\x00\x01\x00\x03\x61\x2F\x62' unsubscribe = MQTT(u) assert unsubscribe.msgid == 1 assert unsubscribe.topics[0].length == 3 assert unsubscribe.topics[0].topic == b'a/b' = MQTTUnsuback, packet instantiation unsk = MQTT()/MQTTUnsuback(msgid=1) assert unsk.type == 11 assert unsk.msgid == 1 = MQTTUnsuback, packet dissection u = b'\xb0\x02\x00\x01' unsuback = MQTT(u) assert unsuback.type == 11 assert unsuback.msgid == 1 = MQTTPubrec, packet instantiation pc = MQTT()/MQTTPubrec(msgid=1) assert pc.type == 5 assert pc.msgid == 1 = MQTTPubrec packet dissection s = b'P\x02\x00\x01' pubrec = MQTT(s) assert pubrec.msgid == 1 = MQTTPublish, long value p = MQTT()/MQTTPublish(topic='test1',value='a'*200) assert bytes(p) assert p.type == 3 assert p.topic == b'test1' assert p.value == b'a'*200 assert p.len == None assert p.length == None = MQTT without payload p = MQTT() assert bytes(p) == b'\x10\x00' = MQTT RandVariableFieldLen assert type(MQTT().fieldtype['len'].randval()) == RandVariableFieldLen assert type(MQTT().fieldtype['len'].randval() + 0) == int = MQTTUnsubscribe u = MQTT(b'\xA2\x0C\x00\x01\x00\x03\x61\x2F\x62\x00\x03\x63\x2F\x64') assert MQTTUnsubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d" = MQTTSubscribe u = MQTT(b'\x82\x10\x00\x01\x00\x03\x61\x2F\x62\x02\x00\x03\x63\x2F\x64\x00') assert MQTTSubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d"