Skip to content

Commit b220bc6

Browse files
authored
Merge pull request #8952 from readthedocs/humitos/celery-upgrade
2 parents 7a92182 + 14cb4ff commit b220bc6

File tree

8 files changed

+65
-88
lines changed

8 files changed

+65
-88
lines changed

readthedocs/builds/apps.py

+1-17
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,4 @@ class Config(AppConfig):
1212
verbose_name = _("Builds")
1313

1414
def ready(self):
15-
from readthedocs.builds.tasks import ArchiveBuilds
16-
from readthedocs.worker import app
17-
app.tasks.register(ArchiveBuilds)
18-
19-
try:
20-
from readthedocsext.monitoring.metrics.tasks import (
21-
Metrics1mTask,
22-
Metrics5mTask,
23-
Metrics10mTask,
24-
Metrics30mTask,
25-
)
26-
app.tasks.register(Metrics1mTask)
27-
app.tasks.register(Metrics5mTask)
28-
app.tasks.register(Metrics10mTask)
29-
app.tasks.register(Metrics30mTask)
30-
except (ModuleNotFoundError, ImportError):
31-
log.warning('Metrics tasks could not be imported.')
15+
import readthedocs.builds.tasks

readthedocs/builds/tasks.py

+46-52
Original file line numberDiff line numberDiff line change
@@ -178,67 +178,61 @@ def _get_version(self, task, args, kwargs):
178178
return version
179179

180180

181-
class ArchiveBuilds(Task):
182-
183-
"""Task to archive old builds to cold storage."""
184-
185-
name = __name__ + '.archive_builds'
186-
187-
def run(self, *args, **kwargs):
188-
if not settings.RTD_SAVE_BUILD_COMMANDS_TO_STORAGE:
189-
return
190-
191-
lock_id = '{0}-lock'.format(self.name)
192-
days = kwargs.get('days', 14)
193-
limit = kwargs.get('limit', 2000)
194-
delete = kwargs.get('delete', True)
195-
196-
with memcache_lock(lock_id, self.app.oid) as acquired:
197-
if acquired:
198-
archive_builds_task(days=days, limit=limit, delete=delete)
199-
else:
200-
log.warning('Archive Builds Task still locked')
181+
# NOTE: re-define this task to keep the old name. Delete it after deploy.
182+
@app.task(queue='web')
183+
def archive_builds(days=14, limit=200, include_cold=False, delete=False):
184+
archive_builds_task.delay(days=days, limit=limit, include_cold=include_cold, delete=delete)
201185

202186

203-
def archive_builds_task(days=14, limit=200, include_cold=False, delete=False):
187+
@app.task(queue='web', bind=True)
188+
def archive_builds_task(self, days=14, limit=200, include_cold=False, delete=False):
204189
"""
205-
Find stale builds and remove build paths.
190+
Task to archive old builds to cold storage.
206191
207192
:arg days: Find builds older than `days` days.
208193
:arg include_cold: If True, include builds that are already in cold storage
209194
:arg delete: If True, deletes BuildCommand objects after archiving them
210195
"""
211-
max_date = timezone.now() - timezone.timedelta(days=days)
212-
queryset = Build.objects.exclude(commands__isnull=True)
213-
if not include_cold:
214-
queryset = queryset.exclude(cold_storage=True)
215-
216-
queryset = (
217-
queryset
218-
.filter(date__lt=max_date)
219-
.prefetch_related('commands')
220-
.only('date', 'cold_storage')
221-
[:limit]
222-
)
196+
if not settings.RTD_SAVE_BUILD_COMMANDS_TO_STORAGE:
197+
return
223198

224-
for build in queryset:
225-
commands = BuildCommandSerializer(build.commands, many=True).data
226-
if commands:
227-
for cmd in commands:
228-
if len(cmd['output']) > MAX_BUILD_COMMAND_SIZE:
229-
cmd['output'] = cmd['output'][-MAX_BUILD_COMMAND_SIZE:]
230-
cmd['output'] = "... (truncated) ...\n\nCommand output too long. Truncated to last 1MB.\n\n" + cmd['output'] # noqa
231-
log.warning('Truncating build command for build.', build_id=build.id)
232-
output = BytesIO(json.dumps(commands).encode('utf8'))
233-
filename = '{date}/{id}.json'.format(date=str(build.date.date()), id=build.id)
234-
try:
235-
build_commands_storage.save(name=filename, content=output)
236-
build.cold_storage = True
237-
build.save()
238-
if delete:
239-
build.commands.all().delete()
240-
except IOError:
241-
log.exception('Cold Storage save failure')
199+
lock_id = '{0}-lock'.format(self.name)
200+
with memcache_lock(lock_id, self.app.oid) as acquired:
201+
if not acquired:
202+
log.warning('Archive Builds Task still locked')
203+
return False
204+
205+
max_date = timezone.now() - timezone.timedelta(days=days)
206+
queryset = Build.objects.exclude(commands__isnull=True)
207+
if not include_cold:
208+
queryset = queryset.exclude(cold_storage=True)
209+
210+
queryset = (
211+
queryset
212+
.filter(date__lt=max_date)
213+
.prefetch_related('commands')
214+
.only('date', 'cold_storage')
215+
[:limit]
216+
)
217+
218+
for build in queryset:
219+
commands = BuildCommandSerializer(build.commands, many=True).data
220+
if commands:
221+
for cmd in commands:
222+
if len(cmd['output']) > MAX_BUILD_COMMAND_SIZE:
223+
cmd['output'] = cmd['output'][-MAX_BUILD_COMMAND_SIZE:]
224+
cmd['output'] = "... (truncated) ...\n\nCommand output too long. Truncated to last 1MB.\n\n" + cmd['output'] # noqa
225+
log.warning('Truncating build command for build.', build_id=build.id)
226+
output = BytesIO(json.dumps(commands).encode('utf8'))
227+
filename = '{date}/{id}.json'.format(date=str(build.date.date()), id=build.id)
228+
try:
229+
build_commands_storage.save(name=filename, content=output)
230+
build.cold_storage = True
231+
build.save()
232+
if delete:
233+
build.commands.all().delete()
234+
except IOError:
235+
log.exception('Cold Storage save failure')
242236

243237

244238
@app.task(queue='web')

readthedocs/builds/tests/test_tasks.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime, timedelta
22
from unittest import mock
33

4-
from django.test import TestCase
4+
from django.test import TestCase, override_settings
55
from django.utils import timezone
66
from django_dynamic_fixture import get
77

@@ -74,6 +74,7 @@ def test_delete_inactive_external_versions(self):
7474
self.assertEqual(Version.external.all().count(), 2)
7575
self.assertFalse(Version.objects.filter(slug='external-inactive-old').exists())
7676

77+
@override_settings(RTD_SAVE_BUILD_COMMANDS_TO_STORAGE=True)
7778
@mock.patch('readthedocs.builds.tasks.build_commands_storage')
7879
def test_archive_builds(self, build_commands_storage):
7980
project = get(Project)
@@ -98,7 +99,7 @@ def test_archive_builds(self, build_commands_storage):
9899
self.assertEqual(Build.objects.count(), 10)
99100
self.assertEqual(BuildCommandResult.objects.count(), 100)
100101

101-
archive_builds_task(days=5, delete=True)
102+
archive_builds_task.delay(days=5, delete=True)
102103

103104
self.assertEqual(len(build_commands_storage.save.mock_calls), 5)
104105
self.assertEqual(Build.objects.count(), 10)

readthedocs/builds/utils.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ def memcache_lock(lock_id, oid):
9494
finally:
9595
# memcache delete is very slow, but we have to use it to take
9696
# advantage of using add() for atomic locking
97-
if monotonic() < timeout_at:
97+
if monotonic() < timeout_at and status:
9898
# don't release the lock if we exceeded the timeout
9999
# to lessen the chance of releasing an expired lock
100-
# owned by someone else.
100+
# owned by someone else
101+
# also don't release the lock if we didn't acquire it
101102
cache.delete(lock_id)

readthedocs/core/apps.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
# -*- coding: utf-8 -*-
2-
31
"""App configurations for core app."""
42

3+
import structlog
4+
55
from django.apps import AppConfig
66

7+
log = structlog.get_logger(__name__)
8+
79

810
class CoreAppConfig(AppConfig):
911
name = 'readthedocs.core'
@@ -14,3 +16,8 @@ def ready(self):
1416

1517
# Import `readthedocs.core.logs` to set up structlog
1618
import readthedocs.core.logs # noqa
19+
20+
try:
21+
import readthedocsext.monitoring.metrics.tasks
22+
except (ModuleNotFoundError, ImportError):
23+
log.info('Metrics tasks could not be imported.')

readthedocs/projects/tasks/builds.py

-6
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,6 @@ def update_versions_from_repository(self, environment):
205205
bind=True,
206206
)
207207
def sync_repository_task(self, version_id):
208-
# NOTE: `before_start` is new on Celery 5.2.x, but we are using 5.1.x currently.
209-
self.before_start(self.request.id, self.request.args, self.request.kwargs)
210-
211208
self.execute()
212209

213210

@@ -993,7 +990,4 @@ def is_type_sphinx(self):
993990
bind=True,
994991
)
995992
def update_docs_task(self, version_id, build_id, build_commit=None):
996-
# NOTE: `before_start` is new on Celery 5.2.x, but we are using 5.1.x currently.
997-
self.before_start(self.request.id, self.request.args, self.request.kwargs)
998-
999993
self.execute()

readthedocs/settings/base.py

+2
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,8 @@ def TEMPLATES(self):
432432
'options': {'queue': 'web'},
433433
'kwargs': {
434434
'days': 1,
435+
'limit': 2000,
436+
'delete': True,
435437
},
436438
},
437439
'every-day-delete-inactive-external-versions': {

requirements/pip.txt

+1-7
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,7 @@ Pygments==2.11.2
3838
# django-redis-cache 2.1.3 depends on redis<4.0
3939
redis==3.5.3 # pyup: ignore
4040

41-
# Pin celery and kombu because it produces an error
42-
# File "/home/docs/lib/python3.8/site-packages/celery/app/trace.py", line 361, in build_tracer
43-
# push_request = request_stack.push
44-
# AttributeError: 'NoneType' object has no attribute 'push'
45-
# See https://github.com/celery/celery/discussions/7172
46-
kombu==5.1.0 # pyup: ignore
47-
celery==5.1.2 # pyup: ignore
41+
celery==5.2.3
4842

4943
# When upgrading to 0.43.0 we should double check the ``base.html`` change
5044
# described in the changelog. In previous versions, the allauth app included a

0 commit comments

Comments
 (0)