1"""IMAP4 client. 2 3Based on RFC 2060. 4 5Public class: IMAP4 6Public variable: Debug 7Public functions: Internaldate2tuple 8 Int2AP 9 ParseFlags 10 Time2Internaldate 11""" 12 13# Author: Piers Lauder <piers@cs.su.oz.au> December 1997. 14# 15# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998. 16# String method conversion by ESR, February 2001. 17# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001. 18# IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002. 19# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002. 20# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002. 21# GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005. 22 23__version__ = "2.58" 24 25import binascii, errno, random, re, socket, subprocess, sys, time, calendar 26from datetime import datetime, timezone, timedelta 27from io import DEFAULT_BUFFER_SIZE 28 29try: 30 import ssl 31 HAVE_SSL = True 32except ImportError: 33 HAVE_SSL = False 34 35__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple", 36 "Int2AP", "ParseFlags", "Time2Internaldate"] 37 38# Globals 39 40CRLF = b'\r\n' 41Debug = 0 42IMAP4_PORT = 143 43IMAP4_SSL_PORT = 993 44AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first 45 46# Maximal line length when calling readline(). This is to prevent 47# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1) 48# don't specify a line length. RFC 2683 suggests limiting client 49# command lines to 1000 octets and that servers should be prepared 50# to accept command lines up to 8000 octets, so we used to use 10K here. 51# In the modern world (eg: gmail) the response to, for example, a 52# search command can be quite large, so we now use 1M. 53_MAXLINE = 1000000 54 55 56# Commands 57 58Commands = { 59 # name valid states 60 'APPEND': ('AUTH', 'SELECTED'), 61 'AUTHENTICATE': ('NONAUTH',), 62 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 63 'CHECK': ('SELECTED',), 64 'CLOSE': ('SELECTED',), 65 'COPY': ('SELECTED',), 66 'CREATE': ('AUTH', 'SELECTED'), 67 'DELETE': ('AUTH', 'SELECTED'), 68 'DELETEACL': ('AUTH', 'SELECTED'), 69 'ENABLE': ('AUTH', ), 70 'EXAMINE': ('AUTH', 'SELECTED'), 71 'EXPUNGE': ('SELECTED',), 72 'FETCH': ('SELECTED',), 73 'GETACL': ('AUTH', 'SELECTED'), 74 'GETANNOTATION':('AUTH', 'SELECTED'), 75 'GETQUOTA': ('AUTH', 'SELECTED'), 76 'GETQUOTAROOT': ('AUTH', 'SELECTED'), 77 'MYRIGHTS': ('AUTH', 'SELECTED'), 78 'LIST': ('AUTH', 'SELECTED'), 79 'LOGIN': ('NONAUTH',), 80 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 81 'LSUB': ('AUTH', 'SELECTED'), 82 'NAMESPACE': ('AUTH', 'SELECTED'), 83 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 84 'PARTIAL': ('SELECTED',), # NB: obsolete 85 'PROXYAUTH': ('AUTH',), 86 'RENAME': ('AUTH', 'SELECTED'), 87 'SEARCH': ('SELECTED',), 88 'SELECT': ('AUTH', 'SELECTED'), 89 'SETACL': ('AUTH', 'SELECTED'), 90 'SETANNOTATION':('AUTH', 'SELECTED'), 91 'SETQUOTA': ('AUTH', 'SELECTED'), 92 'SORT': ('SELECTED',), 93 'STARTTLS': ('NONAUTH',), 94 'STATUS': ('AUTH', 'SELECTED'), 95 'STORE': ('SELECTED',), 96 'SUBSCRIBE': ('AUTH', 'SELECTED'), 97 'THREAD': ('SELECTED',), 98 'UID': ('SELECTED',), 99 'UNSUBSCRIBE': ('AUTH', 'SELECTED'), 100 } 101 102# Patterns to match server responses 103 104Continuation = re.compile(br'\+( (?P<data>.*))?') 105Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)') 106InternalDate = re.compile(br'.*INTERNALDATE "' 107 br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])' 108 br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])' 109 br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])' 110 br'"') 111# Literal is no longer used; kept for backward compatibility. 112Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII) 113MapCRLF = re.compile(br'\r\n|\r|\n') 114# We no longer exclude the ']' character from the data portion of the response 115# code, even though it violates the RFC. Popular IMAP servers such as Gmail 116# allow flags with ']', and there are programs (including imaplib!) that can 117# produce them. The problem with this is if the 'text' portion of the response 118# includes a ']' we'll parse the response wrong (which is the point of the RFC 119# restriction). However, that seems less likely to be a problem in practice 120# than being unable to correctly parse flags that include ']' chars, which 121# was reported as a real-world problem in issue #21815. 122Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]') 123Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?') 124# Untagged_status is no longer used; kept for backward compatibility 125Untagged_status = re.compile( 126 br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII) 127# We compile these in _mode_xxx. 128_Literal = br'.*{(?P<size>\d+)}$' 129_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?' 130 131 132 133class IMAP4: 134 135 r"""IMAP4 client class. 136 137 Instantiate with: IMAP4([host[, port]]) 138 139 host - host's name (default: localhost); 140 port - port number (default: standard IMAP4 port). 141 142 All IMAP4rev1 commands are supported by methods of the same 143 name (in lower-case). 144 145 All arguments to commands are converted to strings, except for 146 AUTHENTICATE, and the last argument to APPEND which is passed as 147 an IMAP4 literal. If necessary (the string contains any 148 non-printing characters or white-space and isn't enclosed with 149 either parentheses or double quotes) each string is quoted. 150 However, the 'password' argument to the LOGIN command is always 151 quoted. If you want to avoid having an argument string quoted 152 (eg: the 'flags' argument to STORE) then enclose the string in 153 parentheses (eg: "(\Deleted)"). 154 155 Each command returns a tuple: (type, [data, ...]) where 'type' 156 is usually 'OK' or 'NO', and 'data' is either the text from the 157 tagged response, or untagged results from command. Each 'data' 158 is either a string, or a tuple. If a tuple, then the first part 159 is the header of the response, and the second part contains 160 the data (ie: 'literal' value). 161 162 Errors raise the exception class <instance>.error("<reason>"). 163 IMAP4 server errors raise <instance>.abort("<reason>"), 164 which is a sub-class of 'error'. Mailbox status changes 165 from READ-WRITE to READ-ONLY raise the exception class 166 <instance>.readonly("<reason>"), which is a sub-class of 'abort'. 167 168 "error" exceptions imply a program error. 169 "abort" exceptions imply the connection should be reset, and 170 the command re-tried. 171 "readonly" exceptions imply the command should be re-tried. 172 173 Note: to use this module, you must read the RFCs pertaining to the 174 IMAP4 protocol, as the semantics of the arguments to each IMAP4 175 command are left to the invoker, not to mention the results. Also, 176 most IMAP servers implement a sub-set of the commands available here. 177 """ 178 179 class error(Exception): pass # Logical errors - debug required 180 class abort(error): pass # Service errors - close and retry 181 class readonly(abort): pass # Mailbox status changed to READ-ONLY 182 183 def __init__(self, host='', port=IMAP4_PORT): 184 self.debug = Debug 185 self.state = 'LOGOUT' 186 self.literal = None # A literal argument to a command 187 self.tagged_commands = {} # Tagged commands awaiting response 188 self.untagged_responses = {} # {typ: [data, ...], ...} 189 self.continuation_response = '' # Last continuation response 190 self.is_readonly = False # READ-ONLY desired state 191 self.tagnum = 0 192 self._tls_established = False 193 self._mode_ascii() 194 195 # Open socket to server. 196 197 self.open(host, port) 198 199 try: 200 self._connect() 201 except Exception: 202 try: 203 self.shutdown() 204 except OSError: 205 pass 206 raise 207 208 def _mode_ascii(self): 209 self.utf8_enabled = False 210 self._encoding = 'ascii' 211 self.Literal = re.compile(_Literal, re.ASCII) 212 self.Untagged_status = re.compile(_Untagged_status, re.ASCII) 213 214 215 def _mode_utf8(self): 216 self.utf8_enabled = True 217 self._encoding = 'utf-8' 218 self.Literal = re.compile(_Literal) 219 self.Untagged_status = re.compile(_Untagged_status) 220 221 222 def _connect(self): 223 # Create unique tag for this session, 224 # and compile tagged response matcher. 225 226 self.tagpre = Int2AP(random.randint(4096, 65535)) 227 self.tagre = re.compile(br'(?P<tag>' 228 + self.tagpre 229 + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII) 230 231 # Get server welcome message, 232 # request and store CAPABILITY response. 233 234 if __debug__: 235 self._cmd_log_len = 10 236 self._cmd_log_idx = 0 237 self._cmd_log = {} # Last `_cmd_log_len' interactions 238 if self.debug >= 1: 239 self._mesg('imaplib version %s' % __version__) 240 self._mesg('new IMAP4 connection, tag=%s' % self.tagpre) 241 242 self.welcome = self._get_response() 243 if 'PREAUTH' in self.untagged_responses: 244 self.state = 'AUTH' 245 elif 'OK' in self.untagged_responses: 246 self.state = 'NONAUTH' 247 else: 248 raise self.error(self.welcome) 249 250 self._get_capabilities() 251 if __debug__: 252 if self.debug >= 3: 253 self._mesg('CAPABILITIES: %r' % (self.capabilities,)) 254 255 for version in AllowedVersions: 256 if not version in self.capabilities: 257 continue 258 self.PROTOCOL_VERSION = version 259 return 260 261 raise self.error('server not IMAP4 compliant') 262 263 264 def __getattr__(self, attr): 265 # Allow UPPERCASE variants of IMAP4 command methods. 266 if attr in Commands: 267 return getattr(self, attr.lower()) 268 raise AttributeError("Unknown IMAP4 command: '%s'" % attr) 269 270 def __enter__(self): 271 return self 272 273 def __exit__(self, *args): 274 try: 275 self.logout() 276 except OSError: 277 pass 278 279 280 # Overridable methods 281 282 283 def _create_socket(self): 284 return socket.create_connection((self.host, self.port)) 285 286 def open(self, host = '', port = IMAP4_PORT): 287 """Setup connection to remote server on "host:port" 288 (default: localhost:standard IMAP4 port). 289 This connection will be used by the routines: 290 read, readline, send, shutdown. 291 """ 292 self.host = host 293 self.port = port 294 self.sock = self._create_socket() 295 self.file = self.sock.makefile('rb') 296 297 298 def read(self, size): 299 """Read 'size' bytes from remote.""" 300 return self.file.read(size) 301 302 303 def readline(self): 304 """Read line from remote.""" 305 line = self.file.readline(_MAXLINE + 1) 306 if len(line) > _MAXLINE: 307 raise self.error("got more than %d bytes" % _MAXLINE) 308 return line 309 310 311 def send(self, data): 312 """Send data to remote.""" 313 self.sock.sendall(data) 314 315 316 def shutdown(self): 317 """Close I/O established in "open".""" 318 self.file.close() 319 try: 320 self.sock.shutdown(socket.SHUT_RDWR) 321 except OSError as e: 322 # The server might already have closed the connection 323 if e.errno != errno.ENOTCONN: 324 raise 325 finally: 326 self.sock.close() 327 328 329 def socket(self): 330 """Return socket instance used to connect to IMAP4 server. 331 332 socket = <instance>.socket() 333 """ 334 return self.sock 335 336 337 338 # Utility methods 339 340 341 def recent(self): 342 """Return most recent 'RECENT' responses if any exist, 343 else prompt server for an update using the 'NOOP' command. 344 345 (typ, [data]) = <instance>.recent() 346 347 'data' is None if no new messages, 348 else list of RECENT responses, most recent last. 349 """ 350 name = 'RECENT' 351 typ, dat = self._untagged_response('OK', [None], name) 352 if dat[-1]: 353 return typ, dat 354 typ, dat = self.noop() # Prod server for response 355 return self._untagged_response(typ, dat, name) 356 357 358 def response(self, code): 359 """Return data for response 'code' if received, or None. 360 361 Old value for response 'code' is cleared. 362 363 (code, [data]) = <instance>.response(code) 364 """ 365 return self._untagged_response(code, [None], code.upper()) 366 367 368 369 # IMAP4 commands 370 371 372 def append(self, mailbox, flags, date_time, message): 373 """Append message to named mailbox. 374 375 (typ, [data]) = <instance>.append(mailbox, flags, date_time, message) 376 377 All args except `message' can be None. 378 """ 379 name = 'APPEND' 380 if not mailbox: 381 mailbox = 'INBOX' 382 if flags: 383 if (flags[0],flags[-1]) != ('(',')'): 384 flags = '(%s)' % flags 385 else: 386 flags = None 387 if date_time: 388 date_time = Time2Internaldate(date_time) 389 else: 390 date_time = None 391 literal = MapCRLF.sub(CRLF, message) 392 if self.utf8_enabled: 393 literal = b'UTF8 (' + literal + b')' 394 self.literal = literal 395 return self._simple_command(name, mailbox, flags, date_time) 396 397 398 def authenticate(self, mechanism, authobject): 399 """Authenticate command - requires response processing. 400 401 'mechanism' specifies which authentication mechanism is to 402 be used - it must appear in <instance>.capabilities in the 403 form AUTH=<mechanism>. 404 405 'authobject' must be a callable object: 406 407 data = authobject(response) 408 409 It will be called to process server continuation responses; the 410 response argument it is passed will be a bytes. It should return bytes 411 data that will be base64 encoded and sent to the server. It should 412 return None if the client abort response '*' should be sent instead. 413 """ 414 mech = mechanism.upper() 415 # XXX: shouldn't this code be removed, not commented out? 416 #cap = 'AUTH=%s' % mech 417 #if not cap in self.capabilities: # Let the server decide! 418 # raise self.error("Server doesn't allow %s authentication." % mech) 419 self.literal = _Authenticator(authobject).process 420 typ, dat = self._simple_command('AUTHENTICATE', mech) 421 if typ != 'OK': 422 raise self.error(dat[-1].decode('utf-8', 'replace')) 423 self.state = 'AUTH' 424 return typ, dat 425 426 427 def capability(self): 428 """(typ, [data]) = <instance>.capability() 429 Fetch capabilities list from server.""" 430 431 name = 'CAPABILITY' 432 typ, dat = self._simple_command(name) 433 return self._untagged_response(typ, dat, name) 434 435 436 def check(self): 437 """Checkpoint mailbox on server. 438 439 (typ, [data]) = <instance>.check() 440 """ 441 return self._simple_command('CHECK') 442 443 444 def close(self): 445 """Close currently selected mailbox. 446 447 Deleted messages are removed from writable mailbox. 448 This is the recommended command before 'LOGOUT'. 449 450 (typ, [data]) = <instance>.close() 451 """ 452 try: 453 typ, dat = self._simple_command('CLOSE') 454 finally: 455 self.state = 'AUTH' 456 return typ, dat 457 458 459 def copy(self, message_set, new_mailbox): 460 """Copy 'message_set' messages onto end of 'new_mailbox'. 461 462 (typ, [data]) = <instance>.copy(message_set, new_mailbox) 463 """ 464 return self._simple_command('COPY', message_set, new_mailbox) 465 466 467 def create(self, mailbox): 468 """Create new mailbox. 469 470 (typ, [data]) = <instance>.create(mailbox) 471 """ 472 return self._simple_command('CREATE', mailbox) 473 474 475 def delete(self, mailbox): 476 """Delete old mailbox. 477 478 (typ, [data]) = <instance>.delete(mailbox) 479 """ 480 return self._simple_command('DELETE', mailbox) 481 482 def deleteacl(self, mailbox, who): 483 """Delete the ACLs (remove any rights) set for who on mailbox. 484 485 (typ, [data]) = <instance>.deleteacl(mailbox, who) 486 """ 487 return self._simple_command('DELETEACL', mailbox, who) 488 489 def enable(self, capability): 490 """Send an RFC5161 enable string to the server. 491 492 (typ, [data]) = <intance>.enable(capability) 493 """ 494 if 'ENABLE' not in self.capabilities: 495 raise IMAP4.error("Server does not support ENABLE") 496 typ, data = self._simple_command('ENABLE', capability) 497 if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper(): 498 self._mode_utf8() 499 return typ, data 500 501 def expunge(self): 502 """Permanently remove deleted items from selected mailbox. 503 504 Generates 'EXPUNGE' response for each deleted message. 505 506 (typ, [data]) = <instance>.expunge() 507 508 'data' is list of 'EXPUNGE'd message numbers in order received. 509 """ 510 name = 'EXPUNGE' 511 typ, dat = self._simple_command(name) 512 return self._untagged_response(typ, dat, name) 513 514 515 def fetch(self, message_set, message_parts): 516 """Fetch (parts of) messages. 517 518 (typ, [data, ...]) = <instance>.fetch(message_set, message_parts) 519 520 'message_parts' should be a string of selected parts 521 enclosed in parentheses, eg: "(UID BODY[TEXT])". 522 523 'data' are tuples of message part envelope and data. 524 """ 525 name = 'FETCH' 526 typ, dat = self._simple_command(name, message_set, message_parts) 527 return self._untagged_response(typ, dat, name) 528 529 530 def getacl(self, mailbox): 531 """Get the ACLs for a mailbox. 532 533 (typ, [data]) = <instance>.getacl(mailbox) 534 """ 535 typ, dat = self._simple_command('GETACL', mailbox) 536 return self._untagged_response(typ, dat, 'ACL') 537 538 539 def getannotation(self, mailbox, entry, attribute): 540 """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute) 541 Retrieve ANNOTATIONs.""" 542 543 typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) 544 return self._untagged_response(typ, dat, 'ANNOTATION') 545 546 547 def getquota(self, root): 548 """Get the quota root's resource usage and limits. 549 550 Part of the IMAP4 QUOTA extension defined in rfc2087. 551 552 (typ, [data]) = <instance>.getquota(root) 553 """ 554 typ, dat = self._simple_command('GETQUOTA', root) 555 return self._untagged_response(typ, dat, 'QUOTA') 556 557 558 def getquotaroot(self, mailbox): 559 """Get the list of quota roots for the named mailbox. 560 561 (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox) 562 """ 563 typ, dat = self._simple_command('GETQUOTAROOT', mailbox) 564 typ, quota = self._untagged_response(typ, dat, 'QUOTA') 565 typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT') 566 return typ, [quotaroot, quota] 567 568 569 def list(self, directory='""', pattern='*'): 570 """List mailbox names in directory matching pattern. 571 572 (typ, [data]) = <instance>.list(directory='""', pattern='*') 573 574 'data' is list of LIST responses. 575 """ 576 name = 'LIST' 577 typ, dat = self._simple_command(name, directory, pattern) 578 return self._untagged_response(typ, dat, name) 579 580 581 def login(self, user, password): 582 """Identify client using plaintext password. 583 584 (typ, [data]) = <instance>.login(user, password) 585 586 NB: 'password' will be quoted. 587 """ 588 typ, dat = self._simple_command('LOGIN', user, self._quote(password)) 589 if typ != 'OK': 590 raise self.error(dat[-1]) 591 self.state = 'AUTH' 592 return typ, dat 593 594 595 def login_cram_md5(self, user, password): 596 """ Force use of CRAM-MD5 authentication. 597 598 (typ, [data]) = <instance>.login_cram_md5(user, password) 599 """ 600 self.user, self.password = user, password 601 return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH) 602 603 604 def _CRAM_MD5_AUTH(self, challenge): 605 """ Authobject to use with CRAM-MD5 authentication. """ 606 import hmac 607 pwd = (self.password.encode('utf-8') if isinstance(self.password, str) 608 else self.password) 609 return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest() 610 611 612 def logout(self): 613 """Shutdown connection to server. 614 615 (typ, [data]) = <instance>.logout() 616 617 Returns server 'BYE' response. 618 """ 619 self.state = 'LOGOUT' 620 try: typ, dat = self._simple_command('LOGOUT') 621 except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]] 622 self.shutdown() 623 if 'BYE' in self.untagged_responses: 624 return 'BYE', self.untagged_responses['BYE'] 625 return typ, dat 626 627 628 def lsub(self, directory='""', pattern='*'): 629 """List 'subscribed' mailbox names in directory matching pattern. 630 631 (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*') 632 633 'data' are tuples of message part envelope and data. 634 """ 635 name = 'LSUB' 636 typ, dat = self._simple_command(name, directory, pattern) 637 return self._untagged_response(typ, dat, name) 638 639 def myrights(self, mailbox): 640 """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox). 641 642 (typ, [data]) = <instance>.myrights(mailbox) 643 """ 644 typ,dat = self._simple_command('MYRIGHTS', mailbox) 645 return self._untagged_response(typ, dat, 'MYRIGHTS') 646 647 def namespace(self): 648 """ Returns IMAP namespaces ala rfc2342 649 650 (typ, [data, ...]) = <instance>.namespace() 651 """ 652 name = 'NAMESPACE' 653 typ, dat = self._simple_command(name) 654 return self._untagged_response(typ, dat, name) 655 656 657 def noop(self): 658 """Send NOOP command. 659 660 (typ, [data]) = <instance>.noop() 661 """ 662 if __debug__: 663 if self.debug >= 3: 664 self._dump_ur(self.untagged_responses) 665 return self._simple_command('NOOP') 666 667 668 def partial(self, message_num, message_part, start, length): 669 """Fetch truncated part of a message. 670 671 (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length) 672 673 'data' is tuple of message part envelope and data. 674 """ 675 name = 'PARTIAL' 676 typ, dat = self._simple_command(name, message_num, message_part, start, length) 677 return self._untagged_response(typ, dat, 'FETCH') 678 679 680 def proxyauth(self, user): 681 """Assume authentication as "user". 682 683 Allows an authorised administrator to proxy into any user's 684 mailbox. 685 686 (typ, [data]) = <instance>.proxyauth(user) 687 """ 688 689 name = 'PROXYAUTH' 690 return self._simple_command('PROXYAUTH', user) 691 692 693 def rename(self, oldmailbox, newmailbox): 694 """Rename old mailbox name to new. 695 696 (typ, [data]) = <instance>.rename(oldmailbox, newmailbox) 697 """ 698 return self._simple_command('RENAME', oldmailbox, newmailbox) 699 700 701 def search(self, charset, *criteria): 702 """Search mailbox for matching messages. 703 704 (typ, [data]) = <instance>.search(charset, criterion, ...) 705 706 'data' is space separated list of matching message numbers. 707 If UTF8 is enabled, charset MUST be None. 708 """ 709 name = 'SEARCH' 710 if charset: 711 if self.utf8_enabled: 712 raise IMAP4.error("Non-None charset not valid in UTF8 mode") 713 typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria) 714 else: 715 typ, dat = self._simple_command(name, *criteria) 716 return self._untagged_response(typ, dat, name) 717 718 719 def select(self, mailbox='INBOX', readonly=False): 720 """Select a mailbox. 721 722 Flush all untagged responses. 723 724 (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False) 725 726 'data' is count of messages in mailbox ('EXISTS' response). 727 728 Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so 729 other responses should be obtained via <instance>.response('FLAGS') etc. 730 """ 731 self.untagged_responses = {} # Flush old responses. 732 self.is_readonly = readonly 733 if readonly: 734 name = 'EXAMINE' 735 else: 736 name = 'SELECT' 737 typ, dat = self._simple_command(name, mailbox) 738 if typ != 'OK': 739 self.state = 'AUTH' # Might have been 'SELECTED' 740 return typ, dat 741 self.state = 'SELECTED' 742 if 'READ-ONLY' in self.untagged_responses \ 743 and not readonly: 744 if __debug__: 745 if self.debug >= 1: 746 self._dump_ur(self.untagged_responses) 747 raise self.readonly('%s is not writable' % mailbox) 748 return typ, self.untagged_responses.get('EXISTS', [None]) 749 750 751 def setacl(self, mailbox, who, what): 752 """Set a mailbox acl. 753 754 (typ, [data]) = <instance>.setacl(mailbox, who, what) 755 """ 756 return self._simple_command('SETACL', mailbox, who, what) 757 758 759 def setannotation(self, *args): 760 """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+) 761 Set ANNOTATIONs.""" 762 763 typ, dat = self._simple_command('SETANNOTATION', *args) 764 return self._untagged_response(typ, dat, 'ANNOTATION') 765 766 767 def setquota(self, root, limits): 768 """Set the quota root's resource limits. 769 770 (typ, [data]) = <instance>.setquota(root, limits) 771 """ 772 typ, dat = self._simple_command('SETQUOTA', root, limits) 773 return self._untagged_response(typ, dat, 'QUOTA') 774 775 776 def sort(self, sort_criteria, charset, *search_criteria): 777 """IMAP4rev1 extension SORT command. 778 779 (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...) 780 """ 781 name = 'SORT' 782 #if not name in self.capabilities: # Let the server decide! 783 # raise self.error('unimplemented extension command: %s' % name) 784 if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): 785 sort_criteria = '(%s)' % sort_criteria 786 typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria) 787 return self._untagged_response(typ, dat, name) 788 789 790 def starttls(self, ssl_context=None): 791 name = 'STARTTLS' 792 if not HAVE_SSL: 793 raise self.error('SSL support missing') 794 if self._tls_established: 795 raise self.abort('TLS session already established') 796 if name not in self.capabilities: 797 raise self.abort('TLS not supported by server') 798 # Generate a default SSL context if none was passed. 799 if ssl_context is None: 800 ssl_context = ssl._create_stdlib_context() 801 typ, dat = self._simple_command(name) 802 if typ == 'OK': 803 self.sock = ssl_context.wrap_socket(self.sock, 804 server_hostname=self.host) 805 self.file = self.sock.makefile('rb') 806 self._tls_established = True 807 self._get_capabilities() 808 else: 809 raise self.error("Couldn't establish TLS session") 810 return self._untagged_response(typ, dat, name) 811 812 813 def status(self, mailbox, names): 814 """Request named status conditions for mailbox. 815 816 (typ, [data]) = <instance>.status(mailbox, names) 817 """ 818 name = 'STATUS' 819 #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! 820 # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) 821 typ, dat = self._simple_command(name, mailbox, names) 822 return self._untagged_response(typ, dat, name) 823 824 825 def store(self, message_set, command, flags): 826 """Alters flag dispositions for messages in mailbox. 827 828 (typ, [data]) = <instance>.store(message_set, command, flags) 829 """ 830 if (flags[0],flags[-1]) != ('(',')'): 831 flags = '(%s)' % flags # Avoid quoting the flags 832 typ, dat = self._simple_command('STORE', message_set, command, flags) 833 return self._untagged_response(typ, dat, 'FETCH') 834 835 836 def subscribe(self, mailbox): 837 """Subscribe to new mailbox. 838 839 (typ, [data]) = <instance>.subscribe(mailbox) 840 """ 841 return self._simple_command('SUBSCRIBE', mailbox) 842 843 844 def thread(self, threading_algorithm, charset, *search_criteria): 845 """IMAPrev1 extension THREAD command. 846 847 (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...) 848 """ 849 name = 'THREAD' 850 typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria) 851 return self._untagged_response(typ, dat, name) 852 853 854 def uid(self, command, *args): 855 """Execute "command arg ..." with messages identified by UID, 856 rather than message number. 857 858 (typ, [data]) = <instance>.uid(command, arg1, arg2, ...) 859 860 Returns response appropriate to 'command'. 861 """ 862 command = command.upper() 863 if not command in Commands: 864 raise self.error("Unknown IMAP4 UID command: %s" % command) 865 if self.state not in Commands[command]: 866 raise self.error("command %s illegal in state %s, " 867 "only allowed in states %s" % 868 (command, self.state, 869 ', '.join(Commands[command]))) 870 name = 'UID' 871 typ, dat = self._simple_command(name, command, *args) 872 if command in ('SEARCH', 'SORT', 'THREAD'): 873 name = command 874 else: 875 name = 'FETCH' 876 return self._untagged_response(typ, dat, name) 877 878 879 def unsubscribe(self, mailbox): 880 """Unsubscribe from old mailbox. 881 882 (typ, [data]) = <instance>.unsubscribe(mailbox) 883 """ 884 return self._simple_command('UNSUBSCRIBE', mailbox) 885 886 887 def xatom(self, name, *args): 888 """Allow simple extension commands 889 notified by server in CAPABILITY response. 890 891 Assumes command is legal in current state. 892 893 (typ, [data]) = <instance>.xatom(name, arg, ...) 894 895 Returns response appropriate to extension command `name'. 896 """ 897 name = name.upper() 898 #if not name in self.capabilities: # Let the server decide! 899 # raise self.error('unknown extension command: %s' % name) 900 if not name in Commands: 901 Commands[name] = (self.state,) 902 return self._simple_command(name, *args) 903 904 905 906 # Private methods 907 908 909 def _append_untagged(self, typ, dat): 910 if dat is None: 911 dat = b'' 912 ur = self.untagged_responses 913 if __debug__: 914 if self.debug >= 5: 915 self._mesg('untagged_responses[%s] %s += ["%r"]' % 916 (typ, len(ur.get(typ,'')), dat)) 917 if typ in ur: 918 ur[typ].append(dat) 919 else: 920 ur[typ] = [dat] 921 922 923 def _check_bye(self): 924 bye = self.untagged_responses.get('BYE') 925 if bye: 926 raise self.abort(bye[-1].decode(self._encoding, 'replace')) 927 928 929 def _command(self, name, *args): 930 931 if self.state not in Commands[name]: 932 self.literal = None 933 raise self.error("command %s illegal in state %s, " 934 "only allowed in states %s" % 935 (name, self.state, 936 ', '.join(Commands[name]))) 937 938 for typ in ('OK', 'NO', 'BAD'): 939 if typ in self.untagged_responses: 940 del self.untagged_responses[typ] 941 942 if 'READ-ONLY' in self.untagged_responses \ 943 and not self.is_readonly: 944 raise self.readonly('mailbox status changed to READ-ONLY') 945 946 tag = self._new_tag() 947 name = bytes(name, self._encoding) 948 data = tag + b' ' + name 949 for arg in args: 950 if arg is None: continue 951 if isinstance(arg, str): 952 arg = bytes(arg, self._encoding) 953 data = data + b' ' + arg 954 955 literal = self.literal 956 if literal is not None: 957 self.literal = None 958 if type(literal) is type(self._command): 959 literator = literal 960 else: 961 literator = None 962 data = data + bytes(' {%s}' % len(literal), self._encoding) 963 964 if __debug__: 965 if self.debug >= 4: 966 self._mesg('> %r' % data) 967 else: 968 self._log('> %r' % data) 969 970 try: 971 self.send(data + CRLF) 972 except OSError as val: 973 raise self.abort('socket error: %s' % val) 974 975 if literal is None: 976 return tag 977 978 while 1: 979 # Wait for continuation response 980 981 while self._get_response(): 982 if self.tagged_commands[tag]: # BAD/NO? 983 return tag 984 985 # Send literal 986 987 if literator: 988 literal = literator(self.continuation_response) 989 990 if __debug__: 991 if self.debug >= 4: 992 self._mesg('write literal size %s' % len(literal)) 993 994 try: 995 self.send(literal) 996 self.send(CRLF) 997 except OSError as val: 998 raise self.abort('socket error: %s' % val) 999 1000 if not literator: 1001 break 1002 1003 return tag 1004 1005 1006 def _command_complete(self, name, tag): 1007 # BYE is expected after LOGOUT 1008 if name != 'LOGOUT': 1009 self._check_bye() 1010 try: 1011 typ, data = self._get_tagged_response(tag) 1012 except self.abort as val: 1013 raise self.abort('command: %s => %s' % (name, val)) 1014 except self.error as val: 1015 raise self.error('command: %s => %s' % (name, val)) 1016 if name != 'LOGOUT': 1017 self._check_bye() 1018 if typ == 'BAD': 1019 raise self.error('%s command error: %s %s' % (name, typ, data)) 1020 return typ, data 1021 1022 1023 def _get_capabilities(self): 1024 typ, dat = self.capability() 1025 if dat == [None]: 1026 raise self.error('no CAPABILITY response from server') 1027 dat = str(dat[-1], self._encoding) 1028 dat = dat.upper() 1029 self.capabilities = tuple(dat.split()) 1030 1031 1032 def _get_response(self): 1033 1034 # Read response and store. 1035 # 1036 # Returns None for continuation responses, 1037 # otherwise first response line received. 1038 1039 resp = self._get_line() 1040 1041 # Command completion response? 1042 1043 if self._match(self.tagre, resp): 1044 tag = self.mo.group('tag') 1045 if not tag in self.tagged_commands: 1046 raise self.abort('unexpected tagged response: %r' % resp) 1047 1048 typ = self.mo.group('type') 1049 typ = str(typ, self._encoding) 1050 dat = self.mo.group('data') 1051 self.tagged_commands[tag] = (typ, [dat]) 1052 else: 1053 dat2 = None 1054 1055 # '*' (untagged) responses? 1056 1057 if not self._match(Untagged_response, resp): 1058 if self._match(self.Untagged_status, resp): 1059 dat2 = self.mo.group('data2') 1060 1061 if self.mo is None: 1062 # Only other possibility is '+' (continuation) response... 1063 1064 if self._match(Continuation, resp): 1065 self.continuation_response = self.mo.group('data') 1066 return None # NB: indicates continuation 1067 1068 raise self.abort("unexpected response: %r" % resp) 1069 1070 typ = self.mo.group('type') 1071 typ = str(typ, self._encoding) 1072 dat = self.mo.group('data') 1073 if dat is None: dat = b'' # Null untagged response 1074 if dat2: dat = dat + b' ' + dat2 1075 1076 # Is there a literal to come? 1077 1078 while self._match(self.Literal, dat): 1079 1080 # Read literal direct from connection. 1081 1082 size = int(self.mo.group('size')) 1083 if __debug__: 1084 if self.debug >= 4: 1085 self._mesg('read literal size %s' % size) 1086 data = self.read(size) 1087 1088 # Store response with literal as tuple 1089 1090 self._append_untagged(typ, (dat, data)) 1091 1092 # Read trailer - possibly containing another literal 1093 1094 dat = self._get_line() 1095 1096 self._append_untagged(typ, dat) 1097 1098 # Bracketed response information? 1099 1100 if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): 1101 typ = self.mo.group('type') 1102 typ = str(typ, self._encoding) 1103 self._append_untagged(typ, self.mo.group('data')) 1104 1105 if __debug__: 1106 if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'): 1107 self._mesg('%s response: %r' % (typ, dat)) 1108 1109 return resp 1110 1111 1112 def _get_tagged_response(self, tag): 1113 1114 while 1: 1115 result = self.tagged_commands[tag] 1116 if result is not None: 1117 del self.tagged_commands[tag] 1118 return result 1119 1120 # If we've seen a BYE at this point, the socket will be 1121 # closed, so report the BYE now. 1122 1123 self._check_bye() 1124 1125 # Some have reported "unexpected response" exceptions. 1126 # Note that ignoring them here causes loops. 1127 # Instead, send me details of the unexpected response and 1128 # I'll update the code in `_get_response()'. 1129 1130 try: 1131 self._get_response() 1132 except self.abort as val: 1133 if __debug__: 1134 if self.debug >= 1: 1135 self.print_log() 1136 raise 1137 1138 1139 def _get_line(self): 1140 1141 line = self.readline() 1142 if not line: 1143 raise self.abort('socket error: EOF') 1144 1145 # Protocol mandates all lines terminated by CRLF 1146 if not line.endswith(b'\r\n'): 1147 raise self.abort('socket error: unterminated line: %r' % line) 1148 1149 line = line[:-2] 1150 if __debug__: 1151 if self.debug >= 4: 1152 self._mesg('< %r' % line) 1153 else: 1154 self._log('< %r' % line) 1155 return line 1156 1157 1158 def _match(self, cre, s): 1159 1160 # Run compiled regular expression match method on 's'. 1161 # Save result, return success. 1162 1163 self.mo = cre.match(s) 1164 if __debug__: 1165 if self.mo is not None and self.debug >= 5: 1166 self._mesg("\tmatched r'%r' => %r" % (cre.pattern, self.mo.groups())) 1167 return self.mo is not None 1168 1169 1170 def _new_tag(self): 1171 1172 tag = self.tagpre + bytes(str(self.tagnum), self._encoding) 1173 self.tagnum = self.tagnum + 1 1174 self.tagged_commands[tag] = None 1175 return tag 1176 1177 1178 def _quote(self, arg): 1179 1180 arg = arg.replace('\\', '\\\\') 1181 arg = arg.replace('"', '\\"') 1182 1183 return '"' + arg + '"' 1184 1185 1186 def _simple_command(self, name, *args): 1187 1188 return self._command_complete(name, self._command(name, *args)) 1189 1190 1191 def _untagged_response(self, typ, dat, name): 1192 if typ == 'NO': 1193 return typ, dat 1194 if not name in self.untagged_responses: 1195 return typ, [None] 1196 data = self.untagged_responses.pop(name) 1197 if __debug__: 1198 if self.debug >= 5: 1199 self._mesg('untagged_responses[%s] => %s' % (name, data)) 1200 return typ, data 1201 1202 1203 if __debug__: 1204 1205 def _mesg(self, s, secs=None): 1206 if secs is None: 1207 secs = time.time() 1208 tm = time.strftime('%M:%S', time.localtime(secs)) 1209 sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) 1210 sys.stderr.flush() 1211 1212 def _dump_ur(self, dict): 1213 # Dump untagged responses (in `dict'). 1214 l = dict.items() 1215 if not l: return 1216 t = '\n\t\t' 1217 l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l) 1218 self._mesg('untagged responses dump:%s%s' % (t, t.join(l))) 1219 1220 def _log(self, line): 1221 # Keep log of last `_cmd_log_len' interactions for debugging. 1222 self._cmd_log[self._cmd_log_idx] = (line, time.time()) 1223 self._cmd_log_idx += 1 1224 if self._cmd_log_idx >= self._cmd_log_len: 1225 self._cmd_log_idx = 0 1226 1227 def print_log(self): 1228 self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log)) 1229 i, n = self._cmd_log_idx, self._cmd_log_len 1230 while n: 1231 try: 1232 self._mesg(*self._cmd_log[i]) 1233 except: 1234 pass 1235 i += 1 1236 if i >= self._cmd_log_len: 1237 i = 0 1238 n -= 1 1239 1240 1241if HAVE_SSL: 1242 1243 class IMAP4_SSL(IMAP4): 1244 1245 """IMAP4 client class over SSL connection 1246 1247 Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context]]]]]) 1248 1249 host - host's name (default: localhost); 1250 port - port number (default: standard IMAP4 SSL port); 1251 keyfile - PEM formatted file that contains your private key (default: None); 1252 certfile - PEM formatted certificate chain file (default: None); 1253 ssl_context - a SSLContext object that contains your certificate chain 1254 and private key (default: None) 1255 Note: if ssl_context is provided, then parameters keyfile or 1256 certfile should not be set otherwise ValueError is raised. 1257 1258 for more documentation see the docstring of the parent class IMAP4. 1259 """ 1260 1261 1262 def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None, 1263 certfile=None, ssl_context=None): 1264 if ssl_context is not None and keyfile is not None: 1265 raise ValueError("ssl_context and keyfile arguments are mutually " 1266 "exclusive") 1267 if ssl_context is not None and certfile is not None: 1268 raise ValueError("ssl_context and certfile arguments are mutually " 1269 "exclusive") 1270 if keyfile is not None or certfile is not None: 1271 import warnings 1272 warnings.warn("keyfile and certfile are deprecated, use a" 1273 "custom ssl_context instead", DeprecationWarning, 2) 1274 self.keyfile = keyfile 1275 self.certfile = certfile 1276 if ssl_context is None: 1277 ssl_context = ssl._create_stdlib_context(certfile=certfile, 1278 keyfile=keyfile) 1279 self.ssl_context = ssl_context 1280 IMAP4.__init__(self, host, port) 1281 1282 def _create_socket(self): 1283 sock = IMAP4._create_socket(self) 1284 return self.ssl_context.wrap_socket(sock, 1285 server_hostname=self.host) 1286 1287 def open(self, host='', port=IMAP4_SSL_PORT): 1288 """Setup connection to remote server on "host:port". 1289 (default: localhost:standard IMAP4 SSL port). 1290 This connection will be used by the routines: 1291 read, readline, send, shutdown. 1292 """ 1293 IMAP4.open(self, host, port) 1294 1295 __all__.append("IMAP4_SSL") 1296 1297 1298class IMAP4_stream(IMAP4): 1299 1300 """IMAP4 client class over a stream 1301 1302 Instantiate with: IMAP4_stream(command) 1303 1304 "command" - a string that can be passed to subprocess.Popen() 1305 1306 for more documentation see the docstring of the parent class IMAP4. 1307 """ 1308 1309 1310 def __init__(self, command): 1311 self.command = command 1312 IMAP4.__init__(self) 1313 1314 1315 def open(self, host = None, port = None): 1316 """Setup a stream connection. 1317 This connection will be used by the routines: 1318 read, readline, send, shutdown. 1319 """ 1320 self.host = None # For compatibility with parent class 1321 self.port = None 1322 self.sock = None 1323 self.file = None 1324 self.process = subprocess.Popen(self.command, 1325 bufsize=DEFAULT_BUFFER_SIZE, 1326 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1327 shell=True, close_fds=True) 1328 self.writefile = self.process.stdin 1329 self.readfile = self.process.stdout 1330 1331 def read(self, size): 1332 """Read 'size' bytes from remote.""" 1333 return self.readfile.read(size) 1334 1335 1336 def readline(self): 1337 """Read line from remote.""" 1338 return self.readfile.readline() 1339 1340 1341 def send(self, data): 1342 """Send data to remote.""" 1343 self.writefile.write(data) 1344 self.writefile.flush() 1345 1346 1347 def shutdown(self): 1348 """Close I/O established in "open".""" 1349 self.readfile.close() 1350 self.writefile.close() 1351 self.process.wait() 1352 1353 1354 1355class _Authenticator: 1356 1357 """Private class to provide en/decoding 1358 for base64-based authentication conversation. 1359 """ 1360 1361 def __init__(self, mechinst): 1362 self.mech = mechinst # Callable object to provide/process data 1363 1364 def process(self, data): 1365 ret = self.mech(self.decode(data)) 1366 if ret is None: 1367 return b'*' # Abort conversation 1368 return self.encode(ret) 1369 1370 def encode(self, inp): 1371 # 1372 # Invoke binascii.b2a_base64 iteratively with 1373 # short even length buffers, strip the trailing 1374 # line feed from the result and append. "Even" 1375 # means a number that factors to both 6 and 8, 1376 # so when it gets to the end of the 8-bit input 1377 # there's no partial 6-bit output. 1378 # 1379 oup = b'' 1380 if isinstance(inp, str): 1381 inp = inp.encode('utf-8') 1382 while inp: 1383 if len(inp) > 48: 1384 t = inp[:48] 1385 inp = inp[48:] 1386 else: 1387 t = inp 1388 inp = b'' 1389 e = binascii.b2a_base64(t) 1390 if e: 1391 oup = oup + e[:-1] 1392 return oup 1393 1394 def decode(self, inp): 1395 if not inp: 1396 return b'' 1397 return binascii.a2b_base64(inp) 1398 1399Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ') 1400Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])} 1401 1402def Internaldate2tuple(resp): 1403 """Parse an IMAP4 INTERNALDATE string. 1404 1405 Return corresponding local time. The return value is a 1406 time.struct_time tuple or None if the string has wrong format. 1407 """ 1408 1409 mo = InternalDate.match(resp) 1410 if not mo: 1411 return None 1412 1413 mon = Mon2num[mo.group('mon')] 1414 zonen = mo.group('zonen') 1415 1416 day = int(mo.group('day')) 1417 year = int(mo.group('year')) 1418 hour = int(mo.group('hour')) 1419 min = int(mo.group('min')) 1420 sec = int(mo.group('sec')) 1421 zoneh = int(mo.group('zoneh')) 1422 zonem = int(mo.group('zonem')) 1423 1424 # INTERNALDATE timezone must be subtracted to get UT 1425 1426 zone = (zoneh*60 + zonem)*60 1427 if zonen == b'-': 1428 zone = -zone 1429 1430 tt = (year, mon, day, hour, min, sec, -1, -1, -1) 1431 utc = calendar.timegm(tt) - zone 1432 1433 return time.localtime(utc) 1434 1435 1436 1437def Int2AP(num): 1438 1439 """Convert integer to A-P string representation.""" 1440 1441 val = b''; AP = b'ABCDEFGHIJKLMNOP' 1442 num = int(abs(num)) 1443 while num: 1444 num, mod = divmod(num, 16) 1445 val = AP[mod:mod+1] + val 1446 return val 1447 1448 1449 1450def ParseFlags(resp): 1451 1452 """Convert IMAP4 flags response to python tuple.""" 1453 1454 mo = Flags.match(resp) 1455 if not mo: 1456 return () 1457 1458 return tuple(mo.group('flags').split()) 1459 1460 1461def Time2Internaldate(date_time): 1462 1463 """Convert date_time to IMAP4 INTERNALDATE representation. 1464 1465 Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The 1466 date_time argument can be a number (int or float) representing 1467 seconds since epoch (as returned by time.time()), a 9-tuple 1468 representing local time, an instance of time.struct_time (as 1469 returned by time.localtime()), an aware datetime instance or a 1470 double-quoted string. In the last case, it is assumed to already 1471 be in the correct format. 1472 """ 1473 if isinstance(date_time, (int, float)): 1474 dt = datetime.fromtimestamp(date_time, 1475 timezone.utc).astimezone() 1476 elif isinstance(date_time, tuple): 1477 try: 1478 gmtoff = date_time.tm_gmtoff 1479 except AttributeError: 1480 if time.daylight: 1481 dst = date_time[8] 1482 if dst == -1: 1483 dst = time.localtime(time.mktime(date_time))[8] 1484 gmtoff = -(time.timezone, time.altzone)[dst] 1485 else: 1486 gmtoff = -time.timezone 1487 delta = timedelta(seconds=gmtoff) 1488 dt = datetime(*date_time[:6], tzinfo=timezone(delta)) 1489 elif isinstance(date_time, datetime): 1490 if date_time.tzinfo is None: 1491 raise ValueError("date_time must be aware") 1492 dt = date_time 1493 elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'): 1494 return date_time # Assume in correct format 1495 else: 1496 raise ValueError("date_time not of a known type") 1497 fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month]) 1498 return dt.strftime(fmt) 1499 1500 1501 1502if __name__ == '__main__': 1503 1504 # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]' 1505 # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"' 1506 # to test the IMAP4_stream class 1507 1508 import getopt, getpass 1509 1510 try: 1511 optlist, args = getopt.getopt(sys.argv[1:], 'd:s:') 1512 except getopt.error as val: 1513 optlist, args = (), () 1514 1515 stream_command = None 1516 for opt,val in optlist: 1517 if opt == '-d': 1518 Debug = int(val) 1519 elif opt == '-s': 1520 stream_command = val 1521 if not args: args = (stream_command,) 1522 1523 if not args: args = ('',) 1524 1525 host = args[0] 1526 1527 USER = getpass.getuser() 1528 PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) 1529 1530 test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'} 1531 test_seq1 = ( 1532 ('login', (USER, PASSWD)), 1533 ('create', ('/tmp/xxx 1',)), 1534 ('rename', ('/tmp/xxx 1', '/tmp/yyy')), 1535 ('CREATE', ('/tmp/yyz 2',)), 1536 ('append', ('/tmp/yyz 2', None, None, test_mesg)), 1537 ('list', ('/tmp', 'yy*')), 1538 ('select', ('/tmp/yyz 2',)), 1539 ('search', (None, 'SUBJECT', 'test')), 1540 ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')), 1541 ('store', ('1', 'FLAGS', r'(\Deleted)')), 1542 ('namespace', ()), 1543 ('expunge', ()), 1544 ('recent', ()), 1545 ('close', ()), 1546 ) 1547 1548 test_seq2 = ( 1549 ('select', ()), 1550 ('response',('UIDVALIDITY',)), 1551 ('uid', ('SEARCH', 'ALL')), 1552 ('response', ('EXISTS',)), 1553 ('append', (None, None, None, test_mesg)), 1554 ('recent', ()), 1555 ('logout', ()), 1556 ) 1557 1558 def run(cmd, args): 1559 M._mesg('%s %s' % (cmd, args)) 1560 typ, dat = getattr(M, cmd)(*args) 1561 M._mesg('%s => %s %s' % (cmd, typ, dat)) 1562 if typ == 'NO': raise dat[0] 1563 return dat 1564 1565 try: 1566 if stream_command: 1567 M = IMAP4_stream(stream_command) 1568 else: 1569 M = IMAP4(host) 1570 if M.state == 'AUTH': 1571 test_seq1 = test_seq1[1:] # Login not needed 1572 M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) 1573 M._mesg('CAPABILITIES = %r' % (M.capabilities,)) 1574 1575 for cmd,args in test_seq1: 1576 run(cmd, args) 1577 1578 for ml in run('list', ('/tmp/', 'yy%')): 1579 mo = re.match(r'.*"([^"]+)"$', ml) 1580 if mo: path = mo.group(1) 1581 else: path = ml.split()[-1] 1582 run('delete', (path,)) 1583 1584 for cmd,args in test_seq2: 1585 dat = run(cmd, args) 1586 1587 if (cmd,args) != ('uid', ('SEARCH', 'ALL')): 1588 continue 1589 1590 uid = dat[-1].split() 1591 if not uid: continue 1592 run('uid', ('FETCH', '%s' % uid[-1], 1593 '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) 1594 1595 print('\nAll tests OK.') 1596 1597 except: 1598 print('\nTests failed.') 1599 1600 if not Debug: 1601 print(''' 1602If you would like to see debugging output, 1603try: %s -d5 1604''' % sys.argv[0]) 1605 1606 raise 1607