Finishes up

Trenton H 2024-05-31 09:28:05 -07:00
parent 9299a6983a
commit d04b0f996d
5 changed files with 152 additions and 102 deletions

View File

@@ -299,89 +299,22 @@ class Command(BaseCommand):
             document = document_map[document_dict["pk"]]

             # 3.2. generate a unique filename
-            filename_counter = 0
-            while True:
-                if self.use_filename_format:
-                    base_name = generate_filename(
-                        document,
-                        counter=filename_counter,
-                        append_gpg=False,
-                    )
-                else:
-                    base_name = document.get_public_filename(counter=filename_counter)
-                if base_name not in self.exported_files:
-                    self.exported_files.add(base_name)
-                    break
-                else:
-                    filename_counter += 1
+            base_name = self.generate_base_name(document)

             # 3.3. write filenames into manifest
-            original_name = base_name
-            if self.use_folder_prefix:
-                original_name = os.path.join("originals", original_name)
-            original_target = (self.target / Path(original_name)).resolve()
-            document_dict[EXPORTER_FILE_NAME] = original_name
-            if not self.no_thumbnail:
-                thumbnail_name = base_name + "-thumbnail.webp"
-                if self.use_folder_prefix:
-                    thumbnail_name = os.path.join("thumbnails", thumbnail_name)
-                thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
-                document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
-            else:
-                thumbnail_target = None
-            if not self.no_archive and document.has_archive_version:
-                archive_name = base_name + "-archive.pdf"
-                if self.use_folder_prefix:
-                    archive_name = os.path.join("archive", archive_name)
-                archive_target = (self.target / Path(archive_name)).resolve()
-                document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
-            else:
-                archive_target = None
+            original_target, thumbnail_target, archive_target = (
+                self.generate_document_targets(document, base_name, document_dict)
+            )

             # 3.4. write files to target folder
-            if document.storage_type == Document.STORAGE_TYPE_GPG:
-                t = int(time.mktime(document.created.timetuple()))
-                original_target.parent.mkdir(parents=True, exist_ok=True)
-                with document.source_file as out_file:
-                    original_target.write_bytes(GnuPG.decrypted(out_file))
-                    os.utime(original_target, times=(t, t))
-                if thumbnail_target:
-                    thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
-                    with document.thumbnail_file as out_file:
-                        thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
-                        os.utime(thumbnail_target, times=(t, t))
-                if archive_target:
-                    archive_target.parent.mkdir(parents=True, exist_ok=True)
-                    if TYPE_CHECKING:
-                        assert isinstance(document.archive_path, Path)
-                    with document.archive_path as out_file:
-                        archive_target.write_bytes(GnuPG.decrypted(out_file))
-                        os.utime(archive_target, times=(t, t))
-            else:
-                self.check_and_copy(
-                    document.source_path,
-                    document.checksum,
-                    original_target,
-                )
-                if thumbnail_target:
-                    self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
-                if archive_target:
-                    if TYPE_CHECKING:
-                        assert isinstance(document.archive_path, Path)
-                    self.check_and_copy(
-                        document.archive_path,
-                        document.archive_checksum,
-                        archive_target,
-                    )
+            if not self.data_only:
+                self.copy_document_files(
+                    document,
+                    original_target,
+                    thumbnail_target,
+                    archive_target,
+                )

             if self.split_manifest:
                 manifest_name = Path(base_name + "-manifest.json")
                 if self.use_folder_prefix:
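Taken together, the per-document body of the export loop now reduces to three helper calls plus a data-only guard. A condensed sketch, written as a hypothetical free function for readability (in the command itself these are method calls on self):

    def export_one_document(cmd, document, document_dict) -> None:
        # 1. pick a name that has not been exported yet
        base_name = cmd.generate_base_name(document)
        # 2. compute original/thumbnail/archive targets and record them in the manifest entry
        original_target, thumbnail_target, archive_target = cmd.generate_document_targets(
            document,
            base_name,
            document_dict,
        )
        # 3. copy (and decrypt, if needed) the files unless a data-only export was requested
        if not cmd.data_only:
            cmd.copy_document_files(document, original_target, thumbnail_target, archive_target)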
@@ -457,6 +390,115 @@ class Command(BaseCommand):
             else:
                 item.unlink()

+    def generate_base_name(self, document: Document) -> str:
+        """
+        Generates a unique name for the document, one which hasn't already been exported (or will be)
+        """
+        filename_counter = 0
+        while True:
+            if self.use_filename_format:
+                base_name = generate_filename(
+                    document,
+                    counter=filename_counter,
+                    append_gpg=False,
+                )
+            else:
+                base_name = document.get_public_filename(counter=filename_counter)
+
+            if base_name not in self.exported_files:
+                self.exported_files.add(base_name)
+                break
+            else:
+                filename_counter += 1
+        return base_name
+
+    def generate_document_targets(
+        self,
+        document: Document,
+        base_name: str,
+        document_dict: dict,
+    ) -> tuple[Path, Optional[Path], Optional[Path]]:
+        """
+        Generates the targets for a given document, including the original file,
+        archive file and thumbnail (depending on settings).
+        """
+        original_name = base_name
+        if self.use_folder_prefix:
+            original_name = os.path.join("originals", original_name)
+        original_target = (self.target / Path(original_name)).resolve()
+        document_dict[EXPORTER_FILE_NAME] = original_name
+
+        if not self.no_thumbnail:
+            thumbnail_name = base_name + "-thumbnail.webp"
+            if self.use_folder_prefix:
+                thumbnail_name = os.path.join("thumbnails", thumbnail_name)
+            thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
+            document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
+        else:
+            thumbnail_target = None
+
+        if not self.no_archive and document.has_archive_version:
+            archive_name = base_name + "-archive.pdf"
+            if self.use_folder_prefix:
+                archive_name = os.path.join("archive", archive_name)
+            archive_target = (self.target / Path(archive_name)).resolve()
+            document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
+        else:
+            archive_target = None
+
+        return original_target, thumbnail_target, archive_target
+
+    def copy_document_files(
+        self,
+        document: Document,
+        original_target: Path,
+        thumbnail_target: Optional[Path],
+        archive_target: Optional[Path],
+    ) -> None:
+        """
+        Copies files from the document storage location to the specified target location.
+        If the document is encrypted, the files are decrypted before copying them to the target location.
+        """
+        if document.storage_type == Document.STORAGE_TYPE_GPG:
+            t = int(time.mktime(document.created.timetuple()))
+
+            original_target.parent.mkdir(parents=True, exist_ok=True)
+            with document.source_file as out_file:
+                original_target.write_bytes(GnuPG.decrypted(out_file))
+                os.utime(original_target, times=(t, t))
+
+            if thumbnail_target:
+                thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
+                with document.thumbnail_file as out_file:
+                    thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
+                    os.utime(thumbnail_target, times=(t, t))
+
+            if archive_target:
+                archive_target.parent.mkdir(parents=True, exist_ok=True)
+                if TYPE_CHECKING:
+                    assert isinstance(document.archive_path, Path)
+                with document.archive_path as out_file:
+                    archive_target.write_bytes(GnuPG.decrypted(out_file))
+                    os.utime(archive_target, times=(t, t))
+        else:
+            self.check_and_copy(
+                document.source_path,
+                document.checksum,
+                original_target,
+            )
+
+            if thumbnail_target:
+                self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
+
+            if archive_target:
+                if TYPE_CHECKING:
+                    assert isinstance(document.archive_path, Path)
+                self.check_and_copy(
+                    document.archive_path,
+                    document.archive_checksum,
+                    archive_target,
+                )
+
     def check_and_copy(
         self,
         source: Path,

@@ -467,8 +509,6 @@ class Command(BaseCommand):
        Copies the source to the target, if target doesn't exist or the target doesn't seem to match
        the source attributes
        """
-        if self.data_only:
-            return
        target = target.resolve()
        if target in self.files_in_export_dir:
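generate_document_targets also encodes the on-disk layout of an export. A minimal, self-contained illustration of the paths it produces when use_folder_prefix is enabled (export root and base name here are hypothetical; the real base name comes from generate_filename or get_public_filename):

    from pathlib import Path

    export_root = Path("/tmp/export")
    base_name = "2024-05-31 invoice"

    original_target = (export_root / "originals" / base_name).resolve()
    thumbnail_target = (export_root / "thumbnails" / (base_name + "-thumbnail.webp")).resolve()
    archive_target = (export_root / "archive" / (base_name + "-archive.pdf")).resolve()
    # -> /tmp/export/originals/2024-05-31 invoice
    # -> /tmp/export/thumbnails/2024-05-31 invoice-thumbnail.webp
    # -> /tmp/export/archive/2024-05-31 invoice-archive.pdf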

View File

@@ -72,12 +72,6 @@ class Command(BaseCommand):
             help="If set, only the database will be exported, not files",
         )

-    def __init__(self, *args, **kwargs):
-        BaseCommand.__init__(self, *args, **kwargs)
-        self.source = None
-        self.manifest = None
-        self.version = None
-
     def pre_check(self) -> None:
         """
         Runs some initial checks against the source directory, including looking for

@@ -162,7 +156,8 @@ class Command(BaseCommand):
         else:
             self.stdout.write(self.style.NOTICE("No version.json file located"))

-        self._check_manifest_valid()
+        if not self.data_only:
+            self._check_manifest_valid()

         with (
             disable_signal(

@@ -214,8 +209,7 @@ class Command(BaseCommand):
                raise e

         if not self.data_only:
-            self._import_files_from_manifest(options["no_progress_bar"])
+            self._import_files_from_manifest()
         else:
             self.stdout.write(self.style.NOTICE("Data only import completed"))

@@ -223,7 +217,7 @@ class Command(BaseCommand):
         call_command(
             "document_index",
             "reindex",
-            no_progress_bar=options["no_progress_bar"],
+            no_progress_bar=self.no_progress_bar,
         )

     @staticmethod

@@ -257,8 +251,8 @@ class Command(BaseCommand):
                    "appear to be in the source directory.",
                )
            try:
-                with doc_path.open(mode="rb") as infile:
-                    infile.read(1)
+                with doc_path.open(mode="rb"):
+                    pass
            except Exception as e:
                raise CommandError(
                    f"Failed to read from original file {doc_path}",

@@ -273,14 +267,14 @@ class Command(BaseCommand):
                        f"does not appear to be in the source directory.",
                    )
                try:
-                    with doc_archive_path.open(mode="rb") as infile:
-                        infile.read(1)
+                    with doc_archive_path.open(mode="rb"):
+                        pass
                except Exception as e:
                    raise CommandError(
                        f"Failed to read from archive file {doc_archive_path}",
                    ) from e

-    def _import_files_from_manifest(self, progress_bar_disable):
+    def _import_files_from_manifest(self):
         settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
         settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
         settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

@@ -291,7 +285,7 @@ class Command(BaseCommand):
             filter(lambda r: r["model"] == "documents.document", self.manifest),
         )

-        for record in tqdm.tqdm(manifest_documents, disable=progress_bar_disable):
+        for record in tqdm.tqdm(manifest_documents, disable=self.no_progress_bar):
             document = Document.objects.get(pk=record["pk"])
             doc_file = record[EXPORTER_FILE_NAME]
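The manifest pre-check above only needs to prove that each referenced file can be opened at all; opening and immediately closing it is enough to surface missing-file or permission problems without reading any data. A standalone sketch of that pattern (ensure_readable is a hypothetical name, and RuntimeError stands in for the CommandError raised by the command):

    from pathlib import Path

    def ensure_readable(path: Path) -> None:
        # Opening the file surfaces missing-file or permission errors up front.
        try:
            with path.open(mode="rb"):
                pass
        except Exception as e:
            raise RuntimeError(f"Failed to read from {path}") from e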

View File

@@ -826,3 +826,17 @@ class TestExportImport(
         # Manifest and version files only should be present in the exported directory
         self.assertFileCountInDir(self.target, 2)
+        self.assertIsFile(self.target / "manifest.json")
+        self.assertIsFile(self.target / "version.json")
+
+        shutil.rmtree(self.dirs.media_dir / "documents")
+        Document.objects.all().delete()
+
+        call_command(
+            "document_importer",
+            "--no-progress-bar",
+            "--data-only",
+            self.target,
+        )
+
+        self.assertEqual(Document.objects.all().count(), 4)
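The test exercises a data-only round trip: export only the database, wipe the document files and records, then import the metadata back. Roughly the same flow driven through call_command outside a test, as a sketch (the path is a placeholder; the importer flags appear in the test above, while the exporter is assumed to accept the same --no-progress-bar/--data-only options since its code checks self.data_only):

    from django.core.management import call_command

    call_command("document_exporter", "--no-progress-bar", "--data-only", "/tmp/export")
    # ... later, against a fresh database ...
    call_command("document_importer", "--no-progress-bar", "--data-only", "/tmp/export")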

View File

@@ -14,9 +14,15 @@ from documents.settings import EXPORTER_ARCHIVE_NAME
 from documents.settings import EXPORTER_FILE_NAME
 from documents.tests.utils import DirectoriesMixin
 from documents.tests.utils import FileSystemAssertsMixin
+from documents.tests.utils import SampleDirMixin


-class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
+class TestCommandImport(
+    DirectoriesMixin,
+    FileSystemAssertsMixin,
+    SampleDirMixin,
+    TestCase,
+):
     def test_check_manifest_exists(self):
         """
         GIVEN:

View File

@@ -156,10 +156,6 @@ class DirectoriesMixin:
     they are cleaned up on exit
     """

-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.dirs = None
-
     def setUp(self) -> None:
         self.dirs = setup_directories()
         super().setUp()

@@ -203,11 +199,11 @@ class FileSystemAssertsMixin:
     def assertFileCountInDir(self, path: Union[PathLike, str], count: int):
         path = Path(path).resolve()
         self.assertTrue(path.is_dir(), f"Path {path} is not a directory")
-        file_count = len([x for x in path.iterdir() if x.is_file()])
+        files = [x for x in path.iterdir() if x.is_file()]
         self.assertEqual(
-            file_count,
+            len(files),
             count,
-            f"Path {path} contains {file_count} files instead of {count} file",
+            f"Path {path} contains {len(files)} files instead of {count} files",
         )