Fix tests, refactoring

This commit is contained in:
shamoon 2024-10-05 21:17:24 -07:00
parent be970396b3
commit f8ef338c86
3 changed files with 58 additions and 51 deletions

View File

@ -11,7 +11,6 @@ from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING
import httpx
import magic
import pathvalidate
from celery import chord
@ -44,6 +43,7 @@ from documents.tasks import consume_file
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.models import ProcessedMail
from paperless_mail.oauth import refresh_oauth_token
from paperless_mail.preprocessor import MailMessageDecryptor
from paperless_mail.preprocessor import MailMessagePreprocessor
@ -422,54 +422,6 @@ def get_mailbox(server, port, security) -> MailBox:
return mailbox
def refresh_oauth_token(account: MailAccount) -> bool:
"""
Refreshes the oauth token for the given mail account.
"""
logger = logging.getLogger("paperless_mail")
logger.debug(f"Attempting to refresh oauth token for account {account}")
if not account.refresh_token:
logger.error(f"Account {account}: No refresh token available.")
return False
if account.account_type == MailAccount.MailAccountType.GMAIL_OAUTH:
url = "https://accounts.google.com/o/oauth2/token"
data = {
"client_id": settings.GMAIL_OAUTH_CLIENT_ID,
"client_secret": settings.GMAIL_OAUTH_CLIENT_SECRET,
"refresh_token": account.refresh_token,
"grant_type": "refresh_token",
}
elif account.account_type == MailAccount.MailAccountType.OUTLOOK_OAUTH:
url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
data = {
"client_id": settings.OUTLOOK_OAUTH_CLIENT_ID,
"client_secret": settings.OUTLOOK_OAUTH_CLIENT_SECRET,
"refresh_token": account.refresh_token,
"grant_type": "refresh_token",
}
response = httpx.post(
url=url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
data = response.json()
if response.status_code < 400 and "access_token" in data:
account.password = data["access_token"]
account.expiration = timezone.now() + timedelta(
seconds=data["expires_in"],
)
account.save()
logger.debug(f"Successfully refreshed oauth token for account {account}")
return True
else:
logger.error(
f"Failed to refresh oauth token for account {account}: {response}",
)
return False
class MailAccountHandler(LoggingMixin):
"""
The main class that handles mail accounts.

View File

@ -1,4 +1,11 @@
import logging
from datetime import timedelta
import httpx
from django.conf import settings
from django.utils import timezone
from paperless_mail.models import MailAccount
# Gmail setup guide: https://postmansmtp.com/how-to-configure-post-smtp-with-gmailgsuite-using-oauth/
# Outlok setup guide: https://medium.com/@manojkumardhakad/python-read-and-send-outlook-mail-using-oauth2-token-and-graph-api-53de606ecfa1
@ -53,3 +60,51 @@ def generate_outlook_token_request_data(code: str) -> dict:
"redirect_uri": "http://localhost:8000/api/oauth/callback/",
"grant_type": "authorization_code",
}
def refresh_oauth_token(account: MailAccount) -> bool:
"""
Refreshes the oauth token for the given mail account.
"""
logger = logging.getLogger("paperless_mail")
logger.debug(f"Attempting to refresh oauth token for account {account}")
if not account.refresh_token:
logger.error(f"Account {account}: No refresh token available.")
return False
if account.account_type == MailAccount.MailAccountType.GMAIL_OAUTH:
url = "https://accounts.google.com/o/oauth2/token"
data = {
"client_id": settings.GMAIL_OAUTH_CLIENT_ID,
"client_secret": settings.GMAIL_OAUTH_CLIENT_SECRET,
"refresh_token": account.refresh_token,
"grant_type": "refresh_token",
}
elif account.account_type == MailAccount.MailAccountType.OUTLOOK_OAUTH:
url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
data = {
"client_id": settings.OUTLOOK_OAUTH_CLIENT_ID,
"client_secret": settings.OUTLOOK_OAUTH_CLIENT_SECRET,
"refresh_token": account.refresh_token,
"grant_type": "refresh_token",
}
response = httpx.post(
url=url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
data = response.json()
if response.status_code < 400 and "access_token" in data:
account.password = data["access_token"]
account.expiration = timezone.now() + timedelta(
seconds=data["expires_in"],
)
account.save()
logger.debug(f"Successfully refreshed oauth token for account {account}")
return True
else:
logger.error(
f"Failed to refresh oauth token for account {account}: {response}",
)
return False

View File

@ -1645,7 +1645,7 @@ class TestMailAccountTestView(APITestCase):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.content.decode(), "Unable to connect to server")
@mock.patch("paperless_mail.views.MailAccount.objects.get")
@mock.patch("paperless_mail.models.MailAccount.objects.get")
@mock.patch("paperless_mail.mail.get_mailbox")
@mock.patch("paperless_mail.mail.mailbox_login")
def test_mail_account_test_view_refresh_token(
@ -1668,7 +1668,7 @@ class TestMailAccountTestView(APITestCase):
)
mock_get.return_value = existing_account
with mock.patch("paperless_mail.views.refresh_oauth_token", return_value=True):
with mock.patch("paperless_mail.oauth.refresh_oauth_token", return_value=True):
data = {
"id": existing_account.id,
"imap_server": "imap.example.com",