from __future__ import absolute_import, print_function
import os
import requests
from collections import defaultdict
from datetime import datetime
from flask import current_app
from requests.exceptions import ConnectionError, HTTPError, Timeout, SSLError
from sqlalchemy import distinct, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
from changes.constants import Status, Result, ResultSource
from changes.config import db, statsreporter
from changes.db.utils import get_or_create, try_create
from changes.jobs.sync_artifact import sync_artifact
from changes.lib.artifact_store_lib import ArtifactStoreClient
from changes.models.artifact import Artifact
from changes.models.bazeltarget import BazelTarget
from changes.models.failurereason import FailureReason
from changes.models.filecoverage import FileCoverage
from changes.models.itemstat import ItemStat
from changes.models.jobphase import JobPhase
from changes.models.jobplan import JobPlan
from changes.models.jobstep import JobStep
from changes.models.log import LogSource
from changes.models.option import ItemOption
from changes.models.snapshot import SnapshotImage
from changes.models.test import TestCase
from changes.queue.task import tracked_task
from changes.storage.artifactstore import ARTIFACTSTORE_PREFIX, ArtifactStoreFileStorage
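
# FailureReason.reason values that indicate a problem with the build
# infrastructure rather than with the code under test.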
INFRA_FAILURE_REASONS = ['malformed_manifest_json', 'missing_manifest_json']
def abort_step(task):
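    """Mark the JobStep as finished/aborted after an unrecoverable sync failure.

    Used as the on_abort handler for the sync_job_step task below.
    """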
step = JobStep.query.get(task.kwargs['step_id'])
step.status = Status.finished
step.result = Result.aborted
db.session.add(step)
db.session.commit()
current_app.logger.exception('Unrecoverable exception syncing step %s', step.id)
def is_missing_tests(step, jobplan):
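    """Returns True if this step was expected to report tests but didn't.

    Only true when the plan expects tests, the step belongs to the final
    phase of its job, and no test results were collected for the step.
    """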
return _expects_tests(jobplan) and is_final_jobphase(step.phase) and not _has_tests(step)
def _result_from_failure_reasons(step):
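    """Derive a step Result from any FailureReason rows recorded against it.

    Returns:
        Result: Result.infra_failed if any recorded reason is a known
            infrastructure failure, Result.failed for any other recorded
            reason, or None if no reasons exist (or the step was replaced).
    """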
if step.replacement_id is not None:
return None
reasons = [r for r, in db.session.query(
distinct(FailureReason.reason)
).filter(
FailureReason.step_id == step.id,
).all()]
if any(infra_reason in reasons for infra_reason in INFRA_FAILURE_REASONS):
return Result.infra_failed
elif reasons:
return Result.failed
return None
def _is_snapshot_job(jobplan):
"""
Args:
jobplan (JobPlan): The JobPlan to check.
Returns:
bool: Whether this plan is for a job that is creating a snapshot.
"""
if not jobplan:
return False
is_snapshot = db.session.query(SnapshotImage.query.filter(
SnapshotImage.job_id == jobplan.job_id
).exists()).scalar()
return bool(is_snapshot)
def _expects_tests(jobplan):
"""Check whether a jobplan expects tests or not.
Usually this is encoded within the jobplan itself under a snapshot
of the ItemOptions associated with the plan, but if not we fall
back to looking at the plan itself.
Since snapshot builds never return tests, we override this for
snapshot builds and never expect tests for them (which allows
    building a snapshot for a plan that has build.expect-tests enabled).
"""
if _is_snapshot_job(jobplan):
return False
if 'snapshot' in jobplan.data:
options = jobplan.data['snapshot']['options']
else:
options = dict(db.session.query(
ItemOption.name, ItemOption.value,
).filter(
ItemOption.item_id == jobplan.plan.id,
ItemOption.name == 'build.expect-tests',
))
return options.get('build.expect-tests') == '1'
def is_final_jobphase(phase):
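    """Returns True if no later phase (by date_created) exists for this job."""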
    # TODO(dcramer): there is probably a better way to make this explicit.
jobphase_query = JobPhase.query.filter(
JobPhase.job_id == phase.job_id,
JobPhase.id != phase.id,
JobPhase.date_created > phase.date_created,
)
return not db.session.query(jobphase_query.exists()).scalar()
def _has_tests(step):
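    """Returns True if any test results were recorded for this step."""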
has_tests = db.session.query(TestCase.query.filter(
TestCase.step_id == step.id,
).exists()).scalar()
return has_tests
def has_test_failures(step):
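    """Returns True if any test recorded for this step failed."""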
return db.session.query(TestCase.query.filter(
TestCase.step_id == step.id,
TestCase.result == Result.failed,
).exists()).scalar()
def has_missing_targets(step):
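    """Returns True if any of this step's Bazel targets never reported a result.

    A target still in_progress whose result_source is from_self (or None,
    which implies from_self) is one we expected to report its own result
    but never did.
    """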
return db.session.query(BazelTarget.query.filter(
BazelTarget.step_id == step.id,
BazelTarget.status == Status.in_progress,
or_(
BazelTarget.result_source == ResultSource.from_self,
# None value for result_source implies `from_self`
BazelTarget.result_source.is_(None),
),
).exists()).scalar()
# Extra time to allot snapshot builds, as they are expected to take longer.
_SNAPSHOT_TIMEOUT_BONUS_MINUTES = 40
def has_timed_out(step, jobplan, default_timeout):
"""
Args:
default_timeout (int): Timeout in minutes to be used when
no timeout is specified for this build. Required because
nothing is expected to run forever.
"""
if step.status != Status.in_progress:
# HACK: We don't really want to timeout jobsteps that are
# stuck for infrastructural reasons here, but we don't yet
# have code to handle other cases well.
# To be sure that jobsteps can die, we cast a wide net here.
if step.status == Status.finished:
return False
# date_started is preferred if available, but we fall back to
# date_created (always set) so jobsteps that never start don't get to run forever.
start_time = step.date_started or step.date_created
# TODO(adai): specify timeout in project YAML file, currently uses global
# defaults for step timeout duration if the job is autogenerated
# TODO(dcramer): we make an assumption that there is a single step
options = jobplan.get_steps()[0].options if not jobplan.plan.autogenerated() else {}
timeout = int(options.get('build.timeout', '0')) or default_timeout
# timeout is in minutes
timeout = timeout * 60
# Snapshots are given a few extra minutes before being considered as timed out.
if _is_snapshot_job(jobplan):
timeout += 60 * _SNAPSHOT_TIMEOUT_BONUS_MINUTES
    # If the jobstep hasn't started running (still queued, pending allocation,
    # or allocated), apply a separate in-queue timeout instead.
if step.status in (Status.allocated, Status.pending_allocation, Status.queued):
timeout = TIMEOUT_IN_QUEUE_MIN * 60
delta = datetime.utcnow() - start_time
if delta.total_seconds() > timeout:
return True
return False
def record_coverage_stats(step):
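    """Aggregate this step's per-file coverage into ItemStat entries.

    Records totals for covered/uncovered lines, both overall and
    restricted to lines touched by the diff under test.
    """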
coverage_stats = db.session.query(
func.sum(FileCoverage.lines_covered).label('lines_covered'),
func.sum(FileCoverage.lines_uncovered).label('lines_uncovered'),
func.sum(FileCoverage.diff_lines_covered).label('diff_lines_covered'),
func.sum(FileCoverage.diff_lines_uncovered).label('diff_lines_uncovered'),
).filter(
FileCoverage.step_id == step.id,
).group_by(
FileCoverage.step_id,
).first()
stat_list = (
'lines_covered', 'lines_uncovered',
'diff_lines_covered', 'diff_lines_uncovered',
)
for stat_name in stat_list:
try_create(ItemStat, where={
'item_id': step.id,
'name': stat_name,
'value': getattr(coverage_stats, stat_name, 0) or 0,
})
# In minutes, the timeout applied to jobs without a timeout specified at build time.
# If the job legitimately takes more than an hour, the build
# should specify an appropriate timeout.
DEFAULT_TIMEOUT_MIN = 60
# In minutes, the timeout applied to jobs which are in the queue/pending allocation.
# This is applied uniformly across all jobsteps, and cannot be overridden per jobstep.
TIMEOUT_IN_QUEUE_MIN = 180
# In seconds, the timeout applied to any requests we make to the artifacts
# store. Arbitrarily chosen as the amount of delay we can tolerate for each
# sync_job_step.
ARTIFACTS_REQUEST_TIMEOUT_SECS = 5
# List of artifact names recognized as log sources (content which is
# continuously updated for the duration of a test, like console logs and
# infralogs).
LOGSOURCE_WHITELIST = ('console', 'infralog',)
def _sync_from_artifact_store(jobstep):
"""Checks and creates new artifacts from the artifact store."""
url = '{base}/buckets/{jobstep_id}/artifacts/'.format(
base=current_app.config.get('ARTIFACTS_SERVER'),
jobstep_id=jobstep.id.hex,
)
job = jobstep.job
try:
res = requests.get(url, timeout=ARTIFACTS_REQUEST_TIMEOUT_SECS)
res.raise_for_status()
artifacts = res.json()
for artifact in artifacts:
# Artifact name is guaranteed to be unique in an artifact store bucket.
artifact_name = artifact['name']
artifact_path = artifact['relativePath']
if artifact_name in LOGSOURCE_WHITELIST:
_, created = get_or_create(LogSource, where={
'name': artifact_name,
'job': job,
'step': jobstep,
}, defaults={
'project': job.project,
'date_created': job.date_started,
'in_artifact_store': True,
})
if created:
try:
db.session.commit()
                    except IntegrityError:
db.session.rollback()
current_app.logger.error(
'DB Error while inserting LogSource %s',
artifact_name, exc_info=True)
# If this artifact is a logsource, don't add it to the list of
# test artifacts.
continue
art, created = get_or_create(Artifact, where={
# Don't conflict with same artifacts uploaded by other means (Jenkins/Mesos)
'name': ARTIFACTSTORE_PREFIX + artifact_path,
'step_id': jobstep.id,
'job_id': jobstep.job_id,
'project_id': jobstep.project_id,
})
if created:
art.file.storage = 'changes.storage.artifactstore.ArtifactStoreFileStorage'
filename = ArtifactStoreFileStorage.get_filename_from_artifact_name(jobstep.id.hex, artifact_name)
art.file.set_filename(filename)
try:
db.session.add(art)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
current_app.logger.error(
'DB Error while inserting artifact %s: %s', filename, err)
except (ConnectionError, HTTPError, SSLError, Timeout) as err:
if isinstance(err, HTTPError) and err.response is not None and err.response.status_code == 404:
# While not all plans use the Artifact Store, 404s are normal and expected.
# No sense in reporting them.
pass
else:
# Log to sentry - unable to contact artifacts store
current_app.logger.warning('Error fetching url %s: %s', url, err, exc_info=True)
raise
    except Exception as err:
        current_app.logger.error('Error updating artifacts for jobstep %s: %s', jobstep, err, exc_info=True)
        # bare raise preserves the original traceback
        raise
def _get_artifacts_to_sync(artifacts, artifact_manager, prefer_artifactstore):
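    """Select which of the step's artifacts should be synced.

    Artifacts the manager can't process are dropped, and when the same
    filename was collected from both the artifact store and another source
    (e.g. Jenkins), prefer_artifactstore decides which copy wins.
    """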
    def is_artifact_store(artifact):
        # Make sure the file exists before reading its storage backend.
        return bool(artifact.file) and artifact.file.storage == 'changes.storage.artifactstore.ArtifactStoreFileStorage'
artifacts_by_name = defaultdict(list)
# group by filename
for artifact in artifacts:
artifacts_by_name[os.path.basename(artifact.name)].append(artifact)
to_sync = []
for _, arts in artifacts_by_name.iteritems():
# don't sync_artifact artifacts that we won't actually process
arts = [art for art in arts if artifact_manager.can_process(art.name)]
if len(arts) == 0:
continue
artifactstore_arts = [a for a in arts if is_artifact_store(a)]
other_arts = [a for a in arts if not is_artifact_store(a)]
# if we have this artifact from both sources, let buildstep choose which to use
        if artifactstore_arts and other_arts:
arts = artifactstore_arts if prefer_artifactstore else other_arts
to_sync.extend(arts)
return to_sync
def _sync_artifacts_for_jobstep(step):
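    """Verify the step's final artifacts and schedule sync_artifact tasks for them."""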
artifacts = Artifact.query.filter(Artifact.step_id == step.id).all()
_, buildstep = JobPlan.get_build_step_for_job(job_id=step.job_id)
prefer_artifactstore = buildstep.prefer_artifactstore()
artifact_manager = buildstep.get_artifact_manager(step)
to_sync = _get_artifacts_to_sync(artifacts, artifact_manager, prefer_artifactstore)
# buildstep may want to check for e.g. required artifacts
buildstep.verify_final_artifacts(step, to_sync)
for artifact in to_sync:
sync_artifact.delay_if_needed(
artifact_id=artifact.id.hex,
task_id=artifact.id.hex,
parent_task_id=step.id.hex,
)
@tracked_task(on_abort=abort_step, max_retries=100)
def sync_job_step(step_id):
"""
Polls a build for updates. May have sync_artifact children.
"""
step = JobStep.query.get(step_id)
if not step:
return
jobplan, implementation = JobPlan.get_build_step_for_job(job_id=step.job_id)
# only synchronize if upstream hasn't suggested we're finished
if step.status != Status.finished:
implementation.update_step(step=step)
db.session.flush()
_sync_from_artifact_store(step)
    if step.status == Status.finished:
        # There is a small race window where step.status can change right
        # after the first call to _sync_from_artifact_store, so sync once
        # more to pick up anything uploaded in the meantime.
        _sync_from_artifact_store(step)
_sync_artifacts_for_jobstep(step)
is_finished = (step.status == Status.finished and
# make sure all child tasks (like sync_artifact) have also finished
sync_job_step.verify_all_children() == Status.finished)
if not is_finished:
default_timeout = current_app.config['DEFAULT_JOB_TIMEOUT_MIN']
if has_timed_out(step, jobplan, default_timeout=default_timeout):
old_status = step.status
step.data['timed_out'] = True
implementation.cancel_step(step=step)
# Not all implementations can actually cancel, but it's dead to us as of now
# so we mark it as finished.
step.status = Status.finished
step.date_finished = datetime.utcnow()
# Implementations default to marking canceled steps as aborted,
# but we're not canceling on good terms (it should be done by now)
# so we consider it a failure here.
#
# We check whether the step was marked as in_progress to make a best
# guess as to whether this is an infrastructure failure, or the
# repository under test is just taking too long. This won't be 100%
# reliable, but is probably good enough.
if old_status == Status.in_progress:
step.result = Result.failed
else:
step.result = Result.infra_failed
db.session.add(step)
job = step.job
try_create(FailureReason, {
'step_id': step.id,
'job_id': job.id,
'build_id': job.build_id,
'project_id': job.project_id,
'reason': 'timeout'
})
db.session.flush()
statsreporter.stats().incr('job_step_timed_out')
# If we timeout something that isn't in progress, that's our fault, and we should know.
if old_status != Status.in_progress:
current_app.logger.warning(
"Timed out jobstep that wasn't in progress: %s (was %s)", step.id, old_status)
raise sync_job_step.NotFinished
# Close the ArtifactStore bucket used by jenkins, if it exists
bucket_name = step.data.get('jenkins_bucket_name')
if bucket_name:
try:
ArtifactStoreClient(current_app.config['ARTIFACTS_SERVER']).close_bucket(bucket_name)
except Exception:
# Closing buckets is not strictly necessary in artifactstore
pass
# Ignore any 'failures' if the build did not finish properly.
# NOTE(josiah): we might want to include "unknown" and "skipped" here as
# well, or have some named condition like "not meaningful_result(step.result)".
if step.result in (Result.aborted, Result.infra_failed):
_report_jobstep_result(step)
return
# Check for FailureReason objects generated by child jobs
failure_result = _result_from_failure_reasons(step)
if failure_result and failure_result != step.result:
step.result = failure_result
db.session.add(step)
db.session.commit()
if failure_result == Result.infra_failed:
_report_jobstep_result(step)
return
try:
record_coverage_stats(step)
except Exception:
        current_app.logger.exception('Failed to record coverage stats for step %s', step.id)
missing_tests = is_missing_tests(step, jobplan)
try_create(ItemStat, where={
'item_id': step.id,
'name': 'tests_missing',
'value': int(missing_tests),
})
if missing_tests:
if step.result != Result.failed:
step.result = Result.failed
db.session.add(step)
try_create(FailureReason, {
'step_id': step.id,
'job_id': step.job_id,
'build_id': step.job.build_id,
'project_id': step.project_id,
'reason': 'missing_tests'
})
        db.session.commit()
if has_test_failures(step):
if step.result != Result.failed:
step.result = Result.failed
db.session.add(step)
try_create(FailureReason, {
'step_id': step.id,
'job_id': step.job_id,
'build_id': step.job.build_id,
'project_id': step.project_id,
'reason': 'test_failures'
})
db.session.commit()
if has_missing_targets(step):
if step.result != Result.failed:
step.result = Result.failed
db.session.add(step)
BazelTarget.query.filter(
BazelTarget.step_id == step.id,
BazelTarget.status == Status.in_progress,
).update({
'status': Status.finished,
'result': Result.aborted,
})
try_create(FailureReason, {
'step_id': step.id,
'job_id': step.job_id,
'build_id': step.job.build_id,
'project_id': step.project_id,
'reason': 'missing_targets',
})
db.session.commit()
_report_jobstep_result(step)
def _report_jobstep_result(step):
"""To be called once we're done syncing a JobStep to report the result for monitoring.
Args:
step (JobStep): The JobStep to report the result of.
"""
labels = {
Result.unknown: 'unknown',
Result.passed: 'passed',
Result.failed: 'failed',
Result.infra_failed: 'infra_failed',
Result.aborted: 'aborted',
Result.skipped: 'skipped',
}
label = labels.get(step.result, 'OTHER')
# TODO(kylec): Include the project slug in the metric so we can
# track on a per-project basis if needed.
statsreporter.stats().incr('jobstep_result_' + label)