From d04b0f996d31629206883580fc844fb1951902bb Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Fri, 31 May 2024 09:28:05 -0700 Subject: [PATCH] Finishes up --- .../management/commands/document_exporter.py | 196 +++++++++++------- .../management/commands/document_importer.py | 26 +-- .../tests/test_management_exporter.py | 14 ++ .../tests/test_management_importer.py | 8 +- src/documents/tests/utils.py | 10 +- 5 files changed, 152 insertions(+), 102 deletions(-) diff --git a/src/documents/management/commands/document_exporter.py b/src/documents/management/commands/document_exporter.py index 50b411bda..19647b891 100644 --- a/src/documents/management/commands/document_exporter.py +++ b/src/documents/management/commands/document_exporter.py @@ -299,89 +299,22 @@ class Command(BaseCommand): document = document_map[document_dict["pk"]] # 3.2. generate a unique filename - filename_counter = 0 - while True: - if self.use_filename_format: - base_name = generate_filename( - document, - counter=filename_counter, - append_gpg=False, - ) - else: - base_name = document.get_public_filename(counter=filename_counter) - - if base_name not in self.exported_files: - self.exported_files.add(base_name) - break - else: - filename_counter += 1 + base_name = self.generate_base_name(document) # 3.3. write filenames into manifest - original_name = base_name - if self.use_folder_prefix: - original_name = os.path.join("originals", original_name) - original_target = (self.target / Path(original_name)).resolve() - document_dict[EXPORTER_FILE_NAME] = original_name - - if not self.no_thumbnail: - thumbnail_name = base_name + "-thumbnail.webp" - if self.use_folder_prefix: - thumbnail_name = os.path.join("thumbnails", thumbnail_name) - thumbnail_target = (self.target / Path(thumbnail_name)).resolve() - document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name - else: - thumbnail_target = None - - if not self.no_archive and document.has_archive_version: - archive_name = base_name + "-archive.pdf" - if self.use_folder_prefix: - archive_name = os.path.join("archive", archive_name) - archive_target = (self.target / Path(archive_name)).resolve() - document_dict[EXPORTER_ARCHIVE_NAME] = archive_name - else: - archive_target = None + original_target, thumbnail_target, archive_target = ( + self.generate_document_targets(document, base_name, document_dict) + ) # 3.4. write files to target folder - if document.storage_type == Document.STORAGE_TYPE_GPG: - t = int(time.mktime(document.created.timetuple())) - - original_target.parent.mkdir(parents=True, exist_ok=True) - with document.source_file as out_file: - original_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(original_target, times=(t, t)) - - if thumbnail_target: - thumbnail_target.parent.mkdir(parents=True, exist_ok=True) - with document.thumbnail_file as out_file: - thumbnail_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(thumbnail_target, times=(t, t)) - - if archive_target: - archive_target.parent.mkdir(parents=True, exist_ok=True) - if TYPE_CHECKING: - assert isinstance(document.archive_path, Path) - with document.archive_path as out_file: - archive_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(archive_target, times=(t, t)) - else: - self.check_and_copy( - document.source_path, - document.checksum, + if not self.data_only: + self.copy_document_files( + document, original_target, + thumbnail_target, + archive_target, ) - if thumbnail_target: - self.check_and_copy(document.thumbnail_path, None, thumbnail_target) - - if archive_target: - if TYPE_CHECKING: - assert isinstance(document.archive_path, Path) - self.check_and_copy( - document.archive_path, - document.archive_checksum, - archive_target, - ) - if self.split_manifest: manifest_name = Path(base_name + "-manifest.json") if self.use_folder_prefix: @@ -457,6 +390,115 @@ class Command(BaseCommand): else: item.unlink() + def generate_base_name(self, document: Document) -> str: + """ + Generates a unique name for the document, one which hasn't already been exported (or will be) + """ + filename_counter = 0 + while True: + if self.use_filename_format: + base_name = generate_filename( + document, + counter=filename_counter, + append_gpg=False, + ) + else: + base_name = document.get_public_filename(counter=filename_counter) + + if base_name not in self.exported_files: + self.exported_files.add(base_name) + break + else: + filename_counter += 1 + return base_name + + def generate_document_targets( + self, + document: Document, + base_name: str, + document_dict: dict, + ) -> tuple[Path, Optional[Path], Optional[Path]]: + """ + Generates the targets for a given document, including the original file, archive file and thumbnail (depending on settings). + """ + original_name = base_name + if self.use_folder_prefix: + original_name = os.path.join("originals", original_name) + original_target = (self.target / Path(original_name)).resolve() + document_dict[EXPORTER_FILE_NAME] = original_name + + if not self.no_thumbnail: + thumbnail_name = base_name + "-thumbnail.webp" + if self.use_folder_prefix: + thumbnail_name = os.path.join("thumbnails", thumbnail_name) + thumbnail_target = (self.target / Path(thumbnail_name)).resolve() + document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name + else: + thumbnail_target = None + + if not self.no_archive and document.has_archive_version: + archive_name = base_name + "-archive.pdf" + if self.use_folder_prefix: + archive_name = os.path.join("archive", archive_name) + archive_target = (self.target / Path(archive_name)).resolve() + document_dict[EXPORTER_ARCHIVE_NAME] = archive_name + else: + archive_target = None + + return original_target, thumbnail_target, archive_target + + def copy_document_files( + self, + document: Document, + original_target: Path, + thumbnail_target: Optional[Path], + archive_target: Optional[Path], + ) -> None: + """ + Copies files from the document storage location to the specified target location. + + If the document is encrypted, the files are decrypted before copying them to the target location. + """ + if document.storage_type == Document.STORAGE_TYPE_GPG: + t = int(time.mktime(document.created.timetuple())) + + original_target.parent.mkdir(parents=True, exist_ok=True) + with document.source_file as out_file: + original_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(original_target, times=(t, t)) + + if thumbnail_target: + thumbnail_target.parent.mkdir(parents=True, exist_ok=True) + with document.thumbnail_file as out_file: + thumbnail_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(thumbnail_target, times=(t, t)) + + if archive_target: + archive_target.parent.mkdir(parents=True, exist_ok=True) + if TYPE_CHECKING: + assert isinstance(document.archive_path, Path) + with document.archive_path as out_file: + archive_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(archive_target, times=(t, t)) + else: + self.check_and_copy( + document.source_path, + document.checksum, + original_target, + ) + + if thumbnail_target: + self.check_and_copy(document.thumbnail_path, None, thumbnail_target) + + if archive_target: + if TYPE_CHECKING: + assert isinstance(document.archive_path, Path) + self.check_and_copy( + document.archive_path, + document.archive_checksum, + archive_target, + ) + def check_and_copy( self, source: Path, @@ -467,8 +509,6 @@ class Command(BaseCommand): Copies the source to the target, if target doesn't exist or the target doesn't seem to match the source attributes """ - if self.data_only: - return target = target.resolve() if target in self.files_in_export_dir: diff --git a/src/documents/management/commands/document_importer.py b/src/documents/management/commands/document_importer.py index 9afc343c4..1ce7e5f1c 100644 --- a/src/documents/management/commands/document_importer.py +++ b/src/documents/management/commands/document_importer.py @@ -72,12 +72,6 @@ class Command(BaseCommand): help="If set, only the database will be exported, not files", ) - def __init__(self, *args, **kwargs): - BaseCommand.__init__(self, *args, **kwargs) - self.source = None - self.manifest = None - self.version = None - def pre_check(self) -> None: """ Runs some initial checks against the source directory, including looking for @@ -162,7 +156,8 @@ class Command(BaseCommand): else: self.stdout.write(self.style.NOTICE("No version.json file located")) - self._check_manifest_valid() + if not self.data_only: + self._check_manifest_valid() with ( disable_signal( @@ -214,8 +209,7 @@ class Command(BaseCommand): raise e if not self.data_only: - self._import_files_from_manifest(options["no_progress_bar"]) - + self._import_files_from_manifest() else: self.stdout.write(self.style.NOTICE("Data only import completed")) @@ -223,7 +217,7 @@ class Command(BaseCommand): call_command( "document_index", "reindex", - no_progress_bar=options["no_progress_bar"], + no_progress_bar=self.no_progress_bar, ) @staticmethod @@ -257,8 +251,8 @@ class Command(BaseCommand): "appear to be in the source directory.", ) try: - with doc_path.open(mode="rb") as infile: - infile.read(1) + with doc_path.open(mode="rb"): + pass except Exception as e: raise CommandError( f"Failed to read from original file {doc_path}", @@ -273,14 +267,14 @@ class Command(BaseCommand): f"does not appear to be in the source directory.", ) try: - with doc_archive_path.open(mode="rb") as infile: - infile.read(1) + with doc_archive_path.open(mode="rb"): + pass except Exception as e: raise CommandError( f"Failed to read from archive file {doc_archive_path}", ) from e - def _import_files_from_manifest(self, progress_bar_disable): + def _import_files_from_manifest(self): settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) @@ -291,7 +285,7 @@ class Command(BaseCommand): filter(lambda r: r["model"] == "documents.document", self.manifest), ) - for record in tqdm.tqdm(manifest_documents, disable=progress_bar_disable): + for record in tqdm.tqdm(manifest_documents, disable=self.no_progress_bar): document = Document.objects.get(pk=record["pk"]) doc_file = record[EXPORTER_FILE_NAME] diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index 0a191c0f7..6d7eff980 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -826,3 +826,17 @@ class TestExportImport( # Manifest and version files only should be present in the exported directory self.assertFileCountInDir(self.target, 2) + self.assertIsFile(self.target / "manifest.json") + self.assertIsFile(self.target / "version.json") + + shutil.rmtree(self.dirs.media_dir / "documents") + Document.objects.all().delete() + + call_command( + "document_importer", + "--no-progress-bar", + "--data-only", + self.target, + ) + + self.assertEqual(Document.objects.all().count(), 4) diff --git a/src/documents/tests/test_management_importer.py b/src/documents/tests/test_management_importer.py index c0d155d02..aa76ffadb 100644 --- a/src/documents/tests/test_management_importer.py +++ b/src/documents/tests/test_management_importer.py @@ -14,9 +14,15 @@ from documents.settings import EXPORTER_ARCHIVE_NAME from documents.settings import EXPORTER_FILE_NAME from documents.tests.utils import DirectoriesMixin from documents.tests.utils import FileSystemAssertsMixin +from documents.tests.utils import SampleDirMixin -class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): +class TestCommandImport( + DirectoriesMixin, + FileSystemAssertsMixin, + SampleDirMixin, + TestCase, +): def test_check_manifest_exists(self): """ GIVEN: diff --git a/src/documents/tests/utils.py b/src/documents/tests/utils.py index 2243fa557..4ec0851df 100644 --- a/src/documents/tests/utils.py +++ b/src/documents/tests/utils.py @@ -156,10 +156,6 @@ class DirectoriesMixin: they are cleaned up on exit """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.dirs = None - def setUp(self) -> None: self.dirs = setup_directories() super().setUp() @@ -203,11 +199,11 @@ class FileSystemAssertsMixin: def assertFileCountInDir(self, path: Union[PathLike, str], count: int): path = Path(path).resolve() self.assertTrue(path.is_dir(), f"Path {path} is not a directory") - file_count = len([x for x in path.iterdir() if x.is_file()]) + files = [x for x in path.iterdir() if x.is_file()] self.assertEqual( - file_count, + len(files), count, - f"Path {path} contains {file_count} files instead of {count} file", + f"Path {path} contains {len(files)} files instead of {count} files", )