diff --git a/readthedocs/builds/syncers.py b/readthedocs/builds/syncers.py
index ff792959534..3c8af919e01 100644
--- a/readthedocs/builds/syncers.py
+++ b/readthedocs/builds/syncers.py
@@ -140,7 +140,7 @@ def copy(cls, path, target, host, is_file=False, **__):
         )
         ret = os.system(sync_cmd)
         if ret != 0:
-            log.info("COPY ERROR to app servers.")
+            log.error("COPY ERROR to app servers. Command: [%s] Return: [%s]", sync_cmd, ret)
 
 
 class Syncer(SettingsOverrideObject):
diff --git a/readthedocs/core/utils/__init__.py b/readthedocs/core/utils/__init__.py
index 9adfce777c4..fca8e13f96f 100644
--- a/readthedocs/core/utils/__init__.py
+++ b/readthedocs/core/utils/__init__.py
@@ -14,6 +14,7 @@
 from django.utils.safestring import SafeText, mark_safe
 from django.utils.text import slugify as slugify_base
 from future.backports.urllib.parse import urlparse
+from celery import group, chord
 
 from ..tasks import send_email_task
 from readthedocs.builds.constants import LATEST
@@ -25,7 +26,15 @@
 SYNC_USER = getattr(settings, 'SYNC_USER', getpass.getuser())
 
 
-def broadcast(type, task, args, kwargs=None):  # pylint: disable=redefined-builtin
+def broadcast(type, task, args, kwargs=None, callback=None):  # pylint: disable=redefined-builtin
+    """
+    Run a broadcast across our servers.
+
+    Returns a task group that can be checked for results.
+
+    `callback` should be a task signature that will be run once,
+    after all of the broadcast tasks have finished running.
+    """
     assert type in ['web', 'app', 'build']
     if kwargs is None:
         kwargs = {}
@@ -34,12 +43,16 @@ def broadcast(type, task, args, kwargs=None):  # pylint: disable=redefined-built
         servers = getattr(settings, "MULTIPLE_APP_SERVERS", [default_queue])
     elif type in ['build']:
         servers = getattr(settings, "MULTIPLE_BUILD_SERVERS", [default_queue])
+
+    tasks = []
     for server in servers:
-        task.apply_async(
-            queue=server,
-            args=args,
-            kwargs=kwargs,
-        )
+        task_sig = task.s(*args, **kwargs).set(queue=server)
+        tasks.append(task_sig)
+    if callback:
+        task_promise = chord(tasks)(callback)
+    else:
+        task_promise = group(*tasks).apply_async()
+    return task_promise
 
 
 def clean_url(url):
diff --git a/readthedocs/projects/tasks.py b/readthedocs/projects/tasks.py
index 7d50ea0881c..a52fa80c3af 100644
--- a/readthedocs/projects/tasks.py
+++ b/readthedocs/projects/tasks.py
@@ -29,7 +29,6 @@
 from .exceptions import ProjectImportError
 from .models import ImportedFile, Project, Domain
 from .signals import before_vcs, after_vcs, before_build, after_build
-from readthedocs.api.client import api as api_v1
 from readthedocs.builds.constants import (LATEST,
                                           BUILD_STATE_CLONING,
                                           BUILD_STATE_INSTALLING,
@@ -224,6 +223,8 @@ def run_build(self, docker=False, record=True):
                 pdf=bool(outcomes['pdf']),
                 epub=bool(outcomes['epub']),
             )
+        else:
+            log.warning('No build ID, not syncing files')
 
         if self.build_env.failed:
             self.send_notifications()
@@ -280,11 +281,10 @@ def setup_vcs(self):
             if commit:
                 self.build['commit'] = commit
         except ProjectImportError as e:
-            log.error(
+            log.exception(
                 LOG_TEMPLATE.format(project=self.project.slug,
                                     version=self.version.slug,
-                                    msg=str(e)),
-                exc_info=True,
+                                    msg='Failed to import project'),
             )
             raise BuildEnvironmentError('Failed to import project: %s' % e,
                                         status_code=404)
@@ -346,35 +346,27 @@ def update_app_instances(self, html=False, localmedia=False, search=False,
                 'active': True,
                 'built': True,
             })
-        except HttpClientError as e:
-            log.error('Updating version failed, skipping file sync: version=%s',
-                      self.version.pk, exc_info=True)
-        else:
-            # Broadcast finalization steps to web application instances
-            broadcast(
-                type='app',
-                task=sync_files,
-                args=[
-                    self.project.pk,
-                    self.version.pk,
-                ],
-                kwargs=dict(
-                    hostname=socket.gethostname(),
-                    html=html,
-                    localmedia=localmedia,
-                    search=search,
-                    pdf=pdf,
-                    epub=epub,
-                )
-            )
-
-        # Delayed tasks
-        # TODO these should be chained on to the broadcast calls. The
-        # broadcast calls could be lumped together into a promise, and on
-        # task result, these next few tasks can be updated, also in a
-        # chained fashion
-        fileify.delay(self.version.pk, commit=self.build.get('commit'))
-        update_search.delay(self.version.pk, commit=self.build.get('commit'))
+        except HttpClientError:
+            log.exception('Updating version failed, skipping file sync: version=%s', self.version)
+
+        # Broadcast finalization steps to web application instances
+        broadcast(
+            type='app',
+            task=sync_files,
+            args=[
+                self.project.pk,
+                self.version.pk,
+            ],
+            kwargs=dict(
+                hostname=socket.gethostname(),
+                html=html,
+                localmedia=localmedia,
+                search=search,
+                pdf=pdf,
+                epub=epub,
+            ),
+            callback=sync_callback.s(version_pk=self.version.pk, commit=self.build['commit']),
+        )
 
     def setup_environment(self):
         """
@@ -442,8 +434,7 @@ def build_docs_html(self):
                 kwargs=dict(html=True)
             )
         except socket.error:
-            # TODO do something here
-            pass
+            log.exception('move_files task has failed on socket error.')
 
         return success
 
@@ -550,8 +541,6 @@ def update_imported_docs(version_pk):
             version_slug = LATEST
             version_repo = project.vcs_repo(version_slug)
             ret_dict['checkout'] = version_repo.update()
-    except Exception:
-        raise
     finally:
         after_vcs.send(sender=version)
 
@@ -575,10 +564,10 @@
 
     try:
         api_v2.project(project.pk).sync_versions.post(version_post_data)
-    except HttpClientError as e:
-        log.error("Sync Versions Exception: %s", e.content)
-    except Exception as e:
-        log.error("Unknown Sync Versions Exception", exc_info=True)
+    except HttpClientError:
+        log.exception("Sync Versions Exception")
+    except Exception:
+        log.exception("Unknown Sync Versions Exception")
     return ret_dict
 
 
@@ -634,6 +623,8 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
     :type epub: bool
     """
     version = Version.objects.get(pk=version_pk)
+    log.debug(LOG_TEMPLATE.format(project=version.project.slug, version=version.slug,
+                                  msg='Moving files: {}'.format(locals())))
 
     if html:
         from_path = version.project.artifact_path(
@@ -640,19 +631,19 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
             version=version.slug, type_=version.project.documentation_type)
         target = version.project.rtd_build_path(version.slug)
         Syncer.copy(from_path, target, host=hostname)
 
     if 'sphinx' in version.project.documentation_type:
-        if localmedia:
+        if search:
             from_path = version.project.artifact_path(
-                version=version.slug, type_='sphinx_localmedia')
+                version=version.slug, type_='sphinx_search')
             to_path = version.project.get_production_media_path(
-                type_='htmlzip', version_slug=version.slug, include_file=False)
+                type_='json', version_slug=version.slug, include_file=False)
             Syncer.copy(from_path, to_path, host=hostname)
-        if search:
+        if localmedia:
             from_path = version.project.artifact_path(
-                version=version.slug, type_='sphinx_search')
+                version=version.slug, type_='sphinx_localmedia')
             to_path = version.project.get_production_media_path(
-                type_='json', version_slug=version.slug, include_file=False)
+                type_='htmlzip', version_slug=version.slug, include_file=False)
             Syncer.copy(from_path, to_path, host=hostname)
 
     # Always move PDF's because the return code lies.
@@ -984,3 +975,14 @@ def clear_html_artifacts(version):
     if isinstance(version, int):
         version = Version.objects.get(pk=version)
     remove_dir(version.project.rtd_build_path(version=version.slug))
+
+
+@task(queue='web')
+def sync_callback(_, version_pk, commit, *args, **kwargs):
+    """
+    This will be called once the sync_files tasks are done.
+
+    The first argument is the result from previous tasks, which we discard.
+    """
+    fileify(version_pk, commit=commit)
+    update_search(version_pk, commit=commit)
diff --git a/readthedocs/rtd_tests/tests/projects/test_admin_actions.py b/readthedocs/rtd_tests/tests/projects/test_admin_actions.py
index fb2d63555b0..b0f5526b331 100644
--- a/readthedocs/rtd_tests/tests/projects/test_admin_actions.py
+++ b/readthedocs/rtd_tests/tests/projects/test_admin_actions.py
@@ -71,10 +71,8 @@ def test_project_delete(self, remove_dir):
             action_data
         )
         self.assertFalse(Project.objects.filter(pk=self.project.pk).exists())
-        remove_dir.apply_async.assert_has_calls([
+        remove_dir.s.assert_has_calls([
             mock.call(
-                kwargs={},
-                queue='celery',
-                args=[self.project.doc_path]
+                self.project.doc_path,
             ),
         ])
diff --git a/readthedocs/search/parse_json.py b/readthedocs/search/parse_json.py
index 1640b58ee88..196caf2bd12 100644
--- a/readthedocs/search/parse_json.py
+++ b/readthedocs/search/parse_json.py
@@ -88,8 +88,6 @@ def generate_sections_from_pyquery(body):
             'title': title,
             'content': content,
         }
-        log.debug("(Search Index) Section [%s:%s]: %s",
-                  section_id, title, content)
 
 
 def process_file(filename):
diff --git a/readthedocs/search/utils.py b/readthedocs/search/utils.py
index 5ad9d1b2b38..07eb38cd1f8 100644
--- a/readthedocs/search/utils.py
+++ b/readthedocs/search/utils.py
@@ -212,8 +212,6 @@ def parse_sphinx_sections(content):
             'title': title,
             'content': content,
         }
-        log.debug("(Search Index) Section [%s:%s]: %s",
-                  section_id, title, content)
 
 
 def parse_mkdocs_sections(content):
@@ -265,8 +263,6 @@ def parse_mkdocs_sections(content):
                     'title': h2_title,
                     'content': h2_content,
                 }
-                log.debug("(Search Index) Section [%s:%s]: %s",
-                          section_id, h2_title, h2_content)
     # we're unsure which exceptions can be raised
     # pylint: disable=bare-except
     except:
diff --git a/readthedocs/settings/base.py b/readthedocs/settings/base.py
index 0dfa2c1e4a6..d595f49e430 100644
--- a/readthedocs/settings/base.py
+++ b/readthedocs/settings/base.py
@@ -357,7 +357,7 @@ def INSTALLED_APPS(self):  # noqa
         'debug': {
             'level': 'DEBUG',
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'debug.log',
+            'filename': os.path.join(LOGS_ROOT, 'debug.log'),
             'formatter': 'default',
         },
     },
diff --git a/readthedocs/settings/dev.py b/readthedocs/settings/dev.py
index 4ae0ca0295e..e605f2c4ecb 100644
--- a/readthedocs/settings/dev.py
+++ b/readthedocs/settings/dev.py
@@ -33,6 +33,7 @@ def DATABASES(self):  # noqa
 
     BROKER_URL = 'redis://localhost:6379/0'
     CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
+    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ALWAYS_EAGER = True
 
     HAYSTACK_CONNECTIONS = {
@@ -55,6 +56,8 @@ def DATABASES(self):  # noqa
 
     @property
     def LOGGING(self):  # noqa - avoid pep8 N802
         logging = super(CommunityDevSettings, self).LOGGING
         logging['formatters']['default']['format'] = '[%(asctime)s] ' + self.LOG_FORMAT
+        # Remove double logging
+        logging['loggers']['']['handlers'] = []
         return logging
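
The fan-out shape that the new broadcast() builds is easiest to see in isolation. Below is a minimal standalone sketch (not part of the patch): one task signature per server queue, optionally wrapped in a chord so a callback runs once after every header task finishes. The broker/backend URL matches the dev settings above; the queue names, task bodies, and argument values are hypothetical stand-ins, not the real readthedocs implementations.

    from celery import Celery, chord, group

    app = Celery('sketch',
                 broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/0')


    @app.task
    def sync_files(project_pk, version_pk, hostname=None, **kwargs):
        # Stand-in for the real sync_files task; one copy runs per server queue.
        return '{}:{}'.format(hostname, version_pk)


    @app.task
    def sync_callback(results, version_pk, commit):
        # Chord body: runs exactly once, with the list of header results as its
        # first argument (the argument the patched sync_callback discards as `_`).
        return len(results)


    servers = ['web01', 'web02']  # hypothetical stand-in for MULTIPLE_APP_SERVERS
    tasks = [sync_files.s(1, 2, hostname=server).set(queue=server)
             for server in servers]

    # With a callback, a chord fires the body once the whole header has finished:
    promise = chord(tasks)(sync_callback.s(version_pk=2, commit='abc123'))

    # Without a callback, the group promise itself can be inspected instead:
    # promise = group(*tasks).apply_async()

Note that sync_callback.s(version_pk=..., commit=...) is a partial signature: Celery prepends the header's result list as the first positional argument when invoking the chord body, which is why the task added in the patch accepts a leading `_` it immediately discards. Chords also need a working result backend to collect the header results, which is presumably why the diff sets CELERY_RESULT_SERIALIZER in the dev settings.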