Add option to delete original documents after merge
This commit is contained in:
@@ -234,7 +234,11 @@ def rotate(doc_ids: list[int], degrees: int):
|
||||
return "OK"
|
||||
|
||||
|
||||
def merge(doc_ids: list[int], metadata_document_id: Optional[int] = None):
|
||||
def merge(
|
||||
doc_ids: list[int],
|
||||
metadata_document_id: Optional[int] = None,
|
||||
delete_originals: bool = False,
|
||||
):
|
||||
logger.info(
|
||||
f"Attempting to merge {len(doc_ids)} documents into a single document.",
|
||||
)
|
||||
@@ -285,9 +289,20 @@ def merge(doc_ids: list[int], metadata_document_id: Optional[int] = None):
|
||||
overrides,
|
||||
)
|
||||
|
||||
if delete_originals:
|
||||
logger.info("Removing original documents after merge")
|
||||
delete(affected_docs)
|
||||
|
||||
return "OK"
|
||||
|
||||
|
||||
def merge_and_delete_originals(
|
||||
doc_ids: list[int],
|
||||
metadata_document_id: Optional[int] = None,
|
||||
):
|
||||
return merge(doc_ids, metadata_document_id, True)
|
||||
|
||||
|
||||
def split(doc_ids: list[int], pages: list[list[int]]):
|
||||
logger.info(
|
||||
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
|
||||
|
||||
@@ -943,6 +943,7 @@ class BulkEditSerializer(
|
||||
"set_permissions",
|
||||
"rotate",
|
||||
"merge",
|
||||
"merge_and_delete_originals",
|
||||
"split",
|
||||
"delete_pages",
|
||||
],
|
||||
@@ -999,6 +1000,8 @@ class BulkEditSerializer(
|
||||
return bulk_edit.rotate
|
||||
elif method == "merge":
|
||||
return bulk_edit.merge
|
||||
elif method == "merge_and_delete_originals":
|
||||
return bulk_edit.merge_and_delete_originals
|
||||
elif method == "split":
|
||||
return bulk_edit.split
|
||||
elif method == "delete_pages":
|
||||
|
||||
@@ -376,6 +376,45 @@ class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
/ "0000003.pdf",
|
||||
sample3,
|
||||
)
|
||||
|
||||
sample4 = self.dirs.scratch_dir / "sample4.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf",
|
||||
sample4,
|
||||
)
|
||||
sample4_archive = self.dirs.archive_dir / "sample4_archive.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf",
|
||||
sample4_archive,
|
||||
)
|
||||
|
||||
sample5 = self.dirs.scratch_dir / "sample5.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000002.pdf",
|
||||
sample5,
|
||||
)
|
||||
sample5_archive = self.dirs.archive_dir / "sample5_archive.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000002.pdf",
|
||||
sample5_archive,
|
||||
)
|
||||
|
||||
self.doc1 = Document.objects.create(
|
||||
checksum="A",
|
||||
title="A",
|
||||
@@ -410,6 +449,24 @@ class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
mime_type="image/jpeg",
|
||||
)
|
||||
|
||||
self.doc1_delete_after_merge = Document.objects.create(
|
||||
checksum="Ad",
|
||||
title="Adelete",
|
||||
filename=sample4,
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
self.doc1_delete_after_merge.archive_filename = sample4_archive
|
||||
self.doc1_delete_after_merge.save()
|
||||
|
||||
self.doc2_delete_after_merge = Document.objects.create(
|
||||
checksum="Bd",
|
||||
title="Bdelete",
|
||||
filename=sample5,
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
self.doc2_delete_after_merge.archive_filename = sample5_archive
|
||||
self.doc2_delete_after_merge.save()
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_merge(self, mock_consume_file):
|
||||
"""
|
||||
@@ -444,6 +501,41 @@ class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_merge_and_delete_originals(self, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Merge action with deleting documents is called with 2 documents
|
||||
THEN:
|
||||
- Consume file should be called
|
||||
- Documents should be deleted
|
||||
"""
|
||||
doc_ids = [self.doc1_delete_after_merge.id, self.doc2_delete_after_merge.id]
|
||||
|
||||
result = bulk_edit.merge_and_delete_originals(doc_ids)
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
expected_filename = (
|
||||
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
|
||||
)
|
||||
|
||||
mock_consume_file.assert_called()
|
||||
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(
|
||||
Path(consume_file_args[0].original_file).name,
|
||||
expected_filename,
|
||||
)
|
||||
self.assertEqual(consume_file_args[1].title, None)
|
||||
|
||||
with self.assertRaises(Document.DoesNotExist):
|
||||
Document.objects.get(id=self.doc1_delete_after_merge.id)
|
||||
|
||||
with self.assertRaises(Document.DoesNotExist):
|
||||
Document.objects.get(id=self.doc2_delete_after_merge.id)
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_merge_with_errors(self, mock_open_pdf, mock_consume_file):
|
||||
|
||||
@@ -968,7 +968,12 @@ class BulkEditView(PassUserMixin):
|
||||
has_perms = (
|
||||
all((doc.owner == user or doc.owner is None) for doc in document_objs)
|
||||
if method
|
||||
in [bulk_edit.set_permissions, bulk_edit.delete, bulk_edit.rotate]
|
||||
in [
|
||||
bulk_edit.set_permissions,
|
||||
bulk_edit.delete,
|
||||
bulk_edit.rotate,
|
||||
bulk_edit.merge_and_delete_originals,
|
||||
]
|
||||
else all(
|
||||
has_perms_owner_aware(user, "change_document", doc)
|
||||
for doc in document_objs
|
||||
|
||||
Reference in New Issue
Block a user