Policies

Reference for the metadata governance policy manager. PolicyManager evaluates Policy definitions against loaded models and returns a list of PolicyViolation objects.

kelp.service.policy_manager.PolicyManager

check_model

check_model(model, policies, fast_exit=False)

Check a single model against all matching policies.

All policies with matching applies_to patterns are applied. This allows cumulative governance requirements across layers and subdirectories.

Parameters:

  • model (Model, required): The model to check.
  • policies (list[Policy], required): All available policy definitions.
  • fast_exit (bool, default False): If True, return as soon as the first matching policy yields one or more violations for this model.

Returns:

  • list[PolicyViolation]: Violations from all matching policies. When fast_exit is True, only the violations collected up to and including the first violating policy are returned.

Source code in src/kelp/service/policy_manager.py
def check_model(
    self,
    model: Model,
    policies: list[Policy],
    fast_exit: bool = False,
) -> list[PolicyViolation]:
    """Check a single model against all matching policies.

    All policies with matching applies_to patterns are applied. This allows
    cumulative governance requirements across layers and subdirectories.

    Args:
        model: The model to check.
        policies: All available policy definitions.
        fast_exit: If True, return as soon as the first matching policy
            yields one or more violations for this model.

    Returns:
        List of policy violations from all matching policies.
    """
    violations: list[PolicyViolation] = []
    model_fqn = model.get_qualified_name()

    # Resolve all matching policies (not just the first)
    resolved_policies = self._resolve_policy_for_model(model, policies)
    if not resolved_policies:
        return []

    # Apply all matching policies (in order)
    for resolved_policy in resolved_policies:
        model_rules = resolved_policy.model_rule
        column_rules = resolved_policy.column_rule
        policy_violations = self._check_model_rules(model, model_fqn, model_rules, column_rules)
        violations.extend(policy_violations)
        if fast_exit and policy_violations:
            return violations

    return violations
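
The effect of fast_exit is easiest to see on a toy input. The sketch below is not the kelp implementation: it replaces _check_model_rules with precomputed per-policy results and uses a hypothetical Violation stand-in for PolicyViolation, so only the accumulation loop from check_model is reproduced.

```python
from dataclasses import dataclass


@dataclass
class Violation:
    """Hypothetical stand-in for PolicyViolation (illustration only)."""

    rule: str
    message: str


def accumulate(results_per_policy: list[list[Violation]], fast_exit: bool = False) -> list[Violation]:
    """Reproduce the accumulation loop of check_model on precomputed results."""
    violations: list[Violation] = []
    for policy_violations in results_per_policy:
        violations.extend(policy_violations)
        # Stop after the first policy that produced at least one violation.
        if fast_exit and policy_violations:
            return violations
    return violations


# Three matching policies, in order: the first passes, the next two fail.
per_policy = [
    [],
    [Violation("require_description", "missing description")],
    [Violation("require_tags", "missing tag 'owner'")],
]

all_found = accumulate(per_policy)
first_only = accumulate(per_policy, fast_exit=True)
print(len(all_found), len(first_only))  # 2 1
```

A policy that matches but yields no violations does not trigger the early return; evaluation continues with the next matching policy.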

check_catalog

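check_catalog(models, policies, fast_exit=False)

Check every model in models against policies and return the combined list of violations. Each model is checked with check_model, and fast_exit is passed through unchanged, so it short-circuits per model rather than for the whole catalog.

Source code in src/kelp/service/policy_manager.py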
def check_catalog(
    self,
    models: list[Model],
    policies: list[Policy],
    fast_exit: bool = False,
) -> list[PolicyViolation]:
    all_violations: list[PolicyViolation] = []
    for model in models:
        all_violations.extend(self.check_model(model, policies, fast_exit=fast_exit))
    return all_violations

log_violations staticmethod

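log_violations(violations)

Log each violation. Violations with severity error are logged at ERROR level with the prefix [POLICY ERROR]; all others are logged at WARNING level with the prefix [POLICY WARN].

Source code in src/kelp/service/policy_manager.py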
@staticmethod
def log_violations(violations: list[PolicyViolation]) -> None:
    for violation in violations:
        if violation.severity == PolicySeverity.error:
            logger.error("[POLICY ERROR] %s", violation.message)
        else:
            logger.warning("[POLICY WARN] %s", violation.message)

raise_if_errors staticmethod

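raise_if_errors(violations)

Raise a RuntimeError if any violation has severity error. The exception message states the number of errors and lists each error message; violations with severity warn are ignored.

Source code in src/kelp/service/policy_manager.py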
@staticmethod
def raise_if_errors(violations: list[PolicyViolation]) -> None:
    errors = [v for v in violations if v.severity == PolicySeverity.error]
    if errors:
        messages = "\n  ".join(v.message for v in errors)
        raise RuntimeError(f"Policy check failed with {len(errors)} error(s):\n  {messages}")
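
A typical call sequence is check, log, then raise. The sketch below illustrates that flow without importing kelp: Severity and Violation are hypothetical stand-ins for PolicySeverity and PolicyViolation, and the two helper functions copy the bodies of log_violations and raise_if_errors shown above.

```python
import logging
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger("policy_demo")


class Severity(str, Enum):
    """Hypothetical stand-in for PolicySeverity."""

    warn = "warn"
    error = "error"


@dataclass
class Violation:
    """Hypothetical stand-in for PolicyViolation."""

    message: str
    severity: Severity


def log_violations(violations: list[Violation]) -> None:
    for violation in violations:
        if violation.severity == Severity.error:
            logger.error("[POLICY ERROR] %s", violation.message)
        else:
            logger.warning("[POLICY WARN] %s", violation.message)


def raise_if_errors(violations: list[Violation]) -> None:
    errors = [v for v in violations if v.severity == Severity.error]
    if errors:
        messages = "\n  ".join(v.message for v in errors)
        raise RuntimeError(f"Policy check failed with {len(errors)} error(s):\n  {messages}")


found = [
    Violation("orders: missing description", Severity.warn),
    Violation("orders: missing tag 'owner'", Severity.error),
]

log_violations(found)  # logs every violation, whatever its severity
try:
    raise_if_errors(found)  # only the error-severity violation is reported
except RuntimeError as exc:
    failure = str(exc)
```

Logging before raising keeps warnings visible even when an error aborts the run.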

Model Policy Rules

The ModelPolicyRule class defines governance checks for tables:

from kelp.models.policy import ModelPolicyRule, PolicySeverity

rule = ModelPolicyRule(
    require_description=True,           # Table must have description
    require_any_tag=True,               # At least one tag must exist
    require_tags=["owner", "domain"],   # Specific tags required
    require_constraints=["primary_key"],  # Constraint types required
    naming_pattern=r"^(bronze|silver|gold)_.*",  # Regex pattern
    has_columns=["id", "created_at"],   # Columns that must exist
    not_=False,                         # Use True to invert checks
    has_table_property={"owner": "data_team"},  # Properties required
    has_quality_check=True,             # Quality checks must be defined
    severity=PolicySeverity.error,
)
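
The reference describes has_table_property as a partial match that allows extra keys, and naming_pattern as a regex the table name must match. The sketch below shows one way such checks could be evaluated. The helper names are hypothetical, and both the value comparison and the use of re.match are assumptions, since this page does not state how kelp evaluates either rule.

```python
import re


def properties_match(required: dict, actual: dict) -> bool:
    """Partial match: every required key/value must be present; extras are allowed."""
    # Assumption: values are compared for equality, not just key presence.
    return all(key in actual and actual[key] == value for key, value in required.items())


def name_matches(pattern: str, table_name: str) -> bool:
    """Assumption: the pattern is applied with re.match (anchored at the start)."""
    return re.match(pattern, table_name) is not None


required = {"owner": "data_team"}
print(properties_match(required, {"owner": "data_team", "pii": "false"}))  # True
print(properties_match(required, {"owner": "finance"}))  # False

pattern = r"^(bronze|silver|gold)_.*"
print(name_matches(pattern, "silver_orders"))  # True
print(name_matches(pattern, "staging_orders"))  # False
```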

kelp.models.policy.ModelPolicyRule pydantic-model

Bases: BaseModel

Policy rules for table-level governance checks.

Attributes:

  • require_description (bool): Table must have a non-empty description.
  • require_tags (list[str]): Specific tag keys that must be present on the table.
  • require_any_tag (bool): At least one tag must exist on the table.
  • require_constraints (list[str]): Constraint types that must be defined (e.g. "primary_key").
  • naming_pattern (str | None): Regex pattern that table names must match.
  • has_columns (list[str]): Column names that must be present in the table.
  • has_table_property (dict): Table properties that must exist (partial match, allows extra keys).
  • has_quality_check (bool): Table must have quality checks configured.
  • not_ (bool): Negate checks for this rule block (YAML key: not).
  • severity (PolicySeverity): Severity when this rule is violated.

JSON schema:
{
  "$defs": {
    "PolicySeverity": {
      "description": "Severity level for policy violations.\n\nAttributes:\n    warn: Log a warning but do not fail.\n    error: Raise an error and stop processing.",
      "enum": [
        "warn",
        "error"
      ],
      "title": "PolicySeverity",
      "type": "string"
    }
  },
  "description": "Policy rules for table-level governance checks.\n\nAttributes:\n    require_description: Table must have a non-empty description.\n    require_tags: Specific tag keys that must be present on the table.\n    require_any_tag: At least one tag must exist on the table.\n    require_constraints: Constraint types that must be defined (e.g. \"primary_key\").\n    naming_pattern: Regex pattern that table names must match.\n    has_columns: Column names that must be present in the table.\n    has_table_property: Table properties that must exist (partial match, allows extra keys).\n    has_quality_check: Table must have quality checks configured.\n    not_: Negate checks for this rule block (YAML key: ``not``).\n    severity: Severity when this rule is violated.",
  "properties": {
    "require_description": {
      "default": false,
      "description": "Table must have a non-empty description",
      "title": "Require Description",
      "type": "boolean"
    },
    "require_tags": {
      "description": "Specific tag keys that must be present on the table",
      "items": {
        "type": "string"
      },
      "title": "Require Tags",
      "type": "array"
    },
    "require_any_tag": {
      "default": false,
      "description": "At least one tag must exist on the table",
      "title": "Require Any Tag",
      "type": "boolean"
    },
    "require_constraints": {
      "description": "Constraint types that must be defined (e.g. 'primary_key', 'foreign_key')",
      "items": {
        "type": "string"
      },
      "title": "Require Constraints",
      "type": "array"
    },
    "naming_pattern": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Regex pattern that table names must match",
      "title": "Naming Pattern"
    },
    "has_columns": {
      "description": "Column names that must be present in the table",
      "items": {
        "type": "string"
      },
      "title": "Has Columns",
      "type": "array"
    },
    "has_table_property": {
      "additionalProperties": true,
      "description": "Table properties that must exist (partial match, allows extra keys)",
      "title": "Has Table Property",
      "type": "object"
    },
    "has_quality_check": {
      "default": false,
      "description": "Table must have quality checks configured",
      "title": "Has Quality Check",
      "type": "boolean"
    },
    "not": {
      "default": false,
      "description": "Negate checks for this rule block",
      "title": "Not",
      "type": "boolean"
    },
    "severity": {
      "$ref": "#/$defs/PolicySeverity",
      "default": "warn",
      "description": "Severity level when a rule is violated"
    }
  },
  "title": "ModelPolicyRule",
  "type": "object"
}

Config:

  • populate_by_name: True

Fields:

require_description pydantic-field

require_description = False

Table must have a non-empty description

require_tags pydantic-field

require_tags

Specific tag keys that must be present on the table

require_any_tag pydantic-field

require_any_tag = False

At least one tag must exist on the table

require_constraints pydantic-field

require_constraints

Constraint types that must be defined (e.g. 'primary_key', 'foreign_key')

naming_pattern pydantic-field

naming_pattern = None

Regex pattern that table names must match

has_columns pydantic-field

has_columns

Column names that must be present in the table

has_table_property pydantic-field

has_table_property

Table properties that must exist (partial match, allows extra keys)

has_quality_check pydantic-field

has_quality_check = False

Table must have quality checks configured

not_ pydantic-field

not_ = False

Negate checks for this rule block

severity pydantic-field

severity = warn

Severity level when a rule is violated

model_config class-attribute instance-attribute

model_config = ConfigDict(populate_by_name=True)

Column Policy Rules

The ColumnPolicyRule class defines governance checks for columns:

kelp.models.policy.ColumnPolicyRule pydantic-model

Bases: BaseModel

Policy rules for column-level governance checks.

Attributes:

  • require_description (bool): Each column must have a non-empty description.
  • require_tags (list[str]): Specific tag keys that must be present on each column.
  • require_any_tag (bool): At least one tag must exist on each column.
  • naming_pattern (str | None): Regex pattern that column names must match.
  • naming_patterns_by_type (list[NamingPattern]): Naming patterns for specific data types.
  • not_ (bool): Negate checks for this rule block (YAML key: not).
  • severity (PolicySeverity): Severity when this rule is violated.

JSON schema:
{
  "$defs": {
    "NamingPattern": {
      "description": "Naming convention rule for a specific data type.\n\nAttributes:\n    data_type: SQL data type this pattern applies to (e.g., \"STRING\", \"BOOLEAN\").\n    pattern: Regex pattern that column names must match (e.g., \"^(is_|has_).*\").",
      "properties": {
        "data_type": {
          "description": "SQL data type (e.g. STRING, BOOLEAN, INT)",
          "title": "Data Type",
          "type": "string"
        },
        "pattern": {
          "description": "Regex pattern column names must match",
          "title": "Pattern",
          "type": "string"
        }
      },
      "required": [
        "data_type",
        "pattern"
      ],
      "title": "NamingPattern",
      "type": "object"
    },
    "PolicySeverity": {
      "description": "Severity level for policy violations.\n\nAttributes:\n    warn: Log a warning but do not fail.\n    error: Raise an error and stop processing.",
      "enum": [
        "warn",
        "error"
      ],
      "title": "PolicySeverity",
      "type": "string"
    }
  },
  "description": "Policy rules for column-level governance checks.\n\nAttributes:\n    require_description: Each column must have a non-empty description.\n    require_tags: Specific tag keys that must be present on each column.\n    require_any_tag: At least one tag must exist on each column.\n    naming_pattern: Regex pattern that column names must match.\n    naming_patterns_by_type: Naming patterns for specific data types.\n    not_: Negate checks for this rule block (YAML key: ``not``).\n    severity: Severity when this rule is violated.",
  "properties": {
    "require_description": {
      "default": false,
      "description": "Each column must have a non-empty description",
      "title": "Require Description",
      "type": "boolean"
    },
    "require_tags": {
      "description": "Specific tag keys that must be present on each column",
      "items": {
        "type": "string"
      },
      "title": "Require Tags",
      "type": "array"
    },
    "require_any_tag": {
      "default": false,
      "description": "At least one tag must exist on each column",
      "title": "Require Any Tag",
      "type": "boolean"
    },
    "naming_pattern": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Regex pattern that all column names must match",
      "title": "Naming Pattern"
    },
    "naming_patterns_by_type": {
      "description": "Regex patterns for column names by data type",
      "items": {
        "$ref": "#/$defs/NamingPattern"
      },
      "title": "Naming Patterns By Type",
      "type": "array"
    },
    "not": {
      "default": false,
      "description": "Negate checks for this rule block",
      "title": "Not",
      "type": "boolean"
    },
    "severity": {
      "$ref": "#/$defs/PolicySeverity",
      "default": "warn",
      "description": "Severity level when a rule is violated"
    }
  },
  "title": "ColumnPolicyRule",
  "type": "object"
}

Config:

  • populate_by_name: True

Fields:

require_description pydantic-field

require_description = False

Each column must have a non-empty description

require_tags pydantic-field

require_tags

Specific tag keys that must be present on each column

require_any_tag pydantic-field

require_any_tag = False

At least one tag must exist on each column

naming_pattern pydantic-field

naming_pattern = None

Regex pattern that all column names must match

naming_patterns_by_type pydantic-field

naming_patterns_by_type

Regex patterns for column names by data type

not_ pydantic-field

not_ = False

Negate checks for this rule block

severity pydantic-field

severity = warn

Severity level when a rule is violated

model_config class-attribute instance-attribute

model_config = ConfigDict(populate_by_name=True)
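
naming_patterns_by_type pairs a data type with a regex, for example requiring BOOLEAN column names to start with is_ or has_. The sketch below illustrates that idea with plain tuples instead of NamingPattern objects. The function name, the sample columns, the case-insensitive type comparison, and the use of re.match are all assumptions made for illustration, not documented kelp behavior.

```python
import re

# (data_type, pattern) pairs, mirroring the NamingPattern fields.
patterns_by_type = [
    ("BOOLEAN", r"^(is_|has_).*"),
    ("TIMESTAMP", r".*_at$"),
]

# (column_name, data_type) pairs for a hypothetical table.
columns = [
    ("is_active", "BOOLEAN"),
    ("deleted", "BOOLEAN"),
    ("created_at", "TIMESTAMP"),
    ("customer_name", "STRING"),  # no pattern registered for STRING
]


def misnamed_columns(columns, patterns_by_type):
    """Return names of columns that break the pattern registered for their type."""
    bad = []
    for name, data_type in columns:
        for pattern_type, pattern in patterns_by_type:
            # Assumption: data types are compared case-insensitively.
            if data_type.upper() == pattern_type.upper() and not re.match(pattern, name):
                bad.append(name)
    return bad


print(misnamed_columns(columns, patterns_by_type))  # ['deleted']
```

Columns whose data type has no registered pattern are left unchecked in this sketch; a general naming_pattern would cover them.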

Policy Severity

kelp.models.policy.PolicySeverity

Bases: StrEnum

Severity level for policy violations.

Attributes:

  • warn: Log a warning but do not fail.
  • error: Raise an error and stop processing.

warn class-attribute instance-attribute

warn = 'warn'

error class-attribute instance-attribute

error = 'error'

Policy Violation

kelp.models.policy.PolicyViolation pydantic-model

Bases: BaseModel

A single policy rule violation.

Attributes:

  • model_name (str): Qualified name of the model with the violation.
  • column_name (str | None): Column name if this is a column-level violation, else None.
  • rule (str): The policy rule that was violated.
  • message (str): Human-readable description of the violation.
  • severity (PolicySeverity): Severity of this violation.

JSON schema:
{
  "$defs": {
    "PolicySeverity": {
      "description": "Severity level for policy violations.\n\nAttributes:\n    warn: Log a warning but do not fail.\n    error: Raise an error and stop processing.",
      "enum": [
        "warn",
        "error"
      ],
      "title": "PolicySeverity",
      "type": "string"
    }
  },
  "description": "A single policy rule violation.\n\nAttributes:\n    model_name: Qualified name of the model with the violation.\n    column_name: Column name if this is a column-level violation, else None.\n    rule: The policy rule that was violated.\n    message: Human-readable description of the violation.\n    severity: Severity of this violation.",
  "properties": {
    "model_name": {
      "description": "Qualified name of the offending model",
      "title": "Model Name",
      "type": "string"
    },
    "column_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Column name for column-level violations",
      "title": "Column Name"
    },
    "rule": {
      "description": "Policy rule identifier that was violated",
      "title": "Rule",
      "type": "string"
    },
    "message": {
      "description": "Human-readable violation description",
      "title": "Message",
      "type": "string"
    },
    "severity": {
      "$ref": "#/$defs/PolicySeverity",
      "description": "Violation severity"
    }
  },
  "required": [
    "model_name",
    "rule",
    "message",
    "severity"
  ],
  "title": "PolicyViolation",
  "type": "object"
}

Fields:

model_name pydantic-field

model_name

Qualified name of the offending model

column_name pydantic-field

column_name = None

Column name for column-level violations

rule pydantic-field

rule

Policy rule identifier that was violated

message pydantic-field

message

Human-readable violation description

severity pydantic-field

severity

Violation severity
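
Because column_name is None for table-level violations, it can be used to split a result list into table-level and column-level findings. The sketch below groups messages per model using a hypothetical dataclass that mirrors the PolicyViolation field names; severity is simplified to a plain string, and the model names, rule identifiers, and messages are invented sample data.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class Violation:
    """Hypothetical stand-in mirroring the PolicyViolation fields."""

    model_name: str
    rule: str
    message: str
    severity: str  # simplified: a plain string instead of PolicySeverity
    column_name: Optional[str] = None


violations = [
    Violation("main.sales.orders", "require_description", "table has no description", "error"),
    Violation("main.sales.orders", "require_description", "column has no description", "warn", "order_id"),
    Violation("main.sales.customers", "require_tags", "missing tag 'owner'", "error"),
]

# Group messages per model, labelling each as table-level or column-level.
report: dict[str, list[str]] = defaultdict(list)
for v in violations:
    scope = "table" if v.column_name is None else f"column {v.column_name}"
    report[v.model_name].append(f"[{v.severity}] {scope}: {v.message}")

for model_name, lines in report.items():
    print(model_name)
    for line in lines:
        print("  " + line)
```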

Policy Config

PolicyConfig controls global policy execution settings (including enabled and fast_exit).

kelp.models.policy.PolicyConfig pydantic-model

Bases: BaseModel

Top-level policy execution settings.

Policy rules are authored in policy files (kelp_policies). This model intentionally keeps only the global execution settings, namely the enabled switch and the fast_exit flag, which apply to policy checks run during context initialization and via the check-policies CLI.

Attributes:

  • enabled (bool): Master switch to activate policy checks (default: False).
  • fast_exit (bool): Stop policy evaluation on first violating policy per model (default: False).

JSON schema:
{
  "description": "Top-level policy execution switch.\n\nPolicy rules are authored in policy files (``kelp_policies``). This model\nintentionally keeps only the global enable/disable flag for applying policy\nchecks during context initialization and via the ``check-policies`` CLI.\n\nAttributes:\n    enabled: Master switch to activate policy checks (default: False).\n    fast_exit: Stop policy evaluation on first violating policy per model.",
  "properties": {
    "enabled": {
      "default": false,
      "description": "Master switch to activate policy checks",
      "title": "Enabled",
      "type": "boolean"
    },
    "fast_exit": {
      "default": false,
      "description": "Stop policy evaluation on first violating policy per model",
      "title": "Fast Exit",
      "type": "boolean"
    }
  },
  "title": "PolicyConfig",
  "type": "object"
}

Fields:

enabled pydantic-field

enabled = False

Master switch to activate policy checks

fast_exit pydantic-field

fast_exit = False

Stop policy evaluation on first violating policy per model

See Also