% PFCP tests # Type the following command to launch start the tests: # $ test/run_tests -P "load_contrib('pfcp')" -t test/contrib/pfcp.uts + Build packets & dissect = Verify IEs import scapy.contrib.pfcp as pfcp_mod skip_IEs = [ IE_Base, IE_Compound ] for name, cls in pfcp_mod.__dict__.items(): if name.startswith("IE_") and type(cls) == Packet_metaclass and cls not in skip_IEs: print("testing %s" % name) pkt = cls() bs = bytes(pkt) restored = cls(bs) assert bytes(restored) == bs # TODO: also test packet field equality = Verify PCAPs ~ pcaps # the following can be useful while adding more IE types # (e.g. updating for a newer version of the spec) def command(pkt): f = [] for fn, fv in sorted(pkt.fields.items(), key=lambda item: item[0]): if fn in ("length", "message_type"): continue if fn == "ietype" and not isinstance(pkt, IE_EnterpriseSpecific) and \ not isinstance(pkt, IE_NotImplemented): continue if fn.startswith("num_") or fn.endswith("_length"): continue if fv is None: continue fld = pkt.get_field(fn) if isinstance(fld, ConditionalField) and not fld._evalcond(pkt): continue # if fv == fld.default: # continue if isinstance(fv, (list, dict, set)) and len(fv) == 0: continue if isinstance(fv, Packet): fv = command(fv) elif fld.islist and fld.holds_packets and isinstance(fv, list): fv = "[%s]" % ",".join(map(command, fv)) elif isinstance(fld, FlagsField): fv = int(fv) else: fv = repr(fv) f.append("%s=%s" % (fn, fv)) c = "%s(%s)" % (pkt.__class__.__name__, ", ".join(f)) if not isinstance(pkt.payload, NoPayload): pc = command(pkt.payload) if pc: c += "/" + pc return c broken_ies = set([]) broken_ie_types = set([ cls.ie_type for cls in broken_ies ]) ignore = set([]) def find_raw_or_not_implemented(pkt, prefix=""): if prefix in ignore: return False, False if hasattr(pkt, "IE_list"): prev = None found_any = False for n, ie in enumerate(pkt.IE_list, 1): if type(ie) in broken_ies: return False, False name = "%s-%d-%s" % (prefix, n, type(ie).__name__) found, leaf = find_raw_or_not_implemented(ie, prefix=name) if found: found_any = True if found and leaf: print("gotcha: %s %r" % (prefix, ie)) bs = b"" if prev is not None: bs = bytes(prev) bs += bytes(ie) if prev is not None: prev.show2() ie.show2() print("%s -- bad val: %s" % (prefix, bytes_hex(bs).decode())) if len(bs) > 4: l = bs[2] * 256 + bs[3] if len(bs) >= l + 4: print("bad val (length-limited): %s" % bytes_hex(bs[:l + 4]).decode()) print("bad val (short): %s" % bytes_hex(bytes(ie)).decode()) prev = ie return found_any, False if isinstance(pkt, Raw): bs = bytes(pkt) if len(bs) > 4: ie_type = bs[0] * 256 + bs[1] if ie_type in broken_ie_types: return False, True return True, True if isinstance(pkt, Padding) or isinstance(pkt, IE_NotImplemented): return True, True return False, True def find_mismatching_command(pkt, prefix=""): c = command(pkt) if hasattr(pkt, "IE_list"): for n, ie in enumerate(pkt.IE_list, 1): name = "%s-%d-%s" % (prefix, n, type(ie).__name__) find_mismatching_command(ie, prefix=name) if bytes(eval(c)) != bytes(pkt): print(prefix) print("ORIG: %s" % bytes_hex(bytes(pkt))) print("EVAL: %s" % bytes_hex(bytes(eval(c)))) raise AssertionError("bad command: %s" % c) for n, pkt in enumerate(rdpcap("test/pcaps/pfcp.pcap"), 1): if PFCP in pkt: # if IE_DLBufferingSuggestedPacketCount in pkt: # continue pkt0 = pkt[PFCP] if IE_NotImplemented in pkt0 or Raw in pkt0 or IE_NotImplemented in pkt0 or Padding in pkt0: found, leaf = find_raw_or_not_implemented(pkt, prefix=str(n)) if not found: # ignored continue pkt0.show2() raise AssertionError("IE_NotImplemented / Raw / Padding detected") bs = bytes(pkt0) pkt1 = PFCP(bs) # TODO: diff show2() result c0 = command(pkt0) c1 = command(pkt1) pkt2 = eval(c1) c2 = command(pkt2) if bytes(pkt2) != bs: find_mismatching_command(pkt0, prefix=str(n)) print(bytes_hex(bytes(pkt2))) print(bytes_hex(bs)) raise AssertionError("bytes(pkt2) != bs") if bs != pkt0.original: print(bytes_hex(bs)) print(bytes_hex(pkt0.original)) raise AssertionError("bs != pkt0.original") if bytes(pkt1) != bs: print(bytes_hex(bytes(pkt1))) print(bytes_hex(bs)) raise AssertionError("bytes(pkt1) != bs") if c0 != c1: print("COMMAND MISMATCH:\n----\n%s\n----\n%s\n\n" % (c0, c1)) pkt0.show2() pkt1.show2() print(bytes_hex(bytes(pkt0))) print("packet index: %d\n" % n) raise AssertionError("c0 != c1") if c0 != c2: print("EVAL COMMAND MISMATCH:\n----\n%s\n----\n%s\n\n" % (c0, c2)) pkt0.show2() pkt2.show2() print(bytes_hex(bytes(pkt0))) print("packet index: %d\n" % n) raise AssertionError("c0 != c2") = Build and dissect PFCP Association Setup Request pfcpASReqBytes = hex_bytes("200500160000010000600004e1a47d08003c0006020465726777") pfcpASReq = PFCP(version=1, S=0, seq=1) / \ PFCPAssociationSetupRequest(IE_list=[ IE_RecoveryTimeStamp(timestamp=3785653512), IE_NodeId(id_type="FQDN", id="ergw") ]) # print("%r" % bytes(pfcpASReq)) # print("%r" % pfcpASReqBytes) assert bytes(pfcpASReq) == pfcpASReqBytes pfcpASReq = PFCP(pfcpASReqBytes) assert pfcpASReq.version == 1 assert pfcpASReq.MP == 0 assert pfcpASReq.S == 0 assert pfcpASReq.message_type == 5 assert pfcpASReq.length == 22 ies = pfcpASReq[PFCPAssociationSetupRequest].IE_list assert isinstance(ies[0], IE_RecoveryTimeStamp) assert ies[0].ietype == 96 assert ies[0].length == 4 assert ies[0].timestamp == 3785653512 assert isinstance(ies[1], IE_NodeId) assert ies[1].ietype == 60 assert ies[1].length == 6 assert ies[1].id_type == 2 assert ies[1].id == b"ergw" = Build and dissect PFCP Association Setup Response pfcpASRespBytes = hex_bytes("2006008c00000100001300010100600004e1a47af9002b00020001007400092980ac1201020263708002006448f9767070207631392e30382e312d3339377e673465333431343066612d6469727479206275696c7420627920726f6f74206f6e206275696c646b697473616e64626f7820617420576564204465632031312031353a30323a3535205554432032303139") pfcpASResp = PFCP(version=1, S=0, seq=1) / \ PFCPAssociationSetupResponse(IE_list=[ IE_Cause(cause="Request accepted"), IE_RecoveryTimeStamp(timestamp=3785652985), IE_UPFunctionFeatures( TREU=0, HEEU=0, PFDM=0, FTUP=0, TRST=0, DLBD=0, DDND=0, BUCP=0, spare=0, PFDE=0, FRRT=0, TRACE=0, QUOAC=0, UDBC=0, PDIU=0, EMPU=1), IE_UserPlaneIPResourceInformation( ASSOSI=0, ASSONI=1, TEIDRI=2, V6=0, V4=1, teid_range=0x80, ipv4="172.18.1.2", network_instance="cp"), IE_EnterpriseSpecific( ietype=32770, enterprise_id=18681, data="vpp v19.08.1-397~g4e34140fa-dirty built by root on buildkitsandbox at Wed Dec 11 15:02:55 UTC 2019") ]) pfcpASResp.show2() assert bytes(pfcpASResp) == pfcpASRespBytes pfcpASResp = PFCP(pfcpASRespBytes) assert pfcpASResp.version == 1 assert pfcpASResp.MP == 0 assert pfcpASResp.S == 0 assert pfcpASResp.message_type == 6 assert pfcpASResp.length == 140 ies = pfcpASResp[PFCPAssociationSetupResponse].IE_list assert isinstance(ies[0], IE_Cause) assert ies[0].ietype == 19 assert ies[0].length == 1 assert ies[0].cause == 1 assert isinstance(ies[1], IE_RecoveryTimeStamp) assert ies[1].ietype == 96 assert ies[1].length == 4 assert ies[1].timestamp == 3785652985 assert isinstance(ies[2], IE_UPFunctionFeatures) assert ies[2].ietype == 43 assert ies[2].length == 2 assert ies[2].TREU == 0 assert ies[2].HEEU == 0 assert ies[2].PFDM == 0 assert ies[2].FTUP == 0 assert ies[2].TRST == 0 assert ies[2].DLBD == 0 assert ies[2].DDND == 0 assert ies[2].BUCP == 0 assert ies[2].spare == 0 assert ies[2].PFDE == 0 assert ies[2].FRRT == 0 assert ies[2].TRACE == 0 assert ies[2].QUOAC == 0 assert ies[2].UDBC == 0 assert ies[2].PDIU == 0 assert ies[2].EMPU == 1 assert isinstance(ies[3], IE_UserPlaneIPResourceInformation) assert ies[3].ASSOSI == 0 assert ies[3].ASSONI == 1 assert ies[3].TEIDRI == 2 assert ies[3].V6 == 0 assert ies[3].V4 == 1 assert ies[3].teid_range == 0x80 assert ies[3].ipv4 == "172.18.1.2" assert ies[3].network_instance == b"cp" assert isinstance(ies[4], IE_EnterpriseSpecific) assert ies[4].ietype == 32770 assert ies[4].enterprise_id == 18681 assert ies[4].data == b"vpp v19.08.1-397~g4e34140fa-dirty built by root on buildkitsandbox at Wed Dec 11 15:02:55 UTC 2019" assert pfcpASResp.answers(pfcpASReq) # = Build and dissect PFCP Session Establishment Request pfcpSEReq1Bytes = hex_bytes("2132011300000000000000000000020000030021002c000102006c00040000000200040010002a00010000160007066163636573730003000d002c000101006c00040000000100010038006c000400000002005f000100000200190015000901104c9033ac120102001600030263700014000103003800020002001d00040000006400010057006c000400000001000200350016000706616363657373001700210100001d7065726d6974206f75742069702066726f6d20616e7920746f20616e790014000100003800020001001d00040000fde800510004000000010006001b003e000104002500021000004a00040000003c00510004000000010039000d02ffde7210bf97810aac120101003c0006020465726777") pfcpSEReq1 = PFCP(version=1, S=1, seq=2, seid=0, spare_oct=0) / \ PFCPSessionEstablishmentRequest(IE_list=[ IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=2), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access"), ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(DROP=1), IE_FAR_Id(id=1) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=2), IE_OuterHeaderRemoval(header="GTP-U/UDP/IPv4"), IE_PDI(IE_list=[ IE_FTEID(V4=1, TEID=0x104c9033, ipv4="172.18.1.2"), IE_NetworkInstance(instance="cp"), IE_SourceInterface(interface="CP-function"), ]), IE_PDR_Id(id=2), IE_Precedence(precedence=100) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=1), IE_PDI(IE_list=[ IE_NetworkInstance(instance="access"), IE_SDF_Filter(FD=1, flow_description="permit out ip from any to any"), IE_SourceInterface(interface="Access"), ]), IE_PDR_Id(id=1), IE_Precedence(precedence=65000), IE_URR_Id(id=1) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(EVENT=1), IE_ReportingTriggers(start_of_traffic=1), IE_TimeQuota(quota=60), IE_URR_Id(id=1) ]), IE_FSEID(v4=1, seid=0xffde7210bf97810a, ipv4="172.18.1.1"), IE_NodeId(id_type="FQDN", id="ergw") ]) assert bytes(pfcpSEReq1) == pfcpSEReq1Bytes assert bytes(PFCP(pfcpSEReq1Bytes)) == pfcpSEReq1Bytes pfcpSEReq2Bytes = hex_bytes("213202ba00000000000000000000080000030037002c000102006c00040000000400040026002a000102001600040373676900260015020012687474703a2f2f6578616d706c652e636f6d0003001e002c000102006c0004000000020004000d002a000102001600040373676900030021002c000102006c00040000000300040010002a000100001600070661636365737300030021002c000102006c00040000000100040010002a00010000160007066163636573730001006d006c0004000000040002004b00160007066163636573730017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3420746f2061737369676e65640014000100005d0005020ac00000003800020004001d00040000006400510004000000020001006d006c0004000000020002004b00160007066163636573730017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3220746f2061737369676e65640014000100005d0005020ac00000003800020002001d0004000000c800510004000000010001006a006c0004000000030002004800160004037367690017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3420746f2061737369676e65640014000102005d0005060ac00000003800020003001d00040000006400510004000000020001006a006c0004000000010002004800160004037367690017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3220746f2061737369676e65640014000102005d0005060ac00000003800020001001d0004000000c8005100040000000100060013003e000102002500020000005100040000000200060013003e00010200250002000000510004000000010039000d02ffde7210d971c146ac120101003c0006020465726777") pfcpSEReq2 = PFCP(seq=8) / PFCPSessionEstablishmentRequest(IE_list=[ IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=4), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="SGi-LAN/N6-LAN"), IE_NetworkInstance(instance="sgi"), IE_RedirectInformation(type="URL", address="http://example.com"), ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=2), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="SGi-LAN/N6-LAN"), IE_NetworkInstance(instance="sgi"), ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=3), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=1), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access") ]) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=4), IE_PDI(IE_list=[ IE_NetworkInstance(instance="access"), IE_SDF_Filter( FD=1, flow_description="permit out ip from 198.19.65.4 to assigned"), IE_SourceInterface(interface="Access"), IE_UE_IP_Address(ipv4="10.192.0.0", V4=1) ]), IE_PDR_Id(id=4), IE_Precedence(precedence=100), IE_URR_Id(id=2) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=2), IE_PDI(IE_list=[ IE_NetworkInstance(instance="access"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.2 to assigned"), IE_SourceInterface(interface="Access"), IE_UE_IP_Address(ipv4="10.192.0.0", V4=1) ]), IE_PDR_Id(id=2), IE_Precedence(precedence=200), IE_URR_Id(id=1) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=3), IE_PDI(IE_list=[ IE_NetworkInstance(instance="sgi"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.4 to assigned"), IE_SourceInterface(interface="SGi-LAN/N6-LAN"), IE_UE_IP_Address(ipv4="10.192.0.0", SD=1, V4=1) ]), IE_PDR_Id(id=3), IE_Precedence(precedence=100), IE_URR_Id(id=2) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=1), IE_PDI(IE_list=[ IE_NetworkInstance(instance="sgi"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.2 to assigned"), IE_SourceInterface(interface="SGi-LAN/N6-LAN"), IE_UE_IP_Address(ipv4="10.192.0.0", SD=1, V4=1) ]), IE_PDR_Id(id=1), IE_Precedence(precedence=200), IE_URR_Id(id=1) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(VOLUM=1), IE_ReportingTriggers(), IE_URR_Id(id=2) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(VOLUM=1), IE_ReportingTriggers(), IE_URR_Id(id=1) ]), IE_FSEID(ipv4="172.18.1.1", v4=1, seid=0xffde7210d971c146), IE_NodeId(id_type="FQDN", id="ergw")]) assert bytes(pfcpSEReq2) == pfcpSEReq2Bytes assert bytes(PFCP(pfcpSEReq2Bytes)) == pfcpSEReq2Bytes pfcpSEReq3Bytes = hex_bytes("213203a10000000000000000000003000003001e002c000102006c0004000000060004000d002a000102001600040373676900030037002c000102006c00040000000400040026002a000102001600040373676900260015020012687474703a2f2f6578616d706c652e636f6d0003001e002c000102006c0004000000020004000d002a000102001600040373676900030021002c000102006c00040000000500040010002a000100001600070661636365737300030021002c000102006c00040000000300040010002a000100001600070661636365737300030021002c000102006c00040000000100040010002a000100001600070661636365737300010042006c000400000006000200200018000354535400160007066163636573730014000100005d0005020ac00000003800020006001d00040000009600510004000000030001006d006c0004000000040002004b00160007066163636573730017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3420746f2061737369676e65640014000100005d0005020ac00000003800020004001d00040000006400510004000000020001006d006c0004000000020002004b00160007066163636573730017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3220746f2061737369676e65640014000100005d0005020ac00000003800020002001d0004000000c800510004000000010001003f006c0004000000050002001d0018000354535400160004037367690014000102005d0005060ac00000003800020005001d00040000009600510004000000030001006a006c0004000000030002004800160004037367690017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3420746f2061737369676e65640014000102005d0005060ac00000003800020003001d00040000006400510004000000020001006a006c0004000000010002004800160004037367690017002e0100002a7065726d6974206f75742069702066726f6d203139382e31392e36352e3220746f2061737369676e65640014000102005d0005060ac00000003800020001001d0004000000c8005100040000000100060013003e000102002500020000005100040000000200060013003e000103002500020000005100040000000300060013003e00010200250002000000510004000000010039000d02ffde7211a5ab800aac120101003c0006020465726777") pfcpSEReq3 = PFCP(seq=3) / \ PFCPSessionEstablishmentRequest(IE_list=[ IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=6), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="SGi-LAN/N6-LAN"), IE_NetworkInstance(instance="sgi") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=4), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="SGi-LAN/N6-LAN"), IE_NetworkInstance(instance="sgi"), IE_RedirectInformation(type="URL", address="http://example.com") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=2), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="SGi-LAN/N6-LAN"), IE_NetworkInstance(instance="sgi") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=5), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=3), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access") ]) ]), IE_CreateFAR(IE_list=[ IE_ApplyAction(FORW=1), IE_FAR_Id(id=1), IE_ForwardingParameters(IE_list=[ IE_DestinationInterface(interface="Access"), IE_NetworkInstance(instance="access") ]) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=6), IE_PDI(IE_list=[ IE_ApplicationId(id="TST"), IE_NetworkInstance(instance="access"), IE_SourceInterface(interface="Access"), IE_UE_IP_Address(ipv4='10.192.0.0', V4=1) ]), IE_PDR_Id(id=6), IE_Precedence(precedence=150), IE_URR_Id(id=3) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=4), IE_PDI(IE_list=[ IE_NetworkInstance(instance="access"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.4 to assigned"), IE_SourceInterface(interface="Access"), IE_UE_IP_Address(ipv4='10.192.0.0', V4=1) ]), IE_PDR_Id(id=4), IE_Precedence(precedence=100), IE_URR_Id(id=2) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=2), IE_PDI(IE_list=[ IE_NetworkInstance(instance="access"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.2 to assigned"), IE_SourceInterface(interface="Access"), IE_UE_IP_Address(ipv4='10.192.0.0', V4=1) ]), IE_PDR_Id(id=2), IE_Precedence(precedence=200), IE_URR_Id(id=1) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=5), IE_PDI(IE_list=[ IE_ApplicationId(id="TST"), IE_NetworkInstance(instance="sgi"), IE_SourceInterface(interface="SGi-LAN/N6-LAN"), IE_UE_IP_Address(ipv4='10.192.0.0', SD=1, V4=1) ]), IE_PDR_Id(id=5), IE_Precedence(precedence=150), IE_URR_Id(id=3) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=3), IE_PDI(IE_list=[ IE_NetworkInstance(instance="sgi"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.4 to assigned"), IE_SourceInterface(interface="SGi-LAN/N6-LAN"), IE_UE_IP_Address(ipv4='10.192.0.0', SD=1, V4=1) ]), IE_PDR_Id(id=3), IE_Precedence(precedence=100), IE_URR_Id(id=2) ]), IE_CreatePDR(IE_list=[ IE_FAR_Id(id=1), IE_PDI(IE_list=[ IE_NetworkInstance(instance="sgi"), IE_SDF_Filter(FD=1, flow_description="permit out ip from 198.19.65.2 to assigned"), IE_SourceInterface(interface="SGi-LAN/N6-LAN"), IE_UE_IP_Address(ipv4='10.192.0.0', SD=1, V4=1) ]), IE_PDR_Id(id=1), IE_Precedence(precedence=200), IE_URR_Id(id=1) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(VOLUM=1), IE_ReportingTriggers(), IE_URR_Id(id=2) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(VOLUM=1, DURAT=1), IE_ReportingTriggers(), IE_URR_Id(id=3) ]), IE_CreateURR(IE_list=[ IE_MeasurementMethod(VOLUM=1), IE_ReportingTriggers(), IE_URR_Id(id=1) ]), IE_FSEID(ipv4='172.18.1.1', v4=1, seid=0xffde7211a5ab800a), IE_NodeId(id_type="FQDN", id="ergw") ]) assert bytes(pfcpSEReq3) == pfcpSEReq3Bytes assert bytes(PFCP(pfcpSEReq3Bytes)) == pfcpSEReq3Bytes = Build and dissect PFCP Session Establishment Response pfcpSERespBytes = hex_bytes("21330022ffde7210bf97810a0000020000130001010039000d02ffde7210bf97810aac120102") pfcpSEResp = PFCP(version=1, S=1, seq=2, seid=0xffde7210bf97810a) / \ PFCPSessionEstablishmentResponse(IE_list=[ IE_Cause(cause="Request accepted"), IE_FSEID(ipv4="172.18.1.2", v4=1, seid=0xffde7210bf97810a), ]) assert bytes(pfcpSEResp) == pfcpSERespBytes assert bytes(PFCP(pfcpSERespBytes)) == pfcpSERespBytes assert pfcpSEResp.answers(pfcpSEReq1) = Build and dissect PFCP Heartbeat Request pfcpHReqBytes = hex_bytes("2001000c0000030000600004e1a47d08") pfcpHReq = PFCP(version=1, S=0, seq=3) / \ PFCPHeartbeatRequest(IE_list=[ IE_RecoveryTimeStamp(timestamp=3785653512) ]) assert bytes(pfcpHReq) == pfcpHReqBytes assert bytes(PFCP(pfcpHReqBytes)) == pfcpHReqBytes # = Build and dissect PFCP Heartbeat Response pfcpHRespBytes = hex_bytes("2002000c0000030000600004e1a47af9") pfcpHResp = PFCP(version=1, S=0, seq=3) / \ PFCPHeartbeatResponse(IE_list=[ IE_RecoveryTimeStamp(timestamp=3785652985) ]) assert bytes(pfcpHResp) == pfcpHRespBytes assert bytes(PFCP(pfcpHRespBytes)) == pfcpHRespBytes assert pfcpHResp.answers(pfcpHReq) # = Build and dissect PFCP Session Report Request pfcpSRReq1Bytes = hex_bytes("21380034ffde7210bf99c00300006b0000270001020050001f00510004000000010068000400000001003f00021000005d0005020ac00001") pfcpSRReq1 = PFCP(seq=107, version=1, S=1, seid=18437299340760956931) / \ PFCPSessionReportRequest(IE_list=[ IE_ReportType(USAR=1), IE_UsageReport_SRR(IE_list=[ IE_URR_Id(id=1), IE_UR_SEQN(number=1), IE_UsageReportTrigger(START=1), IE_UE_IP_Address(ipv4="10.192.0.1", V4=1) ]) ]) assert bytes(pfcpSRReq1) == pfcpSRReq1Bytes assert bytes(PFCP(pfcpSRReq1Bytes)) == pfcpSRReq1Bytes pfcpSRReq2Bytes = hex_bytes("2138008a0ffde7210bf940000000310000270001020050007500510004000000030068000400000018003f00020100004b0004e1b44787004c0004e1b447910042001907000000000000000000000000000000000000000000000000004300040000000a8003000a48f9e1b4479137cbd8008004000a48f9e1b4478737cbd8008005000a48f9e1b4479137cbd800") pfcpSRReq2 = PFCP(seq=49, seid=1152331208797536256) / \ PFCPSessionReportRequest(IE_list=[ IE_ReportType(USAR=1), IE_UsageReport_SRR(IE_list=[ IE_URR_Id(id=3), IE_UR_SEQN(number=24), IE_UsageReportTrigger(PERIO=1), IE_StartTime(timestamp=3786688391), IE_EndTime(timestamp=3786688401), IE_VolumeMeasurement( DLVOL=1, ULVOL=1, TOVOL=1, total=0, uplink=0, downlink=0), IE_DurationMeasurement(duration=10), IE_EnterpriseSpecific( ietype=32771, enterprise_id=18681, data=b'\xe1\xb4G\x917\xcb\xd8\x00'), IE_EnterpriseSpecific( ietype=32772, enterprise_id=18681, data=b'\xe1\xb4G\x877\xcb\xd8\x00'), IE_EnterpriseSpecific( ietype=32773, enterprise_id=18681, data=b'\xe1\xb4G\x917\xcb\xd8\x00') ]) ]) assert bytes(pfcpSRReq2) == pfcpSRReq2Bytes assert bytes(PFCP(pfcpSRReq2Bytes)) == pfcpSRReq2Bytes pfcpSRReq3Bytes = hex_bytes("21380035a2a2aa9ad7f316fd0000010000270001020050002000510004000000010068000400000000003f0003100000005d000502ac100202") pfcpSRReq3 = PFCP(seq=1, seid=11719116762396169981) / \ PFCPSessionReportRequest(IE_list=[ IE_ReportType(USAR=1), IE_UsageReport_SRR(IE_list=[ IE_URR_Id(id=1), IE_UR_SEQN(number=0), IE_UsageReportTrigger(START=1, extra_data=b'\x00'), IE_UE_IP_Address(ipv4='172.16.2.2', V4=1) ]) ]) assert bytes(pfcpSRReq3) == pfcpSRReq3Bytes assert bytes(PFCP(pfcpSRReq3Bytes)) == pfcpSRReq3Bytes = Build and dissect PFCP Session Report Response pfcpSRRespBytes = hex_bytes("21390011ffde7210bf99c00300006b000013000101") pfcpSRResp = PFCP(version=1, S=1, seq=107, seid=0xffde7210bf99c003) / \ PFCPSessionReportResponse(IE_list=[ IE_Cause(cause="Request accepted") ]) assert bytes(pfcpSRResp) == pfcpSRRespBytes assert bytes(PFCP(pfcpSRRespBytes)) == pfcpSRRespBytes assert pfcpSRResp.answers(pfcpSRReq1) = Build and dissect PFCP Session Modification Request pfcpSMReqBytes = hex_bytes("21340018ffde72125aeb00a300000600004d00080051000400000001") pfcpSMReq = PFCP(pfcpSMReqBytes) pfcpSMReq = PFCP(version=1, seq=6, seid=0xffde72125aeb00a3) / \ PFCPSessionModificationRequest(IE_list=[ IE_QueryURR(IE_list=[IE_URR_Id(id=1)]) ]) assert bytes(pfcpSMReq) == pfcpSMReqBytes assert bytes(PFCP(pfcpSMReqBytes)) == pfcpSMReqBytes = Build and dissect PFCP Session Modification Response pfcpSMRespBytes = hex_bytes("2135008affde72125aeb00a3000006000013000101004e007500510004000000010068000400000000003f00028000004b0004e16e7efa004c0004e16e7efa004200190700000000000000000000000000000000000000000000000000430004000000008003000a48f9e16e7efa05566c008004000a48f9e16e7efa027f08008005000a48f9e16e7efa027f0800") pfcpSMResp = PFCP(version=1, seq=6, seid=0xffde72125aeb00a3) / \ PFCPSessionModificationResponse(IE_list=[ IE_Cause(cause=1), IE_UsageReport_SMR(IE_list=[ IE_URR_Id(id=1), IE_UR_SEQN(number=0), IE_UsageReportTrigger(IMMER=1), IE_StartTime(timestamp=3782115066), IE_EndTime(timestamp=3782115066), IE_VolumeMeasurement(DLVOL=1, ULVOL=1, TOVOL=1), IE_DurationMeasurement(), IE_EnterpriseSpecific(ietype=32771, enterprise_id=18681, data=b'\xe1n~\xfa\x05Vl\x00'), IE_EnterpriseSpecific(ietype=32772, enterprise_id=18681, data=b'\xe1n~\xfa\x02\x7f\x08\x00'), IE_EnterpriseSpecific(ietype=32773, enterprise_id=18681, data=b'\xe1n~\xfa\x02\x7f\x08\x00') ]) ]) assert bytes(pfcpSMResp) == pfcpSMRespBytes assert bytes(PFCP(pfcpSMRespBytes)) == pfcpSMRespBytes assert pfcpSMResp.answers(pfcpSMReq) = Verify IEs from difflib import unified_diff cases = [ dict( hex="0054000a0100010000000a177645", expect=IE_OuterHeaderCreation(GTPUUDPIPV4=1, TEID=0x01000000, ipv4="10.23.118.69")), dict( hex="002900050461626364", expect=IE_ForwardingPolicy(policy_identifier="abcd")), dict( hex="002e0001ae", expect=IE_DownlinkDataNotificationDelay(delay=174)), dict( hex="003d00020000", expect=IE_PFDContents()), dict( hex="005e00070300205903e95d", expect=IE_PacketRate(ULPR=1, DLPR=1, ul_time_unit="minute", ul_max_packet_rate=8281, dl_time_unit="day", dl_max_packet_rate=59741)), dict( hex="00850007010906638dccd5", expect=IE_MACAddress(SOUR=1, source_mac="09:06:63:8d:cc:d5")), dict( hex="00540014080017d0bd69dceb747a1e036c0f9c8d4af115d0", expect=IE_OuterHeaderCreation(UDPIPV6=1, ipv6="17d0:bd69:dceb:747a:1e03:6c0f:9c8d:4af1", port=5584)), dict( hex="006700050280df69b2", expect=IE_RemoteGTP_U_Peer(V4=1, ipv4="128.223.105.178")), ] for case in cases: bs = hex_bytes(case["hex"]) exp = case["expect"] dissected = type(exp)(bs) exp_text = exp.show2(dump=True) dissected_text = dissected.show2(dump=True) if exp_text != dissected_text: print("---\n%s\n---\n%s\n" % (exp_text, dissected_text)) for line in unified_diff(exp_text.split("\n"), dissected_text.split("\n"), fromfile="expected", tofile="dissected"): print(line) raise AssertionError("text mismatch") assert bytes(dissected) == bs assert bytes(exp) == bs # from difflib import unified_diff # expected = PFCP(pfcpSRReq2Bytes).show2(dump=True).split("\n") # actual = pfcpSRReq2.show2(dump=True).split("\n") # for line in unified_diff(expected, actual, fromfile="expected", tofile="actual"): # print(line)