1# -*- Mode: Python; tab-width: 4 -*- 2# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp 3# Author: Sam Rushing <rushing@nightmare.com> 4 5# ====================================================================== 6# Copyright 1996 by Sam Rushing 7# 8# All Rights Reserved 9# 10# Permission to use, copy, modify, and distribute this software and 11# its documentation for any purpose and without fee is hereby 12# granted, provided that the above copyright notice appear in all 13# copies and that both that copyright notice and this permission 14# notice appear in supporting documentation, and that the name of Sam 15# Rushing not be used in advertising or publicity pertaining to 16# distribution of the software without specific, written prior 17# permission. 18# 19# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, 20# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN 21# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR 22# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS 23# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 24# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 25# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 26# ====================================================================== 27 28r"""A class supporting chat-style (command/response) protocols. 29 30This class adds support for 'chat' style protocols - where one side 31sends a 'command', and the other sends a response (examples would be 32the common internet protocols - smtp, nntp, ftp, etc..). 33 34The handle_read() method looks at the input stream for the current 35'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' 36for multi-line output), calling self.found_terminator() on its 37receipt. 38 39for example: 40Say you build an async nntp client using this class. At the start 41of the connection, you'll have self.terminator set to '\r\n', in 42order to process the single-line greeting. Just before issuing a 43'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST 44command will be accumulated (using your own 'collect_incoming_data' 45method) up to the terminator, and then control will be returned to 46you - by calling your self.found_terminator() method. 47""" 48import asyncore 49from collections import deque 50 51 52class async_chat(asyncore.dispatcher): 53 """This is an abstract class. You must derive from this class, and add 54 the two methods collect_incoming_data() and found_terminator()""" 55 56 # these are overridable defaults 57 58 ac_in_buffer_size = 65536 59 ac_out_buffer_size = 65536 60 61 # we don't want to enable the use of encoding by default, because that is a 62 # sign of an application bug that we don't want to pass silently 63 64 use_encoding = 0 65 encoding = 'latin-1' 66 67 def __init__(self, sock=None, map=None): 68 # for string terminator matching 69 self.ac_in_buffer = b'' 70 71 # we use a list here rather than io.BytesIO for a few reasons... 72 # del lst[:] is faster than bio.truncate(0) 73 # lst = [] is faster than bio.truncate(0) 74 self.incoming = [] 75 76 # we toss the use of the "simple producer" and replace it with 77 # a pure deque, which the original fifo was a wrapping of 78 self.producer_fifo = deque() 79 asyncore.dispatcher.__init__(self, sock, map) 80 81 def collect_incoming_data(self, data): 82 raise NotImplementedError("must be implemented in subclass") 83 84 def _collect_incoming_data(self, data): 85 self.incoming.append(data) 86 87 def _get_data(self): 88 d = b''.join(self.incoming) 89 del self.incoming[:] 90 return d 91 92 def found_terminator(self): 93 raise NotImplementedError("must be implemented in subclass") 94 95 def set_terminator(self, term): 96 """Set the input delimiter. 97 98 Can be a fixed string of any length, an integer, or None. 99 """ 100 if isinstance(term, str) and self.use_encoding: 101 term = bytes(term, self.encoding) 102 elif isinstance(term, int) and term < 0: 103 raise ValueError('the number of received bytes must be positive') 104 self.terminator = term 105 106 def get_terminator(self): 107 return self.terminator 108 109 # grab some more data from the socket, 110 # throw it to the collector method, 111 # check for the terminator, 112 # if found, transition to the next state. 113 114 def handle_read(self): 115 116 try: 117 data = self.recv(self.ac_in_buffer_size) 118 except BlockingIOError: 119 return 120 except OSError: 121 self.handle_error() 122 return 123 124 if isinstance(data, str) and self.use_encoding: 125 data = bytes(str, self.encoding) 126 self.ac_in_buffer = self.ac_in_buffer + data 127 128 # Continue to search for self.terminator in self.ac_in_buffer, 129 # while calling self.collect_incoming_data. The while loop 130 # is necessary because we might read several data+terminator 131 # combos with a single recv(4096). 132 133 while self.ac_in_buffer: 134 lb = len(self.ac_in_buffer) 135 terminator = self.get_terminator() 136 if not terminator: 137 # no terminator, collect it all 138 self.collect_incoming_data(self.ac_in_buffer) 139 self.ac_in_buffer = b'' 140 elif isinstance(terminator, int): 141 # numeric terminator 142 n = terminator 143 if lb < n: 144 self.collect_incoming_data(self.ac_in_buffer) 145 self.ac_in_buffer = b'' 146 self.terminator = self.terminator - lb 147 else: 148 self.collect_incoming_data(self.ac_in_buffer[:n]) 149 self.ac_in_buffer = self.ac_in_buffer[n:] 150 self.terminator = 0 151 self.found_terminator() 152 else: 153 # 3 cases: 154 # 1) end of buffer matches terminator exactly: 155 # collect data, transition 156 # 2) end of buffer matches some prefix: 157 # collect data to the prefix 158 # 3) end of buffer does not match any prefix: 159 # collect data 160 terminator_len = len(terminator) 161 index = self.ac_in_buffer.find(terminator) 162 if index != -1: 163 # we found the terminator 164 if index > 0: 165 # don't bother reporting the empty string 166 # (source of subtle bugs) 167 self.collect_incoming_data(self.ac_in_buffer[:index]) 168 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] 169 # This does the Right Thing if the terminator 170 # is changed here. 171 self.found_terminator() 172 else: 173 # check for a prefix of the terminator 174 index = find_prefix_at_end(self.ac_in_buffer, terminator) 175 if index: 176 if index != lb: 177 # we found a prefix, collect up to the prefix 178 self.collect_incoming_data(self.ac_in_buffer[:-index]) 179 self.ac_in_buffer = self.ac_in_buffer[-index:] 180 break 181 else: 182 # no prefix, collect it all 183 self.collect_incoming_data(self.ac_in_buffer) 184 self.ac_in_buffer = b'' 185 186 def handle_write(self): 187 self.initiate_send() 188 189 def handle_close(self): 190 self.close() 191 192 def push(self, data): 193 if not isinstance(data, (bytes, bytearray, memoryview)): 194 raise TypeError('data argument must be byte-ish (%r)', 195 type(data)) 196 sabs = self.ac_out_buffer_size 197 if len(data) > sabs: 198 for i in range(0, len(data), sabs): 199 self.producer_fifo.append(data[i:i+sabs]) 200 else: 201 self.producer_fifo.append(data) 202 self.initiate_send() 203 204 def push_with_producer(self, producer): 205 self.producer_fifo.append(producer) 206 self.initiate_send() 207 208 def readable(self): 209 "predicate for inclusion in the readable for select()" 210 # cannot use the old predicate, it violates the claim of the 211 # set_terminator method. 212 213 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) 214 return 1 215 216 def writable(self): 217 "predicate for inclusion in the writable for select()" 218 return self.producer_fifo or (not self.connected) 219 220 def close_when_done(self): 221 "automatically close this channel once the outgoing queue is empty" 222 self.producer_fifo.append(None) 223 224 def initiate_send(self): 225 while self.producer_fifo and self.connected: 226 first = self.producer_fifo[0] 227 # handle empty string/buffer or None entry 228 if not first: 229 del self.producer_fifo[0] 230 if first is None: 231 self.handle_close() 232 return 233 234 # handle classic producer behavior 235 obs = self.ac_out_buffer_size 236 try: 237 data = first[:obs] 238 except TypeError: 239 data = first.more() 240 if data: 241 self.producer_fifo.appendleft(data) 242 else: 243 del self.producer_fifo[0] 244 continue 245 246 if isinstance(data, str) and self.use_encoding: 247 data = bytes(data, self.encoding) 248 249 # send the data 250 try: 251 num_sent = self.send(data) 252 except OSError: 253 self.handle_error() 254 return 255 256 if num_sent: 257 if num_sent < len(data) or obs < len(first): 258 self.producer_fifo[0] = first[num_sent:] 259 else: 260 del self.producer_fifo[0] 261 # we tried to send some actual data 262 return 263 264 def discard_buffers(self): 265 # Emergencies only! 266 self.ac_in_buffer = b'' 267 del self.incoming[:] 268 self.producer_fifo.clear() 269 270 271class simple_producer: 272 273 def __init__(self, data, buffer_size=512): 274 self.data = data 275 self.buffer_size = buffer_size 276 277 def more(self): 278 if len(self.data) > self.buffer_size: 279 result = self.data[:self.buffer_size] 280 self.data = self.data[self.buffer_size:] 281 return result 282 else: 283 result = self.data 284 self.data = b'' 285 return result 286 287 288# Given 'haystack', see if any prefix of 'needle' is at its end. This 289# assumes an exact match has already been checked. Return the number of 290# characters matched. 291# for example: 292# f_p_a_e("qwerty\r", "\r\n") => 1 293# f_p_a_e("qwertydkjf", "\r\n") => 0 294# f_p_a_e("qwerty\r\n", "\r\n") => <undefined> 295 296# this could maybe be made faster with a computed regex? 297# [answer: no; circa Python-2.0, Jan 2001] 298# new python: 28961/s 299# old python: 18307/s 300# re: 12820/s 301# regex: 14035/s 302 303def find_prefix_at_end(haystack, needle): 304 l = len(needle) - 1 305 while l and not haystack.endswith(needle[:l]): 306 l -= 1 307 return l 308