1# OAM unit tests 2# 3# Type the following command to launch start the tests: 4# $ test/run_tests -P "load_contrib('oam')" -t test/contrib/oam.uts 5 6+ TLV 7 8= Generic TLV 9 10pkt = OAM_TLV(raw(OAM_TLV()/Raw(b'123'))) 11assert pkt.type == 1 12assert pkt.length == 3 13 14= Data TLV 15 16pkt = OAM_DATA_TLV(raw(OAM_DATA_TLV()/Raw(b'123'))) 17assert pkt.type == 3 18assert pkt.length == 3 19 20= Test TLV 21 22from binascii import crc32 23 24pkt = OAM_TEST_TLV(raw(OAM_TEST_TLV(pat_type="Null signal without CRC-32")/Raw(b'123'))) 25assert pkt.type == 32 26assert pkt.length == 4 27assert raw(pkt.payload) == b'123' 28pkt = OAM_TEST_TLV(raw(OAM_TEST_TLV(pat_type="Null signal with CRC-32")/Raw(b'123'))) 29assert pkt.type == 32 30assert pkt.length == 8 31assert pkt.crc == crc32(raw(pkt)[:-4]) % (1 << 32) 32assert pkt.crc == 0xad147086 33assert raw(pkt.payload) == b'123' 34pkt = OAM_TEST_TLV(raw(OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 without CRC-32")/Raw(b'123'))) 35assert pkt.type == 32 36assert pkt.length == 4 37assert raw(pkt.payload) == b'123' 38pkt = OAM_TEST_TLV(raw(OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 with CRC-32")/Raw(b'123'))) 39assert pkt.type == 32 40assert pkt.length == 8 41assert pkt.crc == crc32(raw(pkt)[:-4]) % (1 << 32) 42assert pkt.crc == 0x71db80d 43assert raw(pkt.payload) == b'123' 44 45= LTM TLV 46 47pkt = OAM_LTM_TLV(raw(OAM_LTM_TLV(egress_id=3)/Raw(b'123'))) 48assert pkt.type == 7 49assert pkt.length == 8 50assert pkt.egress_id == 3 51 52= LTR TLV 53 54pkt = OAM_LTR_TLV(raw(OAM_LTR_TLV(last_egress_id=2, next_egress_id=4)/Raw(b'123'))) 55assert pkt.type == 8 56assert pkt.length == 16 57assert pkt.last_egress_id == 2 58assert pkt.next_egress_id == 4 59 60= LTR IG TLV 61 62pkt = OAM_LTR_IG_TLV(raw(OAM_LTR_IG_TLV(ingress_act=2, ingress_mac="00:11:22:33:44:55")/Raw(b'123'))) 63assert pkt.type == 5 64assert pkt.length == 7 65assert pkt.ingress_act == 2 66assert pkt.ingress_mac == "00:11:22:33:44:55" 67 68= LTR EG TLV 69 70pkt = OAM_LTR_EG_TLV(raw(OAM_LTR_EG_TLV(egress_act=2, egress_mac="00:11:22:33:44:55")/Raw(b'123'))) 71assert pkt.type == 6 72assert pkt.length == 7 73assert pkt.egress_act == 2 74assert pkt.egress_mac == "00:11:22:33:44:55" 75 76= TEST ID TLV 77 78pkt = OAM_TEST_ID_TLV(raw(OAM_TEST_ID_TLV(test_id=1)/Raw(b'123'))) 79assert pkt.type == 36 80assert pkt.length == 32 81assert pkt.test_id == 1 82 83= PTP TIMESTAMP 84 85pkt = PTP_TIMESTAMP(raw(PTP_TIMESTAMP(seconds=5, nanoseconds=10)/Raw(b'123'))) 86assert pkt.seconds == 5 87assert pkt.nanoseconds == 10 88 89= APS 90 91pkt = APS(raw(APS(req_st="Wait-to-restore (WTR)", 92 prot_type="D+A", 93 req_sig="Normal traffic", 94 br_sig="Normal traffic", 95 br_type="T")/Raw(b'123'))) 96assert pkt.req_st == 0b0101 97assert pkt.prot_type == 0b1010 98assert pkt.req_sig == 1 99assert pkt.br_sig == 1 100assert pkt.br_type == 0b10000000 101 102= RAPS 103 104pkt = RAPS(raw(RAPS(req_st="Signal fail(SF)", 105 status="RB+BPR", 106 node_id="00:11:22:33:44:55")/Raw(b'123'))) 107assert pkt.req_st == 0b1011 108assert pkt.sub_code == 0b0000 109assert pkt.status == 0b10100000 110assert pkt.node_id == "00:11:22:33:44:55" 111 112+ MEG ID 113 114= MEG ID 115 116pkt = MegId(raw(MegId(format=1, 117 values=int(0xdeadbeef)))) 118assert pkt.format == 1 119# FIXME: make compatible with python2 120# assert pkt.values.to_bytes(45, "little")[-4:] == b"\xde\xad\xbe\xef" 121assert pkt.length == 45 122assert len(raw(pkt)) == 48 123 124= MEG ICC ID 125 126pkt = MegId(raw(MegId(format=32, 127 values=list(range(13))))) 128 129assert pkt.format == 32 130assert pkt.values == list(range(13)) 131assert pkt.length == 13 132assert len(raw(pkt)) == 48 133 134= MEG ICC and CC ID 135 136pkt = MegId(raw(MegId(format=33, 137 values=list(range(15))))) 138 139assert pkt.format == 33 140assert pkt.values == list(range(15)) 141assert pkt.length == 15 142assert len(raw(pkt)) == 48 143 144+ OAM 145~ tshark 146 147= Define check_tshark function 148 149def check_tshark(pkt, string): 150 import tempfile, os 151 fd, pcapfilename = tempfile.mkstemp() 152 wrpcap(pcapfilename, pkt) 153 rv = tcpdump(pcapfilename, prog=conf.prog.tshark, getfd=True, args=['-Y', 'cfm'], dump=True, wait=True) 154 assert string in rv.decode("utf8") 155 os.close(fd) 156 os.unlink(pcapfilename) 157 158= CCM 159 160pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 161 OAM(opcode="Continuity Check Message (CCM)", 162 flags="RDI", 163 period="Trans Int 10s", 164 mep_id=0xffff, 165 meg_id=MegId(format=32, 166 values=list(range(13))), 167 txfcf=1, 168 rxfcb=2, 169 txfcb=3))) 170 171assert pkt[OAM].opcode == 1 172assert pkt[OAM].period == 5 173assert pkt[OAM].tlv_offset == 70 174assert pkt[OAM].flags.RDI == True 175assert pkt[OAM].flags == 1<<4 176assert pkt[OAM].mep_id == 0xffff 177assert pkt[OAM].meg_id.format == 32 178assert pkt[OAM].meg_id.length == 13 179assert pkt[OAM].meg_id.values == list(range(13)) 180assert pkt[OAM].txfcf == 1 181assert pkt[OAM].rxfcb == 2 182assert pkt[OAM].txfcb == 3 183 184check_tshark(pkt, "(CCM)") 185 186= LBM 187 188pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 189 OAM(opcode="Loopback Message (LBM)", 190 seq_num=33, 191 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 192 OAM_DATA_TLV()/Raw(b'456'), 193 OAM_DATA_TLV()/Raw(b'789')]))) 194 195assert pkt[OAM].opcode == 3 196assert pkt[OAM].tlv_offset == 4 197assert pkt[OAM].seq_num == 33 198for i in range(3): 199 assert pkt[OAM].tlvs[i].type == 3 200 assert pkt[OAM].tlvs[i].length == 3 201 202assert raw(pkt[OAM].tlvs[0].payload) == b'123' 203assert raw(pkt[OAM].tlvs[1].payload) == b'456' 204assert raw(pkt[OAM].tlvs[2].payload) == b'789' 205 206check_tshark(pkt, "(LBM)") 207 208= LTM 209 210pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 211 OAM(opcode="Linktrace Message (LTM)", 212 trans_id=12, 213 ttl=21, 214 flags="HWonly", 215 orig_mac="12:34:56:78:90:11", 216 targ_mac="12:34:56:78:90:22", 217 tlvs=[OAM_LTM_TLV(egress_id=12)]))) 218 219assert pkt[OAM].opcode == 5 220assert pkt[OAM].tlv_offset == 17 221assert pkt[OAM].ttl == 21 222assert pkt[OAM].flags.HWonly == True 223assert pkt[OAM].flags == 1<<7 224assert pkt[OAM].orig_mac == "12:34:56:78:90:11" 225assert pkt[OAM].targ_mac == "12:34:56:78:90:22" 226assert pkt[OAM].tlvs[0].type == 7 227assert pkt[OAM].tlvs[0].length == 8 228assert pkt[OAM].tlvs[0].egress_id == 12 229 230check_tshark(pkt, "(LTM)") 231 232= LTR 233 234pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 235 OAM(opcode="Linktrace Reply (LTR)", 236 trans_id=21, 237 ttl=12, 238 flags="HWonly+TerminalMEP", 239 relay_act=8, 240 tlvs=[OAM_LTR_TLV(last_egress_id=1, next_egress_id=2), 241 OAM_LTR_TLV(last_egress_id=3, next_egress_id=4), 242 OAM_LTR_IG_TLV(ingress_act=1, ingress_mac="12:34:56:78:90:11"), 243 OAM_LTR_IG_TLV(ingress_act=6, ingress_mac="12:34:56:78:90:22"), 244 OAM_LTR_EG_TLV(egress_act=2, egress_mac="12:34:56:78:90:33"), 245 OAM_LTR_EG_TLV(egress_act=3, egress_mac="12:34:56:78:90:44")]))) 246 247assert pkt[OAM].opcode == 4 248assert pkt[OAM].tlv_offset == 6 249assert pkt[OAM].ttl == 12 250assert pkt[OAM].flags.HWonly == True 251assert pkt[OAM].flags.FwdYes == False 252assert pkt[OAM].flags.TerminalMEP == True 253assert pkt[OAM].flags == (1<<7) | (1<<5) 254assert pkt[OAM].relay_act == 8 255assert pkt[OAM].tlvs[0].type == 8 256assert pkt[OAM].tlvs[0].length == 16 257assert pkt[OAM].tlvs[0].last_egress_id == 1 258assert pkt[OAM].tlvs[0].next_egress_id == 2 259assert pkt[OAM].tlvs[1].type == 8 260assert pkt[OAM].tlvs[1].length == 16 261assert pkt[OAM].tlvs[1].last_egress_id == 3 262assert pkt[OAM].tlvs[1].next_egress_id == 4 263assert pkt[OAM].tlvs[2].type == 5 264assert pkt[OAM].tlvs[2].length == 7 265assert pkt[OAM].tlvs[2].ingress_act == 1 266assert pkt[OAM].tlvs[2].ingress_mac == "12:34:56:78:90:11" 267assert pkt[OAM].tlvs[3].type == 5 268assert pkt[OAM].tlvs[3].length == 7 269assert pkt[OAM].tlvs[3].ingress_act == 6 270assert pkt[OAM].tlvs[3].ingress_mac == "12:34:56:78:90:22" 271assert pkt[OAM].tlvs[4].type == 6 272assert pkt[OAM].tlvs[4].length == 7 273assert pkt[OAM].tlvs[4].egress_act == 2 274assert pkt[OAM].tlvs[4].egress_mac == "12:34:56:78:90:33" 275assert pkt[OAM].tlvs[5].type == 6 276assert pkt[OAM].tlvs[5].length == 7 277assert pkt[OAM].tlvs[5].egress_act == 3 278assert pkt[OAM].tlvs[5].egress_mac == "12:34:56:78:90:44" 279 280check_tshark(pkt, "(LTR)") 281 282= AIS 283 284pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 285 OAM(opcode="Alarm Indication Signal (AIS)", 286 period="1 frame per second"))) 287 288assert pkt[OAM].opcode == 33 289assert pkt[OAM].tlv_offset == 0 290assert pkt[OAM].period == 0b100 291 292check_tshark(pkt, "(AIS)") 293 294= LCK 295 296pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 297 OAM(opcode="Lock Signal (LCK)", 298 period="1 frame per second"))) 299 300assert pkt[OAM].opcode == 35 301assert pkt[OAM].tlv_offset == 0 302assert pkt[OAM].period == 0b100 303 304check_tshark(pkt, "(LCK)") 305 306= TST 307 308pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 309 OAM(opcode="Test Signal (TST)", 310 seq_num=15, 311 tlvs=[OAM_TEST_TLV(pat_type="Null signal without CRC-32")/Raw(b'123'), 312 OAM_TEST_TLV(pat_type="Null signal without CRC-32")/Raw(b'23456'), 313 OAM_TEST_TLV(pat_type="Null signal with CRC-32")/Raw(b'123'), 314 OAM_TEST_TLV(pat_type="Null signal with CRC-32")/Raw(b'23456'), 315 OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 without CRC-32")/Raw(b'123'), 316 OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 without CRC-32")/Raw(b'23456'), 317 OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 with CRC-32")/Raw(b'123'), 318 OAM_TEST_TLV(pat_type="PRBS 2^-31 - 1 with CRC-32")/Raw(b'23456')]))) 319 320assert pkt[OAM].opcode == 37 321assert pkt[OAM].tlv_offset == 4 322assert pkt[OAM].seq_num == 15 323 324assert pkt[OAM].tlvs[0].type == 32 325assert pkt[OAM].tlvs[0].length == 4 326assert pkt[OAM].tlvs[0].pat_type == 0 327assert raw(pkt[OAM].tlvs[0].payload) == b'123' 328assert pkt[OAM].tlvs[1].type == 32 329assert pkt[OAM].tlvs[1].length == 6 330assert pkt[OAM].tlvs[1].pat_type == 0 331assert raw(pkt[OAM].tlvs[1].payload) == b'23456' 332assert pkt[OAM].tlvs[2].type == 32 333assert pkt[OAM].tlvs[2].length == 8 334assert pkt[OAM].tlvs[2].pat_type == 1 335assert raw(pkt[OAM].tlvs[2].payload) == b'123' 336assert pkt[OAM].tlvs[2].crc == crc32(raw(pkt[OAM].tlvs[2])[:-4]) % (1 << 32) 337assert pkt[OAM].tlvs[3].type == 32 338assert pkt[OAM].tlvs[3].length == 10 339assert pkt[OAM].tlvs[3].pat_type == 1 340assert raw(pkt[OAM].tlvs[3].payload) == b'23456' 341assert pkt[OAM].tlvs[3].crc == crc32(raw(pkt[OAM].tlvs[3])[:-4]) % (1 << 32) 342assert pkt[OAM].tlvs[4].type == 32 343assert pkt[OAM].tlvs[4].length == 4 344assert pkt[OAM].tlvs[4].pat_type == 2 345assert raw(pkt[OAM].tlvs[4].payload) == b'123' 346assert pkt[OAM].tlvs[5].type == 32 347assert pkt[OAM].tlvs[5].length == 6 348assert pkt[OAM].tlvs[5].pat_type == 2 349assert raw(pkt[OAM].tlvs[5].payload) == b'23456' 350assert pkt[OAM].tlvs[6].type == 32 351assert pkt[OAM].tlvs[6].length == 8 352assert pkt[OAM].tlvs[6].pat_type == 3 353assert raw(pkt[OAM].tlvs[6].payload) == b'123' 354assert pkt[OAM].tlvs[6].crc == crc32(raw(pkt[OAM].tlvs[6])[:-4]) % (1 << 32) 355assert pkt[OAM].tlvs[7].type == 32 356assert pkt[OAM].tlvs[7].length == 10 357assert pkt[OAM].tlvs[7].pat_type == 3 358assert raw(pkt[OAM].tlvs[7].payload) == b'23456' 359assert pkt[OAM].tlvs[7].crc == crc32(raw(pkt[OAM].tlvs[7])[:-4]) % (1 << 32) 360 361check_tshark(pkt, "(TST)") 362 363= APS 364 365pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 366 OAM(opcode="Automatic Protection Switching (APS)", 367 aps=APS(req_st="Forced switch (FS)", 368 prot_type="A+B+R", 369 req_sig="Normal traffic", 370 br_sig="Null signal", 371 br_type="T")))) 372 373assert pkt[OAM].opcode == 39 374assert pkt[APS].req_st == 0b1101 375assert pkt[APS].prot_type.A == True 376assert pkt[APS].prot_type.B == True 377assert pkt[APS].prot_type.R == True 378assert pkt[APS].prot_type == 0b1101 379assert pkt[APS].req_sig == 1 380assert pkt[APS].br_sig == 0 381assert pkt[APS].br_type.T == True 382assert pkt[APS].br_type == (1 << 7) 383 384check_tshark(pkt, "(APS)") 385 386= RAPS 387 388pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 389 OAM(opcode="Ring-Automatic Protection Switching (R-APS)", 390 raps=RAPS(req_st="Event", 391 sub_code="Flush", 392 status="RB+BPR", 393 node_id="12:12:12:23:23:23")))) 394 395assert pkt[OAM].opcode == 40 396assert pkt[RAPS].req_st == 0b1110 397assert pkt[RAPS].sub_code == 0b0000 398assert pkt[RAPS].status.RB == True 399assert pkt[RAPS].status.DNF == False 400assert pkt[RAPS].status.BPR == True 401assert pkt[RAPS].status == (1 << 7) | (1 << 5) 402assert pkt[RAPS].node_id == "12:12:12:23:23:23" 403 404check_tshark(pkt, "(R-APS)") 405 406= MCC 407 408pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 409 OAM(opcode="Maintenance Communication Channel (MCC)", 410 oui=12, 411 subopcode=2))) 412 413assert pkt[OAM].opcode == 41 414assert pkt[OAM].oui == 12 415assert pkt[OAM].subopcode == 2 416 417check_tshark(pkt, "(MCC)") 418 419= LMM 420 421pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 422 OAM(opcode="Loss Measurement Message (LMM)", 423 flags="Proactive", 424 txfcf=1, 425 rxfcf=2, 426 txfcb=3))) 427 428assert pkt[OAM].opcode == 43 429assert pkt[OAM].version == 1 430assert pkt[OAM].tlv_offset == 12 431assert pkt[OAM].flags == 1 432assert pkt[OAM].flags.Proactive == True 433assert pkt[OAM].txfcf == 1 434assert pkt[OAM].rxfcf == 2 435assert pkt[OAM].txfcb == 3 436 437check_tshark(pkt, "(LMM)") 438 439= LMR 440 441pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 442 OAM(opcode="Loss Measurement Reply (LMR)", 443 txfcf=1, 444 rxfcf=2, 445 txfcb=3))) 446 447assert pkt[OAM].opcode == 42 448assert pkt[OAM].txfcf == 1 449assert pkt[OAM].rxfcf == 2 450assert pkt[OAM].txfcb == 3 451 452check_tshark(pkt, "(LMR)") 453 454= 1DM 455 456pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 457 OAM(opcode="One Way Delay Measurement (1DM)", 458 txtsf=PTP_TIMESTAMP(seconds=1, nanoseconds=2), 459 rxtsf=PTP_TIMESTAMP(seconds=3, nanoseconds=4), 460 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 461 OAM_DATA_TLV()/Raw(b'456789'), 462 OAM_TEST_ID_TLV(test_id=5)]))) 463 464assert pkt[OAM].opcode == 45 465assert pkt[OAM].version == 1 466assert pkt[OAM].tlv_offset == 16 467assert pkt[OAM].txtsf.seconds == 1 468assert pkt[OAM].txtsf.nanoseconds == 2 469assert pkt[OAM].rxtsf.seconds == 3 470assert pkt[OAM].rxtsf.nanoseconds == 4 471assert pkt[OAM].tlvs[0].type == 3 472assert pkt[OAM].tlvs[0].length == 3 473assert raw(pkt[OAM].tlvs[0].payload) == b'123' 474assert pkt[OAM].tlvs[1].type == 3 475assert pkt[OAM].tlvs[1].length == 6 476assert raw(pkt[OAM].tlvs[1].payload) == b'456789' 477assert pkt[OAM].tlvs[2].type == 36 478assert pkt[OAM].tlvs[2].length == 32 479assert pkt[OAM].tlvs[2].test_id == 5 480 481# FIXME: for some reason wireshark does not like OAM_TEST_ID_TLV here 482check_tshark(pkt, "(1DM)") 483 484= DMM 485 486pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 487 OAM(opcode="Delay Measurement Message (DMM)", 488 txtsf=PTP_TIMESTAMP(seconds=1, nanoseconds=2), 489 txtsb=PTP_TIMESTAMP(seconds=2, nanoseconds=1), 490 rxtsf=PTP_TIMESTAMP(seconds=3, nanoseconds=4), 491 rxtsb=PTP_TIMESTAMP(seconds=6, nanoseconds=5), 492 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 493 OAM_DATA_TLV()/Raw(b'456789'), 494 OAM_TEST_ID_TLV(test_id=5)]))) 495 496assert pkt[OAM].opcode == 47 497assert pkt[OAM].version == 1 498assert pkt[OAM].tlv_offset == 32 499assert pkt[OAM].txtsf.seconds == 1 500assert pkt[OAM].txtsf.nanoseconds == 2 501assert pkt[OAM].rxtsf.seconds == 3 502assert pkt[OAM].rxtsf.nanoseconds == 4 503assert pkt[OAM].txtsb.seconds == 2 504assert pkt[OAM].txtsb.nanoseconds == 1 505assert pkt[OAM].rxtsb.seconds == 6 506assert pkt[OAM].rxtsb.nanoseconds == 5 507assert pkt[OAM].tlvs[0].type == 3 508assert pkt[OAM].tlvs[0].length == 3 509assert raw(pkt[OAM].tlvs[0].payload) == b'123' 510assert pkt[OAM].tlvs[1].type == 3 511assert pkt[OAM].tlvs[1].length == 6 512assert raw(pkt[OAM].tlvs[1].payload) == b'456789' 513assert pkt[OAM].tlvs[2].type == 36 514assert pkt[OAM].tlvs[2].length == 32 515assert pkt[OAM].tlvs[2].test_id == 5 516 517# FIXME: for some reason wireshark does not like OAM_TEST_ID_TLV here 518check_tshark(pkt, "(DMM)") 519 520= EXM 521 522pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 523 OAM(opcode="Experimental OAM Message (EXM)", 524 oui=123, 525 subopcode=33))) 526 527assert pkt[OAM].opcode == 49 528assert pkt[OAM].oui == 123 529assert pkt[OAM].subopcode == 33 530 531check_tshark(pkt, "(EXM)") 532 533= EXR 534 535pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 536 OAM(opcode="Experimental OAM Reply (EXR)", 537 oui=123, 538 subopcode=33))) 539 540assert pkt[OAM].opcode == 48 541assert pkt[OAM].oui == 123 542assert pkt[OAM].subopcode == 33 543 544check_tshark(pkt, "(EXR)") 545 546= VSM 547 548pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 549 OAM(opcode="Vendor Specific Message (VSM)", 550 oui=123, 551 subopcode=33))) 552 553assert pkt[OAM].opcode == 51 554assert pkt[OAM].oui == 123 555assert pkt[OAM].subopcode == 33 556 557check_tshark(pkt, "(VSM)") 558 559= CSF 560 561pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 562 OAM(opcode="Client Signal Fail (CSF)", 563 flags="RDI", 564 period="1 frame per minute"))) 565 566assert pkt[OAM].opcode == 52 567assert pkt[OAM].tlv_offset == 0 568assert pkt[OAM].flags == 0b010 569assert pkt[OAM].period == 0b110 570 571check_tshark(pkt, "(CSF)") 572 573= SLM 574 575pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 576 OAM(opcode="Synthetic Loss Message (SLM)", 577 test_id=11, 578 src_mep_id=12, 579 rcv_mep_id=34, 580 txfcf=3, 581 txfcb=9, 582 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 583 OAM_DATA_TLV()/Raw(b'456789')]))) 584 585assert pkt[OAM].opcode == 55 586assert pkt[OAM].tlv_offset == 16 587assert pkt[OAM].test_id == 11 588assert pkt[OAM].src_mep_id == 12 589assert pkt[OAM].rcv_mep_id == 34 590assert pkt[OAM].txfcf == 3 591assert pkt[OAM].txfcb == 9 592assert pkt[OAM].tlvs[0].type == 3 593assert pkt[OAM].tlvs[0].length == 3 594assert raw(pkt[OAM].tlvs[0].payload) == b'123' 595assert pkt[OAM].tlvs[1].type == 3 596assert pkt[OAM].tlvs[1].length == 6 597assert raw(pkt[OAM].tlvs[1].payload) == b'456789' 598 599check_tshark(pkt, "(SLM)") 600 601= SLR 602 603pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 604 OAM(opcode="Synthetic Loss Reply (SLR)", 605 test_id=11, 606 src_mep_id=12, 607 rcv_mep_id=34, 608 txfcf=3, 609 txfcb=9, 610 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 611 OAM_DATA_TLV()/Raw(b'456789')]))) 612 613assert pkt[OAM].opcode == 54 614assert pkt[OAM].tlv_offset == 16 615assert pkt[OAM].test_id == 11 616assert pkt[OAM].src_mep_id == 12 617assert pkt[OAM].rcv_mep_id == 34 618assert pkt[OAM].txfcf == 3 619assert pkt[OAM].txfcb == 9 620assert pkt[OAM].tlvs[0].type == 3 621assert pkt[OAM].tlvs[0].length == 3 622assert raw(pkt[OAM].tlvs[0].payload) == b'123' 623assert pkt[OAM].tlvs[1].type == 3 624assert pkt[OAM].tlvs[1].length == 6 625assert raw(pkt[OAM].tlvs[1].payload) == b'456789' 626 627check_tshark(pkt, "(SLR)") 628 629= 1SL 630 631pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 632 OAM(opcode="One Way Synthetic Loss Measurement (1SL)", 633 test_id=11, 634 src_mep_id=12, 635 txfcf=3, 636 tlvs=[OAM_DATA_TLV()/Raw(b'123'), 637 OAM_DATA_TLV()/Raw(b'456789')]))) 638 639assert pkt[OAM].opcode == 53 640assert pkt[OAM].tlv_offset == 16 641assert pkt[OAM].test_id == 11 642assert pkt[OAM].src_mep_id == 12 643assert pkt[OAM].txfcf == 3 644assert pkt[OAM].tlvs[0].type == 3 645assert pkt[OAM].tlvs[0].length == 3 646assert raw(pkt[OAM].tlvs[0].payload) == b'123' 647assert pkt[OAM].tlvs[1].type == 3 648assert pkt[OAM].tlvs[1].length == 6 649assert raw(pkt[OAM].tlvs[1].payload) == b'456789' 650 651check_tshark(pkt, "(1SL)") 652 653= GNM 654 655pkt = Ether(raw(Ether(dst="00:11:22:33:44:55")/Dot1Q()/ 656 OAM(opcode="Generic Notification Message (GNM)", 657 period="1 frame per minute", 658 nom_bdw=1, 659 curr_bdw=2, 660 port_id=3))) 661 662assert pkt[OAM].opcode == 32 663assert pkt[OAM].tlv_offset == 13 664assert pkt[OAM].period == 0b110 665assert pkt[OAM].subopcode == 1 666assert pkt[OAM].nom_bdw == 1 667assert pkt[OAM].curr_bdw == 2 668assert pkt[OAM].port_id == 3 669 670check_tshark(pkt, "(GNM)") 671