% PNIO RPC layer test campaign + Syntax check = Import the PNIO RPC layer from scapy.layers.dcerpc import * from scapy.contrib.pnio import * from scapy.contrib.pnio_rpc import * = Check that we have UUIDs for v in RPC_INTERFACE_UUID.values(): assert isinstance(v, UUID) + Check Block = Block default values bytes(Block()) == bytearray.fromhex('000000020100') = Block basic example bytes(Block(load=b'\x01\x02\x03')) == bytearray.fromhex('000000050100010203') = Block has no payload (only padding) p = Block(bytearray.fromhex('000000050100010203040506')) p == Block(block_length=5, load=b'\x01\x02\x03') / conf.padding_layer(b'\x04\x05\x06') #################################################################### #################################################################### + Check IODControlReq = IODControlReq default values bytes(IODControlReq()) == bytearray.fromhex('0000001c01000000000000000000000000000000000000000000000000000000') = IODControlReq basic example (IODControlReq PrmEnd control) bytes(IODControlReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=2, ControlCommand_PrmEnd=1)) == bytearray.fromhex('0110001c010000000123456789abcdef0123456789abcdef0002000000010000') = IODControlReq dissection p = IODControlReq(bytearray.fromhex('0118001c010000000123456789abcdef0123456789abcdef0005000000400000ef')) p == IODControlReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=5, ControlCommand_PrmBegin=1, block_type='IODBlockReq_connect_begin', block_length=28, padding=b'\0\0') / conf.padding_layer(b'\xef') = IODControlReq response p = p.get_response() p == IODControlRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=5, block_type='IODBlockRes_connect_begin') #################################################################### + Check IODControlRes = IODControlRes default values bytes(IODControlRes()) == bytearray.fromhex('8110001c01000000000000000000000000000000000000000000000000080000') = IODControlRes basic example (IODControlRes PrmEnd 
control) bytes(IODControlRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=2, block_type='IODBlockRes_connect_end')) == bytearray.fromhex('8110001c010000000123456789abcdef0123456789abcdef0002000000080000') = IODControlRes dissection p = IODControlRes(bytearray.fromhex('8118001c010000000123456789abcdef0123456789abcdef0005000000080000ef')) p == IODControlRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=5, block_type='IODBlockRes_connect_begin', block_length=28, padding=b'\0\0') / conf.padding_layer(b'\xef') #################################################################### #################################################################### + Check IODWriteReq = IODWriteReq default values bytes(IODWriteReq()) == bytearray.fromhex('0008003c010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IODWriteReq basic example bytes(IODWriteReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321 ) / b'\xab\xcd' ) == bytearray.fromhex('0008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000002000000000000000000000000000000000000000000000000abcd') = IODWriteReq dissection p = IODWriteReq(bytearray.fromhex('0008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000002000000000000000000000000000000000000000000000000abcdef')) p == IODWriteReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, block_length=60, recordDataLength=2, padding='\0\0', RWPadding=b'\0'*24 ) / b'\xab\xcd' / conf.padding_layer(b'\xef') = IODWriteReq response p = p.get_response() p == IODWriteRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321) #################################################################### + Check IODWriteRes = IODWriteRes default values bytes(IODWriteRes()) == 
bytearray.fromhex('8008003c010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IODWriteRes basic example bytes(IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321 )) == bytearray.fromhex('8008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000') = IODWriteRes dissection p = IODWriteRes(bytearray.fromhex('8008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000ef')) p == IODWriteRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, recordDataLength=0, block_length=60, padding=b'\0\0', RWPadding=b'\0'*16 ) / conf.padding_layer(b'\xef') #################################################################### #################################################################### + Check IODWriteMultipleReq ######### = IODWriteMultipleReq default values bytes(IODWriteMultipleReq()) == bytearray.fromhex('0008003c0100000000000000000000000000000000000000ffffffffffffffff0000e04000000000000000000000000000000000000000000000000000000000') ######### = IODWriteMultipleReq basic example bytes(IODWriteMultipleReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', blocks=[ IODWriteReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321 ) / b'\xab\xcd', IODWriteReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=2, API=2, slotNumber=3, subslotNumber=4, index=0x1234 ) / b'\x01\x02', ]) ) == bytearray.fromhex('0008003c010000000123456789abcdef0123456789abcdefffffffffffffffff0000e04000000086000000000000000000000000000000000000000000000000') + \ 
bytearray.fromhex('0008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000002000000000000000000000000000000000000000000000000abcd') + b'\0\0' + \ bytearray.fromhex('0008003c010000020123456789abcdef0123456789abcdef000000020003000400001234000000020000000000000000000000000000000000000000000000000102') ######### = IODWriteMultipleReq dissection p = IODWriteMultipleReq( bytearray.fromhex('0008003c010000000123456789abcdef0123456789abcdefffffffffffffffff0000e04000000086000000000000000000000000000000000000000000000000') + \ bytearray.fromhex('0008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000002000000000000000000000000000000000000000000000000abcd') + b'\0\0' + \ bytearray.fromhex('0008003c010000020123456789abcdef0123456789abcdef000000020003000400001234000000020000000000000000000000000000000000000000000000000102') + b'\xef' ) p == IODWriteMultipleReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', recordDataLength=0x86, padding=b'\0'*2, RWPadding=b'\0'*24, block_length=60, blocks=[ IODWriteReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, recordDataLength=2, padding=b'\0'*2, RWPadding=b'\0'*24, block_length=60 ) / b'\xab\xcd', IODWriteReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=2, API=2, slotNumber=3, subslotNumber=4, index=0x1234, recordDataLength=2, padding=b'\0'*2, RWPadding=b'\0'*24, block_length=60 ) / b'\x01\x02', ]) / conf.padding_layer(b'\xef') ######### = IODWriteMultipleReq response p = p.get_response() p == IODWriteMultipleRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', blocks=[ IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321 ), IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=2, API=2, slotNumber=3, subslotNumber=4, index=0x1234 ), ]) #################################################################### + Check IODWriteMultipleRes = 
IODWriteMultipleRes default values bytes(IODWriteMultipleRes()) == bytearray.fromhex('8008003c0100000000000000000000000000000000000000ffffffffffffffff0000e04000000000000000000000000000000000000000000000000000000000') = IODWriteMultipleRes basic example bytes(IODWriteMultipleRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', blocks=[ IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321 ), IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=2, API=2, slotNumber=3, subslotNumber=4, index=0x1234 ), ]) ) == bytearray.fromhex('8008003c010000000123456789abcdef0123456789abcdefffffffffffffffff0000e04000000080000000000000000000000000000000000000000000000000') + \ bytearray.fromhex('8008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000') + \ bytearray.fromhex('8008003c010000020123456789abcdef0123456789abcdef00000002000300040000123400000000000000000000000000000000000000000000000000000000') = IODWriteMultipleRes dissection p = IODWriteMultipleRes( bytearray.fromhex('8008003c010000000123456789abcdef0123456789abcdefffffffffffffffff0000e04000000080000000000000000000000000000000000000000000000000') + \ bytearray.fromhex('8008003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000') + \ bytearray.fromhex('8008003c010000020123456789abcdef0123456789abcdef00000002000300040000123400000000000000000000000000000000000000000000000000000000') + b'\xef' ) p == IODWriteMultipleRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', recordDataLength=0x80, padding=b'\0'*2, RWPadding=b'\0'*16, block_length=60, blocks=[ IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, recordDataLength=0, padding=b'\0'*2, RWPadding=b'\0'*16, block_length=60 ), IODWriteRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', 
seqNum=2, API=2, slotNumber=3, subslotNumber=4, index=0x1234, recordDataLength=0, padding=b'\0'*2, RWPadding=b'\0'*16, block_length=60 ), ]) / conf.padding_layer(b'\xef') #################################################################### + Check IODReadReq = IODReadReq default values bytes(IODReadReq()) == bytearray.fromhex('0009003c010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IODReadReq basic example bytes(IODReadReq( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321) ) == bytearray.fromhex('0009003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000') = IODReadReq dissection p = IODReadReq(bytearray.fromhex('0009003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000002000000000000000000000000000000000000000000000000abcdef')) p == IODReadReq(ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, block_length=60, recordDataLength=2, padding='\0\0', RWPadding=b'\0'*24 ) / b'\xab\xcd' / conf.padding_layer(b'\xef') = IODReadReq response p = p.get_response() p == IODReadRes(ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321) #################################################################### + Check IODReadRes = IODReadRes default values bytes(IODReadRes()) == bytearray.fromhex('8009003c010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IODReadRes basic example bytes(IODReadRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321)) == bytearray.fromhex('8009003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000') = IODReadRes dissection p 
= IODReadRes(bytearray.fromhex('8009003c010000010123456789abcdef0123456789abcdef00000001000200030000432100000000000000000000000000000000000000000000000000000000ef')) p == IODReadRes( ARUUID='01234567-89ab-cdef-0123-456789abcdef', seqNum=1, API=1, slotNumber=2, subslotNumber=3, index=0x4321, recordDataLength=0, block_length=60, padding=b'\0\0', RWPadding=b'\0'*20 ) / conf.padding_layer(b'\xef') #################################################################### #################################################################### + Check I&M = IM0Block default values raw(IM0Block()) == bytearray.fromhex('002000380100000000000000000000000000000000000000000000000000000000000000000000000000000000005600000000000000000001010000') = IM0Block basic example raw(IM0Block(OrderID='foobar', IMSerialNumber='ABCDEF1234567890')) == bytearray.fromhex('0020003801000000666f6f62617200000000000000000000000000004142434445463132333435363738393000005600000000000000000001010000') = IM1Block default values raw(IM1Block()) == bytearray.fromhex('002100380100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IM2Block default values raw(IM2Block()) == bytearray.fromhex('00220012010000000000000000000000000000000000') = IM3Block default values raw(IM3Block()) == bytearray.fromhex('002300380100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') = IM4Block default values raw(IM4Block()) == bytearray.fromhex('002400380100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000') #################################################################### #################################################################### + Check ARBlockReq = ARBlockReq default values bytes(ARBlockReq()) == bytearray.fromhex('0101003601000001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000103e888920000') = 
ARBlockReq basic example bytes(ARBlockReq( ARType='IOCARSingle', ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=1, CMInitiatorObjectUUID='dea00000-6c97-11d1-8271-010203040506', ARProperties_ParametrizationServer='CM_Initator', CMInitiatorStationName='plc1') ) == bytearray.fromhex('0101003a010000010123456789abcdef0123456789abcdef0001000000000000dea000006c9711d182710102030405060000001103e888920004') + b'plc1' = ARBlockReq dissection p = ARBlockReq(bytearray.fromhex('0101003a010000010123456789abcdef0123456789abcdef0001010203040506dea000006c9711d182710102030405060000001103e888920004') + b'plc1' + b'\xef') p == ARBlockReq( ARType='IOCARSingle', ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=1, CMInitiatorObjectUUID='dea00000-6c97-11d1-8271-010203040506', ARProperties_ParametrizationServer='CM_Initator', CMInitiatorMacAdd='01:02:03:04:05:06', StationNameLength=4, CMInitiatorStationName='plc1', block_length=58 ) / conf.padding_layer(b'\xef') = ARBlockReq response p = p.get_response() p == ARBlockRes(ARType='IOCARSingle', ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=1) #################################################################### + Check ARBlockRes = ARBlockRes default values bytes(ARBlockRes()) == bytearray.fromhex('8101001e010000010000000000000000000000000000000000000000000000008892') = ARBlockRes basic example bytes( ARBlockRes(ARType='IOCARSingle', ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=1) ) == bytearray.fromhex('8101001e010000010123456789abcdef0123456789abcdef00010000000000008892') = ARBlockRes dissection p = ARBlockRes(bytearray.fromhex('8101001e010000010123456789abcdef0123456789abcdef00010102030405068892ef')) p == ARBlockRes( ARType='IOCARSingle', ARUUID='01234567-89ab-cdef-0123-456789abcdef', SessionKey=1, CMResponderMacAdd='01:02:03:04:05:06', block_length=30) / conf.padding_layer(b'\xef') #################################################################### 
#################################################################### + Check IOCRBlockReq = IOCRBlockReq default values bytes(IOCRBlockReq()) == bytearray.fromhex('0102002a010000010001889200000000002880000020002000010000ffffffff000a000ac0000000000000000000') = IOCRAPI default values bytes(IOCRAPI()) == bytearray.fromhex('0000000000000000') = IOCRAPIObject default values bytes(IOCRAPIObject()) == bytearray.fromhex('000000000000') = IOCRBlockReq basic example p = IOCRBlockReq( IOCRType='OutputCR', IOCRReference=1, SendClockFactor=2, ReductionRatio=32, DataLength=47, FrameID=0x8014, APIs=[ IOCRAPI( API=4, IODataObjects=[ IOCRAPIObject(SlotNumber=2, SubslotNumber=1, FrameOffset=0), IOCRAPIObject(SlotNumber=1, SubslotNumber=1, FrameOffset=9), ] ), IOCRAPI( API=0, IODataObjects=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1, FrameOffset=15), ], IOCSs=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1, FrameOffset=4), ], ), ] ) bytes(p) == \ bytearray.fromhex('01020052010000020001889200000000002f80140002002000010000ffffffff000a000ac0000000000000000002' + \ '0000000400020002000100000001000100090000' + \ '00000000000100030001000f0001000300010004') = IOCRBlockReq dissection p = IOCRBlockReq( bytearray.fromhex('01020052010000020001889200000000002f80140002002000010000ffffffff000a000ac0000102030405060002' + \ '0000000400020002000100000001000100090000' + \ '00000000000100030001000f0001000300010004') + b'\xef') p == IOCRBlockReq( IOCRType='OutputCR', IOCRReference=1, SendClockFactor=2, IOCRMulticastMACAdd='01:02:03:04:05:06', ReductionRatio=32, DataLength=47, FrameID=0x8014, block_length=82, NumberOfAPIs=2, APIs=[ IOCRAPI( API=4, NumberOfIODataObjects=2, IODataObjects=[ IOCRAPIObject(SlotNumber=2, SubslotNumber=1, FrameOffset=0), IOCRAPIObject(SlotNumber=1, SubslotNumber=1, FrameOffset=9), ], NumberOfIOCS=0 ), IOCRAPI( API=0, NumberOfIODataObjects=1, IODataObjects=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1, FrameOffset=15), ], NumberOfIOCS=1, IOCSs=[ IOCRAPIObject(SlotNumber=3, 
SubslotNumber=1, FrameOffset=4), ], ), ] ) / conf.padding_layer(b'\xef') = IOCRBlockReq response p = p.get_response() p == IOCRBlockRes(IOCRType='OutputCR', IOCRReference=1, FrameID=0x8014) #################################################################### + Check IOCRBlockRes = IOCRBlockRes default values bytes(IOCRBlockRes()) == bytearray.fromhex('810200080100000100018000') = IOCRBlockRes basic example bytes( IOCRBlockRes(IOCRType='InputCR', IOCRReference=2, FrameID=0x8014) ) == bytearray.fromhex('810200080100000100028014') = IOCRBlockRes dissection p = IOCRBlockRes(bytearray.fromhex('810200080100000100028014ef')) p == IOCRBlockRes(IOCRType='InputCR', IOCRReference=2, FrameID=0x8014, block_length=8) / conf.padding_layer(b'\xef') #################################################################### #################################################################### + Check ExpectedSubmoduleBlockReq = ExpectedSubmoduleBlockReq default values bytes(ExpectedSubmoduleBlockReq()) == bytearray.fromhex('0104000401000000') = ExpectedSubmoduleAPI default values bytes(ExpectedSubmoduleAPI()) == bytearray.fromhex('0000000000000000000000000000') = ExpectedSubmodule default values bytes(ExpectedSubmodule()) == bytearray.fromhex('0000000000000000') = ExpectedSubmoduleDataDescription default values bytes(ExpectedSubmoduleDataDescription()) == bytearray.fromhex('000000000000') = ExpectedSubmoduleBlockReq basic example p = ExpectedSubmoduleBlockReq( APIs=[ ExpectedSubmoduleAPI( API=4, SlotNumber=2, ModuleIdentNumber=0x08c4, Submodules=[ ExpectedSubmodule( SubslotNumber=1, SubmoduleIdentNumber=0x0123, SubmoduleProperties_Type='INPUT_OUTPUT', DataDescription=[ ExpectedSubmoduleDataDescription( DataDescription='Output', SubmoduleDataLength=5, LengthIOPS=2, LengthIOCS=0 ), ExpectedSubmoduleDataDescription( DataDescription='Input', SubmoduleDataLength=3, LengthIOPS=1, LengthIOCS=2 ) ] ), ] ), ] ) bytes(p) == \ bytearray.fromhex('0104002601000001') + \ 
bytearray.fromhex('000000040002000008c400000001') + \
bytearray.fromhex('0001000001230003') + \
bytearray.fromhex('000200050002') + \
bytearray.fromhex('000100030201')

= ExpectedSubmoduleBlockReq dissection
p = ExpectedSubmoduleBlockReq(
    bytearray.fromhex('0104002601000001') + \
    bytearray.fromhex('000000040002000008c400000001') + \
    bytearray.fromhex('0001000001230003') + \
    bytearray.fromhex('000200050002') + \
    bytearray.fromhex('000100030201') + b'\xef'
)
p == ExpectedSubmoduleBlockReq(block_length=38, NumberOfAPIs=1, APIs=[
    ExpectedSubmoduleAPI(
        API=4, SlotNumber=2, ModuleIdentNumber=0x08c4, NumberOfSubmodules=1, Submodules=[
            ExpectedSubmodule(
                SubslotNumber=1, SubmoduleIdentNumber=0x0123, SubmoduleProperties_Type='INPUT_OUTPUT',
                DataDescription=[
                    ExpectedSubmoduleDataDescription(
                        DataDescription='Output', SubmoduleDataLength=5, LengthIOPS=2, LengthIOCS=0
                    ),
                    ExpectedSubmoduleDataDescription(
                        DataDescription='Input', SubmoduleDataLength=3, LengthIOPS=1, LengthIOCS=2
                    )
                ]
            ),
        ]
    ),
]) / conf.padding_layer(b'\xef')

####################################################################
####################################################################

+ Check AlarmCRBlockReq

= AlarmCRBlockReq default values
bytes(AlarmCRBlockReq()) == bytearray.fromhex('010300160100000188920000000000010003000300c8c000a000')

= AlarmCRBlockReq with transport
bytes(AlarmCRBlockReq(AlarmCRProperties_Transport=1)) == bytearray.fromhex('010300160100000108004000000000010003000300c8c000a000')

= AlarmCRBlockReq dissection
p = AlarmCRBlockReq(bytearray.fromhex('010300160100000188920000000000010003000300c8c000a000'))
# Each field check is an assert: only the final expression of a test is used
# as its verdict, so a bare comparison that is not last would go unchecked.
assert p[AlarmCRBlockReq].AlarmCRType == 0x0001
assert p[AlarmCRBlockReq].LocalAlarmReference == 0x0003

= AlarmCRBlockReq response
* Reuses the packet dissected by the previous test
p = p.get_response()
p == AlarmCRBlockRes(AlarmCRType=0x0001, LocalAlarmReference=0x0003)

+ Check AlarmCRBlockRes

= AlarmCRBlockRes default values
bytes(AlarmCRBlockRes()) == bytearray.fromhex('810300080100000100000000')

= AlarmCRBlockRes dissection
p = AlarmCRBlockRes(bytearray.fromhex('810300080100000100030000'))
# Asserts for the same reason as in the AlarmCRBlockReq dissection test.
assert p[AlarmCRBlockRes].AlarmCRType == 0x0001
assert p[AlarmCRBlockRes].LocalAlarmReference == 0x0003

####################################################################
####################################################################

+ Check PNIOServiceReqPDU

= PNIOServiceReqPDU basic example
* PNIOServiceReqPDU must always be placed above a DCE/RPC layer as it requires the endianness field
p = DceRpc4() / PNIOServiceReqPDU(blocks=[
    Block(load=b'\x01\x02')
])
s = bytes(p)
# Remove the random UUID part before comparison
assert s[:18] + s[24:] == b'\x04\x00\x00\x00\x10\x00\x00\x00\x00\x00\xa0\xde\x97l\xd1\x11\x82q\x01\x00\xa0\xde\x97l\xd1\x11\x82q\x00\xa0$B\xdf}\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\x1c\x00\x00\x00\x00\x00\x08\x00\x00\x00\x08\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x04\x01\x00\x01\x02'

= PNIOServiceReqPDU dissection
p = DceRpc4(
    bytes(bytearray.fromhex('0400000000000000dea000006c9711d18271010203040506dea000016c9711d1827100a02442df7d000000000000000000000000000000000000000000000001000000000000ffffffff001c00000000' + \
        '0000000f000000080000000f0000000000000008' + \
        '0000000401000102ef')
))
bytes(p.payload) == bytes(PNIOServiceReqPDU(args_length=8, args_max=15, max_count=15, actual_count=8, blocks=[
        Block(block_length=4, load=b'\x01\x02')
    ]
) / b'\xef')

####################################################################
####################################################################

+ Check PNIOServiceResPDU

= PNIOServiceResPDU basic example
* PNIOServiceResPDU must always be placed above a DCE/RPC layer as it requires the endianness field
p = DceRpc4() / PNIOServiceResPDU(blocks=[
    Block(load=b'\x01\x02')
])
s = bytes(p)
# Remove the random UUID part before comparison
assert s[:18] + s[24:] == \
b'\x04\x02\x00\x00\x10\x00\x00\x00\x00\x00\xa0\xde\x97l\xd1\x11\x82q\x02\x00\xa0\xde\x97l\xd1\x11\x82q\x00\xa0$B\xdf}\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x04\x01\x00\x01\x02' = PNIOServiceResPDU dissection p = DceRpc4( bytes(bytearray.fromhex('0402000000000000dea000006c9711d18271010203040506dea000026c9711d1827100a02442df7d000000000000000000000000000000000000000000000001000000000000ffffffff001c00000000' + \ '00001234000000080000000f0000000000000008' + \ '0000000401000102ef') )) bytes(p.payload) == bytes(PNIOServiceResPDU(status=0x1234, args_length=8, max_count=15, actual_count=8, blocks=[ Block(block_length=4, load=b'\x01\x02') ] ) / b'\xef') #################################################################### ## Some usual examples ## #################################################################### + Check some basic examples #### Connect Request = A PNIO RPC Connect Request p = DceRpc4( endian='little', opnum=0, seqnum=0, object='dea00000-6c97-11d1-8271-010203040506', act_id='01234567-89ab-cdef-0123-456789abcdef' ) / PNIOServiceReqPDU( blocks=[ # AR block ARBlockReq( ARType='IOCARSingle', ARUUID='fedcba98-7654-3210-fedc-ba9876543210', SessionKey=0, CMInitiatorMacAdd='01:02:03:04:05:06', CMInitiatorStationName='plc-1', CMInitiatorObjectUUID='dea00000-6c97-11d1-8271-010203040506', ARProperties_ParametrizationServer='CM_Initator' ), # IO CR input block IOCRBlockReq( IOCRType='InputCR', IOCRReference=1, SendClockFactor=2, ReductionRatio=32, DataLength=40, APIs=[ IOCRAPI( API=0, IODataObjects=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1,FrameOffset=15), ], IOCSs=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1,FrameOffset=4), ], ), ] ), # IO CR output block IOCRBlockReq( IOCRType='OutputCR', IOCRReference=2, SendClockFactor=8, ReductionRatio=32, 
DataLength=52, FrameID=0xffff, APIs=[ IOCRAPI( API=0, IODataObjects=[ IOCRAPIObject(SlotNumber=3, SubslotNumber=1,FrameOffset=0), IOCRAPIObject(SlotNumber=1, SubslotNumber=2,FrameOffset=9), ], ), ] ), # List of expected submodules ExpectedSubmoduleBlockReq( APIs=[ ExpectedSubmoduleAPI( API=0, SlotNumber=3, ModuleIdentNumber=0x08c4, Submodules=[ ExpectedSubmodule( SubslotNumber=1, SubmoduleIdentNumber=0x0124, SubmoduleProperties_Type='INPUT_OUTPUT', DataDescription=[ ExpectedSubmoduleDataDescription( DataDescription='Output', SubmoduleDataLength=3, LengthIOPS=1, LengthIOCS=1 ), ExpectedSubmoduleDataDescription( DataDescription='Input', SubmoduleDataLength=5, LengthIOPS=2, LengthIOCS=0 ) ] ), ] ), ] ), ExpectedSubmoduleBlockReq( APIs=[ ExpectedSubmoduleAPI( API=0, SlotNumber=1, ModuleIdentNumber=0x08c3, Submodules=[ ExpectedSubmodule( SubslotNumber=2, SubmoduleIdentNumber=0x0424, SubmoduleProperties_Type='OUTPUT', DataDescription=[ ExpectedSubmoduleDataDescription( DataDescription='Output', SubmoduleDataLength=5, LengthIOPS=1, LengthIOCS=0 ) ] ), ] ), ] ), ] ) bytes(p) == bytearray.fromhex( '04000000100000000000a0de976cd11182710102030405060100a0de976cd111827100a02442df7d67452301ab89efcd0123456789abcdef0000000001000000000000000000ffffffff250100000000' + \ '1101000011010000110100000000000011010000' + \ '0101003b01000001fedcba9876543210fedcba98765432100000010203040506dea000006c9711d182710102030405060000001103e888920005') + b'plc-1' + \ bytearray.fromhex('0102003e010000010001889200000000002880000002002000010000ffffffff000a000ac0000000000000000001' + \ '00000000000100030001000f0001000300010004' + \ '0102003e0100000200028892000000000034ffff0008002000010000ffffffff000a000ac0000000000000000001' + \ '0000000000020003000100000001000200090000' + \ '0104002601000001000000000003000008c400000001' + \ '0001000001240003' + \ '000200030101' + \ '000100050002' + \ '0104002001000001000000000001000008c300000001' + \ '0002000004240002' + \ '000200050001') #### Write Request = A PNIO RPC 
Write Request p = DceRpc4( endian='little', opnum=2, seqnum=1, object='dea00000-6c97-11d1-8271-010203040506', act_id='01234567-89ab-cdef-0123-456789abcdef' ) / PNIOServiceReqPDU( blocks=[ IODWriteMultipleReq( seqNum=0, ARUUID='fedcba98-7654-3210-fedc-ba9876543210', blocks=[ IODWriteReq( seqNum=1, API=0, slotNumber=1, subslotNumber=1, index=0x123, ARUUID='fedcba98-7654-3210-fedc-ba9876543210' ) / b'\x01\x02', IODWriteReq( seqNum=2, API=0, slotNumber=3, subslotNumber=1, ARUUID='fedcba98-7654-3210-fedc-ba9876543210' ) / FParametersBlock( F_CRC_Seed=1, F_CRC_Length='CRC-24', F_Source_Add=0xc1, F_Dest_Add=0xc2, F_WD_Time=500, F_Par_CRC=0x1234 ), ] ) ] ) bytes(p) == bytearray.fromhex( '04000000100000000000a0de976cd11182710102030405060100a0de976cd111827100a02442df7d67452301ab89efcd0123456789abcdef0000000001000000010000000200ffffffffe20000000000' + \ 'ce000000ce000000ce00000000000000ce000000' + \ '0008003c01000000fedcba9876543210fedcba9876543210ffffffffffffffff0000e0400000008e' + '00' * 24 + \ '0008003c01000001fedcba9876543210fedcba987654321000000000000100010000012300000002' + '00' * 24 + \ '01020000' + \ '0008003c01000002fedcba9876543210fedcba98765432100000000000030001000001000000000a' + '00' * 24 + \ '484000c100c201f41234') #### PrmEnd control Request = A PNIO RPC PrmEnd Control Request p = DceRpc4( endian='little', opnum=0, seqnum=2, object='dea00000-6c97-11d1-8271-010203040506', act_id='01234567-89ab-cdef-0123-456789abcdef' ) / PNIOServiceReqPDU( blocks=[ IODControlReq(ARUUID='fedcba98-7654-3210-fedc-ba9876543210', SessionKey=0, ControlCommand_PrmEnd=1) ] ) bytes(p) == bytearray.fromhex( '04000000100000000000a0de976cd11182710102030405060100a0de976cd111827100a02442df7d67452301ab89efcd0123456789abcdef0000000001000000020000000000ffffffff340000000000' + \ '2000000020000000200000000000000020000000' + \ '0110001c01000000fedcba9876543210fedcba98765432100000000000010000') #### ApplicationReady control Request = A PNIO RPC ApplicationReady Control Request p = DceRpc4( endian='little', 
opnum=0, seqnum=0, object='dea00000-6c97-11d1-8271-060504030201', act_id='01020304-0506-0708-090a-0b0c0d0e0f00', if_id=RPC_INTERFACE_UUID['UUID_IO_ControllerInterface'], ) / PNIOServiceReqPDU( blocks=[ IODControlReq(ARUUID='fedcba98-7654-3210-fedc-ba9876543210', SessionKey=0, ControlCommand_ApplicationReady=1) ] ) bytes(p) == bytearray.fromhex( '04000000100000000000a0de976cd11182710605040302010200a0de976cd111827100a02442df7d0403020106050807090a0b0c0d0e0f000000000001000000000000000000ffffffff340000000000' + \ '2000000020000000200000000000000020000000' + \ '0112001c01000000fedcba9876543210fedcba98765432100000000000020000') ### PNIO Alarms = PNIO Alarm decoding (Alarm_Low) p = Ether(b'\x00\x11\x22\x33\x44\x55' \ b'\x00\x66\x77\x88\x99\xaa' \ b'\x81\x00\xa0\x00' \ b'\x88\x92' \ b'\xfe\x01' \ b'\x00\x03\x00\x03\x11\x11\xff\xff\xff\xfe\x00\x36' \ b'\x00\x02\x00\x32\x01\x00\x00\x01\x00\x00\x00\x00\x00' \ b'\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00' \ b'\x81\x00\x0f\x00\x00\x08\x01\x00\x00\x00\x00\x00\x00\x02' \ b'\x80\x02\x00\x0f\x2c\x00\x00\x05\x80\x00\x00\x00\x00\x22') assert p[ProfinetIO].frameID == 0xfe01 assert isinstance(p[ProfinetIO].payload, Alarm_Low) assert p[AlarmNotification_Low].block_type == 0x0002 assert isinstance(p[AlarmNotification_Low].AlarmPayload[0], MaintenanceItem) assert p[MaintenanceItem].UserStructureIdentifier == 0x8100 assert isinstance(p[AlarmNotification_Low].AlarmPayload[1], DiagnosisItem) assert p[DiagnosisItem].UserStructureIdentifier == 0x8002 = PNIO Alarm decoding (Alarm_High) p = Ether(b'\x00\x11\x22\x33\x44\x55' \ b'\x00\x66\x77\x88\x99\xaa' \ b'\x81\x00\xa0\x00' \ b'\x88\x92' \ b'\xfc\x01' \ b'\x00\x03\x00\x03\x11\x11\xff\xff\xff\xfe\x00\x36' \ b'\x00\x02\x00\x32\x01\x00\x00\x01\x00\x00\x00\x00\x00' \ b'\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x08\x00' \ b'\x81\x00\x0f\x00\x00\x08\x01\x00\x00\x00\x00\x00\x00\x02' \ b'\x80\x02\x00\x0f\x2c\x00\x00\x05\x80\x00\x00\x00\x00\x22') assert p[ProfinetIO].frameID == 0xfc01 assert 
isinstance(p[ProfinetIO].payload, Alarm_High)
assert p[AlarmNotification_High].block_type == 0x0002
assert isinstance(p[AlarmNotification_High].AlarmPayload[0], MaintenanceItem)
assert p[MaintenanceItem].UserStructureIdentifier == 0x8100
assert isinstance(p[AlarmNotification_High].AlarmPayload[1], DiagnosisItem)
assert p[DiagnosisItem].UserStructureIdentifier == 0x8002

= PNIO Alarm DiagnosisItem
p = AlarmNotification_Low(AlarmPayload=[MaintenanceItem(), DiagnosisItem()])
assert raw(p) == bytearray.fromhex('0002002c0100000000000000000000000000000000000000000000000000000001000000000000000000000000000000')

= PNIO Alarm UploadRetrievalItem
p = AlarmNotification_Low(AlarmPayload=[MaintenanceItem(), UploadRetrievalItem()])
assert raw(p) == bytearray.fromhex('00020036010000000000000000000000000000000000000000000000000000000100000000000000000000000000010000000000000000000000')

= PNIO Alarm iParameterItem
p = AlarmNotification_Low(AlarmPayload=[MaintenanceItem(), iParameterItem()])
assert raw(p) == bytearray.fromhex('0002003e0100000000000000000000000000000000000000000000000000000001000000000000000000000000000100000000000000000000000000000000000000')

= PNIO Alarm RS_AlarmItem
p = AlarmNotification_Low(AlarmPayload=[MaintenanceItem(), RS_AlarmItem()])
assert raw(p) == bytearray.fromhex('0002002801000000000000000000000000000000000000000000000000000000010000000000000000000000')

= PNIO Alarm PRAL_AlarmItem
p = AlarmNotification_Low(AlarmPayload=[MaintenanceItem(), PRAL_AlarmItem()])
assert raw(p) == bytearray.fromhex('0002002e01000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000')

= PNIO PDPortDataAdjust Decoding
# The capture is bound to pdu_bytes, not raw: the alarm tests above call
# raw(), and rebinding that name here would break any raw() call that runs
# after this test.
pdu_bytes = bytearray.fromhex('0402280000000000dea000006c9711d182710001000305f9dea000016' \
                              'c9711d1827100a02442df7d0777bc51ddaa4d07addb7075183fc28b00' \
                              '00000000000001000000000002ffffffff007c0000000000000000000' \
                              '000680000004000000000000000688009003c0100000002ba501cd47e' \
                              '40d3a0b545fd4ac70eb900000000000080020000802f0000002800000' \
                              '000000000000000000000000000000000000000000002020024010000' \
                              '00000080020224000c010000000000000100000000021b00080100000' \
                              '000010000')
p = DceRpc4(pdu_bytes)
assert p[PDPortDataAdjust].subslotNumber == 0x8002
assert p[AdjustPeerToPeerBoundary].peerToPeerBoundary == 0x1
assert LINKSTATE_LINK[p[AdjustLinkState].LinkState] == 'Up'