• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2001-2010 Python Software Foundation
2# Author: Barry Warsaw
3# Contact: email-sig@python.org
4
5"""Miscellaneous utilities."""
6
# Public names exported by a star-import of this module.
# NOTE(review): localtime() is defined further down in this module but is
# not listed here -- confirm whether that omission is intentional.
__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]
24
25import os
26import re
27import time
28import random
29import socket
30import datetime
31import urllib.parse
32
33from email._parseaddr import quote
34from email._parseaddr import AddressList as _AddressList
35from email._parseaddr import mktime_tz
36
37from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
38
39# Intrapackage imports
40from email.charset import Charset
41
# String constants shared by the helpers in this module.
COMMASPACE = ', '       # separator getaddresses() uses to join field values
EMPTYSTRING = ''        # joiner decode_params() uses for continuation segments
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"              # delimiter decode_rfc2231() splits on

# Matches any one of the characters ] [ \ ( ) < > @ , : ; " and '.'.
# formataddr() wraps a real name in double quotes when this pattern is found.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Matches a backslash or a double quote; formataddr() prefixes every match
# with a backslash.
escapesre = re.compile(r'[\\"]')
50
def _has_surrogates(s):
    """Return True if s contains surrogate-escaped binary data.

    The check simply tries to encode s with the default codec and reports
    whether that attempt failed with a UnicodeEncodeError.
    """
    # Unless there are surrogates, utf8 (Python's default encoding) can
    # encode any string, so a failed encode is the signal we want.  This is
    # the fastest way to check for surrogates, see issue 11454 for timings.
    try:
        s.encode()
    except UnicodeEncodeError:
        return True
    else:
        return False
61
62# How to deal with a string containing bytes before handing it to the
63# application through the 'normal' interface.
def _sanitize(string):
    """Return string with surrogate-escaped bytes made safe for display.

    The text is encoded back to bytes with the 'surrogateescape' handler and
    then decoded as utf-8 with the 'replace' handler.
    """
    # Escaped bytes become the unicode 'unknown' character.  If the escaped
    # bytes happen to form valid utf-8 they get decoded instead, even if they
    # were invalid in the charset the source was supposed to be in.  That is
    # not considered a bad thing; a defect was still registered.
    return string.encode('utf-8', 'surrogateescape').decode('utf-8', 'replace')
71
72
73
74# Helpers
75
def formataddr(pair, charset='utf-8'):
    """Build an address header value from a (realname, email_address) pair.

    This is the inverse of parseaddr(): the result is a string suitable for
    an RFC 2822 From, To or Cc header.

    If the first element of pair is false, the second element is returned
    unmodified.

    Optional charset is the character set used to encode realname when
    realname is not ASCII safe.  It can be a str or a Charset-like object
    that has a header_encode method.  Default is 'utf-8'.

    Raises UnicodeError if the address itself is not ASCII.
    """
    realname, addr = pair
    # The address MUST (per RFC) be ascii; encoding is done purely for the
    # exception it raises when that does not hold.
    addr.encode('ascii')
    if not realname:
        return addr
    try:
        realname.encode('ascii')
    except UnicodeEncodeError:
        # Non-ASCII real name: hand it to the charset's header encoder.
        codec = Charset(charset) if isinstance(charset, str) else charset
        return "%s <%s>" % (codec.header_encode(realname), addr)
    # ASCII real name: quote it if it holds specials, and backslash-escape
    # any backslash or double quote it contains.
    wrapper = '"' if specialsre.search(realname) else ''
    escaped = escapesre.sub(r'\\\g<0>', realname)
    return '%s%s%s <%s>' % (wrapper, escaped, wrapper, addr)
107
108
109
def getaddresses(fieldvalues):
    """Return a list of (REALNAME, EMAIL) for each fieldvalue.

    All field values are joined with COMMASPACE into one string, which is
    then parsed as a single address list.
    """
    joined = COMMASPACE.join(fieldvalues)
    return _AddressList(joined).addresslist
115
116
def _format_timetuple_and_zone(timetuple, zone):
    """Render a time tuple plus a zone string as 'Fri, 09 Nov 2001 01:08:47 -0000'.

    timetuple is indexed like a time.struct_time: year, month (1-12), day,
    hour, minute, second, weekday (0 is Monday).  zone is appended verbatim.
    """
    # Fixed English abbreviations; they never follow the locale.
    day_names = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
    month_names = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                   'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
    year, month, day, hour, minute, second, weekday = timetuple[:7]
    return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
        day_names[weekday], day, month_names[month - 1],
        year, hour, minute, second, zone)
125
def formatdate(timeval=None, localtime=False, usegmt=False):
    """Returns a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime; this avoids the deprecated
    # datetime.utcfromtimestamp() while yielding the same field values.
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    if localtime:
        dt = dt.astimezone()
        # A local zone is rendered numerically, never as "GMT".
        usegmt = False
    elif not usegmt:
        # Plain UTC output: pass a naive datetime so that format_datetime()
        # emits the '-0000' zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)
154
def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    this case 'GMT' will be rendered instead of the normal +0000 required by
    RFC2822.  This is to support HTTP headers involving date stamps.

    A naive dt is rendered with the zone '-0000'; any other aware dt uses its
    numeric offset as produced by strftime("%z").

    Raises ValueError if usegmt is True and dt is naive or has a non-zero
    UTC offset.
    """
    now = dt.timetuple()
    if usegmt:
        # Test the actual offset rather than the tzinfo object, so every
        # zero-offset tzinfo implementation is accepted as documented.
        # utcoffset() is None for a naive datetime, which also fails here.
        if dt.utcoffset() != datetime.timedelta(0):
            raise ValueError("usegmt option requires a UTC datetime")
        zone = 'GMT'
    elif dt.tzinfo is None:
        zone = '-0000'
    else:
        zone = dt.strftime("%z")
    return _format_timetuple_and_zone(now, zone)
172
173
def make_msgid(idstring=None, domain=None):
    """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    The id is built from the current time in hundredths of a second, the
    process id and 64 random bits, joined by dots.

    Optional idstring if given is a string used to strengthen the
    uniqueness of the message id; it is appended after another dot.
    Optional domain if given provides the portion of the message id after
    the '@'.  It defaults to the locally defined hostname.
    """
    stamp = int(time.time()*100)
    process_id = os.getpid()
    noise = random.getrandbits(64)
    suffix = '' if idstring is None else '.' + idstring
    host = socket.getfqdn() if domain is None else domain
    return '<%d.%d.%d%s@%s>' % (stamp, process_id, noise, suffix, host)
195
196
def parsedate_to_datetime(data):
    """Convert a date string into a datetime via _parsedate_tz().

    The last item of the parsed result is taken as the zone offset in
    seconds.  When it is None a naive datetime is returned; otherwise the
    datetime is aware and carries a fixed-offset timezone.
    """
    *fields, offset = _parsedate_tz(data)
    extra = {}
    if offset is not None:
        extra['tzinfo'] = datetime.timezone(datetime.timedelta(seconds=offset))
    # Only year through second are used from the parsed fields.
    return datetime.datetime(*fields[:6], **extra)
203
204
def parseaddr(addr):
    """
    Split addr into its realname and email address parts.

    Only the first address found is returned, as a (realname, email address)
    tuple.  If nothing could be parsed the result is the 2-tuple ('', '').
    """
    parsed = _AddressList(addr).addresslist
    return parsed[0] if parsed else ('', '')
216
217
218# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Remove quotes from a string.

    A string wrapped in double quotes loses them and has its backslash
    escapes for backslash and double quote undone.  A string wrapped in
    angle brackets just loses the brackets.  Anything else, including any
    string shorter than two characters, is returned unchanged.
    """
    if len(str) <= 1:
        return str
    if str.startswith('"') and str.endswith('"'):
        inner = str[1:-1]
        return inner.replace('\\\\', '\\').replace('\\"', '"')
    if str.startswith('<') and str.endswith('>'):
        return str[1:-1]
    return str
227
228
229
230# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
    """Decode string according to RFC 2231.

    s is split on TICK at most twice.  When that yields three pieces they
    are returned as a list; otherwise the result is (None, None, s).
    """
    pieces = s.split(TICK, 2)
    if len(pieces) > 2:
        return pieces
    return None, None, s
237
238
def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    s is always percent-encoded (no characters are treated as safe), using
    charset for the byte encoding, or 'ascii' when charset is not given.

    If neither charset nor language is given, the percent-encoded string is
    returned without any prefix.  Otherwise the result has the form
    charset'language'encoded-text, with the empty string standing in for a
    missing language.
    """
    quoted = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return quoted
    # NOTE(review): charset is interpolated as-is, so passing a language
    # without a charset renders the prefix as the text "None".
    lang = '' if language is None else language
    return "%s'%s'%s" % (charset, lang, quoted)
252
253
# Matches a parameter name of the form name*, name*N or name*N*, where N is
# a run of ASCII digits.  Group 'name' is the part before the first '*';
# group 'num' is the digits, or None when no number is present.
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)
256
def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a sequence of 2-tuples containing (param name, string value).
    It is only indexed and sliced, never modified, so any sequence type
    (list or tuple) is accepted.

    Returns a new list.  The first entry of params is copied through as a
    (name, value) tuple without further processing.  Every other ordinary
    parameter is unquoted and re-quoted inside double quotes.  Parameters
    whose names match rfc2231_continuation are gathered per base name,
    sorted, concatenated and appended last; if any segment name ended in
    '*', the value is a (charset, language, '"text"') 3-tuple instead of a
    plain string.

    Raises IndexError if params is empty.
    """
    name, value = params[0]
    new_params = [(name, value)]
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    for name, continuations in rfc2231_params.items():
        segments = []
        extended = False
        # Sort by number
        continuations.sort()
        # And now append all values in numerical order, converting
        # %-encodings for the encoded segments.  If any of the
        # continuation names ends in a *, then the entire string, after
        # decoding segments and concatenating, must have the charset and
        # language specifiers at the beginning of the string.
        for num, s, encoded in continuations:
            if encoded:
                # Decode as "latin-1", so the characters in s directly
                # represent the percent-encoded octet values.
                # collapse_rfc2231_value treats this as an octet sequence.
                s = urllib.parse.unquote(s, encoding="latin-1")
                extended = True
            segments.append(s)
        value = quote(EMPTYSTRING.join(segments))
        if extended:
            charset, language, value = decode_rfc2231(value)
            new_params.append((name, (charset, language, '"%s"' % value)))
        else:
            new_params.append((name, '"%s"' % value))
    return new_params
312
def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Turn a parameter value into a plain string.

    Anything that is not a 3-tuple is simply passed through unquote().  A
    (charset, language, text) 3-tuple has its text converted to bytes and
    decoded with charset, using errors as the decode error handler.
    fallback_charset is used when charset is None.  If charset names an
    unknown codec, the unquoted text is returned instead.
    """
    is_triple = isinstance(value, tuple) and len(value) == 3
    if not is_triple:
        return unquote(value)
    charset, language, text = value
    # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
    # the value, so use the fallback_charset.
    codec = fallback_charset if charset is None else charset
    # While the text comes to us as a unicode string, we need it to be a
    # bytes object.  We do not want the normal utf-8 encoding here, we want
    # a straight interpretation of the string as character bytes.
    octets = bytes(text, 'raw-unicode-escape')
    try:
        return str(octets, codec, errors)
    except LookupError:
        # charset is not a known codec.
        return unquote(text)
331
332
333#
334# datetime doesn't provide a localtime function yet, so provide one.  Code
335# adapted from the patch in issue 9527.  This may not be perfect, but it is
336# better than not having it.
337#
338
def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    With no arguments the current time is returned.  Otherwise *dt* should
    be a datetime instance.  An aware *dt* is converted to the local time
    zone according to the system time zone database.  A naive *dt* (that
    is, dt.tzinfo is None) is assumed to already be in local time and only
    gets a timezone attached.

    For a naive *dt*, a positive or zero value for *isdst* causes localtime
    to presume initially that summer time (for example, Daylight Saving
    Time) is or is not (respectively) in effect for the specified time.  A
    negative value for *isdst* causes the localtime() function to attempt
    to divine whether summer time is in effect for the specified time.

    """
    if dt is None:
        return datetime.datetime.now(datetime.timezone.utc).astimezone()
    if dt.tzinfo is not None:
        return dt.astimezone()
    # Naive datetime from here on.  Build a (localtime) timetuple carrying
    # the isdst hint and let the system mktime turn it into seconds since
    # the epoch.
    fields = dt.timetuple()[:-1] + (isdst,)
    epoch = time.mktime(fields)
    local = time.localtime(epoch)
    try:
        offset = datetime.timedelta(seconds=local.tm_gmtoff)
        zone = datetime.timezone(offset, local.tm_zone)
    except AttributeError:
        # No tm_gmtoff/tm_zone available.  Compute the UTC offset and
        # compare it with the value implied by tm_isdst.  If the values
        # match, use the zone name implied by tm_isdst.
        offset = dt - datetime.datetime(*time.gmtime(epoch)[:6])
        in_dst = time.daylight and local.tm_isdst > 0
        implied = -(time.altzone if in_dst else time.timezone)
        if offset == datetime.timedelta(seconds=implied):
            zone = datetime.timezone(offset, time.tzname[in_dst])
        else:
            zone = datetime.timezone(offset)
    return dt.replace(tzinfo=zone)
377