# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>

"""
SNMP (Simple Network Management Protocol).
"""

from scapy.packet import bind_layers, bind_bottom_up
from scapy.asn1packet import ASN1_Packet
from scapy.asn1fields import (
    ASN1F_INTEGER,
    ASN1F_IPADDRESS,
    ASN1F_OID,
    ASN1F_SEQUENCE,
    ASN1F_SEQUENCE_OF,
    ASN1F_STRING,
    ASN1F_TIME_TICKS,
    ASN1F_enum_INTEGER,
    ASN1F_field,
    ASN1F_CHOICE,
    ASN1F_optional,
    ASN1F_NULL,
)
from scapy.asn1.asn1 import (
    ASN1_Class_UNIVERSAL,
    ASN1_Codecs,
    ASN1_NULL,
    ASN1_SEQUENCE,
)
from scapy.asn1.ber import BERcodec_SEQUENCE
from scapy.sendrecv import sr1
from scapy.volatile import RandShort, IntAutoTime
from scapy.layers.inet import UDP, IP, ICMP

# Import needed to initialize conf.mib
from scapy.asn1.mib import conf  # noqa: F401

##########
#  SNMP  #
##########

# [ ASN1 class ] #


# Tag registry for the SNMP PDU types. It extends the UNIVERSAL tag class
# with one tag per PDU; the values 0xa0..0xa7 are the context-specific,
# constructed BER identifier octets for tag numbers 0 through 7.
class ASN1_Class_SNMP(ASN1_Class_UNIVERSAL):
    name = "SNMP"
    PDU_GET = 0xa0
    PDU_NEXT = 0xa1
    PDU_RESPONSE = 0xa2
    PDU_SET = 0xa3
    PDU_TRAPv1 = 0xa4
    PDU_BULK = 0xa5
    PDU_INFORM = 0xa6
    PDU_TRAPv2 = 0xa7


# One ASN.1 object class per PDU type: each is an ASN1_SEQUENCE whose only
# difference is the tag it carries.

class ASN1_SNMP_PDU_GET(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_GET


class ASN1_SNMP_PDU_NEXT(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_NEXT


class ASN1_SNMP_PDU_RESPONSE(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_RESPONSE


class ASN1_SNMP_PDU_SET(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_SET


class ASN1_SNMP_PDU_TRAPv1(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_TRAPv1


class ASN1_SNMP_PDU_BULK(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_BULK


class ASN1_SNMP_PDU_INFORM(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_INFORM


class ASN1_SNMP_PDU_TRAPv2(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_TRAPv2


# [ BER codecs ] #

# One BER codec per PDU type: each is a BERcodec_SEQUENCE bound to the
# matching SNMP tag.

class BERcodec_SNMP_PDU_GET(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_GET


class BERcodec_SNMP_PDU_NEXT(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_NEXT


class BERcodec_SNMP_PDU_RESPONSE(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_RESPONSE


class BERcodec_SNMP_PDU_SET(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_SET


class BERcodec_SNMP_PDU_TRAPv1(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_TRAPv1


class BERcodec_SNMP_PDU_BULK(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_BULK


class BERcodec_SNMP_PDU_INFORM(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_INFORM


class BERcodec_SNMP_PDU_TRAPv2(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_TRAPv2


# [ ASN1 fields ] #

# One field class per PDU type: each is an ASN1F_SEQUENCE whose ASN1_tag is
# the matching SNMP tag. They serve as the ASN1_root of the PDU packets.

class ASN1F_SNMP_PDU_GET(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_GET


class ASN1F_SNMP_PDU_NEXT(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_NEXT


class ASN1F_SNMP_PDU_RESPONSE(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_RESPONSE


class ASN1F_SNMP_PDU_SET(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_SET


class ASN1F_SNMP_PDU_TRAPv1(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv1


class ASN1F_SNMP_PDU_BULK(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_BULK


class ASN1F_SNMP_PDU_INFORM(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_INFORM


class ASN1F_SNMP_PDU_TRAPv2(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_TRAPv2


# [ SNMP Packet ] #


# Symbolic names for the integer "error" field of the PDU packets.
SNMP_error = {
    0: "no_error",
    1: "too_big",
    2: "no_such_name",
    3: "bad_value",
    4: "read_only",
    5: "generic_error",
    6: "no_access",
    7: "wrong_type",
    8: "wrong_length",
    9: "wrong_encoding",
    10: "wrong_value",
    11: "no_creation",
    12: "inconsistent_value",
    13: "resource_unavailable",
    14: "commit_failed",
    15: "undo_failed",
    16: "authorization_error",
    17: "not_writable",
    18: "inconsistent_name",
}

# Symbolic names for the "generic_trap" field of an SNMPv1 trap PDU.
SNMP_trap_types = {
    0: "cold_start",
    1: "warm_start",
    2: "link_down",
    3: "link_up",
    4: "auth_failure",
    5: "egp_neigh_loss",
    6: "enterprise_specific",
}


# A single variable binding: an OID plus an optional value. The three
# trailing optional NULL fields (implicit tags 0x80..0x82) carry the
# per-variable exceptions an agent may return instead of a value.
class SNMPvarbind(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_OID("oid", "1.3"),
        ASN1F_optional(
            ASN1F_field("value", ASN1_NULL(0))
        ),

        # exceptions in responses
        ASN1F_optional(ASN1F_NULL("noSuchObject", None, implicit_tag=0x80)),
        ASN1F_optional(ASN1F_NULL("noSuchInstance", None, implicit_tag=0x81)),
        ASN1F_optional(ASN1F_NULL("endOfMibView", None, implicit_tag=0x82)),
    )


# The PDU packets below share one layout (request id, error status, error
# index, list of varbinds) unless noted otherwise; they differ in the tag
# of their root sequence.

class SNMPget(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_GET(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


class SNMPnext(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_NEXT(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


class SNMPresponse(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_RESPONSE(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


class SNMPset(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_SET(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


# SNMPv1 trap: has its own layout (enterprise OID, agent address, trap
# codes and a time stamp) instead of the id/error/error_index triple.
class SNMPtrapv1(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_TRAPv1(
        ASN1F_OID("enterprise", "1.3"),
        ASN1F_IPADDRESS("agent_addr", "0.0.0.0"),
        ASN1F_enum_INTEGER("generic_trap", 0, SNMP_trap_types),
        ASN1F_INTEGER("specific_trap", 0),
        ASN1F_TIME_TICKS("time_stamp", IntAutoTime()),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


# Bulk request: the second and third integers are "non_repeaters" and
# "max_repetitions" rather than an error status and index.
class SNMPbulk(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_BULK(
        ASN1F_INTEGER("id", 0),
        ASN1F_INTEGER("non_repeaters", 0),
        ASN1F_INTEGER("max_repetitions", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


class SNMPinform(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_INFORM(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


class SNMPtrapv2(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_TRAPv2(
        ASN1F_INTEGER("id", 0),
        ASN1F_enum_INTEGER("error", 0, SNMP_error),
        ASN1F_INTEGER("error_index", 0),
        ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind),
    )


# Top-level SNMP message: version, community string and exactly one of the
# PDU packets defined above.
class SNMP(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_enum_INTEGER("version", 1, {0: "v1", 1: "v2c", 2: "v2", 3: "v3"}),  # noqa: E501
        ASN1F_STRING("community", "public"),
        ASN1F_CHOICE("PDU", SNMPget(),
                     SNMPget, SNMPnext, SNMPresponse, SNMPset,
                     SNMPtrapv1, SNMPbulk, SNMPinform, SNMPtrapv2)
    )

    def answers(self, other):
        """Tell whether this message is the reply to the request ``other``.

        True only when this message carries a response PDU, ``other`` is an
        SNMP message carrying a PDU type that is answered by a response
        (get, get-next, set, get-bulk or inform), and both PDUs share the
        same request id.
        """
        return (
            # A non-SNMP payload has no PDU attribute; treat it as a
            # mismatch instead of raising AttributeError.
            isinstance(other, SNMP) and
            isinstance(self.PDU, SNMPresponse) and
            isinstance(other.PDU, (SNMPget, SNMPnext, SNMPset,
                                   SNMPbulk, SNMPinform)) and
            self.PDU.id == other.PDU.id
        )


# Dissect UDP payloads as SNMP when either port is 161 or 162; when
# building UDP/SNMP, default both ports to 161.
bind_bottom_up(UDP, SNMP, sport=161)
bind_bottom_up(UDP, SNMP, dport=161)
bind_bottom_up(UDP, SNMP, sport=162)
bind_bottom_up(UDP, SNMP, dport=162)
bind_layers(UDP, SNMP, sport=161, dport=161)


def snmpwalk(dst, oid="1", community="public"):
    """Walk an agent's MIB with successive get-next requests.

    Each returned variable is printed as ``<oid>: <value>`` and its OID is
    used as the starting point of the next request.

    The walk stops when:
      - no answer is received ("No answers" is printed),
      - an ICMP message is received (it is printed),
      - the response reports a non-zero error status or carries no
        variable binding (the response is printed),
      - the returned variable is flagged ``endOfMibView``,
      - the agent returns the OID that was just requested (no progress),
      - the user interrupts with Ctrl-C.

    :param dst: destination handed to ``IP(dst=...)``
    :param oid: OID to start the walk from
    :param community: SNMP community string
    """
    try:
        while True:
            request = (
                IP(dst=dst) /
                UDP(sport=RandShort()) /
                SNMP(community=community,
                     PDU=SNMPnext(varbindlist=[SNMPvarbind(oid=oid)]))
            )
            r = sr1(request, timeout=2, chainCC=1, verbose=0, retry=2)
            if r is None:
                print("No answers")
                break
            if ICMP in r:
                print(repr(r))
                break

            snmp = r.getlayer(SNMP)
            varbind = r.getlayer(SNMPvarbind)
            if snmp is None or varbind is None:
                # Nothing to iterate on: show what came back and stop.
                print(repr(r))
                break

            # A non-zero error status (e.g. no_such_name from an SNMPv1
            # agent at the end of its MIB) ends the walk; asking again
            # would only repeat the same error.
            error = getattr(snmp.PDU, "error", 0)
            if getattr(error, "val", error):
                print(repr(r))
                break

            # The endOfMibView exception marks the end of the MIB.
            if varbind.endOfMibView is not None:
                break

            next_oid = varbind.oid
            # Guard against agents that echo the requested OID, which
            # would otherwise make the walk loop forever.
            if getattr(next_oid, "val", next_oid) == getattr(oid, "val", oid):
                break

            print("%-40s: %r" % (next_oid.val, varbind.value))
            oid = next_oid

    except KeyboardInterrupt:
        pass