diff options
Diffstat (limited to 'lib/python2.7/imaplib.py')
-rw-r--r-- | lib/python2.7/imaplib.py | 1536 |
1 files changed, 1536 insertions, 0 deletions
diff --git a/lib/python2.7/imaplib.py b/lib/python2.7/imaplib.py new file mode 100644 index 0000000..826eea2 --- /dev/null +++ b/lib/python2.7/imaplib.py @@ -0,0 +1,1536 @@ +"""IMAP4 client. + +Based on RFC 2060. + +Public class: IMAP4 +Public variable: Debug +Public functions: Internaldate2tuple + Int2AP + ParseFlags + Time2Internaldate +""" + +# Author: Piers Lauder <piers@cs.su.oz.au> December 1997. +# +# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998. +# String method conversion by ESR, February 2001. +# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001. +# IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002. +# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002. +# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002. +# GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005. + +__version__ = "2.58" + +import binascii, errno, random, re, socket, subprocess, sys, time + +__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple", + "Int2AP", "ParseFlags", "Time2Internaldate"] + +# Globals + +CRLF = '\r\n' +Debug = 0 +IMAP4_PORT = 143 +IMAP4_SSL_PORT = 993 +AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first + +# Maximal line length when calling readline(). This is to prevent +# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1) +# don't specify a line length. RFC 2683 suggests limiting client +# command lines to 1000 octets and that servers should be prepared +# to accept command lines up to 8000 octets, so we used to use 10K here. +# In the modern world (eg: gmail) the response to, for example, a +# search command can be quite large, so we now use 1M. +_MAXLINE = 1000000 + + +# Commands + +Commands = { + # name valid states + 'APPEND': ('AUTH', 'SELECTED'), + 'AUTHENTICATE': ('NONAUTH',), + 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'CHECK': ('SELECTED',), + 'CLOSE': ('SELECTED',), + 'COPY': ('SELECTED',), + 'CREATE': ('AUTH', 'SELECTED'), + 'DELETE': ('AUTH', 'SELECTED'), + 'DELETEACL': ('AUTH', 'SELECTED'), + 'EXAMINE': ('AUTH', 'SELECTED'), + 'EXPUNGE': ('SELECTED',), + 'FETCH': ('SELECTED',), + 'GETACL': ('AUTH', 'SELECTED'), + 'GETANNOTATION':('AUTH', 'SELECTED'), + 'GETQUOTA': ('AUTH', 'SELECTED'), + 'GETQUOTAROOT': ('AUTH', 'SELECTED'), + 'MYRIGHTS': ('AUTH', 'SELECTED'), + 'LIST': ('AUTH', 'SELECTED'), + 'LOGIN': ('NONAUTH',), + 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'LSUB': ('AUTH', 'SELECTED'), + 'NAMESPACE': ('AUTH', 'SELECTED'), + 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'PARTIAL': ('SELECTED',), # NB: obsolete + 'PROXYAUTH': ('AUTH',), + 'RENAME': ('AUTH', 'SELECTED'), + 'SEARCH': ('SELECTED',), + 'SELECT': ('AUTH', 'SELECTED'), + 'SETACL': ('AUTH', 'SELECTED'), + 'SETANNOTATION':('AUTH', 'SELECTED'), + 'SETQUOTA': ('AUTH', 'SELECTED'), + 'SORT': ('SELECTED',), + 'STATUS': ('AUTH', 'SELECTED'), + 'STORE': ('SELECTED',), + 'SUBSCRIBE': ('AUTH', 'SELECTED'), + 'THREAD': ('SELECTED',), + 'UID': ('SELECTED',), + 'UNSUBSCRIBE': ('AUTH', 'SELECTED'), + } + +# Patterns to match server responses + +Continuation = re.compile(r'\+( (?P<data>.*))?') +Flags = re.compile(r'.*FLAGS \((?P<flags>[^\)]*)\)') +InternalDate = re.compile(r'.*INTERNALDATE "' + r'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])' + r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])' + r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])' + r'"') +Literal = re.compile(r'.*{(?P<size>\d+)}$') +MapCRLF = re.compile(r'\r\n|\r|\n') +Response_code = re.compile(r'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]') +Untagged_response = re.compile(r'\* (?P<type>[A-Z-]+)( (?P<data>.*))?') +Untagged_status = re.compile(r'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?') + + + +class IMAP4: + + """IMAP4 client class. + + Instantiate with: IMAP4([host[, port]]) + + host - host's name (default: localhost); + port - port number (default: standard IMAP4 port). + + All IMAP4rev1 commands are supported by methods of the same + name (in lower-case). + + All arguments to commands are converted to strings, except for + AUTHENTICATE, and the last argument to APPEND which is passed as + an IMAP4 literal. If necessary (the string contains any + non-printing characters or white-space and isn't enclosed with + either parentheses or double quotes) each string is quoted. + However, the 'password' argument to the LOGIN command is always + quoted. If you want to avoid having an argument string quoted + (eg: the 'flags' argument to STORE) then enclose the string in + parentheses (eg: "(\Deleted)"). + + Each command returns a tuple: (type, [data, ...]) where 'type' + is usually 'OK' or 'NO', and 'data' is either the text from the + tagged response, or untagged results from command. Each 'data' + is either a string, or a tuple. If a tuple, then the first part + is the header of the response, and the second part contains + the data (ie: 'literal' value). + + Errors raise the exception class <instance>.error("<reason>"). + IMAP4 server errors raise <instance>.abort("<reason>"), + which is a sub-class of 'error'. Mailbox status changes + from READ-WRITE to READ-ONLY raise the exception class + <instance>.readonly("<reason>"), which is a sub-class of 'abort'. + + "error" exceptions imply a program error. + "abort" exceptions imply the connection should be reset, and + the command re-tried. + "readonly" exceptions imply the command should be re-tried. + + Note: to use this module, you must read the RFCs pertaining to the + IMAP4 protocol, as the semantics of the arguments to each IMAP4 + command are left to the invoker, not to mention the results. Also, + most IMAP servers implement a sub-set of the commands available here. + """ + + class error(Exception): pass # Logical errors - debug required + class abort(error): pass # Service errors - close and retry + class readonly(abort): pass # Mailbox status changed to READ-ONLY + + mustquote = re.compile(r"[^\w!#$%&'*+,.:;<=>?^`|~-]") + + def __init__(self, host = '', port = IMAP4_PORT): + self.debug = Debug + self.state = 'LOGOUT' + self.literal = None # A literal argument to a command + self.tagged_commands = {} # Tagged commands awaiting response + self.untagged_responses = {} # {typ: [data, ...], ...} + self.continuation_response = '' # Last continuation response + self.is_readonly = False # READ-ONLY desired state + self.tagnum = 0 + + # Open socket to server. + + self.open(host, port) + + # Create unique tag for this session, + # and compile tagged response matcher. + + self.tagpre = Int2AP(random.randint(4096, 65535)) + self.tagre = re.compile(r'(?P<tag>' + + self.tagpre + + r'\d+) (?P<type>[A-Z]+) (?P<data>.*)') + + # Get server welcome message, + # request and store CAPABILITY response. + + if __debug__: + self._cmd_log_len = 10 + self._cmd_log_idx = 0 + self._cmd_log = {} # Last `_cmd_log_len' interactions + if self.debug >= 1: + self._mesg('imaplib version %s' % __version__) + self._mesg('new IMAP4 connection, tag=%s' % self.tagpre) + + self.welcome = self._get_response() + if 'PREAUTH' in self.untagged_responses: + self.state = 'AUTH' + elif 'OK' in self.untagged_responses: + self.state = 'NONAUTH' + else: + raise self.error(self.welcome) + + typ, dat = self.capability() + if dat == [None]: + raise self.error('no CAPABILITY response from server') + self.capabilities = tuple(dat[-1].upper().split()) + + if __debug__: + if self.debug >= 3: + self._mesg('CAPABILITIES: %r' % (self.capabilities,)) + + for version in AllowedVersions: + if not version in self.capabilities: + continue + self.PROTOCOL_VERSION = version + return + + raise self.error('server not IMAP4 compliant') + + + def __getattr__(self, attr): + # Allow UPPERCASE variants of IMAP4 command methods. + if attr in Commands: + return getattr(self, attr.lower()) + raise AttributeError("Unknown IMAP4 command: '%s'" % attr) + + + + # Overridable methods + + + def open(self, host = '', port = IMAP4_PORT): + """Setup connection to remote server on "host:port" + (default: localhost:standard IMAP4 port). + This connection will be used by the routines: + read, readline, send, shutdown. + """ + self.host = host + self.port = port + self.sock = socket.create_connection((host, port)) + self.file = self.sock.makefile('rb') + + + def read(self, size): + """Read 'size' bytes from remote.""" + return self.file.read(size) + + + def readline(self): + """Read line from remote.""" + line = self.file.readline(_MAXLINE + 1) + if len(line) > _MAXLINE: + raise self.error("got more than %d bytes" % _MAXLINE) + return line + + + def send(self, data): + """Send data to remote.""" + self.sock.sendall(data) + + + def shutdown(self): + """Close I/O established in "open".""" + self.file.close() + try: + self.sock.shutdown(socket.SHUT_RDWR) + except socket.error as e: + # The server might already have closed the connection + if e.errno != errno.ENOTCONN: + raise + finally: + self.sock.close() + + + def socket(self): + """Return socket instance used to connect to IMAP4 server. + + socket = <instance>.socket() + """ + return self.sock + + + + # Utility methods + + + def recent(self): + """Return most recent 'RECENT' responses if any exist, + else prompt server for an update using the 'NOOP' command. + + (typ, [data]) = <instance>.recent() + + 'data' is None if no new messages, + else list of RECENT responses, most recent last. + """ + name = 'RECENT' + typ, dat = self._untagged_response('OK', [None], name) + if dat[-1]: + return typ, dat + typ, dat = self.noop() # Prod server for response + return self._untagged_response(typ, dat, name) + + + def response(self, code): + """Return data for response 'code' if received, or None. + + Old value for response 'code' is cleared. + + (code, [data]) = <instance>.response(code) + """ + return self._untagged_response(code, [None], code.upper()) + + + + # IMAP4 commands + + + def append(self, mailbox, flags, date_time, message): + """Append message to named mailbox. + + (typ, [data]) = <instance>.append(mailbox, flags, date_time, message) + + All args except `message' can be None. + """ + name = 'APPEND' + if not mailbox: + mailbox = 'INBOX' + if flags: + if (flags[0],flags[-1]) != ('(',')'): + flags = '(%s)' % flags + else: + flags = None + if date_time: + date_time = Time2Internaldate(date_time) + else: + date_time = None + self.literal = MapCRLF.sub(CRLF, message) + return self._simple_command(name, mailbox, flags, date_time) + + + def authenticate(self, mechanism, authobject): + """Authenticate command - requires response processing. + + 'mechanism' specifies which authentication mechanism is to + be used - it must appear in <instance>.capabilities in the + form AUTH=<mechanism>. + + 'authobject' must be a callable object: + + data = authobject(response) + + It will be called to process server continuation responses. + It should return data that will be encoded and sent to server. + It should return None if the client abort response '*' should + be sent instead. + """ + mech = mechanism.upper() + # XXX: shouldn't this code be removed, not commented out? + #cap = 'AUTH=%s' % mech + #if not cap in self.capabilities: # Let the server decide! + # raise self.error("Server doesn't allow %s authentication." % mech) + self.literal = _Authenticator(authobject).process + typ, dat = self._simple_command('AUTHENTICATE', mech) + if typ != 'OK': + raise self.error(dat[-1]) + self.state = 'AUTH' + return typ, dat + + + def capability(self): + """(typ, [data]) = <instance>.capability() + Fetch capabilities list from server.""" + + name = 'CAPABILITY' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + + + def check(self): + """Checkpoint mailbox on server. + + (typ, [data]) = <instance>.check() + """ + return self._simple_command('CHECK') + + + def close(self): + """Close currently selected mailbox. + + Deleted messages are removed from writable mailbox. + This is the recommended command before 'LOGOUT'. + + (typ, [data]) = <instance>.close() + """ + try: + typ, dat = self._simple_command('CLOSE') + finally: + self.state = 'AUTH' + return typ, dat + + + def copy(self, message_set, new_mailbox): + """Copy 'message_set' messages onto end of 'new_mailbox'. + + (typ, [data]) = <instance>.copy(message_set, new_mailbox) + """ + return self._simple_command('COPY', message_set, new_mailbox) + + + def create(self, mailbox): + """Create new mailbox. + + (typ, [data]) = <instance>.create(mailbox) + """ + return self._simple_command('CREATE', mailbox) + + + def delete(self, mailbox): + """Delete old mailbox. + + (typ, [data]) = <instance>.delete(mailbox) + """ + return self._simple_command('DELETE', mailbox) + + def deleteacl(self, mailbox, who): + """Delete the ACLs (remove any rights) set for who on mailbox. + + (typ, [data]) = <instance>.deleteacl(mailbox, who) + """ + return self._simple_command('DELETEACL', mailbox, who) + + def expunge(self): + """Permanently remove deleted items from selected mailbox. + + Generates 'EXPUNGE' response for each deleted message. + + (typ, [data]) = <instance>.expunge() + + 'data' is list of 'EXPUNGE'd message numbers in order received. + """ + name = 'EXPUNGE' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + + + def fetch(self, message_set, message_parts): + """Fetch (parts of) messages. + + (typ, [data, ...]) = <instance>.fetch(message_set, message_parts) + + 'message_parts' should be a string of selected parts + enclosed in parentheses, eg: "(UID BODY[TEXT])". + + 'data' are tuples of message part envelope and data. + """ + name = 'FETCH' + typ, dat = self._simple_command(name, message_set, message_parts) + return self._untagged_response(typ, dat, name) + + + def getacl(self, mailbox): + """Get the ACLs for a mailbox. + + (typ, [data]) = <instance>.getacl(mailbox) + """ + typ, dat = self._simple_command('GETACL', mailbox) + return self._untagged_response(typ, dat, 'ACL') + + + def getannotation(self, mailbox, entry, attribute): + """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute) + Retrieve ANNOTATIONs.""" + + typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) + return self._untagged_response(typ, dat, 'ANNOTATION') + + + def getquota(self, root): + """Get the quota root's resource usage and limits. + + Part of the IMAP4 QUOTA extension defined in rfc2087. + + (typ, [data]) = <instance>.getquota(root) + """ + typ, dat = self._simple_command('GETQUOTA', root) + return self._untagged_response(typ, dat, 'QUOTA') + + + def getquotaroot(self, mailbox): + """Get the list of quota roots for the named mailbox. + + (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox) + """ + typ, dat = self._simple_command('GETQUOTAROOT', mailbox) + typ, quota = self._untagged_response(typ, dat, 'QUOTA') + typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT') + return typ, [quotaroot, quota] + + + def list(self, directory='""', pattern='*'): + """List mailbox names in directory matching pattern. + + (typ, [data]) = <instance>.list(directory='""', pattern='*') + + 'data' is list of LIST responses. + """ + name = 'LIST' + typ, dat = self._simple_command(name, directory, pattern) + return self._untagged_response(typ, dat, name) + + + def login(self, user, password): + """Identify client using plaintext password. + + (typ, [data]) = <instance>.login(user, password) + + NB: 'password' will be quoted. + """ + typ, dat = self._simple_command('LOGIN', user, self._quote(password)) + if typ != 'OK': + raise self.error(dat[-1]) + self.state = 'AUTH' + return typ, dat + + + def login_cram_md5(self, user, password): + """ Force use of CRAM-MD5 authentication. + + (typ, [data]) = <instance>.login_cram_md5(user, password) + """ + self.user, self.password = user, password + return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH) + + + def _CRAM_MD5_AUTH(self, challenge): + """ Authobject to use with CRAM-MD5 authentication. """ + import hmac + return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest() + + + def logout(self): + """Shutdown connection to server. + + (typ, [data]) = <instance>.logout() + + Returns server 'BYE' response. + """ + self.state = 'LOGOUT' + try: typ, dat = self._simple_command('LOGOUT') + except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]] + self.shutdown() + if 'BYE' in self.untagged_responses: + return 'BYE', self.untagged_responses['BYE'] + return typ, dat + + + def lsub(self, directory='""', pattern='*'): + """List 'subscribed' mailbox names in directory matching pattern. + + (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*') + + 'data' are tuples of message part envelope and data. + """ + name = 'LSUB' + typ, dat = self._simple_command(name, directory, pattern) + return self._untagged_response(typ, dat, name) + + def myrights(self, mailbox): + """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox). + + (typ, [data]) = <instance>.myrights(mailbox) + """ + typ,dat = self._simple_command('MYRIGHTS', mailbox) + return self._untagged_response(typ, dat, 'MYRIGHTS') + + def namespace(self): + """ Returns IMAP namespaces ala rfc2342 + + (typ, [data, ...]) = <instance>.namespace() + """ + name = 'NAMESPACE' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + + + def noop(self): + """Send NOOP command. + + (typ, [data]) = <instance>.noop() + """ + if __debug__: + if self.debug >= 3: + self._dump_ur(self.untagged_responses) + return self._simple_command('NOOP') + + + def partial(self, message_num, message_part, start, length): + """Fetch truncated part of a message. + + (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length) + + 'data' is tuple of message part envelope and data. + """ + name = 'PARTIAL' + typ, dat = self._simple_command(name, message_num, message_part, start, length) + return self._untagged_response(typ, dat, 'FETCH') + + + def proxyauth(self, user): + """Assume authentication as "user". + + Allows an authorised administrator to proxy into any user's + mailbox. + + (typ, [data]) = <instance>.proxyauth(user) + """ + + name = 'PROXYAUTH' + return self._simple_command('PROXYAUTH', user) + + + def rename(self, oldmailbox, newmailbox): + """Rename old mailbox name to new. + + (typ, [data]) = <instance>.rename(oldmailbox, newmailbox) + """ + return self._simple_command('RENAME', oldmailbox, newmailbox) + + + def search(self, charset, *criteria): + """Search mailbox for matching messages. + + (typ, [data]) = <instance>.search(charset, criterion, ...) + + 'data' is space separated list of matching message numbers. + """ + name = 'SEARCH' + if charset: + typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria) + else: + typ, dat = self._simple_command(name, *criteria) + return self._untagged_response(typ, dat, name) + + + def select(self, mailbox='INBOX', readonly=False): + """Select a mailbox. + + Flush all untagged responses. + + (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False) + + 'data' is count of messages in mailbox ('EXISTS' response). + + Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so + other responses should be obtained via <instance>.response('FLAGS') etc. + """ + self.untagged_responses = {} # Flush old responses. + self.is_readonly = readonly + if readonly: + name = 'EXAMINE' + else: + name = 'SELECT' + typ, dat = self._simple_command(name, mailbox) + if typ != 'OK': + self.state = 'AUTH' # Might have been 'SELECTED' + return typ, dat + self.state = 'SELECTED' + if 'READ-ONLY' in self.untagged_responses \ + and not readonly: + if __debug__: + if self.debug >= 1: + self._dump_ur(self.untagged_responses) + raise self.readonly('%s is not writable' % mailbox) + return typ, self.untagged_responses.get('EXISTS', [None]) + + + def setacl(self, mailbox, who, what): + """Set a mailbox acl. + + (typ, [data]) = <instance>.setacl(mailbox, who, what) + """ + return self._simple_command('SETACL', mailbox, who, what) + + + def setannotation(self, *args): + """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+) + Set ANNOTATIONs.""" + + typ, dat = self._simple_command('SETANNOTATION', *args) + return self._untagged_response(typ, dat, 'ANNOTATION') + + + def setquota(self, root, limits): + """Set the quota root's resource limits. + + (typ, [data]) = <instance>.setquota(root, limits) + """ + typ, dat = self._simple_command('SETQUOTA', root, limits) + return self._untagged_response(typ, dat, 'QUOTA') + + + def sort(self, sort_criteria, charset, *search_criteria): + """IMAP4rev1 extension SORT command. + + (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...) + """ + name = 'SORT' + #if not name in self.capabilities: # Let the server decide! + # raise self.error('unimplemented extension command: %s' % name) + if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): + sort_criteria = '(%s)' % sort_criteria + typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria) + return self._untagged_response(typ, dat, name) + + + def status(self, mailbox, names): + """Request named status conditions for mailbox. + + (typ, [data]) = <instance>.status(mailbox, names) + """ + name = 'STATUS' + #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! + # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) + typ, dat = self._simple_command(name, mailbox, names) + return self._untagged_response(typ, dat, name) + + + def store(self, message_set, command, flags): + """Alters flag dispositions for messages in mailbox. + + (typ, [data]) = <instance>.store(message_set, command, flags) + """ + if (flags[0],flags[-1]) != ('(',')'): + flags = '(%s)' % flags # Avoid quoting the flags + typ, dat = self._simple_command('STORE', message_set, command, flags) + return self._untagged_response(typ, dat, 'FETCH') + + + def subscribe(self, mailbox): + """Subscribe to new mailbox. + + (typ, [data]) = <instance>.subscribe(mailbox) + """ + return self._simple_command('SUBSCRIBE', mailbox) + + + def thread(self, threading_algorithm, charset, *search_criteria): + """IMAPrev1 extension THREAD command. + + (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...) + """ + name = 'THREAD' + typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria) + return self._untagged_response(typ, dat, name) + + + def uid(self, command, *args): + """Execute "command arg ..." with messages identified by UID, + rather than message number. + + (typ, [data]) = <instance>.uid(command, arg1, arg2, ...) + + Returns response appropriate to 'command'. + """ + command = command.upper() + if not command in Commands: + raise self.error("Unknown IMAP4 UID command: %s" % command) + if self.state not in Commands[command]: + raise self.error("command %s illegal in state %s, " + "only allowed in states %s" % + (command, self.state, + ', '.join(Commands[command]))) + name = 'UID' + typ, dat = self._simple_command(name, command, *args) + if command in ('SEARCH', 'SORT', 'THREAD'): + name = command + else: + name = 'FETCH' + return self._untagged_response(typ, dat, name) + + + def unsubscribe(self, mailbox): + """Unsubscribe from old mailbox. + + (typ, [data]) = <instance>.unsubscribe(mailbox) + """ + return self._simple_command('UNSUBSCRIBE', mailbox) + + + def xatom(self, name, *args): + """Allow simple extension commands + notified by server in CAPABILITY response. + + Assumes command is legal in current state. + + (typ, [data]) = <instance>.xatom(name, arg, ...) + + Returns response appropriate to extension command `name'. + """ + name = name.upper() + #if not name in self.capabilities: # Let the server decide! + # raise self.error('unknown extension command: %s' % name) + if not name in Commands: + Commands[name] = (self.state,) + return self._simple_command(name, *args) + + + + # Private methods + + + def _append_untagged(self, typ, dat): + + if dat is None: dat = '' + ur = self.untagged_responses + if __debug__: + if self.debug >= 5: + self._mesg('untagged_responses[%s] %s += ["%s"]' % + (typ, len(ur.get(typ,'')), dat)) + if typ in ur: + ur[typ].append(dat) + else: + ur[typ] = [dat] + + + def _check_bye(self): + bye = self.untagged_responses.get('BYE') + if bye: + raise self.abort(bye[-1]) + + + def _command(self, name, *args): + + if self.state not in Commands[name]: + self.literal = None + raise self.error("command %s illegal in state %s, " + "only allowed in states %s" % + (name, self.state, + ', '.join(Commands[name]))) + + for typ in ('OK', 'NO', 'BAD'): + if typ in self.untagged_responses: + del self.untagged_responses[typ] + + if 'READ-ONLY' in self.untagged_responses \ + and not self.is_readonly: + raise self.readonly('mailbox status changed to READ-ONLY') + + tag = self._new_tag() + data = '%s %s' % (tag, name) + for arg in args: + if arg is None: continue + data = '%s %s' % (data, self._checkquote(arg)) + + literal = self.literal + if literal is not None: + self.literal = None + if type(literal) is type(self._command): + literator = literal + else: + literator = None + data = '%s {%s}' % (data, len(literal)) + + if __debug__: + if self.debug >= 4: + self._mesg('> %s' % data) + else: + self._log('> %s' % data) + + try: + self.send('%s%s' % (data, CRLF)) + except (socket.error, OSError), val: + raise self.abort('socket error: %s' % val) + + if literal is None: + return tag + + while 1: + # Wait for continuation response + + while self._get_response(): + if self.tagged_commands[tag]: # BAD/NO? + return tag + + # Send literal + + if literator: + literal = literator(self.continuation_response) + + if __debug__: + if self.debug >= 4: + self._mesg('write literal size %s' % len(literal)) + + try: + self.send(literal) + self.send(CRLF) + except (socket.error, OSError), val: + raise self.abort('socket error: %s' % val) + + if not literator: + break + + return tag + + + def _command_complete(self, name, tag): + # BYE is expected after LOGOUT + if name != 'LOGOUT': + self._check_bye() + try: + typ, data = self._get_tagged_response(tag) + except self.abort, val: + raise self.abort('command: %s => %s' % (name, val)) + except self.error, val: + raise self.error('command: %s => %s' % (name, val)) + if name != 'LOGOUT': + self._check_bye() + if typ == 'BAD': + raise self.error('%s command error: %s %s' % (name, typ, data)) + return typ, data + + + def _get_response(self): + + # Read response and store. + # + # Returns None for continuation responses, + # otherwise first response line received. + + resp = self._get_line() + + # Command completion response? + + if self._match(self.tagre, resp): + tag = self.mo.group('tag') + if not tag in self.tagged_commands: + raise self.abort('unexpected tagged response: %s' % resp) + + typ = self.mo.group('type') + dat = self.mo.group('data') + self.tagged_commands[tag] = (typ, [dat]) + else: + dat2 = None + + # '*' (untagged) responses? + + if not self._match(Untagged_response, resp): + if self._match(Untagged_status, resp): + dat2 = self.mo.group('data2') + + if self.mo is None: + # Only other possibility is '+' (continuation) response... + + if self._match(Continuation, resp): + self.continuation_response = self.mo.group('data') + return None # NB: indicates continuation + + raise self.abort("unexpected response: '%s'" % resp) + + typ = self.mo.group('type') + dat = self.mo.group('data') + if dat is None: dat = '' # Null untagged response + if dat2: dat = dat + ' ' + dat2 + + # Is there a literal to come? + + while self._match(Literal, dat): + + # Read literal direct from connection. + + size = int(self.mo.group('size')) + if __debug__: + if self.debug >= 4: + self._mesg('read literal size %s' % size) + data = self.read(size) + + # Store response with literal as tuple + + self._append_untagged(typ, (dat, data)) + + # Read trailer - possibly containing another literal + + dat = self._get_line() + + self._append_untagged(typ, dat) + + # Bracketed response information? + + if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): + self._append_untagged(self.mo.group('type'), self.mo.group('data')) + + if __debug__: + if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'): + self._mesg('%s response: %s' % (typ, dat)) + + return resp + + + def _get_tagged_response(self, tag): + + while 1: + result = self.tagged_commands[tag] + if result is not None: + del self.tagged_commands[tag] + return result + + # If we've seen a BYE at this point, the socket will be + # closed, so report the BYE now. + + self._check_bye() + + # Some have reported "unexpected response" exceptions. + # Note that ignoring them here causes loops. + # Instead, send me details of the unexpected response and + # I'll update the code in `_get_response()'. + + try: + self._get_response() + except self.abort, val: + if __debug__: + if self.debug >= 1: + self.print_log() + raise + + + def _get_line(self): + + line = self.readline() + if not line: + raise self.abort('socket error: EOF') + + # Protocol mandates all lines terminated by CRLF + if not line.endswith('\r\n'): + raise self.abort('socket error: unterminated line') + + line = line[:-2] + if __debug__: + if self.debug >= 4: + self._mesg('< %s' % line) + else: + self._log('< %s' % line) + return line + + + def _match(self, cre, s): + + # Run compiled regular expression match method on 's'. + # Save result, return success. + + self.mo = cre.match(s) + if __debug__: + if self.mo is not None and self.debug >= 5: + self._mesg("\tmatched r'%s' => %r" % (cre.pattern, self.mo.groups())) + return self.mo is not None + + + def _new_tag(self): + + tag = '%s%s' % (self.tagpre, self.tagnum) + self.tagnum = self.tagnum + 1 + self.tagged_commands[tag] = None + return tag + + + def _checkquote(self, arg): + + # Must quote command args if non-alphanumeric chars present, + # and not already quoted. + + if type(arg) is not type(''): + return arg + if len(arg) >= 2 and (arg[0],arg[-1]) in (('(',')'),('"','"')): + return arg + if arg and self.mustquote.search(arg) is None: + return arg + return self._quote(arg) + + + def _quote(self, arg): + + arg = arg.replace('\\', '\\\\') + arg = arg.replace('"', '\\"') + + return '"%s"' % arg + + + def _simple_command(self, name, *args): + + return self._command_complete(name, self._command(name, *args)) + + + def _untagged_response(self, typ, dat, name): + + if typ == 'NO': + return typ, dat + if not name in self.untagged_responses: + return typ, [None] + data = self.untagged_responses.pop(name) + if __debug__: + if self.debug >= 5: + self._mesg('untagged_responses[%s] => %s' % (name, data)) + return typ, data + + + if __debug__: + + def _mesg(self, s, secs=None): + if secs is None: + secs = time.time() + tm = time.strftime('%M:%S', time.localtime(secs)) + sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) + sys.stderr.flush() + + def _dump_ur(self, dict): + # Dump untagged responses (in `dict'). + l = dict.items() + if not l: return + t = '\n\t\t' + l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l) + self._mesg('untagged responses dump:%s%s' % (t, t.join(l))) + + def _log(self, line): + # Keep log of last `_cmd_log_len' interactions for debugging. + self._cmd_log[self._cmd_log_idx] = (line, time.time()) + self._cmd_log_idx += 1 + if self._cmd_log_idx >= self._cmd_log_len: + self._cmd_log_idx = 0 + + def print_log(self): + self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log)) + i, n = self._cmd_log_idx, self._cmd_log_len + while n: + try: + self._mesg(*self._cmd_log[i]) + except: + pass + i += 1 + if i >= self._cmd_log_len: + i = 0 + n -= 1 + + + +try: + import ssl +except ImportError: + pass +else: + class IMAP4_SSL(IMAP4): + + """IMAP4 client class over SSL connection + + Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile]]]]) + + host - host's name (default: localhost); + port - port number (default: standard IMAP4 SSL port). + keyfile - PEM formatted file that contains your private key (default: None); + certfile - PEM formatted certificate chain file (default: None); + + for more documentation see the docstring of the parent class IMAP4. + """ + + + def __init__(self, host = '', port = IMAP4_SSL_PORT, keyfile = None, certfile = None): + self.keyfile = keyfile + self.certfile = certfile + IMAP4.__init__(self, host, port) + + + def open(self, host = '', port = IMAP4_SSL_PORT): + """Setup connection to remote server on "host:port". + (default: localhost:standard IMAP4 SSL port). + This connection will be used by the routines: + read, readline, send, shutdown. + """ + self.host = host + self.port = port + self.sock = socket.create_connection((host, port)) + self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile) + self.file = self.sslobj.makefile('rb') + + + def read(self, size): + """Read 'size' bytes from remote.""" + return self.file.read(size) + + + def readline(self): + """Read line from remote.""" + return self.file.readline() + + + def send(self, data): + """Send data to remote.""" + bytes = len(data) + while bytes > 0: + sent = self.sslobj.write(data) + if sent == bytes: + break # avoid copy + data = data[sent:] + bytes = bytes - sent + + + def shutdown(self): + """Close I/O established in "open".""" + self.file.close() + self.sock.close() + + + def socket(self): + """Return socket instance used to connect to IMAP4 server. + + socket = <instance>.socket() + """ + return self.sock + + + def ssl(self): + """Return SSLObject instance used to communicate with the IMAP4 server. + + ssl = ssl.wrap_socket(<instance>.socket) + """ + return self.sslobj + + __all__.append("IMAP4_SSL") + + +class IMAP4_stream(IMAP4): + + """IMAP4 client class over a stream + + Instantiate with: IMAP4_stream(command) + + where "command" is a string that can be passed to subprocess.Popen() + + for more documentation see the docstring of the parent class IMAP4. + """ + + + def __init__(self, command): + self.command = command + IMAP4.__init__(self) + + + def open(self, host = None, port = None): + """Setup a stream connection. + This connection will be used by the routines: + read, readline, send, shutdown. + """ + self.host = None # For compatibility with parent class + self.port = None + self.sock = None + self.file = None + self.process = subprocess.Popen(self.command, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + shell=True, close_fds=True) + self.writefile = self.process.stdin + self.readfile = self.process.stdout + + + def read(self, size): + """Read 'size' bytes from remote.""" + return self.readfile.read(size) + + + def readline(self): + """Read line from remote.""" + return self.readfile.readline() + + + def send(self, data): + """Send data to remote.""" + self.writefile.write(data) + self.writefile.flush() + + + def shutdown(self): + """Close I/O established in "open".""" + self.readfile.close() + self.writefile.close() + self.process.wait() + + + +class _Authenticator: + + """Private class to provide en/decoding + for base64-based authentication conversation. + """ + + def __init__(self, mechinst): + self.mech = mechinst # Callable object to provide/process data + + def process(self, data): + ret = self.mech(self.decode(data)) + if ret is None: + return '*' # Abort conversation + return self.encode(ret) + + def encode(self, inp): + # + # Invoke binascii.b2a_base64 iteratively with + # short even length buffers, strip the trailing + # line feed from the result and append. "Even" + # means a number that factors to both 6 and 8, + # so when it gets to the end of the 8-bit input + # there's no partial 6-bit output. + # + oup = '' + while inp: + if len(inp) > 48: + t = inp[:48] + inp = inp[48:] + else: + t = inp + inp = '' + e = binascii.b2a_base64(t) + if e: + oup = oup + e[:-1] + return oup + + def decode(self, inp): + if not inp: + return '' + return binascii.a2b_base64(inp) + + + +Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, + 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} + +def Internaldate2tuple(resp): + """Parse an IMAP4 INTERNALDATE string. + + Return corresponding local time. The return value is a + time.struct_time instance or None if the string has wrong format. + """ + + mo = InternalDate.match(resp) + if not mo: + return None + + mon = Mon2num[mo.group('mon')] + zonen = mo.group('zonen') + + day = int(mo.group('day')) + year = int(mo.group('year')) + hour = int(mo.group('hour')) + min = int(mo.group('min')) + sec = int(mo.group('sec')) + zoneh = int(mo.group('zoneh')) + zonem = int(mo.group('zonem')) + + # INTERNALDATE timezone must be subtracted to get UT + + zone = (zoneh*60 + zonem)*60 + if zonen == '-': + zone = -zone + + tt = (year, mon, day, hour, min, sec, -1, -1, -1) + + utc = time.mktime(tt) + + # Following is necessary because the time module has no 'mkgmtime'. + # 'mktime' assumes arg in local timezone, so adds timezone/altzone. + + lt = time.localtime(utc) + if time.daylight and lt[-1]: + zone = zone + time.altzone + else: + zone = zone + time.timezone + + return time.localtime(utc - zone) + + + +def Int2AP(num): + + """Convert integer to A-P string representation.""" + + val = ''; AP = 'ABCDEFGHIJKLMNOP' + num = int(abs(num)) + while num: + num, mod = divmod(num, 16) + val = AP[mod] + val + return val + + + +def ParseFlags(resp): + + """Convert IMAP4 flags response to python tuple.""" + + mo = Flags.match(resp) + if not mo: + return () + + return tuple(mo.group('flags').split()) + + +def Time2Internaldate(date_time): + + """Convert date_time to IMAP4 INTERNALDATE representation. + + Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The + date_time argument can be a number (int or float) representing + seconds since epoch (as returned by time.time()), a 9-tuple + representing local time (as returned by time.localtime()), or a + double-quoted string. In the last case, it is assumed to already + be in the correct format. + """ + + if isinstance(date_time, (int, float)): + tt = time.localtime(date_time) + elif isinstance(date_time, (tuple, time.struct_time)): + tt = date_time + elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'): + return date_time # Assume in correct format + else: + raise ValueError("date_time not of a known type") + + dt = time.strftime("%d-%b-%Y %H:%M:%S", tt) + if dt[0] == '0': + dt = ' ' + dt[1:] + if time.daylight and tt[-1]: + zone = -time.altzone + else: + zone = -time.timezone + return '"' + dt + " %+03d%02d" % divmod(zone//60, 60) + '"' + + + +if __name__ == '__main__': + + # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]' + # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"' + # to test the IMAP4_stream class + + import getopt, getpass + + try: + optlist, args = getopt.getopt(sys.argv[1:], 'd:s:') + except getopt.error, val: + optlist, args = (), () + + stream_command = None + for opt,val in optlist: + if opt == '-d': + Debug = int(val) + elif opt == '-s': + stream_command = val + if not args: args = (stream_command,) + + if not args: args = ('',) + + host = args[0] + + USER = getpass.getuser() + PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) + + test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'} + test_seq1 = ( + ('login', (USER, PASSWD)), + ('create', ('/tmp/xxx 1',)), + ('rename', ('/tmp/xxx 1', '/tmp/yyy')), + ('CREATE', ('/tmp/yyz 2',)), + ('append', ('/tmp/yyz 2', None, None, test_mesg)), + ('list', ('/tmp', 'yy*')), + ('select', ('/tmp/yyz 2',)), + ('search', (None, 'SUBJECT', 'test')), + ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')), + ('store', ('1', 'FLAGS', '(\Deleted)')), + ('namespace', ()), + ('expunge', ()), + ('recent', ()), + ('close', ()), + ) + + test_seq2 = ( + ('select', ()), + ('response',('UIDVALIDITY',)), + ('uid', ('SEARCH', 'ALL')), + ('response', ('EXISTS',)), + ('append', (None, None, None, test_mesg)), + ('recent', ()), + ('logout', ()), + ) + + def run(cmd, args): + M._mesg('%s %s' % (cmd, args)) + typ, dat = getattr(M, cmd)(*args) + M._mesg('%s => %s %s' % (cmd, typ, dat)) + if typ == 'NO': raise dat[0] + return dat + + try: + if stream_command: + M = IMAP4_stream(stream_command) + else: + M = IMAP4(host) + if M.state == 'AUTH': + test_seq1 = test_seq1[1:] # Login not needed + M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) + M._mesg('CAPABILITIES = %r' % (M.capabilities,)) + + for cmd,args in test_seq1: + run(cmd, args) + + for ml in run('list', ('/tmp/', 'yy%')): + mo = re.match(r'.*"([^"]+)"$', ml) + if mo: path = mo.group(1) + else: path = ml.split()[-1] + run('delete', (path,)) + + for cmd,args in test_seq2: + dat = run(cmd, args) + + if (cmd,args) != ('uid', ('SEARCH', 'ALL')): + continue + + uid = dat[-1].split() + if not uid: continue + run('uid', ('FETCH', '%s' % uid[-1], + '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)')) + + print '\nAll tests OK.' + + except: + print '\nTests failed.' + + if not Debug: + print ''' +If you would like to see debugging output, +try: %s -d5 +''' % sys.argv[0] + + raise |