Skip to content

Use new Celery, use new application pattern #3237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 11, 2017
Merged
4 changes: 2 additions & 2 deletions docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ If you add a subproject to a project,
that documentation will also be served under the parent project's subdomain.

For example,
Kombu is a subproject of celery,
Kombu is a subproject of Celery,
so you can access it on the `celery.readthedocs.io` domain:

http://celery.readthedocs.io/projects/kombu/en/latest/
Expand Down Expand Up @@ -204,4 +204,4 @@ file* field.
What commit of Read the Docs is in production?
----------------------------------------------

We deploy readthedocs.org from the `rel` branch in our GitHub repository. You can see the latest commits that have been deployed by looking on GitHub: https://github.com/rtfd/readthedocs.org/commits/rel
We deploy readthedocs.org from the `rel` branch in our GitHub repository. You can see the latest commits that have been deployed by looking on GitHub: https://github.com/rtfd/readthedocs.org/commits/rel
4 changes: 4 additions & 0 deletions readthedocs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Read the Docs"""

# Import the Celery application before anything else happens
from readthedocs.worker import app # noqa
3 changes: 2 additions & 1 deletion readthedocs/core/management/commands/update_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ def handle(self, *args, **options):
project_data = api.project(slug).get()
p = APIProject(**project_data)
log.info("Building %s", p)
tasks.update_docs.run(pk=p.pk, docker=docker)
update_docs = tasks.UpdateDocsTask()
update_docs.run(pk=p.pk, docker=docker)
25 changes: 16 additions & 9 deletions readthedocs/core/management/commands/update_repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ def handle(self, *args, **options):
for version in Version.objects.filter(project__slug=slug,
active=True,
uploaded=False):
tasks.update_docs.run(pk=version.project_id,
record=False,
version_pk=version.pk)
tasks.UpdateDocsTask().run(
pk=version.project_id,
record=False,
version_pk=version.pk
)
else:
p = Project.all_objects.get(slug=slug)
log.info("Building %s", p)
Expand All @@ -66,12 +68,17 @@ def handle(self, *args, **options):
log.info("Updating all versions")
for version in Version.objects.filter(active=True,
uploaded=False):
tasks.update_docs.run(pk=version.project_id,
record=record,
force=force,
version_pk=version.pk)
tasks.UpdateDocsTask().run(
pk=version.project_id,
record=record,
force=force,
version_pk=version.pk
)
else:
log.info("Updating all docs")
for project in Project.objects.all():
tasks.update_docs.run(pk=project.pk, record=record,
force=force)
tasks.UpdateDocsTask().run(
pk=project.pk,
record=record,
force=force
)
3 changes: 2 additions & 1 deletion readthedocs/core/management/commands/update_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.core.management.base import BaseCommand

from readthedocs.builds.models import Version
from readthedocs.projects.tasks import update_docs
from readthedocs.projects.tasks import UpdateDocsTask


class Command(BaseCommand):
Expand All @@ -13,5 +13,6 @@ class Command(BaseCommand):

def handle(self, *args, **options):
for version in Version.objects.filter(active=True, built=False):
update_docs = UpdateDocsTask()
update_docs.run(version.project_id, record=False,
version_pk=version.pk)
5 changes: 3 additions & 2 deletions readthedocs/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
from __future__ import absolute_import
import logging

from celery import task
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template.loader import get_template
from django.template import TemplateDoesNotExist

from readthedocs.worker import app


log = logging.getLogger(__name__)

EMAIL_TIME_LIMIT = 30


@task(queue='web', time_limit=EMAIL_TIME_LIMIT)
@app.task(queue='web', time_limit=EMAIL_TIME_LIMIT)
def send_email_task(recipient, subject, template, template_html, context=None):
"""Send multipart email

Expand Down
13 changes: 10 additions & 3 deletions readthedocs/core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ def broadcast(type, task, args, kwargs=None, callback=None): # pylint: disable=
task_sig = task.s(*args, **kwargs).set(queue=server)
tasks.append(task_sig)
if callback:
task_promise = chord(tasks)(callback).get()
task_promise = chord(tasks, callback).apply_async()
else:
task_promise = group(*tasks).apply_async()
# Celery's Group class does some special handling when an iterable with
# len() == 1 is passed in. This will be hit if there is only one server
# defined in the above queue lists
if len(tasks) > 1:
task_promise = group(*tasks).apply_async()
else:
task_promise = group(tasks).apply_async()
return task_promise


Expand All @@ -77,7 +83,7 @@ def trigger_build(project, version=None, record=True, force=False, basic=False):
will be prefixed with ``build-`` to unify build queue names.
"""
# Avoid circular import
from readthedocs.projects.tasks import update_docs
from readthedocs.projects.tasks import UpdateDocsTask
from readthedocs.builds.models import Build

if project.skip:
Expand Down Expand Up @@ -121,6 +127,7 @@ def trigger_build(project, version=None, record=True, force=False, basic=False):
options['soft_time_limit'] = time_limit
options['time_limit'] = int(time_limit * 1.2)

update_docs = UpdateDocsTask()
update_docs.apply_async(kwargs=kwargs, **options)

return build
Expand Down
5 changes: 3 additions & 2 deletions readthedocs/core/utils/tasks/retrieve.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Utilities for retrieving task data."""

from __future__ import absolute_import
from djcelery import celery as celery_app
from celery.result import AsyncResult


Expand All @@ -20,14 +19,16 @@ def get_task_data(task_id):

meta data has no ``'task_name'`` key set.
"""
from readthedocs.worker import app

result = AsyncResult(task_id)
state, info = result.state, result.info
if state == 'PENDING':
raise TaskNotFound(task_id)
if 'task_name' not in info:
raise TaskNotFound(task_id)
try:
task = celery_app.tasks[info['task_name']]
task = app.tasks[info['task_name']]
except KeyError:
raise TaskNotFound(task_id)
return task, state, info
3 changes: 1 addition & 2 deletions readthedocs/oauth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import absolute_import
from django.contrib.auth.models import User
from djcelery import celery as celery_app

from readthedocs.core.utils.tasks import PublicTask
from readthedocs.core.utils.tasks import permission_check
Expand All @@ -22,4 +21,4 @@ def run_public(self, user_id):
service.sync()


sync_remote_repositories = celery_app.tasks[SyncRemoteRepositories.name]
sync_remote_repositories = SyncRemoteRepositories()
1 change: 1 addition & 0 deletions readthedocs/projects/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = 'readthedocs.projects.apps.ProjectsConfig'
12 changes: 12 additions & 0 deletions readthedocs/projects/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Project app config"""

from django.apps import AppConfig


class ProjectsConfig(AppConfig):
name = 'readthedocs.projects'

def ready(self):
from readthedocs.projects.tasks import UpdateDocsTask
from readthedocs.worker import app
app.tasks.register(UpdateDocsTask)
43 changes: 20 additions & 23 deletions readthedocs/projects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

import requests
from builtins import str
from celery import task, Task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
from djcelery import celery as celery_app
from readthedocs_build.config import ConfigError
from slumber.exceptions import HttpClientError

Expand Down Expand Up @@ -53,6 +52,7 @@
from readthedocs.search.parse_json import process_all_json_files
from readthedocs.search.utils import process_mkdocs_json
from readthedocs.vcs_support import utils as vcs_support_utils
from readthedocs.worker import app


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -80,7 +80,7 @@ class UpdateDocsTask(Task):

max_retries = 5
default_retry_delay = (7 * 60)
name = 'update_docs'
name = __name__ + '.update_docs'

def __init__(self, build_env=None, python_env=None, config=None,
force=False, search=True, localmedia=True,
Expand Down Expand Up @@ -490,10 +490,7 @@ def send_notifications(self):
send_notifications.delay(self.version.pk, build_pk=self.build['id'])


update_docs = celery_app.tasks[UpdateDocsTask.name]


@task()
@app.task()
def update_imported_docs(version_pk):
"""
Check out or update the given project's repository
Expand Down Expand Up @@ -572,7 +569,7 @@ def update_imported_docs(version_pk):


# Web tasks
@task(queue='web')
@app.task(queue='web')
def sync_files(project_pk, version_pk, hostname=None, html=False,
localmedia=False, search=False, pdf=False, epub=False):
"""Sync build artifacts to application instances
Expand Down Expand Up @@ -604,7 +601,7 @@ def sync_files(project_pk, version_pk, hostname=None, html=False,
update_static_metadata(project_pk)


@task(queue='web')
@app.task(queue='web')
def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
pdf=False, epub=False):
"""Task to move built documentation to web servers
Expand Down Expand Up @@ -670,7 +667,7 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
Syncer.copy(from_path, to_path, host=hostname)


@task(queue='web')
@app.task(queue='web')
def update_search(version_pk, commit, delete_non_commit_files=True):
"""Task to update search indexes

Expand Down Expand Up @@ -705,15 +702,15 @@ def update_search(version_pk, commit, delete_non_commit_files=True):
)


@task(queue='web')
@app.task(queue='web')
def symlink_project(project_pk):
project = Project.objects.get(pk=project_pk)
for symlink in [PublicSymlink, PrivateSymlink]:
sym = symlink(project=project)
sym.run()


@task(queue='web')
@app.task(queue='web')
def symlink_domain(project_pk, domain_pk, delete=False):
project = Project.objects.get(pk=project_pk)
domain = Domain.objects.get(pk=domain_pk)
Expand All @@ -725,15 +722,15 @@ def symlink_domain(project_pk, domain_pk, delete=False):
sym.symlink_cnames(domain)


@task(queue='web')
@app.task(queue='web')
def symlink_subproject(project_pk):
project = Project.objects.get(pk=project_pk)
for symlink in [PublicSymlink, PrivateSymlink]:
sym = symlink(project=project)
sym.symlink_subprojects()


@task(queue='web')
@app.task(queue='web')
def fileify(version_pk, commit):
"""
Create ImportedFile objects for all of a version's files.
Expand Down Expand Up @@ -809,7 +806,7 @@ def _manage_imported_files(version, path, commit):
purge(cdn_ids[version.project.slug], changed_files)


@task(queue='web')
@app.task(queue='web')
def send_notifications(version_pk, build_pk):
version = Version.objects.get(pk=version_pk)
build = Build.objects.get(pk=build_pk)
Expand Down Expand Up @@ -878,7 +875,7 @@ def webhook_notification(version, build, hook_url):
requests.post(hook_url, data=data)


@task(queue='web')
@app.task(queue='web')
def update_static_metadata(project_pk, path=None):
"""Update static metadata JSON file

Expand Down Expand Up @@ -924,7 +921,7 @@ def update_static_metadata(project_pk, path=None):


# Random Tasks
@task()
@app.task()
def remove_dir(path):
"""
Remove a directory on the build/celery server.
Expand All @@ -936,7 +933,7 @@ def remove_dir(path):
shutil.rmtree(path, ignore_errors=True)


@task()
@app.task()
def clear_artifacts(version_pk):
"""Remove artifacts from the web servers"""
version = Version.objects.get(pk=version_pk)
Expand All @@ -946,38 +943,38 @@ def clear_artifacts(version_pk):
clear_html_artifacts(version)


@task()
@app.task()
def clear_pdf_artifacts(version):
if isinstance(version, int):
version = Version.objects.get(pk=version)
remove_dir(version.project.get_production_media_path(
type_='pdf', version_slug=version.slug))


@task()
@app.task()
def clear_epub_artifacts(version):
if isinstance(version, int):
version = Version.objects.get(pk=version)
remove_dir(version.project.get_production_media_path(
type_='epub', version_slug=version.slug))


@task()
@app.task()
def clear_htmlzip_artifacts(version):
if isinstance(version, int):
version = Version.objects.get(pk=version)
remove_dir(version.project.get_production_media_path(
type_='htmlzip', version_slug=version.slug))


@task()
@app.task()
def clear_html_artifacts(version):
if isinstance(version, int):
version = Version.objects.get(pk=version)
remove_dir(version.project.rtd_build_path(version=version.slug))


@task(queue='web')
@app.task(queue='web')
def sync_callback(_, version_pk, commit, *args, **kwargs):
"""
This will be called once the sync_files tasks are done.
Expand Down
Loading