# MQTT-SN layer unit tests
# Copyright (C) 2019 Martine Lenders
#
# This program is published under GPLv2 license
#
# Type the following command to start the test
# $ test/run_tests -P "load_contrib('mqttsn')" -t test/contrib/mqttsn.uts
#
# Layout of this campaign: every message type gets an "instantiation" test
# (build the packet from fields; the len field stays None until the packet is
# serialised) and a "dissection" test (parse raw bytes and check each field).

+ Syntax check

= Import the MQTT-SN layer
from scapy.contrib.mqttsn import *

+ MQTT-SN protocol test

= MQTTSN + MQTTSNAdvertise, packet instantiation and len field adjust
p = MQTTSN() / MQTTSNAdvertise(gw_id=142, duration=54403)
assert p.len is None
assert p.type == ADVERTISE
assert p.gw_id == 142
assert p.duration == 54403
# Round trip through bytes: the len field is expected to be filled in.
b = bytes(p)
p = MQTTSN(b)
assert p.len == 5
assert p.type == ADVERTISE
assert p.gw_id == 142
assert p.duration == 54403

= MQTTSNAdvertise, packet dissection
b = b"\x05\x00\x98\x2b\x9a"
p = MQTTSN(b)
assert p.len == 5
assert p.type == ADVERTISE
assert p.gw_id == 0x98
assert p.duration == 0x2b9a

= MQTTSNSearchGW, packet instantiation
p = MQTTSN() / MQTTSNSearchGW(radius=175)
assert p.len is None
assert p.type == SEARCHGW
assert p.radius == 175

= MQTTSNSearchGW, packet dissection
b = b"\x03\x01\xcc"
p = MQTTSN(b)
assert p.len == 3
assert p.type == SEARCHGW
assert p.radius == 0xcc

= MQTTSNGwInfo, packet instantiation
# The gateway address contains an embedded NUL byte on purpose.
p = MQTTSN() / MQTTSNGwInfo(gw_id=135, gw_addr="test\0test")
assert p.len is None
assert p.type == GWINFO
assert p.gw_id == 135
assert p.gw_addr == b"test\x00test"

= MQTTSN + MQTTSNGwInfo, packet instantiation and len field adjust
# An explicitly given len must survive serialisation; the bytes that follow
# the declared length ("xyz") are expected to come back as raw load.
p = MQTTSN(len=7) / MQTTSNGwInfo(gw_id=7, gw_addr="test") / "xyz"
assert p.len == 7
assert p.type == GWINFO
assert p.gw_id == 7
assert p.gw_addr == b"test"
assert p.load == b"xyz"
b = bytes(p)
p = MQTTSN(b)
assert p.len == 7
assert p.type == GWINFO
assert p.gw_id == 7
assert p.gw_addr == b"test"
assert p.load == b"xyz"

= MQTTSNGwInfo, packet dissection
# len == 7 covers "test" only; the trailing "ing" must end up as raw load.
b = b"\x07\x02\x14testing"
p = MQTTSN(b)
assert p.len == 7
assert p.type == GWINFO
assert p.gw_id == 0x14
assert p.gw_addr == b"test"
assert p.load == b"ing"

= MQTTSNGwInfo, packet dissection - invalid length
# A leading 0x01 selects the 3-byte length form, so the declared length here
# is 0x0001, which is shorter than the header itself. Dissection is expected
# to cope and yield an empty gw_addr.
b = b"\x01\x00\x01\x02\x14test"
p = MQTTSN(b)
assert p.len == 1
assert p.type == GWINFO
assert p.gw_id == 0x14
assert p.gw_addr == b""

= MQTTSNConnect, packet instantiation
p = MQTTSN() / MQTTSNConnect(duration=40666, client_id="test")
assert p.len is None
assert p.type == CONNECT
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.prot_id == 1
assert p.duration == 40666
assert p.client_id == b"test"

= MQTTSNConnect, packet dissection
# Flags byte 0x04: only cleansess is expected to be set.
b = b"\x0a\x04\x04\x1a\x77\x5btesting"
p = MQTTSN(b)
assert p.len == 10
assert p.type == CONNECT
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 1
assert p.tid_type == TID_NORMAL
assert p.prot_id == 0x1a
assert p.duration == 0x775b
assert p.client_id == b"test"
assert p.load == b"ing"

= MQTTSNConnack, packet instantiation
p = MQTTSN() / MQTTSNConnack()
assert p.len is None
assert p.type == CONNACK
assert p.return_code == ACCEPTED

= MQTTSNConnack, packet dissection
b = b"\x03\x05\x02"
p = MQTTSN(b)
assert p.len == 3
assert p.type == CONNACK
assert p.return_code == REJ_TID

= MQTTSNWillTopicReq, packet instantiation
p = MQTTSN() / MQTTSNWillTopicReq()
assert p.len is None
assert p.type == WILLTOPICREQ

= MQTTSNWillTopicReq, packet dissection
b = b"\x02\x06"
p = MQTTSN(b)
assert p.len == 2
assert p.type == WILLTOPICREQ

= MQTTSNWillTopic, packet instantiation
p = MQTTSN() / MQTTSNWillTopic(will_topic="/test")
assert p.len is None
assert p.type == WILLTOPIC
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.will_topic == b"/test"

= MQTTSNWillTopic, packet dissection
b = b"\x08\x07\x00/testing"
p = MQTTSN(b)
assert p.len == 8
assert p.type == WILLTOPIC
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.will_topic == b"/test"

= MQTTSNWillMsgReq, packet instantiation
p = MQTTSN() / MQTTSNWillMsgReq()
assert p.len is None
assert p.type == WILLMSGREQ

= MQTTSNWillMsgReq, packet dissection
b = b"\x02\x08"
p = MQTTSN(b)
assert p.len == 2
assert p.type == WILLMSGREQ

= MQTTSNWillMsg, packet instantiation
p = MQTTSN() / MQTTSNWillMsg(will_msg="test")
assert p.len is None
assert p.type == WILLMSG
assert p.will_msg == b"test"

= MQTTSNWillMsg, packet dissection
b = b"\x06\x09testing"
p = MQTTSN(b)
assert p.len == 6
assert p.type == WILLMSG
assert p.will_msg == b"test"
assert p.load == b"ing"

= MQTTSNRegister, packet instantiation
p = MQTTSN() / MQTTSNRegister(mid=30533, topic_name="/test")
assert p.len is None
assert p.type == REGISTER
assert p.tid == 0
assert p.mid == 30533
assert p.topic_name == b"/test"

= MQTTSNRegister, packet dissection
b = b"\x0b\x0a\0\0\x48\x8a/testing"
p = MQTTSN(b)
assert p.len == 11
assert p.type == REGISTER
assert p.tid == 0
assert p.mid == 0x488a
assert p.topic_name == b"/test"
assert p.load == b"ing"

= MQTTSNRegack, packet instantiation
p = MQTTSN() / MQTTSNRegack(tid=61547, mid=8593, return_code=REJ_NOTSUP)
assert p.len is None
assert p.type == REGACK
assert p.tid == 61547
assert p.mid == 8593
assert p.return_code == REJ_NOTSUP

= MQTTSNRegack, packet dissection
# The length byte equals the 7 bytes actually present in the vector.
b = b"\x07\x0b\xc5\xe8\x31\x87\x01"
p = MQTTSN(b)
assert p.len == 7
assert p.type == REGACK
assert p.tid == 0xc5e8
assert p.mid == 0x3187
assert p.return_code == REJ_CONJ

= MQTTSNPublish, packet instantiation
p = MQTTSN() / MQTTSNPublish(qos=QOS_1, tid=52032, mid=35252, data="Hello world!")
assert p.len is None
assert p.type == PUBLISH
assert p.dup == 0
assert p.qos == QOS_1
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.tid == 52032
assert p.mid == 35252
assert p.data == b"Hello world!"

= MQTTSNPublish, packet instantiation - long data
p = MQTTSN() / MQTTSNPublish(qos=QOS_NEG1, tid=62839, mid=36181, data=726 * "X")
assert p.len is None
assert p.type == PUBLISH
assert p.dup == 0
assert p.qos == QOS_NEG1
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.tid == 62839
assert p.mid == 36181
assert p.data == 726 * b"X"
# Check if length field was constructed correctly: lengths above 0xff are
# expected in the 3-byte form (0x01 marker + 16-bit length, 735 == 0x02df).
b = bytes(p)
assert b[:3] == b'\x01\x02\xdf'
p = MQTTSN(b)
assert p.len == 735
assert p.data == 726 * b"X"

= MQTTSNPublish, packet dissection
# Flags byte 0x40 is expected to decode as QoS 2 with all other flags clear.
b = b"\x0b\x0c\x40\x19\x7f\x6a\x26testing"
p = MQTTSN(b)
assert p.len == 11
assert p.type == PUBLISH
assert p.dup == 0
assert p.qos == QOS_2
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.tid == 0x197f
assert p.mid == 0x6a26
assert p.data == b"test"
assert p.load == b"ing"

= MQTTSNPublish, packet dissection - long data
# 3-byte length + type (4 bytes), flags/tid/mid (5 bytes), 1115 data bytes.
b = b"\x01\x04\x64\x0c" + b"\x00\xb1\x39\xd7\x4a" + (1115 * b"X")
p = MQTTSN(b)
assert p.len == 0x0464 == (4 + 5 + 1115)
assert p.type == PUBLISH
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.tid == 0xb139
assert p.mid == 0xd74a
assert p.data == 1115 * b"X"

= MQTTSNPuback, packet instantiation
p = MQTTSN() / MQTTSNPuback(tid=27610, mid=30284, return_code=ACCEPTED)
assert p.len is None
assert p.type == PUBACK
assert p.tid == 27610
assert p.mid == 30284
assert p.return_code == ACCEPTED

= MQTTSNPuback, packet dissection
# The length byte equals the 7 bytes actually present in the vector.
b = b"\x07\x0d\x03\xda\x73\x9a\x02"
p = MQTTSN(b)
assert p.len == 7
assert p.type == PUBACK
assert p.tid == 0x03da
assert p.mid == 0x739a
assert p.return_code == REJ_TID

= MQTTSNPubcomp, packet instantiation
p = MQTTSN() / MQTTSNPubcomp(mid=36193)
assert p.len is None
assert p.type == PUBCOMP
assert p.mid == 36193

= MQTTSNPubcomp, packet dissection
b = b"\x04\x0e\x26\xa2"
p = MQTTSN(b)
assert p.len == 4
assert p.type == PUBCOMP
assert p.mid == 0x26a2

= MQTTSNPubrec, packet instantiation
p = MQTTSN() / MQTTSNPubrec(mid=44837)
assert p.len is None
assert p.type == PUBREC
assert p.mid == 44837

= MQTTSNPubrec, packet dissection
b = b"\x04\x0f\x36\xc4"
p = MQTTSN(b)
assert p.len == 4
assert p.type == PUBREC
assert p.mid == 0x36c4

= MQTTSNPubrel, packet instantiation
p = MQTTSN() / MQTTSNPubrel(mid=42834)
assert p.len is None
assert p.type == PUBREL
assert p.mid == 42834

= MQTTSNPubrel, packet dissection
b = b"\x04\x10\x94\x0f"
p = MQTTSN(b)
assert p.len == 4
assert p.type == PUBREL
assert p.mid == 0x940f

= MQTTSNSubscribe, packet instantiation - topic name
# Exactly one of topic_name / tid / short_topic is expected to be populated,
# selected by tid_type; the other two must read back as None.
p = MQTTSN() / MQTTSNSubscribe(mid=63780, topic_name="/test")
assert p.len is None
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.topic_name == b"/test"
assert p.short_topic is None
assert p.tid is None

= MQTTSNSubscribe, packet instantiation - predefined topic ID
p = MQTTSN() / MQTTSNSubscribe(mid=63780, tid_type=TID_PREDEF, tid=1187)
assert p.len is None
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_PREDEF
assert p.topic_name is None
assert p.short_topic is None
assert p.tid == 1187

= MQTTSNSubscribe, packet instantiation - short topic
p = MQTTSN() / MQTTSNSubscribe(mid=63780, tid_type=TID_SHORT, short_topic="fx")
assert p.len is None
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_SHORT
assert p.topic_name is None
assert p.short_topic == b"fx"
assert p.tid is None

= MQTTSNSubscribe, packet dissection - topic name
# The three dissection tests below share the same trailing bytes "/t"; only
# the flags byte (0x00 / 0x01 / 0x02) changes how they are interpreted.
b = b"\x07\x12\x00\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.mid == 0x668a
assert p.topic_name == b"/t"
assert p.short_topic is None
assert p.tid is None

= MQTTSNSubscribe, packet dissection - predefined topic ID
# Flags 0x01 == TID_PREDEF: "/t" is read as a 16-bit topic ID.
b = b"\x07\x12\x01\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_PREDEF
assert p.mid == 0x668a
assert p.topic_name is None
assert p.short_topic is None
assert p.tid == (ord("/") << 8 | ord("t")) == 12148

= MQTTSNSubscribe, packet dissection - short topic
# Flags 0x02 == TID_SHORT: "/t" is read as a two-character short topic.
b = b"\x07\x12\x02\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == SUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_SHORT
assert p.mid == 0x668a
assert p.topic_name is None
assert p.short_topic == b"/t"
assert p.tid is None

= MQTTSNSuback, packet instantiation
p = MQTTSN() / MQTTSNSuback(qos=QOS_0, tid=5496, mid=63108, return_code=REJ_TID)
assert p.len is None
assert p.type == SUBACK
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.tid == 5496
assert p.mid == 63108
assert p.return_code == REJ_TID

= MQTTSNSuback, packet dissection
# Flags byte 0xa4 is expected to decode as dup=1, qos=QOS_1, cleansess=1.
b = b"\x08\x13\xa4\x93\x0b\x02\xc6\x00"
p = MQTTSN(b)
assert p.len == 8
assert p.type == SUBACK
assert p.dup == 1
assert p.qos == QOS_1
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 1
assert p.tid_type == TID_NORMAL
assert p.tid == 0x930b
assert p.mid == 0x02c6
assert p.return_code == ACCEPTED

= MQTTSNUnsubscribe, packet instantiation - topic name
p = MQTTSN() / MQTTSNUnsubscribe(mid=63780, topic_name="/test")
assert p.len is None
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.topic_name == b"/test"
assert p.short_topic is None
assert p.tid is None

= MQTTSNUnsubscribe, packet instantiation - predefined topic ID
p = MQTTSN() / MQTTSNUnsubscribe(mid=63780, tid_type=TID_PREDEF, tid=1187)
assert p.len is None
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_PREDEF
assert p.topic_name is None
assert p.short_topic is None
assert p.tid == 1187

= MQTTSNUnsubscribe, packet instantiation - short topic
p = MQTTSN() / MQTTSNUnsubscribe(mid=63780, tid_type=TID_SHORT, short_topic="fx")
assert p.len is None
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_SHORT
assert p.topic_name is None
assert p.short_topic == b"fx"
assert p.tid is None

= MQTTSNUnsubscribe, packet dissection - topic name
b = b"\x07\x14\x00\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.mid == 0x668a
assert p.topic_name == b"/t"
assert p.short_topic is None
assert p.tid is None

= MQTTSNUnsubscribe, packet dissection - predefined topic ID
# Flags 0x01 == TID_PREDEF: "/t" is read as a 16-bit topic ID.
b = b"\x07\x14\x01\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_PREDEF
assert p.mid == 0x668a
assert p.topic_name is None
assert p.short_topic is None
assert p.tid == (ord("/") << 8 | ord("t")) == 12148

= MQTTSNUnsubscribe, packet dissection - short topic
# Flags 0x02 == TID_SHORT: "/t" is read as a two-character short topic.
b = b"\x07\x14\x02\x66\x8a/t"
p = MQTTSN(b)
assert p.len == 7
assert p.type == UNSUBSCRIBE
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_SHORT
assert p.mid == 0x668a
assert p.topic_name is None
assert p.short_topic == b"/t"
assert p.tid is None

= MQTTSNUnsuback, packet instantiation
p = MQTTSN() / MQTTSNUnsuback(mid=44541)
assert p.len is None
assert p.type == UNSUBACK
assert p.mid == 44541

= MQTTSNUnsuback, packet dissection
# The length byte equals the 4 bytes actually present in the vector.
b = b"\x04\x15\xcb\x3d"
p = MQTTSN(b)
assert p.len == 4
assert p.type == UNSUBACK
assert p.mid == 0xcb3d

= MQTTSNPingReq, packet instantiation - no client ID
p = MQTTSN() / MQTTSNPingReq()
assert p.len is None
assert p.type == PINGREQ
assert p.client_id == b""

= MQTTSNPingReq, packet instantiation - with client ID
p = MQTTSN() / MQTTSNPingReq(client_id="test")
assert p.len is None
assert p.type == PINGREQ
assert p.client_id == b"test"

= MQTTSNPingReq, packet dissection
b = b"\x07\x16hello"
p = MQTTSN(b)
assert p.len == 7
assert p.type == PINGREQ
assert p.client_id == b"hello"

= MQTTSNPingResp, packet instantiation
p = MQTTSN() / MQTTSNPingResp()
assert p.len is None
assert p.type == PINGRESP

= MQTTSNPingResp, packet dissection
b = b"\x02\x17"
p = MQTTSN(b)
assert p.len == 2
assert p.type == PINGRESP

= MQTTSNDisconnect, packet instantiation and len field adjust - w/o duration
# The duration field is optional: without it the serialised packet is
# expected to be just the 2-byte header.
p = MQTTSN() / MQTTSNDisconnect()
assert p.len is None
assert p.type == DISCONNECT
assert p.duration is None
b = bytes(p)
p = MQTTSN(b)
assert p.len == 2
assert p.type == DISCONNECT

= MQTTSNDisconnect, packet instantiation and len field adjust - w duration
p = MQTTSN() / MQTTSNDisconnect(duration=19567)
assert p.len is None
assert p.type == DISCONNECT
assert p.duration == 19567
b = bytes(p)
p = MQTTSN(b)
assert p.len == 4
assert p.type == DISCONNECT
assert p.duration == 19567

= MQTTSNDisconnect, packet dissection - w/o duration
b = b"\x02\x18"
p = MQTTSN(b)
assert p.len == 2
assert p.type == DISCONNECT

= MQTTSNDisconnect, packet dissection - w duration
b = b"\x04\x18\x03\x12"
p = MQTTSN(b)
assert p.len == 4
assert p.type == DISCONNECT
assert p.duration == 0x0312

= MQTTSNWillTopicUpd, packet instantiation
p = MQTTSN() / MQTTSNWillTopicUpd(will_topic="/test")
assert p.len is None
assert p.type == WILLTOPICUPD
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.will_topic == b"/test"

= MQTTSNWillTopicUpd, packet dissection
b = b"\x08\x1a\x00/testing"
p = MQTTSN(b)
assert p.len == 8
assert p.type == WILLTOPICUPD
assert p.dup == 0
assert p.qos == QOS_0
assert p.retain == 0
assert p.will == 0
assert p.cleansess == 0
assert p.tid_type == TID_NORMAL
assert p.will_topic == b"/test"

= MQTTSNWillTopicResp, packet instantiation
p = MQTTSN() / MQTTSNWillTopicResp()
assert p.len is None
assert p.type == WILLTOPICRESP
assert p.return_code == ACCEPTED

= MQTTSNWillTopicResp, packet dissection
b = b"\x03\x1b\x02"
p = MQTTSN(b)
assert p.len == 3
assert p.type == WILLTOPICRESP
assert p.return_code == REJ_TID

= MQTTSNWillMsgUpd, packet instantiation
p = MQTTSN() / MQTTSNWillMsgUpd(will_msg="test")
assert p.len is None
assert p.type == WILLMSGUPD
assert p.will_msg == b"test"

= MQTTSNWillMsgUpd, packet dissection
b = b"\x06\x1ctesting"
p = MQTTSN(b)
assert p.len == 6
assert p.type == WILLMSGUPD
assert p.will_msg == b"test"
assert p.load == b"ing"

= MQTTSNWillMsgResp, packet instantiation
p = MQTTSN() / MQTTSNWillMsgResp()
assert p.len is None
assert p.type == WILLMSGRESP
assert p.return_code == ACCEPTED

= MQTTSNWillMsgResp, packet dissection
b = b"\x03\x1d\x02"
p = MQTTSN(b)
assert p.len == 3
assert p.type == WILLMSGRESP
assert p.return_code == REJ_TID

= MQTTSNEncaps, packet instantiation
# An encapsulation header wraps a complete inner MQTT-SN message; the inner
# message is reached through p.payload.payload.
p = MQTTSN() / MQTTSNEncaps(radius=1, w_node_id="test") / MQTTSN() / \
    MQTTSNConnack()
assert p.len is None
assert p.type == ENCAPS_MSG
assert p.radius == 1
assert p.w_node_id == b"test"
assert p.payload.payload.len is None
assert p.payload.payload.type == CONNACK
assert p.payload.payload.return_code == ACCEPTED
b = bytes(p)
p = MQTTSN(b)
# The outer len (7) counts the encapsulation header only; the inner message
# carries its own len (3).
assert p.len == 7
assert p.type == ENCAPS_MSG
assert p.radius == 1
assert p.w_node_id == b"test"
assert p.return_code == ACCEPTED
assert p.payload.payload.len == 3
assert p.payload.payload.type == CONNACK
assert p.payload.payload.return_code == ACCEPTED

= MQTTSNEncaps, packet dissection
b = b"\x07\xfe\x02test\x03\x05\x00"
p = MQTTSN(b)
assert p.len == 7
assert p.type == ENCAPS_MSG
assert p.radius == 2
assert p.w_node_id == b"test"
assert p.payload.payload.len == 3
assert p.payload.payload.type == CONNACK
assert p.payload.payload.return_code == ACCEPTED

= MQTTSNEncaps, packet dissection -- long payload
# The inner PUBLISH uses the 3-byte length form (0x01 marker + 0x0464).
b = b"\x07\xfe\x02test" + b"\x01\x04\x64\x0c" + b"\x00\xb1\x39\xd7\x4a" + \
    (1115 * b"X")
p = MQTTSN(b)
assert p.len == 7
assert p.type == ENCAPS_MSG
assert p.radius == 2
assert p.w_node_id == b"test"
assert p.payload.payload.len == 4 + 5 + 1115 == 0x0464
assert p.payload.payload.type == PUBLISH
assert p.payload.payload.dup == 0
assert p.payload.payload.qos == QOS_0
assert p.payload.payload.retain == 0
assert p.payload.payload.will == 0
assert p.payload.payload.cleansess == 0
assert p.payload.payload.tid_type == TID_NORMAL
assert p.payload.payload.tid == 0xb139
assert p.payload.payload.mid == 0xd74a
assert p.payload.payload.data == 1115 * b"X"

= MQTTSN without payload
p = MQTTSN()
assert bytes(p) == b"\x02\x00"

= MQTTSN without payload -- invalid lengths
# Each case must raise Scapy_Exception. The failure assertion lives in the
# "else" branch so that a missing exception is reported as an AssertionError
# with a message. Every compound statement is followed by a blank line so the
# test also runs when fed to an interactive-style interpreter.
# Building: a length below the 2-byte minimum header size.
p = MQTTSN(len=1)
try:
    bytes(p)
except Scapy_Exception:
    pass
else:
    assert False, "expected Scapy_Exception when building with len=1"

# Building: a length that does not fit into 16 bits.
p = MQTTSN(len=0x10000)
try:
    bytes(p)
except Scapy_Exception:
    pass
else:
    assert False, "expected Scapy_Exception when building with len=0x10000"

# Dissecting: 3-byte length marker with both length bytes missing.
b = b'\x01'
try:
    p = MQTTSN(b)
except Scapy_Exception:
    pass
else:
    assert False, "expected Scapy_Exception when dissecting b'\\x01'"

# Dissecting: 3-byte length marker with one length byte missing.
b = b'\x01\x02'
try:
    p = MQTTSN(b)
except Scapy_Exception:
    pass
else:
    assert False, "expected Scapy_Exception when dissecting b'\\x01\\x02'"

= MQTT-SN RandVariableFieldLen
assert type(MQTTSN().fieldtype["len"].randval()) == RandVariableFieldLen
assert type(MQTTSN().fieldtype["len"].randval() + 0) == int

= Dissect full IPv6 packets
# The port notes below are plain comments; a leading "~" would presumably
# register each word as a UTScapy keyword for this test.
# dport == 1883 (0x75b)
b = b"\x60\x00\x00\x00\x00\x2c\x11\x40\x20\x01\x0d\xb8\x00\x00\x00\x00" \
    b"\x17\x11\x6b\x10\x65\xf7\x5f\x0a\x20\x01\x0d\xb8\x00\x00\x00\x00" \
    b"\x17\x11\x6b\x10\x65\xfd\xbe\x06\xc0\x00\x07\x5b\x00\x2c\x40\x7e" \
    b"\x0b\x0a\0\0\x48\x8a/testing"
p = IPv6(b)
assert MQTTSNRegister in p
# sport == 1883 (0x75b)
b = b"\x60\x00\x00\x00\x00\x0f\x11\x40\x20\x01\x0d\xb8\x00\x00\x00\x00" \
    b"\x17\x11\x6b\x10\x65\xfd\xbe\x06\x20\x01\x0d\xb8\x00\x00\x00\x00" \
    b"\x17\x11\x6b\x10\x65\xf7\x5f\x0a\x07\x5b\xc0\x00\x00\x0f\x62\x7c" \
    b"\x07\x0d\x00\x01\x86\x2f\x00"
p = IPv6(b)
assert MQTTSNPuback in p

= UDP packet instantiation
# Stacking MQTTSN on a default UDP() is expected to select port 1883 for
# both source and destination.
b = bytes(UDP() / MQTTSN() / MQTTSNConnack())
p = UDP(b)
assert MQTTSNConnack in p
assert p.sport == 1883
assert p.dport == 1883