1# -*- Mode: Python; tab-width: 4 -*- 2# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp 3# Author: Sam Rushing <rushing@nightmare.com> 4 5# ====================================================================== 6# Copyright 1996 by Sam Rushing 7# 8# All Rights Reserved 9# 10# Permission to use, copy, modify, and distribute this software and 11# its documentation for any purpose and without fee is hereby 12# granted, provided that the above copyright notice appear in all 13# copies and that both that copyright notice and this permission 14# notice appear in supporting documentation, and that the name of Sam 15# Rushing not be used in advertising or publicity pertaining to 16# distribution of the software without specific, written prior 17# permission. 18# 19# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, 20# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN 21# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR 22# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS 23# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 24# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 25# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 26# ====================================================================== 27 28r"""A class supporting chat-style (command/response) protocols. 29 30This class adds support for 'chat' style protocols - where one side 31sends a 'command', and the other sends a response (examples would be 32the common internet protocols - smtp, nntp, ftp, etc..). 33 34The handle_read() method looks at the input stream for the current 35'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' 36for multi-line output), calling self.found_terminator() on its 37receipt. 38 39for example: 40Say you build an async nntp client using this class. At the start 41of the connection, you'll have self.terminator set to '\r\n', in 42order to process the single-line greeting. Just before issuing a 43'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST 44command will be accumulated (using your own 'collect_incoming_data' 45method) up to the terminator, and then control will be returned to 46you - by calling your self.found_terminator() method. 47""" 48 49import asyncore 50import errno 51import socket 52from collections import deque 53from sys import py3kwarning 54from warnings import filterwarnings, catch_warnings 55 56_BLOCKING_IO_ERRORS = (errno.EAGAIN, errno.EALREADY, errno.EINPROGRESS, 57 errno.EWOULDBLOCK) 58 59 60class async_chat (asyncore.dispatcher): 61 """This is an abstract class. You must derive from this class, and add 62 the two methods collect_incoming_data() and found_terminator()""" 63 64 # these are overridable defaults 65 66 ac_in_buffer_size = 4096 67 ac_out_buffer_size = 4096 68 69 def __init__ (self, sock=None, map=None): 70 # for string terminator matching 71 self.ac_in_buffer = '' 72 73 # we use a list here rather than cStringIO for a few reasons... 74 # del lst[:] is faster than sio.truncate(0) 75 # lst = [] is faster than sio.truncate(0) 76 # cStringIO will be gaining unicode support in py3k, which 77 # will negatively affect the performance of bytes compared to 78 # a ''.join() equivalent 79 self.incoming = [] 80 81 # we toss the use of the "simple producer" and replace it with 82 # a pure deque, which the original fifo was a wrapping of 83 self.producer_fifo = deque() 84 asyncore.dispatcher.__init__ (self, sock, map) 85 86 def collect_incoming_data(self, data): 87 raise NotImplementedError("must be implemented in subclass") 88 89 def _collect_incoming_data(self, data): 90 self.incoming.append(data) 91 92 def _get_data(self): 93 d = ''.join(self.incoming) 94 del self.incoming[:] 95 return d 96 97 def found_terminator(self): 98 raise NotImplementedError("must be implemented in subclass") 99 100 def set_terminator (self, term): 101 "Set the input delimiter. Can be a fixed string of any length, an integer, or None" 102 self.terminator = term 103 104 def get_terminator (self): 105 return self.terminator 106 107 # grab some more data from the socket, 108 # throw it to the collector method, 109 # check for the terminator, 110 # if found, transition to the next state. 111 112 def handle_read (self): 113 114 try: 115 data = self.recv (self.ac_in_buffer_size) 116 except socket.error, why: 117 if why.args[0] in _BLOCKING_IO_ERRORS: 118 return 119 self.handle_error() 120 return 121 122 self.ac_in_buffer = self.ac_in_buffer + data 123 124 # Continue to search for self.terminator in self.ac_in_buffer, 125 # while calling self.collect_incoming_data. The while loop 126 # is necessary because we might read several data+terminator 127 # combos with a single recv(4096). 128 129 while self.ac_in_buffer: 130 lb = len(self.ac_in_buffer) 131 terminator = self.get_terminator() 132 if not terminator: 133 # no terminator, collect it all 134 self.collect_incoming_data (self.ac_in_buffer) 135 self.ac_in_buffer = '' 136 elif isinstance(terminator, (int, long)): 137 # numeric terminator 138 n = terminator 139 if lb < n: 140 self.collect_incoming_data (self.ac_in_buffer) 141 self.ac_in_buffer = '' 142 self.terminator = self.terminator - lb 143 else: 144 self.collect_incoming_data (self.ac_in_buffer[:n]) 145 self.ac_in_buffer = self.ac_in_buffer[n:] 146 self.terminator = 0 147 self.found_terminator() 148 else: 149 # 3 cases: 150 # 1) end of buffer matches terminator exactly: 151 # collect data, transition 152 # 2) end of buffer matches some prefix: 153 # collect data to the prefix 154 # 3) end of buffer does not match any prefix: 155 # collect data 156 terminator_len = len(terminator) 157 index = self.ac_in_buffer.find(terminator) 158 if index != -1: 159 # we found the terminator 160 if index > 0: 161 # don't bother reporting the empty string (source of subtle bugs) 162 self.collect_incoming_data (self.ac_in_buffer[:index]) 163 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] 164 # This does the Right Thing if the terminator is changed here. 165 self.found_terminator() 166 else: 167 # check for a prefix of the terminator 168 index = find_prefix_at_end (self.ac_in_buffer, terminator) 169 if index: 170 if index != lb: 171 # we found a prefix, collect up to the prefix 172 self.collect_incoming_data (self.ac_in_buffer[:-index]) 173 self.ac_in_buffer = self.ac_in_buffer[-index:] 174 break 175 else: 176 # no prefix, collect it all 177 self.collect_incoming_data (self.ac_in_buffer) 178 self.ac_in_buffer = '' 179 180 def handle_write (self): 181 self.initiate_send() 182 183 def handle_close (self): 184 self.close() 185 186 def push (self, data): 187 sabs = self.ac_out_buffer_size 188 if len(data) > sabs: 189 for i in xrange(0, len(data), sabs): 190 self.producer_fifo.append(data[i:i+sabs]) 191 else: 192 self.producer_fifo.append(data) 193 self.initiate_send() 194 195 def push_with_producer (self, producer): 196 self.producer_fifo.append(producer) 197 self.initiate_send() 198 199 def readable (self): 200 "predicate for inclusion in the readable for select()" 201 # cannot use the old predicate, it violates the claim of the 202 # set_terminator method. 203 204 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) 205 return 1 206 207 def writable (self): 208 "predicate for inclusion in the writable for select()" 209 return self.producer_fifo or (not self.connected) 210 211 def close_when_done (self): 212 "automatically close this channel once the outgoing queue is empty" 213 self.producer_fifo.append(None) 214 215 def initiate_send(self): 216 while self.producer_fifo and self.connected: 217 first = self.producer_fifo[0] 218 # handle empty string/buffer or None entry 219 if not first: 220 del self.producer_fifo[0] 221 if first is None: 222 self.handle_close() 223 return 224 225 # handle classic producer behavior 226 obs = self.ac_out_buffer_size 227 try: 228 with catch_warnings(): 229 if py3kwarning: 230 filterwarnings("ignore", ".*buffer", DeprecationWarning) 231 data = buffer(first, 0, obs) 232 except TypeError: 233 data = first.more() 234 if data: 235 self.producer_fifo.appendleft(data) 236 else: 237 del self.producer_fifo[0] 238 continue 239 240 # send the data 241 try: 242 num_sent = self.send(data) 243 except socket.error: 244 self.handle_error() 245 return 246 247 if num_sent: 248 if num_sent < len(data) or obs < len(first): 249 self.producer_fifo[0] = first[num_sent:] 250 else: 251 del self.producer_fifo[0] 252 # we tried to send some actual data 253 return 254 255 def discard_buffers (self): 256 # Emergencies only! 257 self.ac_in_buffer = '' 258 del self.incoming[:] 259 self.producer_fifo.clear() 260 261class simple_producer: 262 263 def __init__ (self, data, buffer_size=512): 264 self.data = data 265 self.buffer_size = buffer_size 266 267 def more (self): 268 if len (self.data) > self.buffer_size: 269 result = self.data[:self.buffer_size] 270 self.data = self.data[self.buffer_size:] 271 return result 272 else: 273 result = self.data 274 self.data = '' 275 return result 276 277class fifo: 278 def __init__ (self, list=None): 279 if not list: 280 self.list = deque() 281 else: 282 self.list = deque(list) 283 284 def __len__ (self): 285 return len(self.list) 286 287 def is_empty (self): 288 return not self.list 289 290 def first (self): 291 return self.list[0] 292 293 def push (self, data): 294 self.list.append(data) 295 296 def pop (self): 297 if self.list: 298 return (1, self.list.popleft()) 299 else: 300 return (0, None) 301 302# Given 'haystack', see if any prefix of 'needle' is at its end. This 303# assumes an exact match has already been checked. Return the number of 304# characters matched. 305# for example: 306# f_p_a_e ("qwerty\r", "\r\n") => 1 307# f_p_a_e ("qwertydkjf", "\r\n") => 0 308# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined> 309 310# this could maybe be made faster with a computed regex? 311# [answer: no; circa Python-2.0, Jan 2001] 312# new python: 28961/s 313# old python: 18307/s 314# re: 12820/s 315# regex: 14035/s 316 317def find_prefix_at_end (haystack, needle): 318 l = len(needle) - 1 319 while l and not haystack.endswith(needle[:l]): 320 l -= 1 321 return l 322