1"""IMAP4 client. 2 3Based on RFC 2060. 4 5Public class: IMAP4 6Public variable: Debug 7Public functions: Internaldate2tuple 8 Int2AP 9 ParseFlags 10 Time2Internaldate 11""" 12 13# Author: Piers Lauder <piers@cs.su.oz.au> December 1997. 14# 15# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998. 16# String method conversion by ESR, February 2001. 17# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001. 18# IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002. 19# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002. 20# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002. 21# GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005. 22 23__version__ = "2.58" 24 25import binascii, errno, random, re, socket, subprocess, sys, time, calendar 26from datetime import datetime, timezone, timedelta 27from io import DEFAULT_BUFFER_SIZE 28 29try: 30 import ssl 31 HAVE_SSL = True 32except ImportError: 33 HAVE_SSL = False 34 35__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple", 36 "Int2AP", "ParseFlags", "Time2Internaldate"] 37 38# Globals 39 40CRLF = b'\r\n' 41Debug = 0 42IMAP4_PORT = 143 43IMAP4_SSL_PORT = 993 44AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first 45 46# Maximal line length when calling readline(). This is to prevent 47# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1) 48# don't specify a line length. RFC 2683 suggests limiting client 49# command lines to 1000 octets and that servers should be prepared 50# to accept command lines up to 8000 octets, so we used to use 10K here. 51# In the modern world (eg: gmail) the response to, for example, a 52# search command can be quite large, so we now use 1M. 53_MAXLINE = 1000000 54 55 56# Commands 57 58Commands = { 59 # name valid states 60 'APPEND': ('AUTH', 'SELECTED'), 61 'AUTHENTICATE': ('NONAUTH',), 62 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 63 'CHECK': ('SELECTED',), 64 'CLOSE': ('SELECTED',), 65 'COPY': ('SELECTED',), 66 'CREATE': ('AUTH', 'SELECTED'), 67 'DELETE': ('AUTH', 'SELECTED'), 68 'DELETEACL': ('AUTH', 'SELECTED'), 69 'ENABLE': ('AUTH', ), 70 'EXAMINE': ('AUTH', 'SELECTED'), 71 'EXPUNGE': ('SELECTED',), 72 'FETCH': ('SELECTED',), 73 'GETACL': ('AUTH', 'SELECTED'), 74 'GETANNOTATION':('AUTH', 'SELECTED'), 75 'GETQUOTA': ('AUTH', 'SELECTED'), 76 'GETQUOTAROOT': ('AUTH', 'SELECTED'), 77 'MYRIGHTS': ('AUTH', 'SELECTED'), 78 'LIST': ('AUTH', 'SELECTED'), 79 'LOGIN': ('NONAUTH',), 80 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 81 'LSUB': ('AUTH', 'SELECTED'), 82 'MOVE': ('SELECTED',), 83 'NAMESPACE': ('AUTH', 'SELECTED'), 84 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 85 'PARTIAL': ('SELECTED',), # NB: obsolete 86 'PROXYAUTH': ('AUTH',), 87 'RENAME': ('AUTH', 'SELECTED'), 88 'SEARCH': ('SELECTED',), 89 'SELECT': ('AUTH', 'SELECTED'), 90 'SETACL': ('AUTH', 'SELECTED'), 91 'SETANNOTATION':('AUTH', 'SELECTED'), 92 'SETQUOTA': ('AUTH', 'SELECTED'), 93 'SORT': ('SELECTED',), 94 'STARTTLS': ('NONAUTH',), 95 'STATUS': ('AUTH', 'SELECTED'), 96 'STORE': ('SELECTED',), 97 'SUBSCRIBE': ('AUTH', 'SELECTED'), 98 'THREAD': ('SELECTED',), 99 'UID': ('SELECTED',), 100 'UNSUBSCRIBE': ('AUTH', 'SELECTED'), 101 } 102 103# Patterns to match server responses 104 105Continuation = re.compile(br'\+( (?P<data>.*))?') 106Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)') 107InternalDate = re.compile(br'.*INTERNALDATE "' 108 br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])' 109 br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])' 110 br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])' 111 br'"') 112# Literal is no longer used; kept for backward compatibility. 113Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII) 114MapCRLF = re.compile(br'\r\n|\r|\n') 115# We no longer exclude the ']' character from the data portion of the response 116# code, even though it violates the RFC. Popular IMAP servers such as Gmail 117# allow flags with ']', and there are programs (including imaplib!) that can 118# produce them. The problem with this is if the 'text' portion of the response 119# includes a ']' we'll parse the response wrong (which is the point of the RFC 120# restriction). However, that seems less likely to be a problem in practice 121# than being unable to correctly parse flags that include ']' chars, which 122# was reported as a real-world problem in issue #21815. 123Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]') 124Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?') 125# Untagged_status is no longer used; kept for backward compatibility 126Untagged_status = re.compile( 127 br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII) 128# We compile these in _mode_xxx. 129_Literal = br'.*{(?P<size>\d+)}$' 130_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?' 131 132 133 134class IMAP4: 135 136 r"""IMAP4 client class. 137 138 Instantiate with: IMAP4([host[, port]]) 139 140 host - host's name (default: localhost); 141 port - port number (default: standard IMAP4 port). 142 143 All IMAP4rev1 commands are supported by methods of the same 144 name (in lower-case). 145 146 All arguments to commands are converted to strings, except for 147 AUTHENTICATE, and the last argument to APPEND which is passed as 148 an IMAP4 literal. If necessary (the string contains any 149 non-printing characters or white-space and isn't enclosed with 150 either parentheses or double quotes) each string is quoted. 151 However, the 'password' argument to the LOGIN command is always 152 quoted. If you want to avoid having an argument string quoted 153 (eg: the 'flags' argument to STORE) then enclose the string in 154 parentheses (eg: "(\Deleted)"). 155 156 Each command returns a tuple: (type, [data, ...]) where 'type' 157 is usually 'OK' or 'NO', and 'data' is either the text from the 158 tagged response, or untagged results from command. Each 'data' 159 is either a string, or a tuple. If a tuple, then the first part 160 is the header of the response, and the second part contains 161 the data (ie: 'literal' value). 162 163 Errors raise the exception class <instance>.error("<reason>"). 164 IMAP4 server errors raise <instance>.abort("<reason>"), 165 which is a sub-class of 'error'. Mailbox status changes 166 from READ-WRITE to READ-ONLY raise the exception class 167 <instance>.readonly("<reason>"), which is a sub-class of 'abort'. 168 169 "error" exceptions imply a program error. 170 "abort" exceptions imply the connection should be reset, and 171 the command re-tried. 172 "readonly" exceptions imply the command should be re-tried. 173 174 Note: to use this module, you must read the RFCs pertaining to the 175 IMAP4 protocol, as the semantics of the arguments to each IMAP4 176 command are left to the invoker, not to mention the results. Also, 177 most IMAP servers implement a sub-set of the commands available here. 178 """ 179 180 class error(Exception): pass # Logical errors - debug required 181 class abort(error): pass # Service errors - close and retry 182 class readonly(abort): pass # Mailbox status changed to READ-ONLY 183 184 def __init__(self, host='', port=IMAP4_PORT): 185 self.debug = Debug 186 self.state = 'LOGOUT' 187 self.literal = None # A literal argument to a command 188 self.tagged_commands = {} # Tagged commands awaiting response 189 self.untagged_responses = {} # {typ: [data, ...], ...} 190 self.continuation_response = '' # Last continuation response 191 self.is_readonly = False # READ-ONLY desired state 192 self.tagnum = 0 193 self._tls_established = False 194 self._mode_ascii() 195 196 # Open socket to server. 197 198 self.open(host, port) 199 200 try: 201 self._connect() 202 except Exception: 203 try: 204 self.shutdown() 205 except OSError: 206 pass 207 raise 208 209 def _mode_ascii(self): 210 self.utf8_enabled = False 211 self._encoding = 'ascii' 212 self.Literal = re.compile(_Literal, re.ASCII) 213 self.Untagged_status = re.compile(_Untagged_status, re.ASCII) 214 215 216 def _mode_utf8(self): 217 self.utf8_enabled = True 218 self._encoding = 'utf-8' 219 self.Literal = re.compile(_Literal) 220 self.Untagged_status = re.compile(_Untagged_status) 221 222 223 def _connect(self): 224 # Create unique tag for this session, 225 # and compile tagged response matcher. 226 227 self.tagpre = Int2AP(random.randint(4096, 65535)) 228 self.tagre = re.compile(br'(?P<tag>' 229 + self.tagpre 230 + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII) 231 232 # Get server welcome message, 233 # request and store CAPABILITY response. 234 235 if __debug__: 236 self._cmd_log_len = 10 237 self._cmd_log_idx = 0 238 self._cmd_log = {} # Last `_cmd_log_len' interactions 239 if self.debug >= 1: 240 self._mesg('imaplib version %s' % __version__) 241 self._mesg('new IMAP4 connection, tag=%s' % self.tagpre) 242 243 self.welcome = self._get_response() 244 if 'PREAUTH' in self.untagged_responses: 245 self.state = 'AUTH' 246 elif 'OK' in self.untagged_responses: 247 self.state = 'NONAUTH' 248 else: 249 raise self.error(self.welcome) 250 251 self._get_capabilities() 252 if __debug__: 253 if self.debug >= 3: 254 self._mesg('CAPABILITIES: %r' % (self.capabilities,)) 255 256 for version in AllowedVersions: 257 if not version in self.capabilities: 258 continue 259 self.PROTOCOL_VERSION = version 260 return 261 262 raise self.error('server not IMAP4 compliant') 263 264 265 def __getattr__(self, attr): 266 # Allow UPPERCASE variants of IMAP4 command methods. 267 if attr in Commands: 268 return getattr(self, attr.lower()) 269 raise AttributeError("Unknown IMAP4 command: '%s'" % attr) 270 271 def __enter__(self): 272 return self 273 274 def __exit__(self, *args): 275 if self.state == "LOGOUT": 276 return 277 278 try: 279 self.logout() 280 except OSError: 281 pass 282 283 284 # Overridable methods 285 286 287 def _create_socket(self): 288 # Default value of IMAP4.host is '', but socket.getaddrinfo() 289 # (which is used by socket.create_connection()) expects None 290 # as a default value for host. 291 host = None if not self.host else self.host 292 sys.audit("imaplib.open", self, self.host, self.port) 293 return socket.create_connection((host, self.port)) 294 295 def open(self, host = '', port = IMAP4_PORT): 296 """Setup connection to remote server on "host:port" 297 (default: localhost:standard IMAP4 port). 298 This connection will be used by the routines: 299 read, readline, send, shutdown. 300 """ 301 self.host = host 302 self.port = port 303 self.sock = self._create_socket() 304 self.file = self.sock.makefile('rb') 305 306 307 def read(self, size): 308 """Read 'size' bytes from remote.""" 309 return self.file.read(size) 310 311 312 def readline(self): 313 """Read line from remote.""" 314 line = self.file.readline(_MAXLINE + 1) 315 if len(line) > _MAXLINE: 316 raise self.error("got more than %d bytes" % _MAXLINE) 317 return line 318 319 320 def send(self, data): 321 """Send data to remote.""" 322 sys.audit("imaplib.send", self, data) 323 self.sock.sendall(data) 324 325 326 def shutdown(self): 327 """Close I/O established in "open".""" 328 self.file.close() 329 try: 330 self.sock.shutdown(socket.SHUT_RDWR) 331 except OSError as exc: 332 # The server might already have closed the connection. 333 # On Windows, this may result in WSAEINVAL (error 10022): 334 # An invalid operation was attempted. 335 if (exc.errno != errno.ENOTCONN 336 and getattr(exc, 'winerror', 0) != 10022): 337 raise 338 finally: 339 self.sock.close() 340 341 342 def socket(self): 343 """Return socket instance used to connect to IMAP4 server. 344 345 socket = <instance>.socket() 346 """ 347 return self.sock 348 349 350 351 # Utility methods 352 353 354 def recent(self): 355 """Return most recent 'RECENT' responses if any exist, 356 else prompt server for an update using the 'NOOP' command. 357 358 (typ, [data]) = <instance>.recent() 359 360 'data' is None if no new messages, 361 else list of RECENT responses, most recent last. 362 """ 363 name = 'RECENT' 364 typ, dat = self._untagged_response('OK', [None], name) 365 if dat[-1]: 366 return typ, dat 367 typ, dat = self.noop() # Prod server for response 368 return self._untagged_response(typ, dat, name) 369 370 371 def response(self, code): 372 """Return data for response 'code' if received, or None. 373 374 Old value for response 'code' is cleared. 375 376 (code, [data]) = <instance>.response(code) 377 """ 378 return self._untagged_response(code, [None], code.upper()) 379 380 381 382 # IMAP4 commands 383 384 385 def append(self, mailbox, flags, date_time, message): 386 """Append message to named mailbox. 387 388 (typ, [data]) = <instance>.append(mailbox, flags, date_time, message) 389 390 All args except `message' can be None. 391 """ 392 name = 'APPEND' 393 if not mailbox: 394 mailbox = 'INBOX' 395 if flags: 396 if (flags[0],flags[-1]) != ('(',')'): 397 flags = '(%s)' % flags 398 else: 399 flags = None 400 if date_time: 401 date_time = Time2Internaldate(date_time) 402 else: 403 date_time = None 404 literal = MapCRLF.sub(CRLF, message) 405 if self.utf8_enabled: 406 literal = b'UTF8 (' + literal + b')' 407 self.literal = literal 408 return self._simple_command(name, mailbox, flags, date_time) 409 410 411 def authenticate(self, mechanism, authobject): 412 """Authenticate command - requires response processing. 413 414 'mechanism' specifies which authentication mechanism is to 415 be used - it must appear in <instance>.capabilities in the 416 form AUTH=<mechanism>. 417 418 'authobject' must be a callable object: 419 420 data = authobject(response) 421 422 It will be called to process server continuation responses; the 423 response argument it is passed will be a bytes. It should return bytes 424 data that will be base64 encoded and sent to the server. It should 425 return None if the client abort response '*' should be sent instead. 426 """ 427 mech = mechanism.upper() 428 # XXX: shouldn't this code be removed, not commented out? 429 #cap = 'AUTH=%s' % mech 430 #if not cap in self.capabilities: # Let the server decide! 431 # raise self.error("Server doesn't allow %s authentication." % mech) 432 self.literal = _Authenticator(authobject).process 433 typ, dat = self._simple_command('AUTHENTICATE', mech) 434 if typ != 'OK': 435 raise self.error(dat[-1].decode('utf-8', 'replace')) 436 self.state = 'AUTH' 437 return typ, dat 438 439 440 def capability(self): 441 """(typ, [data]) = <instance>.capability() 442 Fetch capabilities list from server.""" 443 444 name = 'CAPABILITY' 445 typ, dat = self._simple_command(name) 446 return self._untagged_response(typ, dat, name) 447 448 449 def check(self): 450 """Checkpoint mailbox on server. 451 452 (typ, [data]) = <instance>.check() 453 """ 454 return self._simple_command('CHECK') 455 456 457 def close(self): 458 """Close currently selected mailbox. 459 460 Deleted messages are removed from writable mailbox. 461 This is the recommended command before 'LOGOUT'. 462 463 (typ, [data]) = <instance>.close() 464 """ 465 try: 466 typ, dat = self._simple_command('CLOSE') 467 finally: 468 self.state = 'AUTH' 469 return typ, dat 470 471 472 def copy(self, message_set, new_mailbox): 473 """Copy 'message_set' messages onto end of 'new_mailbox'. 474 475 (typ, [data]) = <instance>.copy(message_set, new_mailbox) 476 """ 477 return self._simple_command('COPY', message_set, new_mailbox) 478 479 480 def create(self, mailbox): 481 """Create new mailbox. 482 483 (typ, [data]) = <instance>.create(mailbox) 484 """ 485 return self._simple_command('CREATE', mailbox) 486 487 488 def delete(self, mailbox): 489 """Delete old mailbox. 490 491 (typ, [data]) = <instance>.delete(mailbox) 492 """ 493 return self._simple_command('DELETE', mailbox) 494 495 def deleteacl(self, mailbox, who): 496 """Delete the ACLs (remove any rights) set for who on mailbox. 497 498 (typ, [data]) = <instance>.deleteacl(mailbox, who) 499 """ 500 return self._simple_command('DELETEACL', mailbox, who) 501 502 def enable(self, capability): 503 """Send an RFC5161 enable string to the server. 504 505 (typ, [data]) = <intance>.enable(capability) 506 """ 507 if 'ENABLE' not in self.capabilities: 508 raise IMAP4.error("Server does not support ENABLE") 509 typ, data = self._simple_command('ENABLE', capability) 510 if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper(): 511 self._mode_utf8() 512 return typ, data 513 514 def expunge(self): 515 """Permanently remove deleted items from selected mailbox. 516 517 Generates 'EXPUNGE' response for each deleted message. 518 519 (typ, [data]) = <instance>.expunge() 520 521 'data' is list of 'EXPUNGE'd message numbers in order received. 522 """ 523 name = 'EXPUNGE' 524 typ, dat = self._simple_command(name) 525 return self._untagged_response(typ, dat, name) 526 527 528 def fetch(self, message_set, message_parts): 529 """Fetch (parts of) messages. 530 531 (typ, [data, ...]) = <instance>.fetch(message_set, message_parts) 532 533 'message_parts' should be a string of selected parts 534 enclosed in parentheses, eg: "(UID BODY[TEXT])". 535 536 'data' are tuples of message part envelope and data. 537 """ 538 name = 'FETCH' 539 typ, dat = self._simple_command(name, message_set, message_parts) 540 return self._untagged_response(typ, dat, name) 541 542 543 def getacl(self, mailbox): 544 """Get the ACLs for a mailbox. 545 546 (typ, [data]) = <instance>.getacl(mailbox) 547 """ 548 typ, dat = self._simple_command('GETACL', mailbox) 549 return self._untagged_response(typ, dat, 'ACL') 550 551 552 def getannotation(self, mailbox, entry, attribute): 553 """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute) 554 Retrieve ANNOTATIONs.""" 555 556 typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) 557 return self._untagged_response(typ, dat, 'ANNOTATION') 558 559 560 def getquota(self, root): 561 """Get the quota root's resource usage and limits. 562 563 Part of the IMAP4 QUOTA extension defined in rfc2087. 564 565 (typ, [data]) = <instance>.getquota(root) 566 """ 567 typ, dat = self._simple_command('GETQUOTA', root) 568 return self._untagged_response(typ, dat, 'QUOTA') 569 570 571 def getquotaroot(self, mailbox): 572 """Get the list of quota roots for the named mailbox. 573 574 (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox) 575 """ 576 typ, dat = self._simple_command('GETQUOTAROOT', mailbox) 577 typ, quota = self._untagged_response(typ, dat, 'QUOTA') 578 typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT') 579 return typ, [quotaroot, quota] 580 581 582 def list(self, directory='""', pattern='*'): 583 """List mailbox names in directory matching pattern. 584 585 (typ, [data]) = <instance>.list(directory='""', pattern='*') 586 587 'data' is list of LIST responses. 588 """ 589 name = 'LIST' 590 typ, dat = self._simple_command(name, directory, pattern) 591 return self._untagged_response(typ, dat, name) 592 593 594 def login(self, user, password): 595 """Identify client using plaintext password. 596 597 (typ, [data]) = <instance>.login(user, password) 598 599 NB: 'password' will be quoted. 600 """ 601 typ, dat = self._simple_command('LOGIN', user, self._quote(password)) 602 if typ != 'OK': 603 raise self.error(dat[-1]) 604 self.state = 'AUTH' 605 return typ, dat 606 607 608 def login_cram_md5(self, user, password): 609 """ Force use of CRAM-MD5 authentication. 610 611 (typ, [data]) = <instance>.login_cram_md5(user, password) 612 """ 613 self.user, self.password = user, password 614 return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH) 615 616 617 def _CRAM_MD5_AUTH(self, challenge): 618 """ Authobject to use with CRAM-MD5 authentication. """ 619 import hmac 620 pwd = (self.password.encode('utf-8') if isinstance(self.password, str) 621 else self.password) 622 return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest() 623 624 625 def logout(self): 626 """Shutdown connection to server. 627 628 (typ, [data]) = <instance>.logout() 629 630 Returns server 'BYE' response. 631 """ 632 self.state = 'LOGOUT' 633 typ, dat = self._simple_command('LOGOUT') 634 self.shutdown() 635 return typ, dat 636 637 638 def lsub(self, directory='""', pattern='*'): 639 """List 'subscribed' mailbox names in directory matching pattern. 640 641 (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*') 642 643 'data' are tuples of message part envelope and data. 644 """ 645 name = 'LSUB' 646 typ, dat = self._simple_command(name, directory, pattern) 647 return self._untagged_response(typ, dat, name) 648 649 def myrights(self, mailbox): 650 """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox). 651 652 (typ, [data]) = <instance>.myrights(mailbox) 653 """ 654 typ,dat = self._simple_command('MYRIGHTS', mailbox) 655 return self._untagged_response(typ, dat, 'MYRIGHTS') 656 657 def namespace(self): 658 """ Returns IMAP namespaces ala rfc2342 659 660 (typ, [data, ...]) = <instance>.namespace() 661 """ 662 name = 'NAMESPACE' 663 typ, dat = self._simple_command(name) 664 return self._untagged_response(typ, dat, name) 665 666 667 def noop(self): 668 """Send NOOP command. 669 670 (typ, [data]) = <instance>.noop() 671 """ 672 if __debug__: 673 if self.debug >= 3: 674 self._dump_ur(self.untagged_responses) 675 return self._simple_command('NOOP') 676 677 678 def partial(self, message_num, message_part, start, length): 679 """Fetch truncated part of a message. 680 681 (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length) 682 683 'data' is tuple of message part envelope and data. 684 """ 685 name = 'PARTIAL' 686 typ, dat = self._simple_command(name, message_num, message_part, start, length) 687 return self._untagged_response(typ, dat, 'FETCH') 688 689 690 def proxyauth(self, user): 691 """Assume authentication as "user". 692 693 Allows an authorised administrator to proxy into any user's 694 mailbox. 695 696 (typ, [data]) = <instance>.proxyauth(user) 697 """ 698 699 name = 'PROXYAUTH' 700 return self._simple_command('PROXYAUTH', user) 701 702 703 def rename(self, oldmailbox, newmailbox): 704 """Rename old mailbox name to new. 705 706 (typ, [data]) = <instance>.rename(oldmailbox, newmailbox) 707 """ 708 return self._simple_command('RENAME', oldmailbox, newmailbox) 709 710 711 def search(self, charset, *criteria): 712 """Search mailbox for matching messages. 713 714 (typ, [data]) = <instance>.search(charset, criterion, ...) 715 716 'data' is space separated list of matching message numbers. 717 If UTF8 is enabled, charset MUST be None. 718 """ 719 name = 'SEARCH' 720 if charset: 721 if self.utf8_enabled: 722 raise IMAP4.error("Non-None charset not valid in UTF8 mode") 723 typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria) 724 else: 725 typ, dat = self._simple_command(name, *criteria) 726 return self._untagged_response(typ, dat, name) 727 728 729 def select(self, mailbox='INBOX', readonly=False): 730 """Select a mailbox. 731 732 Flush all untagged responses. 733 734 (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False) 735 736 'data' is count of messages in mailbox ('EXISTS' response). 737 738 Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so 739 other responses should be obtained via <instance>.response('FLAGS') etc. 740 """ 741 self.untagged_responses = {} # Flush old responses. 742 self.is_readonly = readonly 743 if readonly: 744 name = 'EXAMINE' 745 else: 746 name = 'SELECT' 747 typ, dat = self._simple_command(name, mailbox) 748 if typ != 'OK': 749 self.state = 'AUTH' # Might have been 'SELECTED' 750 return typ, dat 751 self.state = 'SELECTED' 752 if 'READ-ONLY' in self.untagged_responses \ 753 and not readonly: 754 if __debug__: 755 if self.debug >= 1: 756 self._dump_ur(self.untagged_responses) 757 raise self.readonly('%s is not writable' % mailbox) 758 return typ, self.untagged_responses.get('EXISTS', [None]) 759 760 761 def setacl(self, mailbox, who, what): 762 """Set a mailbox acl. 763 764 (typ, [data]) = <instance>.setacl(mailbox, who, what) 765 """ 766 return self._simple_command('SETACL', mailbox, who, what) 767 768 769 def setannotation(self, *args): 770 """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+) 771 Set ANNOTATIONs.""" 772 773 typ, dat = self._simple_command('SETANNOTATION', *args) 774 return self._untagged_response(typ, dat, 'ANNOTATION') 775 776 777 def setquota(self, root, limits): 778 """Set the quota root's resource limits. 779 780 (typ, [data]) = <instance>.setquota(root, limits) 781 """ 782 typ, dat = self._simple_command('SETQUOTA', root, limits) 783 return self._untagged_response(typ, dat, 'QUOTA') 784 785 786 def sort(self, sort_criteria, charset, *search_criteria): 787 """IMAP4rev1 extension SORT command. 788 789 (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...) 790 """ 791 name = 'SORT' 792 #if not name in self.capabilities: # Let the server decide! 793 # raise self.error('unimplemented extension command: %s' % name) 794 if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): 795 sort_criteria = '(%s)' % sort_criteria 796 typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria) 797 return self._untagged_response(typ, dat, name) 798 799 800 def starttls(self, ssl_context=None): 801 name = 'STARTTLS' 802 if not HAVE_SSL: 803 raise self.error('SSL support missing') 804 if self._tls_established: 805 raise self.abort('TLS session already established') 806 if name not in self.capabilities: 807 raise self.abort('TLS not supported by server') 808 # Generate a default SSL context if none was passed. 809 if ssl_context is None: 810 ssl_context = ssl._create_stdlib_context() 811 typ, dat = self._simple_command(name) 812 if typ == 'OK': 813 self.sock = ssl_context.wrap_socket(self.sock, 814 server_hostname=self.host) 815 self.file = self.sock.makefile('rb') 816 self._tls_established = True 817 self._get_capabilities() 818 else: 819 raise self.error("Couldn't establish TLS session") 820 return self._untagged_response(typ, dat, name) 821 822 823 def status(self, mailbox, names): 824 """Request named status conditions for mailbox. 825 826 (typ, [data]) = <instance>.status(mailbox, names) 827 """ 828 name = 'STATUS' 829 #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! 830 # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) 831 typ, dat = self._simple_command(name, mailbox, names) 832 return self._untagged_response(typ, dat, name) 833 834 835 def store(self, message_set, command, flags): 836 """Alters flag dispositions for messages in mailbox. 837 838 (typ, [data]) = <instance>.store(message_set, command, flags) 839 """ 840 if (flags[0],flags[-1]) != ('(',')'): 841 flags = '(%s)' % flags # Avoid quoting the flags 842 typ, dat = self._simple_command('STORE', message_set, command, flags) 843 return self._untagged_response(typ, dat, 'FETCH') 844 845 846 def subscribe(self, mailbox): 847 """Subscribe to new mailbox. 848 849 (typ, [data]) = <instance>.subscribe(mailbox) 850 """ 851 return self._simple_command('SUBSCRIBE', mailbox) 852 853 854 def thread(self, threading_algorithm, charset, *search_criteria): 855 """IMAPrev1 extension THREAD command. 856 857 (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...) 858 """ 859 name = 'THREAD' 860 typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria) 861 return self._untagged_response(typ, dat, name) 862 863 864 def uid(self, command, *args): 865 """Execute "command arg ..." with messages identified by UID, 866 rather than message number. 867 868 (typ, [data]) = <instance>.uid(command, arg1, arg2, ...) 869 870 Returns response appropriate to 'command'. 871 """ 872 command = command.upper() 873 if not command in Commands: 874 raise self.error("Unknown IMAP4 UID command: %s" % command) 875 if self.state not in Commands[command]: 876 raise self.error("command %s illegal in state %s, " 877 "only allowed in states %s" % 878 (command, self.state, 879 ', '.join(Commands[command]))) 880 name = 'UID' 881 typ, dat = self._simple_command(name, command, *args) 882 if command in ('SEARCH', 'SORT', 'THREAD'): 883 name = command 884 else: 885 name = 'FETCH' 886 return self._untagged_response(typ, dat, name) 887 888 889 def unsubscribe(self, mailbox): 890 """Unsubscribe from old mailbox. 891 892 (typ, [data]) = <instance>.unsubscribe(mailbox) 893 """ 894 return self._simple_command('UNSUBSCRIBE', mailbox) 895 896 897 def xatom(self, name, *args): 898 """Allow simple extension commands 899 notified by server in CAPABILITY response. 900 901 Assumes command is legal in current state. 902 903 (typ, [data]) = <instance>.xatom(name, arg, ...) 904 905 Returns response appropriate to extension command `name'. 906 """ 907 name = name.upper() 908 #if not name in self.capabilities: # Let the server decide! 909 # raise self.error('unknown extension command: %s' % name) 910 if not name in Commands: 911 Commands[name] = (self.state,) 912 return self._simple_command(name, *args) 913 914 915 916 # Private methods 917 918 919 def _append_untagged(self, typ, dat): 920 if dat is None: 921 dat = b'' 922 ur = self.untagged_responses 923 if __debug__: 924 if self.debug >= 5: 925 self._mesg('untagged_responses[%s] %s += ["%r"]' % 926 (typ, len(ur.get(typ,'')), dat)) 927 if typ in ur: 928 ur[typ].append(dat) 929 else: 930 ur[typ] = [dat] 931 932 933 def _check_bye(self): 934 bye = self.untagged_responses.get('BYE') 935 if bye: 936 raise self.abort(bye[-1].decode(self._encoding, 'replace')) 937 938 939 def _command(self, name, *args): 940 941 if self.state not in Commands[name]: 942 self.literal = None 943 raise self.error("command %s illegal in state %s, " 944 "only allowed in states %s" % 945 (name, self.state, 946 ', '.join(Commands[name]))) 947 948 for typ in ('OK', 'NO', 'BAD'): 949 if typ in self.untagged_responses: 950 del self.untagged_responses[typ] 951 952 if 'READ-ONLY' in self.untagged_responses \ 953 and not self.is_readonly: 954 raise self.readonly('mailbox status changed to READ-ONLY') 955 956 tag = self._new_tag() 957 name = bytes(name, self._encoding) 958 data = tag + b' ' + name 959 for arg in args: 960 if arg is None: continue 961 if isinstance(arg, str): 962 arg = bytes(arg, self._encoding) 963 data = data + b' ' + arg 964 965 literal = self.literal 966 if literal is not None: 967 self.literal = None 968 if type(literal) is type(self._command): 969 literator = literal 970 else: 971 literator = None 972 data = data + bytes(' {%s}' % len(literal), self._encoding) 973 974 if __debug__: 975 if self.debug >= 4: 976 self._mesg('> %r' % data) 977 else: 978 self._log('> %r' % data) 979 980 try: 981 self.send(data + CRLF) 982 except OSError as val: 983 raise self.abort('socket error: %s' % val) 984 985 if literal is None: 986 return tag 987 988 while 1: 989 # Wait for continuation response 990 991 while self._get_response(): 992 if self.tagged_commands[tag]: # BAD/NO? 993 return tag 994 995 # Send literal 996 997 if literator: 998 literal = literator(self.continuation_response) 999 1000 if __debug__: 1001 if self.debug >= 4: 1002 self._mesg('write literal size %s' % len(literal)) 1003 1004 try: 1005 self.send(literal) 1006 self.send(CRLF) 1007 except OSError as val: 1008 raise self.abort('socket error: %s' % val) 1009 1010 if not literator: 1011 break 1012 1013 return tag 1014 1015 1016 def _command_complete(self, name, tag): 1017 logout = (name == 'LOGOUT') 1018 # BYE is expected after LOGOUT 1019 if not logout: 1020 self._check_bye() 1021 try: 1022 typ, data = self._get_tagged_response(tag, expect_bye=logout) 1023 except self.abort as val: 1024 raise self.abort('command: %s => %s' % (name, val)) 1025 except self.error as val: 1026 raise self.error('command: %s => %s' % (name, val)) 1027 if not logout: 1028 self._check_bye() 1029 if typ == 'BAD': 1030 raise self.error('%s command error: %s %s' % (name, typ, data)) 1031 return typ, data 1032 1033 1034 def _get_capabilities(self): 1035 typ, dat = self.capability() 1036 if dat == [None]: 1037 raise self.error('no CAPABILITY response from server') 1038 dat = str(dat[-1], self._encoding) 1039 dat = dat.upper() 1040 self.capabilities = tuple(dat.split()) 1041 1042 1043 def _get_response(self): 1044 1045 # Read response and store. 1046 # 1047 # Returns None for continuation responses, 1048 # otherwise first response line received. 1049 1050 resp = self._get_line() 1051 1052 # Command completion response? 1053 1054 if self._match(self.tagre, resp): 1055 tag = self.mo.group('tag') 1056 if not tag in self.tagged_commands: 1057 raise self.abort('unexpected tagged response: %r' % resp) 1058 1059 typ = self.mo.group('type') 1060 typ = str(typ, self._encoding) 1061 dat = self.mo.group('data') 1062 self.tagged_commands[tag] = (typ, [dat]) 1063 else: 1064 dat2 = None 1065 1066 # '*' (untagged) responses? 1067 1068 if not self._match(Untagged_response, resp): 1069 if self._match(self.Untagged_status, resp): 1070 dat2 = self.mo.group('data2') 1071 1072 if self.mo is None: 1073 # Only other possibility is '+' (continuation) response... 1074 1075 if self._match(Continuation, resp): 1076 self.continuation_response = self.mo.group('data') 1077 return None # NB: indicates continuation 1078 1079 raise self.abort("unexpected response: %r" % resp) 1080 1081 typ = self.mo.group('type') 1082 typ = str(typ, self._encoding) 1083 dat = self.mo.group('data') 1084 if dat is None: dat = b'' # Null untagged response 1085 if dat2: dat = dat + b' ' + dat2 1086 1087 # Is there a literal to come? 1088 1089 while self._match(self.Literal, dat): 1090 1091 # Read literal direct from connection. 1092 1093 size = int(self.mo.group('size')) 1094 if __debug__: 1095 if self.debug >= 4: 1096 self._mesg('read literal size %s' % size) 1097 data = self.read(size) 1098 1099 # Store response with literal as tuple 1100 1101 self._append_untagged(typ, (dat, data)) 1102 1103 # Read trailer - possibly containing another literal 1104 1105 dat = self._get_line() 1106 1107 self._append_untagged(typ, dat) 1108 1109 # Bracketed response information? 1110 1111 if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): 1112 typ = self.mo.group('type') 1113 typ = str(typ, self._encoding) 1114 self._append_untagged(typ, self.mo.group('data')) 1115 1116 if __debug__: 1117 if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'): 1118 self._mesg('%s response: %r' % (typ, dat)) 1119 1120 return resp 1121 1122 1123 def _get_tagged_response(self, tag, expect_bye=False): 1124 1125 while 1: 1126 result = self.tagged_commands[tag] 1127 if result is not None: 1128 del self.tagged_commands[tag] 1129 return result 1130 1131 if expect_bye: 1132 typ = 'BYE' 1133 bye = self.untagged_responses.pop(typ, None) 1134 if bye is not None: 1135 # Server replies to the "LOGOUT" command with "BYE" 1136 return (typ, bye) 1137 1138 # If we've seen a BYE at this point, the socket will be 1139 # closed, so report the BYE now. 1140 self._check_bye() 1141 1142 # Some have reported "unexpected response" exceptions. 1143 # Note that ignoring them here causes loops. 1144 # Instead, send me details of the unexpected response and 1145 # I'll update the code in `_get_response()'. 1146 1147 try: 1148 self._get_response() 1149 except self.abort as val: 1150 if __debug__: 1151 if self.debug >= 1: 1152 self.print_log() 1153 raise 1154 1155 1156 def _get_line(self): 1157 1158 line = self.readline() 1159 if not line: 1160 raise self.abort('socket error: EOF') 1161 1162 # Protocol mandates all lines terminated by CRLF 1163 if not line.endswith(b'\r\n'): 1164 raise self.abort('socket error: unterminated line: %r' % line) 1165 1166 line = line[:-2] 1167 if __debug__: 1168 if self.debug >= 4: 1169 self._mesg('< %r' % line) 1170 else: 1171 self._log('< %r' % line) 1172 return line 1173 1174 1175 def _match(self, cre, s): 1176 1177 # Run compiled regular expression match method on 's'. 1178 # Save result, return success. 1179 1180 self.mo = cre.match(s) 1181 if __debug__: 1182 if self.mo is not None and self.debug >= 5: 1183 self._mesg("\tmatched %r => %r" % (cre.pattern, self.mo.groups())) 1184 return self.mo is not None 1185 1186 1187 def _new_tag(self): 1188 1189 tag = self.tagpre + bytes(str(self.tagnum), self._encoding) 1190 self.tagnum = self.tagnum + 1 1191 self.tagged_commands[tag] = None 1192 return tag 1193 1194 1195 def _quote(self, arg): 1196 1197 arg = arg.replace('\\', '\\\\') 1198 arg = arg.replace('"', '\\"') 1199 1200 return '"' + arg + '"' 1201 1202 1203 def _simple_command(self, name, *args): 1204 1205 return self._command_complete(name, self._command(name, *args)) 1206 1207 1208 def _untagged_response(self, typ, dat, name): 1209 if typ == 'NO': 1210 return typ, dat 1211 if not name in self.untagged_responses: 1212 return typ, [None] 1213 data = self.untagged_responses.pop(name) 1214 if __debug__: 1215 if self.debug >= 5: 1216 self._mesg('untagged_responses[%s] => %s' % (name, data)) 1217 return typ, data 1218 1219 1220 if __debug__: 1221 1222 def _mesg(self, s, secs=None): 1223 if secs is None: 1224 secs = time.time() 1225 tm = time.strftime('%M:%S', time.localtime(secs)) 1226 sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) 1227 sys.stderr.flush() 1228 1229 def _dump_ur(self, dict): 1230 # Dump untagged responses (in `dict'). 1231 l = dict.items() 1232 if not l: return 1233 t = '\n\t\t' 1234 l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l) 1235 self._mesg('untagged responses dump:%s%s' % (t, t.join(l))) 1236 1237 def _log(self, line): 1238 # Keep log of last `_cmd_log_len' interactions for debugging. 1239 self._cmd_log[self._cmd_log_idx] = (line, time.time()) 1240 self._cmd_log_idx += 1 1241 if self._cmd_log_idx >= self._cmd_log_len: 1242 self._cmd_log_idx = 0 1243 1244 def print_log(self): 1245 self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log)) 1246 i, n = self._cmd_log_idx, self._cmd_log_len 1247 while n: 1248 try: 1249 self._mesg(*self._cmd_log[i]) 1250 except: 1251 pass 1252 i += 1 1253 if i >= self._cmd_log_len: 1254 i = 0 1255 n -= 1 1256 1257 1258if HAVE_SSL: 1259 1260 class IMAP4_SSL(IMAP4): 1261 1262 """IMAP4 client class over SSL connection 1263 1264 Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context]]]]]) 1265 1266 host - host's name (default: localhost); 1267 port - port number (default: standard IMAP4 SSL port); 1268 keyfile - PEM formatted file that contains your private key (default: None); 1269 certfile - PEM formatted certificate chain file (default: None); 1270 ssl_context - a SSLContext object that contains your certificate chain 1271 and private key (default: None) 1272 Note: if ssl_context is provided, then parameters keyfile or 1273 certfile should not be set otherwise ValueError is raised. 1274 1275 for more documentation see the docstring of the parent class IMAP4. 1276 """ 1277 1278 1279 def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None, 1280 certfile=None, ssl_context=None): 1281 if ssl_context is not None and keyfile is not None: 1282 raise ValueError("ssl_context and keyfile arguments are mutually " 1283 "exclusive") 1284 if ssl_context is not None and certfile is not None: 1285 raise ValueError("ssl_context and certfile arguments are mutually " 1286 "exclusive") 1287 if keyfile is not None or certfile is not None: 1288 import warnings 1289 warnings.warn("keyfile and certfile are deprecated, use a " 1290 "custom ssl_context instead", DeprecationWarning, 2) 1291 self.keyfile = keyfile 1292 self.certfile = certfile 1293 if ssl_context is None: 1294 ssl_context = ssl._create_stdlib_context(certfile=certfile, 1295 keyfile=keyfile) 1296 self.ssl_context = ssl_context 1297 IMAP4.__init__(self, host, port) 1298 1299 def _create_socket(self): 1300 sock = IMAP4._create_socket(self) 1301 return self.ssl_context.wrap_socket(sock, 1302 server_hostname=self.host) 1303 1304 def open(self, host='', port=IMAP4_SSL_PORT): 1305 """Setup connection to remote server on "host:port". 1306 (default: localhost:standard IMAP4 SSL port). 1307 This connection will be used by the routines: 1308 read, readline, send, shutdown. 1309 """ 1310 IMAP4.open(self, host, port) 1311 1312 __all__.append("IMAP4_SSL") 1313 1314 1315class IMAP4_stream(IMAP4): 1316 1317 """IMAP4 client class over a stream 1318 1319 Instantiate with: IMAP4_stream(command) 1320 1321 "command" - a string that can be passed to subprocess.Popen() 1322 1323 for more documentation see the docstring of the parent class IMAP4. 1324 """ 1325 1326 1327 def __init__(self, command): 1328 self.command = command 1329 IMAP4.__init__(self) 1330 1331 1332 def open(self, host = None, port = None): 1333 """Setup a stream connection. 1334 This connection will be used by the routines: 1335 read, readline, send, shutdown. 1336 """ 1337 self.host = None # For compatibility with parent class 1338 self.port = None 1339 self.sock = None 1340 self.file = None 1341 self.process = subprocess.Popen(self.command, 1342 bufsize=DEFAULT_BUFFER_SIZE, 1343 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1344 shell=True, close_fds=True) 1345 self.writefile = self.process.stdin 1346 self.readfile = self.process.stdout 1347 1348 def read(self, size): 1349 """Read 'size' bytes from remote.""" 1350 return self.readfile.read(size) 1351 1352 1353 def readline(self): 1354 """Read line from remote.""" 1355 return self.readfile.readline() 1356 1357 1358 def send(self, data): 1359 """Send data to remote.""" 1360 self.writefile.write(data) 1361 self.writefile.flush() 1362 1363 1364 def shutdown(self): 1365 """Close I/O established in "open".""" 1366 self.readfile.close() 1367 self.writefile.close() 1368 self.process.wait() 1369 1370 1371 1372class _Authenticator: 1373 1374 """Private class to provide en/decoding 1375 for base64-based authentication conversation. 1376 """ 1377 1378 def __init__(self, mechinst): 1379 self.mech = mechinst # Callable object to provide/process data 1380 1381 def process(self, data): 1382 ret = self.mech(self.decode(data)) 1383 if ret is None: 1384 return b'*' # Abort conversation 1385 return self.encode(ret) 1386 1387 def encode(self, inp): 1388 # 1389 # Invoke binascii.b2a_base64 iteratively with 1390 # short even length buffers, strip the trailing 1391 # line feed from the result and append. "Even" 1392 # means a number that factors to both 6 and 8, 1393 # so when it gets to the end of the 8-bit input 1394 # there's no partial 6-bit output. 1395 # 1396 oup = b'' 1397 if isinstance(inp, str): 1398 inp = inp.encode('utf-8') 1399 while inp: 1400 if len(inp) > 48: 1401 t = inp[:48] 1402 inp = inp[48:] 1403 else: 1404 t = inp 1405 inp = b'' 1406 e = binascii.b2a_base64(t) 1407 if e: 1408 oup = oup + e[:-1] 1409 return oup 1410 1411 def decode(self, inp): 1412 if not inp: 1413 return b'' 1414 return binascii.a2b_base64(inp) 1415 1416Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ') 1417Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])} 1418 1419def Internaldate2tuple(resp): 1420 """Parse an IMAP4 INTERNALDATE string. 1421 1422 Return corresponding local time. The return value is a 1423 time.struct_time tuple or None if the string has wrong format. 1424 """ 1425 1426 mo = InternalDate.match(resp) 1427 if not mo: 1428 return None 1429 1430 mon = Mon2num[mo.group('mon')] 1431 zonen = mo.group('zonen') 1432 1433 day = int(mo.group('day')) 1434 year = int(mo.group('year')) 1435 hour = int(mo.group('hour')) 1436 min = int(mo.group('min')) 1437 sec = int(mo.group('sec')) 1438 zoneh = int(mo.group('zoneh')) 1439 zonem = int(mo.group('zonem')) 1440 1441 # INTERNALDATE timezone must be subtracted to get UT 1442 1443 zone = (zoneh*60 + zonem)*60 1444 if zonen == b'-': 1445 zone = -zone 1446 1447 tt = (year, mon, day, hour, min, sec, -1, -1, -1) 1448 utc = calendar.timegm(tt) - zone 1449 1450 return time.localtime(utc) 1451 1452 1453 1454def Int2AP(num): 1455 1456 """Convert integer to A-P string representation.""" 1457 1458 val = b''; AP = b'ABCDEFGHIJKLMNOP' 1459 num = int(abs(num)) 1460 while num: 1461 num, mod = divmod(num, 16) 1462 val = AP[mod:mod+1] + val 1463 return val 1464 1465 1466 1467def ParseFlags(resp): 1468 1469 """Convert IMAP4 flags response to python tuple.""" 1470 1471 mo = Flags.match(resp) 1472 if not mo: 1473 return () 1474 1475 return tuple(mo.group('flags').split()) 1476 1477 1478def Time2Internaldate(date_time): 1479 1480 """Convert date_time to IMAP4 INTERNALDATE representation. 1481 1482 Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The 1483 date_time argument can be a number (int or float) representing 1484 seconds since epoch (as returned by time.time()), a 9-tuple 1485 representing local time, an instance of time.struct_time (as 1486 returned by time.localtime()), an aware datetime instance or a 1487 double-quoted string. In the last case, it is assumed to already 1488 be in the correct format. 1489 """ 1490 if isinstance(date_time, (int, float)): 1491 dt = datetime.fromtimestamp(date_time, 1492 timezone.utc).astimezone() 1493 elif isinstance(date_time, tuple): 1494 try: 1495 gmtoff = date_time.tm_gmtoff 1496 except AttributeError: 1497 if time.daylight: 1498 dst = date_time[8] 1499 if dst == -1: 1500 dst = time.localtime(time.mktime(date_time))[8] 1501 gmtoff = -(time.timezone, time.altzone)[dst] 1502 else: 1503 gmtoff = -time.timezone 1504 delta = timedelta(seconds=gmtoff) 1505 dt = datetime(*date_time[:6], tzinfo=timezone(delta)) 1506 elif isinstance(date_time, datetime): 1507 if date_time.tzinfo is None: 1508 raise ValueError("date_time must be aware") 1509 dt = date_time 1510 elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'): 1511 return date_time # Assume in correct format 1512 else: 1513 raise ValueError("date_time not of a known type") 1514 fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month]) 1515 return dt.strftime(fmt) 1516 1517 1518 1519if __name__ == '__main__': 1520 1521 # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]' 1522 # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"' 1523 # to test the IMAP4_stream class 1524 1525 import getopt, getpass 1526 1527 try: 1528 optlist, args = getopt.getopt(sys.argv[1:], 'd:s:') 1529 except getopt.error as val: 1530 optlist, args = (), () 1531 1532 stream_command = None 1533 for opt,val in optlist: 1534 if opt == '-d': 1535 Debug = int(val) 1536 elif opt == '-s': 1537 stream_command = val 1538 if not args: args = (stream_command,) 1539 1540 if not args: args = ('',) 1541 1542 host = args[0] 1543 1544 USER = getpass.getuser() 1545 PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) 1546 1547 test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'} 1548 test_seq1 = ( 1549 ('login', (USER, PASSWD)), 1550 ('create', ('/tmp/xxx 1',)), 1551 ('rename', ('/tmp/xxx 1', '/tmp/yyy')), 1552 ('CREATE', ('/tmp/yyz 2',)), 1553 ('append', ('/tmp/yyz 2', None, None, test_mesg)), 1554 ('list', ('/tmp', 'yy*')), 1555 ('select', ('/tmp/yyz 2',)), 1556 ('search', (None, 'SUBJECT', 'test')), 1557 ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')), 1558 ('store', ('1', 'FLAGS', r'(\Deleted)')), 1559 ('namespace', ()), 1560 ('expunge', ()), 1561 ('recent', ()), 1562 ('close', ()), 1563 ) 1564 1565 test_seq2 = ( 1566 ('select', ()), 1567 ('response',('UIDVALIDITY',)), 1568 ('uid', ('SEARCH', 'ALL')), 1569 ('response', ('EXISTS',)), 1570 ('append', (None, None, None, test_mesg)), 1571 ('recent', ()), 1572 ('logout', ()), 1573 ) 1574 1575 def run(cmd, args): 1576 M._mesg('%s %s' % (cmd, args)) 1577 typ, dat = getattr(M, cmd)(*args) 1578 M._mesg('%s => %s %s' % (cmd, typ, dat)) 1579 if typ == 'NO': raise dat[0] 1580 return dat 1581 1582 try: 1583 if stream_command: 1584 M = IMAP4_stream(stream_command) 1585 else: 1586 M = IMAP4(host) 1587 if M.state == 'AUTH': 1588 test_seq1 = test_seq1[1:] # Login not needed 1589 M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) 1590 M._mesg('CAPABILITIES = %r' % (M.capabilities,)) 1591 1592 for cmd,args in test_seq1: 1593 run(cmd, args) 1594 1595 for ml in run('list', ('/tmp/', 'yy%')): 1596 mo = re.match(r'.*"([^"]+)"$', ml) 1597 if mo: path = mo.group(1) 1598 else: path = ml.split()[-1] 1599 run('delete', (path,)) 1600 1601 for cmd,args in test_seq2: 1602 dat = run(cmd, args) 1603 1604 if (cmd,args) != ('uid', ('SEARCH', 'ALL')): 1605 continue 1606 1607 uid = dat[-1].split() 1608 if not uid: continue 1609 run('uid', ('FETCH', '%s' % uid[-1], 1610 '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) 1611 1612 print('\nAll tests OK.') 1613 1614 except: 1615 print('\nTests failed.') 1616 1617 if not Debug: 1618 print(''' 1619If you would like to see debugging output, 1620try: %s -d5 1621''' % sys.argv[0]) 1622 1623 raise 1624