diff --git a/src/paperless_mail/mail.py b/src/paperless_mail/mail.py index 83771dbf5..93efdb844 100644 --- a/src/paperless_mail/mail.py +++ b/src/paperless_mail/mail.py @@ -10,6 +10,7 @@ from datetime import timedelta from fnmatch import fnmatch from pathlib import Path from typing import TYPE_CHECKING +from typing import Callable from typing import Optional from typing import Union @@ -43,6 +44,7 @@ from documents.tasks import consume_file from paperless_mail.models import MailAccount from paperless_mail.models import MailRule from paperless_mail.models import ProcessedMail +from paperless_mail.preprocessor import MailMessageDecryptor # Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG # imaplib => conn.fetch(b"", "FLAGS") @@ -426,8 +428,11 @@ class MailAccountHandler(LoggingMixin): logging_name = "paperless_mail" - def __init__(self) -> None: + _message_preprocessors: list[Callable[[MailMessage], MailMessage]] = [] + + def __init__(self, *gpgArgs, **gpgkwArgs) -> None: super().__init__() + self._message_preprocessors.append(MailMessageDecryptor(*gpgArgs, **gpgkwArgs)) self.renew_logging_group() def _correspondent_from_name(self, name: str) -> Optional[Correspondent]: @@ -535,6 +540,11 @@ class MailAccountHandler(LoggingMixin): return total_processed_files + def _preprocess_message(self, message: MailMessage): + for preprocessor in self._message_preprocessors: + message = preprocessor(message) + return message + def _handle_mail_rule( self, M: MailBox, @@ -613,6 +623,8 @@ class MailAccountHandler(LoggingMixin): return total_processed_files def _handle_message(self, message, rule: MailRule) -> int: + message = self._preprocess_message(message) + processed_elements = 0 # Skip Message handling when only attachments are to be processed but diff --git a/src/paperless_mail/preprocessor.py b/src/paperless_mail/preprocessor.py new file mode 100644 index 000000000..145a0f400 --- /dev/null +++ b/src/paperless_mail/preprocessor.py @@ -0,0 +1,67 @@ +from email import message_from_bytes +from email import policy +from email.message import Message + +from gnupg import GPG +from imap_tools import MailMessage + +from documents.loggers import LoggingMixin + + +class MailMessageDecryptor(LoggingMixin): + logging_name = "paperless_mail_message_decryptor" + + def __init__(self, *args, **kwargs): + super().__init__() + self.renew_logging_group() + self._gpg = GPG(*args, **kwargs) + + def __call__(self, message: MailMessage) -> MailMessage: + if not hasattr(message, "obj"): + self.log.debug("Message does not have 'obj' attribute") + return message + if message.obj.get_content_type() != "multipart/encrypted": + self.log.debug("Message not encrypted. Keep unchanged") + return message + + self.log.debug("Message is encrypted.") + email_message = self._to_email_message(message) + decrypted_raw_message = self._gpg.decrypt(email_message.as_string()) + + if not decrypted_raw_message.ok: + self.log.debug( + f"Message decryption failed with status message " + f"{decrypted_raw_message.status}", + ) + raise Exception( + f"Decryption failed: {decrypted_raw_message.status}, {decrypted_raw_message.stderr}", + ) + self.log.debug("Message decrypted successfully.") + + decrypted_message = self._build_decrypted_message( + decrypted_raw_message, + email_message, + ) + + return MailMessage( + [(f"UID {message.uid}".encode(), decrypted_message.as_bytes())], + ) + + @staticmethod + def _to_email_message(message: MailMessage) -> Message: + email_message = message_from_bytes( + message.obj.as_bytes(), + policy=policy.default, + ) + return email_message + + @staticmethod + def _build_decrypted_message(decrypted_raw_message, email_message): + decrypted_message = message_from_bytes( + decrypted_raw_message.data, + policy=policy.default, + ) + for header, value in email_message.items(): + if not decrypted_message.get(header): + decrypted_message.add_header(header, value) + return decrypted_message diff --git a/src/paperless_mail/tests/test_mail.py b/src/paperless_mail/tests/test_mail.py index 0920f033c..1010c5fe1 100644 --- a/src/paperless_mail/tests/test_mail.py +++ b/src/paperless_mail/tests/test_mail.py @@ -1,13 +1,17 @@ import dataclasses import email.contentmanager import random +import tempfile import uuid from collections import namedtuple from contextlib import AbstractContextManager +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart from typing import Optional from typing import Union from unittest import mock +import gnupg import pytest from django.core.management import call_command from django.db import DatabaseError @@ -30,6 +34,7 @@ from paperless_mail.mail import TagMailAction from paperless_mail.mail import apply_mail_action from paperless_mail.models import MailAccount from paperless_mail.models import MailRule +from paperless_mail.preprocessor import MailMessageDecryptor @dataclasses.dataclass @@ -193,6 +198,58 @@ def fake_magic_from_buffer(buffer, mime=False): return "Some verbose file description" +class MessageEncryptor: + def __init__(self): + self.gpg_home = tempfile.mkdtemp() + self.gpg = gnupg.GPG(gnupghome=self.gpg_home) + self._testUser = "testuser@example.com" + # Generate a new key + input_data = self.gpg.gen_key_input( + name_email=self._testUser, + passphrase=None, + key_type="RSA", + key_length=2048, + expire_date=0, + no_protection=True, + ) + self.gpg.gen_key(input_data) + + def encrypt(self, message): + original_email: email.message.Message = message.obj + encrypted_data = self.gpg.encrypt( + original_email.as_bytes(), + self._testUser, + armor=True, + ) + if not encrypted_data.ok: + raise Exception(f"Encryption failed: {encrypted_data.stderr}") + encrypted_email_content = encrypted_data.data + + new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted") + new_email["From"] = original_email["From"] + new_email["To"] = original_email["To"] + new_email["Subject"] = original_email["Subject"] + + # Add the control part + control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted") + control_part.set_payload("Version: 1") + new_email.attach(control_part) + + # Add the encrypted data part + encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream") + encrypted_part.set_payload(encrypted_email_content.decode("ascii")) + encrypted_part.add_header( + "Content-Disposition", + 'attachment; filename="encrypted.asc"', + ) + new_email.attach(encrypted_part) + + encrypted_message = MailMessage( + [(f"UID {message.uid}".encode(), new_email.as_bytes())], + ) + return encrypted_message + + @mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer) class TestMail( DirectoriesMixin, @@ -215,7 +272,12 @@ class TestMail( self.reset_bogus_mailbox() - self.mail_account_handler = MailAccountHandler() + self.messageEncryptor = MessageEncryptor() + + self.mail_account_handler = MailAccountHandler( + gnupghome=self.messageEncryptor.gpg_home, + ) + super().setUp() def create_message( @@ -1283,6 +1345,72 @@ class TestMail( self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0) self.assertEqual(len(self.bogus_mailbox.messages), 3) + def test_decrypt_encrypted_mail(self): + """ + Creates a mail with attachments. Then encrypts it with a new key. + Verifies that this encrypted message can be decrypted with attachments intact. + """ + message = self.create_message( + body="Test message with 2 attachments", + attachments=[ + _AttachmentDef( + filename="f1.pdf", + disposition="inline", + ), + _AttachmentDef(filename="f2.pdf"), + ], + ) + headers = message.headers + text = message.text + encrypted_message = self.messageEncryptor.encrypt(message) + + self.assertEqual(len(encrypted_message.attachments), 1) + self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc") + self.assertEqual(encrypted_message.text, "") + + message_decryptor = MailMessageDecryptor( + gnupghome=self.messageEncryptor.gpg_home, + ) + decrypted_message = message_decryptor(encrypted_message) + + self.assertEqual(len(decrypted_message.attachments), 2) + self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf") + self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf") + self.assertEqual(decrypted_message.headers, headers) + self.assertEqual(decrypted_message.text, text) + self.assertEqual(decrypted_message.uid, message.uid) + + def test_handle_encrypted_message(self): + message = self.create_message( + subject="the message title", + from_="Myself", + attachments=2, + ) + + encrypted_message = self.messageEncryptor.encrypt(message) + + account = MailAccount.objects.create() + rule = MailRule( + assign_title_from=MailRule.TitleSource.FROM_FILENAME, + account=account, + ) + rule.save() + + result = self.mail_account_handler._handle_message(encrypted_message, rule) + + self.assertEqual(result, 2) + + self._queue_consumption_tasks_mock.assert_called() + + self.assert_queue_consumption_tasks_call_args( + [ + [ + {"override_title": "file_0", "override_filename": "file_0.pdf"}, + {"override_title": "file_1", "override_filename": "file_1.pdf"}, + ], + ], + ) + def assert_queue_consumption_tasks_call_args( self, expected_call_args: list[list[dict[str, str]]],