Initial work on using the Django Template Language (DTL) for rendering the filenames
This commit is contained in:
parent
2ab71137b9
commit
1f2a789c24
@ -1,23 +1,26 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from collections import defaultdict
|
import re
|
||||||
|
from collections.abc import Iterable
|
||||||
from pathlib import PurePath
|
from pathlib import PurePath
|
||||||
|
|
||||||
import pathvalidate
|
import pathvalidate
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.template.defaultfilters import slugify
|
from django.template import Context
|
||||||
|
from django.template import Template
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
|
from documents.models import Correspondent
|
||||||
|
from documents.models import CustomField
|
||||||
|
from documents.models import CustomFieldInstance
|
||||||
from documents.models import Document
|
from documents.models import Document
|
||||||
|
from documents.models import DocumentType
|
||||||
|
from documents.models import StoragePath
|
||||||
|
from documents.models import Tag
|
||||||
|
|
||||||
logger = logging.getLogger("paperless.filehandling")
|
logger = logging.getLogger("paperless.filehandling")
|
||||||
|
|
||||||
|
|
||||||
class defaultdictNoStr(defaultdict):
    """
    A defaultdict which refuses to be turned into a string.

    Item access behaves exactly like a regular defaultdict; only str()
    on the mapping as a whole raises ValueError.
    """

    def __str__(self):
        message = "Don't use {tags} directly."
        raise ValueError(message)
|
|
||||||
|
|
||||||
|
|
||||||
def create_source_path_directory(source_path):
    """
    Ensure the directory which will contain source_path exists.

    Missing intermediate directories are created as well, and an already
    existing directory is not an error.
    """
    parent_directory = os.path.dirname(source_path)
    os.makedirs(parent_directory, exist_ok=True)
|
||||||
|
|
||||||
@ -54,32 +57,6 @@ def delete_empty_directories(directory, root):
|
|||||||
directory = os.path.normpath(os.path.dirname(directory))
|
directory = os.path.normpath(os.path.dirname(directory))
|
||||||
|
|
||||||
|
|
||||||
def many_to_dictionary(field):
    """
    Build a lookup dictionary from the entries of a many-to-many field.

    Every entry is stored under its positional index with its slugified
    name as the value.  An entry whose name contains an underscore (or,
    if it has none, a hyphen) is additionally split at the first such
    delimiter and stored as slugified prefix -> slugified remainder.
    """
    result = {}

    for position, entry in enumerate(field.all()):
        name = entry.name

        # Positional access to the slugified name
        result[position] = slugify(name)

        # "_" takes precedence; "-" is only considered when there is no "_"
        split_at = name.find("_")
        if split_at == -1:
            split_at = name.find("-")

        if split_at != -1:
            prefix = name[:split_at]
            remainder = name[split_at + 1 :]
            result[slugify(prefix)] = slugify(remainder)

    return result
|
|
||||||
|
|
||||||
|
|
||||||
def generate_unique_filename(doc, archive_filename=False):
|
def generate_unique_filename(doc, archive_filename=False):
|
||||||
"""
|
"""
|
||||||
Generates a unique filename for doc in settings.ORIGINALS_DIR.
|
Generates a unique filename for doc in settings.ORIGINALS_DIR.
|
||||||
@ -127,6 +104,181 @@ def generate_unique_filename(doc, archive_filename=False):
|
|||||||
return new_filename
|
return new_filename
|
||||||
|
|
||||||
|
|
||||||
|
def create_dummy_document():
    """
    Build a placeholder Document instance populated with sample values.

    The instance is only constructed, not saved.  Its correspondent, storage
    path and document type are sample instances constructed the same way.
    Tags and custom fields are not attached here.
    """
    sample_correspondent = Correspondent(name="Sample Correspondent")
    sample_storage_path = StoragePath(path="/dummy/path")
    sample_document_type = DocumentType(name="Sample Type")

    created_at = timezone.now()
    modified_at = timezone.now()
    added_at = timezone.now()

    return Document(
        pk=1,
        title="Sample Title",
        correspondent=sample_correspondent,
        storage_path=sample_storage_path,
        document_type=sample_document_type,
        content="This is some sample document content.",
        mime_type="application/pdf",
        checksum="dummychecksum12345678901234567890123456789012",
        archive_checksum="dummyarchivechecksum123456789012345678901234",
        page_count=5,
        created=created_at,
        modified=modified_at,
        storage_type=Document.STORAGE_TYPE_UNENCRYPTED,
        added=added_at,
        filename="/dummy/filename.pdf",
        archive_filename="/dummy/archive_filename.pdf",
        original_filename="original_file.pdf",
        archive_serial_number=12345,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def get_creation_date_context(document: Document) -> dict[str, str]:
    """
    Build the placeholder values derived from the document's created date.

    The stored value is first converted with timezone.localdate, then
    exposed as an ISO formatted date plus its individual strftime parts.

    :param document: The document whose created value is used
    :return: Mapping of placeholder name to formatted text
    """
    created_on = timezone.localdate(document.created)

    strftime_placeholders = (
        ("created_year", "%Y"),
        ("created_year_short", "%y"),
        ("created_month", "%m"),
        ("created_month_name", "%B"),
        ("created_month_name_short", "%b"),
        ("created_day", "%d"),
    )

    date_context = {"created": created_on.isoformat()}
    for placeholder, directive in strftime_placeholders:
        date_context[placeholder] = created_on.strftime(directive)
    return date_context
|
||||||
|
|
||||||
|
|
||||||
|
def get_added_date_context(document: Document) -> dict[str, str]:
    """
    Build the placeholder values derived from the document's added date.

    The stored value is first converted with timezone.localdate, then
    exposed as an ISO formatted date plus its individual strftime parts.

    :param document: The document whose added value is used
    :return: Mapping of placeholder name to formatted text
    """
    added_on = timezone.localdate(document.added)

    strftime_placeholders = (
        ("added_year", "%Y"),
        ("added_year_short", "%y"),
        ("added_month", "%m"),
        ("added_month_name", "%B"),
        ("added_month_name_short", "%b"),
        ("added_day", "%d"),
    )

    date_context = {"added": added_on.isoformat()}
    for placeholder, directive in strftime_placeholders:
        date_context[placeholder] = added_on.strftime(directive)
    return date_context
|
||||||
|
|
||||||
|
|
||||||
|
def get_basic_metadata_context(
    document: Document,
    *,
    no_value_default: str,
) -> dict[str, str]:
    """
    Collect the non-date placeholder values for a document.

    The title, correspondent name and document type name are passed through
    pathvalidate.sanitize_filename with "-" as the replacement text.  When the
    correspondent, document type, archive serial number, owner or original
    filename is falsy on the document, no_value_default is used instead.

    :param document: The document to read the values from
    :param no_value_default: Text used in place of a missing value
    :return: Mapping of placeholder name to its text
    """

    def _sanitized(text):
        return pathvalidate.sanitize_filename(text, replacement_text="-")

    metadata = {"title": _sanitized(document.title)}

    if document.correspondent:
        metadata["correspondent"] = _sanitized(document.correspondent.name)
    else:
        metadata["correspondent"] = no_value_default

    if document.document_type:
        metadata["document_type"] = _sanitized(document.document_type.name)
    else:
        metadata["document_type"] = no_value_default

    if document.archive_serial_number:
        metadata["asn"] = str(document.archive_serial_number)
    else:
        metadata["asn"] = no_value_default

    if document.owner:
        metadata["owner_username"] = document.owner.username
    else:
        metadata["owner_username"] = no_value_default

    if document.original_filename:
        # Final path component only, with its last extension dropped
        original_path = PurePath(document.original_filename)
        metadata["original_name"] = original_path.with_suffix("").name
    else:
        metadata["original_name"] = no_value_default

    # Primary key, zero padded to seven digits
    metadata["doc_pk"] = f"{document.pk:07}"

    return metadata
|
||||||
|
|
||||||
|
|
||||||
|
def get_tags_context(tags: Iterable[Tag]) -> dict[str, str]:
    """
    Build the placeholder values derived from a document's tags.

    The tag names are sorted, joined with commas and passed through
    pathvalidate.sanitize_filename with "-" as the replacement text.

    The same text is provided under two names: "tags_list", and "tag_list",
    the placeholder name used by existing format strings, so those keep
    rendering the tags instead of silently rendering nothing.

    :param tags: The tags to include
    :return: Mapping of placeholder name to the comma separated tag names
    """
    joined_names = pathvalidate.sanitize_filename(
        ",".join(sorted(tag.name for tag in tags)),
        replacement_text="-",
    )

    return {
        "tag_list": joined_names,
        "tags_list": joined_names,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def get_custom_fields_context(
    custom_fields: Iterable[CustomFieldInstance],
    *,
    no_value_default: str = "-none-",
) -> dict[str, dict[str, str]]:
    """
    Build the placeholder values derived from a document's custom fields.

    Each custom field is keyed by its sanitized field name and maps to a
    dictionary holding the sanitized data type ("type") and the sanitized
    value ("value").  Sanitizing uses pathvalidate.sanitize_filename with
    "-" as the replacement text.

    :param custom_fields: The custom field instances to include
    :param no_value_default: Text used when a field instance has no value
    :return: Mapping of field name to its "type" and "value" texts
    """

    def _sanitized(text):
        return pathvalidate.sanitize_filename(text, replacement_text="-")

    fields_context = {}
    for field_instance in custom_fields:
        raw_value = field_instance.value

        if raw_value is None:
            # Nothing to render for this field
            value_text = no_value_default
        else:
            # sanitize_filename only accepts text.  Presumably the value is
            # not text for every field data type (confirm against
            # CustomFieldInstance.value), so coerce first; text is unchanged
            value_text = _sanitized(str(raw_value))

        fields_context[_sanitized(field_instance.field.name)] = {
            "type": _sanitized(field_instance.field.data_type),
            "value": value_text,
        }

    return fields_context
|
||||||
|
|
||||||
|
|
||||||
|
def validate_template_and_render(
    template_string: str,
    document: Document | None = None,
) -> str | None:
    """
    Renders the given template string using either the given Document or using a dummy Document and data

    The template is rendered with HTML autoescaping disabled, as the result
    is used as a file name, not as markup.

    :param template_string: The Django template source to render
    :param document: The document providing the values, or None to validate
        the template against a dummy document, tags and custom field
    :return: The rendered text, or None if the string is not valid or an
        error occurred while rendering
    """

    # Create the dummy document object with all fields filled in for validation purposes
    if document is None:
        document = create_dummy_document()
        tags_list = [Tag(name="Test Tag 1"), Tag(name="Another Test Tag")]
        custom_fields = [
            CustomFieldInstance(
                field=CustomField(
                    name="Text Custom Field",
                    data_type=CustomField.FieldDataType.STRING,
                ),
                value_text="Some String Text",
            ),
        ]
    else:
        # or use the real document information
        logger.info("Using real document")
        tags_list = document.tags.all()
        custom_fields = document.custom_fields.all()

    # Later mappings win when keys collide
    context = (
        {"document": document}
        | get_basic_metadata_context(document, no_value_default="-none-")
        | get_creation_date_context(document)
        | get_added_date_context(document)
        | get_tags_context(tags_list)
        | get_custom_fields_context(custom_fields)
    )

    logger.info(context)

    # Try rendering the template
    try:
        template = Template(template_string)
        # Without autoescape=False, characters such as & or ' in a value
        # would be written into the file name as HTML entities
        rendered_template = template.render(Context(context, autoescape=False))
        logger.info(f"Template is valid and rendered successfully: {rendered_template}")
        return rendered_template
    except Exception as e:
        # Any failure to parse or render is reported to the caller as None
        logger.warning(f"Error in filename generation: {e}")
        logger.warning(
            f"Invalid filename_format '{template_string}', falling back to default",
        )
        return None
|
||||||
|
|
||||||
|
|
||||||
def generate_filename(
|
def generate_filename(
|
||||||
doc: Document,
|
doc: Document,
|
||||||
counter=0,
|
counter=0,
|
||||||
@ -134,116 +286,86 @@ def generate_filename(
|
|||||||
archive_filename=False,
|
archive_filename=False,
|
||||||
):
|
):
|
||||||
path = ""
|
path = ""
|
||||||
filename_format = settings.FILENAME_FORMAT
|
|
||||||
|
|
||||||
try:
|
def convert_to_django_template_format(old_format):
|
||||||
if doc.storage_path is not None:
|
"""
|
||||||
logger.debug(
|
Converts old Python string format (with {}) to Django template style (with {{ }}),
|
||||||
f"Document has storage_path {doc.storage_path.id} "
|
while ignoring existing {{ ... }} placeholders.
|
||||||
f"({doc.storage_path.path}) set",
|
|
||||||
)
|
|
||||||
filename_format = doc.storage_path.path
|
|
||||||
|
|
||||||
if filename_format is not None:
|
:param old_format: The old style format string (e.g., "{title} by {author}")
|
||||||
tags = defaultdictNoStr(
|
:return: Converted string in Django Template style (e.g., "{{ title }} by {{ author }}")
|
||||||
lambda: slugify(None),
|
"""
|
||||||
many_to_dictionary(doc.tags),
|
|
||||||
)
|
|
||||||
|
|
||||||
tag_list = pathvalidate.sanitize_filename(
|
# Step 1: Match placeholders with single curly braces but not those with double braces
|
||||||
",".join(
|
pattern = r"(?<!\{)\{(\w*)\}(?!\})" # Matches {var} but not {{var}}
|
||||||
sorted(tag.name for tag in doc.tags.all()),
|
|
||||||
),
|
|
||||||
replacement_text="-",
|
|
||||||
)
|
|
||||||
|
|
||||||
no_value_default = "-none-"
|
# Step 2: Replace the placeholders with {{ var }} or {{ }}
|
||||||
|
def replace_with_django(match):
|
||||||
|
variable = match.group(1) # The variable inside the braces
|
||||||
|
return f"{{{{ {variable} }}}}" # Convert to {{ variable }}
|
||||||
|
|
||||||
if doc.correspondent:
|
# Apply the substitution
|
||||||
correspondent = pathvalidate.sanitize_filename(
|
converted_format = re.sub(pattern, replace_with_django, old_format)
|
||||||
doc.correspondent.name,
|
|
||||||
replacement_text="-",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
correspondent = no_value_default
|
|
||||||
|
|
||||||
if doc.document_type:
|
return converted_format
|
||||||
document_type = pathvalidate.sanitize_filename(
|
|
||||||
doc.document_type.name,
|
|
||||||
replacement_text="-",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
document_type = no_value_default
|
|
||||||
|
|
||||||
if doc.archive_serial_number:
|
def format_filename(document: Document, template_str: str) -> str | None:
|
||||||
asn = str(doc.archive_serial_number)
|
rendered_filename = validate_template_and_render(template_str, document)
|
||||||
else:
|
if rendered_filename is None:
|
||||||
asn = no_value_default
|
return None
|
||||||
|
|
||||||
if doc.owner is not None:
|
logger.info(rendered_filename)
|
||||||
owner_username_str = str(doc.owner.username)
|
|
||||||
else:
|
|
||||||
owner_username_str = no_value_default
|
|
||||||
|
|
||||||
if doc.original_filename is not None:
|
if settings.FILENAME_FORMAT_REMOVE_NONE:
|
||||||
# No extension
|
rendered_filename = rendered_filename.replace("/-none-/", "/")
|
||||||
original_name = PurePath(doc.original_filename).with_suffix("").name
|
rendered_filename = rendered_filename.replace(" -none-", "")
|
||||||
else:
|
rendered_filename = rendered_filename.replace("-none-", "")
|
||||||
original_name = no_value_default
|
|
||||||
|
|
||||||
# Convert UTC database datetime to localized date
|
rendered_filename = rendered_filename.replace(
|
||||||
local_added = timezone.localdate(doc.added)
|
"-none-",
|
||||||
local_created = timezone.localdate(doc.created)
|
"none",
|
||||||
|
) # backward compatibility
|
||||||
|
|
||||||
path = filename_format.format(
|
rendered_filename = (
|
||||||
title=pathvalidate.sanitize_filename(doc.title, replacement_text="-"),
|
rendered_filename.strip(os.sep).replace("\n", "").replace("\r", "")
|
||||||
correspondent=correspondent,
|
|
||||||
document_type=document_type,
|
|
||||||
created=local_created.isoformat(),
|
|
||||||
created_year=local_created.strftime("%Y"),
|
|
||||||
created_year_short=local_created.strftime("%y"),
|
|
||||||
created_month=local_created.strftime("%m"),
|
|
||||||
created_month_name=local_created.strftime("%B"),
|
|
||||||
created_month_name_short=local_created.strftime("%b"),
|
|
||||||
created_day=local_created.strftime("%d"),
|
|
||||||
added=local_added.isoformat(),
|
|
||||||
added_year=local_added.strftime("%Y"),
|
|
||||||
added_year_short=local_added.strftime("%y"),
|
|
||||||
added_month=local_added.strftime("%m"),
|
|
||||||
added_month_name=local_added.strftime("%B"),
|
|
||||||
added_month_name_short=local_added.strftime("%b"),
|
|
||||||
added_day=local_added.strftime("%d"),
|
|
||||||
asn=asn,
|
|
||||||
tags=tags,
|
|
||||||
tag_list=tag_list,
|
|
||||||
owner_username=owner_username_str,
|
|
||||||
original_name=original_name,
|
|
||||||
doc_pk=f"{doc.pk:07}",
|
|
||||||
).strip()
|
|
||||||
|
|
||||||
if settings.FILENAME_FORMAT_REMOVE_NONE:
|
|
||||||
path = path.replace("/-none-/", "/") # remove empty directories
|
|
||||||
path = path.replace(" -none-", "") # remove when spaced, with space
|
|
||||||
path = path.replace("-none-", "") # remove rest of the occurrences
|
|
||||||
|
|
||||||
path = path.replace("-none-", "none") # backward compatibility
|
|
||||||
path = path.strip(os.sep)
|
|
||||||
|
|
||||||
except (ValueError, KeyError, IndexError):
|
|
||||||
logger.warning(
|
|
||||||
f"Invalid filename_format '{filename_format}', falling back to default",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
counter_str = f"_{counter:02}" if counter else ""
|
return rendered_filename
|
||||||
|
|
||||||
|
# Determine the source of the format string
|
||||||
|
if doc.storage_path is not None:
|
||||||
|
logger.debug(
|
||||||
|
f"Document has storage_path {doc.storage_path.pk} "
|
||||||
|
f"({doc.storage_path.path}) set",
|
||||||
|
)
|
||||||
|
filename_format = doc.storage_path.path
|
||||||
|
elif settings.FILENAME_FORMAT is not None:
|
||||||
|
# Maybe convert old to new style
|
||||||
|
filename_format = convert_to_django_template_format(
|
||||||
|
settings.FILENAME_FORMAT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Warn the user they should update
|
||||||
|
if filename_format != settings.FILENAME_FORMAT:
|
||||||
|
logger.warning(
|
||||||
|
f"Filename format {settings.FILENAME_FORMAT} is using the old style, please update to use double curly brackets",
|
||||||
|
)
|
||||||
|
logger.info(filename_format)
|
||||||
|
else:
|
||||||
|
filename_format = None
|
||||||
|
|
||||||
|
# If we have one, render it
|
||||||
|
if filename_format is not None:
|
||||||
|
path = format_filename(doc, filename_format)
|
||||||
|
|
||||||
|
counter_str = f"_{counter:02}" if counter else ""
|
||||||
filetype_str = ".pdf" if archive_filename else doc.file_type
|
filetype_str = ".pdf" if archive_filename else doc.file_type
|
||||||
|
|
||||||
if len(path) > 0:
|
if path:
|
||||||
filename = f"{path}{counter_str}{filetype_str}"
|
filename = f"{path}{counter_str}{filetype_str}"
|
||||||
else:
|
else:
|
||||||
filename = f"{doc.pk:07}{counter_str}{filetype_str}"
|
filename = f"{doc.pk:07}{counter_str}{filetype_str}"
|
||||||
|
|
||||||
# Append .gpg for encrypted files
|
|
||||||
if append_gpg and doc.storage_type == doc.STORAGE_TYPE_GPG:
|
if append_gpg and doc.storage_type == doc.STORAGE_TYPE_GPG:
|
||||||
filename += ".gpg"
|
filename += ".gpg"
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import hashlib
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
from collections import defaultdict
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
import pathvalidate
|
import pathvalidate
|
||||||
@ -12,14 +13,41 @@ from django.db import migrations
|
|||||||
from django.db import models
|
from django.db import models
|
||||||
from django.template.defaultfilters import slugify
|
from django.template.defaultfilters import slugify
|
||||||
|
|
||||||
from documents.file_handling import defaultdictNoStr
|
|
||||||
from documents.file_handling import many_to_dictionary
|
|
||||||
|
|
||||||
logger = logging.getLogger("paperless.migrations")
|
logger = logging.getLogger("paperless.migrations")
|
||||||
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# This is code copied straight from paperless before the change.
|
# This is code copied straight from paperless before the change.
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
class defaultdictNoStr(defaultdict):
    """
    A defaultdict which refuses to be turned into a string.

    Item access behaves exactly like a regular defaultdict; only str()
    on the mapping as a whole raises ValueError.
    """

    def __str__(self):
        message = "Don't use {tags} directly."
        raise ValueError(message)
|
||||||
|
|
||||||
|
|
||||||
|
def many_to_dictionary(field):
    """
    Build a lookup dictionary from the entries of a many-to-many field.

    Every entry is stored under its positional index with its slugified
    name as the value.  An entry whose name contains an underscore (or,
    if it has none, a hyphen) is additionally split at the first such
    delimiter and stored as slugified prefix -> slugified remainder.
    """
    result = {}

    for position, entry in enumerate(field.all()):
        name = entry.name

        # Positional access to the slugified name
        result[position] = slugify(name)

        # "_" takes precedence; "-" is only considered when there is no "_"
        split_at = name.find("_")
        if split_at == -1:
            split_at = name.find("-")

        if split_at != -1:
            prefix = name[:split_at]
            remainder = name[split_at + 1 :]
            result[slugify(prefix)] = slugify(remainder)

    return result
|
||||||
|
|
||||||
|
|
||||||
def archive_name_from_filename(filename):
|
def archive_name_from_filename(filename):
|
||||||
|
@ -501,7 +501,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
|||||||
self.assertIsFile(os.path.join(tmp, "notempty", "file"))
|
self.assertIsFile(os.path.join(tmp, "notempty", "file"))
|
||||||
self.assertIsNotDir(os.path.join(tmp, "notempty", "empty"))
|
self.assertIsNotDir(os.path.join(tmp, "notempty", "empty"))
|
||||||
|
|
||||||
@override_settings(FILENAME_FORMAT="{created/[title]")
|
@override_settings(FILENAME_FORMAT="{% if x is None %}/{title]")
|
||||||
def test_invalid_format(self):
|
def test_invalid_format(self):
|
||||||
document = Document()
|
document = Document()
|
||||||
document.pk = 1
|
document.pk = 1
|
||||||
@ -957,7 +957,7 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
|
|||||||
mime_type="application/pdf",
|
mime_type="application/pdf",
|
||||||
pk=2,
|
pk=2,
|
||||||
checksum="2",
|
checksum="2",
|
||||||
storage_path=StoragePath.objects.create(path="TestFolder/{created}"),
|
storage_path=StoragePath.objects.create(path="TestFolder/{{created}}"),
|
||||||
)
|
)
|
||||||
self.assertEqual(generate_filename(doc), "TestFolder/2020-06-25.pdf")
|
self.assertEqual(generate_filename(doc), "TestFolder/2020-06-25.pdf")
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user