cement/tests/ext/test_ext_smtp.py

607 lines
18 KiB
Python

import os
import mock
import requests
import json
import png
from time import sleep
from pytest import raises
from cement.utils.test import TestApp
from cement.utils.misc import init_defaults
if 'SMTP_HOST' in os.environ.keys():
smtp_host = os.environ['SMTP_HOST']
else:
smtp_host = 'mailpit'
mailpit_api = f'http://{smtp_host}:8025/api/v1'
defaults = init_defaults('mail.smtp')
defaults['mail.smtp']['host'] = smtp_host
defaults['mail.smtp']['port'] = 1025
defaults['mail.smtp']['to'] = 'noreply@localhost'
defaults['mail.smtp']['from_addr'] = 'nobody@localhost'
defaults['mail.smtp']['subject_prefix'] = 'UNIT TEST >'
class SMTPApp(TestApp):
class Meta:
extensions = ['smtp']
mail_handler = 'smtp'
def delete_msg(message_id):
payload = {
"ids": [
message_id
]
}
payload = json.dumps(payload)
headers = {
'accept': 'application/json',
'content-type': 'application/json',
}
requests.delete(f"{mailpit_api}/messages", data=payload, headers=headers)
def test_smtp_send(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['From']['Address'] == f'from-{rando}@localhost'
assert msg['To'][0]['Address'] == f'to-{rando}@localhost'
assert msg['Subject'] == f'UNIT TEST > {rando}'
assert msg['Attachments'] == 0
assert msg['Cc'] in [None, []]
assert msg['Bcc'] in [None, []]
delete_msg(msg['ID'])
def test_smtp_send_with_message_id(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
message_id=f'message_id_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['Message-Id'] == f'message_id_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_return_path(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
return_path=f'return_path_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['Return-Path'] == f'return_path_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_reply_to(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
reply_to=f'reply_to_{rando}@localhost')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['ReplyTo'][0]['Address'] == f'reply_to_{rando}@localhost'
delete_msg(msg['ID'])
def test_smtp_send_with_x_headers(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
X_Test_Header=f'x_header_{rando}')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Mailpit doesn't support this??? See: PR #742
# assert msg['X_Test_Header'] == f'x_header_{rando}'
delete_msg(msg['ID'])
def test_smtp_send_with_base64_encoding(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
header_encoding='base64',
body_encoding='base64')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Not sure how to test this? See: PR #742
delete_msg(msg['ID'])
def test_smtp_send_with_qp_encoding(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
to=[f'to-{rando}@localhost'],
from_addr=f'from-{rando}@localhost',
header_encoding='qp',
body_encoding='qp')
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
# FIXME: Not sure how to test this? See: PR #742
delete_msg(msg['ID'])
def test_smtp_html(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send((f"{rando}", f"<body>{rando}</body>"))
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
text_url = f"http://{smtp_host}:8025/view/{msg['ID']}.txt"
res_text = requests.get(text_url)
assert res_text.content.decode('utf-8') == rando
html_url = f"http://{smtp_host}:8025/view/{msg['ID']}.html"
res_html = requests.get(html_url)
assert res_html.content.decode('utf-8') == f"<body>{rando}</body>"
delete_msg(msg['ID'])
def test_smtp_dict_text_and_html(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
body = dict(
text=rando,
html=f"<body>{rando}</body>"
)
app.mail.send(body)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
text_url = f"http://{smtp_host}:8025/view/{msg['ID']}.txt"
res_text = requests.get(text_url)
assert res_text.content.decode('utf-8') == rando
html_url = f"http://{smtp_host}:8025/view/{msg['ID']}.html"
res_html = requests.get(html_url)
assert res_html.content.decode('utf-8') == f"<body>{rando}</body>"
delete_msg(msg['ID'])
def test_smtp_dict_html_only(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
body = dict(
html=f"<body>{rando}</body>"
)
app.mail.send(body)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
html_url = f"http://{smtp_host}:8025/view/{msg['ID']}.html"
res_html = requests.get(html_url)
assert res_html.content.decode('utf-8') == f"<body>{rando}</body>"
delete_msg(msg['ID'])
def test_smtp_html_bad_body_type(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
# ruff: noqa: E501
error_msg = "(.*)Message body must be string, tuple(.*)"
with raises(TypeError, match=error_msg):
app.mail.send(['text', '<body>html</body>'])
def test_smtp_cc_bcc(rando):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
cc = [f"cc1-{rando}@localhost", f"cc2-{rando}@localhost"]
bcc = [f"bcc1-{rando}@localhost", f"bcc2-{rando}@localhost"]
app.mail.send(f"{rando}",
cc=cc,
bcc=bcc)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
cc_verify = [x['Address'] for x in msg['Cc']]
bcc_verify = [x['Address'] for x in msg['Bcc']]
assert f"cc1-{rando}@localhost" in cc_verify
assert f"cc2-{rando}@localhost" in cc_verify
assert f"bcc1-{rando}@localhost" in bcc_verify
assert f"bcc2-{rando}@localhost" in bcc_verify
delete_msg(msg['ID'])
def test_smtp_files(rando, tmp):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = []
for iter in [1, 2, 3]:
_file = f"{tmp.file}-{iter}"
with open(_file, 'w') as _open_file:
_open_file.write(f"{rando}-{iter}")
files.append(_file)
app.mail.send(f"{rando}", files=files)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 3
delete_msg(msg['ID'])
def test_smtp_dict_and_files(rando, tmp):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = []
for iter in [1, 2, 3]:
_file = f"{tmp.file}-{iter}"
with open(_file, 'w') as _open_file:
_open_file.write(f"{rando}-{iter}")
files.append(_file)
body = dict(
text=rando,
html=f"<body>{rando}</body>"
)
app.mail.send(body, files=files)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 3
delete_msg(msg['ID'])
def test_smtp_files_alt_name(rando, tmp):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = [(f"alt-filename-{rando}", tmp.file)]
app.mail.send(f"{rando}", files=files)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 1
res_full = requests.get(f"{mailpit_api}/message/{msg['ID']}")
data = res_full.json()
assert data['Attachments'][0]['FileName'] == f"alt-filename-{rando}"
delete_msg(msg['ID'])
def test_smtp_image_files_as_dict(rando, tmp):
defaults['mail.smtp']['subject'] = rando
image_file = os.path.join(tmp.dir, 'gradient.png')
# create a png (coverage)
width = 255
height = 255
img = []
for y in range(height):
row = ()
for x in range(width):
row = row + (x, max(0, 255 - x - y), y)
img.append(row)
with open(image_file, 'wb') as f:
w = png.Writer(width, height, greyscale=False)
w.write(f, img)
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
files=[dict(name=f'alt-filename-{rando}', path=image_file)])
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 1
res_full = requests.get(f"{mailpit_api}/message/{msg['ID']}")
data = res_full.json()
assert data['Attachments'][0]['FileName'] == f"alt-filename-{rando}"
delete_msg(msg['ID'])
def test_smtp_image_files_as_dict_inline(rando, tmp):
defaults['mail.smtp']['subject'] = rando
image_file = os.path.join(tmp.dir, 'gradient.png')
# create a png (coverage)
width = 255
height = 255
img = []
for y in range(height):
row = ()
for x in range(width):
row = row + (x, max(0, 255 - x - y), y)
img.append(row)
with open(image_file, 'wb') as f:
w = png.Writer(width, height, greyscale=False)
w.write(f, img)
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send(f"{rando}",
files=[dict(name=f'alt-filename-{rando}', path=image_file, cid=f'cid-{rando}')])
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
delete_msg(msg['ID'])
def test_smtp_files_path_does_not_exist(rando, tmp):
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = [(f"{tmp.file}-does-not-exist")]
with raises(FileNotFoundError):
app.mail.send(f"{rando}", files=files)
def test_smtp_files_alt_name_is_path(rando, tmp):
# ensure only the basename of file is set if full path is passed as alt
# name
defaults['mail.smtp']['subject'] = rando
with SMTPApp(config_defaults=defaults) as app:
app.run()
files = [(tmp.file, tmp.file)]
app.mail.send(f"{rando}", files=files)
sleep(3)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
assert msg['Attachments'] == 1
res_full = requests.get(f"{mailpit_api}/message/{msg['ID']}")
data = res_full.json()
assert data['Attachments'][0]['FileName'] == os.path.basename(tmp.file)
delete_msg(msg['ID'])
def test_smtp_tls(rando):
defaults['mail.smtp']['subject'] = rando
defaults['mail.smtp']['tls'] = True
with SMTPApp(config_defaults=defaults, debug=True) as app:
app.run()
app.mail.send(rando)
res = requests.get(f"{mailpit_api}/search?query={rando}")
data = res.json()
assert len(data['messages']) == 1
msg = data['messages'][0]
delete_msg(msg['ID'])
# FIXME: need to replace old mocks with mailpit tests
def test_mock_smtp_defaults():
defaults = init_defaults('mail.smtp')
defaults['mail.smtp']['to'] = 'test_smtp_defaults@localhost'
defaults['mail.smtp']['from_addr'] = 'nobody@localhost'
defaults['mail.smtp']['cc'] = 'nobody@localhost'
defaults['mail.smtp']['bcc'] = 'nobody@localhost'
defaults['mail.smtp']['subject'] = 'test_smtp_defaults'
defaults['mail.smtp']['subject_prefix'] = 'UNIT TEST >'
with mock.patch('smtplib.SMTP') as mock_smtp:
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send('TEST MESSAGE')
instance = mock_smtp.return_value
assert instance.send_message.call_count == 1
def test_mock_smtp_ssl():
defaults = init_defaults('mail.smtp')
defaults['mail.smtp']['ssl'] = True
defaults['mail.smtp']['tls'] = True
defaults['mail.smtp']['port'] = 25
defaults['mail.smtp']['subject'] = 'test_smtp_ssl'
defaults['mail.smtp']['subject_prefix'] = 'UNIT TEST >'
with mock.patch('smtplib.SMTP_SSL') as mock_smtp:
with SMTPApp(config_defaults=defaults, debug=True) as app:
app.run()
app.mail.send('TEST MESSAGE',
to=['test_smtp_ssl@localhost'],
from_addr='noreply@localhost')
instance = mock_smtp.return_value
assert instance.send_message.call_count == 1
def test_mock_smtp_tls_no_ssl():
defaults = init_defaults('mail.smtp')
defaults['mail.smtp']['ssl'] = False
defaults['mail.smtp']['tls'] = True
defaults['mail.smtp']['port'] = 25
defaults['mail.smtp']['subject'] = 'test_smtp_tls_no_ssl'
defaults['mail.smtp']['subject_prefix'] = 'UNIT TEST >'
with mock.patch('smtplib.SMTP') as mock_smtp:
with SMTPApp(config_defaults=defaults, debug=True) as app:
app.run()
app.mail.send('TEST MESSAGE',
to=['test_smtp_tls_no_ssl@localhost'],
from_addr='noreply@localhost')
instance = mock_smtp.return_value
assert instance.send_message.call_count == 1
assert instance.starttls.call_count == 1
def test_mock_smtp_tls_over_ssl():
defaults = init_defaults('mail.smtp')
defaults['mail.smtp']['ssl'] = True
defaults['mail.smtp']['tls'] = True
defaults['mail.smtp']['port'] = 25
defaults['mail.smtp']['subject'] = 'test_smtp_tls_over_ssl'
defaults['mail.smtp']['subject_prefix'] = 'UNIT TEST >'
with mock.patch('smtplib.SMTP_SSL') as mock_smtp:
with SMTPApp(config_defaults=defaults, debug=True) as app:
app.run()
app.mail.send('TEST MESSAGE',
to=['test_smtp_tls_no_ssl@localhost'],
from_addr='noreply@localhost')
instance = mock_smtp.return_value
assert instance.send_message.call_count == 1
assert instance.starttls.call_count == 1
def test_mock_smtp_auth(rando):
defaults = init_defaults(rando, 'mail.smtp')
defaults[rando]['debug'] = True
defaults['mail.smtp']['auth'] = True
defaults['mail.smtp']['username'] = 'john.doe'
defaults['mail.smtp']['password'] = 'password'
with mock.patch('smtplib.SMTP') as mock_smtp:
with SMTPApp(config_defaults=defaults) as app:
app.run()
app.mail.send('TEST MESSAGE',
to=['me@localhost'],
from_addr='noreply@localhost')
instance = mock_smtp.return_value
assert instance.login.call_count == 1
assert instance.send_message.call_count == 1