Full testing and some fixes of pdf actions
This commit is contained in:
parent
713598d86b
commit
2004b424b5
@ -4,6 +4,7 @@ import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from django.conf import settings
|
||||
from django.db.models import Q
|
||||
|
||||
from documents.data_models import ConsumableDocument
|
||||
@ -17,7 +18,6 @@ from documents.permissions import set_permissions_for_object
|
||||
from documents.tasks import bulk_update_documents
|
||||
from documents.tasks import consume_file
|
||||
from documents.tasks import update_document_archive_file
|
||||
from paperless import settings
|
||||
|
||||
logger = logging.getLogger("paperless.bulk_edit")
|
||||
|
||||
@ -160,6 +160,9 @@ def set_permissions(doc_ids, set_permissions, owner=None, merge=False):
|
||||
|
||||
|
||||
def rotate(doc_ids: list[int], degrees: int):
|
||||
logger.info(
|
||||
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
|
||||
)
|
||||
qs = Document.objects.filter(id__in=doc_ids)
|
||||
affected_docs = []
|
||||
import pikepdf
|
||||
@ -175,10 +178,12 @@ def rotate(doc_ids: list[int], degrees: int):
|
||||
update_document_archive_file.delay(
|
||||
document_id=doc.id,
|
||||
)
|
||||
logger.info(f"Rotated document {doc.id} ({path}) by {degrees} degrees")
|
||||
logger.info(
|
||||
f"Rotated document {doc.id} by {degrees} degrees",
|
||||
)
|
||||
affected_docs.append(doc.id)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error rotating document {doc.id}", e)
|
||||
logger.exception(f"Error rotating document {doc.id}: {e}")
|
||||
|
||||
if len(affected_docs) > 0:
|
||||
bulk_update_documents.delay(document_ids=affected_docs)
|
||||
@ -187,30 +192,35 @@ def rotate(doc_ids: list[int], degrees: int):
|
||||
|
||||
|
||||
def merge(doc_ids: list[int], metadata_document_id: Optional[int] = None):
|
||||
logger.info(
|
||||
f"Attempting to merge {len(doc_ids)} documents into a single document.",
|
||||
)
|
||||
qs = Document.objects.filter(id__in=doc_ids)
|
||||
affected_docs = []
|
||||
import pikepdf
|
||||
|
||||
merged_pdf = pikepdf.new()
|
||||
# use doc_ids to preserve order
|
||||
for doc_id in doc_ids:
|
||||
doc = qs.get(id=doc_id)
|
||||
if doc is None:
|
||||
continue
|
||||
path = os.path.join(settings.ORIGINALS_DIR, str(doc.filename))
|
||||
try:
|
||||
with pikepdf.open(path, allow_overwriting_input=True) as pdf:
|
||||
with pikepdf.open(str(doc.source_path)) as pdf:
|
||||
merged_pdf.pages.extend(pdf.pages)
|
||||
affected_docs.append(doc.id)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
f"Error merging document {doc.id}, it will not be included in the merge",
|
||||
e,
|
||||
f"Error merging document {doc.id}, it will not be included in the merge: {e}",
|
||||
)
|
||||
if len(affected_docs) == 0:
|
||||
logger.warning("No documents were merged")
|
||||
return "OK"
|
||||
|
||||
filepath = os.path.join(
|
||||
settings.CONSUMPTION_DIR,
|
||||
f"merged_{('_'.join([str(doc_id) for doc_id in doc_ids]))[:100]}.pdf",
|
||||
settings.SCRATCH_DIR,
|
||||
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf",
|
||||
)
|
||||
merged_pdf.save(filepath)
|
||||
merged_pdf.close()
|
||||
|
||||
if metadata_document_id:
|
||||
metadata_document = qs.get(id=metadata_document_id)
|
||||
@ -239,19 +249,18 @@ def split(doc_ids: list[int], pages: list[list[int]]):
|
||||
doc = Document.objects.get(id=doc_ids[0])
|
||||
import pikepdf
|
||||
|
||||
path = os.path.join(settings.ORIGINALS_DIR, str(doc.filename))
|
||||
try:
|
||||
with pikepdf.open(path, allow_overwriting_input=True) as pdf:
|
||||
with pikepdf.open(doc.source_path) as pdf:
|
||||
for idx, split_doc in enumerate(pages):
|
||||
dst = pikepdf.new()
|
||||
for page in split_doc:
|
||||
dst.pages.append(pdf.pages[page - 1])
|
||||
filepath = os.path.join(
|
||||
settings.CONSUMPTION_DIR,
|
||||
f"{doc.filename}_{split_doc[0]}-{split_doc[-1]}.pdf",
|
||||
settings.SCRATCH_DIR,
|
||||
f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf",
|
||||
)
|
||||
|
||||
dst.save(filepath)
|
||||
dst.close()
|
||||
|
||||
overrides = DocumentMetadataOverrides().from_document(doc)
|
||||
overrides.title = f"{doc.title} (split {idx + 1})"
|
||||
@ -266,6 +275,6 @@ def split(doc_ids: list[int], pages: list[list[int]]):
|
||||
overrides,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error splitting document {doc.id}", e)
|
||||
logger.exception(f"Error splitting document {doc.id}: {e}")
|
||||
|
||||
return "OK"
|
||||
|
@ -102,26 +102,30 @@ class DocumentMetadataOverrides:
|
||||
overrides.storage_path_id = doc.storage_path.id if doc.storage_path else None
|
||||
overrides.owner_id = doc.owner.id if doc.owner else None
|
||||
overrides.tag_ids = list(doc.tags.values_list("id", flat=True))
|
||||
|
||||
overrides.view_users = get_users_with_perms(
|
||||
doc,
|
||||
only_with_perms_in=["view_document"],
|
||||
).values_list("id", flat=True)
|
||||
overrides.view_groups = get_groups_with_perms(
|
||||
doc,
|
||||
only_with_perms_in=["view_document"],
|
||||
).values_list("id", flat=True)
|
||||
overrides.change_users = get_users_with_perms(
|
||||
doc,
|
||||
only_with_perms_in=["change_document"],
|
||||
).values_list("id", flat=True)
|
||||
overrides.change_groups = get_groups_with_perms(
|
||||
doc,
|
||||
only_with_perms_in=["change_document"],
|
||||
).values_list("id", flat=True)
|
||||
overrides.custom_field_ids = list(
|
||||
doc.custom_fields.values_list("id", flat=True),
|
||||
)
|
||||
|
||||
groups_with_perms = get_groups_with_perms(
|
||||
doc,
|
||||
attach_perms=True,
|
||||
)
|
||||
overrides.view_groups = [
|
||||
group.id for group, perms in groups_with_perms if "view_document" in perms
|
||||
]
|
||||
overrides.change_groups = [
|
||||
group.id for group, perms in groups_with_perms if "change_document" in perms
|
||||
]
|
||||
|
||||
return overrides
|
||||
|
||||
|
||||
|
@ -1,3 +1,6 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.models import Group
|
||||
@ -275,3 +278,232 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
self.doc1,
|
||||
)
|
||||
self.assertEqual(groups_with_perms.count(), 2)
|
||||
|
||||
|
||||
class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
sample1 = os.path.join(self.dirs.scratch_dir, "sample.pdf")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000001.pdf",
|
||||
),
|
||||
sample1,
|
||||
)
|
||||
sample1_archive = os.path.join(self.dirs.archive_dir, "sample_archive.pdf")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000001.pdf",
|
||||
),
|
||||
sample1_archive,
|
||||
)
|
||||
sample2 = os.path.join(self.dirs.scratch_dir, "sample2.pdf")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000002.pdf",
|
||||
),
|
||||
sample2,
|
||||
)
|
||||
sample2_archive = os.path.join(self.dirs.archive_dir, "sample2_archive.pdf")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000002.pdf",
|
||||
),
|
||||
sample2_archive,
|
||||
)
|
||||
sample3 = os.path.join(self.dirs.scratch_dir, "sample3.pdf")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000003.pdf",
|
||||
),
|
||||
sample3,
|
||||
)
|
||||
self.doc1 = Document.objects.create(checksum="A", title="A", filename=sample1)
|
||||
self.doc1.archive_filename = sample1_archive
|
||||
self.doc1.save()
|
||||
self.doc2 = Document.objects.create(checksum="B", title="B", filename=sample2)
|
||||
self.doc2.archive_filename = sample2_archive
|
||||
self.doc2.save()
|
||||
self.doc3 = Document.objects.create(checksum="C", title="C", filename=sample3)
|
||||
img_doc = os.path.join(self.dirs.scratch_dir, "sample_image.jpg")
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.jpg",
|
||||
),
|
||||
img_doc,
|
||||
)
|
||||
self.img_doc = Document.objects.create(
|
||||
checksum="D",
|
||||
title="D",
|
||||
filename=img_doc,
|
||||
)
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_merge(self, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Merge action is called with 3 documents
|
||||
THEN:
|
||||
- Consume file should be called
|
||||
"""
|
||||
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
|
||||
metadata_document_id = self.doc1.id
|
||||
|
||||
result = bulk_edit.merge(doc_ids)
|
||||
|
||||
expected_filename = (
|
||||
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
|
||||
)
|
||||
|
||||
mock_consume_file.assert_called()
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(
|
||||
Path(consume_file_args[0].original_file).name,
|
||||
expected_filename,
|
||||
)
|
||||
self.assertEqual(consume_file_args[1].title, None)
|
||||
|
||||
# With metadata_document_id overrides
|
||||
result = bulk_edit.merge(doc_ids, metadata_document_id=metadata_document_id)
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(consume_file_args[1].title, "A (merged)")
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_merge_with_errors(self, mock_open_pdf, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Merge action is called with 2 documents
|
||||
- Error occurs when opening both files
|
||||
THEN:
|
||||
- Consume file should not be called
|
||||
"""
|
||||
mock_open_pdf.side_effect = Exception("Error opening PDF")
|
||||
doc_ids = [self.doc2.id, self.doc3.id]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.merge(doc_ids)
|
||||
error_str = cm.output[0]
|
||||
expected_str = (
|
||||
"Error merging document 2, it will not be included in the merge"
|
||||
)
|
||||
self.assertIn(expected_str, error_str)
|
||||
|
||||
mock_consume_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_split(self, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Split action is called with 1 document and 2 pages
|
||||
THEN:
|
||||
- Consume file should be called twice
|
||||
"""
|
||||
doc_ids = [self.doc2.id]
|
||||
pages = [[1, 2], [3]]
|
||||
result = bulk_edit.split(doc_ids, pages)
|
||||
self.assertEqual(mock_consume_file.call_count, 2)
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(consume_file_args[1].title, "B (split 2)")
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
@mock.patch("pikepdf.Pdf.save")
|
||||
def test_split_with_errors(self, mock_save_pdf, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Split action is called with 1 document and 2 page groups
|
||||
- Error occurs when saving the files
|
||||
THEN:
|
||||
- Consume file should not be called
|
||||
"""
|
||||
mock_save_pdf.side_effect = Exception("Error saving PDF")
|
||||
doc_ids = [self.doc2.id]
|
||||
pages = [[1, 2], [3]]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.split(doc_ids, pages)
|
||||
error_str = cm.output[0]
|
||||
expected_str = "Error splitting document 2"
|
||||
self.assertIn(expected_str, error_str)
|
||||
|
||||
mock_consume_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.delay")
|
||||
@mock.patch("documents.tasks.update_document_archive_file.delay")
|
||||
def test_rotate(self, mock_update_document, mock_update_documents):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Rotate action is called with 2 documents
|
||||
THEN:
|
||||
- Rotate action should be called twice
|
||||
"""
|
||||
doc_ids = [self.doc1.id, self.doc2.id]
|
||||
result = bulk_edit.rotate(doc_ids, 90)
|
||||
self.assertEqual(mock_update_document.call_count, 2)
|
||||
mock_update_documents.assert_called_once()
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.delay")
|
||||
@mock.patch("documents.tasks.update_document_archive_file.delay")
|
||||
@mock.patch("pikepdf.Pdf.save")
|
||||
def test_rotate_with_error(
|
||||
self,
|
||||
mock_pdf_save,
|
||||
mock_update_archive_file,
|
||||
mock_update_documents,
|
||||
):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Rotate action is called with 2 documents
|
||||
- PikePDF raises an error
|
||||
THEN:
|
||||
- Rotate action should be called 0 times
|
||||
"""
|
||||
mock_pdf_save.side_effect = Exception("Error saving PDF")
|
||||
doc_ids = [self.doc2.id, self.doc3.id]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.rotate(doc_ids, 90)
|
||||
error_str = cm.output[0]
|
||||
expected_str = "Error rotating document"
|
||||
self.assertIn(expected_str, error_str)
|
||||
mock_update_archive_file.assert_not_called()
|
||||
|
Loading…
x
Reference in New Issue
Block a user