• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2001-2010 Python Software Foundation
2# Author: Barry Warsaw
3# Contact: email-sig@python.org
4
5"""Miscellaneous utilities."""
6
# Names exported by a star-import of this module.
# NOTE(review): localtime() is defined in this module and ``quote`` is
# imported into it, but neither is listed here -- confirm that is intended.
__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]
24
25import os
26import re
27import time
28import random
29import socket
30import datetime
31import urllib.parse
32
33from email._parseaddr import quote
34from email._parseaddr import AddressList as _AddressList
35from email._parseaddr import mktime_tz
36
37from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
38
39# Intrapackage imports
40from email.charset import Charset
41
# Small string constants shared by the helpers in this module.
COMMASPACE = ', '
EMPTYSTRING = ''
# Same value as EMPTYSTRING.
# NOTE(review): presumably kept only for backward compatibility -- confirm.
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"

# Characters whose presence makes formataddr() wrap a real name in double
# quotes.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Backslash and double quote: formataddr() prefixes each with a backslash.
escapesre = re.compile(r'[\\"]')
50
51
52def _has_surrogates(s):
53    """Return True if s contains surrogate-escaped binary data."""
54    # This check is based on the fact that unless there are surrogates, utf8
55    # (Python's default encoding) can encode any string.  This is the fastest
56    # way to check for surrogates, see issue 11454 for timings.
57    try:
58        s.encode()
59        return False
60    except UnicodeEncodeError:
61        return True
62
63# How to deal with a string containing bytes before handing it to the
64# application through the 'normal' interface.
65def _sanitize(string):
66    # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
67    # bytes happen to be utf-8 they will instead get decoded, even if they
68    # were invalid in the charset the source was supposed to be in.  This
69    # seems like it is not a bad thing; a defect was still registered.
70    original_bytes = string.encode('utf-8', 'surrogateescape')
71    return original_bytes.decode('utf-8', 'replace')
72
73
74
75# Helpers
76
def formataddr(pair, charset='utf-8'):
    """Build an address header value from a (realname, email_address) pair.

    This is the inverse of parseaddr(): the result is a string suitable for
    an RFC 2822 From, To or Cc header.

    When the real name is false, the email address alone is returned.

    charset selects the character set used to encode a real name that is
    not ASCII safe.  It may be a str or a Charset-like object providing a
    header_encode method, and defaults to 'utf-8'.

    A UnicodeError is raised if the email address is not ASCII.
    """
    realname, address = pair
    # The address MUST (per RFC) be ascii; encoding it raises otherwise.
    address.encode('ascii')
    if not realname:
        return address
    try:
        realname.encode('ascii')
    except UnicodeEncodeError:
        # Non-ASCII real name: encode it as a header word.
        encoder = Charset(charset) if isinstance(charset, str) else charset
        return "%s <%s>" % (encoder.header_encode(realname), address)
    # ASCII real name: quote it when it contains specials, and always
    # backslash-escape backslashes and double quotes.
    quote_char = '"' if specialsre.search(realname) else ''
    escaped = escapesre.sub(r'\\\g<0>', realname)
    return '%s%s%s <%s>' % (quote_char, escaped, quote_char, address)
108
109
110def _iter_escaped_chars(addr):
111    pos = 0
112    escape = False
113    for pos, ch in enumerate(addr):
114        if escape:
115            yield (pos, '\\' + ch)
116            escape = False
117        elif ch == '\\':
118            escape = True
119        else:
120            yield (pos, ch)
121    if escape:
122        yield (pos, '\\')
123
124
def _strip_quoted_realnames(addr):
    """Return *addr* with every double-quoted section removed.

    Escaped quotes (preceded by a backslash) do not open or close a section.
    Text after an opening quote that is never closed is kept.
    """
    if '"' not in addr:
        # Fast path: nothing to strip.
        return addr

    kept = []
    resume_at = 0
    quote_start = None
    for index, token in _iter_escaped_chars(addr):
        if token != '"':
            continue
        if quote_start is None:
            # Opening quote.
            quote_start = index
            continue
        # Closing quote: keep the text that preceded the quoted section.
        if resume_at != quote_start:
            kept.append(addr[resume_at:quote_start])
        resume_at = index + 1
        quote_start = None

    if resume_at < len(addr):
        kept.append(addr[resume_at:])

    return ''.join(kept)
148
149
# Module-level flag set to True; getaddresses() and parseaddr() in this
# module accept a keyword-only ``strict`` argument.
# NOTE(review): presumably lets callers feature-detect strict parsing --
# confirm.
supports_strict_parsing = True
151
def getaddresses(fieldvalues, *, strict=True):
    """Parse header field values into a list of (REALNAME, EMAIL) tuples.

    Each item of fieldvalues is converted with str() before parsing.  An
    address that fails to parse is reported as ('', '').

    If strict is true, a strict parser is used which rejects malformed
    inputs; when the number of parsed addresses does not match the number
    expected from the input, [('', '')] is returned.
    """

    # In strict mode, a result that contains a different number of addresses
    # than the input implies means the parser was confused, so a list with a
    # single empty 2-tuple is returned instead of misleading output.
    #
    # Malformed input: getaddresses(['alice@example.com <bob@example.com>'])
    # Invalid output: [('', 'alice@example.com'), ('', 'bob@example.com')]
    # Safe output: [('', '')]

    if not strict:
        joined = COMMASPACE.join(str(v) for v in fieldvalues)
        return _AddressList(joined).addresslist

    fieldvalues = _pre_parse_validation([str(v) for v in fieldvalues])
    parsed = _AddressList(COMMASPACE.join(fieldvalues)).addresslist
    result = _post_parse_validation(parsed)

    # Expected number of addresses per field value: 1 + number of commas.
    # A comma inside a quoted real name is not a delimiter, so quoted real
    # names are stripped before counting.
    expected = sum(
        1 + _strip_quoted_realnames(v).count(',') for v in fieldvalues)
    if len(result) != expected:
        return [('', '')]

    return result
192
193
def _check_parenthesis(addr):
    """Return True if the parentheses in *addr* are balanced.

    Parentheses inside quoted real names and backslash-escaped parentheses
    are ignored.  A closing parenthesis without a matching opener makes the
    check fail immediately.
    """
    depth = 0
    # Quoted real names are stripped first so their content is not counted.
    for _, token in _iter_escaped_chars(_strip_quoted_realnames(addr)):
        if token == ')':
            depth -= 1
            if depth < 0:
                return False
        elif token == '(':
            depth += 1
    return depth == 0
207
208
def _pre_parse_validation(email_header_fields):
    """Return a copy of the field list with unparsable entries neutralised.

    Any field whose parentheses are unbalanced is replaced by the
    placeholder string "('', '')"; all other fields are passed through.
    """
    return [
        field if _check_parenthesis(field) else "('', '')"
        for field in email_header_fields
    ]
217
218
219def _post_parse_validation(parsed_email_header_tuples):
220    accepted_values = []
221    # The parser would have parsed a correctly formatted domain-literal
222    # The existence of an [ after parsing indicates a parsing failure
223    for v in parsed_email_header_tuples:
224        if '[' in v[1]:
225            v = ('', '')
226        accepted_values.append(v)
227
228    return accepted_values
229
230
231def _format_timetuple_and_zone(timetuple, zone):
232    return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
233        ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
234        timetuple[2],
235        ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
236         'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
237        timetuple[0], timetuple[3], timetuple[4], timetuple[5],
238        zone)
239
def formatdate(timeval=None, localtime=False, usegmt=False):
    """Return a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime: datetime.utcfromtimestamp()
    # is deprecated and emits a DeprecationWarning on recent Pythons.
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    if localtime:
        dt = dt.astimezone()
        usegmt = False
    elif not usegmt:
        # A naive datetime makes format_datetime() emit the '-0000' zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)
268
def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    A naive datetime is rendered with the zone '-0000'; an aware datetime is
    rendered with its numeric UTC offset.

    If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    this case 'GMT' will be rendered instead of the normal +0000 required by
    RFC2822.  This is to support HTTP headers involving date stamps.  A
    ValueError is raised if usegmt is True and dt is naive or has a non-zero
    offset.
    """
    now = dt.timetuple()
    if usegmt:
        # Check the offset itself rather than comparing tzinfo objects, so
        # that any tzinfo reporting a zero offset is accepted, as documented.
        if dt.tzinfo is None or dt.utcoffset() != datetime.timedelta(0):
            raise ValueError("usegmt option requires a UTC datetime")
        zone = 'GMT'
    elif dt.tzinfo is None:
        zone = '-0000'
    else:
        zone = dt.strftime("%z")
    return _format_timetuple_and_zone(now, zone)
286
287
def make_msgid(idstring=None, domain=None):
    """Build a string suitable for an RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    The id is made of the current time in hundredths of a second, the
    process id and 64 random bits.  idstring, when given, is appended after
    a dot to strengthen the uniqueness of the message id.  domain, when
    given, provides the part after the '@'; otherwise the value of
    socket.getfqdn() is used.
    """
    centiseconds = int(time.time() * 100)
    process_id = os.getpid()
    nonce = random.getrandbits(64)
    suffix = '' if idstring is None else '.' + idstring
    host = socket.getfqdn() if domain is None else domain
    return '<%d.%d.%d%s@%s>' % (centiseconds, process_id, nonce, suffix, host)
309
310
def parsedate_to_datetime(data):
    """Parse a date string into a datetime object.

    The string is parsed with _parsedate_tz().  When the parse yields no
    UTC offset the result is a naive datetime; otherwise it is an aware
    datetime whose tzinfo is a fixed-offset timezone.

    Raises ValueError if the string cannot be parsed.
    """
    parsed = _parsedate_tz(data)
    if parsed is None:
        raise ValueError('Invalid date value or format "%s"' % str(data))
    # The last item is the offset in seconds (or None); the rest is the date.
    fields, offset = parsed[:-1], parsed[-1]
    tzinfo = None
    if offset is not None:
        tzinfo = datetime.timezone(datetime.timedelta(seconds=offset))
    return datetime.datetime(*fields[:6], tzinfo=tzinfo)
320
321
def parseaddr(addr, *, strict=True):
    """
    Parse addr into its constituent realname and email address parts.

    Return a tuple of realname and email address, unless the parse fails, in
    which case return a 2-tuple of ('', '').

    If strict is True, use a strict parser which rejects malformed inputs.
    In strict mode a list argument is parsed through its first item (an
    empty list counts as a failed parse), any other non-str argument counts
    as a failed parse, and input yielding more than one address is rejected.
    """
    if not strict:
        addrs = _AddressList(addr).addresslist
        if not addrs:
            return ('', '')
        return addrs[0]

    if isinstance(addr, list):
        # An empty list has nothing to parse: report a failed parse, as the
        # non-strict path does, instead of raising IndexError.
        if not addr:
            return ('', '')
        addr = addr[0]

    if not isinstance(addr, str):
        return ('', '')

    addr = _pre_parse_validation([addr])[0]
    addrs = _post_parse_validation(_AddressList(addr).addresslist)

    # Exactly one address is expected; anything else is a failed parse.
    if len(addrs) != 1:
        return ('', '')

    return addrs[0]
350
351
352# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Return *str* with one pair of enclosing delimiters removed.

    A value wrapped in double quotes loses the quotes and has the escapes
    '\\\\' and '\\"' collapsed to a backslash and a double quote.  A value
    wrapped in angle brackets loses the brackets.  Anything else, including
    values shorter than two characters, is returned unchanged.
    """
    if len(str) <= 1:
        return str
    if str.startswith('"') and str.endswith('"'):
        inner = str[1:-1]
        inner = inner.replace('\\\\', '\\')
        return inner.replace('\\"', '"')
    if str.startswith('<') and str.endswith('>'):
        return str[1:-1]
    return str
361
362
363
364# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
    """Split an RFC 2231 value into charset, language and text.

    The string is split on the first two single quotes.  When it contains
    fewer than two, (None, None, s) is returned; otherwise the three-item
    list produced by the split is returned.
    """
    pieces = s.split(TICK, 2)
    return pieces if len(pieces) > 2 else (None, None, s)
371
372
def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    s is always percent-encoded, using charset (or 'ascii' when charset is
    not given) to turn it into bytes; no character is treated as safe apart
    from those urllib.parse.quote() never quotes.

    If neither charset nor language is given, the percent-encoded string is
    returned on its own.  Otherwise the result has the form
    charset'language'encoded-text, with the empty string standing in for
    whichever of charset and language was not given.
    """
    s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return s
    # RFC 2231 permits a blank charset or language field; a missing value
    # must be rendered as that blank rather than as the text 'None'.
    if charset is None:
        charset = ''
    if language is None:
        language = ''
    return "%s'%s'%s" % (charset, language, s)
386
387
# Matches an RFC 2231 style parameter name: NAME*, NAME*N or NAME*N*, where N
# is a run of ASCII digits.  The "name" group captures NAME and the "num"
# group captures N (None when absent).  re.ASCII restricts \w to ASCII.
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)
390
def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a sequence of 2-tuples containing (param name, string value).

    The first item is copied to the result untouched.  Every other plain
    parameter is returned as (name, '"value"') with the value re-quoted.
    Parameters whose name carries an RFC 2231 marker (name*, name*N or
    name*N*) are grouped by name, joined in numerical order and returned as
    a single (name, value) entry; when any segment was %-encoded the value is
    a (charset, language, '"text"') 3-tuple.

    An empty params sequence yields an empty list.
    """
    # Slicing (rather than indexing) keeps an empty sequence from raising.
    new_params = list(params[:1])
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    for name, continuations in rfc2231_params.items():
        value = []
        extended = False
        # Sort by number.  An unnumbered segment (num is None) cannot be
        # compared with a numbered one, so it is ordered first explicitly;
        # otherwise a header mixing "name*" and "name*0" raises TypeError.
        continuations.sort(
            key=lambda c: (c[0] is not None, c[0] or 0, c[1], c[2]))
        # And now append all values in numerical order, converting
        # %-encodings for the encoded segments.  If any of the
        # continuation names ends in a *, then the entire string, after
        # decoding segments and concatenating, must have the charset and
        # language specifiers at the beginning of the string.
        for num, s, encoded in continuations:
            if encoded:
                # Decode as "latin-1", so the characters in s directly
                # represent the percent-encoded octet values.
                # collapse_rfc2231_value treats this as an octet sequence.
                s = urllib.parse.unquote(s, encoding="latin-1")
                extended = True
            value.append(s)
        value = quote(EMPTYSTRING.join(value))
        if extended:
            charset, language, value = decode_rfc2231(value)
            new_params.append((name, (charset, language, '"%s"' % value)))
        else:
            new_params.append((name, '"%s"' % value))
    return new_params
438
def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Collapse a possibly RFC 2231 encoded parameter value into a string.

    Anything other than a 3-tuple is simply passed through unquote().  A
    (charset, language, text) 3-tuple has its text reinterpreted as raw
    bytes and decoded with charset, using errors as the error handler;
    fallback_charset is used when charset is None.  If the charset is not a
    known codec, the unquoted text is returned instead.
    """
    is_triple = isinstance(value, tuple) and len(value) == 3
    if not is_triple:
        return unquote(value)
    charset, _language, text = value
    # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
    # the value, so use the fallback_charset.
    codec = fallback_charset if charset is None else charset
    # The text arrives as a unicode string but stands for bytes: take each
    # character as a raw byte value instead of utf-8 encoding it.
    octets = bytes(text, 'raw-unicode-escape')
    try:
        return str(octets, codec, errors)
    except LookupError:
        # charset is not a known codec.
        return unquote(text)
457
458
459#
460# datetime doesn't provide a localtime function yet, so provide one.  Code
461# adapted from the patch in issue 9527.  This may not be perfect, but it is
462# better than not having it.
463#
464
def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    With no arguments the current time is returned.  Otherwise *dt* should
    be a datetime instance.  An aware *dt* is converted to the local time
    zone with astimezone().  A naive *dt* (dt.tzinfo is None) is assumed to
    already be in local time and gets a fixed-offset tzinfo attached.

    For a naive *dt*, *isdst* is handed to the system mktime() as a hint: a
    positive value presumes summer time (for example, Daylight Saving Time)
    is in effect, zero presumes it is not, and a negative value lets
    mktime() work out which applies.

    """
    if dt is None:
        return datetime.datetime.now(datetime.timezone.utc).astimezone()
    if dt.tzinfo is not None:
        return dt.astimezone()
    # Naive datetime: turn it into a (localtime) timetuple carrying the isdst
    # hint and let the system mktime compute seconds since the epoch.
    epoch_seconds = time.mktime(dt.timetuple()[:-1] + (isdst,))
    local_struct = time.localtime(epoch_seconds)
    try:
        offset = datetime.timedelta(seconds=local_struct.tm_gmtoff)
        zone = datetime.timezone(offset, local_struct.tm_zone)
    except AttributeError:
        # No tm_gmtoff/tm_zone available.  Compute the UTC offset and compare
        # it with the value implied by tm_isdst; if they match, use the zone
        # name implied by tm_isdst.
        offset = dt - datetime.datetime(*time.gmtime(epoch_seconds)[:6])
        in_dst = time.daylight and local_struct.tm_isdst > 0
        implied = -(time.altzone if in_dst else time.timezone)
        if offset == datetime.timedelta(seconds=implied):
            zone = datetime.timezone(offset, time.tzname[in_dst])
        else:
            zone = datetime.timezone(offset)
    return dt.replace(tzinfo=zone)
503