Let's see if this breaks any existing tests

This commit is contained in:
Trenton H 2024-06-05 09:03:36 -07:00
parent 6ddb62bf3f
commit b237ff892b
5 changed files with 258 additions and 110 deletions

View File

@ -31,6 +31,7 @@ if settings.AUDIT_LOG_ENABLED:
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename
from documents.management.commands.mixins import SecurityMixin
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
@ -47,6 +48,7 @@ from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
from documents.settings import EXPORTER_ARCHIVE_NAME
from documents.settings import EXPORTER_FILE_NAME
from documents.settings import EXPORTER_SALT_NAME
from documents.settings import EXPORTER_THUMBNAIL_NAME
from documents.utils import copy_file_with_basic_stats
from paperless import version
@ -56,7 +58,7 @@ from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
class Command(BaseCommand):
class Command(SecurityMixin, BaseCommand):
help = (
"Decrypt and rename all files in our collection into a given target "
"directory. And include a manifest file containing document data for "
@ -165,6 +167,11 @@ class Command(BaseCommand):
help="If set, the progress bar will not be shown",
)
parser.add_argument(
"--passphrase",
help="If provided, is used to encrypt mail account passwords in the export",
)
def handle(self, *args, **options):
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
@ -177,6 +184,7 @@ class Command(BaseCommand):
self.zip_export: bool = options["zip"]
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: Optional[str] = options.get("passphrase")
self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set()
@ -272,6 +280,8 @@ class Command(BaseCommand):
serializers.serialize("json", manifest_key_to_object_query[key]),
)
self.encrypt_secret_fields(manifest_dict)
# These are treated specially and included in the per-document manifest
# if that setting is enabled. Otherwise, they are just exported to the bulk
# manifest
@ -353,17 +363,22 @@ class Command(BaseCommand):
self.files_in_export_dir.remove(manifest_path)
# 4.2 write version information to target folder
version_path = (self.target / "version.json").resolve()
version_path.write_text(
extra_metadata_path = (self.target / "metadata.json").resolve()
metadata = {"version": version.__full_version_str__}
# 4.2.1 If needed, write the salt value into the metadata
if self.passphrase:
metadata[EXPORTER_SALT_NAME] = self.salt
extra_metadata_path.write_text(
json.dumps(
{"version": version.__full_version_str__},
metadata,
indent=2,
ensure_ascii=False,
),
encoding="utf-8",
)
if version_path in self.files_in_export_dir:
self.files_in_export_dir.remove(version_path)
if extra_metadata_path in self.files_in_export_dir:
self.files_in_export_dir.remove(extra_metadata_path)
if self.delete:
# 5. Remove files which we did not explicitly export in this run
@ -527,3 +542,13 @@ class Command(BaseCommand):
if perform_copy:
target.parent.mkdir(parents=True, exist_ok=True)
copy_file_with_basic_stats(source, target)
def encrypt_secret_fields(self, manifest: dict) -> None:
    """
    Encrypts, in place, the password of every mail account record in the
    manifest. Does nothing when no passphrase was supplied.

    The records are expected in Django's serialized layout, where the model
    field values are nested under the "fields" key of each record.
    """
    if not self.passphrase:
        return

    # No salt exists yet for a fresh export; passing None explicitly asks
    # setup_crypto to generate a new random one
    self.setup_crypto(None)

    for mail_account_record in manifest["mail_accounts"]:
        fields = mail_account_record["fields"]
        fields["password"] = self.encrypt_field(fields["password"])

View File

@ -3,6 +3,7 @@ import logging
import os
from contextlib import contextmanager
from pathlib import Path
from typing import Optional
import tqdm
from django.conf import settings
@ -21,6 +22,7 @@ from django.db.models.signals import post_save
from filelock import FileLock
from documents.file_handling import create_source_path_directory
from documents.management.commands.mixins import SecurityMixin
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
@ -31,6 +33,7 @@ from documents.models import Tag
from documents.parsers import run_convert
from documents.settings import EXPORTER_ARCHIVE_NAME
from documents.settings import EXPORTER_FILE_NAME
from documents.settings import EXPORTER_SALT_NAME
from documents.settings import EXPORTER_THUMBNAIL_NAME
from documents.signals.handlers import update_filename_and_move_files
from documents.utils import copy_file_with_basic_stats
@ -49,7 +52,7 @@ def disable_signal(sig, receiver, sender):
sig.connect(receiver=receiver, sender=sender)
class Command(BaseCommand):
class Command(SecurityMixin, BaseCommand):
help = (
"Using a manifest.json file, load the data from there, and import the "
"documents it refers to."
@ -72,92 +75,173 @@ class Command(BaseCommand):
help="If set, only the database will be exported, not files",
)
parser.add_argument(
"--passphrase",
help="If provided, is used to decrypt mail account passwords in the export",
)
def pre_check(self) -> None:
"""
Runs some initial checks against the source directory, including looking for
common mistakes like having files still and users other than expected
Runs some initial checks against the state of the install and source, including:
- Does the target exist?
- Can we access the target?
- Does the target have a manifest file?
- Are there existing files in the document folders?
- Are there existing users or documents in the database?
"""
def pre_check_maybe_not_empty():
# Skip this check if operating only on the database
# We can expect data to exist in that case
if not self.data_only:
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
if document_dir.exists() and document_dir.is_dir():
for entry in document_dir.glob("**/*"):
if entry.is_dir():
continue
self.stdout.write(
self.style.WARNING(
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
),
)
break
# But existing users or other data still matters in a data only
if (
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
!= 0
):
self.stdout.write(
self.style.WARNING(
"Found existing user(s), this might indicate a non-empty installation",
),
)
if Document.objects.count() != 0:
self.stdout.write(
self.style.WARNING(
"Found existing documents(s), this might indicate a non-empty installation",
),
)
def pre_check_manifest_exists():
if not (self.source / "manifest.json").exists():
raise CommandError(
"That directory doesn't appear to contain a manifest.json file.",
)
if not self.source.exists():
raise CommandError("That path doesn't exist")
if not os.access(self.source, os.R_OK):
raise CommandError("That path doesn't appear to be readable")
# Skip this check if operating only on the database
# We can expect data to exist in that case
if not self.data_only:
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
if document_dir.exists() and document_dir.is_dir():
for entry in document_dir.glob("**/*"):
if entry.is_dir():
continue
self.stdout.write(
self.style.WARNING(
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
),
)
break
if (
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
!= 0
):
pre_check_maybe_not_empty()
pre_check_manifest_exists()
def load_manifest_files(self) -> None:
    """
    Loads manifest data from the various JSON files for parsing and loading the database

    The main manifest.json replaces self.manifest; the records of every
    "*-manifest.json" file found below the source directory are appended to it.
    Each file that was read is recorded in self.manifest_paths.
    """
    main_manifest_path = self.source / "manifest.json"

    # JSON is read explicitly as UTF-8 rather than with the locale default encoding
    with main_manifest_path.open(encoding="utf-8") as infile:
        self.manifest = json.load(infile)
    self.manifest_paths.append(main_manifest_path)

    for file in Path(self.source).glob("**/*-manifest.json"):
        with file.open(encoding="utf-8") as infile:
            self.manifest += json.load(infile)
        self.manifest_paths.append(file)
def load_metadata(self) -> None:
"""
Loads either just the version information or the version information and extra data
Must account for the old style of export as well, with just version.json
"""
version_path = self.source / "version.json"
metadata_path = self.source / "metadata.json"
if not version_path.exists() and not metadata_path.exists():
self.stdout.write(
self.style.NOTICE("No version.json or metadata.json file located"),
)
return
if version_path.exists():
with version_path.open() as infile:
self.version = json.load(infile)["version"]
elif metadata_path.exists():
with metadata_path.open() as infile:
data = json.load(infile)
self.version = data["version"]
if not self.passphrase and EXPORTER_SALT_NAME in data:
raise CommandError(
"No passphrase was given, but this export contains encrypted fields",
)
elif EXPORTER_SALT_NAME in data:
self.salt = data[EXPORTER_SALT_NAME]
if self.version and self.version != version.__full_version_str__:
self.stdout.write(
self.style.WARNING(
"Found existing user(s), this might indicate a non-empty installation",
),
)
if Document.objects.count() != 0:
self.stdout.write(
self.style.WARNING(
"Found existing documents(s), this might indicate a non-empty installation",
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}."
" Continuing, but import may fail.",
),
)
def load_data_to_database(self) -> None:
    """
    As the name implies, loads data from the JSON file(s) into the database

    Every path in self.manifest_paths is handed to the "loaddata" command inside
    a single transaction. When the load fails, a hint about the export version is
    written to stdout and the original exception is re-raised.
    """
    try:
        with transaction.atomic():
            # delete these since pk can change, re-created from import
            ContentType.objects.all().delete()
            Permission.objects.all().delete()
            for manifest_path in self.manifest_paths:
                call_command("loaddata", manifest_path)
    except (FieldDoesNotExist, DeserializationError, IntegrityError):
        self.stdout.write(self.style.ERROR("Database import failed"))
        if self.version is None:
            # Only claim the version is missing when it really is unknown
            self.stdout.write(
                self.style.ERROR("No version information present"),
            )
        elif self.version != version.__full_version_str__:
            self.stdout.write(
                self.style.ERROR(
                    "Version mismatch: "
                    f"Currently {version.__full_version_str__},"
                    f" importing {self.version}",
                ),
            )
        # Re-raise the active exception unchanged for the caller
        raise
def handle(self, *args, **options):
logging.getLogger().handlers[0].level = logging.ERROR
self.source = Path(options["source"]).resolve()
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: str | None = options.get("passphrase")
self.version: Optional[str] = None
self.salt: Optional[str] = None
self.manifest_paths = []
self.manifest = []
self.pre_check()
manifest_paths = []
self.load_manifest_files()
main_manifest_path = self.source / "manifest.json"
self.load_metadata()
self._check_manifest_exists(main_manifest_path)
self.check_manifest_validity()
with main_manifest_path.open() as infile:
self.manifest = json.load(infile)
manifest_paths.append(main_manifest_path)
for file in Path(self.source).glob("**/*-manifest.json"):
with file.open() as infile:
self.manifest += json.load(infile)
manifest_paths.append(file)
version_path = self.source / "version.json"
if version_path.exists():
with version_path.open() as infile:
self.version = json.load(infile)["version"]
# Provide an initial warning if needed to the user
if self.version != version.__full_version_str__:
self.stdout.write(
self.style.WARNING(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}."
" Continuing, but import may fail.",
),
)
else:
self.stdout.write(self.style.NOTICE("No version.json file located"))
if not self.data_only:
self._check_manifest_files_valid()
self.decrypt_secret_fields()
with (
disable_signal(
@ -181,32 +265,7 @@ class Command(BaseCommand):
auditlog.unregister(CustomFieldInstance)
# Fill up the database with whatever is in the manifest
try:
with transaction.atomic():
# delete these since pk can change, re-created from import
ContentType.objects.all().delete()
Permission.objects.all().delete()
for manifest_path in manifest_paths:
call_command("loaddata", manifest_path)
except (FieldDoesNotExist, DeserializationError, IntegrityError) as e:
self.stdout.write(self.style.ERROR("Database import failed"))
if (
self.version is not None
and self.version != version.__full_version_str__
):
self.stdout.write(
self.style.ERROR(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}",
),
)
raise e
else:
self.stdout.write(
self.style.ERROR("No version information present"),
)
raise e
self.load_data_to_database()
if not self.data_only:
self._import_files_from_manifest()
@ -220,30 +279,20 @@ class Command(BaseCommand):
no_progress_bar=self.no_progress_bar,
)
@staticmethod
def _check_manifest_exists(path: Path):
if not path.exists():
raise CommandError(
"That directory doesn't appear to contain a manifest.json file.",
)
def _check_manifest_files_valid(self):
def check_manifest_validity(self):
"""
Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from
"""
self.stdout.write("Checking the manifest")
for record in self.manifest:
if record["model"] != "documents.document":
continue
if EXPORTER_FILE_NAME not in record:
def check_document_validity(document_record: dict):
if EXPORTER_FILE_NAME not in document_record:
raise CommandError(
"The manifest file contains a record which does not "
"refer to an actual document file.",
)
doc_file = record[EXPORTER_FILE_NAME]
doc_file = document_record[EXPORTER_FILE_NAME]
doc_path: Path = self.source / doc_file
if not doc_path.exists():
raise CommandError(
@ -258,8 +307,8 @@ class Command(BaseCommand):
f"Failed to read from original file {doc_path}",
) from e
if EXPORTER_ARCHIVE_NAME in record:
archive_file = record[EXPORTER_ARCHIVE_NAME]
if EXPORTER_ARCHIVE_NAME in document_record:
archive_file = document_record[EXPORTER_ARCHIVE_NAME]
doc_archive_path: Path = self.source / archive_file
if not doc_archive_path.exists():
raise CommandError(
@ -274,6 +323,21 @@ class Command(BaseCommand):
f"Failed to read from archive file {doc_archive_path}",
) from e
def check_acount_account_valid(mail_account_record: dict):
if EXPORTER_SALT_NAME in mail_account_record and not self.passphrase:
raise CommandError(
"The manifest file contains encrypted mail account passwords, but no passphrase was provided",
)
self.stdout.write("Checking the manifest")
for record in self.manifest:
# Only check if the document files exist if this is not data only
# We don't care about documents for a data only import
if not self.data_only and record["model"] == "documents.document":
check_document_validity(record)
elif record["model"] == "paperless_mail.mailaccount":
check_acount_account_valid(record)
def _import_files_from_manifest(self):
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
@ -339,3 +403,13 @@ class Command(BaseCommand):
copy_file_with_basic_stats(archive_path, document.archive_path)
document.save()
def decrypt_secret_fields(self) -> None:
    """
    Decrypts, in place, the password of every mail account record held in
    self.manifest.

    Does nothing unless a passphrase was given and a salt was found for the
    export. Without a salt there is nothing encrypted to decrypt, and setting up
    the crypto would only invent a fresh, unrelated salt.

    The records are expected in Django's serialized layout, where the model
    field values are nested under the "fields" key of each record.
    """
    if not self.passphrase or self.salt is None:
        return

    self.setup_crypto(self.salt)

    # NOTE(review): this only changes the in-memory manifest, while the database
    # load hands the manifest file paths to "loaddata" - confirm the decrypted
    # values actually reach the database
    for record in self.manifest:
        if record["model"] == "paperless_mail.mailaccount":
            fields = record["fields"]
            fields["password"] = self.decrypt_field(fields["password"])

View File

@ -1,6 +1,12 @@
import base64
import os
from argparse import ArgumentParser
from typing import Final
from typing import Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.core.management import CommandError
@ -41,3 +47,45 @@ class ProgressBarMixin:
def handle_progress_bar_mixin(self, *args, **options):
self.no_progress_bar = options["no_progress_bar"]
self.use_progress_bar = not self.no_progress_bar
class SecurityMixin:
    """
    Passphrase based encryption helpers for management commands.

    A Fernet key is derived from a passphrase and a salt using PBKDF2-HMAC-SHA256:
    https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet

    The class using this mixin must set self.passphrase before calling setup_crypto
    """

    # This matches to Django's default for now
    # https://github.com/django/django/blob/adae61942/django/contrib/auth/hashers.py#L315
    KEY_ITERATIONS: Final[int] = 1_000_000

    def setup_crypto(self, salt: Optional[str] = None):
        """
        Prepares self.fernet for use with encrypt_field and decrypt_field

        If no salt is given, as when creating a fresh export, a new random 16 byte
        salt is generated. The salt in use is stored on self.salt as a hexadecimal
        string, so the caller can persist it alongside the encrypted data.
        """
        self.salt = salt or os.urandom(16).hex()
        self.fernet = self.get_fernet(self.passphrase, self.salt)

    def get_fernet(self, passphrase: str, salt: str) -> Fernet:
        """
        Constructs a class for encryption or decryption using the specified passphrase and salt
        Salt is assumed to be a hexadecimal representation of a cryptographically secure random byte string
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=bytes.fromhex(salt),
            # Use the class constant so the documented iteration count is the
            # one actually applied when deriving the key
            iterations=self.KEY_ITERATIONS,
        )
        key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
        return Fernet(key)

    def encrypt_field(self, value: str) -> str:
        """
        Given a string field value, encrypts it and returns the hexadecimal representation of the encrypted token
        """
        return self.fernet.encrypt(value.encode("utf-8")).hex()

    def decrypt_field(self, value: str) -> str:
        """
        Given a string field value, decrypts it and returns the original value of the field
        """
        return self.fernet.decrypt(bytes.fromhex(value)).decode("utf-8")

View File

@ -3,3 +3,4 @@
EXPORTER_FILE_NAME = "__exported_file_name__"
EXPORTER_THUMBNAIL_NAME = "__exported_thumbnail_name__"
EXPORTER_ARCHIVE_NAME = "__exported_archive_name__"
EXPORTER_SALT_NAME = "__salt_hex__"

View File

@ -126,14 +126,14 @@ class TestCommandImport(
},
]
with self.assertRaises(CommandError) as cm:
cmd._check_manifest_files_valid()
cmd.check_manifest_validity()
self.assertInt("Failed to read from original file", str(cm.exception))
original_path.chmod(0o444)
archive_path.chmod(0o222)
with self.assertRaises(CommandError) as cm:
cmd._check_manifest_files_valid()
cmd.check_manifest_validity()
self.assertInt("Failed to read from archive file", str(cm.exception))
def test_import_source_not_existing(self):