First pass implementation of a basic custom metadata field per document
This commit is contained in:
parent
0def30210c
commit
021b1031dd
@ -80,7 +80,7 @@ django_checks() {
|
||||
|
||||
search_index() {
|
||||
|
||||
local -r index_version=6
|
||||
local -r index_version=7
|
||||
local -r index_version_file=${DATA_DIR}/.index_version
|
||||
|
||||
if [[ (! -f "${index_version_file}") || $(<"${index_version_file}") != "$index_version" ]]; then
|
||||
|
@ -30,6 +30,7 @@ from whoosh.searching import ResultsPage
|
||||
from whoosh.searching import Searcher
|
||||
from whoosh.writing import AsyncWriter
|
||||
|
||||
from documents.models import CustomMetadata
|
||||
from documents.models import Document
|
||||
from documents.models import Note
|
||||
from documents.models import User
|
||||
@ -60,6 +61,8 @@ def get_schema():
|
||||
has_path=BOOLEAN(),
|
||||
notes=TEXT(),
|
||||
num_notes=NUMERIC(sortable=True, signed=False),
|
||||
custom_metadata=TEXT(),
|
||||
custom_field_count=NUMERIC(sortable=True, signed=False),
|
||||
owner=TEXT(),
|
||||
owner_id=NUMERIC(),
|
||||
has_owner=BOOLEAN(),
|
||||
@ -69,7 +72,7 @@ def get_schema():
|
||||
)
|
||||
|
||||
|
||||
def open_index(recreate=False):
|
||||
def open_index(recreate=False) -> FileIndex:
|
||||
try:
|
||||
if exists_in(settings.INDEX_DIR) and not recreate:
|
||||
return open_dir(settings.INDEX_DIR, schema=get_schema())
|
||||
@ -82,7 +85,7 @@ def open_index(recreate=False):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def open_index_writer(optimize=False):
|
||||
def open_index_writer(optimize=False) -> AsyncWriter:
|
||||
writer = AsyncWriter(open_index())
|
||||
|
||||
try:
|
||||
@ -95,7 +98,7 @@ def open_index_writer(optimize=False):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def open_index_searcher():
|
||||
def open_index_searcher() -> Searcher:
|
||||
searcher = open_index().searcher()
|
||||
|
||||
try:
|
||||
@ -108,6 +111,9 @@ def update_document(writer: AsyncWriter, doc: Document):
|
||||
tags = ",".join([t.name for t in doc.tags.all()])
|
||||
tags_ids = ",".join([str(t.id) for t in doc.tags.all()])
|
||||
notes = ",".join([str(c.note) for c in Note.objects.filter(document=doc)])
|
||||
custom_fields = ",".join(
|
||||
[str(c) for c in CustomMetadata.objects.filter(document=doc)],
|
||||
)
|
||||
asn = doc.archive_serial_number
|
||||
if asn is not None and (
|
||||
asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
|
||||
@ -147,6 +153,8 @@ def update_document(writer: AsyncWriter, doc: Document):
|
||||
has_path=doc.storage_path is not None,
|
||||
notes=notes,
|
||||
num_notes=len(notes),
|
||||
custom_metadata=custom_fields,
|
||||
custom_field_count=len(custom_fields),
|
||||
owner=doc.owner.username if doc.owner else None,
|
||||
owner_id=doc.owner.id if doc.owner else None,
|
||||
has_owner=doc.owner is not None,
|
||||
@ -156,20 +164,20 @@ def update_document(writer: AsyncWriter, doc: Document):
|
||||
)
|
||||
|
||||
|
||||
def remove_document(writer, doc):
|
||||
def remove_document(writer: AsyncWriter, doc: Document):
|
||||
remove_document_by_id(writer, doc.pk)
|
||||
|
||||
|
||||
def remove_document_by_id(writer, doc_id):
|
||||
def remove_document_by_id(writer: AsyncWriter, doc_id):
|
||||
writer.delete_by_term("id", doc_id)
|
||||
|
||||
|
||||
def add_or_update_document(document):
|
||||
def add_or_update_document(document: Document):
|
||||
with open_index_writer() as writer:
|
||||
update_document(writer, document)
|
||||
|
||||
|
||||
def remove_document_from_index(document):
|
||||
def remove_document_from_index(document: Document):
|
||||
with open_index_writer() as writer:
|
||||
remove_document(writer, document)
|
||||
|
||||
|
80
src/documents/migrations/1040_custommetadata.py
Normal file
80
src/documents/migrations/1040_custommetadata.py
Normal file
@ -0,0 +1,80 @@
|
||||
# Generated by Django 4.2.5 on 2023-10-20 15:48
|
||||
|
||||
import django.db.models.deletion
|
||||
import django.utils.timezone
|
||||
from django.conf import settings
|
||||
from django.db import migrations
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
("documents", "1039_consumptiontemplate"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="CustomMetadata",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
(
|
||||
"data_type",
|
||||
models.CharField(
|
||||
choices=[
|
||||
("string", "String"),
|
||||
("url", "URL"),
|
||||
("date", "Date"),
|
||||
],
|
||||
default="string",
|
||||
max_length=50,
|
||||
),
|
||||
),
|
||||
("data", models.CharField(blank=True, max_length=512)),
|
||||
("name", models.CharField(blank=True, max_length=512)),
|
||||
(
|
||||
"created",
|
||||
models.DateTimeField(
|
||||
db_index=True,
|
||||
default=django.utils.timezone.now,
|
||||
verbose_name="created",
|
||||
),
|
||||
),
|
||||
(
|
||||
"document",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="metadata",
|
||||
to="documents.document",
|
||||
verbose_name="document",
|
||||
),
|
||||
),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
related_name="metadata",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
verbose_name="user",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "custom metadata",
|
||||
"verbose_name_plural": "custom metadata",
|
||||
"ordering": ("created",),
|
||||
},
|
||||
),
|
||||
]
|
@ -883,3 +883,83 @@ if settings.AUDIT_LOG_ENABLED:
|
||||
auditlog.register(Tag)
|
||||
auditlog.register(DocumentType)
|
||||
auditlog.register(Note)
|
||||
|
||||
|
||||
class CustomMetadata(models.Model):
|
||||
class DataType(models.TextChoices):
|
||||
STRING = ("string", _("String"))
|
||||
URL = ("url", _("URL"))
|
||||
DATE = ("date", _("Date"))
|
||||
|
||||
data_type = models.CharField(
|
||||
max_length=50,
|
||||
choices=DataType.choices,
|
||||
default=DataType.STRING,
|
||||
)
|
||||
|
||||
data = models.CharField(
|
||||
max_length=512,
|
||||
blank=True,
|
||||
)
|
||||
|
||||
name = models.CharField(
|
||||
max_length=512,
|
||||
blank=True,
|
||||
)
|
||||
|
||||
created = models.DateTimeField(
|
||||
_("created"),
|
||||
default=timezone.now,
|
||||
db_index=True,
|
||||
)
|
||||
|
||||
document = models.ForeignKey(
|
||||
Document,
|
||||
blank=True,
|
||||
null=True,
|
||||
related_name="metadata",
|
||||
on_delete=models.CASCADE,
|
||||
verbose_name=_("document"),
|
||||
)
|
||||
|
||||
user = models.ForeignKey(
|
||||
User,
|
||||
blank=True,
|
||||
null=True,
|
||||
related_name="metadata",
|
||||
on_delete=models.SET_NULL,
|
||||
verbose_name=_("user"),
|
||||
)
|
||||
|
||||
class Meta:
|
||||
ordering = ("created",)
|
||||
verbose_name = _("custom metadata")
|
||||
verbose_name_plural = _("custom metadata")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.data_type} : {self.name} : {self.data}"
|
||||
|
||||
def to_json(self) -> dict[str, str]:
|
||||
return {
|
||||
"id": self.id,
|
||||
"created": self.created,
|
||||
"type": self.data_type,
|
||||
"name": self.name,
|
||||
"data": self.data,
|
||||
"user": {
|
||||
"id": self.user.id,
|
||||
"username": self.user.username,
|
||||
"first_name": self.user.first_name,
|
||||
"last_name": self.user.last_name,
|
||||
},
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def from_json(document: Document, user: User, data) -> "CustomMetadata":
|
||||
return CustomMetadata.objects.create(
|
||||
document=document,
|
||||
data_type=data["type"],
|
||||
name=data["name"],
|
||||
data=data["data"],
|
||||
user=user,
|
||||
)
|
||||
|
@ -2421,7 +2421,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
f"/api/documents/{doc.pk}/notes/",
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(resp.content, b"Insufficient permissions to view")
|
||||
self.assertEqual(resp.content, b"Insufficient permissions to view notes")
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
assign_perm("view_document", user1, doc)
|
||||
@ -2430,7 +2430,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
f"/api/documents/{doc.pk}/notes/",
|
||||
data={"note": "this is a posted note"},
|
||||
)
|
||||
self.assertEqual(resp.content, b"Insufficient permissions to create")
|
||||
self.assertEqual(resp.content, b"Insufficient permissions to create notes")
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
note = Note.objects.create(
|
||||
@ -2444,7 +2444,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.content, b"Insufficient permissions to delete")
|
||||
self.assertEqual(response.content, b"Insufficient permissions to delete notes")
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_delete_note(self):
|
||||
@ -2694,7 +2694,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
f"/api/documents/{doc.pk}/share_links/",
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(resp.content, b"Insufficient permissions")
|
||||
self.assertEqual(resp.content, b"Insufficient permissions to add share link")
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
assign_perm("change_document", user1, doc)
|
||||
|
291
src/documents/tests/test_api_custom_metadata.py
Normal file
291
src/documents/tests/test_api_custom_metadata.py
Normal file
@ -0,0 +1,291 @@
|
||||
from datetime import timedelta
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.utils import timezone
|
||||
from guardian.shortcuts import assign_perm
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from documents.models import CustomMetadata
|
||||
from documents.models import Document
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
|
||||
|
||||
class TestCustomMetadata(DirectoriesMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_superuser(username="temp_admin")
|
||||
self.client.force_authenticate(user=self.user)
|
||||
return super().setUp()
|
||||
|
||||
@staticmethod
|
||||
def create_json_no_date(metadata: CustomMetadata):
|
||||
"""
|
||||
Small helper to remove the created datatime from the JSON
|
||||
It doesn't matter to verify
|
||||
"""
|
||||
expected = metadata.to_json()
|
||||
del expected["created"]
|
||||
return expected
|
||||
|
||||
def test_get_existing_custom_metadata(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- A document with 2 different metadata attached to it
|
||||
WHEN:
|
||||
- API request for document custom metadata is made
|
||||
THEN:
|
||||
- Both associated values are returned
|
||||
"""
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom metadata on it! Neat",
|
||||
)
|
||||
|
||||
metadata1 = CustomMetadata.objects.create(
|
||||
data_type=CustomMetadata.DataType.STRING,
|
||||
name="Invoice Number",
|
||||
data="#123456",
|
||||
document=doc,
|
||||
user=self.user,
|
||||
)
|
||||
|
||||
metadata2 = CustomMetadata.objects.create(
|
||||
data_type=CustomMetadata.DataType.URL,
|
||||
name="October 20th, 2023 On This Day",
|
||||
data="https://en.wikipedia.org/wiki/Pope_Pius_XII",
|
||||
document=doc,
|
||||
user=self.user,
|
||||
)
|
||||
|
||||
all_metadata = [metadata1, metadata2]
|
||||
|
||||
response = self.client.get(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
resp_data = response.json()
|
||||
|
||||
self.assertEqual(len(resp_data), 2)
|
||||
|
||||
for idx, resp_data in enumerate(reversed(resp_data)):
|
||||
del resp_data["created"]
|
||||
|
||||
self.assertDictEqual(
|
||||
resp_data,
|
||||
self.create_json_no_date(all_metadata[idx]),
|
||||
)
|
||||
|
||||
def test_create_custom_metadata(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing document
|
||||
WHEN:
|
||||
- API request is made to add 2 custom metadata fields
|
||||
THEN:
|
||||
- metadata objects are created and associated with document
|
||||
- Document modified time is updated
|
||||
"""
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom_metadata added",
|
||||
created=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
# set to yesterday
|
||||
doc.modified = timezone.now() - timedelta(days=1)
|
||||
self.assertEqual(doc.modified.day, (timezone.now() - timedelta(days=1)).day)
|
||||
|
||||
resp = self.client.post(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
data={"type": "string", "name": "Custom Field 1", "data": "Custom Data 1"},
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
response = self.client.get(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
resp_data = response.json()
|
||||
|
||||
self.assertEqual(len(resp_data), 1)
|
||||
|
||||
resp_data = resp_data[0]
|
||||
|
||||
self.assertEqual(resp_data["data"], "Custom Data 1")
|
||||
|
||||
doc = Document.objects.get(pk=doc.pk)
|
||||
# modified was updated to today
|
||||
self.assertEqual(doc.modified.day, timezone.now().day)
|
||||
|
||||
def test_custom_metadata_view_add_delete_permissions_aware(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing document owned by user2 but with granted view perms for user1
|
||||
WHEN:
|
||||
- API request is made by user1 to add a custom metadata
|
||||
THEN:
|
||||
- custom metadata is not created
|
||||
"""
|
||||
user1 = User.objects.create_user(username="test1")
|
||||
user1.user_permissions.add(*Permission.objects.all())
|
||||
user1.save()
|
||||
|
||||
user2 = User.objects.create_user(username="test2")
|
||||
user2.save()
|
||||
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom_metadata added",
|
||||
)
|
||||
doc.owner = user2
|
||||
doc.save()
|
||||
|
||||
self.client.force_authenticate(user1)
|
||||
|
||||
resp = self.client.get(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(
|
||||
resp.content,
|
||||
b"Insufficient permissions to view custom metadata",
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
assign_perm("view_document", user1, doc)
|
||||
|
||||
resp = self.client.post(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
data={"type": "string", "name": "Custom Field 1", "data": "Custom Data 1"},
|
||||
)
|
||||
self.assertEqual(
|
||||
resp.content,
|
||||
b"Insufficient permissions to create custom metadata",
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
metadata = CustomMetadata.objects.create(
|
||||
data_type=CustomMetadata.DataType.STRING,
|
||||
name="Invoice Number",
|
||||
data="#123456",
|
||||
document=doc,
|
||||
user=self.user,
|
||||
)
|
||||
|
||||
response = self.client.delete(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/?id={metadata.pk}",
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
response.content,
|
||||
b"Insufficient permissions to delete custom metadata",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_delete_custom_metadata(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing document, existing custom metadata
|
||||
WHEN:
|
||||
- API request is made to delete a custom metadata
|
||||
THEN:
|
||||
- custom metadata is deleted, document modified is updated
|
||||
"""
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom metadata!",
|
||||
created=timezone.now() - timedelta(days=1),
|
||||
)
|
||||
# set to yesterday
|
||||
doc.modified = timezone.now() - timedelta(days=1)
|
||||
self.assertEqual(doc.modified.day, (timezone.now() - timedelta(days=1)).day)
|
||||
|
||||
metadata = CustomMetadata.objects.create(
|
||||
data_type=CustomMetadata.DataType.DATE,
|
||||
name="Invoice Number",
|
||||
data="2023-10-20",
|
||||
document=doc,
|
||||
user=self.user,
|
||||
)
|
||||
|
||||
response = self.client.delete(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/?id={metadata.pk}",
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
self.assertEqual(len(CustomMetadata.objects.all()), 0)
|
||||
doc = Document.objects.get(pk=doc.pk)
|
||||
# modified was updated to today
|
||||
self.assertEqual(doc.modified.day, timezone.now().day)
|
||||
|
||||
def test_get_custom_metadata_no_doc(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- A request to get custom metadata from a non-existent document
|
||||
WHEN:
|
||||
- API request for document custom metadata is made
|
||||
THEN:
|
||||
- HTTP status.HTTP_404_NOT_FOUND is returned
|
||||
"""
|
||||
response = self.client.get(
|
||||
"/api/documents/500/custom_metadata/",
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
@mock.patch("documents.views.CustomMetadata.to_json")
|
||||
def test_get_custom_metadata_failure(self, mocked_to_json: MagicMock):
|
||||
mocked_to_json.side_effect = Exception("this failed somehow")
|
||||
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom metadata on it! Neat",
|
||||
)
|
||||
|
||||
_ = CustomMetadata.objects.create(
|
||||
data_type=CustomMetadata.DataType.STRING,
|
||||
name="Invoice Number",
|
||||
data="#123456",
|
||||
document=doc,
|
||||
user=self.user,
|
||||
)
|
||||
|
||||
response = self.client.get(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
|
||||
@mock.patch("documents.views.CustomMetadata.from_json")
|
||||
def test_add_custom_metadata_failure(self, mocked_from_json: MagicMock):
|
||||
mocked_from_json.side_effect = Exception("this failed somehow else")
|
||||
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document which will have custom metadata on it! Neat",
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
f"/api/documents/{doc.pk}/custom_metadata/",
|
||||
data={"type": "string", "name": "Custom Field 1", "data": "Custom Data 1"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
|
@ -153,7 +153,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
|
||||
manifest = self._do_export(use_filename_format=use_filename_format)
|
||||
|
||||
self.assertEqual(len(manifest), 159)
|
||||
self.assertEqual(len(manifest), 164)
|
||||
|
||||
# dont include consumer or AnonymousUser users
|
||||
self.assertEqual(
|
||||
@ -247,7 +247,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
self.assertEqual(Document.objects.get(id=self.d4.id).title, "wow_dec")
|
||||
self.assertEqual(GroupObjectPermission.objects.count(), 1)
|
||||
self.assertEqual(UserObjectPermission.objects.count(), 1)
|
||||
self.assertEqual(Permission.objects.count(), 116)
|
||||
self.assertEqual(Permission.objects.count(), 120)
|
||||
messages = check_sanity()
|
||||
# everything is alright after the test
|
||||
self.assertEqual(len(messages), 0)
|
||||
@ -676,15 +676,15 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
)
|
||||
|
||||
self.assertEqual(ContentType.objects.count(), 29)
|
||||
self.assertEqual(Permission.objects.count(), 116)
|
||||
self.assertEqual(ContentType.objects.count(), 30)
|
||||
self.assertEqual(Permission.objects.count(), 120)
|
||||
|
||||
manifest = self._do_export()
|
||||
|
||||
with paperless_environment():
|
||||
self.assertEqual(
|
||||
len(list(filter(lambda e: e["model"] == "auth.permission", manifest))),
|
||||
116,
|
||||
120,
|
||||
)
|
||||
# add 1 more to db to show objects are not re-created by import
|
||||
Permission.objects.create(
|
||||
@ -692,7 +692,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
codename="test_perm",
|
||||
content_type_id=1,
|
||||
)
|
||||
self.assertEqual(Permission.objects.count(), 117)
|
||||
self.assertEqual(Permission.objects.count(), 121)
|
||||
|
||||
# will cause an import error
|
||||
self.user.delete()
|
||||
@ -701,5 +701,5 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
with self.assertRaises(IntegrityError):
|
||||
call_command("document_importer", "--no-progress-bar", self.target)
|
||||
|
||||
self.assertEqual(ContentType.objects.count(), 29)
|
||||
self.assertEqual(Permission.objects.count(), 117)
|
||||
self.assertEqual(ContentType.objects.count(), 30)
|
||||
self.assertEqual(Permission.objects.count(), 121)
|
||||
|
@ -28,6 +28,7 @@ from django.http import HttpResponse
|
||||
from django.http import HttpResponseBadRequest
|
||||
from django.http import HttpResponseForbidden
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.http import HttpResponseServerError
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils import timezone
|
||||
from django.utils.decorators import method_decorator
|
||||
@ -78,6 +79,7 @@ from documents.matching import match_storage_paths
|
||||
from documents.matching import match_tags
|
||||
from documents.models import ConsumptionTemplate
|
||||
from documents.models import Correspondent
|
||||
from documents.models import CustomMetadata
|
||||
from documents.models import Document
|
||||
from documents.models import DocumentType
|
||||
from documents.models import Note
|
||||
@ -497,7 +499,7 @@ class DocumentViewSet(
|
||||
"view_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden("Insufficient permissions to view")
|
||||
return HttpResponseForbidden("Insufficient permissions to view notes")
|
||||
except Document.DoesNotExist:
|
||||
raise Http404
|
||||
|
||||
@ -507,7 +509,7 @@ class DocumentViewSet(
|
||||
except Exception as e:
|
||||
logger.warning(f"An error occurred retrieving notes: {e!s}")
|
||||
return Response(
|
||||
{"error": "Error retreiving notes, check logs for more detail."},
|
||||
{"error": "Error retrieving notes, check logs for more detail."},
|
||||
)
|
||||
elif request.method == "POST":
|
||||
try:
|
||||
@ -516,7 +518,9 @@ class DocumentViewSet(
|
||||
"change_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden("Insufficient permissions to create")
|
||||
return HttpResponseForbidden(
|
||||
"Insufficient permissions to create notes",
|
||||
)
|
||||
|
||||
c = Note.objects.create(
|
||||
document=doc,
|
||||
@ -558,7 +562,7 @@ class DocumentViewSet(
|
||||
"change_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden("Insufficient permissions to delete")
|
||||
return HttpResponseForbidden("Insufficient permissions to delete notes")
|
||||
|
||||
note = Note.objects.get(id=int(request.GET.get("id")))
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
@ -599,7 +603,9 @@ class DocumentViewSet(
|
||||
"change_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden("Insufficient permissions")
|
||||
return HttpResponseForbidden(
|
||||
"Insufficient permissions to add share link",
|
||||
)
|
||||
except Document.DoesNotExist:
|
||||
raise Http404
|
||||
|
||||
@ -618,6 +624,100 @@ class DocumentViewSet(
|
||||
]
|
||||
return Response(links)
|
||||
|
||||
@action(methods=["get", "post", "delete"], detail=True)
|
||||
def custom_metadata(self, request, pk=None) -> Response:
|
||||
def package_custom_metadata(doc: Document):
|
||||
return [
|
||||
c.to_json()
|
||||
for c in CustomMetadata.objects.filter(document=doc).order_by(
|
||||
"-created",
|
||||
)
|
||||
]
|
||||
|
||||
request.user = request.user
|
||||
try:
|
||||
doc = Document.objects.get(pk=pk)
|
||||
if request.user is not None and not has_perms_owner_aware(
|
||||
request.user,
|
||||
"view_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden(
|
||||
"Insufficient permissions to view custom metadata",
|
||||
)
|
||||
except Document.DoesNotExist:
|
||||
raise Http404
|
||||
|
||||
if request.method == "GET":
|
||||
try:
|
||||
return Response(package_custom_metadata(doc))
|
||||
except Exception as e:
|
||||
logger.warning(f"An error occurred retrieving custom metadata: {e!s}")
|
||||
return HttpResponseServerError(
|
||||
{
|
||||
"error": (
|
||||
"Error retrieving custom metadata,"
|
||||
" check logs for more detail."
|
||||
),
|
||||
},
|
||||
)
|
||||
elif request.method == "POST":
|
||||
try:
|
||||
if request.user is not None and not has_perms_owner_aware(
|
||||
request.user,
|
||||
"change_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden(
|
||||
"Insufficient permissions to create custom metadata",
|
||||
)
|
||||
|
||||
CustomMetadata.from_json(doc, request.user, request.data)
|
||||
|
||||
doc.modified = timezone.now()
|
||||
doc.save()
|
||||
|
||||
from documents import index
|
||||
|
||||
index.add_or_update_document(self.get_object())
|
||||
|
||||
return Response(package_custom_metadata(doc))
|
||||
except Exception as e:
|
||||
logger.warning(f"An error occurred saving custom metadata: {e!s}")
|
||||
return HttpResponseServerError(
|
||||
{
|
||||
"error": (
|
||||
"Error saving custom metadata, "
|
||||
"check logs for more detail."
|
||||
),
|
||||
},
|
||||
)
|
||||
elif request.method == "DELETE":
|
||||
if request.user is not None and not has_perms_owner_aware(
|
||||
request.user,
|
||||
"change_document",
|
||||
doc,
|
||||
):
|
||||
return HttpResponseForbidden(
|
||||
"Insufficient permissions to delete custom metadata",
|
||||
)
|
||||
|
||||
metadata = CustomMetadata.objects.get(id=int(request.GET.get("id")))
|
||||
metadata.delete()
|
||||
|
||||
doc.modified = timezone.now()
|
||||
doc.save()
|
||||
|
||||
from documents import index
|
||||
|
||||
index.add_or_update_document(self.get_object())
|
||||
|
||||
return Response(package_custom_metadata(doc))
|
||||
|
||||
return Response(
|
||||
{"error": "unreachable error was reached for custom metadata"},
|
||||
) # pragma: no cover
|
||||
|
||||
|
||||
class SearchResultSerializer(DocumentSerializer, PassUserMixin):
|
||||
def to_representation(self, instance):
|
||||
|
Loading…
x
Reference in New Issue
Block a user