Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Doc/library/poplib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ A :class:`POP3` instance has the following methods:
.. versionadded:: 3.4


.. method:: POP3.auth(self, mechanism, authobject, *, initial_response_ok=True)

Authenticate using the POP3 ``AUTH`` command as specified in :rfc:`5034`.

If *initial_response_ok* is true, ``authobject()`` is called first with no
arguments to obtain an optional initial response. It may return :class:`bytes`
or :class:`str`; if it returns :const:`None`, no initial response is sent.
The returned value is base64-encoded before being sent.

For each server challenge, ``authobject(challenge)`` is called with the
base64-decoded ``bytes`` challenge and must return the client response as
:class:`bytes` or :class:`str`. If it returns :const:`None`, an empty response
is sent. To abort the exchange, return ``b'*'`` (the client sends ``'*'``).

.. versionadded:: next


Instances of :class:`POP3_SSL` have no additional methods. The interface of this
subclass is identical to its parent.

Expand Down
78 changes: 78 additions & 0 deletions Lib/poplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

# Imports

import base64
import binascii
import errno
import re
import socket
Expand Down Expand Up @@ -46,6 +48,8 @@ class error_proto(Exception): pass
# 512 characters, including CRLF. We have selected 2048 just to be on
# the safe side.
_MAXLINE = 2048
# maximum number of AUTH challenges we are willing to process (parity with smtplib)
_MAXCHALLENGE = 5


class POP3:
Expand Down Expand Up @@ -424,6 +428,79 @@ def stls(self, context=None):
self._tls_established = True
return resp

def auth(self, mechanism, authobject, *, initial_response_ok=True):
"""Authenticate to the POP3 server using AUTH (RFC 5034)."""
mech = mechanism.upper()

initial = None
if initial_response_ok and callable(authobject):
initial = authobject()
if isinstance(initial, str):
initial = initial.encode('ascii', 'strict')
if initial is not None and not isinstance(initial, (bytes, bytearray)):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we allow bytearray, this should be documented.

raise TypeError('authobject() must return str or bytes for initial response')

if initial is not None:
b64 = base64.b64encode(initial).decode('ascii')
cmd = f'AUTH {mech} {b64}'
if len(cmd.encode('ascii')) <= 253:
self._putcmd(cmd)
else:
self._putcmd(f'AUTH {mech}')
else:
self._putcmd(f'AUTH {mech}')

auth_challenge_count = 0
while True:
line, _ = self._getline()

if line.startswith(b'+OK'):
return line

if line.startswith(b'-ERR'):
raise error_proto(line.decode('ascii', 'replace'))
# Challenge line: "+ <b64>" or just "+" (empty challenge)
if line != b'+' and not line.startswith(b'+ '):
raise error_proto(f'malformed AUTH challenge line: {line!r}')

auth_challenge_count += 1
if auth_challenge_count > _MAXCHALLENGE:
raise error_proto('Server AUTH mechanism infinite loop. Last response: ', repr(line))

chal = line[1:]
if chal.startswith(b' '):
chal = chal[1:]
chal = chal.rstrip(b'\r\n')
if chal:
try:
challenge = base64.b64decode(chal, validate=True)
except (binascii.Error, ValueError):
self._putcmd('*')
line, _ = self._getline()
raise error_proto(line.decode('ascii', 'replace'))
else:
challenge = b''

resp = authobject(challenge)
if resp is None:
resp = b''
if isinstance(resp, str):
resp = resp.encode('ascii', 'strict')
if not isinstance(resp, (bytes, bytearray)):
raise TypeError('authobject(challenge) must return str or bytes')

if resp == b'*':
self._putcmd('*')
else:
self._putcmd(base64.b64encode(resp).decode('ascii'))

def auth_plain(self, user, password, authzid=''):
"""Return an authobject suitable for SASL PLAIN."""
def _auth_plain(challenge=None):
# Per RFC 4616, the response is: authzid UTF8 NUL authcid UTF8 NUL passwd UTF8
return f"{authzid}\0{user}\0{password}"
return _auth_plain


if HAVE_SSL:

Expand Down Expand Up @@ -459,6 +536,7 @@ def stls(self, context=None):
"""
raise error_proto('-ERR TLS session already established')


__all__.append("POP3_SSL")

if __name__ == "__main__":
Expand Down
81 changes: 80 additions & 1 deletion Lib/test/test_poplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
# a real test suite

import base64
import poplib
import socket
import os
Expand Down Expand Up @@ -49,7 +50,7 @@

class DummyPOP3Handler(asynchat.async_chat):

CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
CAPAS = {'UIDL': [], 'SASL': ['PLAIN'], 'IMPLEMENTATION': ['python-testlib-pop-server']}
enable_UTF8 = False

def __init__(self, conn):
Expand All @@ -59,6 +60,8 @@ def __init__(self, conn):
self.push('+OK dummy pop3 server ready. <timestamp>')
self.tls_active = False
self.tls_starting = False
self._auth_pending = False
self._auth_mech = None

def collect_incoming_data(self, data):
self.in_buffer.append(data)
Expand All @@ -67,6 +70,20 @@ def found_terminator(self):
line = b''.join(self.in_buffer)
line = str(line, 'ISO-8859-1')
self.in_buffer = []

if self._auth_pending:
self._auth_pending = False
if line == '*':
self.push('-ERR authentication cancelled')
return
try:
base64.b64decode(line.encode('ascii'))
except Exception:
self.push('-ERR invalid base64')
return
self.push('+OK Logged in.')
return

cmd = line.split(' ')[0].lower()
space = line.find(' ')
if space != -1:
Expand All @@ -85,6 +102,28 @@ def handle_error(self):
def push(self, data):
asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

def cmd_auth(self, arg):
parts = arg.split()
if not parts:
self.push('-ERR missing mechanism')
return
mech = parts[0].upper()
if mech != 'PLAIN':
self.push('-ERR unsupported mechanism')
return
if len(parts) >= 2:
try:
base64.b64decode(parts[1].encode('ascii'))
except Exception:
self.push('-ERR invalid base64')
return
self.push('+OK Logged in.')
else:
self._auth_pending = True
self._auth_mech = mech
self.in_buffer.clear()
self.push('+ ')

def cmd_echo(self, arg):
# sends back the received string (used by the test suite)
self.push(arg)
Expand Down Expand Up @@ -286,6 +325,43 @@ def test_pass_(self):
self.assertOK(self.client.pass_('python'))
self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

def test_auth_plain_initial_response(self):
secret = b"user\x00adminuser\x00password"
resp = self.client.auth("PLAIN", authobject=lambda: secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_plain_challenge_response(self):
secret = b"user\x00adminuser\x00password"
def authobject():
return secret
resp = self.client.auth("PLAIN", authobject=authobject)
self.assertStartsWith(resp, b"+OK")

def test_auth_unsupported_mechanism(self):
with self.assertRaises(poplib.error_proto):
self.client.auth("FOO", authobject=lambda: b"")

def test_auth_cancel(self):
with self.assertRaises(poplib.error_proto):
self.client.auth("PLAIN", authobject=lambda chal: b"*", initial_response_ok=False)

def test_auth_mechanism_case_insensitive(self):
secret = b"user\x00adminuser\x00password"
# use lowercase mechanism name to ensure server accepts
resp = self.client.auth("plain", authobject=lambda: secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_initial_response_str(self):
secret = "user\x00adminuser\x00password" # str, not bytes
resp = self.client.auth("PLAIN", authobject=lambda: secret)
self.assertStartsWith(resp, b"+OK")

def test_auth_authobject_returns_str(self):
def authobject():
return "user\x00adminuser\x00password"
resp = self.client.auth("PLAIN", authobject=authobject)
self.assertStartsWith(resp, b"+OK")

def test_stat(self):
self.assertEqual(self.client.stat(), (10, 100))

Expand Down Expand Up @@ -434,6 +510,9 @@ def __init__(self, conn):
self.push('+OK dummy pop3 server ready. <timestamp>')
self.tls_active = True
self.tls_starting = False
# Initialize AUTH state like DummyPOP3Handler to avoid AttributeError
self._auth_pending = False
self._auth_mech = None


@requires_ssl
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:mod:`poplib`: add :rfc:`5034` ``AUTH`` support.
Loading