#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This is a test script for checking layer fields against a given test.pcap.
#
import logging
import unittest

from pktverify import layer_fields
from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
from pktverify.bytes import Bytes
from pktverify.consts import REAL_LAYER_NAMES, VALID_LAYER_NAMES
from pktverify.layer_fields_container import LayerFieldsContainer
from pktverify.null_field import nullField
from pktverify.packet import Packet
from pktverify.pcap_reader import PcapReader


class TestLayerFields(unittest.TestCase):
    """Check the Python types of layer fields parsed from ``test.pcap``.

    ``test`` walks every packet, dispatches to the ``_test_<layer>`` checker
    of each layer present in the packet, and then verifies through
    ``_check_missing_fields`` that every raw field of the packet resolves to a
    value of the type implied by its registered parser.
    """

    # Expected Python type of each ``thread_meshcop.tlv.<name>`` field, in the
    # order ``_test_thread_meshcop`` checks them (only when the field exists).
    _MESHCOP_TLV_TYPES = (
        ('commissioner_sess_id', int),
        ('net_name', list),
        ('channel_page', int),
        ('channel', list),
        ('chan_mask_page', int),
        ('chan_mask_len', int),
        ('chan_mask_mask', int),
        ('panid', int),
        ('ml_prefix', Bytes),
        ('master_key', Bytes),
        ('pskc', Bytes),
        ('sec_policy_rot', int),
        ('sec_policy_o', int),
        ('sec_policy_n', int),
        ('sec_policy_r', int),
        ('sec_policy_c', int),
        ('sec_policy_b', int),
        ('pan_id', list),
        ('xpan_id', Bytes),
    )

    def test(self):
        """Run the per-layer checkers and the missing-field check on each packet."""
        logging.basicConfig(level=logging.DEBUG)
        pkts = PcapReader.read("test.pcap")

        for i, p in enumerate(pkts):
            logging.info("check packet #%d", i + 1)

            for layer_name in VALID_LAYER_NAMES:
                if layer_name == 'lowpan':
                    # we already checked 6lowpan
                    continue

                layer = getattr(p, layer_name)
                if not hasattr(p._packet, layer_name):
                    # The raw packet has no such layer: a real layer must be falsy.
                    if layer_name in REAL_LAYER_NAMES:
                        self.assertFalse(layer)
                    continue

                if layer_name in REAL_LAYER_NAMES:
                    self.assertTrue(layer)

                checker = getattr(self, '_test_' + layer_name, None)
                if checker is None:
                    continue

                try:
                    checker(p)
                except Exception:
                    # Dump the offending layer before propagating the failure.
                    layer.show()
                    raise

            for layer in p._packet.layers:
                self._check_missing_fields(p, layer.layer_name, layer)

    def _test_coap(self, p):
        """Check the types of the CoAP layer fields of packet ``p``."""
        coap = p.coap
        self.assertIsInstance(coap.version, int)
        self.assertIsInstance(coap.type, int)
        self.assertIsInstance(coap.token_len, int)
        self.assertIsInstance(coap.code, int)
        self.assertIsInstance(coap.mid, int)
        self.assertIsInstanceOrNull(coap.token, int)
        self.assertIsInstanceOrNull(coap.opt.uri_path_recon, str)
        self.assertIsInstanceOrNull(coap.payload, bytearray)

        logging.debug('coap.tlv.type=%s coap.tlv=%s', coap.tlv.type, coap.tlv)

        # A unittest assertion (not a bare ``assert``) so the check survives ``python -O``.
        self.assertIsInstance(coap.tlv, LayerFieldsContainer, repr(coap.tlv))
        self.assertIsInstanceOrNull(coap.tlv.type, list)

    def _test_mle(self, p):
        """Check the types of the MLE layer fields of packet ``p``."""
        mle = p.mle
        self._must_have_wpan_aux_sec(p)

        self.assertIsInstance(mle.cmd, int)

        self.assertIsInstanceOrNull(mle.tlv.mode.receiver_on_idle, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.reserved1, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.reserved2, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.device_type_bit, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.network_data, int)

        self.assertIsInstanceOrNull(mle.tlv.challenge, Bytes)

        self.assertIsInstanceOrNull(mle.tlv.scan_mask.r, int)
        self.assertIsInstanceOrNull(mle.tlv.scan_mask.e, int)

        self.assertIsInstanceOrNull(mle.tlv.version, int)
        self.assertIsInstanceOrNull(mle.tlv.source_addr, int)
        self.assertIsInstanceOrNull(mle.tlv.active_tstamp, int)

        self.assertIsInstanceOrNull(mle.tlv.leader_data.partition_id, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.weighting, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.data_version, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.stable_data_version, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.router_id, int)

    def _test_wpan(self, p):
        """Check the types of the WPAN layer fields of packet ``p``."""
        wpan = p.wpan
        self.assertIsInstance(wpan.fcf, int)
        self.assertIsInstance(wpan.fcs, int)
        self.assertIsInstance(wpan.security, int)
        self.assertIsInstance(wpan.pending, int)
        self.assertIsInstance(wpan.ack_request, int)
        self.assertIsInstance(wpan.pan_id_compression, int)
        self.assertIsInstance(wpan.seqno_suppression, int)
        self.assertIsInstance(wpan.ie_present, int)
        self.assertIsInstance(wpan.dst_addr_mode, int)
        self.assertIsInstance(wpan.version, int)
        self.assertIsInstance(wpan.src_addr_mode, int)
        self.assertIsInstance(wpan.seq_no, int)

        # Addressing fields are only inspected on non-ACK frames.
        if not wpan.is_ack:
            self.assertIsInstanceOrNull(wpan.dst_pan, int)
            self.assertIsInstanceOrNull(wpan.dst16, int)
            self.assertIsInstanceOrNull(wpan.src16, int)
            self.assertIsInstanceOrNull(wpan.src64, ExtAddr)
            self.assertIsInstanceOrNull(wpan.dst64, ExtAddr)

        if wpan.aux_sec:
            self._must_have_wpan_aux_sec(p)

    def _must_have_wpan_aux_sec(self, p):
        """Check the types of the WPAN auxiliary security header fields of ``p``."""
        aux_sec = p.wpan.aux_sec
        self.assertIsInstanceOrNull(aux_sec.sec_suite, int)
        self.assertIsInstanceOrNull(aux_sec.security_control_field, int)
        self.assertIsInstanceOrNull(aux_sec.sec_level, int)
        self.assertIsInstanceOrNull(aux_sec.key_id_mode, int)
        self.assertIsInstanceOrNull(aux_sec.frame_counter_suppression, int)
        self.assertIsInstanceOrNull(aux_sec.asn_in_nonce, int)
        self.assertIsInstanceOrNull(aux_sec.reserved, int)
        self.assertIsInstanceOrNull(aux_sec.frame_counter, int)
        self.assertIsInstanceOrNull(aux_sec.key_source, int)
        self.assertIsInstanceOrNull(aux_sec.key_index, int)

    def assertIsInstanceOrNull(self, field, type):
        """Assert that ``field`` is an instance of ``type`` unless it is ``nullField``.

        The parameter name ``type`` shadows the builtin; it is kept so that
        existing callers (including keyword callers) keep working.
        """
        if field is not nullField:
            self.assertIsInstance(field, type)

    def _test_thread_bl(self, p):
        """Check the types of the ``thread_bl`` layer fields of packet ``p``."""
        thread_bl = p.thread_bl
        self.assertTrue(thread_bl)
        self.assertIsInstanceOrNull(thread_bl.tlv.target_eid, Ipv6Addr)
        self.assertIsInstanceOrNull(thread_bl.tlv.ml_eid, ExtAddr)
        self.assertIsInstanceOrNull(thread_bl.tlv.last_transaction_time, int)
        # NOTE(review): this inspects a thread_meshcop field from the thread_bl
        # checker - kept as in the original; confirm it is intentional.
        self.assertIsInstanceOrNull(p.thread_meshcop.tlv.net_name, list)

    def _test_thread_meshcop(self, p: Packet):
        """Check the types of the ``thread_meshcop`` TLV fields present in ``p``.

        For every layer of the packet (sorted by layer name), each field listed
        in ``_MESHCOP_TLV_TYPES`` that appears in the layer's raw fields must
        have the listed type. Raw ``thread_meshcop`` fields that are not
        registered layer fields are logged.
        """
        thread_meshcop = p.thread_meshcop
        ignored = ('thread_meshcop.tlv', 'thread_meshcop.tlv.type', 'thread_meshcop.tlv.len8')

        for layer in sorted(p.layers, key=lambda lyr: lyr.layer_name):
            raw_fields = layer._layer._all_fields

            for name, expected_type in self._MESHCOP_TLV_TYPES:
                if 'thread_meshcop.tlv.' + name in raw_fields:
                    self.assertIsInstance(getattr(thread_meshcop.tlv, name), expected_type)

            for field in raw_fields:
                if (field.startswith('thread_meshcop') and not layer_fields.is_layer_field(field) and
                        field not in ignored):
                    logging.info('found %s = %s in layer %s', field, layer._layer.get_field(field), layer.layer_name)

    def _test_icmpv6(self, p):
        """Check the types of the ICMPv6 layer fields of packet ``p``."""
        icmpv6 = p.icmpv6
        self.assertTrue(p.icmpv6)
        self.assertIsInstance(icmpv6.type, int)
        self.assertIsInstance(icmpv6.code, int)
        self.assertIsInstance(icmpv6.checksum, int)
        self.assertIsInstanceOrNull(icmpv6.reserved, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.s, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.o, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.r, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.rsv, int)
        self.assertIsInstanceOrNull(icmpv6.nd.ra.cur_hop_limit, int)
        self.assertIsInstanceOrNull(icmpv6.mldr.nb_mcast_records, int)
        self.assertIsInstanceOrNull(icmpv6.nd.ns.target_address, Ipv6Addr)
        self.assertIsInstanceOrNull(icmpv6.mldr.mar.multicast_address, list)

    def get_field(self, p: Packet, f):
        """Resolve the dotted field name ``f`` (e.g. ``'wpan.seq_no'``) on packet ``p``.

        The name must contain at least two dot-separated sections; each section
        is looked up with ``getattr`` on the result of the previous one.

        Raises:
            AssertionError: if ``f`` has fewer than two sections.
        """
        secs = f.split('.')
        # A unittest assertion (not a bare ``assert``) so the check survives ``python -O``.
        self.assertGreaterEqual(len(secs), 2, f)
        v = p
        for sec in secs:
            v = getattr(v, sec)
        return v

    def _test_6lowpan(self, p):
        """Check the types of the 6LoWPAN layer fields of packet ``p``."""
        lowpan = p.lowpan
        # ``lowpan`` and ``6lowpan`` must be the very same layer object.
        self.assertIs(lowpan, getattr(p, '6lowpan'))

        self.assertIsInstanceOrNull(lowpan.src, Ipv6Addr)
        self.assertIsInstanceOrNull(lowpan.dst, Ipv6Addr)

        self.assertIsInstanceOrNull(lowpan.udp.src, int)
        self.assertIsInstanceOrNull(lowpan.udp.dst, int)
        self.assertIsInstanceOrNull(lowpan.udp.checksum, int)

        self.assertIsInstanceOrNull(lowpan.frag.size, int)
        self.assertIsInstanceOrNull(lowpan.frag.tag, int)
        self.assertIsInstanceOrNull(lowpan.frag.offset, int)

        self.assertIsInstanceOrNull(lowpan.nhc.pattern, list)
        self.assertIsInstanceOrNull(lowpan.nhc.udp.checksum, int)
        self.assertIsInstanceOrNull(lowpan.nhc.udp.ports, int)

        self.assertIsInstanceOrNull(lowpan.pattern, list)

        self.assertIsInstanceOrNull(lowpan.iphc.tf, int)
        self.assertIsInstanceOrNull(lowpan.iphc.nh, int)
        self.assertIsInstanceOrNull(lowpan.iphc.hlim, int)
        self.assertIsInstanceOrNull(lowpan.iphc.cid, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sac, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sam, int)
        self.assertIsInstanceOrNull(lowpan.iphc.m, int)
        self.assertIsInstanceOrNull(lowpan.iphc.dac, int)
        self.assertIsInstanceOrNull(lowpan.iphc.dam, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sctx.prefix, Bytes)
        self.assertIsInstanceOrNull(lowpan.iphc.dctx.prefix, Bytes)

    def _test_ip(self, p):
        """No IP-specific field checks are performed."""
        pass

    def _test_ipv6(self, p):
        """No IPv6-specific field checks are performed."""
        pass

    def _test_udp(self, p):
        """No UDP-specific field checks are performed."""
        pass

    def _test_eth(self, p):
        """Check the types of the Ethernet layer fields of packet ``p``."""
        eth = p.eth
        self.assertIsInstance(eth.src, EthAddr)
        self.assertIsInstance(eth.dst, EthAddr)
        self.assertIsInstance(eth.type, int)

    def _check_missing_fields(self, p, layer_name, _layer):
        """Verify that every raw field of ``_layer`` resolves to a correctly typed value.

        Field names are visited in reverse sorted order. Names starting with
        ``_ws`` or ``data``, a small set of not-yet-handled names, and layer
        field containers are skipped. Every other field must resolve on ``p``
        to a non-null value whose type matches its parser registered in
        ``layer_fields._LAYER_FIELDS``.

        Raises:
            NotImplementedError: if a field's parser has no known result type.
        """
        all_fields = _layer._all_fields

        for f in sorted(all_fields, reverse=True):
            if f.startswith(('_ws', 'data')):
                continue

            logging.info('_check_missing_fields in layer %s: %s = %r', layer_name, f, all_fields[f])
            if f in {
                    '', 'icmpv6.checksum.status', 'ip.ttl.lncb', 'wpan.aux_sec.key_source.bytes', 'wpan.src64.origin'
            }:
                # TODO: handle these fields
                continue

            v = all_fields[f]
            if layer_fields.is_layer_field_container(f):
                continue

            try:
                rv = self.get_field(p, f)
                self.assertIsNot(rv, nullField)
                parser = layer_fields._LAYER_FIELDS[f]

                # A ``_first`` wrapper is checked against the parser it wraps.
                if isinstance(parser, layer_fields._first):
                    parser = parser._sub_parse

                if parser in (layer_fields._raw_hex, layer_fields._hex, layer_fields._raw_hex_rev,
                              layer_fields._dec, layer_fields._auto):
                    self.assertIsInstance(rv, int)
                elif isinstance(parser, layer_fields._list):
                    self.assertIsInstance(rv, list)
                elif parser is layer_fields._ipv6_addr:
                    self.assertIsInstance(rv, Ipv6Addr)
                elif parser is layer_fields._eth_addr:
                    self.assertIsInstance(rv, EthAddr)
                elif parser is layer_fields._ext_addr:
                    self.assertIsInstance(rv, ExtAddr)
                elif parser is layer_fields._str:
                    self.assertIsInstance(rv, str)
                elif parser is layer_fields._bytes:
                    self.assertIsInstance(rv, Bytes)
                elif parser is layer_fields._payload:
                    self.assertIsInstance(rv, bytearray)
                elif parser is layer_fields._float:
                    self.assertIsInstance(rv, float)
                else:
                    raise NotImplementedError(parser)
            except Exception:
                # Record the raw field details for diagnosis, then propagate.
                logging.info('checking [%s] %s=%r, %r, %r (%d)', layer_name, f, v, v.get_default_value(), v.raw_value,
                             len(v.fields))
                raise