# -*- Mode: Python; tab-width: 4 -*-
#       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#       Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
import asyncore
from collections import deque

from warnings import warn
warn(
    'The asynchat module is deprecated. '
    'The recommended replacement is asyncio',
    DeprecationWarning,
    stacklevel=2)



class async_chat(asyncore.dispatcher):
    """This is an abstract class.  You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()"""

    # these are overridable defaults

    ac_in_buffer_size = 65536
    ac_out_buffer_size = 65536

    # we don't want to enable the use of encoding by default, because that is a
    # sign of an application bug that we don't want to pass silently

    use_encoding = 0
    encoding = 'latin-1'

    def __init__(self, sock=None, map=None):
        """Set up the input buffer, the collector list and the output queue.

        'sock' and 'map' are passed unchanged to asyncore.dispatcher.
        Note that the terminator is not initialised here; call
        set_terminator() before data is read.
        """
        # for string terminator matching
        self.ac_in_buffer = b''

        # we use a list here rather than io.BytesIO for a few reasons...
        # del lst[:] is faster than bio.truncate(0)
        # lst = [] is faster than bio.truncate(0)
        self.incoming = []

        # we toss the use of the "simple producer" and replace it with
        # a pure deque, which the original fifo was a wrapping of
        self.producer_fifo = deque()
        asyncore.dispatcher.__init__(self, sock, map)

    def collect_incoming_data(self, data):
        """Receive a chunk of incoming data; must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def _collect_incoming_data(self, data):
        """Default collector helper: append 'data' to self.incoming."""
        self.incoming.append(data)

    def _get_data(self):
        """Return everything in self.incoming joined as bytes, then clear it."""
        d = b''.join(self.incoming)
        del self.incoming[:]
        return d

    def found_terminator(self):
        """Called by handle_read() when the terminator is seen; must be
        overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator(self, term):
        """Set the input delimiter.

        Can be a fixed string of any length, an integer, or None.

        A str terminator is encoded with self.encoding only when
        self.use_encoding is true.  Raises ValueError for a negative
        integer.
        """
        if isinstance(term, str) and self.use_encoding:
            term = bytes(term, self.encoding)
        elif isinstance(term, int) and term < 0:
            raise ValueError('the number of received bytes must be positive')
        self.terminator = term

    def get_terminator(self):
        """Return the terminator most recently stored by set_terminator()."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read(self):
        """Read from the socket and dispatch data according to the terminator.

        Data is handed to collect_incoming_data(); found_terminator() is
        called each time the current terminator (bytes delimiter or byte
        count) is satisfied.  With a false terminator all data is simply
        collected.
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except BlockingIOError:
            return
        except OSError:
            self.handle_error()
            return

        if isinstance(data, str) and self.use_encoding:
            # encode the received text itself (the builtin 'str' type was
            # previously passed here by mistake, which raised TypeError)
            data = bytes(data, self.encoding)
        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data.  The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv().

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            # re-read every iteration: found_terminator() may change it
            terminator = self.get_terminator()
            if not terminator:
                # no terminator, collect it all
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = b''
            elif isinstance(terminator, int):
                # numeric terminator
                n = terminator
                if lb < n:
                    # not enough yet: collect and reduce the outstanding count
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = b''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) end of buffer matches terminator exactly:
                #    collect data, transition
                # 2) end of buffer matches some prefix:
                #    collect data to the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string
                        # (source of subtle bugs)
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
                    # This does the Right Thing if the terminator
                    # is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data(self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # keep the partial terminator buffered until more
                        # data arrives
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = b''

    def handle_write(self):
        """Write event: try to send queued output."""
        self.initiate_send()

    def handle_close(self):
        """Close event: close the channel."""
        self.close()

    def push(self, data):
        """Queue 'data' for sending and start sending.

        'data' must be bytes, bytearray or memoryview, otherwise
        TypeError is raised.  Data longer than ac_out_buffer_size is
        queued in slices of at most that size.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # interpolate the offending type into the message (it used to
            # be passed as a second, unformatted exception argument)
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        sabs = self.ac_out_buffer_size
        if len(data) > sabs:
            for i in range(0, len(data), sabs):
                self.producer_fifo.append(data[i:i+sabs])
        else:
            self.producer_fifo.append(data)
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue a producer (an object with a more() method) and start
        sending."""
        self.producer_fifo.append(producer)
        self.initiate_send()

    def readable(self):
        "predicate for inclusion in the readable for select()"
        # cannot use the old predicate, it violates the claim of the
        # set_terminator method.

        # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
        return 1

    def writable(self):
        "predicate for inclusion in the writable for select()"
        return self.producer_fifo or (not self.connected)

    def close_when_done(self):
        "automatically close this channel once the outgoing queue is empty"
        # None is the sentinel initiate_send() turns into handle_close()
        self.producer_fifo.append(None)

    def initiate_send(self):
        """Send at most one chunk from the head of the output queue.

        Empty entries are dropped, a None entry triggers handle_close(),
        and objects that cannot be sliced are treated as producers whose
        more() output is pushed back onto the front of the queue.
        """
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # handle empty string/buffer or None entry
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    self.handle_close()
                    return

            # handle classic producer behavior
            obs = self.ac_out_buffer_size
            try:
                data = first[:obs]
            except TypeError:
                data = first.more()
                if data:
                    self.producer_fifo.appendleft(data)
                else:
                    # exhausted producer: drop it
                    del self.producer_fifo[0]
                continue

            if isinstance(data, str) and self.use_encoding:
                data = bytes(data, self.encoding)

            # send the data
            try:
                num_sent = self.send(data)
            except OSError:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    # keep whatever was not sent at the head of the queue
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # we tried to send some actual data
            return

    def discard_buffers(self):
        """Drop all buffered input and all queued output."""
        # Emergencies only!
        self.ac_in_buffer = b''
        del self.incoming[:]
        self.producer_fifo.clear()


class simple_producer:
    """Producer that hands out 'data' in pieces of at most 'buffer_size'."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece of data; b'' once everything was returned."""
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = b''
            return result


# Given 'haystack', see if any prefix of 'needle' is at its end.  This
# assumes an exact match has already been checked.
# Return the number of characters matched.
# for example:
# f_p_a_e("qwerty\r", "\r\n") => 1
# f_p_a_e("qwertydkjf", "\r\n") => 0
# f_p_a_e("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python:   28961/s
# old python:   18307/s
# re:        12820/s
# regex:     14035/s

def find_prefix_at_end(haystack, needle):
    """Return the length of the longest proper prefix of 'needle' that
    'haystack' ends with, or 0 when there is none.

    Only prefixes shorter than 'needle' itself are tried, longest first.
    An empty or one-character 'needle' has no such prefix, so the
    result is 0 (an empty needle used to yield -1).
    """
    for length in range(len(needle) - 1, 0, -1):
        if haystack.endswith(needle[:length]):
            return length
    return 0