• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2002-2007 Python Software Foundation
2# Author: Ben Gertzfield, Barry Warsaw
3# Contact: email-sig@python.org
4
5"""Header encoding and decoding functionality."""
6
# Names exported by ``from <this module> import *``.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
12
13import re
14import binascii
15
16import email.quoprimime
17import email.base64mime
18
19from email.errors import HeaderParseError
20from email import charset as _charset
21Charset = _charset.Charset
22
# Small string/bytes constants shared by the functions and classes below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when none is given.
MAXLINELEN = 78
# Characters treated as folding whitespace when splitting header text.
FWS = ' \t'

# Charset instances used as the default charset and as the fallback charset.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qQbB])  # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal
# 041) to tilde (octal 176).
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack: a newline followed by non-whitespace text and a
# colon looks like the start of a new header field.
_embedded_header = re.compile(r'\n[^ \t]+:')



# Helpers
# Alias of the quoted-printable module's private _max_append helper.
_max_append = email.quoprimime._max_append
58
59
60
def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When header is a string without any encoded word, the single pair
    (header, None) is returned with the string untouched.  When it does
    contain encoded words, every returned string is a bytes object and
    consecutive parts sharing a charset are merged into one pair.

    An email.errors.HeaderParseError may be raised when certain decoding
    errors occur (e.g. a base64 decoding exception).
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() returns
        # [text, charset, encoding, encoded, text, ...].  Walk it by index
        # rather than with repeated pop(0), which is quadratic.
        parts = ecre.split(line)
        for start in range(0, len(parts), 4):
            unencoded = parts[start]
            if start == 0:
                # Only the leading text of a line loses its indentation.
                unencoded = unencoded.lstrip()
            if unencoded:
                words.append((unencoded, None, None))
            if start + 3 < len(parts):
                charset, encoding, encoded = parts[start + 1:start + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # Now remove words that consist of whitespace between two encoded
    # strings.  The test always looks at the unfiltered list, and the list is
    # rebuilt in one pass instead of deleting entries one at a time.
    last = len(words) - 1
    words = [word for n, word in enumerate(words)
             if not (0 < n < last
                     and words[n - 1][1] and words[n + 1][1]
                     and word[0].isspace())]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            paderr = len(encoded_string) % 4   # Postel's law: add missing padding
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as err:
                # Chain explicitly so the underlying cause stays visible.
                raise HeaderParseError('Base64 decoding error') from err
            decoded_words.append((word, charset))
        else:
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded parts come from separate lines; rejoin them
            # with a single space.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed
153
154
155
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the shape returned by
    decode_header(): each pair holds a decoded string and either None, the
    name of a character set, or a Charset instance.

    Optional maxlinelen, header_name, and continuation_ws are handed to the
    Header constructor unchanged.  The populated Header instance is returned.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for text, charset in decoded_seq:
        # A charset of None is forwarded as-is; Header.append() then uses
        # the header's own default charset.  Names are wrapped in Charset.
        if not (charset is None or isinstance(charset, Charset)):
            charset = Charset(charset)
        header.append(text, charset)
    return header
176
177
178
class Header:
    """A header value held as a list of (string, Charset) chunks.

    Chunks are added with append().  str() joins the chunks into the
    unencoded value; encode() feeds them through _ValueFormatter to produce
    the folded, encoded form.
    """
    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen. For
        splitting the first line to a shorter value (to account for the field
        header which isn't included in s, e.g. `Subject') pass in the name of
        the field in header_name.  The default maxlinelen is 78 as recommended
        by RFC 2822.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        # List of (str, Charset) pairs, in the order they were appended.
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        uchunks = []
        lastcs = None
        lastspace = None
        for string, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            # Don't add a space if the None/us-ascii string already has
            # a space (trailing or leading depending on transition)
            nextcs = charset
            if nextcs == _charset.UNKNOWN8BIT:
                # Turn surrogate-escaped characters back into bytes, then
                # decode as ASCII with 'replace' so undecodable bytes do not
                # leak surrogates into the result.
                original_bytes = string.encode('ascii', 'surrogateescape')
                string = original_bytes.decode('ascii', 'replace')
            if uchunks:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii') and not hasspace:
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii') and not lastspace:
                    uchunks.append(SPACE)
            # Remember how this chunk ends for the next iteration's decision.
            lastspace = string and self._nonctext(string[-1])
            lastcs = nextcs
            uchunks.append(string)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        """Compare other against the string value of this header."""
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a unicode (of the unencoded header value), swap the
        # args and do another comparison.
        return other == str(self)

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is false), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In either case, when producing an RFC 2822 compliant
        header using RFC 2047 rules, the string will be encoded using the
        output codec of the charset.  If the string cannot be encoded to the
        output codec, a UnicodeError will be raised, except that when the
        output codec is us-ascii the chunk is stored with the utf-8 charset
        instead.

        Optional `errors' is passed as the errors argument to the decode
        call if s is a byte string.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            input_charset = charset.input_codec or 'us-ascii'
            if input_charset == _charset.UNKNOWN8BIT:
                # Keep undecodable bytes as surrogate escapes.
                s = s.decode('us-ascii', 'surrogateescape')
            else:
                s = s.decode(input_charset, errors)
        # Ensure that the string we're storing can be encoded to the output
        # character set, otherwise an early error is raised.  The encoded
        # result itself is discarded; only the check matters here.
        output_charset = charset.output_codec or 'us-ascii'
        if output_charset != _charset.UNKNOWN8BIT:
            try:
                s.encode(output_charset, errors)
            except UnicodeEncodeError:
                if output_charset!='us-ascii':
                    raise
                # us-ascii cannot represent the text: store it as utf-8.
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        return s.isspace() or s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        Optional maxlinelen specifies the maximum length of each generated
        line, exclusive of the linesep string.  Individual lines may be longer
        than maxlinelen if a folding point cannot be found.  The first line
        will be shorter by the length of the header name plus ": " if a header
        name was specified at Header construction time.  The default value for
        maxlinelen is determined at header construction time.

        Optional splitchars is a string containing characters which should be
        given extra weight by the splitting algorithm during normal header
        wrapping.  This is in very rough support of RFC 2822's `higher level
        syntactic breaks':  split points preceded by a splitchar are preferred
        during line splitting, with the characters preferred in the order in
        which they appear in the string.  Space and tab may be included in the
        string to indicate whether preference should be given to one over the
        other as a split point when other split chars do not appear in the line
        being split.  Splitchars does not affect RFC 2047 encoded lines.

        Optional linesep is a string to be used to separate the lines of
        the value.  The default value is the most useful for typical
        Python applications, but it can be set to \r\n to produce RFC-compliant
        line separators when needed.

        Raises HeaderParseError if the encoded value appears to contain an
        embedded header (a newline followed by non-whitespace text and a
        colon).
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and makes the
        # _ValueFormatter algorithm much simpler.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        lastcs = None
        # hasspace stays None only until the first chunk has been processed,
        # so the "is not None" test below skips the transition logic for the
        # first chunk.
        hasspace = lastspace = None
        for string, charset in self._chunks:
            if hasspace is not None:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if not hasspace or charset not in (None, 'us-ascii'):
                        formatter.add_transition()
                elif charset not in (None, 'us-ascii') and not lastspace:
                    formatter.add_transition()
            lastspace = string and self._nonctext(string[-1])
            lastcs = charset
            hasspace = False
            lines = string.splitlines()
            if lines:
                formatter.feed('', lines[0], charset)
            else:
                formatter.feed('', '', charset)
            # Each further line of the chunk starts a new output line.
            for line in lines[1:]:
                formatter.newline()
                if charset.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   charset)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # whitespace.
                    sline = line.lstrip()
                    fws = line[:len(line)-len(sline)]
                    formatter.feed(fws, sline, charset)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embedded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        """Merge adjacent chunks that share a charset, in place.

        The strings of each merged run are joined with a single space and
        self._chunks is replaced by the merged list.
        """
        # Step 1: Normalize the chunks so that all runs of identical charsets
        # get collapsed into a single unicode string.
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                # last_charset is None only before the first chunk, when
                # there is no finished run to flush yet.
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        self._chunks = chunks
410
411
412
class _ValueFormatter:
    """Accumulate header text into lines no longer than maxlen where possible.

    Finished lines are collected in self._lines; the line currently being
    built is an _Accumulator of (fws, string) pairs in self._current_line.
    """
    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)
        self._splitchars = splitchars
        self._lines = []
        # headerlen is counted against the length of the first line only;
        # _Accumulator.reset() sets it back to zero.
        self._current_line = _Accumulator(headerlen)

    def _str(self, linesep):
        """Flush the current line and return all lines joined by linesep."""
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        """Finish the current line and start an empty one."""
        # Drop a trailing transition marker (see add_transition); anything
        # else that was popped is put back.
        end_of_line = self._current_line.pop()
        if end_of_line != (' ', ''):
            self._current_line.push(*end_of_line)
        if len(self._current_line) > 0:
            if self._current_line.is_onlyws() and self._lines:
                # A whitespace-only line is glued onto the previous line
                # instead of being emitted on its own.
                self._lines[-1] += str(self._current_line)
            else:
                self._lines.append(str(self._current_line))
        self._current_line.reset()

    def add_transition(self):
        """Push a (' ', '') marker: a single space with no text."""
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        """Add string, preceded by whitespace fws, using charset's encoding."""
        # If the charset has no header encoding (i.e. it is an ASCII encoding)
        # then we must split the header at the "highest level syntactic break"
        # possible. Note that we don't have a lot of smarts about field
        # syntax; we just try to break on semi-colons, then commas, then
        # whitespace.  Eventually, this should be pluggable.
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Otherwise, we're doing either a Base64 or a quoted-printable
        # encoding which means we don't need to split the line on syntactic
        # breaks.  We can basically just find enough characters to fit on the
        # current line, minus the RFC 2047 chrome.  What makes this trickier
        # though is that we have to split at octet boundaries, not character
        # boundaries but it's only safe to split at character boundaries so at
        # best we can only get close.
        encoded_lines = charset.header_encode_lines(string, self._maxlengths())
        # The first element extends the current line, but if it's None then
        # nothing more fit on the current line so start a new line.
        try:
            first_line = encoded_lines.pop(0)
        except IndexError:
            # There are no encoded lines, so we're done.
            return
        if first_line is not None:
            self._append_chunk(fws, first_line)
        try:
            last_line = encoded_lines.pop()
        except IndexError:
            # There was only one line.
            return
        # The last encoded line becomes the new current line so that later
        # text can still be appended to it.
        self.newline()
        self._current_line.push(self._continuation_ws, last_line)
        # Everything else are full lines in themselves.
        for line in encoded_lines:
            self._lines.append(self._continuation_ws + line)

    def _maxlengths(self):
        """Yield the space left on the current line, then the space available
        on every continuation line, without end."""
        # The first line's length.
        yield self._maxlen - len(self._current_line)
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, fws, string, splitchars):
        """Split fws+string at runs of folding whitespace and append each
        (whitespace, text) pair to the current line."""
        # The RFC 2822 header folding algorithm is simple in principle but
        # complex in practice.  Lines may be folded any place where "folding
        # white space" appears by inserting a linesep character in front of the
        # FWS.  The complication is that not all spaces or tabs qualify as FWS,
        # and we are also supposed to prefer to break at "higher level
        # syntactic breaks".  We can't do either of these without intimate
        # knowledge of the structure of structured headers, which we don't have
        # here.  So the best we can do here is prefer to break at the specified
        # splitchars, and hope that we don't choose any spaces or tabs that
        # aren't legal FWS.  (This is at least better than the old algorithm,
        # where we would sometimes *introduce* FWS after a splitchar, or the
        # algorithm before that, where we would turn all white space runs into
        # single spaces or tabs.)
        parts = re.split("(["+FWS+"]+)", fws+string)
        # re.split() alternates text and whitespace, starting with text.
        # Shift the list so that it starts with whitespace: prepend an empty
        # whitespace entry if there is leading text, otherwise drop the empty
        # leading text.
        if parts[0]:
            parts[:0] = ['']
        else:
            parts.pop(0)
        # Walk the list two items at a time as (whitespace, text) pairs.
        for fws, part in zip(*[iter(parts)]*2):
            self._append_chunk(fws, part)

    def _append_chunk(self, fws, string):
        """Push (fws, string) onto the current line, folding the line if it
        has grown beyond the maximum length."""
        self._current_line.push(fws, string)
        if len(self._current_line) > self._maxlen:
            # Find the best split point, working backward from the end.
            # There might be none, on a long first line.
            for ch in self._splitchars:
                for i in range(self._current_line.part_count()-1, 0, -1):
                    if ch.isspace():
                        # A whitespace splitchar matches the whitespace that
                        # starts part i.
                        fws = self._current_line[i][0]
                        if fws and fws[0]==ch:
                            break
                    # Any splitchar also matches when the previous part ends
                    # with it.
                    prevpart = self._current_line[i-1][1]
                    if prevpart and prevpart[-1]==ch:
                        break
                else:
                    # No split point for this splitchar; try the next one.
                    continue
                # The inner loop found a split point; i holds its index.
                break
            else:
                # No splitchar produced a split point.
                fws, part = self._current_line.pop()
                if self._current_line._initial_size > 0:
                    # There will be a header, so leave it on a line by itself.
                    self.newline()
                    if not fws:
                        # We don't use continuation_ws here because the whitespace
                        # after a header should always be a space.
                        fws = ' '
                self._current_line.push(fws, part)
                return
            # Emit everything before the split point as a finished line and
            # carry the parts from index i onward over to the new line.
            remainder = self._current_line.pop_from(i)
            self._lines.append(str(self._current_line))
            self._current_line.reset(remainder)
539
540
class _Accumulator(list):
    """A list of (fws, text) pairs whose len() counts characters.

    len() returns the total length of all stored whitespace and text plus
    an initial size given at construction; part_count() returns the number
    of stored pairs.
    """

    def __init__(self, initial_size=0):
        # Extra length counted by len() until the next reset().
        self._initial_size = initial_size
        super().__init__()

    def push(self, fws, string):
        """Store one (fws, string) pair at the end."""
        self.append((fws, string))

    def pop_from(self, i=0):
        """Remove and return, as a list, every pair from index i onward."""
        tail = self[i:]
        del self[i:]
        return tail

    def pop(self):
        """Remove and return the last pair, or ('', '') when there is none."""
        if not self.part_count():
            return ('', '')
        return super().pop()

    def __len__(self):
        """Return the initial size plus the length of all stored text."""
        total = self._initial_size
        for fws, part in self:
            total += len(fws) + len(part)
        return total

    def __str__(self):
        """Return every stored whitespace and text piece concatenated."""
        return EMPTYSTRING.join(piece for pair in self for piece in pair)

    def reset(self, startval=None):
        """Replace the contents with startval (default: nothing) and clear
        the initial size."""
        self[:] = [] if startval is None else startval
        self._initial_size = 0

    def is_onlyws(self):
        """True when no initial size is counted and the stored text is
        empty or consists only of whitespace."""
        if self._initial_size != 0:
            return False
        return not self or str(self).isspace()

    def part_count(self):
        """Return the number of stored pairs (the real list length)."""
        return list.__len__(self)
579