#!/usr/bin/python
"""
Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncore
import gc
import logging
try:
  import Queue
except ImportError:  # the module was renamed to "queue" in Python 3
  import queue as Queue
import signal
import socket
import sys
import threading
import time

server = None
in_pipe = None
out_pipe = None
must_exit = False
options = None
dest_addresses = None
connections = {}
dns_cache = {}
port_mappings = None
map_localhost = False
needs_flush = False
flush_pipes = False
REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0


def _SelectPipeClock():
  """Return the timer function the pipes use to schedule message delivery.

  time.clock() reports CPU time on Unix, which barely advances while the proxy
  sits in select(), so it cannot be used to emulate latency or bandwidth.
  Prefer a monotonic clock when the interpreter has one; otherwise fall back to
  time.clock() on Windows (where it is a high-resolution wall clock) and to
  time.time() everywhere else.
  """
  monotonic = getattr(time, 'monotonic', None)
  if monotonic is not None:
    return monotonic
  if sys.platform == 'win32':
    return time.clock
  return time.time


_PipeNow = _SelectPipeClock()


def PrintMessage(msg):
  """Write msg plus a newline to stdout and flush it immediately.

  The flush makes sure the message is not buffered when tsproxy is run as a
  subprocess.
  """
  sys.stdout.write('{0}\n'.format(msg))
  sys.stdout.flush()


########################################################################################################################
#   Traffic-shaping pipe
########################################################################################################################
class TSPipe(object):
  """One direction of the shaped link between a client and a server endpoint.

  Messages are queued with a delivery time (now + latency) and are handed to
  the peer's handle_message() from tick() once they are due and, when kbps is
  positive, once enough bandwidth has accumulated to cover their size.
  """
  PIPE_IN = 0
  PIPE_OUT = 1

  def __init__(self, direction, latency, kbps):
    """Create a pipe.

    Args:
      direction: PIPE_IN (delivers to the 'client' endpoint) or PIPE_OUT
          (delivers to the 'server' endpoint).
      latency: one-way delay in seconds added to every non-'closed' message.
      kbps: bandwidth limit in kilobits per second; <= 0 disables the limit.
    """
    self.direction = direction
    self.latency = latency
    self.kbps = kbps
    self.queue = Queue.Queue()
    self.last_tick = _PipeNow()
    self.next_message = None
    self.available_bytes = .0
    self.peer = 'client' if self.direction == self.PIPE_IN else 'server'

  def SendMessage(self, message):
    """Queue message for delivery to this pipe's peer endpoint.

    The message is stamped with 'time' (earliest delivery time) and 'size'
    (payload length in bytes).  Messages for connections that no longer have
    the peer endpoint registered are dropped.  May be called from other
    threads (AsyncDNS does), so failures are logged rather than raised.
    """
    try:
      connection_id = message['connection']
      if connection_id in connections and self.peer in connections[connection_id]:
        now = _PipeNow()
        # 'closed' notifications are not delayed by the emulated latency
        if message['message'] == 'closed':
          message['time'] = now
        else:
          message['time'] = now + self.latency
        message['size'] = .0
        if 'data' in message:
          message['size'] = float(len(message['data']))
        self.queue.put(message)
    except Exception:
      logging.debug('Dropping message that could not be queued', exc_info=True)

  def _NextQueued(self):
    """Return the next queued message, or None when the queue is empty."""
    try:
      return self.queue.get_nowait()
    except Queue.Empty:
      return None

  def _Deliver(self, message):
    """Hand message to the peer endpoint, tearing the connection down on failure."""
    connection_id = message['connection']
    endpoints = connections.get(connection_id)
    if endpoints is None or self.peer not in endpoints:
      return
    try:
      if self.kbps > .0:
        self.available_bytes -= message['size']
      endpoints[self.peer].handle_message(message)
    except Exception:
      logging.debug('Delivery failed, cleaning up connection', exc_info=True)
      # Clean up any disconnected connections
      for side in ('server', 'client'):
        try:
          endpoints[side].close()
        except Exception:
          pass
      # The handler may already have removed the entry; pop() tolerates that
      # so the pipe never gets stuck re-processing the same message.
      connections.pop(connection_id, None)

  def tick(self):
    """Deliver every message that is currently sendable.

    Returns:
      True if at least one message was taken off the queue during this call.
    """
    processed_messages = False
    now = _PipeNow()
    if self.next_message is None:
      self.next_message = self._NextQueued()

    # Accumulate bandwidth if an available packet/message was waiting since our last tick
    if self.next_message is not None and self.kbps > .0 and self.next_message['time'] <= now:
      elapsed = now - self.last_tick
      self.available_bytes += elapsed * self.kbps * 1000.0 / 8.0

    # process messages as long as the next message is sendable (latency or available bytes)
    while self.next_message is not None:
      due = self.next_message['time'] <= now
      affordable = self.kbps <= .0 or self.next_message['size'] <= self.available_bytes
      if not (flush_pipes or (due and affordable)):
        break
      processed_messages = True
      self.queue.task_done()
      self._Deliver(self.next_message)
      self.next_message = self._NextQueued()

    # Only accumulate bytes while we have messages that are ready to send
    if self.next_message is None or self.next_message['time'] > now:
      self.available_bytes = .0
    self.last_tick = now

    return processed_messages


########################################################################################################################
#   Threaded DNS resolver
########################################################################################################################
class AsyncDNS(threading.Thread):
  """Resolves one hostname on a background thread and reports the result.

  The outcome is posted to result_pipe as a 'resolved' message whose
  'addresses' entry is the getaddrinfo() result, or an empty tuple on failure.
  """

  def __init__(self, client_id, hostname, port, result_pipe):
    threading.Thread.__init__(self)
    self.hostname = hostname
    self.port = port
    self.client_id = client_id
    self.result_pipe = result_pipe

  def run(self):
    try:
      addresses = socket.getaddrinfo(self.hostname, self.port)
      logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
    except Exception:
      addresses = ()
      logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
    message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
    self.result_pipe.SendMessage(message)


########################################################################################################################
#   TCP Client
########################################################################################################################
class TCPConnection(asyncore.dispatcher):
  """Outbound half of a proxied connection (the socket to the destination).

  Receives 'resolve', 'connect', 'data', 'ack' and 'closed' messages through
  the out pipe and reports back to the SOCKS side through in_pipe.
  """
  STATE_ERROR = -1
  STATE_IDLE = 0
  STATE_RESOLVING = 1
  STATE_CONNECTING = 2
  STATE_CONNECTED = 3

  def __init__(self, client_id):
    asyncore.dispatcher.__init__(self)
    self.client_id = client_id
    self.state = self.STATE_IDLE
    self.buffer = ''
    self.addr = None
    self.dns_thread = None
    self.hostname = None
    self.port = None
    self.needs_config = True
    self.needs_close = False
    self.read_available = False
    self.window_available = options.window
    self.is_localhost = False
    self.did_resolve = False

  def SendMessage(self, type, message):
    """Tag message with its type and connection id and queue it on in_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    in_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process one message delivered by the out pipe."""
    kind = message['message']
    if kind == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
      if not self.needs_close:
        self.buffer += message['data']
        self.SendMessage('ack', {})
    elif kind == 'ack':
      # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
      self.window_available = min(self.window_available + 2, 350)
      if self.read_available:
        self.handle_read()
    elif kind == 'resolve':
      self.HandleResolve(message)
    elif kind == 'connect':
      self.HandleConnect(message)
    elif kind == 'closed':
      # Flush anything still buffered before closing
      if len(self.buffer) == 0:
        self.handle_close()
      else:
        self.needs_close = True

  def handle_error(self):
    logging.warning('[{0:d}] Error'.format(self.client_id))
    if self.state == self.STATE_CONNECTING:
      self.SendMessage('connected', {'success': False, 'address': self.addr})

  def handle_close(self):
    """Close the socket and unregister the 'server' side of the connection."""
    logging.info('[{0:d}] Server Connection Closed'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    try:
      if self.client_id in connections:
        if 'server' in connections[self.client_id]:
          del connections[self.client_id]['server']
        if 'client' in connections[self.client_id]:
          self.SendMessage('closed', {})
        else:
          del connections[self.client_id]
    except Exception:
      # best effort: the entry may be removed concurrently by the other side
      pass

  def writable(self):
    # The first poll after connect() reports the connection as established;
    # the socket is only watched for writability while data is buffered.
    if self.state == self.STATE_CONNECTING:
      self.state = self.STATE_CONNECTED
      self.SendMessage('connected', {'success': True, 'address': self.addr})
      logging.info('[{0:d}] Connected'.format(self.client_id))
    return (len(self.buffer) > 0 and self.state == self.STATE_CONNECTED)

  def handle_write(self):
    if self.needs_config:
      self.needs_config = False
      self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    sent = self.send(self.buffer)
    logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
    self.buffer = self.buffer[sent:]
    if self.needs_close and len(self.buffer) == 0:
      self.needs_close = False
      self.handle_close()

  def handle_read(self):
    """Read up to window_available packet-sized chunks and forward them."""
    if self.window_available == 0:
      # Remember that data is pending so the next 'ack' resumes reading
      self.read_available = True
      return
    self.read_available = False
    try:
      while self.window_available > 0:
        data = self.recv(1460)
        if not data:
          return
        if self.state == self.STATE_CONNECTED:
          self.window_available -= 1
          logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
          self.SendMessage('data', {'data': data})
    except socket.error:
      # nothing more to read right now
      pass
    except Exception:
      logging.debug('[{0:d}] Error reading from server'.format(self.client_id), exc_info=True)

  def HandleResolve(self, message):
    """Start resolving the requested hostname (or short-circuit to desthost)."""
    self.did_resolve = True
    if 'hostname' in message:
      self.hostname = message['hostname']
    self.port = 0
    if 'port' in message:
      self.port = message['port']
    logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
    if self.hostname == 'localhost':
      self.hostname = '127.0.0.1'
    if self.hostname == '127.0.0.1':
      logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
      self.is_localhost = True
    if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
      self.SendMessage('resolved', {'addresses': dest_addresses})
    else:
      self.state = self.STATE_RESOLVING
      self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
      self.dns_thread.start()

  def HandleConnect(self, message):
    """Open the outbound socket to the first address in the message."""
    if 'addresses' in message and len(message['addresses']):
      self.state = self.STATE_CONNECTING
      # NOTE(review): addresses[0] is used below as a getaddrinfo() tuple, so
      # this comparison with a string looks like it can never be true.  Left
      # as-is: port remapping for cached desthost entries relies on the
      # current behavior -- confirm intent before changing.
      if not self.did_resolve and message['addresses'][0] == '127.0.0.1':
        logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
        self.is_localhost = True
      if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
        self.addr = dest_addresses[0]
      else:
        self.addr = message['addresses'][0]
      self.create_socket(self.addr[0], socket.SOCK_STREAM)
      addr = self.addr[4][0]
      if not self.is_localhost or map_localhost:
        port = GetDestPort(message['port'])
      else:
        port = message['port']
      logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port))
      self.connect((addr, port))


########################################################################################################################
#   Socks5 Server
########################################################################################################################
class Socks5Server(asyncore.dispatcher):
  """Listening socket that accepts SOCKS5 clients and registers them."""

  def __init__(self, host, port):
    asyncore.dispatcher.__init__(self)
    self.current_client_id = 0
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      #self.set_reuse_addr()
      self.bind((host, port))
      self.listen(socket.SOMAXCONN)
      self.ipaddr, self.port = self.getsockname()
    except Exception:
      PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
      # sys.exit rather than the site-provided exit(), which is not always available
      sys.exit(1)

  def handle_accept(self):
    pair = self.accept()
    if pair is not None:
      sock, addr = pair
      self.current_client_id += 1
      logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr)))
      connections[self.current_client_id] = {
        'client' : Socks5Connection(sock, self.current_client_id),
        'server' : None
      }


# Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5
class Socks5Connection(asyncore.dispatcher):
  """Client-facing half of a proxied connection (speaks SOCKS5 to the browser).

  Sends 'resolve', 'connect', 'data', 'ack' and 'closed' messages through
  out_pipe and handles the replies delivered by the in pipe.
  """
  STATE_ERROR = -1
  STATE_WAITING_FOR_HANDSHAKE = 0
  STATE_WAITING_FOR_CONNECT_REQUEST = 1
  STATE_RESOLVING = 2
  STATE_CONNECTING = 3
  STATE_CONNECTED = 4

  def __init__(self, connected_socket, client_id):
    asyncore.dispatcher.__init__(self, connected_socket)
    self.client_id = client_id
    self.state = self.STATE_WAITING_FOR_HANDSHAKE
    self.ip = None
    self.addresses = None
    self.hostname = None
    self.port = None
    self.requested_address = None
    self.buffer = ''
    self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
    self.needs_close = False
    self.read_available = False
    self.window_available = options.window

  def SendMessage(self, type, message):
    """Tag message with its type and connection id and queue it on out_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    out_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process one message delivered by the in pipe."""
    kind = message['message']
    if kind == 'data' and 'data' in message and len(message['data']) and self.state == self.STATE_CONNECTED:
      if not self.needs_close:
        self.buffer += message['data']
        self.SendMessage('ack', {})
    elif kind == 'ack':
      # Increase the congestion window by 2 packets for every packet transmitted up to 350 packets (~512KB)
      self.window_available = min(self.window_available + 2, 350)
      if self.read_available:
        self.handle_read()
    elif kind == 'resolved':
      self.HandleResolved(message)
    elif kind == 'connected':
      self.HandleConnected(message)
    elif kind == 'closed':
      # Flush anything still buffered before closing
      if len(self.buffer) == 0:
        self.handle_close()
      else:
        self.needs_close = True

  def writable(self):
    return (len(self.buffer) > 0)

  def handle_write(self):
    sent = self.send(self.buffer)
    logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
    self.buffer = self.buffer[sent:]
    if self.needs_close and len(self.buffer) == 0:
      self.needs_close = False
      self.handle_close()

  def handle_read(self):
    """Read from the client and dispatch on the current protocol state."""
    if self.window_available == 0:
      # Remember that data is pending so the next 'ack' resumes reading
      self.read_available = True
      return
    self.read_available = False
    try:
      while self.window_available > 0:
        # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
        data = self.recv(1460)
        if not data:
          return
        if self.state == self.STATE_CONNECTED:
          logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, len(data)))
          self.window_available -= 1
          self.SendMessage('data', {'data': data})
        elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
          self._HandleHandshake(data)
        elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
          self._HandleConnectRequest(data)
    except socket.error:
      # nothing more to read right now
      pass
    except Exception:
      logging.debug('[{0:d}] Error processing client data'.format(self.client_id), exc_info=True)

  def _HandleHandshake(self, data):
    """Parse the SOCKS5 greeting and accept it if "no authentication" is offered."""
    data_len = len(data)
    # default to an error state, set correctly if things work out
    self.state = self.STATE_ERROR
    if data_len < 2 or ord(data[0]) != 0x05:
      return
    auth_count = ord(data[1])
    supports_no_auth = False
    if data_len == auth_count + 2:
      supports_no_auth = any(ord(method) == 0 for method in data[2:2 + auth_count])
    if supports_no_auth:
      # Respond with a message that "No Authentication" was agreed to
      logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
      self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
      self.buffer += chr(0x05) + chr(0x00)

  def _HandleConnectRequest(self, data):
    """Parse the SOCKS5 request and start resolving/connecting to its target."""
    data_len = len(data)
    # default to an error state, set correctly if things work out
    self.state = self.STATE_ERROR
    if data_len < 10 or ord(data[0]) != 0x05 or ord(data[2]) != 0x00:
      return
    if ord(data[1]) == 0x01:  # TCP connection (only supported method for now)
      connections[self.client_id]['server'] = TCPConnection(self.client_id)
    self.requested_address = data[3:]
    port_offset = 0
    address_type = ord(data[3])
    if address_type == 0x01:
      # IPv4: 4 address bytes followed by the port
      port_offset = 8
      self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7]))
    elif address_type == 0x03:
      # Domain name: length byte, name, then the 2-byte port.  The request
      # must hold 5 header bytes + name + 2 port bytes.
      name_len = ord(data[4])
      if data_len >= 7 + name_len:
        port_offset = 5 + name_len
        self.hostname = data[5:5 + name_len]
    elif address_type == 0x04 and data_len >= 22:
      # IPv6: 16 address bytes followed by the port
      port_offset = 20
      self.ip = ''
      for i in range(16):
        self.ip += '{0:02x}'.format(ord(data[4 + i]))
        if i % 2 and i < 15:
          self.ip += ':'
    if not port_offset or connections[self.client_id]['server'] is None:
      return
    self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1])
    if not self.port:
      return
    if self.ip is None and self.hostname is not None:
      if self.hostname in dns_cache:
        self.state = self.STATE_CONNECTING
        self.addresses = dns_cache[self.hostname]
        self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
      else:
        self.state = self.STATE_RESOLVING
        self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
    elif self.ip is not None:
      self.state = self.STATE_CONNECTING
      self.addresses = socket.getaddrinfo(self.ip, self.port)
      self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})

  def handle_close(self):
    """Close the socket and unregister the 'client' side of the connection."""
    logging.info('[{0:d}] Browser Connection Closed'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    try:
      if self.client_id in connections:
        if 'client' in connections[self.client_id]:
          del connections[self.client_id]['client']
        if 'server' in connections[self.client_id]:
          self.SendMessage('closed', {})
        else:
          del connections[self.client_id]
    except Exception:
      # best effort: the entry may be removed concurrently by the other side
      pass

  def _QueueReply(self, status):
    """Buffer a SOCKS5 reply: version, status, reserved byte, then the address."""
    self.buffer += chr(0x05) + chr(status) + chr(0x00) + self.requested_address

  def HandleResolved(self, message):
    """Handle the DNS result: connect on success, reply with an error otherwise."""
    if self.state == self.STATE_RESOLVING:
      if 'addresses' in message and len(message['addresses']):
        self.state = self.STATE_CONNECTING
        self.addresses = message['addresses']
        dns_cache[self.hostname] = self.addresses
        logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
        self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
      else:
        # Send host unreachable error (including the reserved byte, which
        # must precede the address in every SOCKS5 reply)
        self.state = self.STATE_ERROR
        self._QueueReply(0x04)

  def HandleConnected(self, message):
    """Report the outcome of the outbound connection attempt to the client."""
    if 'success' in message and self.state == self.STATE_CONNECTING:
      if message['success']:
        logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname))
        self.state = self.STATE_CONNECTED
        self._QueueReply(0x00)
      else:
        self.state = self.STATE_ERROR
        self._QueueReply(0x04)


########################################################################################################################
#   stdin command processor
########################################################################################################################
class CommandProcessor():
  """Reads commands from stdin on a daemon thread and applies them."""

  def __init__(self):
    thread = threading.Thread(target = self.run, args=())
    thread.daemon = True
    thread.start()

  def run(self):
    """Process stdin line by line until EOF or shutdown.

    readline() returns '' forever once stdin hits EOF, so the loop ends there
    instead of spinning on a closed stream.
    """
    for line in iter(sys.stdin.readline, ''):
      self.ProcessCommand(line.strip())
      if must_exit:
        break

  def ProcessCommand(self, input):
    """Apply one command line; prints ERROR if it is unknown or malformed.

    Supported commands: "flush", "set rtt <ms>", "set inkbps <kbps>",
    "set outkbps <kbps>" and "set mapports <mappings>".  Every successful
    command requests a pipe flush, which makes run_loop print OK.
    """
    global needs_flush
    if not len(input):
      return
    ok = False
    try:
      command = input.split()
      if command:
        verb = command[0].lower()
        if verb == 'flush':
          ok = True
        elif verb == 'set' and len(command) >= 3:
          setting = command[1].lower()
          value = command[2]
          if setting == 'rtt':
            # half of the round trip in each direction, converted from ms to seconds
            latency = float(value) / 2000.0
            in_pipe.latency = latency
            out_pipe.latency = latency
            ok = True
          elif setting == 'inkbps':
            in_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
            ok = True
          elif setting == 'outkbps':
            out_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
            ok = True
          elif setting == 'mapports':
            SetPortMappings(value)
            ok = True
        if ok:
          needs_flush = True
    except Exception:
      ok = False
    if not ok:
      PrintMessage('ERROR')


########################################################################################################################
#   Main Entry Point
########################################################################################################################
def main():
  """Parse the command line, start the SOCKS5 server and run the event loop."""
  global server
  global options
  global in_pipe
  global out_pipe
  global dest_addresses
  global map_localhost
  import argparse
  parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.',
                                   prog='tsproxy')
  # default=0 so the comparisons below never see None when -v is not given
  parser.add_argument('-v', '--verbose', action='count', default=0,
                      help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.")
  parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).")
  parser.add_argument('-p', '--port', type=int, default=1080,
                      help="Server port (defaults to 1080, use 0 for randomly assigned).")
  parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).")
  parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-w', '--window', type=int, default=10,
                      help="Emulated TCP initial congestion window (defaults to 10).")
  parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.")
  parser.add_argument('-m', '--mapports',
                      help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'")
  parser.add_argument('-l', '--localhost', action='store_true', default=False,
                      help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.")
  options = parser.parse_args()

  # Set up logging: each -v lowers the threshold by one level
  log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
  verbosity = options.verbose or 0
  log_level = log_levels[min(verbosity, len(log_levels) - 1)]
  logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")

  # Parse any port mappings
  if options.mapports:
    SetPortMappings(options.mapports)

  map_localhost = options.localhost

  # Resolve the address for a rewrite destination host if one was specified
  if options.desthost:
    dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))

  # Set up the pipes.  1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
  in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE_TCP_OVERHEAD)
  out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REMOVE_TCP_OVERHEAD)

  signal.signal(signal.SIGINT, signal_handler)
  server = Socks5Server(options.bind, options.port)
  CommandProcessor()
  PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port))
  run_loop()


def signal_handler(signal, frame):
  """SIGINT handler: ask run_loop to stop and drop the server reference."""
  global server
  global must_exit
  logging.error('Exiting...')
  must_exit = True
  # Rebind instead of "del" so a second Ctrl-C does not raise NameError
  server = None


# Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
def run_loop():
  """Poll the sockets and tick both pipes until must_exit is set."""
  global needs_flush
  global flush_pipes
  # Idle time must be measured in wall-clock terms; time.clock() is CPU time
  # on Unix and hardly advances while the proxy is idle.
  clock = getattr(time, 'monotonic', None)
  if clock is None:
    clock = time.clock if sys.platform == 'win32' else time.time
  gc_check_count = 0
  last_activity = clock()
  # disable gc to avoid pauses during traffic shaping/proxying
  gc.disable()
  while not must_exit:
    asyncore.poll(0.001, asyncore.socket_map)
    if needs_flush:
      flush_pipes = True
      needs_flush = False
    if in_pipe.tick():
      last_activity = clock()
    if out_pipe.tick():
      last_activity = clock()
    if flush_pipes:
      PrintMessage('OK')
      flush_pipes = False
    # Every 1000 loops (~1 second when idle) check to see if it is a good time to do a gc
    if gc_check_count > 1000:
      gc_check_count = 0
      # manually gc after 5 seconds of idle
      if clock() - last_activity >= 5:
        last_activity = clock()
        logging.debug("Triggering manual GC")
        gc.collect()
    else:
      gc_check_count += 1


def GetDestPort(port):
  """Return the port that outbound connections to port should really use.

  Looks port up in the configured mappings, falling back to the wildcard
  ('default') mapping and finally to port itself.
  """
  if port_mappings is not None:
    src_port = str(port)
    if src_port in port_mappings:
      return port_mappings[src_port]
    elif 'default' in port_mappings:
      return port_mappings['default']
  return port


def SetPortMappings(map_string):
  """Replace the port mappings with those described by map_string.

  Args:
    map_string: comma-separated "original:new" pairs, with "*" as the
        wildcard source, e.g. "443:8443,*:8080".  Surrounding quotes and
        whitespace are ignored.

  Raises:
    ValueError: if an entry is malformed.  The existing mappings are left
        untouched in that case.
  """
  global port_mappings
  new_mappings = {}
  map_string = map_string.strip('\'" \t\r\n')
  for pair in map_string.split(','):
    (src, dest) = pair.split(':')
    src = src.strip()
    dest_port = int(dest)
    if src == '*':
      new_mappings['default'] = dest_port
      logging.debug("Default port mapped to port {0}".format(dest_port))
    else:
      logging.debug("Port {0} mapped to port {1}".format(src, dest_port))
      new_mappings[src] = dest_port
  # Only publish the new table once the whole string parsed cleanly
  port_mappings = new_mappings


if '__main__' == __name__:
  main()