Backend API support merge flag for bulk editing permissions

This commit is contained in:
shamoon 2024-01-22 12:07:47 -08:00
parent 8f3ab2791b
commit 93f3c3aa7b
6 changed files with 265 additions and 8 deletions

View File

@ -129,13 +129,17 @@ def redo_ocr(doc_ids):
return "OK"
def set_permissions(doc_ids, set_permissions, owner=None):
def set_permissions(doc_ids, set_permissions, owner=None, merge=False):
qs = Document.objects.filter(id__in=doc_ids)
qs.update(owner=owner)
if merge:
# If merging, only set owner for documents that don't have an owner
qs.filter(owner__isnull=True).update(owner=owner)
else:
qs.update(owner=owner)
for doc in qs:
set_permissions_for_object(set_permissions, doc)
set_permissions_for_object(permissions=set_permissions, object=doc, merge=merge)
affected_docs = [doc.id for doc in qs]

View File

@ -916,6 +916,8 @@ class BulkEditSerializer(DocumentListSerializer, SetPermissionsMixin):
)
if "owner" in parameters and parameters["owner"] is not None:
self._validate_owner(parameters["owner"])
if "merge" not in parameters:
parameters["merge"] = False
def validate(self, attrs):
method = attrs["method"]
@ -1258,6 +1260,12 @@ class BulkEditObjectPermissionsSerializer(serializers.Serializer, SetPermissions
write_only=True,
)
merge = serializers.BooleanField(
default=False,
write_only=True,
required=False,
)
def get_object_class(self, object_type):
object_class = None
if object_type == "tags":

View File

@ -0,0 +1,110 @@
from unittest import mock
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.test import TestCase
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_groups_with_perms
from guardian.shortcuts import get_users_with_perms
from documents.bulk_edit import set_permissions
from documents.models import Document
from documents.tests.utils import DirectoriesMixin
class TestBulkEditPermissions(DirectoriesMixin, TestCase):
def setUp(self):
super().setUp()
self.doc1 = Document.objects.create(checksum="A", title="A")
self.doc2 = Document.objects.create(checksum="B", title="B")
self.doc3 = Document.objects.create(checksum="C", title="C")
self.owner = User.objects.create(username="test_owner")
self.user1 = User.objects.create(username="user1")
self.user2 = User.objects.create(username="user2")
self.group1 = Group.objects.create(name="group1")
self.group2 = Group.objects.create(name="group2")
@mock.patch("documents.tasks.bulk_update_documents.delay")
def test_set_permissions(self, m):
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
assign_perm("view_document", self.group1, self.doc1)
permissions = {
"view": {
"users": [self.user1.id, self.user2.id],
"groups": [self.group2.id],
},
"change": {
"users": [self.user1.id],
"groups": [self.group2.id],
},
}
set_permissions(
doc_ids,
set_permissions=permissions,
owner=self.owner,
merge=False,
)
m.assert_called_once()
self.assertEqual(Document.objects.filter(owner=self.owner).count(), 3)
self.assertEqual(Document.objects.filter(id__in=doc_ids).count(), 3)
users_with_perms = get_users_with_perms(
self.doc1,
)
self.assertEqual(users_with_perms.count(), 2)
# group1 should be replaced by group2
groups_with_perms = get_groups_with_perms(
self.doc1,
)
self.assertEqual(groups_with_perms.count(), 1)
@mock.patch("documents.tasks.bulk_update_documents.delay")
def test_set_permissions_merge(self, m):
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
self.doc1.owner = self.user1
self.doc1.save()
assign_perm("view_document", self.user1, self.doc1)
assign_perm("view_document", self.group1, self.doc1)
permissions = {
"view": {
"users": [self.user2.id],
"groups": [self.group2.id],
},
"change": {
"users": [self.user2.id],
"groups": [self.group2.id],
},
}
set_permissions(
doc_ids,
set_permissions=permissions,
owner=self.owner,
merge=True,
)
m.assert_called_once()
# when merge is true owner doesn't get replaced if its not empty
self.assertEqual(Document.objects.filter(owner=self.owner).count(), 2)
self.assertEqual(Document.objects.filter(id__in=doc_ids).count(), 3)
# merge of user1 which was pre-existing and user2
users_with_perms = get_users_with_perms(
self.doc1,
)
self.assertEqual(users_with_perms.count(), 2)
# group1 should be merged by group2
groups_with_perms = get_groups_with_perms(
self.doc1,
)
self.assertEqual(groups_with_perms.count(), 2)

View File

@ -765,6 +765,58 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
self.assertCountEqual(args[0], [self.doc2.id, self.doc3.id])
self.assertEqual(len(kwargs["set_permissions"]["view"]["users"]), 2)
@mock.patch("documents.serialisers.bulk_edit.set_permissions")
def test_set_permissions_merge(self, m):
m.return_value = "OK"
user1 = User.objects.create(username="user1")
user2 = User.objects.create(username="user2")
permissions = {
"view": {
"users": [user1.id, user2.id],
"groups": None,
},
"change": {
"users": [user1.id],
"groups": None,
},
}
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"method": "set_permissions",
"parameters": {"set_permissions": permissions},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called()
args, kwargs = m.call_args
self.assertEqual(kwargs["merge"], False)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"method": "set_permissions",
"parameters": {"set_permissions": permissions, "merge": True},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called()
args, kwargs = m.call_args
self.assertEqual(kwargs["merge"], True)
@mock.patch("documents.serialisers.bulk_edit.set_permissions")
def test_insufficient_permissions_ownership(self, m):
"""

View File

@ -700,8 +700,8 @@ class TestBulkEditObjectPermissions(APITestCase):
def setUp(self):
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.temp_admin = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.temp_admin)
self.t1 = Tag.objects.create(name="t1")
self.t2 = Tag.objects.create(name="t2")
@ -822,6 +822,79 @@ class TestBulkEditObjectPermissions(APITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(StoragePath.objects.get(pk=self.sp1.id).owner, self.user3)
def test_bulk_object_set_permissions_merge(self):
"""
GIVEN:
- Existing objects
WHEN:
- bulk_edit_object_perms API endpoint is called with merge=True or merge=False (default)
THEN:
- Permissions and / or owner are replaced or merged, depending on the merge flag
"""
permissions = {
"view": {
"users": [self.user1.id, self.user2.id],
"groups": [],
},
"change": {
"users": [self.user1.id],
"groups": [],
},
}
assign_perm("view_tag", self.user3, self.t1)
self.t1.owner = self.user3
self.t1.save()
# merge=True
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id],
"object_type": "tags",
"owner": self.user1.id,
"permissions": permissions,
"merge": True,
},
),
content_type="application/json",
)
self.t1.refresh_from_db()
self.t2.refresh_from_db()
self.assertEqual(response.status_code, status.HTTP_200_OK)
# user3 should still be owner of t1 since was set prior
self.assertEqual(self.t1.owner, self.user3)
# user1 should now be owner of t2 since it didn't have an owner
self.assertEqual(self.t2.owner, self.user1)
# user1 should be added
self.assertIn(self.user1, get_users_with_perms(self.t1))
# user3 should be preserved
self.assertIn(self.user3, get_users_with_perms(self.t1))
# merge=False (default)
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id],
"object_type": "tags",
"permissions": permissions,
"merge": False,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# user1 should be added
self.assertIn(self.user1, get_users_with_perms(self.t1))
# user3 should be removed
self.assertNotIn(self.user3, get_users_with_perms(self.t1))
def test_bulk_edit_object_permissions_insufficient_perms(self):
"""
GIVEN:

View File

@ -1385,6 +1385,7 @@ class BulkEditObjectPermissionsView(GenericAPIView, PassUserMixin):
object_class = serializer.get_object_class(object_type)
permissions = serializer.validated_data.get("permissions")
owner = serializer.validated_data.get("owner")
merge = serializer.validated_data.get("merge")
if not user.is_superuser:
objs = object_class.objects.filter(pk__in=object_ids)
@ -1396,12 +1397,21 @@ class BulkEditObjectPermissionsView(GenericAPIView, PassUserMixin):
try:
qs = object_class.objects.filter(id__in=object_ids)
if "owner" in serializer.validated_data:
qs.update(owner=owner)
# if merge is true, we dont want to remove the owner
if "owner" in serializer.validated_data and (
not merge or (merge and owner is not None)
):
# if merge is true, we dont want to overwrite the owner
qs_owner_update = qs.filter(owner__isnull=True) if merge else qs
qs_owner_update.update(owner=owner)
if "permissions" in serializer.validated_data:
for obj in qs:
set_permissions_for_object(permissions, obj)
set_permissions_for_object(
permissions=permissions,
object=obj,
merge=merge,
)
return Response({"result": "OK"})
except Exception as e: