Skip to content

Commit 302cea6

Browse files
committed
Use new Celery, use new application pattern
* Use modern celery * Drop djcelery * New pattern for starting celery * Bump redis to 2.10.6 to avoid startup bug, change autodiscover call
1 parent 2ac95a9 commit 302cea6

File tree

14 files changed

+85
-59
lines changed

14 files changed

+85
-59
lines changed

readthedocs/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .tasks import app

readthedocs/core/management/commands/update_api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ def handle(self, *args, **options):
3333
project_data = api.project(slug).get()
3434
p = APIProject(**project_data)
3535
log.info("Building %s", p)
36-
tasks.update_docs.run(pk=p.pk, docker=docker)
36+
update_docs = tasks.UpdateDocsTask()
37+
update_docs.run(pk=p.pk, docker=docker)

readthedocs/core/management/commands/update_repos.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ def handle(self, *args, **options):
5454
for version in Version.objects.filter(project__slug=slug,
5555
active=True,
5656
uploaded=False):
57-
tasks.update_docs.run(pk=version.project_id,
58-
record=False,
59-
version_pk=version.pk)
57+
tasks.UpdateDocsTask().run(
58+
pk=version.project_id,
59+
record=False,
60+
version_pk=version.pk
61+
)
6062
else:
6163
p = Project.all_objects.get(slug=slug)
6264
log.info("Building %s", p)
@@ -66,12 +68,17 @@ def handle(self, *args, **options):
6668
log.info("Updating all versions")
6769
for version in Version.objects.filter(active=True,
6870
uploaded=False):
69-
tasks.update_docs.run(pk=version.project_id,
70-
record=record,
71-
force=force,
72-
version_pk=version.pk)
71+
tasks.UpdateDocsTask().run(
72+
pk=version.project_id,
73+
record=record,
74+
force=force,
75+
version_pk=version.pk
76+
)
7377
else:
7478
log.info("Updating all docs")
7579
for project in Project.objects.all():
76-
tasks.update_docs.run(pk=project.pk, record=record,
77-
force=force)
80+
tasks.UpdateDocsTask().run(
81+
pk=project.pk,
82+
record=record,
83+
force=force
84+
)

readthedocs/core/management/commands/update_versions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from django.core.management.base import BaseCommand
55

66
from readthedocs.builds.models import Version
7-
from readthedocs.projects.tasks import update_docs
7+
from readthedocs.projects.tasks import UpdateDocsTask
88

99

1010
class Command(BaseCommand):
@@ -13,5 +13,6 @@ class Command(BaseCommand):
1313

1414
def handle(self, *args, **options):
1515
for version in Version.objects.filter(active=True, built=False):
16+
update_docs = UpdateDocsTask()
1617
update_docs.run(version.project_id, record=False,
1718
version_pk=version.pk)

readthedocs/core/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import absolute_import
44
import logging
55

6-
from celery import task
6+
from celery import shared_task
77
from django.conf import settings
88
from django.core.mail import EmailMultiAlternatives
99
from django.template.loader import get_template
@@ -15,7 +15,7 @@
1515
EMAIL_TIME_LIMIT = 30
1616

1717

18-
@task(queue='web', time_limit=EMAIL_TIME_LIMIT)
18+
@shared_task(queue='web', time_limit=EMAIL_TIME_LIMIT)
1919
def send_email_task(recipient, subject, template, template_html, context=None):
2020
"""Send multipart email
2121

readthedocs/core/utils/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def trigger_build(project, version=None, record=True, force=False, basic=False):
7777
will be prefixed with ``build-`` to unify build queue names.
7878
"""
7979
# Avoid circular import
80-
from readthedocs.projects.tasks import update_docs
80+
from readthedocs.projects.tasks import UpdateDocsTask
8181
from readthedocs.builds.models import Build
8282

8383
if project.skip:
@@ -121,6 +121,7 @@ def trigger_build(project, version=None, record=True, force=False, basic=False):
121121
options['soft_time_limit'] = time_limit
122122
options['time_limit'] = int(time_limit * 1.2)
123123

124+
update_docs = UpdateDocsTask()
124125
update_docs.apply_async(kwargs=kwargs, **options)
125126

126127
return build

readthedocs/core/utils/tasks/retrieve.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Utilities for retrieving task data."""
22

33
from __future__ import absolute_import
4-
from djcelery import celery as celery_app
54
from celery.result import AsyncResult
65

76

@@ -20,14 +19,16 @@ def get_task_data(task_id):
2019
2120
meta data has no ``'task_name'`` key set.
2221
"""
22+
from readthedocs.worker import app
23+
2324
result = AsyncResult(task_id)
2425
state, info = result.state, result.info
2526
if state == 'PENDING':
2627
raise TaskNotFound(task_id)
2728
if 'task_name' not in info:
2829
raise TaskNotFound(task_id)
2930
try:
30-
task = celery_app.tasks[info['task_name']]
31+
task = app.tasks[info['task_name']]
3132
except KeyError:
3233
raise TaskNotFound(task_id)
3334
return task, state, info

readthedocs/oauth/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from __future__ import absolute_import
44
from django.contrib.auth.models import User
5-
from djcelery import celery as celery_app
65

76
from readthedocs.core.utils.tasks import PublicTask
87
from readthedocs.core.utils.tasks import permission_check
@@ -22,4 +21,4 @@ def run_public(self, user_id):
2221
service.sync()
2322

2423

25-
sync_remote_repositories = celery_app.tasks[SyncRemoteRepositories.name]
24+
sync_remote_repositories = SyncRemoteRepositories()

readthedocs/projects/tasks.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import requests
1818
from builtins import str
19-
from celery import task, Task
19+
from celery import shared_task, Task
2020
from celery.exceptions import SoftTimeLimitExceeded
2121
from django.conf import settings
2222
from django.core.urlresolvers import reverse
@@ -53,6 +53,7 @@
5353
from readthedocs.search.parse_json import process_all_json_files
5454
from readthedocs.search.utils import process_mkdocs_json
5555
from readthedocs.vcs_support import utils as vcs_support_utils
56+
from readthedocs.worker import app
5657

5758

5859
log = logging.getLogger(__name__)
@@ -80,7 +81,7 @@ class UpdateDocsTask(Task):
8081

8182
max_retries = 5
8283
default_retry_delay = (7 * 60)
83-
name = 'update_docs'
84+
name = __name__ + '.update_docs'
8485

8586
def __init__(self, build_env=None, python_env=None, config=None,
8687
force=False, search=True, localmedia=True,
@@ -490,10 +491,10 @@ def send_notifications(self):
490491
send_notifications.delay(self.version.pk, build_pk=self.build['id'])
491492

492493

493-
update_docs = celery_app.tasks[UpdateDocsTask.name]
494+
app.register_task(UpdateDocsTask())
494495

495496

496-
@task()
497+
@shared_task()
497498
def update_imported_docs(version_pk):
498499
"""
499500
Check out or update the given project's repository
@@ -572,7 +573,7 @@ def update_imported_docs(version_pk):
572573

573574

574575
# Web tasks
575-
@task(queue='web')
576+
@shared_task(queue='web')
576577
def sync_files(project_pk, version_pk, hostname=None, html=False,
577578
localmedia=False, search=False, pdf=False, epub=False):
578579
"""Sync build artifacts to application instances
@@ -604,7 +605,7 @@ def sync_files(project_pk, version_pk, hostname=None, html=False,
604605
update_static_metadata(project_pk)
605606

606607

607-
@task(queue='web')
608+
@shared_task(queue='web')
608609
def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
609610
pdf=False, epub=False):
610611
"""Task to move built documentation to web servers
@@ -670,7 +671,7 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
670671
Syncer.copy(from_path, to_path, host=hostname)
671672

672673

673-
@task(queue='web')
674+
@shared_task(queue='web')
674675
def update_search(version_pk, commit, delete_non_commit_files=True):
675676
"""Task to update search indexes
676677
@@ -705,15 +706,15 @@ def update_search(version_pk, commit, delete_non_commit_files=True):
705706
)
706707

707708

708-
@task(queue='web')
709+
@shared_task(queue='web')
709710
def symlink_project(project_pk):
710711
project = Project.objects.get(pk=project_pk)
711712
for symlink in [PublicSymlink, PrivateSymlink]:
712713
sym = symlink(project=project)
713714
sym.run()
714715

715716

716-
@task(queue='web')
717+
@shared_task(queue='web')
717718
def symlink_domain(project_pk, domain_pk, delete=False):
718719
project = Project.objects.get(pk=project_pk)
719720
domain = Domain.objects.get(pk=domain_pk)
@@ -725,15 +726,15 @@ def symlink_domain(project_pk, domain_pk, delete=False):
725726
sym.symlink_cnames(domain)
726727

727728

728-
@task(queue='web')
729+
@shared_task(queue='web')
729730
def symlink_subproject(project_pk):
730731
project = Project.objects.get(pk=project_pk)
731732
for symlink in [PublicSymlink, PrivateSymlink]:
732733
sym = symlink(project=project)
733734
sym.symlink_subprojects()
734735

735736

736-
@task(queue='web')
737+
@shared_task(queue='web')
737738
def fileify(version_pk, commit):
738739
"""
739740
Create ImportedFile objects for all of a version's files.
@@ -809,7 +810,7 @@ def _manage_imported_files(version, path, commit):
809810
purge(cdn_ids[version.project.slug], changed_files)
810811

811812

812-
@task(queue='web')
813+
@shared_task(queue='web')
813814
def send_notifications(version_pk, build_pk):
814815
version = Version.objects.get(pk=version_pk)
815816
build = Build.objects.get(pk=build_pk)
@@ -878,7 +879,7 @@ def webhook_notification(version, build, hook_url):
878879
requests.post(hook_url, data=data)
879880

880881

881-
@task(queue='web')
882+
@shared_task(queue='web')
882883
def update_static_metadata(project_pk, path=None):
883884
"""Update static metadata JSON file
884885
@@ -924,7 +925,7 @@ def update_static_metadata(project_pk, path=None):
924925

925926

926927
# Random Tasks
927-
@task()
928+
@shared_task()
928929
def remove_dir(path):
929930
"""
930931
Remove a directory on the build/celery server.
@@ -936,7 +937,7 @@ def remove_dir(path):
936937
shutil.rmtree(path, ignore_errors=True)
937938

938939

939-
@task()
940+
@shared_task()
940941
def clear_artifacts(version_pk):
941942
"""Remove artifacts from the web servers"""
942943
version = Version.objects.get(pk=version_pk)
@@ -946,31 +947,31 @@ def clear_artifacts(version_pk):
946947
clear_html_artifacts(version)
947948

948949

949-
@task()
950+
@shared_task()
950951
def clear_pdf_artifacts(version):
951952
if isinstance(version, int):
952953
version = Version.objects.get(pk=version)
953954
remove_dir(version.project.get_production_media_path(
954955
type_='pdf', version_slug=version.slug))
955956

956957

957-
@task()
958+
@shared_task()
958959
def clear_epub_artifacts(version):
959960
if isinstance(version, int):
960961
version = Version.objects.get(pk=version)
961962
remove_dir(version.project.get_production_media_path(
962963
type_='epub', version_slug=version.slug))
963964

964965

965-
@task()
966+
@shared_task()
966967
def clear_htmlzip_artifacts(version):
967968
if isinstance(version, int):
968969
version = Version.objects.get(pk=version)
969970
remove_dir(version.project.get_production_media_path(
970971
type_='htmlzip', version_slug=version.slug))
971972

972973

973-
@task()
974+
@shared_task()
974975
def clear_html_artifacts(version):
975976
if isinstance(version, int):
976977
version = Version.objects.get(pk=version)

readthedocs/rtd_tests/tests/test_celery.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ def test_update_docs(self):
7272
build = get(Build, project=self.project,
7373
version=self.project.versions.first())
7474
with mock_api(self.repo) as mapi:
75-
result = tasks.update_docs.delay(
75+
update_docs = tasks.UpdateDocsTask()
76+
result = update_docs.delay(
7677
self.project.pk,
7778
build_pk=build.pk,
7879
record=False,
@@ -89,7 +90,8 @@ def test_update_docs_unexpected_setup_exception(self, mock_update_build, mock_se
8990
build = get(Build, project=self.project,
9091
version=self.project.versions.first())
9192
with mock_api(self.repo) as mapi:
92-
result = tasks.update_docs.delay(
93+
update_docs = tasks.UpdateDocsTask()
94+
result = update_docs.delay(
9395
self.project.pk,
9496
build_pk=build.pk,
9597
record=False,
@@ -107,7 +109,8 @@ def test_update_docs_unexpected_build_exception(self, mock_update_build, mock_bu
107109
build = get(Build, project=self.project,
108110
version=self.project.versions.first())
109111
with mock_api(self.repo) as mapi:
110-
result = tasks.update_docs.delay(
112+
update_docs = tasks.UpdateDocsTask()
113+
result = update_docs.delay(
111114
self.project.pk,
112115
build_pk=build.pk,
113116
record=False,

readthedocs/rtd_tests/tests/test_core_utils.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ def setUp(self):
1818
self.project = get(Project, container_time_limit=None)
1919
self.version = get(Version, project=self.project)
2020

21-
@mock.patch('readthedocs.projects.tasks.update_docs')
21+
@mock.patch('readthedocs.projects.tasks.UpdateDocsTask')
2222
def test_trigger_build_time_limit(self, update_docs):
2323
"""Pass of time limit"""
2424
trigger_build(project=self.project, version=self.version)
25-
update_docs.assert_has_calls([
26-
mock.call.apply_async(
25+
update_docs().apply_async.assert_has_calls([
26+
mock.call(
2727
time_limit=720,
2828
soft_time_limit=600,
2929
queue=mock.ANY,
@@ -38,13 +38,13 @@ def test_trigger_build_time_limit(self, update_docs):
3838
)
3939
])
4040

41-
@mock.patch('readthedocs.projects.tasks.update_docs')
41+
@mock.patch('readthedocs.projects.tasks.UpdateDocsTask')
4242
def test_trigger_build_invalid_time_limit(self, update_docs):
4343
"""Time limit as string"""
4444
self.project.container_time_limit = '200s'
4545
trigger_build(project=self.project, version=self.version)
46-
update_docs.assert_has_calls([
47-
mock.call.apply_async(
46+
update_docs().apply_async.assert_has_calls([
47+
mock.call(
4848
time_limit=720,
4949
soft_time_limit=600,
5050
queue=mock.ANY,
@@ -59,13 +59,13 @@ def test_trigger_build_invalid_time_limit(self, update_docs):
5959
)
6060
])
6161

62-
@mock.patch('readthedocs.projects.tasks.update_docs')
62+
@mock.patch('readthedocs.projects.tasks.UpdateDocsTask')
6363
def test_trigger_build_rounded_time_limit(self, update_docs):
6464
"""Time limit should round down"""
6565
self.project.container_time_limit = 3
6666
trigger_build(project=self.project, version=self.version)
67-
update_docs.assert_has_calls([
68-
mock.call.apply_async(
67+
update_docs().apply_async.assert_has_calls([
68+
mock.call(
6969
time_limit=3,
7070
soft_time_limit=3,
7171
queue=mock.ANY,

0 commit comments

Comments
 (0)