fix(ext_smtp): misc fixes and updates to better support content types

Ref: PR #742
This commit is contained in:
BJ Dierkes 2025-05-05 11:08:35 -05:00
parent a7d004b82d
commit 822c22a1ff
5 changed files with 388 additions and 110 deletions

View File

@ -6,6 +6,8 @@ Bugs:
- `[ext_jinja2]` Refactor hard-coded reference to `jinja2` template handler.
- [Issue #749](https://github.com/datafolklabs/cement/issues/749)
- `[ext_smtp]` Misc fixes and updates to better support content types.
- [PR #742](https://github.com/datafolklabs/cement/pull/742)
Features:

View File

@ -14,7 +14,7 @@ from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email import encoders
from email.utils import format_datetime, make_msgid
from typing import Any, Dict, Union, Tuple, TYPE_CHECKING
from typing import Any, Optional, Dict, Union, Tuple, TYPE_CHECKING
from ..core import mail
from ..utils import fs
from ..utils.misc import minimal_logger, is_true
@ -90,9 +90,7 @@ class SMTPMailHandler(mail.MailHandler):
params[item] = self.app.config.get(self._meta.config_section, item)
# some are only set by message
for item in [
'date', 'message_id', 'return_path', 'reply_to'
]:
for item in ['date', 'message_id', 'return_path', 'reply_to']:
value = kw.get(item, None)
if value is not None and str.strip(f'{value}') != '':
params[item] = kw.get(item, config_item)
@ -178,23 +176,33 @@ class SMTPMailHandler(mail.MailHandler):
server.quit()
# FIXME: how to check success? docs don't say return type
# - `[ext.scrub]` [Issue #724](https://github.com/datafolklabs/cement/issues/724)
return res
# FIXME: should deprecate for 3.0 and change in 3.2
# For smtplib this would be "senderrs" (dict), but for backward compat
# we need to return bool
# https://github.com/python/cpython/blob/3.13/Lib/smtplib.py#L899
self.app.log.error(f"SMTPHandler Errors: {res}")
if len(res) > 0:
# this will be difficult to test with Mailpit as it accepts everything... no cover
return False # pragma: nocover
else:
return True
def _header(self, value, _charset=None, **params):
return Header(value, charset=_charset) if params['header_encoding'] else value
def _header(self, value: Optional[str] = None, _charset: Optional[Charset] = None,
**params: Dict[str, Any]) -> Header:
header = Header(value, charset=_charset) if params['header_encoding'] else value
return header # type: ignore
def _make_message(self, body, **params):
def _make_message(self, body: Union[str, Tuple[str, str]], **params: Dict[str, Any]) \
-> MIMEMultipart:
# use encoding for header parts
cs_header = Charset(params['charset'])
cs_header = Charset(params['charset']) # type: ignore
if params['header_encoding'] == 'base64':
cs_header.header_encoding = BASE64
elif params['header_encoding'] == 'qp' or params['body_encoding'] == 'quoted-printable':
cs_header.header_encoding = QP
# use encoding for body parts
cs_body = Charset(params['charset'])
cs_body = Charset(params['charset']) # type: ignore
if params['body_encoding'] == 'base64':
cs_body.body_encoding = BASE64
elif params['body_encoding'] == 'qp' or params['body_encoding'] == 'quoted-printable':
@ -211,18 +219,24 @@ class SMTPMailHandler(mail.MailHandler):
raise TypeError(error_msg)
if isinstance(body, str):
partText = MIMEText(body, 'plain', _charset=cs_body)
partText = MIMEText(body, 'plain', _charset=cs_body) # type: ignore
elif isinstance(body, tuple):
# handle plain text
if len(body) >= 1 and body[0] and str.strip(body[0]) != '':
partText = MIMEText(str.strip(body[0]), 'plain', _charset=cs_body)
partText = MIMEText(str.strip(body[0]),
'plain',
_charset=cs_body) # type: ignore
# handle html
if len(body) >= 2 and body[1] and str.strip(body[1]) != '':
partHtml = MIMEText(str.strip(body[1]), 'html', _charset=cs_body)
partHtml = MIMEText(str.strip(body[1]),
'html',
_charset=cs_body) # type: ignore
elif isinstance(body, dict):
# handle plain text
if 'text' in body and str.strip(body['text']) != '':
partText = MIMEText(str.strip(body['text']), 'plain', _charset=cs_body)
partText = MIMEText(str.strip(body['text']),
'plain',
_charset=cs_body)
# handle html
if 'html' in body and str.strip(body['html']) != '':
partHtml = MIMEText(str.strip(body['html']), 'html', _charset=cs_body)
@ -236,50 +250,56 @@ class SMTPMailHandler(mail.MailHandler):
# Set message charset and encoding based on parts
if params['files']:
msg = MIMEMultipart('mixed')
msg.set_charset(params['charset'])
msg.set_charset(params['charset']) # type: ignore
elif partText and partHtml:
msg = MIMEMultipart('alternative')
msg.set_charset(params['charset'])
msg.set_charset(params['charset']) # type: ignore
elif partHtml:
msg = MIMEBase('text', 'html')
msg = MIMEBase('text', 'html') # type: ignore
msg.set_charset(cs_body)
else:
msg = MIMEBase('text', 'plain')
msg = MIMEBase('text', 'plain') # type: ignore
msg.set_charset(cs_body)
# create message
msg['From'] = params['from_addr']
msg['From'] = params['from_addr'] # type: ignore
msg['To'] = ', '.join(params['to'])
if params['cc']:
msg['Cc'] = ', '.join(params['cc'])
if params['bcc']:
msg['Bcc'] = ', '.join(params['bcc'])
if params['subject_prefix'] not in [None, '']:
msg['Subject'] = self._header(f"{params['subject_prefix']} {params['subject']}", _charset=cs_header, **params)
msg['Subject'] = self._header(f"{params['subject_prefix']} {params['subject']}",
_charset=cs_header, **params) # type: ignore
else:
msg['Subject'] = self._header(params['subject'], _charset=cs_header, **params)
msg['Subject'] = self._header(params['subject'], # type: ignore
_charset=cs_header,
**params)
# check for date
if is_true(params['date_enforce']) and not params.get('date', None):
params['date'] = format_datetime(datetime.now(timezone.utc))
params['date'] = format_datetime(datetime.now(timezone.utc)) # type: ignore
# check for message-id
if is_true(params['msgid_enforce']) and not params.get('message_id', None):
params['message_id'] = make_msgid(params['msgid_str'], params['msgid_domain'])
params['message_id'] = make_msgid(params['msgid_str'], # type: ignore
params['msgid_domain']) # type: ignore
# check for message headers
if params.get('date', None):
msg['Date'] = params['date']
msg['Date'] = params['date'] # type: ignore
if params.get('message_id', None):
msg['Message-Id'] = params['message_id']
msg['Message-Id'] = params['message_id'] # type: ignore
if params.get('return_path', None):
msg['Return-Path'] = params['return_path']
msg['Return-Path'] = params['return_path'] # type: ignore
if params.get('reply_to', None):
msg['Reply-To'] = params['reply_to']
msg['Reply-To'] = params['reply_to'] # type: ignore
# check for X-headers
for item in params.keys():
if item.startswith('X-'):
msg.add_header(item.title(), self._header(f'{params[item]}', _charset=cs_header, **params))
msg.add_header(item.title(),
self._header(f'{params[item]}', # type: ignore
_charset=cs_header, **params))
# append the body parts
if params['files']:
@ -303,7 +323,7 @@ class SMTPMailHandler(mail.MailHandler):
msg.attach(partText)
else:
# no body no files = empty message = just headers
pass
pass # pragma: no cover
else:
# multipart/alternative
if partText and partHtml:
@ -318,7 +338,7 @@ class SMTPMailHandler(mail.MailHandler):
msg.set_payload(partHtml.get_payload(), charset=cs_body)
else:
# no body no files = empty message = just headers
pass
pass # pragma: no cover
# attach files
if params['files']:
@ -326,26 +346,26 @@ class SMTPMailHandler(mail.MailHandler):
# support for alternative file name if its tuple or dict
# like [
# 'path/simple.ext',
# ('attname.ext', 'path/filename.ext'),
# ('attname.ext', 'path/filename.ext', 'cidname'),
# {'name': 'attname', 'path': 'path/filename.ext', cid: 'cidname'},
# ('altname.ext', 'path/filename.ext'),
# ('altname.ext', 'path/filename.ext', 'content_id'),
# {'name': 'altname', 'path': 'path/filename.ext', cid: 'cidname'},
# ]
if isinstance(in_path, tuple):
attname = in_path[0]
altname = os.path.basename(in_path[0])
path = in_path[1]
cid = in_path[2] if len(in_path) >= 3 else None
elif isinstance(in_path, dict):
attname = in_path.get('name', None)
altname = os.path.basename(in_path.get('name', None))
path = in_path.get('path')
cid = in_path.get('cid', None)
else:
attname = None
altname = None
path = in_path
cid = None
path = fs.abspath(path)
if not attname:
attname = os.path.basename(path)
if not altname:
altname = os.path.basename(path)
# add attachment payload from file
with open(path, 'rb') as file:
@ -363,57 +383,20 @@ class SMTPMailHandler(mail.MailHandler):
if cid:
part.add_header(
'Content-Disposition',
f'inline; filename={attname}',
f'inline; filename={altname}',
)
part.add_header('Content-ID', f'<{cid}>')
rel.attach(part)
msg.attach(part)
else:
# attname header
# altname header
part.add_header(
'Content-Disposition',
f'attachment; filename={attname}',
f'attachment; filename={altname}',
)
msg.attach(part)
return msg
def send_by_template(self, template, data={}, **kw):
# test if template exists by loading it
def _template_exists(template):
# check if stored in cache already
if template in self._template_exists_cache:
return self._template_exists_cache[template]
# do first time check from file or module
result = False
try:
# successfully load when available
self.app.template.load(template)
result = True
except:
pass
# store flag in cache list to prevent often load access
self._template_exists_cache[template] = result
# return state
return result
# prepare email params
params = dict(**kw)
# check render subject
if 'subject' not in params:
if _template_exists(f'{template}.title.jinja2'):
params['subject'] = self.app.render(data, f'{template}.title.jinja2', out=None)
# build body
body = list()
if _template_exists(f'{template}.plain.jinja2'):
body.append(self.app.render(dict(**data, mail_params=params), f'{template}.plain.jinja2', out=None))
if _template_exists(f'{template}.html.jinja2'):
# before adding a html part make sure that plain part exists
if len(body) == 0:
body.append('Content is delivered as HTML only.')
body.append(self.app.render(dict(**data, mail_params=params), f'{template}.html.jinja2', out=None))
# send the message
self.send(body=body, **params)
def load(app: App) -> None:
app.handler.register(SMTPMailHandler)

View File

@ -5,7 +5,7 @@
groups = ["default", "alarm", "argparse", "cli", "colorlog", "configparser", "daemon", "dev", "docs", "dummy", "generate", "jinja2", "json", "logging", "memcached", "mustache", "plugin", "print", "redis", "scrub", "smtp", "tabulate", "watchdog", "yaml"]
strategy = ["cross_platform", "inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:7837e4994dfec4e6dc58ed6160458ce67c146f206cca44eceef966cae3293b18"
content_hash = "sha256:edb453570d1f7e2dcd14a71e866fa1814be1c5341a716c96164858feae2e3662"
[[metadata.targets]]
requires_python = ">=3.8"
@ -644,6 +644,16 @@ files = [
{file = "pylibmc-1.6.3.tar.gz", hash = "sha256:eefa46115537abad65fbe2e032acd1b3463d9bf9e335af4b0916df4e4d3206e0"},
]
[[package]]
name = "pypng"
version = "0.20220715.0"
summary = "Pure Python library for saving and loading PNG images"
groups = ["dev"]
files = [
{file = "pypng-0.20220715.0-py3-none-any.whl", hash = "sha256:4a43e969b8f5aaafb2a415536c1a8ec7e341cd6a3f957fd5b5f32a4cfeed902c"},
{file = "pypng-0.20220715.0.tar.gz", hash = "sha256:739c433ba96f078315de54c0db975aee537cbc3e1d0ae4ed9aab0ca1e427e2c1"},
]
[[package]]
name = "pystache"
version = "0.6.8"

View File

@ -17,6 +17,7 @@ classifiers = [
dynamic = ["version", "README"]
requires-python = ">=3.8"
dependencies = []
[project.optional-dependencies]
alarm = []
@ -42,9 +43,6 @@ watchdog = ["watchdog"]
yaml = ["pyYaml"]
cli = ["cement[yaml,jinja2]"]
[tool.pdm.scripts]
cement = {call = "cement.cli.main:main"}
[project.scripts]
cement = "cement.cli.main:main"
@ -70,30 +68,6 @@ python_files= "test_*.py"
precision = 2
[tool.pdm.build]
package-dir = "."
includes = [
"cement/",
"cement/cli/templates/generate/",
"CONTRIBUTORS.md",
"CHANGELOG.md"
]
excludes = ["tests/"]
[tool.pdm.version]
source = "call"
getter = "cement.utils.version:get_version"
[tool.pdm.dev-dependencies]
dev = [
"pytest>=4.3.1",
"pytest-cov>=2.6.1",
"coverage>=4.5.3",
"mypy>=1.9.0",
"ruff>=0.3.2",
"mock>=5.1.0",
]
[tool.ruff]
target-version = "py38"
line-length = 100
@ -160,3 +134,32 @@ exclude = """(?x)(
^.git/ |
^tests
)"""
[tool.pdm.scripts]
cement = {call = "cement.cli.main:main"}
[tool.pdm.build]
package-dir = "."
includes = [
"cement/",
"cement/cli/templates/generate/",
"CONTRIBUTORS.md",
"CHANGELOG.md"
]
excludes = ["tests/"]
[tool.pdm.version]
source = "call"
getter = "cement.utils.version:get_version"
[dependency-groups]
dev = [
"pytest>=4.3.1",
"pytest-cov>=2.6.1",
"coverage>=4.5.3",
"mypy>=1.9.0",
"ruff>=0.3.2",
"mock>=5.1.0",
"pypng>=0.20220715.0",
]

View File

@ -3,6 +3,7 @@ import os
import mock
import requests
import json
import png
from time import sleep
from pytest import raises
from cement.utils.test import TestApp
@ -69,6 +70,137 @@ def test_smtp_send(rando):
delete_msg(msg['ID'])
def test_smtp_send_with_message_id(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
message_id=f'message_id_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['Message-Id'] == f'message_id_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_return_path(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
return_path=f'return_path_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['Return-Path'] == f'return_path_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_reply_to(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
reply_to=f'reply_to_{rando}@localhost')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['ReplyTo'][0]['Address'] == f'reply_to_{rando}@localhost'
delete_msg(msg['ID'])
def test_smtp_send_with_x_headers(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
X_Test_Header=f'x_header_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['X_Test_Header'] == f'x_header_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_base64_encoding(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
header_encoding='base64',
body_encoding='base64')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Not sure how to test this? See: PR #742
delete_msg(msg['ID'])
def test_smtp_send_with_qp_encoding(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
header_encoding='qp',
body_encoding='qp')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Not sure how to test this? See: PR #742
delete_msg(msg['ID'])
def test_smtp_html(rando):
defaults['mail.smtp']['subject'] = rando
@ -93,13 +225,65 @@ def test_smtp_html(rando):
delete_msg(msg['ID'])
def test_smtp_dict_text_and_html(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
body = dict(
text=rando,
html=f"<body>{rando}</body>"
)
app.mail.send(body)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
text_url = f"http://{smtp_host}:8025/view/{msg['ID']}.txt"
res_text = requests.get(text_url)
assert res_text.content.decode('utf-8') == rando
html_url = f"http://{smtp_host}:8025/view/{msg['ID']}.html"
res_html = requests.get(html_url)
assert res_html.content.decode('utf-8') == f"<body>{rando}</body>"
delete_msg(msg['ID'])
def test_smtp_dict_html_only(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
body = dict(
html=f"<body>{rando}</body>"
)
app.mail.send(body)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
html_url = f"http://{smtp_host}:8025/view/{msg['ID']}.html"
res_html = requests.get(html_url)
assert res_html.content.decode('utf-8') == f"<body>{rando}</body>"
delete_msg(msg['ID'])
def test_smtp_html_bad_body_type(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
error_msg = '(.*)Message body must be string or tuple(.*)'
# ruff: noqa: E501
error_msg = "(.*)Message body must be string, tuple(.*)"
with raises(TypeError, match=error_msg):
app.mail.send(['text', '<body>html</body>'])
@ -155,6 +339,35 @@ def test_smtp_files(rando, tmp):
delete_msg(msg['ID'])
def test_smtp_dict_and_files(rando, tmp):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = []
for iter in [1, 2, 3]:
_file = f"{tmp.file}-{iter}"
with open(_file, 'w') as _open_file:
_open_file.write(f"{rando}-{iter}")
files.append(_file)
body = dict(
text=rando,
html=f"<body>{rando}</body>"
)
app.mail.send(body, files=files)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 3
delete_msg(msg['ID'])
def test_smtp_files_alt_name(rando, tmp):
defaults['mail.smtp']['subject'] = rando
@ -177,6 +390,73 @@ def test_smtp_files_alt_name(rando, tmp):
delete_msg(msg['ID'])
def test_smtp_image_files_as_dict(rando, tmp):
defaults['mail.smtp']['subject'] = rando
image_file = os.path.join(tmp.dir, 'gradient.png')
# create a png (coverage)
width = 255
height = 255
img = []
for y in range(height):
row = ()
for x in range(width):
row = row + (x, max(0, 255 - x - y), y)
img.append(row)
with open(image_file, 'wb') as f:
w = png.Writer(width, height, greyscale=False)
w.write(f, img)
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
files=[dict(name=f'alt-filename-{rando}', path=image_file)])
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 1
res_full = requests.get(f"{mailpit_api}/message/{msg['ID']}")
data = res_full.json()
assert data['Attachments'][0]['FileName'] == f"alt-filename-{rando}"
delete_msg(msg['ID'])
def test_smtp_image_files_as_dict_inline(rando, tmp):
defaults['mail.smtp']['subject'] = rando
image_file = os.path.join(tmp.dir, 'gradient.png')
# create a png (coverage)
width = 255
height = 255
img = []
for y in range(height):
row = ()
for x in range(width):
row = row + (x, max(0, 255 - x - y), y)
img.append(row)
with open(image_file, 'wb') as f:
w = png.Writer(width, height, greyscale=False)
w.write(f, img)
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
files=[dict(name=f'alt-filename-{rando}', path=image_file, cid=f'cid-{rando}')])
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
delete_msg(msg['ID'])
def test_smtp_files_path_does_not_exist(rando, tmp):
defaults['mail.smtp']['subject'] = rando