# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Error handler middleware
"""
import sys
import traceback
import cgi
from six.moves import cStringIO as StringIO
from paste.exceptions import formatter, collector, reporter
from paste import wsgilib
from paste import request
import six
__all__ = ['ErrorMiddleware', 'handle_exception']
class _NoDefault(object):
def __repr__(self):
return ' Additionally an error occurred while sending the %s report:
Error in .close():
%s'
% close_response)
if not self.start_checker.response_started:
self.start_checker('500 Internal Server Error',
[('content-type', 'text/html')],
exc_info)
if six.PY3:
response = response.encode('utf8')
return response
__next__ = next
def close(self):
    """Run ``_close`` unless this iterator is already marked closed.

    Whatever ``_close`` returns is discarded; this method always
    returns ``None``.
    """
    # This should at least print something to stderr if the
    # close method fails at this point
    if self.closed:
        return
    self._close()
def _close(self):
    """Close the wrapped iterable and return any error message.

    Returns ``None`` when the iterable has no ``close`` attribute or
    when ``close()`` completes normally.  If ``close()`` raises, the
    active exception is passed to the middleware's
    ``exception_handler`` together with the environ, and that
    handler's result is returned.
    """
    if hasattr(self.app_iterable, 'close'):
        try:
            self.app_iterable.close()
        except:
            # Catch-all on purpose: any failure while closing is handed
            # to the exception handler instead of propagating.
            return self.error_middleware.exception_handler(
                sys.exc_info(), self.environ)
    return None
class Supplement(object):
    """
    Traceback supplement that adds standard WSGI request information
    (CGI variables, WSGI variables and configuration) to a report.
    """

    # Label for each (multiprocess, multithread, run_once) flag triple.
    process_combos = {
        # multiprocess, multithread, run_once
        (0, 0, 0): 'Non-concurrent server',
        (0, 1, 0): 'Multithreaded',
        (1, 0, 0): 'Multiprocess',
        (1, 1, 0): 'Multi process AND threads (?)',
        (0, 0, 1): 'Non-concurrent CGI',
        (0, 1, 1): 'Multithread CGI (?)',
        (1, 0, 1): 'CGI',
        (1, 1, 1): 'Multi thread/process CGI (?)',
    }

    def __init__(self, middleware, environ):
        """Store the middleware and environ and compute the request URL."""
        self.middleware = middleware
        self.environ = environ
        self.source_url = request.construct_url(environ)

    def extraData(self):
        """Return a dict of extra report sections keyed by
        ``('extra', <title>)``.

        Keys of the environ that are entirely upper-case go into the
        'CGI Variables' section (only when their value is truthy); the
        remaining keys go into 'WSGI Variables' unless they are in the
        hidden list.  A 'Configuration' section is added when the
        environ contains ``paste.config``.
        """
        hidden = ('paste.config', 'wsgi.errors', 'wsgi.input',
                  'wsgi.multithread', 'wsgi.multiprocess',
                  'wsgi.run_once', 'wsgi.version',
                  'wsgi.url_scheme')
        cgi_section = {}
        wsgi_section = {}
        report = {
            ('extra', 'CGI Variables'): cgi_section,
            ('extra', 'WSGI Variables'): wsgi_section,
        }

        for key, val in self.environ.items():
            if key.upper() == key:
                # Upper-case keys are CGI-style; empty values are dropped.
                if val:
                    cgi_section[key] = val
                continue
            if key not in hidden:
                wsgi_section[key] = val

        # The WSGI version is only worth showing when it is not (1, 0).
        if self.environ['wsgi.version'] != (1, 0):
            wsgi_section['wsgi.version'] = self.environ['wsgi.version']

        flags = tuple(
            int(bool(self.environ[flag]))
            for flag in ('wsgi.multiprocess',
                         'wsgi.multithread',
                         'wsgi.run_once'))
        wsgi_section['wsgi process'] = self.process_combos[flags]
        wsgi_section['application'] = self.middleware.application

        if 'paste.config' in self.environ:
            report[('extra', 'Configuration')] = dict(
                self.environ['paste.config'])
        return report
def handle_exception(exc_info, error_stream, html=True,
                     debug_mode=False,
                     error_email=None,
                     error_log=None,
                     show_exceptions_in_wsgi_errors=False,
                     error_email_from='errors@localhost',
                     smtp_server='localhost',
                     smtp_username=None,
                     smtp_password=None,
                     smtp_use_tls=False,
                     error_subject_prefix='',
                     error_message=None,
                     simple_html_error=False,
                     ):
    """
    For exception handling outside of a web context

    Use like::

        import sys
        from paste.exceptions.errormiddleware import handle_exception
        try:
            do stuff
        except:
            handle_exception(
                sys.exc_info(), sys.stderr, html=False, ...other config...)

    If you want to report, but not fully catch the exception, call
    ``raise`` after ``handle_exception``, which (when given no argument)
    will reraise the exception.

    ``exc_info`` is unpacked into ``collector.collect_exception``.
    ``error_stream`` only ever receives text (``str``) from this
    function, so a text stream such as ``sys.stderr`` works on both
    Python 2 and Python 3.

    A report is sent by email when ``error_email`` is set, appended to a
    log file when ``error_log`` is set, and written to ``error_stream``
    when ``show_exceptions_in_wsgi_errors`` is true (otherwise only a
    one-line summary is written there).  Failures reported by
    ``send_report`` are accumulated and either embedded in the returned
    debug page or written to ``error_stream``.

    Returns the HTML to display when ``html`` is true (a formatted
    traceback in ``debug_mode``, otherwise ``error_message`` or a
    generic notice), and ``None`` when ``html`` is false.
    """
    reported = False
    exc_data = collector.collect_exception(*exc_info)
    extra_data = ''

    if error_email:
        rep = reporter.EmailReporter(
            to_addresses=error_email,
            from_address=error_email_from,
            smtp_server=smtp_server,
            smtp_username=smtp_username,
            smtp_password=smtp_password,
            smtp_use_tls=smtp_use_tls,
            subject_prefix=error_subject_prefix)
        rep_err = send_report(rep, exc_data, html=html)
        if rep_err:
            extra_data += rep_err
        else:
            reported = True

    if error_log:
        rep = reporter.LogReporter(
            filename=error_log)
        rep_err = send_report(rep, exc_data, html=html)
        if rep_err:
            extra_data += rep_err
        else:
            reported = True

    if show_exceptions_in_wsgi_errors:
        rep = reporter.FileReporter(
            file=error_stream)
        rep_err = send_report(rep, exc_data, html=html)
        if rep_err:
            extra_data += rep_err
        else:
            reported = True
    else:
        line = ('Error - %s: %s\n'
                % (exc_data.exception_type, exc_data.exception_value))
        # Written as text, not bytes: every other write to error_stream
        # in this function passes str, and text streams such as
        # sys.stderr reject bytes on Python 3.
        error_stream.write(line)

    if html:
        if debug_mode and simple_html_error:
            return_error = formatter.format_html(
                exc_data, include_hidden_frames=False,
                include_reusable=False, show_extra_data=False)
            reported = True
        elif debug_mode and not simple_html_error:
            error_html = formatter.format_html(
                exc_data,
                include_hidden_frames=True,
                include_reusable=False)
            head_html = formatter.error_css + formatter.hide_display_js
            return_error = error_template(
                head_html, error_html, extra_data)
            # The reporter failures are now part of the page, so they
            # must not be written to the stream a second time below.
            extra_data = ''
            reported = True
        else:
            msg = error_message or (
                '\n'
                'An error occurred. See the error logs for more information.\n'
                '(Turn debug on to display exception reports here)\n')
            return_error = error_template('', msg, '')
    else:
        return_error = None

    # Nothing managed to deliver the report: fall back to a full text
    # traceback on the error stream.
    if not reported and error_stream:
        err_report = formatter.format_text(exc_data, show_hidden_frames=True)
        err_report += '\n' + '-'*60 + '\n'
        error_stream.write(err_report)
    if extra_data:
        error_stream.write(extra_data)
    return return_error
def send_report(rep, exc_data, html=True):
try:
rep.report(exc_data)
except:
output = StringIO()
traceback.print_exc(file=output)
if html:
return """
%s