Test coverage
This commit is contained in:
parent
603f1397a8
commit
fe480432cf
@ -415,6 +415,11 @@ describe('SettingsComponent', () => {
|
|||||||
completeSetup()
|
completeSetup()
|
||||||
expect(component['systemStatus']).toEqual(status) // private
|
expect(component['systemStatus']).toEqual(status) // private
|
||||||
expect(component.systemStatusHasErrors).toBeTruthy()
|
expect(component.systemStatusHasErrors).toBeTruthy()
|
||||||
|
// coverage
|
||||||
|
component['systemStatus'].database.status = PaperlessConnectionStatus.OK
|
||||||
|
component['systemStatus'].tasks.redis_status = PaperlessConnectionStatus.OK
|
||||||
|
component['systemStatus'].tasks.celery_status = PaperlessConnectionStatus.OK
|
||||||
|
expect(component.systemStatusHasErrors).toBeFalsy()
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should open system status dialog', () => {
|
it('should open system status dialog', () => {
|
||||||
|
@ -1,3 +1,6 @@
|
|||||||
|
import os
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
from rest_framework.test import APITestCase
|
from rest_framework.test import APITestCase
|
||||||
@ -8,15 +11,13 @@ from paperless import version
|
|||||||
class TestSystemStatusView(APITestCase):
|
class TestSystemStatusView(APITestCase):
|
||||||
ENDPOINT = "/api/status/"
|
ENDPOINT = "/api/status/"
|
||||||
|
|
||||||
def test_system_status_insufficient_permissions(self):
|
def setUp(self):
|
||||||
response = self.client.get(self.ENDPOINT)
|
self.user = User.objects.create_superuser(
|
||||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
|
||||||
|
|
||||||
def test_system_status(self):
|
|
||||||
user = User.objects.create_superuser(
|
|
||||||
username="temp_admin",
|
username="temp_admin",
|
||||||
)
|
)
|
||||||
self.client.force_login(user)
|
|
||||||
|
def test_system_status(self):
|
||||||
|
self.client.force_login(self.user)
|
||||||
response = self.client.get(self.ENDPOINT)
|
response = self.client.get(self.ENDPOINT)
|
||||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
self.assertEqual(response.data["pngx_version"], version.__full_version_str__)
|
self.assertEqual(response.data["pngx_version"], version.__full_version_str__)
|
||||||
@ -32,3 +33,44 @@ class TestSystemStatusView(APITestCase):
|
|||||||
self.assertEqual(response.data["tasks"]["redis_url"], "redis://localhost:6379")
|
self.assertEqual(response.data["tasks"]["redis_url"], "redis://localhost:6379")
|
||||||
self.assertEqual(response.data["tasks"]["redis_status"], "ERROR")
|
self.assertEqual(response.data["tasks"]["redis_status"], "ERROR")
|
||||||
self.assertIsNotNone(response.data["tasks"]["redis_error"])
|
self.assertIsNotNone(response.data["tasks"]["redis_error"])
|
||||||
|
|
||||||
|
def test_system_status_insufficient_permissions(self):
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||||
|
normal_user = User.objects.create_user(username="normal_user")
|
||||||
|
self.client.force_login(normal_user)
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||||
|
|
||||||
|
def test_system_status_container_detection(self):
|
||||||
|
self.client.force_login(self.user)
|
||||||
|
os.environ["PNGX_CONTAINERIZED"] = "1"
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
|
self.assertEqual(response.data["install_type"], "docker")
|
||||||
|
os.environ["KUBERNETES_SERVICE_HOST"] = "http://localhost"
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.data["install_type"], "kubernetes")
|
||||||
|
|
||||||
|
class MockRedis:
|
||||||
|
def from_url(self, url):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def ping(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
@mock.patch("redis.Redis")
|
||||||
|
def test_system_status_redis_ping(self, mock_ping):
|
||||||
|
mock_ping.return_value = self.MockRedis()
|
||||||
|
self.client.force_login(self.user)
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
|
self.assertEqual(response.data["tasks"]["redis_status"], "OK")
|
||||||
|
|
||||||
|
@mock.patch("celery.app.control.Inspect.ping")
|
||||||
|
def test_system_status_celery_ping(self, mock_ping):
|
||||||
|
mock_ping.return_value = {"hostname": {"ok": "pong"}}
|
||||||
|
self.client.force_login(self.user)
|
||||||
|
response = self.client.get(self.ENDPOINT)
|
||||||
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||||
|
self.assertEqual(response.data["tasks"]["celery_status"], "OK")
|
||||||
|
@ -1562,23 +1562,26 @@ class SystemStatusView(GenericAPIView, PassUserMixin):
|
|||||||
elif os.environ.get("PNGX_CONTAINERIZED") == "1":
|
elif os.environ.get("PNGX_CONTAINERIZED") == "1":
|
||||||
install_type = "docker"
|
install_type = "docker"
|
||||||
|
|
||||||
media_stats = os.statvfs(settings.MEDIA_ROOT)
|
|
||||||
|
|
||||||
db_conn = connections["default"]
|
db_conn = connections["default"]
|
||||||
db_url = db_conn.settings_dict["NAME"]
|
db_url = db_conn.settings_dict["NAME"]
|
||||||
loader = MigrationLoader(connection=db_conn)
|
|
||||||
all_migrations = [f"{app}.{name}" for app, name in loader.graph.nodes]
|
|
||||||
applied_migrations = [
|
|
||||||
f"{m.app}.{m.name}"
|
|
||||||
for m in MigrationRecorder.Migration.objects.all().order_by("id")
|
|
||||||
]
|
|
||||||
db_error = None
|
db_error = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
db_conn.cursor()
|
db_conn.ensure_connection()
|
||||||
db_status = "OK"
|
db_status = "OK"
|
||||||
except Exception as e:
|
loader = MigrationLoader(connection=db_conn)
|
||||||
|
all_migrations = [f"{app}.{name}" for app, name in loader.graph.nodes]
|
||||||
|
applied_migrations = [
|
||||||
|
f"{m.app}.{m.name}"
|
||||||
|
for m in MigrationRecorder.Migration.objects.all().order_by("id")
|
||||||
|
]
|
||||||
|
except Exception as e: # pragma: no cover
|
||||||
|
applied_migrations = []
|
||||||
db_status = "ERROR"
|
db_status = "ERROR"
|
||||||
db_error = str(e)
|
logger.exception(f"System status error connecting to database: {e}")
|
||||||
|
db_error = "Error connecting to database, check logs for more detail."
|
||||||
|
|
||||||
|
media_stats = os.statvfs(settings.MEDIA_ROOT)
|
||||||
|
|
||||||
redis_url = settings._CELERY_REDIS_URL
|
redis_url = settings._CELERY_REDIS_URL
|
||||||
redis_error = None
|
redis_error = None
|
||||||
@ -1588,7 +1591,8 @@ class SystemStatusView(GenericAPIView, PassUserMixin):
|
|||||||
redis_status = "OK"
|
redis_status = "OK"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
redis_status = "ERROR"
|
redis_status = "ERROR"
|
||||||
redis_error = str(e)
|
logger.exception(f"System status error connecting to redis: {e}")
|
||||||
|
redis_error = "Error connecting to redis, check logs for more detail."
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ping = celery_app.control.inspect().ping()
|
ping = celery_app.control.inspect().ping()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user