## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license

"""
Answering machines.
"""

########################
## Answering machines ##
########################

from __future__ import absolute_import
from __future__ import print_function
from scapy.sendrecv import send,sendp,sniff
from scapy.config import conf
from scapy.error import log_interactive
import scapy.modules.six as six


class ReferenceAM(type):
    """Metaclass that publishes a module-level shortcut for each machine.

    When a class created through this metaclass has a non-empty
    ``function_name``, a function of that name is stored in this module's
    globals. Calling it instantiates the class with the given arguments
    and immediately calls the resulting instance.
    """

    def __new__(cls, name, bases, dct):
        o = super(ReferenceAM, cls).__new__(cls, name, bases, dct)
        if o.function_name:
            # A real closure is used instead of ``lambda o=o, *args, ...``:
            # with the default-argument form, the first positional argument
            # of the caller would silently replace the captured class.
            def run_machine(*args, **kargs):
                return o(*args, **kargs)()
            globals()[o.function_name] = run_machine
        return o


class AnsweringMachine(six.with_metaclass(ReferenceAM, object)):
    """Base class for machines that sniff packets and send back replies.

    Options are given as keyword arguments, either at construction time
    (``mode`` 1) or at call time (``mode`` 2). Keywords listed in
    ``sniff_options_list`` / ``send_options_list`` are routed to the sniff
    and send functions; every remaining keyword is handed to
    ``parse_options()``. While ``parse_options()`` runs in mode 1 or 2,
    attributes it assigns on ``self`` are stored in ``optam1`` or
    ``optam2`` respectively, and are later resolved by ``__getattr__``.

    Subclasses are expected to override ``parse_options``, ``is_request``
    and ``make_reply`` as needed.
    """

    # Name of the module-level shortcut created by ReferenceAM ("" = none).
    function_name = ""
    # Default sniff filter, used when the caller does not pass "filter".
    filter = None
    # Default keyword arguments for sniff().
    sniff_options = {"store": 0}
    # Keywords that are forwarded to sniff().
    sniff_options_list = ["store", "iface", "count", "promisc", "filter",
                          "type", "prn", "stop_filter"]
    # Default keyword arguments for the send function.
    send_options = {"verbose": 0}
    # Keywords that are forwarded to the send function.
    send_options_list = ["iface", "inter", "loop", "verbose"]
    # Function used by send_reply() to emit a reply.
    send_function = staticmethod(send)

    def __init__(self, **kargs):
        """Record construction-time options.

        :param kargs: sniff options, send options and machine-specific
            options; the latter are passed to ``parse_options()``.
        """
        self.mode = 0
        if self.filter:
            kargs.setdefault("filter", self.filter)
        kargs.setdefault("prn", self.reply)
        self.optam1 = {}
        self.optam2 = {}
        self.optam0 = {}
        doptsend, doptsniff = self.parse_all_options(1, kargs)
        self.defoptsend = self.send_options.copy()
        self.defoptsend.update(doptsend)
        self.defoptsniff = self.sniff_options.copy()
        self.defoptsniff.update(doptsniff)
        self.optsend, self.optsniff = [{}, {}]

    def __getattr__(self, attr):
        """Resolve attributes stored by ``parse_options()``.

        Call-time values (``optam2``) take precedence over
        construction-time values (``optam1``).

        :raises AttributeError: if ``attr`` is in neither dictionary.
        """
        # The dictionaries are fetched straight from the instance __dict__.
        # Going through normal attribute access would re-enter __getattr__
        # without end whenever they do not exist yet (e.g. on an instance
        # created without running __init__), ending in a RecursionError
        # instead of the expected AttributeError.
        instance_dict = self.__dict__
        for key in ("optam2", "optam1"):
            d = instance_dict.get(key, ())
            if attr in d:
                return d[attr]
        raise AttributeError(attr)

    def __setattr__(self, attr, val):
        """Store ``val`` according to the current ``mode``.

        Mode 0 writes to the instance ``__dict__``; mode 1 writes to
        ``optam1`` and mode 2 to ``optam2``.
        """
        mode = self.__dict__.get("mode", 0)
        if mode == 0:
            self.__dict__[attr] = val
        else:
            [self.optam1, self.optam2][mode-1][attr] = val

    def parse_options(self):
        """Hook receiving the machine-specific options as keyword arguments.

        The base implementation accepts no option and does nothing.
        """
        pass

    def parse_all_options(self, mode, kargs):
        """Split ``kargs`` into send, sniff and machine-specific options.

        Sniff and send keywords are removed from ``kargs`` (which is
        modified in place); what remains is given to ``parse_options()``.

        :param mode: 1 when called from ``__init__``, 2 when called from
            ``__call__``.
        :param kargs: dictionary of keyword options.
        :returns: a ``(sendopt, sniffopt)`` tuple of dictionaries.
        """
        sniffopt = {}
        sendopt = {}
        for k in list(kargs):  # use list(): kargs is modified in the loop
            is_sniff_opt = k in self.sniff_options_list
            is_send_opt = k in self.send_options_list
            if is_sniff_opt:
                sniffopt[k] = kargs[k]
            if is_send_opt:
                sendopt[k] = kargs[k]
            if is_sniff_opt or is_send_opt:
                del kargs[k]
        if mode != 2 or kargs:
            if mode == 1:
                # Remember construction-time options so that call-time
                # options can be layered on top of them later.
                self.optam0 = kargs
            elif mode == 2 and kargs:
                k = self.optam0.copy()
                k.update(kargs)
                # NOTE(review): this first call runs under the caller's
                # current mode, before the mode switch below; it is kept
                # as-is. Confirm whether it is intentional.
                self.parse_options(**k)
                kargs = k
            # Switch mode through __dict__ so that __setattr__ redirects
            # the assignments made by parse_options(), then restore it.
            omode = self.__dict__.get("mode", 0)
            self.__dict__["mode"] = mode
            self.parse_options(**kargs)
            self.__dict__["mode"] = omode
        return sendopt, sniffopt

    def is_request(self, req):
        """Tell whether ``req`` should be answered (always true here)."""
        return 1

    def make_reply(self, req):
        """Build the reply to ``req`` (the base class echoes the request)."""
        return req

    def send_reply(self, reply):
        """Send ``reply`` with ``send_function`` and the current send options."""
        self.send_function(reply, **self.optsend)

    def print_reply(self, req, reply):
        """Print a one-line summary of the request and of its reply."""
        print("%s ==> %s" % (req.summary(),reply.summary()))

    def reply(self, pkt):
        """Handle one sniffed packet; installed as the default ``prn``.

        Packets rejected by ``is_request()`` are ignored. Otherwise the
        reply is built and sent, and printed when ``conf.verb >= 0``.
        """
        if not self.is_request(pkt):
            return
        reply = self.make_reply(pkt)
        self.send_reply(reply)
        if conf.verb >= 0:
            self.print_reply(pkt, reply)

    def run(self, *args, **kargs):
        """Deprecated alias: log a warning, then call the instance."""
        log_interactive.warning("run() method deprecated. The instance is now callable")
        self(*args,**kargs)

    def __call__(self, *args, **kargs):
        """Apply call-time options and sniff until interrupted.

        The effective send and sniff options are the defaults computed in
        ``__init__`` updated with the options given here. A
        ``KeyboardInterrupt`` raised while sniffing is caught and reported.
        """
        optsend, optsniff = self.parse_all_options(2, kargs)
        self.optsend = self.defoptsend.copy()
        self.optsend.update(optsend)
        self.optsniff = self.defoptsniff.copy()
        self.optsniff.update(optsniff)

        try:
            self.sniff()
        except KeyboardInterrupt:
            print("Interrupted by user")

    def sniff(self):
        """Run ``sniff()`` with the current sniff options."""
        sniff(**self.optsniff)
