Source code for changes.models.test

from __future__ import absolute_import, division

import re
import uuid

from datetime import datetime
from hashlib import sha1
from sqlalchemy import Column, DateTime, ForeignKey, String, Text, Integer
from sqlalchemy.event import listen
from sqlalchemy.orm import deferred, relationship
from sqlalchemy.schema import UniqueConstraint, Index

from changes.config import db
from changes.constants import Result
from changes.db.types.enum import Enum
from changes.db.types.guid import GUID
from changes.db.utils import model_repr


class TestCase(db.Model):
    """
    A single run of a single test, together with any captured output,
    retry-count and its return value.

    Every test that gets run ever has a row in this table.

    At the time this was written, it seems to have 400-500M rows
    (how is this still surviving?)

    NOTE: DO NOT MODIFY THIS TABLE! Running migration on this table has
    caused unavailability in the past. If you need to add a new column,
    consider doing that on a new table and linking it back to tests via
    the ID.
    """
    __tablename__ = 'test'
    __table_args__ = (
        # A given test name (stored as its SHA-1) may appear once per job.
        UniqueConstraint('job_id', 'label_sha', name='unq_test_name'),
        Index('idx_test_step_id', 'step_id'),
        Index('idx_test_project_key', 'project_id', 'label_sha'),
        # NOTE(review): the 'idx_task_' prefix looks like a copy-paste slip,
        # but the name is part of the schema and is deliberately left as-is.
        Index('idx_task_date_created', 'date_created'),
        Index('idx_test_project_key_date', 'project_id', 'label_sha', 'date_created'),
    )

    id = Column(GUID, nullable=False, primary_key=True, default=uuid.uuid4)
    job_id = Column(GUID, ForeignKey('job.id', ondelete="CASCADE"), nullable=False)
    project_id = Column(GUID, ForeignKey('project.id', ondelete="CASCADE"), nullable=False)
    step_id = Column(GUID, ForeignKey('jobstep.id', ondelete="CASCADE"))
    # Exposed as ``name_sha`` in Python, stored in the ``label_sha`` column.
    name_sha = Column('label_sha', String(40), nullable=False)
    name = Column(Text, nullable=False)
    # Explicit package override; see the ``package`` property below.
    _package = Column('package', Text, nullable=True)
    result = Column(Enum(Result), default=Result.unknown, nullable=False)
    duration = Column(Integer, default=0)
    # Deferred: only loaded from the database when first accessed.
    message = deferred(Column(Text))
    date_created = Column(DateTime, default=datetime.utcnow, nullable=False)
    reruns = Column(Integer)

    # owner should be considered an unstructured string field. It may contain
    # an email address ("Foo <foo@example.com>"), a username ("foo"), or
    # something else. This field is not used directly by Changes, so
    # providers + consumers on either side of Changes should be sure they know
    # what they're doing.
    owner = Column(Text)

    job = relationship('Job')
    step = relationship('JobStep')
    project = relationship('Project')

    __repr__ = model_repr('name', '_package', 'result')

    def __init__(self, **kwargs):
        """Create a test case, eagerly filling in id, result and date_created.

        The column-level defaults are only applied when the row is inserted,
        so they are populated here to make a freshly constructed (unflushed)
        instance usable straight away.
        """
        super(TestCase, self).__init__(**kwargs)
        if self.id is None:
            self.id = uuid.uuid4()
        if self.result is None:
            self.result = Result.unknown
        if self.date_created is None:
            self.date_created = datetime.utcnow()

    @classmethod
    def calculate_name_sha(cls, name):
        """Return the hex SHA-1 digest used as ``name_sha`` for ``name``.

        ``name`` may be a byte string or text; text is encoded as UTF-8
        before hashing so that non-ASCII test names do not fail.

        Raises:
            ValueError: if ``name`` is empty or None.
        """
        if not name:
            raise ValueError('name must be a non-empty string')
        if not isinstance(name, bytes):
            # sha1() only accepts bytes; hashing text directly breaks on
            # non-ASCII names (and on any text under Python 3).
            name = name.encode('utf-8')
        return sha1(name).hexdigest()

    @property
    def sep(self):
        """Return the separator ('/' or '.') used to split package from name.

        The decision is made on the explicit package when one is set,
        otherwise on the full test name.
        """
        name = (self._package or self.name)
        # handle the case where it might begin with some special character
        if not re.match(r'^[a-zA-Z0-9]', name):
            return '/'
        elif '/' in name:
            return '/'
        return '.'

    def _get_package(self):
        """Return the explicit package, or derive it from ``name``.

        When no package is stored, everything before the last separator in
        ``name`` is used; if ``name`` contains no separator the result is
        None.
        """
        if not self._package:
            try:
                package, _ = self.name.rsplit(self.sep, 1)
            except ValueError:
                # No separator in the name: rsplit yielded a single element.
                package = None
        else:
            package = self._package
        return package

    def _set_package(self, value):
        """Store ``value`` as the explicit package."""
        self._package = value

    package = property(_get_package, _set_package)

    @property
    def short_name(self):
        """Return ``name`` with the leading package and one separator removed.

        The full name is returned unchanged when there is no package, when
        the name does not start with it, or when the name equals the package.
        """
        name, package = self.name, self.package
        if package and name.startswith(package) and name != package:
            # +1 skips the single separator character after the package.
            return name[len(package) + 1:]
        return name
def set_name_sha(target, value, oldvalue, initiator):
    """Keep ``target.name_sha`` in sync whenever ``target.name`` is assigned.

    Registered below as a SQLAlchemy attribute 'set' listener. Empty or None
    values leave ``name_sha`` untouched. Text values are encoded as UTF-8
    before hashing so that non-ASCII names do not fail.

    Args:
        target: the instance whose ``name`` is being set.
        value: the new name.
        oldvalue: the previous name (unused).
        initiator: the SQLAlchemy event token (unused).

    Returns:
        ``value`` unchanged. The listener is registered with
        ``retval=False``, so the return value is ignored.
    """
    if not value:
        return value

    # sha1() only accepts bytes; hashing text directly breaks on non-ASCII
    # names (and on any text under Python 3).
    raw = value if isinstance(value, bytes) else value.encode('utf-8')
    new_sha = sha1(raw).hexdigest()
    # Only assign when the digest actually changes.
    if new_sha != target.name_sha:
        target.name_sha = new_sha
    return value


listen(TestCase.name, 'set', set_name_sha, retval=False)