# Copyright 2026 Daitum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validator framework for field validation.
This module provides a flexible validation system that allows attaching validators
to fields, combining validators with logical operators, and generating validation
formulas with custom error messages.
"""
from __future__ import annotations
import operator
from abc import ABC, abstractmethod
from collections.abc import Callable, Sequence
from enum import Enum
from typing import Any
from typeguard import typechecked
class Severity(Enum):
    """
    Enumeration of validation severity levels.

    Each member's value is the human-readable label of the level; the label is
    also embedded in the names of the generated ``__invalid__`` / ``__message__``
    calculations, so the strings must stay stable.
    """

    INFO = "Info"
    WARNING = "Warning"
    ERROR = "Error"
    CRITICAL = "Critical"
# Numeric rank of every severity, from least severe (1) to most severe (4).
# Composite validators compare these ranks to pick the stricter child severity.
SEVERITY_RANK = {
    level: rank
    for rank, level in enumerate(
        (Severity.INFO, Severity.WARNING, Severity.ERROR, Severity.CRITICAL),
        start=1,
    )
}
class BoundType(Enum):
    """
    Whether a range bound is inclusive or exclusive.

    ``INCLUSIVE`` means a value equal to the bound is accepted; ``EXCLUSIVE``
    means a value equal to the bound is rejected.
    """

    INCLUSIVE = "Inclusive"
    EXCLUSIVE = "Exclusive"
from daitum_model import formulas # noqa: E402
from .data_types import BaseDataType, DataType, MapDataType # noqa: E402
from .fields import Field # noqa: E402
from .formula import CONST, Formula, Operand # noqa: E402
from .model import ModelBuilder # noqa: E402
from .named_values import NamedValue # noqa: E402
from .tables import Table # noqa: E402
@typechecked
class Validator(ABC):
    """
    Abstract base class for field validators.

    Validators check field values and generate error messages when validation fails.
    They can be combined using AND (&) and OR (|) operators to create complex validation rules.
    """

    def __init__(
        self,
        severity: Severity,
        custom_message: Formula | str | None = None,
        custom_summary_message: str | None = None,
    ):
        """
        Initialize a validator.

        Args:
            severity: The severity level of validation failures (INFO, WARNING, ERROR, CRITICAL).
            custom_message: Optional custom error message to use instead of the default message.
            custom_summary_message: Optional custom summary error message to use instead
                of the default summary message.
        """
        self.severity = severity
        self.custom_message = custom_message
        self.custom_summary_message = custom_summary_message
        # Filled in when the validator is attached to a field or named value.
        self.summary_message: str = ""

    @abstractmethod
    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Return a formula indicating whether the field is invalid.

        Args:
            field: The field or named value to validate.
            table: The table containing the field, or None for named values.
        """

    @abstractmethod
    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Return a formula (or plain string) containing the message shown when the
        field is invalid.

        Args:
            field: The field or named value to validate.
            table: The table containing the field, or None for named values.
        """

    @abstractmethod
    def get_summary_message(self, field: Operand) -> str:
        """
        Return a str containing the summary message used when the field is invalid.

        Args:
            field: The field or named value to validate.
        """

    def _attach_to_field(self, field: Field, table: Table):
        """
        Generate and register the ``__invalid__`` and ``__message__`` calculated fields
        for *field* on *table*, and record the summary message on this validator.

        Args:
            field: The field being validated.
            table: The table that owns *field*.
        """
        invalid = self.invalid(field, table)
        message = self.message(field, table)
        self.summary_message = self.get_summary_message(field)

        # The severity label is part of the generated names.
        invalid_field_name = f"{field.id}__invalid__{self.severity.value}"
        message_field_name = f"{field.id}__message__{self.severity.value}"

        invalid_field = table.add_calculated_field(invalid_field_name, invalid)
        # The message calculation is blank unless the invalid flag is set.
        table.add_calculated_field(
            message_field_name, formulas.IF(invalid_field, message, formulas.BLANK())
        )

    def _attach_to_named_value(self, value: NamedValue, model: ModelBuilder):
        """
        Generate and register the ``__invalid__`` and ``__message__`` calculations
        for *value* on *model*, and record the summary message on this validator.

        Args:
            value: The named value being validated.
            model: The ``ModelBuilder`` that owns *value*.
        """
        invalid = self.invalid(value, None)
        message = self.message(value, None)
        # Mirror _attach_to_field: without this the summary stayed "" for named values.
        self.summary_message = self.get_summary_message(value)

        invalid_value_name = f"{value.id}__invalid__{self.severity.value}"
        message_value_name = f"{value.id}__message__{self.severity.value}"

        invalid_value = model.add_calculation(invalid_value_name, invalid)
        model.add_calculation(
            message_value_name, formulas.IF(invalid_value, message, formulas.BLANK())
        )

    def __and__(self, other: Validator) -> Validator:
        """
        Combine two validators with AND logic - both must be valid for result to be valid
        """
        return _AndValidator(self, other)

    def __or__(self, other: Validator) -> Validator:
        """
        Combine two validators with OR logic - either can be valid for result to be valid
        """
        return _OrValidator(self, other)
@typechecked
class _AndValidator(Validator):
    """
    Composite validator produced by ``left & right``.

    The composite is invalid as soon as *either* child is invalid. Its severity is
    the higher-ranked of the two child severities. When both children fail the
    message joins both child messages with " and "; otherwise only the failing
    child's message is shown.
    """

    def __init__(self, left: Validator, right: Validator):
        # On a tie both children carry the same severity, so either may supply it.
        stricter = max((left, right), key=lambda child: SEVERITY_RANK[child.severity])
        super().__init__(stricter.severity)
        self.left = left
        self.right = right

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """Return a formula that is true when at least one child is invalid."""
        return formulas.OR(
            self.left.invalid(field, table),
            self.right.invalid(field, table),
        )

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """Return a formula selecting the message(s) of whichever children failed."""
        left_failed = self.left.invalid(field, table)
        right_failed = self.right.invalid(field, table)
        left_text = self.left.message(field, table)
        right_text = self.right.message(field, table)

        both_failed = formulas.AND(left_failed, right_failed)
        joined_text = left_text + " and " + right_text
        single_text = formulas.IF(left_failed, left_text, right_text)
        return formulas.IF(both_failed, joined_text, single_text)

    def get_summary_message(self, field: Operand) -> str:
        """Return both child summary messages joined with " and "."""
        summaries = (
            self.left.get_summary_message(field),
            self.right.get_summary_message(field),
        )
        return " and ".join(summaries)
@typechecked
class _OrValidator(Validator):
    """
    Composite validator produced by ``left | right``.

    The composite is invalid only when *both* children are invalid. Its severity is
    the higher-ranked of the two child severities. The message always joins both
    child messages with " or ".
    """

    def __init__(self, left: Validator, right: Validator):
        # On a tie both children carry the same severity, so either may supply it.
        stricter = max((left, right), key=lambda child: SEVERITY_RANK[child.severity])
        super().__init__(stricter.severity)
        self.left = left
        self.right = right

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """Return a formula that is true only when both children are invalid."""
        return formulas.AND(
            self.left.invalid(field, table),
            self.right.invalid(field, table),
        )

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """Return both child messages joined with " or "."""
        left_text = self.left.message(field, table)
        right_text = self.right.message(field, table)
        joined_text = left_text + " or " + right_text
        return joined_text

    def get_summary_message(self, field: Operand) -> str:
        """Return both child summary messages joined with " or "."""
        summaries = (
            self.left.get_summary_message(field),
            self.right.get_summary_message(field),
        )
        return " or ".join(summaries)
@typechecked
class RangeValidator(Validator):  # pylint: disable=too-many-instance-attributes
    """
    Validator that checks if a field value is within a specified range.

    The range can have a min_value, max_value, or both bounds. Bounds can be static
    values or dynamic field references. Both bounds are inclusive by default and
    blank values are rejected by default; use the ``set_*`` methods to change that.
    """

    def __init__(
        self,
        severity: Severity,
        min_value: int | float | Operand | None,
        max_value: int | float | Operand | None,
    ):
        """
        Initialize a range validator.

        Args:
            severity: The severity level of validation failures.
            min_value: Minimum allowed value, or None for no lower bound.
            max_value: Maximum allowed value, or None for no upper bound.
        """
        super().__init__(severity)
        self._min_value = min_value
        self._max_value = max_value
        self.allow_blank: bool = False
        self.min_bound_type: BoundType = BoundType.INCLUSIVE
        self.max_bound_type: BoundType = BoundType.INCLUSIVE

    def set_allow_blank(self, allow_blank: bool) -> RangeValidator:
        """Sets whether blank values are considered valid."""
        self.allow_blank = allow_blank
        return self

    def set_custom_message(self, custom_message: Formula | str) -> RangeValidator:
        """Sets a custom error message to use instead of the default."""
        self.custom_message = custom_message
        return self

    def set_custom_summary_message(self, custom_summary_message: str) -> RangeValidator:
        """Sets a custom summary error message to use instead of the default."""
        self.custom_summary_message = custom_summary_message
        return self

    def set_min_bound_type(self, min_bound_type: BoundType) -> RangeValidator:
        """Sets whether the lower bound is inclusive or exclusive."""
        self.min_bound_type = min_bound_type
        return self

    def set_max_bound_type(self, max_bound_type: BoundType) -> RangeValidator:
        """Sets whether the upper bound is inclusive or exclusive."""
        self.max_bound_type = max_bound_type
        return self

    @property
    def min_value(self) -> int | float | Operand | None:
        """The minimum bound value, or None if unbounded."""
        return self._min_value

    @property
    def max_value(self) -> int | float | Operand | None:
        """The maximum bound value, or None if unbounded."""
        return self._max_value

    def _apply_cmp(
        self,
        cmp_fn: Callable,
        field: Operand,
        bound: Operand,
        is_array: bool,
        data_type: DataType | MapDataType,
    ):
        """
        Apply the comparison *cmp_fn* between *field* and *bound*.

        Scalars are compared directly. Arrays are compared as a whole and maps via
        their ``VALUES``; in both cases the comparison result is wrapped in ``AND``.
        """
        # NOTE(review): wrapping the comparison in AND presumably flags an array/map
        # only when *every* element violates the bound - confirm this is intended
        # rather than OR (any element out of range).
        if is_array:
            return formulas.AND(cmp_fn(field, bound))
        if isinstance(data_type, MapDataType):
            return formulas.AND(cmp_fn(formulas.VALUES(field), bound))
        return cmp_fn(field, bound)

    @staticmethod
    def _convert_bound_value(value: Any, data_type: BaseDataType) -> Operand | None:
        """
        Turn a raw bound into an ``Operand``.

        ``None`` and existing operands are returned unchanged. Plain values are
        wrapped in ``CONST``; an ``int`` bound is first converted to ``float`` when
        the (element) type is ``DataType.DECIMAL``.
        """
        if value is None or isinstance(value, Operand):
            return value
        primitive_type = data_type.data_type if isinstance(data_type, MapDataType) else data_type
        if isinstance(value, int) and primitive_type == DataType.DECIMAL:
            return CONST(float(value))
        return CONST(value)

    def _get_invalid(
        self,
        field: Operand,
        is_array: bool,
        data_type: DataType | MapDataType,
        non_array_data_type: DataType,
    ) -> Formula:
        """
        Build the out-of-range formula for *field* (blank handling is done by the caller).

        Raises:
            ValueError: If a bound's data type differs from *non_array_data_type*.
        """
        min_value: Operand | None = self._convert_bound_value(self._min_value, non_array_data_type)
        max_value: Operand | None = self._convert_bound_value(self._max_value, non_array_data_type)

        # A missing bound contributes a constant False to the final OR.
        min_invalid = False
        max_invalid = False

        if min_value is not None:
            if min_value.to_data_type() != non_array_data_type:
                raise ValueError(
                    f"The minimum data type {min_value.to_data_type()} "
                    f"is not compatible with the field data type {non_array_data_type}."
                )
            # inclusive: invalid when field < min | exclusive: invalid when field <= min
            min_cmp = operator.lt if self.min_bound_type == BoundType.INCLUSIVE else operator.le
            min_invalid = self._apply_cmp(min_cmp, field, min_value, is_array, data_type)

        if max_value is not None:
            if max_value.to_data_type() != non_array_data_type:
                raise ValueError(
                    f"The maximum data type {max_value.to_data_type()} "
                    f"is not compatible with the field data type {non_array_data_type}."
                )
            # inclusive: invalid when field > max | exclusive: invalid when field >= max
            max_cmp = operator.gt if self.max_bound_type == BoundType.INCLUSIVE else operator.ge
            max_invalid = self._apply_cmp(max_cmp, field, max_value, is_array, data_type)

        return formulas.OR(min_invalid, max_invalid)

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Return a formula indicating whether the field value or named value is outside
        the valid range.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            A Formula that evaluates to True if the field or named value is invalid,
            False otherwise.

        Raises:
            NotImplementedError: If the (element) data type is not INTEGER, DECIMAL,
                TIME, DATE or DATETIME.
            ValueError: If a bound is a ``Field`` and no table is given or the field
                does not belong to the table, or if a bound's data type does not
                match the field's.
        """
        data_type = field.to_data_type()
        is_array = data_type.is_array()

        if isinstance(data_type, DataType):
            non_array_data_type = data_type.from_array() if is_array else data_type
        elif isinstance(data_type, MapDataType):
            non_array_data_type = data_type._data_type  # pylint: disable=protected-access
        else:
            raise NotImplementedError(
                f"Range validation is not supported for the data type {data_type}."
            )

        # A bound given as a Field must live in the same table as the validated field.
        for bound in (self._min_value, self._max_value):
            if isinstance(bound, Field):
                # Explicit None check: do not rely on the truthiness of a Table object.
                if table is None:
                    raise ValueError("Table is not yet defined.")
                if bound not in table.get_fields():
                    raise ValueError(f"Field {bound.id} not in table {table.id}")

        supported_types = (
            DataType.INTEGER,
            DataType.DECIMAL,
            DataType.TIME,
            DataType.DATE,
            DataType.DATETIME,
        )
        if non_array_data_type not in supported_types:
            raise NotImplementedError(
                f"Range validation is not supported for the data type {non_array_data_type}."
            )

        invalid = self._get_invalid(field, is_array, data_type, non_array_data_type)
        # Blank values bypass the range check; they are invalid unless allow_blank is set.
        return formulas.IF(formulas.ISBLANK(field), formulas.NOT(self.allow_blank), invalid)

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Return a formula containing the validation error message.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            The custom message when one is set; otherwise a message describing the
            configured bound(s), or an empty string when no bound is configured.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_message:
            return self.custom_message

        field_type = field.to_data_type()
        min_value: Operand | None = self._convert_bound_value(self._min_value, field_type)
        max_value: Operand | None = self._convert_bound_value(self._max_value, field_type)

        # Explicit None checks: operands and formulas must not be tested for truthiness.
        if min_value is not None and max_value is not None:
            return (
                f"{id_string} must be between "
                + formulas.TEXT(min_value)
                + " and "
                + formulas.TEXT(max_value)
            )
        if min_value is not None:
            return f"{id_string} must be at least " + formulas.TEXT(min_value)
        if max_value is not None:
            return f"{id_string} can be up to " + formulas.TEXT(max_value)
        return ""

    def get_summary_message(self, field: Operand) -> str:
        """
        Return a string of the summary error message.

        Args:
            field: The field or named value to validate.

        Returns:
            A string of the summary error message describing the validation failure.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_summary_message:
            return self.custom_summary_message
        return f"{id_string} values need attention"
@typechecked
class NonBlankValidator(Validator):
    """
    Validator that rejects blank values.

    A field is considered blank if it is empty or null.
    """

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Build the formula that flags a null/empty field or named value.

        Strings are also invalid when equal to ``""``. For arrays and maps the
        value is invalid when it is blank itself or contains any blank element.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            A Formula that evaluates to True if the field or named value is null/empty,
            False otherwise.
        """
        data_type = field.to_data_type()

        if data_type == DataType.STRING:
            return formulas.OR(formulas.ISBLANK(field), field.equal_to(""))

        if data_type.is_array():
            return formulas.IF(formulas.ISBLANK(field), True, formulas.COUNTBLANKS(field) > 0)

        if not isinstance(data_type, MapDataType):
            return formulas.ISBLANK(field)

        # Maps: inspect the map's values for blanks.
        return formulas.IF(
            formulas.ISBLANK(field), True, formulas.COUNTBLANKS(formulas.VALUES(field)) > 0
        )

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Produce the validation error message.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            The custom message when one is set, otherwise a message stating that the
            field or named value cannot be blank.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        return self.custom_message if self.custom_message else f"{id_string} cannot be blank"

    def get_summary_message(self, field: Operand) -> str:
        """
        Produce the summary error message.

        Args:
            field: The field or named value to validate.

        Returns:
            The custom summary message when one is set, otherwise a default summary.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_summary_message:
            return self.custom_summary_message
        return f"Blank entries for {id_string}"
@typechecked
class UniqueValidator(Validator):
    """
    Validator that checks if a field value is unique within the table.

    A field is considered invalid if its value appears more than once across all rows
    in the table for that field.
    """

    def __init__(
        self,
        severity: Severity,
        allow_blank: bool = False,
        custom_message: Formula | str | None = None,
        custom_summary_message: str | None = None,
    ):
        """
        Initialize a Unique validator.

        Args:
            severity: The severity level of validation failures.
            allow_blank: Whether to allow blank values. Defaults to False.
            custom_message: Optional custom error message.
            custom_summary_message: Optional custom summary error message.
        """
        super().__init__(severity, custom_message, custom_summary_message)
        self.allow_blank = allow_blank

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Return a formula indicating whether the field value is duplicated in the table.

        Args:
            field: The field to validate. Only valid for primitives.
            table: The table containing the field.

        Returns:
            A Formula that evaluates to True if the value does not appear exactly once
            (or is blank while blanks are disallowed), False otherwise.

        Raises:
            ValueError: If *field* is not a ``Field`` or *table* is None.
            NotImplementedError: If the field is an array or a map.
        """
        if not isinstance(field, Field):
            raise ValueError("Invalid field type.")
        # Explicit None check: do not rely on the truthiness of a Table object.
        if table is None:
            raise ValueError("Table cannot be empty.")

        data_type = field.to_data_type()
        if data_type.is_array() or isinstance(data_type, MapDataType):
            raise NotImplementedError(
                f"Unique validation is not supported for the data type {data_type}."
            )

        # Count occurrences of this row's value in the whole column.
        unique_formula = formulas.COUNT(table[field.id], field).not_equal_to(1)
        return formulas.IF(formulas.ISBLANK(field), formulas.NOT(self.allow_blank), unique_formula)

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Return a formula containing the validation error message.

        Args:
            field: The field to validate.
            table: The table containing the field.

        Returns:
            The custom message when one is set, otherwise a message indicating the
            field value must be unique.

        Raises:
            ValueError: If *field* is not a ``Field`` or *table* is None.
        """
        if not isinstance(field, Field):
            raise ValueError("Invalid field type.")
        if table is None:
            raise ValueError("Table cannot be empty.")

        if self.custom_message:
            return self.custom_message
        return f"The value {field.id} is not unique"

    def get_summary_message(self, field: Operand) -> str:
        """
        Return a string of the summary error message.

        Args:
            field: The field or named value to validate.

        Returns:
            A string of the summary error message describing the validation failure.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_summary_message:
            return self.custom_summary_message
        return f"Duplicate entries for {id_string}"
@typechecked
class ListValidator(Validator):
    """
    Validator that checks if a field value is within a provided set of allowed values.

    The list of allowed values must be non-empty and all values must share the same data type,
    consistent with the field being validated.
    """

    def __init__(
        self,
        severity: Severity,
        values: Sequence[Operand | str | int | float | bool],
        custom_message: Formula | str | None = None,
        custom_summary_message: str | None = None,
    ):
        """
        Initialize a list validator.

        Args:
            severity: The severity level of validation failures.
            values: The sequence of allowed values. Must be non-empty and all elements must share
                the same data type.
            custom_message: Optional custom error message.
            custom_summary_message: Optional custom summary error message.

        Raises:
            ValueError: If values is empty.
        """
        if not values:
            raise ValueError("values must not be empty")
        super().__init__(severity, custom_message, custom_summary_message)
        self.values = values

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Return a formula indicating whether the field value or named value is not in the
        allowed set. If array/map provided, performs the validation on each element of
        the array/map.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            A Formula that evaluates to True if the value is not in the allowed list,
            False otherwise.
        """
        allowed_values = formulas.ARRAY(False, *self.values)
        data_type = field.to_data_type()

        # For collections, reduce the elements via INTERSECTION before comparing sizes.
        de_duplication_field: Formula | None = None
        if isinstance(data_type, MapDataType):
            de_duplication_field = formulas.INTERSECTION(False, formulas.VALUES(field))
        if data_type.is_array():
            de_duplication_field = formulas.INTERSECTION(False, field)

        # Explicit None check: a Formula object must not be tested for truthiness.
        if de_duplication_field is not None:
            # Valid only if every reduced element survives intersecting with the allowed set.
            intersection = formulas.INTERSECTION(False, allowed_values, de_duplication_field)
            return formulas.NOT(
                formulas.SIZE(intersection).equal_to(formulas.SIZE(de_duplication_field))
            )

        # A blank scalar is acceptable only when BLANK() itself is listed as an allowed value.
        blank_type = type(formulas.BLANK())
        # pylint: disable=unidiomatic-typecheck
        blank_is_allowed = any(type(v) is blank_type for v in self.values)  # noqa: E721
        return formulas.IF(
            formulas.ISBLANK(field),
            not blank_is_allowed,
            formulas.NOT(formulas.CONTAINS(allowed_values, field)),
        )

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Return a formula containing the validation error message.

        Args:
            field: The field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            The custom message when one is set, otherwise a message stating that
            unexpected values were found for the field or named value.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_message:
            return self.custom_message
        return f"Unexpected values for {id_string}"

    def get_summary_message(self, field: Operand) -> str:
        """
        Return a string of the summary error message.

        Args:
            field: The field or named value to validate.

        Returns:
            A string of the summary error message describing the validation failure.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_summary_message:
            return self.custom_summary_message
        return f"Unexpected values for {id_string}"
@typechecked
class LengthValidator(Validator):
    """
    Validator that requires an array field to hold an exact number of elements.

    Only applicable to array fields. The field is considered invalid if its element count
    does not equal the expected length.
    """

    def __init__(
        self,
        severity: Severity,
        length: int,
        custom_message: Formula | str | None = None,
        custom_summary_message: str | None = None,
    ):
        """
        Create a length validator.

        Args:
            severity: The severity level of validation failures.
            length: The expected number of elements in the array. Must be non-negative.
            custom_message: Optional custom error message.
            custom_summary_message: Optional custom summary error message.

        Raises:
            ValueError: If length is negative.
        """
        if length < 0:
            raise ValueError("Length must be non-negative")
        super().__init__(severity, custom_message, custom_summary_message)
        self.length = length

    def invalid(self, field: Operand, table: Table | None) -> Formula:
        """
        Build the formula that flags an array whose size differs from the expected length.

        Args:
            field: The array field or named value to validate. Only valid for arrays.
            table: The table containing the field. Optional.

        Returns:
            A Formula that evaluates to True if the element count differs from the expected
            length, False otherwise.

        Raises:
            NotImplementedError: If *field* is not an array.
        """
        data_type = field.to_data_type()
        if data_type.is_array():
            return formulas.ROWS(field).not_equal_to(self.length)
        raise NotImplementedError(
            f"Length validation is not supported for the data type {field.to_data_type()}."
        )

    def message(self, field: Operand, table: Table | None) -> Formula | str:
        """
        Produce the validation error message.

        Args:
            field: The array field or named value to validate.
            table: The table containing the field. Optional.

        Returns:
            The custom message when one is set, otherwise a message stating the
            expected array length.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_message:
            return self.custom_message
        return f"{id_string} requires {self.length} entries"

    def get_summary_message(self, field: Operand) -> str:
        """
        Produce the summary error message.

        Args:
            field: The field or named value to validate.

        Returns:
            The custom summary message when one is set, otherwise a default summary.

        Raises:
            ValueError: If *field* is neither a ``Field`` nor a ``NamedValue``.
        """
        if not isinstance(field, (Field, NamedValue)):
            raise ValueError("Invalid field type.")
        id_string = field.id

        if self.custom_summary_message:
            return self.custom_summary_message
        return f"Incorrect number of entries for {id_string}"