from __future__ import absolute_import, print_function
from datetime import datetime
from flask import current_app
from sqlalchemy.sql import func
from changes.backends.base import UnrecoverableException
from changes.config import db, queue, statsreporter
from changes.constants import Status, Result
from changes.db.utils import try_create
from changes.jobs.signals import fire_signal
from changes.models.itemstat import ItemStat
from changes.models.job import Job
from changes.models.jobphase import JobPhase
from changes.models.jobplan import JobPlan
from changes.models.jobstep import JobStep
from changes.models.test import TestCase
from changes.queue.task import tracked_task
from changes.utils.agg import aggregate_status, safe_agg
# Upper bound, in seconds, on how long a failed jobstep may have run for us to
# still consider retrying it. It is compared against ``step.duration / 1000``.
# TODO(nate): make this a config
MAX_DURATION_FOR_RETRY_SECS = 15 * 60
def aggregate_job_stat(job, name, func_=func.sum):
    """
    Roll a jobstep-level ItemStat up to the job.

    Aggregates the ``ItemStat.value`` rows called ``name`` that belong to the
    job's jobsteps which have not been replaced (``replacement_id`` is NULL),
    and hands the result to ``try_create`` as an ItemStat keyed on the job id.

    :param job: job whose jobstep stats are aggregated; only ``job.id`` is read.
    :param name: the ItemStat name to aggregate and to store under.
    :param func_: SQL aggregate applied to ``ItemStat.value`` (default: sum).
    """
    # Ids of the job's jobsteps that were not superseded by a replacement.
    current_step_ids = db.session.query(JobStep.id).filter(
        JobStep.job_id == job.id,
        JobStep.replacement_id.is_(None),
    )

    # COALESCE turns "no matching rows" into 0 instead of NULL.
    aggregate_expr = func.coalesce(func_(ItemStat.value), 0)

    # as_scalar() keeps this as a scalar subquery expression; the number is
    # not fetched into Python here.
    value = db.session.query(aggregate_expr).filter(
        ItemStat.item_id.in_(current_step_ids),
        ItemStat.name == name,
    ).as_scalar()

    # NOTE(review): presumably try_create is a no-op when the row already
    # exists -- confirm against changes.db.utils.
    stat_fields = {
        'item_id': job.id,
        'name': name,
        'value': value,
    }
    try_create(ItemStat, where=stat_fields)
def _should_retry_jobstep(step):
    """
    Decide whether ``step`` is eligible for a replacement jobstep.

    A step qualifies only when all of the following hold:

    - its result is ``Result.infra_failed``;
    - it has no ``replacement_id`` yet;
    - it has no recorded duration, or its duration (divided by 1000) is
      below ``MAX_DURATION_FOR_RETRY_SECS``;
    - no JobStep row has ``replacement_id`` equal to this step's id.

    :returns: True if the step should be retried, False otherwise.
    """
    if not (step.result == Result.infra_failed):
        return False

    if step.replacement_id is not None:
        return False

    # A missing/zero duration never disqualifies a step.
    if step.duration and not (step.duration / 1000 < MAX_DURATION_FOR_RETRY_SECS):
        return False

    # make sure this jobstep hasn't already been retried. This is checked
    # last so the database is only queried for otherwise-eligible steps.
    linked_step = JobStep.query.filter(JobStep.replacement_id == step.id).first()
    return linked_step is None
def _find_and_retry_jobsteps(phase, implementation):
    """
    Create replacement jobsteps for the retryable jobsteps of ``phase``.

    Candidates are the steps of ``phase.steps`` accepted by
    ``_should_retry_jobstep``. Retrying stops as soon as one of the limits
    read from the app config is reached:

    - ``JOBSTEP_RETRY_MAX``: total number of replaced jobsteps in the job;
    - ``JOBSTEP_MACHINE_RETRY_MAX``: number of distinct nodes for which we
      are willing to retry jobsteps.

    :param phase: the jobphase whose steps are examined.
    :param implementation: build step implementation; must provide
        ``create_replacement_jobstep(step)``, whose falsy return value is
        treated as "no replacement was created".
    :returns: None.
    """
    # phase.steps is ordered by date_started, so we retry the oldest jobsteps first
    should_retry = [s for s in phase.steps if _should_retry_jobstep(s)]
    if not should_retry:
        return

    # Number of already-replaced jobsteps in this job, keyed by node id.
    # We count JobStep.id rather than JobStep.node_id: COUNT(column) ignores
    # NULLs, so jobsteps that failed before being assigned a node would be
    # reported as 0 and would never count towards the hard max below.
    already_retried = dict(
        db.session.query(JobStep.node_id, func.count(JobStep.id)).filter(
            JobStep.job == phase.job,
            JobStep.replacement_id.isnot(None),
        ).group_by(JobStep.node_id)
    )

    # Running total of replaced jobsteps; kept in step with already_retried
    # so we do not re-sum the dict on every iteration. `.values()` (instead
    # of the Python-2-only `.itervalues()`) works on Python 2 and 3.
    total_retried = sum(already_retried.values())

    for step in should_retry:
        # hard max on how many jobsteps we retry
        if total_retried >= current_app.config['JOBSTEP_RETRY_MAX']:
            break

        # max on how many different failing machines we'll retry jobsteps for.
        if (step.node_id not in already_retried and
                len(already_retried) >= current_app.config['JOBSTEP_MACHINE_RETRY_MAX']):
            break

        newstep = implementation.create_replacement_jobstep(step)
        if newstep:
            statsreporter.stats().incr('jobstep_replaced')
            # NB: node_id could be None if the jobstep failed before we got a node_id
            already_retried[step.node_id] = already_retried.get(step.node_id, 0) + 1
            total_retried += 1
def sync_job_phases(job, phases=None, implementation=None):
    """
    Run ``sync_phase`` over the phases of ``job``.

    :param job: the job being synchronized; only ``job.id`` is read here.
    :param phases: iterable of jobphases to sync. When None, the phases are
        queried from JobPhase by ``job_id``.
    :param implementation: build step implementation passed through to
        ``sync_phase``. When None, it is looked up via
        ``JobPlan.get_build_step_for_job``.
    """
    if phases is None:
        phases = JobPhase.query.filter(JobPhase.job_id == job.id)

    if implementation is None:
        # The lookup returns a 2-tuple; only its second element is needed.
        _unused, implementation = JobPlan.get_build_step_for_job(job_id=job.id)

    for current_phase in phases:
        sync_phase(current_phase, implementation)
def sync_phase(phase, implementation):
    """
    Bring a single jobphase up to date with its jobsteps.

    Retries eligible jobsteps, fills in ``date_started`` if it is missing,
    derives the phase status / ``date_finished`` / result from the steps,
    and commits the phase if the session reports it as modified.

    :param phase: the jobphase to update (mutated in place).
    :param implementation: build step implementation; must provide
        ``create_replacement_jobstep`` (used by the retry helper) and
        ``validate_phase(phase=...)``.
    """
    # Retry first; presumably so the step list read below already contains
    # any replacement steps -- confirm against the JobPhase.steps relationship.
    _find_and_retry_jobsteps(phase, implementation)
    steps = list(phase.steps)

    if phase.date_started is None:
        phase.date_started = safe_agg(min, (s.date_started for s in steps))
        db.session.add(phase)

    if steps:
        every_step_finished = all(s.status == Status.finished for s in steps)
        if every_step_finished:
            phase.status = Status.finished
            phase.date_finished = safe_agg(max, (s.date_finished for s in steps))
        else:
            # ensure we dont set the status to finished unless it actually is
            aggregated_status = aggregate_status((s.status for s in steps))
            if aggregated_status != Status.finished:
                phase.status = aggregated_status

        if any(s.result is Result.failed for s in steps):
            phase.result = Result.failed

        # NOTE(review): the source this was recovered from had lost its
        # indentation; this check is assumed to live inside `if steps:` --
        # confirm against version control.
        if phase.status == Status.finished:
            # Sets the final phase result.
            implementation.validate_phase(phase=phase)

    # Nothing to persist if the session sees no changes on the phase.
    if not db.session.is_modified(phase):
        return

    phase.date_modified = datetime.utcnow()
    db.session.add(phase)
    db.session.commit()
def abort_job(task):
    """
    Abort callback registered on ``sync_job`` via ``tracked_task(on_abort=...)``.

    Marks the job named by ``task.kwargs['job_id']`` as finished with an
    aborted result, syncs its phases, commits, and logs the abort.

    :param task: the aborted task; only ``task.kwargs['job_id']`` is read.
    """
    target_job_id = task.kwargs['job_id']
    job = Job.query.get(target_job_id)

    job.status = Status.finished
    job.result = Result.aborted
    db.session.add(job)
    # Flush before syncing phases; presumably so the phase sync sees the
    # job's new state -- confirm.
    db.session.flush()

    sync_job_phases(job)
    db.session.commit()

    # NOTE(review): logger.exception attaches the current exception info;
    # whether this callback runs inside an exception handler is not visible
    # from this module.
    current_app.logger.exception('Unrecoverable exception syncing job %s', job.id)
@tracked_task(on_abort=abort_job)
def sync_job(job_id):
    """
    Updates jobphase and job statuses based on the status of the constituent jobsteps.

    Does nothing if the job does not exist or is already finished. Otherwise
    the job's build step implementation is asked to update the job, every
    phase is synced, and the job's dates, duration, result and status are
    recomputed from its phases and committed if anything changed.

    Once the job is finished, aggregate stats are recorded (best effort),
    the ``job.finished`` signal is fired and, if the job has a jobplan, a
    ``update_project_plan_stats`` task is queued.

    :param job_id: id of the job to synchronize.
    :raises sync_job.NotFinished: when the job is not finished yet
        (presumably this makes the task framework run the task again --
        confirm against ``tracked_task``).
    """
    job = Job.query.get(job_id)
    if not job:
        return

    if job.status == Status.finished:
        return

    jobplan, implementation = JobPlan.get_build_step_for_job(job_id=job.id)

    try:
        implementation.update(job=job)
    except UnrecoverableException:
        job.status = Status.finished
        job.result = Result.infra_failed
        current_app.logger.exception('Unrecoverable exception syncing %s', job.id)

    all_phases = list(job.phases)

    # propagate changes to any phases as they live outside of the
    # normalize synchronization routines
    sync_job_phases(job, all_phases, implementation)

    # The job is finished only if the task's children report finished AND
    # every phase is finished.
    is_finished = sync_job.verify_all_children() == Status.finished
    if any(p.status != Status.finished for p in all_phases):
        is_finished = False

    job.date_started = safe_agg(
        min, (j.date_started for j in all_phases if j.date_started))

    if is_finished:
        job.date_finished = safe_agg(
            max, (j.date_finished for j in all_phases if j.date_finished))
    else:
        job.date_finished = None

    # Duration is stored in milliseconds.
    if job.date_started and job.date_finished:
        job.duration = int((job.date_finished - job.date_started).total_seconds() * 1000)
    else:
        job.duration = None

    # if any phases are marked as failing, fail the build
    if any(j.result is Result.failed for j in all_phases):
        job.result = Result.failed
    # If any test cases were marked as failing, fail the build.
    # The exception is if the only failing test case occurred in a JobStep that
    # had an infra failure. In this case we can't trust the test case result as
    # being meaningful and so we ignore these.
    elif TestCase.query.join(JobStep, JobStep.id == TestCase.step_id).filter(
        TestCase.result == Result.failed, TestCase.job_id == job.id,
        JobStep.result != Result.infra_failed
    ).first():
        job.result = Result.failed
    # if we've finished all phases, use the best result available
    elif is_finished:
        # Sets the final job result.
        implementation.validate(job=job)
    else:
        job.result = Result.unknown

    if is_finished:
        job.status = Status.finished
    else:
        # ensure we dont set the status to finished unless it actually is
        new_status = aggregate_status((j.status for j in all_phases))
        if new_status != Status.finished:
            job.status = new_status
        elif job.status == Status.finished:
            job.status = Status.in_progress
            # We are not inside an exception handler here, so use error()
            # rather than exception(): the latter would attach a missing or
            # stale traceback to the log record.
            current_app.logger.error('Job incorrectly marked as finished: %s', job.id)

    if db.session.is_modified(job):
        job.date_modified = datetime.utcnow()

        db.session.add(job)
        db.session.commit()

    if not is_finished:
        raise sync_job.NotFinished

    # Best effort: a failure while recording stats is logged, skips the
    # remaining stats, and must not prevent the signals below from firing.
    stat_names = (
        'test_count',
        'test_duration',
        'test_failures',
        'test_rerun_count',
        'tests_missing',
        'lines_covered',
        'lines_uncovered',
        'diff_lines_covered',
        'diff_lines_uncovered',
    )
    try:
        for stat_name in stat_names:
            aggregate_job_stat(job, stat_name)
    except Exception:
        current_app.logger.exception('Failing recording aggregate stats for job %s', job.id)

    fire_signal.delay(
        signal='job.finished',
        kwargs={'job_id': job.id.hex},
    )

    if jobplan:
        queue.delay('update_project_plan_stats', kwargs={
            'project_id': job.project_id.hex,
            'plan_id': jobplan.plan_id.hex,
        }, countdown=1)