from unittest import mock from django.contrib.auth.models import Group from django.contrib.auth.models import User from django.test import TestCase from guardian.shortcuts import assign_perm from guardian.shortcuts import get_groups_with_perms from guardian.shortcuts import get_users_with_perms from documents import bulk_edit from documents.models import Correspondent from documents.models import Document from documents.models import DocumentType from documents.models import StoragePath from documents.models import Tag from documents.tests.utils import DirectoriesMixin class TestBulkEdit(DirectoriesMixin, TestCase): def setUp(self): super().setUp() self.owner = User.objects.create(username="test_owner") self.user1 = User.objects.create(username="user1") self.user2 = User.objects.create(username="user2") self.group1 = Group.objects.create(name="group1") self.group2 = Group.objects.create(name="group2") patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay") self.async_task = patcher.start() self.addCleanup(patcher.stop) self.c1 = Correspondent.objects.create(name="c1") self.c2 = Correspondent.objects.create(name="c2") self.dt1 = DocumentType.objects.create(name="dt1") self.dt2 = DocumentType.objects.create(name="dt2") self.t1 = Tag.objects.create(name="t1") self.t2 = Tag.objects.create(name="t2") self.doc1 = Document.objects.create(checksum="A", title="A") self.doc2 = Document.objects.create( checksum="B", title="B", correspondent=self.c1, document_type=self.dt1, ) self.doc3 = Document.objects.create( checksum="C", title="C", correspondent=self.c2, document_type=self.dt2, ) self.doc4 = Document.objects.create(checksum="D", title="D") self.doc5 = Document.objects.create(checksum="E", title="E") self.doc2.tags.add(self.t1) self.doc3.tags.add(self.t2) self.doc4.tags.add(self.t1, self.t2) self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}") def test_set_correspondent(self): self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1) bulk_edit.set_correspondent( [self.doc1.id, self.doc2.id, self.doc3.id], self.c2.id, ) self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id]) def test_unset_correspondent(self): self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1) bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None) self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id]) def test_set_document_type(self): self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1) bulk_edit.set_document_type( [self.doc1.id, self.doc2.id, self.doc3.id], self.dt2.id, ) self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id]) def test_unset_document_type(self): self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1) bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None) self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id]) def test_set_document_storage_path(self): """ GIVEN: - 5 documents without defined storage path WHEN: - Bulk edit called to add storage path to 1 document THEN: - Single document storage path update """ self.assertEqual(Document.objects.filter(storage_path=None).count(), 5) bulk_edit.set_storage_path( [self.doc1.id], self.sp1.id, ) self.assertEqual(Document.objects.filter(storage_path=None).count(), 4) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc1.id]) def test_unset_document_storage_path(self): """ GIVEN: - 4 documents without defined storage path - 1 document with a defined storage WHEN: - Bulk edit called to remove storage path from 1 document THEN: - Single document storage path removed """ self.assertEqual(Document.objects.filter(storage_path=None).count(), 5) bulk_edit.set_storage_path( [self.doc1.id], self.sp1.id, ) self.assertEqual(Document.objects.filter(storage_path=None).count(), 4) bulk_edit.set_storage_path( [self.doc1.id], None, ) self.assertEqual(Document.objects.filter(storage_path=None).count(), 5) self.async_task.assert_called() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc1.id]) def test_add_tag(self): self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2) bulk_edit.add_tag( [self.doc1.id, self.doc2.id, self.doc3.id, self.doc4.id], self.t1.id, ) self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc3.id]) def test_remove_tag(self): self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2) bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id) self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args self.assertCountEqual(kwargs["document_ids"], [self.doc4.id]) def test_modify_tags(self): tag_unrelated = Tag.objects.create(name="unrelated") self.doc2.tags.add(tag_unrelated) self.doc3.tags.add(tag_unrelated) bulk_edit.modify_tags( [self.doc2.id, self.doc3.id], add_tags=[self.t2.id], remove_tags=[self.t1.id], ) self.assertCountEqual(list(self.doc2.tags.all()), [self.t2, tag_unrelated]) self.assertCountEqual(list(self.doc3.tags.all()), [self.t2, tag_unrelated]) self.async_task.assert_called_once() args, kwargs = self.async_task.call_args # TODO: doc3 should not be affected, but the query for that is rather complicated self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id]) def test_delete(self): self.assertEqual(Document.objects.count(), 5) bulk_edit.delete([self.doc1.id, self.doc2.id]) self.assertEqual(Document.objects.count(), 3) self.assertCountEqual( [doc.id for doc in Document.objects.all()], [self.doc3.id, self.doc4.id, self.doc5.id], ) @mock.patch("documents.tasks.bulk_update_documents.delay") def test_set_permissions(self, m): doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id] assign_perm("view_document", self.group1, self.doc1) permissions = { "view": { "users": [self.user1.id, self.user2.id], "groups": [self.group2.id], }, "change": { "users": [self.user1.id], "groups": [self.group2.id], }, } bulk_edit.set_permissions( doc_ids, set_permissions=permissions, owner=self.owner, merge=False, ) m.assert_called_once() self.assertEqual(Document.objects.filter(owner=self.owner).count(), 3) self.assertEqual(Document.objects.filter(id__in=doc_ids).count(), 3) users_with_perms = get_users_with_perms( self.doc1, ) self.assertEqual(users_with_perms.count(), 2) # group1 should be replaced by group2 groups_with_perms = get_groups_with_perms( self.doc1, ) self.assertEqual(groups_with_perms.count(), 1) @mock.patch("documents.tasks.bulk_update_documents.delay") def test_set_permissions_merge(self, m): doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id] self.doc1.owner = self.user1 self.doc1.save() assign_perm("view_document", self.user1, self.doc1) assign_perm("view_document", self.group1, self.doc1) permissions = { "view": { "users": [self.user2.id], "groups": [self.group2.id], }, "change": { "users": [self.user2.id], "groups": [self.group2.id], }, } bulk_edit.set_permissions( doc_ids, set_permissions=permissions, owner=self.owner, merge=True, ) m.assert_called_once() # when merge is true owner doesn't get replaced if its not empty self.assertEqual(Document.objects.filter(owner=self.owner).count(), 2) self.assertEqual(Document.objects.filter(id__in=doc_ids).count(), 3) # merge of user1 which was pre-existing and user2 users_with_perms = get_users_with_perms( self.doc1, ) self.assertEqual(users_with_perms.count(), 2) # group1 should be merged by group2 groups_with_perms = get_groups_with_perms( self.doc1, ) self.assertEqual(groups_with_perms.count(), 2)