Skip to content

Celery: cheat job_status view to return finished after 5 polls #10107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 19 additions & 35 deletions readthedocs/api/v2/views/task_views.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,45 @@
"""Endpoints relating to task/job status, etc."""

import structlog

from django.core.cache import cache
from django.urls import reverse
from redis import ConnectionError
from rest_framework import decorators, permissions
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response

from readthedocs.core.utils.tasks import TaskNoPermission, get_public_task_data
from readthedocs.oauth import tasks


log = structlog.get_logger(__name__)

SUCCESS_STATES = ('SUCCESS',)
FAILURE_STATES = (
'FAILURE',
'REVOKED',
)
FINISHED_STATES = SUCCESS_STATES + FAILURE_STATES
STARTED_STATES = ('RECEIVED', 'STARTED', 'RETRY') + FINISHED_STATES


def get_status_data(task_name, state, data, error=None):
data = {
'name': task_name,
'data': data,
'started': state in STARTED_STATES,
'finished': state in FINISHED_STATES,
# When an exception is raised inside the task, we keep this as SUCCESS
# and add the exception message into the 'error' key
'success': state in SUCCESS_STATES and error is None,
}
if error is not None:
data['error'] = error
return data


@decorators.api_view(['GET'])
@decorators.permission_classes((permissions.AllowAny,))
@decorators.renderer_classes((JSONRenderer,))
def job_status(request, task_id):
try:
task_name, state, public_data, error = get_public_task_data(
request,
task_id,
)
except (TaskNoPermission, ConnectionError):
return Response(get_status_data('unknown', 'PENDING', {}),)
return Response(get_status_data(task_name, state, public_data, error),)
"""Retrieve Celery task function state from frontend."""
# HACK: always poll up to N times and after that return the sync has
# finished. This is a way to avoid re-enabling Celery result backend for now.
# TODO remove this API and RemoteRepo sync UI when we have better auto syncing
poll_n = cache.get(task_id, 0)
poll_n += 1
cache.set(task_id, poll_n, 5 * 60)
finished = poll_n == 5

data = {
"name": "sync_remote_repositories",
"data": {},
"started": True,
"finished": finished,
"success": finished,
}
return Response(data)


@decorators.api_view(['POST'])
@decorators.permission_classes((permissions.IsAuthenticated,))
@decorators.renderer_classes((JSONRenderer,))
def sync_remote_repositories(request):
"""Trigger a re-sync of remote repositories for the user."""
result = tasks.sync_remote_repositories.delay(user_id=request.user.id,)
task_id = result.task_id
return Response({
Expand Down
2 changes: 0 additions & 2 deletions readthedocs/core/utils/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@
)
from .public import PublicTask # noqa
from .public import TaskNoPermission # noqa
from .public import get_public_task_data # noqa
from .retrieve import TaskNotFound # noqa
from .retrieve import get_task_data # noqa
41 changes: 3 additions & 38 deletions readthedocs/core/utils/tasks/public.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
# -*- coding: utf-8 -*-

"""Celery tasks with publicly viewable status."""

from celery import Task, states
from django.conf import settings

from .retrieve import TaskNotFound, get_task_data


__all__ = (
'PublicTask',
'TaskNoPermission',
'get_public_task_data',
)

STATUS_UPDATES_ENABLED = not settings.CELERY_ALWAYS_EAGER


# pylint: disable=abstract-method
# pylint: disable=broad-except
# pylint: disable=invalid-name
class PublicTask(Task):

"""
Expand Down Expand Up @@ -121,35 +118,3 @@ def __init__(self, task_id, *args, **kwargs):
id=task_id,
)
super().__init__(message, *args, **kwargs)


def get_public_task_data(request, task_id):
"""
Return task details as tuple.

Will raise `TaskNoPermission` if `request` has no permission to access info
of the task with id `task_id`. This is also the case of no task with the
given id exists.

:returns: (task name, task state, public data, error message)
:rtype: (str, str, dict, str)
"""
try:
task, state, info = get_task_data(task_id)
except TaskNotFound:
# No task info has been found act like we don't have permission to see
# the results.
raise TaskNoPermission(task_id)

if not hasattr(task, 'check_permission'):
raise TaskNoPermission(task_id)

context = info.get('context', {})
if not task.check_permission(request, state, context):
raise TaskNoPermission(task_id)
return (
task.name,
state,
info.get('public_data', {}),
info.get('error', None),
)
27 changes: 1 addition & 26 deletions readthedocs/core/utils/tasks/retrieve.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
# -*- coding: utf-8 -*-

"""Utilities for retrieving task data."""

from celery import states
from celery.result import AsyncResult


__all__ = ('TaskNotFound', 'get_task_data')
__all__ = ("TaskNotFound",)


class TaskNotFound(Exception):

def __init__(self, task_id, *args, **kwargs):
message = 'No public task found with id {id}'.format(id=task_id)
super().__init__(message, *args, **kwargs)


def get_task_data(task_id):
"""
Will raise `TaskNotFound` if the task is in state ``PENDING`` or the task.

meta data has no ``'task_name'`` key set.
"""
from readthedocs.worker import app

result = AsyncResult(task_id)
state, info = result.state, result.info
if state == states.PENDING:
raise TaskNotFound(task_id)
if 'task_name' not in info:
raise TaskNotFound(task_id)
try:
task = app.tasks[info['task_name']]
except KeyError:
raise TaskNotFound(task_id)
return task, state, info
22 changes: 0 additions & 22 deletions readthedocs/rtd_tests/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
GitHubWebhookView,
GitLabWebhookView,
)
from readthedocs.api.v2.views.task_views import get_status_data
from readthedocs.builds.constants import (
BUILD_STATE_CLONING,
BUILD_STATE_FINISHED,
Expand Down Expand Up @@ -2545,24 +2544,3 @@ def test_modify_version(self):
self.assertEqual(resp.data['has_pdf'], True)
self.assertEqual(resp.data['has_epub'], False)
self.assertEqual(resp.data['has_htmlzip'], False)


class TaskViewsTests(TestCase):

def test_get_status_data(self):
data = get_status_data(
'public_task_exception',
'SUCCESS',
{'data': 'public'},
'Something bad happened',
)
self.assertEqual(
data, {
'name': 'public_task_exception',
'data': {'data': 'public'},
'started': True,
'finished': True,
'success': False,
'error': 'Something bad happened',
},
)
16 changes: 0 additions & 16 deletions readthedocs/rtd_tests/tests/test_privacy_urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
RegexAutomationRule,
VersionAutomationRule,
)
from readthedocs.core.utils.tasks import TaskNoPermission
from readthedocs.integrations.models import HttpExchange, Integration
from readthedocs.oauth.models import RemoteOrganization, RemoteRepository
from readthedocs.projects.models import Domain, EnvironmentVariable, Project, WebHook
Expand Down Expand Up @@ -452,21 +451,6 @@ def setUp(self):
}


class APIUnauthAccessTest(APIMixin, TestCase):

@mock.patch('readthedocs.api.v2.views.task_views.get_public_task_data')
def test_api_urls(self, get_public_task_data):
from readthedocs.api.v2.urls import urlpatterns
get_public_task_data.side_effect = TaskNoPermission('Nope')
self._test_url(urlpatterns)

def login(self):
pass

def is_admin(self):
return False


class PublicUserProfileMixin(URLAccessMixin):

def setUp(self):
Expand Down