fix(upload): Fix uploading / dropping a folder

Issues:
- the `storage_path` that files were dropped into wasn't being considered when computing the new document's `storage_path`
- include `original_filename` when computing filename
- add try/except error handling in `get_file_from_gcs`
- support dropping folders anywhere in the app
This commit is contained in:
Martin Tan 2023-10-12 12:24:53 +08:00
parent 892b033cdd
commit f8d16dcf4b
4 changed files with 287 additions and 262 deletions

View File

@ -268,7 +268,8 @@ export class AppComponent implements OnInit, OnDestroy {
this.fileLeave(true) this.fileLeave(true)
let storagePathId = parseInt(this.route.snapshot.queryParams['spid']) let storagePathId = parseInt(this.route.snapshot.queryParams['spid'])
storagePathId = !isNaN(storagePathId) ? storagePathId : undefined storagePathId = !isNaN(storagePathId) ? storagePathId : undefined
this.uploadDocumentsService.uploadFiles(files, { storagePathId }) const isUploadWithFolders = files.every(f => 'fullPath' in f.fileEntry && typeof f.fileEntry.fullPath === 'string' && (f.fileEntry.fullPath as string).split('/').filter(s => !!s).length > 1)
this.uploadDocumentsService.uploadFiles(files, { storagePathId, isUploadWithFolders })
this.toastService.showInfo($localize`Initiating upload...`, 3000) this.toastService.showInfo($localize`Initiating upload...`, 3000)
} }
} }

View File

@ -607,8 +607,22 @@ class Consumer(LoggingMixin):
) )
if self.full_path: if self.full_path:
# e.g. full_path: "/CDV#3500648756/OR#1161.pdf"
# e.g. ['CDV#3500648756']
folders = self.full_path.split('/')[:-1] folders = self.full_path.split('/')[:-1]
# remove empty values from splitting the leading slash
folders = [i for i in folders if i] folders = [i for i in folders if i]
# e.g. user dropped the file in storage path id 26
# which is "folder_test" or "test/test2/test3"
if document.storage_path:
# e.g. ['test', 'test2', 'test3']
parent_folders = document.storage_path.path.split('/')
# just double check that there are no empty values from leading slashes
parent_folders = [i for i in parent_folders if i]
# e.g. "test/test2/test3/CDV#3500648756"
folders = parent_folders + folders
folder_path = '/'.join(folders) folder_path = '/'.join(folders)
print(f'folder_path: {folder_path}') print(f'folder_path: {folder_path}')

View File

@ -1,248 +1,252 @@
import logging import logging
import os import os
from collections import defaultdict from collections import defaultdict
from pathlib import PurePath from pathlib import PurePath
# import uuid
import pathvalidate
from django.conf import settings import pathvalidate
from django.template.defaultfilters import slugify from django.conf import settings
from django.utils import timezone from django.template.defaultfilters import slugify
from documents.models import Document from django.utils import timezone
from documents.models import Document
logger = logging.getLogger("paperless.filehandling")
logger = logging.getLogger("paperless.filehandling")
class defaultdictNoStr(defaultdict):
    """A ``defaultdict`` that refuses to be rendered as a string.

    Converting the mapping itself to text raises, so a filename format
    that uses ``{tags}`` directly (instead of indexing into it) fails
    loudly rather than silently producing garbage.
    """

    def __str__(self):
        raise ValueError("Don't use {tags} directly.")
def create_source_path_directory(source_path):
    """Ensure that the parent directory of *source_path* exists."""
    parent = os.path.dirname(source_path)
    # exist_ok makes this safe to call repeatedly for the same path.
    os.makedirs(parent, exist_ok=True)
def delete_empty_directories(directory, root):
    """Delete *directory* and any empty ancestors, never going above *root*.

    Climbs from ``directory`` towards ``root``, removing each directory
    that is empty. Stops at the first non-empty directory, at ``root``
    itself, or immediately if ``directory`` does not lie inside ``root``.
    """
    if not os.path.isdir(directory):
        return

    current = os.path.normpath(directory)
    root = os.path.normpath(root)

    # Refuse to touch anything outside our originals folder. The appended
    # separator guards against sibling prefixes, e.g.
    #   current = /home/originals2/test
    #   root    = /home/originals  ("/" is appended so startswith fails)
    if not current.startswith(root + os.path.sep):
        return

    while current != root:
        if os.listdir(current):
            # Not empty: nothing more to clean up.
            return
        try:
            os.rmdir(current)
        except OSError:
            # Best effort only; a leftover empty directory is harmless.
            return
        # Continue one level up.
        current = os.path.normpath(os.path.dirname(current))
def many_to_dictionary(field):
    """Convert a ManyToManyField into a dictionary.

    Every entry is stored under its positional index (slugified name as
    the value). Entries whose name contains an ``_`` or ``-`` delimiter
    are additionally stored as a slugified key/value pair split at the
    first such delimiter.
    """
    mapping = {}

    for position, entry in enumerate(field.all()):
        # Always index by position.
        mapping[position] = slugify(entry.name)

        # Locate the delimiter: "_" wins, then "-".
        split_at = entry.name.find("_")
        if split_at == -1:
            split_at = entry.name.find("-")
        if split_at == -1:
            continue

        left = entry.name[:split_at]
        right = entry.name[split_at + 1 :]
        mapping[slugify(left)] = slugify(right)

    return mapping
def generate_unique_filename(doc, archive_filename=False):
    """
    Generates a unique filename for doc in settings.ORIGINALS_DIR.

    The returned filename is guaranteed to be either the current filename
    of the document if unchanged, or a new filename that does not correspond
    to any existing files. The function will append _01, _02, etc to the
    filename before the extension to avoid conflicts.

    If archive_filename is True, return a unique archive filename instead.
    """
    if archive_filename:
        old_filename = doc.archive_filename
        root = settings.ARCHIVE_DIR
    else:
        old_filename = doc.filename
        root = settings.ORIGINALS_DIR

    # If generating archive filenames, try to make a name that is similar to
    # the original filename first.
    if archive_filename and doc.filename:
        new_filename = os.path.splitext(doc.filename)[0] + ".pdf"
        if new_filename == old_filename or not os.path.exists(
            os.path.join(root, new_filename),
        ):
            return new_filename

    counter = 0

    # FIX(review): removed the commented-out uuid-based naming experiment
    # that followed this loop — it sat after an infinite ``while True`` and
    # was therefore unreachable dead code.
    while True:
        new_filename = generate_filename(
            doc,
            counter,
            archive_filename=archive_filename,
        )
        if new_filename == old_filename:
            # Still the same as before: keep it.
            return new_filename

        if os.path.exists(os.path.join(root, new_filename)):
            counter += 1
        else:
            return new_filename
def generate_filename(
    doc: Document,
    counter=0,
    append_gpg=True,
    archive_filename=False,
):
    """Build the relative storage filename for *doc*.

    The name is derived from the document's storage path template when one
    is set, otherwise from ``settings.FILENAME_FORMAT``; if neither yields
    a usable path, the zero-padded primary key is used. ``counter`` appends
    ``_NN`` before the extension to resolve conflicts, ``archive_filename``
    forces a ``.pdf`` suffix, and ``append_gpg`` adds ``.gpg`` for
    GPG-encrypted documents.
    """
    path = ""
    filename_format = settings.FILENAME_FORMAT

    try:
        if doc.storage_path is not None:
            logger.debug(
                f"Document has storage_path {doc.storage_path.id} "
                f"({doc.storage_path.path}) set",
            )
            # FIX: previously this did
            #   filename_format = doc.storage_path.path + '/' + doc.original_filename
            # which (a) raised an uncaught TypeError when original_filename
            # was None (TypeError is not in the except clause below) and
            # (b) produced a double extension ("x.pdf.pdf") because the
            # file-type suffix is appended again further down. Use the
            # sanitized, extension-less stem, only when a name is known.
            filename_format = doc.storage_path.path
            if doc.original_filename:
                stem = PurePath(doc.original_filename).with_suffix("").name
                # Escape braces so a user-supplied filename cannot break
                # (or inject placeholders into) str.format() below.
                stem = stem.replace("{", "{{").replace("}", "}}")
                filename_format += "/" + pathvalidate.sanitize_filename(
                    stem,
                    replacement_text="-",
                )

        if filename_format is not None:
            # {tags} must be indexed, never formatted directly — see
            # defaultdictNoStr.
            tags = defaultdictNoStr(
                lambda: slugify(None),
                many_to_dictionary(doc.tags),
            )

            tag_list = pathvalidate.sanitize_filename(
                ",".join(
                    sorted(tag.name for tag in doc.tags.all()),
                ),
                replacement_text="-",
            )

            no_value_default = "-none-"

            if doc.correspondent:
                correspondent = pathvalidate.sanitize_filename(
                    doc.correspondent.name,
                    replacement_text="-",
                )
            else:
                correspondent = no_value_default

            if doc.document_type:
                document_type = pathvalidate.sanitize_filename(
                    doc.document_type.name,
                    replacement_text="-",
                )
            else:
                document_type = no_value_default

            if doc.archive_serial_number:
                asn = str(doc.archive_serial_number)
            else:
                asn = no_value_default

            if doc.owner is not None:
                owner_username_str = str(doc.owner.username)
            else:
                owner_username_str = no_value_default

            if doc.original_filename is not None:
                # Stem only — no extension.
                original_name = PurePath(doc.original_filename).with_suffix("").name
            else:
                original_name = no_value_default

            # Convert UTC database datetime to localized date.
            local_added = timezone.localdate(doc.added)
            local_created = timezone.localdate(doc.created)

            path = filename_format.format(
                title=pathvalidate.sanitize_filename(doc.title, replacement_text="-"),
                correspondent=correspondent,
                document_type=document_type,
                created=local_created.isoformat(),
                created_year=local_created.strftime("%Y"),
                created_year_short=local_created.strftime("%y"),
                created_month=local_created.strftime("%m"),
                created_month_name=local_created.strftime("%B"),
                created_month_name_short=local_created.strftime("%b"),
                created_day=local_created.strftime("%d"),
                added=local_added.isoformat(),
                added_year=local_added.strftime("%Y"),
                added_year_short=local_added.strftime("%y"),
                added_month=local_added.strftime("%m"),
                added_month_name=local_added.strftime("%B"),
                added_month_name_short=local_added.strftime("%b"),
                added_day=local_added.strftime("%d"),
                asn=asn,
                tags=tags,
                tag_list=tag_list,
                owner_username=owner_username_str,
                original_name=original_name,
            ).strip()

            if settings.FILENAME_FORMAT_REMOVE_NONE:
                path = path.replace("-none-/", "")  # remove empty directories
                path = path.replace(" -none-", "")  # remove when spaced, with space
                path = path.replace("-none-", "")  # remove rest of the occurences

            path = path.replace("-none-", "none")  # backward compatibility
            path = path.strip(os.sep)

    except (ValueError, KeyError, IndexError):
        logger.warning(
            f"Invalid filename_format '{filename_format}', falling back to default",
        )

    counter_str = f"_{counter:02}" if counter else ""
    filetype_str = ".pdf" if archive_filename else doc.file_type

    if len(path) > 0:
        filename = f"{path}{counter_str}{filetype_str}"
    else:
        # No usable template result: fall back to the zero-padded pk.
        filename = f"{doc.pk:07}{counter_str}{filetype_str}"

    # Append .gpg for encrypted files.
    if append_gpg and doc.storage_type == doc.STORAGE_TYPE_GPG:
        filename += ".gpg"

    return filename

View File

@ -30,20 +30,26 @@ def upload_file(source, target):
blob.upload_from_file(read_file_2) blob.upload_from_file(read_file_2)
def get_file_from_gcs(bucket_path):
    """Download an object from Google Cloud Storage.

    Returns a BytesIO positioned at the start of the downloaded bytes, or
    None when the client/bucket is not initialized or the download fails.
    """
    import logging

    log = logging.getLogger(__name__)
    try:
        if (not client) or (not bucket):
            raise Exception("Google Cloud Storage is not initialized.")

        # Blob object representing the path in the bucket.
        blob = bucket.blob(str(bucket_path))

        # Download the file into an in-memory byte stream.
        byte_stream = BytesIO()
        blob.download_to_file(byte_stream)

        # Rewind so callers can read from the beginning.
        byte_stream.seek(0)
        return byte_stream
    except Exception:
        # FIX: narrowed from a bare ``except:`` (which also swallows
        # KeyboardInterrupt/SystemExit) and log the failure instead of
        # returning None silently. Callers still receive None on failure.
        log.exception(
            "Failed to fetch %s from Google Cloud Storage",
            bucket_path,
        )
        return None
def exists():
    # NOTE(review): stub — unconditionally reports that nothing exists.
    # Presumably a placeholder/disabled GCS existence check; confirm the
    # intended behavior with the callers before relying on it.
    return False