#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

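"""HTTP and HTTPS proxy servers that serve requests from an HTTP archive.

The servers below replay (or record) archived responses through an
http_archive_fetch callable and support optional traffic shaping
(bandwidth limits and propagation delays) via proxyshaper.
"""
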
import BaseHTTPServer
import certutils
import collections
import errno
import logging
import socket
import SocketServer
import ssl
import sys
import time
import urlparse

import daemonserver
import httparchive
import platformsettings
import proxyshaper
import sslproxy


def _HandleSSLCertificateError():
  """Ignore SSL errors; re-raise everything else.

  This function is intended to be called from
  BaseHTTPServer.HTTPServer.handle_error().
  """
  exc_type, exc_value, exc_traceback = sys.exc_info()
  if isinstance(exc_value, ssl.SSLError):
    return

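  # Not an SSL error: a bare raise re-raises the exception currently being
  # handled, so the server's default error handling still applies.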
  raise


class HttpProxyError(Exception):
  """Module catch-all error."""
  pass


class HttpProxyServerError(HttpProxyError):
  """Raised for errors like 'Address already in use'."""
  pass


class HttpArchiveHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  protocol_version = 'HTTP/1.1'  # override BaseHTTPServer setting

  # Since we do lots of small wfile.write() calls, turn on buffering.
  wbufsize = -1  # override StreamRequestHandler (a base class) setting

  def setup(self):
    """Override StreamRequestHandler method."""
    BaseHTTPServer.BaseHTTPRequestHandler.setup(self)
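    # Wrap the request and response streams in rate limiters so the
    # configured bandwidth is shared across all active requests (which is
    # why RateLimitedFile takes the active-request counter).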
    if self.server.traffic_shaping_up_bps:
      self.rfile = proxyshaper.RateLimitedFile(
          self.server.get_active_request_count, self.rfile,
          self.server.traffic_shaping_up_bps)
    if self.server.traffic_shaping_down_bps:
      self.wfile = proxyshaper.RateLimitedFile(
          self.server.get_active_request_count, self.wfile,
          self.server.traffic_shaping_down_bps)

  # Make request handler logging match our logging format.
  def log_request(self, code='-', size='-'):
    pass

  def log_error(self, format, *args):  # pylint:disable=redefined-builtin
    logging.error(format, *args)

  def log_message(self, format, *args):  # pylint:disable=redefined-builtin
    logging.info(format, *args)

  def read_request_body(self):
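    # A missing or zero Content-Length header means there is no body to read.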
    request_body = None
    length = int(self.headers.get('content-length', 0)) or None
    if length:
      request_body = self.rfile.read(length)
    return request_body

  def get_header_dict(self):
    return dict(self.headers.items())

  def get_archived_http_request(self):
    host = self.headers.get('host')
    if host is None:
      logging.error('Request without host header')
      return None

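    # Rebuild the request target from its urlparse pieces, e.g.
    # '/search;v=1?q=x#top' -> path='/search', params='v=1', query='q=x',
    # fragment='top', which reassembles to the original string below.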
    parsed = urlparse.urlparse(self.path)
    params = ';%s' % parsed.params if parsed.params else ''
    query = '?%s' % parsed.query if parsed.query else ''
    fragment = '#%s' % parsed.fragment if parsed.fragment else ''
    full_path = '%s%s%s%s' % (parsed.path, params, query, fragment)

    StubRequest = collections.namedtuple('StubRequest', ('host', 'full_path'))
    request, response = StubRequest(host, full_path), None

    self.server.log_url(request, response)

    return httparchive.ArchivedHttpRequest(
        self.command,
        host,
        full_path,
        self.read_request_body(),
        self.get_header_dict(),
        self.server.is_ssl)

  def send_archived_http_response(self, response):
    try:
      # We need to set the server name before we start the response.
      is_chunked = response.is_chunked()
      has_content_length = response.get_header('content-length') is not None
      self.server_version = response.get_header('server', 'WebPageReplay')
      self.sys_version = ''

      if response.version == 10:
        self.protocol_version = 'HTTP/1.0'

      # If we don't have chunked encoding and there is no content length,
      # we need to manually compute the content-length.
      if not is_chunked and not has_content_length:
        content_length = sum(len(c) for c in response.response_data)
        response.headers.append(('content-length', str(content_length)))

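      # During replay, response.delays carries the recorded timing with the
      # shape {'headers': <ms>, 'data': [<ms per response chunk>]}, as used
      # below.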
      is_replay = not self.server.http_archive_fetch.is_record_mode
      if is_replay and self.server.traffic_shaping_delay_ms:
        logging.debug('Using round trip delay: %sms',
                      self.server.traffic_shaping_delay_ms)
        time.sleep(self.server.traffic_shaping_delay_ms / 1000.0)
      if is_replay and self.server.use_delays:
        logging.debug('Using delays (ms): %s', response.delays)
        time.sleep(response.delays['headers'] / 1000.0)
        delays = response.delays['data']
      else:
        delays = [0] * len(response.response_data)
      self.send_response(response.status, response.reason)
      # TODO(mbelshe): This is lame - each write is a packet!
      for header, value in response.headers:
        if header in ('last-modified', 'expires'):
          self.send_header(header, response.update_date(value))
        elif header not in ('date', 'server'):
          self.send_header(header, value)
      self.end_headers()

      for chunk, delay in zip(response.response_data, delays):
        if delay:
          self.wfile.flush()
          time.sleep(delay / 1000.0)
        if is_chunked:
          # Write chunk length (hex) and data (e.g. "A\r\nTESSELATED\r\n").
          self.wfile.write('%x\r\n%s\r\n' % (len(chunk), chunk))
        else:
          self.wfile.write(chunk)
      if is_chunked:
        self.wfile.write('0\r\n\r\n')  # Write the final, zero-length chunk.
      self.wfile.flush()

      # TODO(mbelshe): This connection close doesn't seem to work.
      if response.version == 10:
        self.close_connection = 1

    except Exception, e:
      logging.error('Error sending response for %s%s: %s',
                    self.headers['host'], self.path, e)

  def handle_one_request(self):
    """Handle a single HTTP request.

    This method overrides a method from BaseHTTPRequestHandler. When this
    method returns, it must leave self.close_connection in the correct state.
    If this method raises an exception, the state of self.close_connection
    doesn't matter.
    """
    try:
      self.raw_requestline = self.rfile.readline(65537)
      self.do_parse_and_handle_one_request()
    except socket.timeout, e:
      # A read or a write timed out. Discard this connection.
      self.log_error('Request timed out: %r', e)
      self.close_connection = 1
      return
    except ssl.SSLError:
      # There is insufficient information passed up the stack from OpenSSL to
      # determine the true cause of the SSL error. This almost always happens
      # because the client refuses to accept the self-signed certs of
      # WebPageReplay.
      self.close_connection = 1
      return
    except socket.error, e:
      # Connection reset errors happen all the time due to the browser closing
      # without terminating the connection properly. They can be safely
      # ignored.
      if e[0] == errno.ECONNRESET:
        self.close_connection = 1
        return
      raise

  def do_parse_and_handle_one_request(self):
    start_time = time.time()
    self.server.num_active_requests += 1
    request = None
    try:
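      # Mirrors the oversize-request-line guard in
      # BaseHTTPRequestHandler.handle_one_request(): a request line longer
      # than 64K bytes is answered with 414 (Request-URI Too Long).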
      if len(self.raw_requestline) > 65536:
        self.requestline = ''
        self.request_version = ''
        self.command = ''
        self.send_error(414)
        self.close_connection = 0
        return
      if not self.raw_requestline:
        # This indicates that the socket has been closed by the client.
        self.close_connection = 1
        return

      # self.parse_request() sets self.close_connection. There is no need to
      # set the property after the method is executed, unless custom behavior
      # is desired.
      if not self.parse_request():
        # An error code has been sent, just exit.
        return

      try:
        response = None
        request = self.get_archived_http_request()

        if request is None:
          self.send_error(500)
          return
        response = self.server.custom_handlers.handle(request)
        if not response:
          response = self.server.http_archive_fetch(request)
        if response:
          self.send_archived_http_response(response)
        else:
          self.send_error(404)
      finally:
        self.wfile.flush()  # Actually send the response if not already done.
    finally:
      request_time_ms = (time.time() - start_time) * 1000.0
      self.server.total_request_time += request_time_ms
      if request:
        if response:
          logging.debug('Served: %s (%dms)', request, request_time_ms)
        else:
          logging.warning('Failed to find response for: %s (%dms)',
                          request, request_time_ms)
      self.server.num_active_requests -= 1

  def send_error(self, status, body=None):
    """Override the default send_error with a version that doesn't
    unnecessarily close the connection.
    """
    response = httparchive.create_response(status, body=body)
    self.send_archived_http_response(response)


class HttpProxyServer(SocketServer.ThreadingMixIn,
                      BaseHTTPServer.HTTPServer,
                      daemonserver.DaemonServer):
  HANDLER = HttpArchiveHandler

  # Increase the request queue size. The default value, 5, is set in
  # SocketServer.TCPServer (the parent of BaseHTTPServer.HTTPServer).
  # Since we're intercepting many domains through this single server,
  # it is quite possible to get more than 5 concurrent requests.
  request_queue_size = 256

  # The number of simultaneous connections that the HTTP server supports. This
  # is primarily limited by system limits such as RLIMIT_NOFILE.
  connection_limit = 500

  # Allow sockets to be reused. See
  # http://svn.python.org/projects/python/trunk/Lib/SocketServer.py for more
  # details.
  allow_reuse_address = True

  # Don't prevent python from exiting when there is thread activity.
  daemon_threads = True

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               host='localhost', port=80, use_delays=False, is_ssl=False,
               protocol='HTTP',
               down_bandwidth='0', up_bandwidth='0', delay_ms='0'):
    """Start HTTP server.

    Args:
      http_archive_fetch: a callable that takes a request and returns the
          matching response (or None); it also exposes is_record_mode.
      custom_handlers: an object whose handle(request) method may return a
          response before the archive is consulted.
      rules: a rule_parser Rules.
      host: a host string (name or IP) for the web proxy.
      port: a port number (e.g. 80) for the web proxy.
      use_delays: if True, add response data delays during replay.
      is_ssl: True iff proxy is using SSL.
      protocol: a protocol name (e.g. 'HTTP') used in log messages.
      down_bandwidth: Download bandwidth.
      up_bandwidth: Upload bandwidth.
          Bandwidths measured in [K|M]{bit/s|Byte/s}. '0' means unlimited.
      delay_ms: Propagation delay in milliseconds. '0' means no delay.
    """
    if platformsettings.SupportsFdLimitControl():
      # BaseHTTPServer opens a new thread and two fds for each connection.
      # Check that the process can open at least 1000 fds.
      soft_limit, hard_limit = platformsettings.GetFdLimit()
      # Add some wiggle room since there are probably fds not associated with
      # connections.
      wiggle_room = 100
      desired_limit = 2 * HttpProxyServer.connection_limit + wiggle_room
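      # With the default connection_limit of 500, that is
      # 2 * 500 + 100 = 1100 file descriptors.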
      if soft_limit < desired_limit:
        assert desired_limit <= hard_limit, (
            'The hard limit for number of open files per process is %s which '
            'is lower than the desired limit of %s.' %
            (hard_limit, desired_limit))
        platformsettings.AdjustFdLimit(desired_limit, hard_limit)

    try:
      BaseHTTPServer.HTTPServer.__init__(self, (host, port), self.HANDLER)
    except Exception, e:
      raise HttpProxyServerError('Could not start HTTPServer on port %d: %s' %
                                 (port, e))
    self.http_archive_fetch = http_archive_fetch
    self.custom_handlers = custom_handlers
    self.use_delays = use_delays
    self.is_ssl = is_ssl
    self.traffic_shaping_down_bps = proxyshaper.GetBitsPerSecond(down_bandwidth)
    self.traffic_shaping_up_bps = proxyshaper.GetBitsPerSecond(up_bandwidth)
    self.traffic_shaping_delay_ms = int(delay_ms)
    self.num_active_requests = 0
    self.num_active_connections = 0
    self.total_request_time = 0
    self.protocol = protocol
    self.log_url = rules.Find('log_url')

    # Note: This message may be scraped. Do not change it.
    logging.warning(
        '%s server started on %s:%d' % (self.protocol, self.server_address[0],
                                        self.server_address[1]))

  def cleanup(self):
    try:
      self.shutdown()
      self.server_close()
    except KeyboardInterrupt:
      pass
    logging.info('Stopped %s server. Total time processing requests: %dms',
                 self.protocol, self.total_request_time)

  def get_active_request_count(self):
    return self.num_active_requests

  def get_request(self):
    self.num_active_connections += 1
    if self.num_active_connections >= HttpProxyServer.connection_limit:
      logging.error(
          'Number of active connections (%s) surpasses the '
          'supported limit of %s.' %
          (self.num_active_connections, HttpProxyServer.connection_limit))
    return BaseHTTPServer.HTTPServer.get_request(self)

  def close_request(self, request):
    BaseHTTPServer.HTTPServer.close_request(self, request)
    self.num_active_connections -= 1


class HttpsProxyServer(HttpProxyServer):
  """SSL server that generates certs for each host."""

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               https_root_ca_cert_path, **kwargs):
    self.ca_cert_path = https_root_ca_cert_path
    self.HANDLER = sslproxy.wrap_handler(HttpArchiveHandler)
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTPS', **kwargs)
    with open(self.ca_cert_path, 'r') as cert_file:
      self._ca_cert_str = cert_file.read()
    self._host_to_cert_map = {}
    self._server_cert_to_cert_map = {}

  def cleanup(self):
    try:
      self.shutdown()
      self.server_close()
    except KeyboardInterrupt:
      pass

  def get_certificate(self, host):
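    # Generated certs are cached at two levels: by host, and by the archived
    # upstream server cert, so hosts that share a server certificate reuse a
    # single generated certificate.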
    if host in self._host_to_cert_map:
      return self._host_to_cert_map[host]

    server_cert = self.http_archive_fetch.http_archive.get_server_cert(host)
    if server_cert in self._server_cert_to_cert_map:
      cert = self._server_cert_to_cert_map[server_cert]
      self._host_to_cert_map[host] = cert
      return cert

    cert = certutils.generate_cert(self._ca_cert_str, server_cert, host)
    self._server_cert_to_cert_map[server_cert] = cert
    self._host_to_cert_map[host] = cert
    return cert

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()


class SingleCertHttpsProxyServer(HttpProxyServer):
  """SSL server."""

  def __init__(self, http_archive_fetch, custom_handlers, rules,
               https_root_ca_cert_path, **kwargs):
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTPS', **kwargs)
    self.socket = ssl.wrap_socket(
        self.socket, certfile=https_root_ca_cert_path, server_side=True,
        do_handshake_on_connect=False)
    # Ancestor class, DaemonServer, calls serve_forever() during its __init__.

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()


class HttpToHttpsProxyServer(HttpProxyServer):
  """Listens for HTTP requests but sends them to the target as HTTPS
  requests.
  """

  def __init__(self, http_archive_fetch, custom_handlers, rules, **kwargs):
    HttpProxyServer.__init__(self, http_archive_fetch, custom_handlers, rules,
                             is_ssl=True, protocol='HTTP-to-HTTPS', **kwargs)

  def handle_error(self, request, client_address):
    _HandleSSLCertificateError()
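
# A minimal construction sketch (not part of the original source; the fetch,
# handlers, and rules objects below are hypothetical stand-ins -- real ones
# come from httparchive and the rule_parser Rules noted in the __init__
# docstring):
#
#   def fetch(request):           # return an archived response, or None
#     return None
#   fetch.is_record_mode = False  # replay mode
#
#   class NoOpHandlers(object):
#     def handle(self, request):  # may synthesize a response
#       return None
#
#   class NoOpRules(object):
#     def Find(self, name):       # provides the log_url callback
#       return lambda request, response: None
#
#   server = HttpProxyServer(fetch, NoOpHandlers(), NoOpRules(),
#                            host='127.0.0.1', port=8080)
#   # The DaemonServer ancestor drives the serve loop; call cleanup() to stop.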