1"""Mock socket module used by the smtpd and smtplib tests. 2""" 3 4# imported for _GLOBAL_DEFAULT_TIMEOUT 5import socket as socket_module 6 7# Mock socket module 8_defaulttimeout = None 9_reply_data = None 10 11# This is used to queue up data to be read through socket.makefile, typically 12# *before* the socket object is even created. It is intended to handle a single 13# line which the socket will feed on recv() or makefile(). 14def reply_with(line): 15 global _reply_data 16 _reply_data = line 17 18 19class MockFile: 20 """Mock file object returned by MockSocket.makefile(). 21 """ 22 def __init__(self, lines): 23 self.lines = lines 24 def readline(self, limit=-1): 25 result = self.lines.pop(0) + b'\r\n' 26 if limit >= 0: 27 # Re-insert the line, removing the \r\n we added. 28 self.lines.insert(0, result[limit:-2]) 29 result = result[:limit] 30 return result 31 def close(self): 32 pass 33 34 35class MockSocket: 36 """Mock socket object used by smtpd and smtplib tests. 37 """ 38 def __init__(self, family=None): 39 global _reply_data 40 self.family = family 41 self.output = [] 42 self.lines = [] 43 if _reply_data: 44 self.lines.append(_reply_data) 45 _reply_data = None 46 self.conn = None 47 self.timeout = None 48 49 def queue_recv(self, line): 50 self.lines.append(line) 51 52 def recv(self, bufsize, flags=None): 53 data = self.lines.pop(0) + b'\r\n' 54 return data 55 56 def fileno(self): 57 return 0 58 59 def settimeout(self, timeout): 60 if timeout is None: 61 self.timeout = _defaulttimeout 62 else: 63 self.timeout = timeout 64 65 def gettimeout(self): 66 return self.timeout 67 68 def setsockopt(self, level, optname, value): 69 pass 70 71 def getsockopt(self, level, optname, buflen=None): 72 return 0 73 74 def bind(self, address): 75 pass 76 77 def accept(self): 78 self.conn = MockSocket() 79 return self.conn, 'c' 80 81 def getsockname(self): 82 return ('0.0.0.0', 0) 83 84 def setblocking(self, flag): 85 pass 86 87 def listen(self, backlog): 88 pass 89 90 def makefile(self, mode='r', bufsize=-1): 91 handle = MockFile(self.lines) 92 return handle 93 94 def sendall(self, data, flags=None): 95 self.last = data 96 self.output.append(data) 97 return len(data) 98 99 def send(self, data, flags=None): 100 self.last = data 101 self.output.append(data) 102 return len(data) 103 104 def getpeername(self): 105 return ('peer-address', 'peer-port') 106 107 def close(self): 108 pass 109 110 def connect(self, host): 111 pass 112 113 114def socket(family=None, type=None, proto=None): 115 return MockSocket(family) 116 117def create_connection(address, timeout=socket_module._GLOBAL_DEFAULT_TIMEOUT, 118 source_address=None): 119 try: 120 int_port = int(address[1]) 121 except ValueError: 122 raise error 123 ms = MockSocket() 124 if timeout is socket_module._GLOBAL_DEFAULT_TIMEOUT: 125 timeout = getdefaulttimeout() 126 ms.settimeout(timeout) 127 return ms 128 129 130def setdefaulttimeout(timeout): 131 global _defaulttimeout 132 _defaulttimeout = timeout 133 134 135def getdefaulttimeout(): 136 return _defaulttimeout 137 138 139def getfqdn(): 140 return "" 141 142 143def gethostname(): 144 pass 145 146 147def gethostbyname(name): 148 return "" 149 150def getaddrinfo(*args, **kw): 151 return socket_module.getaddrinfo(*args, **kw) 152 153gaierror = socket_module.gaierror 154error = socket_module.error 155 156 157# Constants 158_GLOBAL_DEFAULT_TIMEOUT = socket_module._GLOBAL_DEFAULT_TIMEOUT 159AF_INET = socket_module.AF_INET 160AF_INET6 = socket_module.AF_INET6 161SOCK_STREAM = socket_module.SOCK_STREAM 162SOL_SOCKET = None 163SO_REUSEADDR = None 164 165if hasattr(socket_module, 'AF_UNIX'): 166 AF_UNIX = socket_module.AF_UNIX 167