from __future__ import absolute_import, division
import re
import uuid
from datetime import datetime
from hashlib import sha1
from sqlalchemy import Column, DateTime, ForeignKey, String, Text, Integer
from sqlalchemy.event import listen
from sqlalchemy.orm import deferred, relationship
from sqlalchemy.schema import UniqueConstraint, Index
from changes.config import db
from changes.constants import Result
from changes.db.types.enum import Enum
from changes.db.types.guid import GUID
from changes.db.utils import model_repr
[docs]class TestCase(db.Model):
"""
A single run of a single test, together with any captured output, retry-count
and its return value.
Every test that gets run ever has a row in this table.
At the time this was written, it seems to have 400-500M rows
(how is this still surviving?)
NOTE: DO NOT MODIFY THIS TABLE! Running migration on this table has caused
unavailability in the past. If you need to add a new column, consider doing
that on a new table and linking it back to tests via the ID.
"""
__tablename__ = 'test'
__table_args__ = (
UniqueConstraint('job_id', 'label_sha', name='unq_test_name'),
Index('idx_test_step_id', 'step_id'),
Index('idx_test_project_key', 'project_id', 'label_sha'),
Index('idx_task_date_created', 'date_created'),
Index('idx_test_project_key_date', 'project_id', 'label_sha', 'date_created'),
)
id = Column(GUID, nullable=False, primary_key=True, default=uuid.uuid4)
job_id = Column(GUID, ForeignKey('job.id', ondelete="CASCADE"), nullable=False)
project_id = Column(GUID, ForeignKey('project.id', ondelete="CASCADE"), nullable=False)
step_id = Column(GUID, ForeignKey('jobstep.id', ondelete="CASCADE"))
name_sha = Column('label_sha', String(40), nullable=False)
name = Column(Text, nullable=False)
_package = Column('package', Text, nullable=True)
result = Column(Enum(Result), default=Result.unknown, nullable=False)
duration = Column(Integer, default=0)
message = deferred(Column(Text))
date_created = Column(DateTime, default=datetime.utcnow, nullable=False)
reruns = Column(Integer)
# owner should be considered an unstructured string field. It may contain
# email address ("Foo <foo@example.com>", a username ("foo"), or something
# else. This field is not used directly by Changes, so
# providers + consumers on either side of Changes should be sure they know
# what they're doing.
owner = Column(Text)
job = relationship('Job')
step = relationship('JobStep')
project = relationship('Project')
__repr__ = model_repr('name', '_package', 'result')
def __init__(self, **kwargs):
super(TestCase, self).__init__(**kwargs)
if self.id is None:
self.id = uuid.uuid4()
if self.result is None:
self.result = Result.unknown
if self.date_created is None:
self.date_created = datetime.utcnow()
@classmethod
def calculate_name_sha(self, name):
if name:
return sha1(name).hexdigest()
raise ValueError
@property
def sep(self):
name = (self._package or self.name)
# handle the case where it might begin with some special character
if not re.match(r'^[a-zA-Z0-9]', name):
return '/'
elif '/' in name:
return '/'
return '.'
def _get_package(self):
if not self._package:
try:
package, _ = self.name.rsplit(self.sep, 1)
except ValueError:
package = None
else:
package = self._package
return package
def _set_package(self, value):
self._package = value
package = property(_get_package, _set_package)
@property
def short_name(self):
name, package = self.name, self.package
if package and name.startswith(package) and name != package:
return name[len(package) + 1:]
return name
def set_name_sha(target, value, oldvalue, initiator):
if not value:
return value
new_sha = sha1(value).hexdigest()
if new_sha != target.name_sha:
target.name_sha = new_sha
return value
listen(TestCase.name, 'set', set_name_sha, retval=False)