Add mail message preprocessor for gpg encrypted mails

This commit is contained in:
Daniel Bankmann 2024-08-12 21:01:54 +02:00 committed by shamoon
parent f5ec6de047
commit 5e40321883
3 changed files with 209 additions and 2 deletions

View File

@ -10,6 +10,7 @@ from datetime import timedelta
from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Callable
from typing import Optional
from typing import Union
@ -43,6 +44,7 @@ from documents.tasks import consume_file
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.models import ProcessedMail
from paperless_mail.preprocessor import MailMessageDecryptor
# Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG
# imaplib => conn.fetch(b"<message_id>", "FLAGS")
@ -426,8 +428,11 @@ class MailAccountHandler(LoggingMixin):
logging_name = "paperless_mail"
def __init__(self) -> None:
_message_preprocessors: list[Callable[[MailMessage], MailMessage]] = []
def __init__(self, *gpgArgs, **gpgkwArgs) -> None:
super().__init__()
self._message_preprocessors.append(MailMessageDecryptor(*gpgArgs, **gpgkwArgs))
self.renew_logging_group()
def _correspondent_from_name(self, name: str) -> Optional[Correspondent]:
@ -535,6 +540,11 @@ class MailAccountHandler(LoggingMixin):
return total_processed_files
def _preprocess_message(self, message: MailMessage):
for preprocessor in self._message_preprocessors:
message = preprocessor(message)
return message
def _handle_mail_rule(
self,
M: MailBox,
@ -613,6 +623,8 @@ class MailAccountHandler(LoggingMixin):
return total_processed_files
def _handle_message(self, message, rule: MailRule) -> int:
message = self._preprocess_message(message)
processed_elements = 0
# Skip Message handling when only attachments are to be processed but

View File

@ -0,0 +1,67 @@
from email import message_from_bytes
from email import policy
from email.message import Message
from gnupg import GPG
from imap_tools import MailMessage
from documents.loggers import LoggingMixin
class MailMessageDecryptor(LoggingMixin):
logging_name = "paperless_mail_message_decryptor"
def __init__(self, *args, **kwargs):
super().__init__()
self.renew_logging_group()
self._gpg = GPG(*args, **kwargs)
def __call__(self, message: MailMessage) -> MailMessage:
if not hasattr(message, "obj"):
self.log.debug("Message does not have 'obj' attribute")
return message
if message.obj.get_content_type() != "multipart/encrypted":
self.log.debug("Message not encrypted. Keep unchanged")
return message
self.log.debug("Message is encrypted.")
email_message = self._to_email_message(message)
decrypted_raw_message = self._gpg.decrypt(email_message.as_string())
if not decrypted_raw_message.ok:
self.log.debug(
f"Message decryption failed with status message "
f"{decrypted_raw_message.status}",
)
raise Exception(
f"Decryption failed: {decrypted_raw_message.status}, {decrypted_raw_message.stderr}",
)
self.log.debug("Message decrypted successfully.")
decrypted_message = self._build_decrypted_message(
decrypted_raw_message,
email_message,
)
return MailMessage(
[(f"UID {message.uid}".encode(), decrypted_message.as_bytes())],
)
@staticmethod
def _to_email_message(message: MailMessage) -> Message:
email_message = message_from_bytes(
message.obj.as_bytes(),
policy=policy.default,
)
return email_message
@staticmethod
def _build_decrypted_message(decrypted_raw_message, email_message):
decrypted_message = message_from_bytes(
decrypted_raw_message.data,
policy=policy.default,
)
for header, value in email_message.items():
if not decrypted_message.get(header):
decrypted_message.add_header(header, value)
return decrypted_message

View File

@ -1,13 +1,17 @@
import dataclasses
import email.contentmanager
import random
import tempfile
import uuid
from collections import namedtuple
from contextlib import AbstractContextManager
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from typing import Optional
from typing import Union
from unittest import mock
import gnupg
import pytest
from django.core.management import call_command
from django.db import DatabaseError
@ -30,6 +34,7 @@ from paperless_mail.mail import TagMailAction
from paperless_mail.mail import apply_mail_action
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.preprocessor import MailMessageDecryptor
@dataclasses.dataclass
@ -193,6 +198,58 @@ def fake_magic_from_buffer(buffer, mime=False):
return "Some verbose file description"
class MessageEncryptor:
def __init__(self):
self.gpg_home = tempfile.mkdtemp()
self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
self._testUser = "testuser@example.com"
# Generate a new key
input_data = self.gpg.gen_key_input(
name_email=self._testUser,
passphrase=None,
key_type="RSA",
key_length=2048,
expire_date=0,
no_protection=True,
)
self.gpg.gen_key(input_data)
def encrypt(self, message):
original_email: email.message.Message = message.obj
encrypted_data = self.gpg.encrypt(
original_email.as_bytes(),
self._testUser,
armor=True,
)
if not encrypted_data.ok:
raise Exception(f"Encryption failed: {encrypted_data.stderr}")
encrypted_email_content = encrypted_data.data
new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
new_email["From"] = original_email["From"]
new_email["To"] = original_email["To"]
new_email["Subject"] = original_email["Subject"]
# Add the control part
control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted")
control_part.set_payload("Version: 1")
new_email.attach(control_part)
# Add the encrypted data part
encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream")
encrypted_part.set_payload(encrypted_email_content.decode("ascii"))
encrypted_part.add_header(
"Content-Disposition",
'attachment; filename="encrypted.asc"',
)
new_email.attach(encrypted_part)
encrypted_message = MailMessage(
[(f"UID {message.uid}".encode(), new_email.as_bytes())],
)
return encrypted_message
@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
class TestMail(
DirectoriesMixin,
@ -215,7 +272,12 @@ class TestMail(
self.reset_bogus_mailbox()
self.mail_account_handler = MailAccountHandler()
self.messageEncryptor = MessageEncryptor()
self.mail_account_handler = MailAccountHandler(
gnupghome=self.messageEncryptor.gpg_home,
)
super().setUp()
def create_message(
@ -1283,6 +1345,72 @@ class TestMail(
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
def test_decrypt_encrypted_mail(self):
"""
Creates a mail with attachments. Then encrypts it with a new key.
Verifies that this encrypted message can be decrypted with attachments intact.
"""
message = self.create_message(
body="Test message with 2 attachments",
attachments=[
_AttachmentDef(
filename="f1.pdf",
disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
headers = message.headers
text = message.text
encrypted_message = self.messageEncryptor.encrypt(message)
self.assertEqual(len(encrypted_message.attachments), 1)
self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
self.assertEqual(encrypted_message.text, "")
message_decryptor = MailMessageDecryptor(
gnupghome=self.messageEncryptor.gpg_home,
)
decrypted_message = message_decryptor(encrypted_message)
self.assertEqual(len(decrypted_message.attachments), 2)
self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
self.assertEqual(decrypted_message.headers, headers)
self.assertEqual(decrypted_message.text, text)
self.assertEqual(decrypted_message.uid, message.uid)
def test_handle_encrypted_message(self):
message = self.create_message(
subject="the message title",
from_="Myself",
attachments=2,
)
encrypted_message = self.messageEncryptor.encrypt(message)
account = MailAccount.objects.create()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
)
rule.save()
result = self.mail_account_handler._handle_message(encrypted_message, rule)
self.assertEqual(result, 2)
self._queue_consumption_tasks_mock.assert_called()
self.assert_queue_consumption_tasks_call_args(
[
[
{"override_title": "file_0", "override_filename": "file_0.pdf"},
{"override_title": "file_1", "override_filename": "file_1.pdf"},
],
],
)
def assert_queue_consumption_tasks_call_args(
self,
expected_call_args: list[list[dict[str, str]]],