
Do some initial file syncing error handling cleanup #3235

Merged
merged 14 commits on Nov 8, 2017
2 changes: 1 addition & 1 deletion readthedocs/builds/syncers.py
@@ -140,7 +140,7 @@ def copy(cls, path, target, host, is_file=False, **__):
)
ret = os.system(sync_cmd)
if ret != 0:
log.info("COPY ERROR to app servers.")
log.error("COPY ERROR to app servers. Command: [{}] Return: [{}]".format(sync_cmd, ret))


class Syncer(SettingsOverrideObject):
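A note on the `ret` value logged above: on POSIX systems `os.system` returns the raw wait status, not the command's exit code, so the logged `Return` value can look inflated. A minimal sketch of decoding it (hypothetical command; assumes a POSIX platform, not part of this diff):

import os

ret = os.system('rsync -av /tmp/src/ /tmp/dst/')  # hypothetical sync command
if ret != 0:
    # On POSIX the exit code lives in the high byte of the wait status,
    # e.g. rsync's exit code 23 is reported as 5888 (23 << 8).
    print('sync failed, exit code:', ret >> 8)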
25 changes: 19 additions & 6 deletions readthedocs/core/utils/__init__.py
@@ -14,6 +14,7 @@
from django.utils.safestring import SafeText, mark_safe
from django.utils.text import slugify as slugify_base
from future.backports.urllib.parse import urlparse
from celery import group, chord

from ..tasks import send_email_task
from readthedocs.builds.constants import LATEST
@@ -25,7 +26,15 @@
SYNC_USER = getattr(settings, 'SYNC_USER', getpass.getuser())


def broadcast(type, task, args, kwargs=None): # pylint: disable=redefined-builtin
def broadcast(type, task, args, kwargs=None, callback=None): # pylint: disable=redefined-builtin
"""
Run a broadcast across our servers.

Returns a task group that can be checked for results.

`callback` should be a task signature that will be run once,
after all of the broadcast tasks have finished running.
"""
assert type in ['web', 'app', 'build']
if kwargs is None:
kwargs = {}
@@ -34,12 +43,16 @@ def broadcast(type, task, args, kwargs=None):  # pylint: disable=redefined-builtin
servers = getattr(settings, "MULTIPLE_APP_SERVERS", [default_queue])
elif type in ['build']:
servers = getattr(settings, "MULTIPLE_BUILD_SERVERS", [default_queue])

tasks = []
for server in servers:
task.apply_async(
queue=server,
args=args,
kwargs=kwargs,
)
task_sig = task.s(*args, **kwargs).set(queue=server)
tasks.append(task_sig)
if callback:
task_promise = chord(tasks)(callback).get()
else:
task_promise = group(*tasks).apply_async()
return task_promise


def clean_url(url):
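To make the new `callback` semantics concrete, here is a minimal usage sketch (the `project`, `version`, and `commit` variables are assumed to be in scope; `sync_files` and `sync_callback` come from the tasks change below). Note that when `callback` is given, `broadcast` waits on the chord result via `.get()` before returning:

from readthedocs.core.utils import broadcast
from readthedocs.projects.tasks import sync_files, sync_callback

# Fan sync_files out to every app server; after all of them finish,
# run sync_callback exactly once with the list of their results.
broadcast(
    type='app',
    task=sync_files,
    args=[project.pk, version.pk],
    kwargs=dict(html=True),
    callback=sync_callback.s(version_pk=version.pk, commit=commit),
)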
96 changes: 49 additions & 47 deletions readthedocs/projects/tasks.py
@@ -29,7 +29,6 @@
from .exceptions import ProjectImportError
from .models import ImportedFile, Project, Domain
from .signals import before_vcs, after_vcs, before_build, after_build
from readthedocs.api.client import api as api_v1
from readthedocs.builds.constants import (LATEST,
BUILD_STATE_CLONING,
BUILD_STATE_INSTALLING,
@@ -224,6 +223,8 @@ def run_build(self, docker=False, record=True):
pdf=bool(outcomes['pdf']),
epub=bool(outcomes['epub']),
)
else:
log.warning('No build ID, not syncing files')

if self.build_env.failed:
self.send_notifications()
@@ -280,11 +281,10 @@ def setup_vcs(self):
if commit:
self.build['commit'] = commit
except ProjectImportError as e:
log.error(
log.exception(
LOG_TEMPLATE.format(project=self.project.slug,
version=self.version.slug,
msg=str(e)),
exc_info=True,
msg='Failed to import Project: '),
)
raise BuildEnvironmentError('Failed to import project: %s' % e,
status_code=404)
@@ -346,35 +346,27 @@ def update_app_instances(self, html=False, localmedia=False, search=False,
'active': True,
'built': True,
})
except HttpClientError as e:
log.error('Updating version failed, skipping file sync: version=%s',
self.version.pk, exc_info=True)
else:
# Broadcast finalization steps to web application instances
broadcast(
type='app',
task=sync_files,
args=[
self.project.pk,
self.version.pk,
],
kwargs=dict(
hostname=socket.gethostname(),
html=html,
localmedia=localmedia,
search=search,
pdf=pdf,
epub=epub,
)
)

# Delayed tasks
# TODO these should be chained on to the broadcast calls. The
# broadcast calls could be lumped together into a promise, and on
# task result, these next few tasks can be updated, also in a
# chained fashion
fileify.delay(self.version.pk, commit=self.build.get('commit'))
update_search.delay(self.version.pk, commit=self.build.get('commit'))
except HttpClientError:
log.exception('Updating version failed, skipping file sync: version=%s' % self.version)

# Broadcast finalization steps to web application instances
broadcast(
type='app',
task=sync_files,
args=[
self.project.pk,
self.version.pk,
],
kwargs=dict(
hostname=socket.gethostname(),
html=html,
localmedia=localmedia,
search=search,
pdf=pdf,
epub=epub,
),
callback=sync_callback.s(version_pk=self.version.pk, commit=self.build['commit']),
)

def setup_environment(self):
"""
@@ -442,8 +434,7 @@ def build_docs_html(self):
kwargs=dict(html=True)
)
except socket.error:
# TODO do something here
pass
log.exception('move_files task has failed on socket error.')

return success

@@ -550,8 +541,6 @@ def update_imported_docs(version_pk):
version_slug = LATEST
version_repo = project.vcs_repo(version_slug)
ret_dict['checkout'] = version_repo.update()
except Exception:
raise
finally:
after_vcs.send(sender=version)

@@ -575,10 +564,10 @@ def update_imported_docs(version_pk):

try:
api_v2.project(project.pk).sync_versions.post(version_post_data)
except HttpClientError as e:
log.error("Sync Versions Exception: %s", e.content)
except Exception as e:
log.error("Unknown Sync Versions Exception", exc_info=True)
except HttpClientError:
log.exception("Sync Versions Exception")
except Exception:
log.exception("Unknown Sync Versions Exception")
return ret_dict


@@ -634,6 +623,8 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
:type epub: bool
"""
version = Version.objects.get(pk=version_pk)
log.debug(LOG_TEMPLATE.format(project=version.project.slug, version=version.slug,
msg='Moving files: {}'.format(locals())))

if html:
from_path = version.project.artifact_path(
@@ -642,18 +633,18 @@ def move_files(version_pk, hostname, html=False, localmedia=False, search=False,
Syncer.copy(from_path, target, host=hostname)

if 'sphinx' in version.project.documentation_type:
if localmedia:
if search:
from_path = version.project.artifact_path(
version=version.slug, type_='sphinx_localmedia')
version=version.slug, type_='sphinx_search')
to_path = version.project.get_production_media_path(
type_='htmlzip', version_slug=version.slug, include_file=False)
type_='json', version_slug=version.slug, include_file=False)
Syncer.copy(from_path, to_path, host=hostname)

if search:
if localmedia:
from_path = version.project.artifact_path(
version=version.slug, type_='sphinx_search')
version=version.slug, type_='sphinx_localmedia')
to_path = version.project.get_production_media_path(
type_='json', version_slug=version.slug, include_file=False)
type_='htmlzip', version_slug=version.slug, include_file=False)
Syncer.copy(from_path, to_path, host=hostname)

# Always move PDF's because the return code lies.
@@ -984,3 +975,14 @@ def clear_html_artifacts(version):
if isinstance(version, int):
version = Version.objects.get(pk=version)
remove_dir(version.project.rtd_build_path(version=version.slug))


@task(queue='web')
def sync_callback(_, version_pk, commit, *args, **kwargs):
"""
This will be called once the sync_files tasks are done.

The first argument is the result from previous tasks, which we discard.
"""
fileify(version_pk, commit=commit)
update_search(version_pk, commit=commit)
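Celery's chord passes the header group's results to the callback as its first positional argument, which is why `sync_callback` accepts and discards `_`. A self-contained sketch of the same pattern (hypothetical tasks; assumes a Redis broker and result backend, as in the dev settings below):

from celery import Celery, chord

app = Celery(broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def report(results):
    # `results` is the list of return values from the header tasks.
    print('broadcast results:', results)

chord([add.s(2, 2), add.s(4, 4)])(report.s())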
6 changes: 2 additions & 4 deletions readthedocs/rtd_tests/tests/projects/test_admin_actions.py
@@ -71,10 +71,8 @@ def test_project_delete(self, remove_dir):
action_data
)
self.assertFalse(Project.objects.filter(pk=self.project.pk).exists())
remove_dir.apply_async.assert_has_calls([
remove_dir.s.assert_has_calls([
mock.call(
kwargs={},
queue='celery',
args=[self.project.doc_path]
self.project.doc_path,
),
])
2 changes: 0 additions & 2 deletions readthedocs/search/parse_json.py
@@ -88,8 +88,6 @@ def generate_sections_from_pyquery(body):
'title': title,
'content': content,
}
log.debug("(Search Index) Section [%s:%s]: %s",
section_id, title, content)


def process_file(filename):
4 changes: 0 additions & 4 deletions readthedocs/search/utils.py
@@ -212,8 +212,6 @@ def parse_sphinx_sections(content):
'title': title,
'content': content,
}
log.debug("(Search Index) Section [%s:%s]: %s",
section_id, title, content)


def parse_mkdocs_sections(content):
@@ -265,8 +263,6 @@ def parse_mkdocs_sections(content):
'title': h2_title,
'content': h2_content,
}
log.debug("(Search Index) Section [%s:%s]: %s",
section_id, h2_title, h2_content)
# we're unsure which exceptions can be raised
# pylint: disable=bare-except
except:
2 changes: 1 addition & 1 deletion readthedocs/settings/base.py
@@ -357,7 +357,7 @@ def INSTALLED_APPS(self):  # noqa
'debug': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'debug.log',
'filename': os.path.join(LOGS_ROOT, 'debug.log'),
'formatter': 'default',
},
},
3 changes: 3 additions & 0 deletions readthedocs/settings/dev.py
@@ -33,6 +33,7 @@ def DATABASES(self):  # noqa

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ALWAYS_EAGER = True

HAYSTACK_CONNECTIONS = {
@@ -55,6 +56,8 @@ def DATABASES(self):  # noqa
def LOGGING(self): # noqa - avoid pep8 N802
logging = super(CommunityDevSettings, self).LOGGING
logging['formatters']['default']['format'] = '[%(asctime)s] ' + self.LOG_FORMAT
# Remove double logging
logging['loggers']['']['handlers'] = []
return logging


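Why clearing the root logger's handlers removes double logging: a record emitted by a named logger propagates up to the root logger, so if both have handlers attached it is written twice. A standalone sketch of the effect (hypothetical logger names, not part of this diff):

import logging

root = logging.getLogger()
root.addHandler(logging.StreamHandler())

log = logging.getLogger('readthedocs')
log.addHandler(logging.StreamHandler())

log.warning('hello')  # emitted twice: by log's handler, then root's

root.handlers = []    # what the dev settings override does
log.warning('hello')  # emitted once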