1# coding: utf8 2% ProfinetIO layer test campaign 3 4+ Syntax check 5= Import the ProfinetIO layer 6from scapy.contrib.pnio import * 7from scapy.config import conf 8import re 9old_conf_dissector = conf.debug_dissector 10conf.debug_dissector=True 11 12 13+ Check DCE/RPC layer 14 15= ProfinetIO default values 16raw(ProfinetIO()) == b'\x00\x00' 17 18= ProfinetIO overloads Ethertype 19p = Ether() / ProfinetIO() 20p.type == 0x8892 21 22= ProfinetIO overloads UDP dport 23p = UDP() / ProfinetIO() 24p.dport == 0x8892 25 26= Ether guesses ProfinetIO as payload class 27p = Ether(hex_bytes('ffffffffffff00000000000088920102')) 28isinstance(p.payload, ProfinetIO) and p.frameID == 0x0102 29 30= UDP guesses ProfinetIO as payload class 31p = UDP(hex_bytes('12348892000a00000102')) 32isinstance(p.payload, ProfinetIO) and p.frameID == 0x0102 33 34 35+ PNIO RTC PDU tests 36 37= ProfinetIO PNIORealTime_IOxS parsing of a single status 38 39p = PNIORealTime_IOxS(b'\x80') 40assert p.dataState == 1 41assert p.instance == 0 42assert p.reserved == 0 43assert p.extension == 0 44 45p = PNIORealTime_IOxS(b'\xe1') 46assert p.dataState == 1 47assert p.instance == 3 48assert p.reserved == 0 49assert p.extension == 1 50True 51 52= ProfinetIO PNIORealTime_IOxS building of a single status 53p = PNIORealTime_IOxS(dataState = 'good', instance='subslot', extension=0) 54assert raw(p) == b'\x80' 55 56p = PNIORealTime_IOxS(dataState = 'bad', instance='device', extension=1) 57assert raw(p) == b'\x41' 58True 59 60= ProfinetIO PNIORealTime_IOxS parsing with multiple statuses 61TestPacket = type( 62 'TestPacket', 63 (Packet,), 64 { 65 'name': 'TestPacket', 66 'fields_desc': [ 67 PacketListField('data', [], next_cls_cb= PNIORealTime_IOxS.is_extension_set) 68 ], 69 } 70) 71 72p = TestPacket(b'\x81\xe1\x01\x80') 73assert len(p.data) == 4 74assert p.data[0].dataState == 1 75assert p.data[0].instance == 0 76assert p.data[0].reserved == 0 77assert p.data[0].extension == 1 78assert p.data[1].dataState == 1 79assert p.data[1].instance == 3 80assert p.data[1].reserved == 0 81assert p.data[1].extension == 1 82assert p.data[2].dataState == 0 83assert p.data[2].instance == 0 84assert p.data[2].reserved == 0 85assert p.data[2].extension == 1 86assert p.data[3].dataState == 1 87assert p.data[3].instance == 0 88assert p.data[3].reserved == 0 89assert p.data[3].extension == 0 90 91= ProfinetIO RTC PDU parsing without configuration 92p = Ether(b'\x00\x02\x04\x06\x08\x0a\x01\x03\x05\x07\x09\x0B\x88\x92\x80\x00\x01\x02\x03\x04\xf0\x00\x35\x00') 93assert p[Ether].dst == '00:02:04:06:08:0a' 94assert p[Ether].src == '01:03:05:07:09:0b' 95assert p[Ether].type == 0x8892 96assert p[ProfinetIO].frameID == 0x8000 97assert isinstance(p[ProfinetIO].payload, PNIORealTimeCyclicPDU) 98assert len(p[PNIORealTimeCyclicPDU].data) == 1 99assert isinstance(p[PNIORealTimeCyclicPDU].data[0], PNIORealTimeCyclicDefaultRawData) 100assert p[PNIORealTimeCyclicDefaultRawData].data == b'\x01\x02\x03\x04' 101assert p[PNIORealTimeCyclicPDU].padding == b'' 102assert p[PNIORealTimeCyclicPDU].cycleCounter == 0xf000 103assert p[PNIORealTimeCyclicPDU].dataStatus == 0x35 104assert p[PNIORealTimeCyclicPDU].transferStatus == 0 105True 106 107= ProfinetIO RTC PDU building 108p = Ether(src='01:03:05:07:09:0b', dst='00:02:04:06:08:0a')/ProfinetIO(frameID = 'PTCP-RTSyncPDU')/PNIORealTimeCyclicPDU( 109 data=[ 110 PNIORealTimeCyclicPDU.build_fixed_len_raw_type(10)(data = b'\x80'*10) 111 ], 112 padding = b'\x00'*8, 113 cycleCounter = 900, 114 dataStatus = 0x35, 115 transferStatus = 0 116) 117 118assert( 119 raw(p) == \ 120 b'\x00\x02\x04\x06\x08\x0a' \ 121 b'\x01\x03\x05\x07\x09\x0b' \ 122 b'\x88\x92' \ 123 b'\x00\x80' \ 124 b'\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80' \ 125 b'\x00\x00\x00\x00\x00\x00\x00\x00' \ 126 b'\x03\x84' \ 127 b'\x35' \ 128 b'\x00' 129) 130 131= ProfinetIO RTC PDU parsing with config 132 133scapy.config.conf.contribs['PNIO_RTC'][('01:03:05:07:09:0b', '00:02:04:06:08:0a', 0x8010)] = [ 134 PNIORealTimeCyclicPDU.build_fixed_len_raw_type(5), 135 PNIORealTimeCyclicPDU.build_fixed_len_raw_type(3), 136 PNIORealTimeCyclicPDU.build_fixed_len_raw_type(2) 137] 138p = Ether( 139 b'\x00\x02\x04\x06\x08\x0a' \ 140 b'\x01\x03\x05\x07\x09\x0B' \ 141 b'\x88\x92' \ 142 b'\x80\x10' \ 143 b'\x01\x02\x03\x04\x05' \ 144 b'\x01\x02\x03' \ 145 b'\x01\x02' \ 146 b'\x00\x00' \ 147 b'\xf0\x00' \ 148 b'\x35' \ 149 b'\x00' 150) 151 152assert p[Ether].dst == '00:02:04:06:08:0a' 153assert p[Ether].src == '01:03:05:07:09:0b' 154assert p[Ether].type == 0x8892 155assert p[ProfinetIO].frameID == 0x8010 156assert isinstance(p[ProfinetIO].payload, PNIORealTimeCyclicPDU) 157assert len(p[PNIORealTimeCyclicPDU].data) == 3 158assert isinstance(p[PNIORealTimeCyclicPDU].data[0], scapy.config.conf.raw_layer) 159assert p[PNIORealTimeCyclicPDU].data[0].data == b'\x01\x02\x03\x04\x05' 160assert isinstance(p[PNIORealTimeCyclicPDU].data[1], scapy.config.conf.raw_layer) 161assert p[PNIORealTimeCyclicPDU].data[1].data == b'\x01\x02\x03' 162assert isinstance(p[PNIORealTimeCyclicPDU].data[2], scapy.config.conf.raw_layer) 163assert p[PNIORealTimeCyclicPDU].data[2].data == b'\x01\x02' 164assert p[PNIORealTimeCyclicPDU].padding == b'\x00' * 2 165assert p[PNIORealTimeCyclicPDU].cycleCounter == 0xf000 166assert p[PNIORealTimeCyclicPDU].dataStatus == 0x35 167assert p[PNIORealTimeCyclicPDU].transferStatus == 0 168 169p = Ether(b'\x00\x02\x04\x06\x08\x0a\x01\x03\x05\x07\x09\x0B\x88\x92\x80\x00\x01\x02\x03\x04\xf0\x00\x35\x00') 170assert p[Ether].dst == '00:02:04:06:08:0a' 171assert p[Ether].src == '01:03:05:07:09:0b' 172assert p[Ether].type == 0x8892 173assert p[ProfinetIO].frameID == 0x8000 174assert isinstance(p[ProfinetIO].payload, PNIORealTimeCyclicPDU) 175assert len(p[PNIORealTimeCyclicPDU].data) == 1 176assert isinstance(p[PNIORealTimeCyclicPDU].data[0], PNIORealTimeCyclicDefaultRawData) 177assert p[PNIORealTimeCyclicDefaultRawData].data == b'\x01\x02\x03\x04' 178assert p[PNIORealTimeCyclicPDU].padding == b'' 179assert p[PNIORealTimeCyclicPDU].cycleCounter == 0xf000 180assert p[PNIORealTimeCyclicPDU].dataStatus == 0x35 181assert p[PNIORealTimeCyclicPDU].transferStatus == 0 182True 183 184= PROFIsafe parsing (query with F_CRC_SEED=0) 185p = PROFIsafe.build_PROFIsafe_class(PROFIsafeControl, 2)(b'\x80\x80\x40\x01\x02\x03') 186assert p.data == b'\x80\x80' 187assert p.control == 0x40 188assert p.crc == 0x010203 189True 190 191= PROFIsafe parsing (query with F_CRC_SEED=1) 192p = PROFIsafe.build_PROFIsafe_class(PROFIsafeControlCRCSeed, 2)(b'\x80\x80\x40\x01\x02\x03\x04') 193assert p.data == b'\x80\x80' 194assert p.control == 0x40 195assert p.crc == 0x01020304 196True 197 198= PROFIsafe parsing (response with F_CRC_SEED=0) 199p = PROFIsafe.build_PROFIsafe_class(PROFIsafeStatus, 1)(b'\x80\x40\x01\x02\x03') 200assert p.data == b'\x80' 201assert p.status == 0x40 202assert p.crc == 0x010203 203True 204 205= PROFIsafe parsing (response with F_CRC_SEED=1) 206p = PROFIsafe.build_PROFIsafe_class(PROFIsafeStatusCRCSeed, 1)(b'\x80\x40\x01\x02\x03\x04') 207assert p.data == b'\x80' 208assert p.status == 0x40 209assert p.crc == 0x01020304 210True 211 212= PROFIsafe building (query with F_CRC_SEED=0) 213p = PROFIsafe.build_PROFIsafe_class(PROFIsafeControl, 2)(data = b'\x81\x80', control=0x40, crc=0x040506) 214assert raw(p) == b'\x81\x80\x40\x04\x05\x06' 215 216= PROFIsafe building (query with F_CRC_SEED=1) 217p = PROFIsafe.build_PROFIsafe_class(PROFIsafeControlCRCSeed, 2)(data = b'\x81\x80', control=0x02, crc=0x04050607) 218assert raw(p) == b'\x81\x80\x02\x04\x05\x06\x07' 219 220= PROFIsafe building (response with F_CRC_SEED=0) 221p = PROFIsafe.build_PROFIsafe_class(PROFIsafeStatus, 3)(data = b'\x01\x81\x00', status=0x01, crc=0x040506) 222assert raw(p) == b'\x01\x81\x00\x01\x04\x05\x06' 223 224= PROFIsafe building (response with F_CRC_SEED=1) 225p = PROFIsafe.build_PROFIsafe_class(PROFIsafeStatusCRCSeed, 3)(data = b'\x01\x81\x80', status=0x01, crc=0x04050607) 226assert raw(p) == b'\x01\x81\x80\x01\x04\x05\x06\x07' 227 228conf.debug_dissector = old_conf_dissector 229