Source code for slm.validators

from collections import namedtuple
from datetime import datetime, timezone

from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db.models import Model
from django.utils.translation import gettext_lazy as _

from slm.defines import EquipmentState, FlagSeverity

# we can't use actual nulls for times because it breaks things like
# Greatest on MYSQL
# NULL_TIME = datetime.utcfromtimestamp(0)
NULL_TIME = datetime.fromtimestamp(0, timezone.utc)
NULL_VALUES = [None, "", NULL_TIME]


Flag = namedtuple("Flag", "message manual severity")


[docs] def get_validators(model, field): """ Get the validator list for a given model and field from validation settings. :param model: The Django model name <app_name>.<ModelClass> :param field: The field name :return: """ from django.conf import settings if isinstance(model, Model) or ( isinstance(model, type) and issubclass(model, Model) ): model = model._meta.label return getattr(settings, "SLM_DATA_VALIDATORS", {}).get(model, {}).get(field, [])
# toggle this flag to allow save block bypassing. BYPASS_BLOCKS = None def set_bypass(toggle): global BYPASS_BLOCKS BYPASS_BLOCKS = bool(toggle)
[docs] def bypass_block(): """ Return if we should bypass any validation save blocking. This setting is controlled by the SLM_VALIDATION_BYPASS_BLOCK setting in settings or can be preempted by toggling the BYPASS_BLOCKS flag above. This is important when working with legacy data. All validators should respect this flag. :return: True if we should bypass any save blocks """ from django.conf import settings if BYPASS_BLOCKS is None: return getattr(settings, "SLM_VALIDATION_BYPASS_BLOCK") return bool(BYPASS_BLOCKS)
class SLMValidator: severity = FlagSeverity.NOTIFY def __init__(self, *args, **kwargs): self.severity = kwargs.pop("severity", self.severity) super().__init__(*args, **kwargs) def __call__(self, instance, field, value): raise NotImplementedError() def validate(self, instance, field, validator): try: validator() except ValidationError as ve: if self.severity == FlagSeverity.BLOCK_SAVE and not bypass_block(): raise ve self.throw_flag(ve.message, instance, field) def throw_error(self, message, instance, field): if self.severity == FlagSeverity.BLOCK_SAVE and not bypass_block(): raise ValidationError(_(message)) self.throw_flag(message, instance, field) def throw_flag(self, message, instance, field): if not instance._flags: instance._flags = {} instance._flags[field.name] = message instance.save() class FieldRequired(SLMValidator): required_msg = _("This field is required.") desired_msg = _("This field is desired.") allow_legacy_nulls = False desired = False def __init__(self, allow_legacy_nulls=allow_legacy_nulls, desired=desired): self.allow_legacy_nulls = allow_legacy_nulls self.desired = desired super().__init__(severity=FlagSeverity.BLOCK_SAVE) def __call__(self, instance, field, value): if isinstance(value, str): value = value.strip() if value in NULL_VALUES: if ( self.desired or not self.allow_legacy_nulls or instance.get_initial_value(field.name) in NULL_VALUES ): self.throw_flag(self.desired_msg, instance, field) else: self.throw_error(self.required_msg, instance, field) class EnumValidator(SLMValidator): statement = _("Value not in Enumeration.") def __call__(self, instance, field, value): if isinstance(value, str): value = value.strip() if value not in NULL_VALUES: try: field.enum(value) except ValueError: self.throw_error(self.statement, instance, field) class VerifiedEquipmentValidator(SLMValidator): statement = _("This equipment code has not been verified.") def __call__(self, instance, field, value): if value.state == EquipmentState.UNVERIFIED: self.throw_error(self.statement, instance, field) elif value.state == EquipmentState.LEGACY: self.throw_error(self.statement, instance, field) class ActiveEquipmentValidator(SLMValidator): statement = _("This equipment code is no longer in use.") replaced_statement = _("This equipment code has been replaced by: {}.") def __call__(self, instance, field, value): if value.state == EquipmentState.LEGACY: if value.replaced_by.exists(): self.throw_error( self.replaced_statement.format( ", ".join([eq.model for eq in value.replaced_by.all()]) ), instance, field, ) else: self.throw_error(self.statement, instance, field) class NonEmptyValidator(SLMValidator): statement = _("More than zero selections should be made.") def __call__(self, instance, field, value): if not value.all(): self.throw_error(self.statement, instance, field) class FourIDValidator(SLMValidator): regex_val = RegexValidator(regex=r"[A-Z0-9]{4}") def __call__(self, instance, field, value): self.validate(instance, field, lambda: self.regex_val(value)) if not instance.site.name.startswith(value): self.throw_error( f"{field.verbose_name} " f"{_('must be the prefix of the 9 character site name')}.", instance, field, ) class ARPValidator(SLMValidator): def __call__(self, instance, field, value): at_arp = getattr( getattr(instance, "antenna_type", None), "reference_point", None ) if at_arp != value: self.throw_error( f"{getattr(value, 'name', None)} " f"{_('must does not match the antenna reference point: ')}" f"{getattr(at_arp, 'name', None)}.", instance, field, )
[docs] class TimeRangeBookendValidator(SLMValidator): """ Ensures that sections that should not have overlapping time ranges are properly bookended - i.e. that the time range fields are closed before the next section starts. """ accessor: str bookend_field: str
[docs] def __init__( self, *args, bookend_field="installed", severity=FlagSeverity.NOTIFY, **kwargs ): self.bookend_field = bookend_field assert self.bookend_field, "Bookend field must be specified." super().__init__(*args, severity=severity, **kwargs)
def __call__(self, instance, field, value): sections = ( getattr( instance.site, instance._meta.get_field("site").remote_field.get_accessor_name(), ) .head() .sort(reverse=True) ) last = sections[0] for section in sections[1:]: # todo - this should be unnecessary when validation system is made # more robust if "Must end before" in section._flags.get(field.name, ""): del section._flags[field.name] section.save() ###### last_start = getattr(last, self.bookend_field, None) if getattr(section, field.name, None) is None or ( last_start and last_start < getattr(section, field.name) ): self.throw_error( _("Must end before {} starts {}.").format( last, getattr(last, self.bookend_field, None) ), section, field, ) last = section
class TimeRangeValidator(SLMValidator): start_field = None end_field = None def __init__(self, *args, severity=FlagSeverity.BLOCK_SAVE, **kwargs): self.start_field = kwargs.pop("start_field", None) self.end_field = kwargs.pop("end_field", None) assert not (self.start_field and self.end_field) super().__init__(*args, severity=severity, **kwargs) def __call__(self, instance, field, value): if self.start_field: start = getattr(instance, self.start_field, None) if start is not None and start != NULL_TIME and value: if start > value: self.throw_error( f"{field.verbose_name} " f"{_('must be greater than')} " f"{instance._meta.get_field(self.start_field).verbose_name}", instance, field, ) if self.end_field: end = getattr(instance, self.end_field, None) if end is not None and end != NULL_TIME: if ( value is None or value == NULL_TIME and end is not None and end != NULL_TIME ): self.throw_error( f"{_('Cannot define')} " f"{instance._meta.get_field(self.end_field).verbose_name} " f"{_('without defining')} " f"{field.verbose_name}.", instance, field, ) elif end < value: self.throw_error( f"{field.verbose_name} " f"{_('must be less than')} " f"{instance._meta.get_field(self.end_field).verbose_name}", instance, field, )
[docs] class PositionsMatchValidator(SLMValidator): """ Attach this validator to SiteLocation llh and/or xyz fields to validate that these positions are within the given tolerance of each other. """ tolerance = 1.0 """ 3D tolerance in meters between the positions before flagging. """
[docs] def __init__( self, *args, severity=FlagSeverity.BLOCK_SAVE, tolerance: float = tolerance, **kwargs, ): self.tolerance = tolerance super().__init__(*args, severity=severity, **kwargs)
def __call__(self, instance, field, value): from math import sqrt from slm.utils import llh2xyz fieldname = field.name xyz1 = value if fieldname == "xyz" else llh2xyz(value) other = "xyz" if fieldname == "llh" else "llh" otherfield = instance._meta.get_field(other) if xyz1: xyz2 = ( llh2xyz(getattr(instance, other)) if other == "llh" else getattr(instance, other) ) if xyz2: diff = sqrt( (xyz1[0] - xyz2[0]) ** 2 + (xyz1[1] - xyz2[1]) ** 2 + (xyz1[2] - xyz2[2]) ** 2 ) if diff > self.tolerance: self.throw_error( f"{diff:.2f} meters away from {otherfield.verbose_name}", instance, field, )