# Copyright (C) 2001-2010 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org

"""Miscellaneous utilities."""

__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]

import os
import re
import time
import random
import socket
import datetime
import urllib.parse

from email._parseaddr import quote
from email._parseaddr import AddressList as _AddressList
from email._parseaddr import mktime_tz

from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz

# Intrapackage imports
from email.charset import Charset

COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"

# Characters that force a real name to be wrapped in double quotes.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Characters that must be backslash-escaped inside a real name.
escapesre = re.compile(r'[\\"]')


def _has_surrogates(s):
    """Return True if s contains surrogate-escaped binary data."""
    # This check is based on the fact that unless there are surrogates, utf8
    # (Python's default encoding) can encode any string.  This is the fastest
    # way to check for surrogates, see issue 11454 for timings.
    try:
        s.encode()
        return False
    except UnicodeEncodeError:
        return True


# How to deal with a string containing bytes before handing it to the
# application through the 'normal' interface.
def _sanitize(string):
    """Return string with surrogate-escaped bytes re-decoded as utf-8.

    Bytes that do not form valid utf-8 become the unicode replacement
    character.
    """
    # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
    # bytes happen to be utf-8 they will instead get decoded, even if they
    # were invalid in the charset the source was supposed to be in.  This
    # seems like it is not a bad thing; a defect was still registered.
    original_bytes = string.encode('utf-8', 'surrogateescape')
    return original_bytes.decode('utf-8', 'replace')


# Helpers

def formataddr(pair, charset='utf-8'):
    """The inverse of parseaddr(), this takes a 2-tuple of the form
    (realname, email_address) and returns the string value suitable
    for an RFC 2822 From, To or Cc header.

    If the first element of pair is false, then the second element is
    returned unmodified.

    The optional charset is the character set that is used to encode
    realname in case realname is not ASCII safe.  Can be an instance of str or
    a Charset-like object which has a header_encode method.  Default is
    'utf-8'.

    Raises UnicodeError if the email address is not ASCII.
    """
    name, address = pair
    # The address MUST (per RFC) be ascii, so raise a UnicodeError if it isn't.
    address.encode('ascii')
    if name:
        try:
            name.encode('ascii')
        except UnicodeEncodeError:
            # Non-ASCII real name: delegate to the charset's header encoding.
            if isinstance(charset, str):
                charset = Charset(charset)
            encoded_name = charset.header_encode(name)
            return "%s <%s>" % (encoded_name, address)
        else:
            quotes = ''
            if specialsre.search(name):
                quotes = '"'
            # Backslash-escape every backslash and double quote.
            name = escapesre.sub(r'\\\g<0>', name)
            return '%s%s%s <%s>' % (quotes, name, quotes, address)
    return address


def _iter_escaped_chars(addr):
    """Yield (pos, ch) pairs for addr, keeping backslash escapes together.

    A backslash followed by a character is yielded as one two-character item
    whose pos is the index of the escaped character.  A trailing lone
    backslash is yielded on its own.
    """
    pos = 0
    escape = False
    for pos, ch in enumerate(addr):
        if escape:
            yield (pos, '\\' + ch)
            escape = False
        elif ch == '\\':
            escape = True
        else:
            yield (pos, ch)
    if escape:
        yield (pos, '\\')


def _strip_quoted_realnames(addr):
    """Strip real names between quotes."""
    if '"' not in addr:
        # Fast path
        return addr

    start = 0
    open_pos = None
    result = []
    # Escaped quotes arrive as the two-character item '\\"' and therefore
    # never match the bare '"' test below.
    for pos, ch in _iter_escaped_chars(addr):
        if ch == '"':
            if open_pos is None:
                open_pos = pos
            else:
                # Closing quote: keep the text before the opening quote and
                # drop the quoted span itself.
                if start != open_pos:
                    result.append(addr[start:open_pos])
                start = pos + 1
                open_pos = None

    if start < len(addr):
        result.append(addr[start:])

    return ''.join(result)


supports_strict_parsing = True


def getaddresses(fieldvalues, *, strict=True):
    """Return a list of (REALNAME, EMAIL) or ('','') for each fieldvalue.

    When parsing fails for a fieldvalue, a 2-tuple of ('', '') is returned in
    its place.

    If strict is true, use a strict parser which rejects malformed inputs.
    """

    # If strict is true, if the resulting list of parsed addresses is greater
    # than the number of fieldvalues in the input list, a parsing error has
    # occurred and consequently a list containing a single empty 2-tuple [('',
    # '')] is returned in its place.  This is done to avoid invalid output.
    #
    # Malformed input: getaddresses(['alice@example.com <bob@example.com>'])
    # Invalid output: [('', 'alice@example.com'), ('', 'bob@example.com')]
    # Safe output: [('', '')]

    if not strict:
        joined = COMMASPACE.join(str(v) for v in fieldvalues)
        return _AddressList(joined).addresslist

    fieldvalues = [str(v) for v in fieldvalues]
    fieldvalues = _pre_parse_validation(fieldvalues)
    addr = COMMASPACE.join(fieldvalues)
    a = _AddressList(addr)
    result = _post_parse_validation(a.addresslist)

    # Treat output as invalid if the number of addresses is not equal to the
    # expected number of addresses.
    n = 0
    for v in fieldvalues:
        # When a comma is used in the Real Name part it is not a delimiter.
        # So strip those out before counting the commas.
        v = _strip_quoted_realnames(v)
        # Expected number of addresses: 1 + number of commas
        n += 1 + v.count(',')
    if len(result) != n:
        return [('', '')]

    return result


def _check_parenthesis(addr):
    """Return True if the unescaped parentheses in addr are balanced."""
    # Ignore parenthesis in quoted real names.
    addr = _strip_quoted_realnames(addr)

    opens = 0
    for pos, ch in _iter_escaped_chars(addr):
        if ch == '(':
            opens += 1
        elif ch == ')':
            opens -= 1
            if opens < 0:
                # A close with no matching open.
                return False
    return (opens == 0)


def _pre_parse_validation(email_header_fields):
    """Return the fields with unbalanced-parenthesis values replaced.

    A rejected value is replaced by the literal string "('', '')".
    """
    accepted_values = []
    for v in email_header_fields:
        if not _check_parenthesis(v):
            v = "('', '')"
        accepted_values.append(v)

    return accepted_values


def _post_parse_validation(parsed_email_header_tuples):
    """Return the parsed tuples with failed parses replaced by ('', '')."""
    accepted_values = []
    # The parser would have parsed a correctly formatted domain-literal
    # The existence of an [ after parsing indicates a parsing failure
    for v in parsed_email_header_tuples:
        if '[' in v[1]:
            v = ('', '')
        accepted_values.append(v)

    return accepted_values


def _format_timetuple_and_zone(timetuple, zone):
    """Render a time tuple and a zone string as an RFC 2822 date string.

    Day and month names are taken from fixed English tables rather than from
    the locale.
    """
    return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
        ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
        timetuple[2],
        ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
         'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
        timetuple[0], timetuple[3], timetuple[4], timetuple[5],
        zone)


def formatdate(timeval=None, localtime=False, usegmt=False):
    """Returns a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime; this avoids the deprecated
    # datetime.utcfromtimestamp().
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    if localtime:
        dt = dt.astimezone()
        usegmt = False
    elif not usegmt:
        # A naive datetime makes format_datetime() emit the '-0000' zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)


def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    this case 'GMT' will be rendered instead of the normal +0000 required by
    RFC2822.  This is to support HTTP headers involving date stamps.

    Raises ValueError if usegmt is true and dt.tzinfo is not
    datetime.timezone.utc.
    """
    now = dt.timetuple()
    if usegmt:
        if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
            raise ValueError("usegmt option requires a UTC datetime")
        zone = 'GMT'
    elif dt.tzinfo is None:
        zone = '-0000'
    else:
        zone = dt.strftime("%z")
    return _format_timetuple_and_zone(now, zone)


def make_msgid(idstring=None, domain=None):
    """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    Optional idstring if given is a string used to strengthen the
    uniqueness of the message id.  Optional domain if given provides the
    portion of the message id after the '@'.  It defaults to the locally
    defined hostname.
    """
    timeval = int(time.time()*100)
    pid = os.getpid()
    randint = random.getrandbits(64)
    if idstring is None:
        idstring = ''
    else:
        idstring = '.' + idstring
    if domain is None:
        domain = socket.getfqdn()
    msgid = '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain)
    return msgid


def parsedate_to_datetime(data):
    """Parse a date string into a datetime.

    The result is naive when the parser reports no timezone offset, and
    otherwise carries a fixed-offset timezone.

    Raises ValueError if data cannot be parsed.
    """
    parsed_date_tz = _parsedate_tz(data)
    if parsed_date_tz is None:
        raise ValueError('Invalid date value or format "%s"' % str(data))
    *dtuple, tz = parsed_date_tz
    if tz is None:
        return datetime.datetime(*dtuple[:6])
    return datetime.datetime(*dtuple[:6],
            tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))


def parseaddr(addr, *, strict=True):
    """
    Parse addr into its constituent realname and email address parts.

    Return a tuple of realname and email address, unless the parse fails, in
    which case return a 2-tuple of ('', '').

    If strict is True, use a strict parser which rejects malformed inputs.
    """
    if not strict:
        addrs = _AddressList(addr).addresslist
        if not addrs:
            return ('', '')
        return addrs[0]

    if isinstance(addr, list):
        if not addr:
            # Nothing to parse: report failure instead of raising IndexError.
            return ('', '')
        addr = addr[0]

    if not isinstance(addr, str):
        return ('', '')

    addr = _pre_parse_validation([addr])[0]
    addrs = _post_parse_validation(_AddressList(addr).addresslist)

    # More than one parsed address for a single input is a parse failure.
    if not addrs or len(addrs) > 1:
        return ('', '')

    return addrs[0]


# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Remove quotes from a string.

    A string wrapped in double quotes has them removed and its backslash
    escapes undone.  A string wrapped in angle brackets has the brackets
    removed.  Any other string is returned unchanged.
    """
    # NOTE: the parameter shadows the builtin ``str``; the name is part of the
    # public signature and is therefore left as is.
    if len(str) > 1:
        if str.startswith('"') and str.endswith('"'):
            return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
        if str.startswith('<') and str.endswith('>'):
            return str[1:-1]
    return str


# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
    """Decode string according to RFC 2231.

    Return the [charset, language, text] list obtained by splitting s on its
    first two single quotes, or the tuple (None, None, s) if s contains fewer
    than two single quotes.
    """
    parts = s.split(TICK, 2)
    if len(parts) <= 2:
        return None, None, s
    return parts


def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    s is always percent-encoded, using charset (or ascii when no charset is
    given).  If neither charset nor language is given, the percent-encoded
    string is returned without a charset'language' prefix.  If charset is
    given but not language, the string is encoded using the empty string for
    language.
    """
    s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return s
    if language is None:
        language = ''
    return "%s'%s'%s" % (charset, language, s)


rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)


def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a sequence of 2-tuples containing (param name, string value).
    The first element is passed through unchanged.  An empty sequence
    yields an empty list.
    """
    if not params:
        return []
    new_params = [params[0]]
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    if rfc2231_params:
        for name, continuations in rfc2231_params.items():
            value = []
            extended = False
            # Sort by number
            continuations.sort()
            # And now append all values in numerical order, converting
            # %-encodings for the encoded segments.  If any of the
            # continuation names ends in a *, then the entire string, after
            # decoding segments and concatenating, must have the charset and
            # language specifiers at the beginning of the string.
            for num, s, encoded in continuations:
                if encoded:
                    # Decode as "latin-1", so the characters in s directly
                    # represent the percent-encoded octet values.
                    # collapse_rfc2231_value treats this as an octet sequence.
                    s = urllib.parse.unquote(s, encoding="latin-1")
                    extended = True
                value.append(s)
            value = quote(EMPTYSTRING.join(value))
            if extended:
                charset, language, value = decode_rfc2231(value)
                new_params.append((name, (charset, language, '"%s"' % value)))
            else:
                new_params.append((name, '"%s"' % value))
    return new_params


def collapse_rfc2231_value(value, errors='replace',
                           fallback_charset='us-ascii'):
    """Collapse a decoded RFC 2231 parameter value into a single string.

    value is either a string, which is returned unquoted, or a
    (charset, language, text) 3-tuple.  For a tuple, text is reinterpreted as
    bytes and decoded with charset using the errors handler.
    fallback_charset is used when charset is None.  If the charset is not a
    known codec, the unquoted text is returned instead.
    """
    if not isinstance(value, tuple) or len(value) != 3:
        return unquote(value)
    # While value comes to us as a unicode string, we need it to be a bytes
    # object.  We do not want bytes() normal utf-8 decoder, we want a straight
    # interpretation of the string as character bytes.
    charset, language, text = value
    if charset is None:
        # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
        # the value, so use the fallback_charset.
        charset = fallback_charset
    rawbytes = bytes(text, 'raw-unicode-escape')
    try:
        return str(rawbytes, charset, errors)
    except LookupError:
        # charset is not a known codec.
        return unquote(text)


#
# datetime doesn't provide a localtime function yet, so provide one.  Code
# adapted from the patch in issue 9527.  This may not be perfect, but it is
# better than not having it.
#

def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    If called without arguments, return current time.  Otherwise *dt*
    argument should be a datetime instance, and it is converted to the
    local time zone according to the system time zone database.  If *dt* is
    naive (that is, dt.tzinfo is None), it is assumed to be in local time.
    In this case, a positive or zero value for *isdst* causes localtime to
    presume initially that summer time (for example, Daylight Saving Time)
    is or is not (respectively) in effect for the specified time.  A
    negative value for *isdst* causes the localtime() function to attempt
    to divine whether summer time is in effect for the specified time.

    """
    if dt is None:
        return datetime.datetime.now(datetime.timezone.utc).astimezone()
    if dt.tzinfo is not None:
        return dt.astimezone()
    # We have a naive datetime.  Convert to a (localtime) timetuple and pass to
    # system mktime together with the isdst hint.  System mktime will return
    # seconds since epoch.
    tm = dt.timetuple()[:-1] + (isdst,)
    seconds = time.mktime(tm)
    localtm = time.localtime(seconds)
    try:
        delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
        tz = datetime.timezone(delta, localtm.tm_zone)
    except AttributeError:
        # Compute UTC offset and compare with the value implied by tm_isdst.
        # If the values match, use the zone name implied by tm_isdst.
        delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
        dst = time.daylight and localtm.tm_isdst > 0
        gmtoff = -(time.altzone if dst else time.timezone)
        if delta == datetime.timedelta(seconds=gmtoff):
            tz = datetime.timezone(delta, time.tzname[dst])
        else:
            tz = datetime.timezone(delta)
    return dt.replace(tzinfo=tz)