Skip to content

Tables

kelp.tables

Generic model metadata API for use in any Spark job.

init

init(
    project_file_path=None,
    target=None,
    init_vars=None,
    manifest_file_path=None,
    refresh=False,
    store_in_global=True,
    run_policy_checks=False,
    log_level=None,
)

Initialize kelp runtime context from current directory.

When manifest_file_path is provided (or resolved from KELP_MANIFEST_FILE environment variable), the context is loaded directly from a pre-built manifest JSON file, skipping all project discovery, Jinja rendering, and metadata loading.

When policy_config.enabled is True in the project settings, metadata governance policies are evaluated immediately after loading. Warn-severity violations are logged; error-severity violations raise a RuntimeError.

Parameters:

Name Type Description Default
project_file_path str | None

Path to project file or directory.

None
target str | None

Target environment name.

None
init_vars dict[str, Any] | None

Runtime variable overrides.

None
manifest_file_path str | None

Path to a manifest JSON file. When provided, skips source file loading. Also resolved from KELP_MANIFEST_FILE env var.

None
refresh bool

If True, recreate context even if one already exists.

False
store_in_global bool

Whether to store context globally.

True
run_policy_checks bool

Whether to run policy checks.

False
log_level str | None

Optional log level to configure.

None

Returns:

Type Description
MetaRuntimeContext

The initialized MetaRuntimeContext.

Source code in src/kelp/config/config.py
def init(
    project_file_path: str | None = None,
    target: str | None = None,
    init_vars: dict[str, Any] | None = None,
    manifest_file_path: str | None = None,
    refresh: bool = False,
    store_in_global: bool = True,
    run_policy_checks: bool = False,
    log_level: str | None = None,
) -> MetaRuntimeContext:
    """Bootstrap the kelp runtime context from the current directory.

    If ``manifest_file_path`` is given (or resolved by the framework from the
    ``KELP_MANIFEST_FILE`` environment variable), the context is rebuilt from a
    pre-built manifest JSON file instead of running project discovery, Jinja
    rendering, and metadata loading.

    Policy evaluation is delegated to ``_run_policy_checks``; when enabled in
    the project settings, warn-severity violations are logged while
    error-severity violations raise a ``RuntimeError``.

    Args:
        project_file_path: Path to the project file or its directory.
        target: Name of the target environment.
        init_vars: Runtime variable overrides.
        manifest_file_path: Path to a manifest JSON file; skips source file
            loading when set. Also resolved from KELP_MANIFEST_FILE env var.
        refresh: Recreate the context even when one already exists.
        store_in_global: Store the resulting context globally.
        run_policy_checks: Run policy checks after loading.
        log_level: Optional log level to configure before anything else.

    Returns:
        The initialized MetaRuntimeContext.
    """
    # Configure logging first so framework initialization is logged at the
    # requested level.
    if log_level:
        configure_logging(log_level)

    context = KelpFramework.init(
        project_file_path=project_file_path,
        target=target,
        init_vars=init_vars,
        manifest_file_path=manifest_file_path,
        refresh=refresh,
        store_in_global=store_in_global,
    )
    _run_policy_checks(context, run_policy_checks)
    return context

columns

columns(name)

Get the column definitions for a model.

Source code in src/kelp/tables/api.py
def columns(name: str) -> list[Column]:
    """Return the column definitions for a model (empty when it has no root model)."""
    root = ModelManager.build_model(name).root_model
    return root.columns if root else []

ddl

ddl(name, if_not_exists=True)

Get the full CREATE TABLE DDL statement for a model.

Source code in src/kelp/tables/api.py
def ddl(name: str, if_not_exists: bool = True) -> str | None:
    """Build and return the full CREATE TABLE DDL statement for a model."""
    model = ModelManager.build_model(name)
    return model.get_ddl(if_not_exists=if_not_exists)

func

func(name)

Get the fully qualified name for a Unity Catalog function.

Source code in src/kelp/tables/api.py
def func(name: str) -> str:
    """Return the fully qualified name of a Unity Catalog function."""
    from kelp.config import get_context

    entry = get_context().catalog_index.get("functions", name)
    return entry.get_qualified_name()

get_model

get_model(name)

Get the KelpModel object for a given model name.

Source code in src/kelp/tables/api.py
def get_model(name: str) -> KelpModel:
    """Look up and return the ``KelpModel`` object for the given model name."""
    built = ModelManager.build_model(name)
    return built

ref

ref(name)

Get the fully qualified name for a model.

Source code in src/kelp/tables/api.py
def ref(name: str) -> str:
    """Return the fully qualified name for a model, falling back to ``name`` itself."""
    fqn = ModelManager.build_model(name).fqn
    return fqn or name

schema

schema(name, exclude=None)

Get the Spark schema DDL for a model.

Parameters:

Name Type Description Default
name str

Model name.

required
exclude list[str] | None

Column names to exclude from the schema.

None

Returns:

Type Description
str | None

Spark schema DDL string, or None if not available.

Source code in src/kelp/tables/api.py
def schema(name: str, exclude: list[str] | None = None) -> str | None:
    """Return the Spark schema DDL for a model.

    Args:
        name: Model name.
        exclude: Column names to leave out of the schema.

    Returns:
        Spark schema DDL string, or ``None`` when unavailable.
    """
    model = ModelManager.build_model(name, exclude=exclude)
    return model.schema

schema_lite

schema_lite(name, exclude=None)

Get the raw Spark schema without constraints or generated columns.

Parameters:

Name Type Description Default
name str

Model name.

required
exclude list[str] | None

Column names to exclude from the schema.

None

Returns:

Type Description
str | None

Spark schema DDL string, or None if not available.

Source code in src/kelp/tables/api.py
def schema_lite(name: str, exclude: list[str] | None = None) -> str | None:
    """Return the raw Spark schema, omitting constraints and generated columns.

    Args:
        name: Model name.
        exclude: Column names to leave out of the schema.

    Returns:
        Spark schema DDL string, or ``None`` when unavailable.
    """
    model = ModelManager.build_model(name, exclude=exclude)
    return model.schema_lite

source

source(name)

Get the path for a data source.

Source code in src/kelp/tables/api.py
def source(name: str) -> str:
    """Resolve and return the storage path for a data source."""
    from kelp.service.source_manager import SourceManager

    source_path = SourceManager.get_path(name)
    return source_path

source_options

source_options(name)

Get the options dictionary for a data source.

Source code in src/kelp/tables/api.py
def source_options(name: str) -> dict:
    """Resolve and return the options dictionary for a data source."""
    from kelp.service.source_manager import SourceManager

    options = SourceManager.get_options(name)
    return options

kelp.service.model_manager.KelpModel dataclass

KelpModel(
    name,
    table_type=None,
    comment=None,
    table_properties=None,
    spark_conf=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=None,
    cluster_by=None,
    row_filter=None,
    fqn=None,
    schema=None,
    schema_lite=None,
    dqx_checks=None,
    validation_table=None,
    quarantine_table=None,
    target_table=None,
    root_model=None,
)

name instance-attribute

name

table_type class-attribute instance-attribute

table_type = None

comment class-attribute instance-attribute

comment = None

table_properties class-attribute instance-attribute

table_properties = None

spark_conf class-attribute instance-attribute

spark_conf = None

path class-attribute instance-attribute

path = None

partition_cols class-attribute instance-attribute

partition_cols = None

cluster_by_auto class-attribute instance-attribute

cluster_by_auto = None

cluster_by class-attribute instance-attribute

cluster_by = None

row_filter class-attribute instance-attribute

row_filter = None

fqn class-attribute instance-attribute

fqn = None

schema class-attribute instance-attribute

schema = None

schema_lite class-attribute instance-attribute

schema_lite = None

dqx_checks class-attribute instance-attribute

dqx_checks = None

validation_table class-attribute instance-attribute

validation_table = None

quarantine_table class-attribute instance-attribute

quarantine_table = None

target_table class-attribute instance-attribute

target_table = None

root_model class-attribute instance-attribute

root_model = None

get_dqx_check_obj

get_dqx_check_obj()
Source code in src/kelp/service/model_manager.py
def get_dqx_check_obj(self) -> DQXQuality | None:
    """Return the root model's quality config when it is a ``DQXQuality``, else ``None``."""
    root = self.root_model
    if not root:
        return None
    quality = root.quality
    return quality if isinstance(quality, DQXQuality) else None

get_ddl

get_ddl(if_not_exists=True)
Source code in src/kelp/service/model_manager.py
def get_ddl(self, if_not_exists: bool = True) -> str | None:
    mapped_type = _UC_TYPE.get(self.table_type.lower(), "TABLE") if self.table_type else "TABLE"
    return (
        ModelManager.get_spark_schema_ddl(
            self.root_model,
            table_type=mapped_type,
            if_not_exists=if_not_exists,
        )
        if self.root_model
        else None
    )

kelp.service.model_manager.KelpSdpModel dataclass

KelpSdpModel(
    name,
    table_type=None,
    comment=None,
    table_properties=None,
    spark_conf=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=None,
    cluster_by=None,
    row_filter=None,
    fqn=None,
    schema=None,
    schema_lite=None,
    dqx_checks=None,
    validation_table=None,
    quarantine_table=None,
    target_table=None,
    root_model=None,
    expect_all=None,
    expect_all_or_fail=None,
    expect_all_or_drop=None,
    expect_all_or_quarantine=None,
)

Bases: KelpModel

expect_all class-attribute instance-attribute

expect_all = None

expect_all_or_fail class-attribute instance-attribute

expect_all_or_fail = None

expect_all_or_drop class-attribute instance-attribute

expect_all_or_drop = None

expect_all_or_quarantine class-attribute instance-attribute

expect_all_or_quarantine = None

name instance-attribute

name

table_type class-attribute instance-attribute

table_type = None

comment class-attribute instance-attribute

comment = None

table_properties class-attribute instance-attribute

table_properties = None

spark_conf class-attribute instance-attribute

spark_conf = None

path class-attribute instance-attribute

path = None

partition_cols class-attribute instance-attribute

partition_cols = None

cluster_by_auto class-attribute instance-attribute

cluster_by_auto = None

cluster_by class-attribute instance-attribute

cluster_by = None

row_filter class-attribute instance-attribute

row_filter = None

fqn class-attribute instance-attribute

fqn = None

schema class-attribute instance-attribute

schema = None

schema_lite class-attribute instance-attribute

schema_lite = None

dqx_checks class-attribute instance-attribute

dqx_checks = None

validation_table class-attribute instance-attribute

validation_table = None

quarantine_table class-attribute instance-attribute

quarantine_table = None

target_table class-attribute instance-attribute

target_table = None

root_model class-attribute instance-attribute

root_model = None

params

params(exclude=None)
Source code in src/kelp/service/model_manager.py
def params(self, exclude: list[str] | None = None) -> dict[str, str]:
    exclude = exclude or []
    default_exclude = [
        "expect_all",
        "expect_all_or_drop",
        "expect_all_or_fail",
        "expect_all_or_quarantine",
    ]
    exclude = list(set(exclude) | set(default_exclude))
    return self.get_sdp_params(exclude=exclude)

params_raw

params_raw(exclude=None)
Source code in src/kelp/service/model_manager.py
def params_raw(self, exclude: list[str] | None = None) -> dict[str, str]:
    exclude = exclude or []
    return self.get_sdp_params(exclude=exclude)

params_cst

params_cst(exclude=None)
Source code in src/kelp/service/model_manager.py
def params_cst(self, exclude: list[str] | None = None) -> dict[str, str]:
    exclude = exclude or []
    default_exclude = ["expect_all_or_quarantine"]
    exclude = list(set(exclude) | set(default_exclude))
    return self.get_sdp_params(exclude=exclude)

get_sdp_params

get_sdp_params(exclude=None)
Source code in src/kelp/service/model_manager.py
def get_sdp_params(self, exclude: list[str] | None = None) -> dict[str, Any]:
    exclude = exclude or []
    params = {
        "name": self.fqn,
        "comment": self.comment,
        "spark_conf": self.spark_conf,
        "table_properties": self.table_properties,
        "path": self.path,
        "partition_cols": self.partition_cols,
        "cluster_by_auto": self.cluster_by_auto,
        "cluster_by": self.cluster_by,
        "schema": self.schema or None,
        "row_filter": self.row_filter,
        "expect_all": self.expect_all,
        "expect_all_or_drop": self.expect_all_or_drop,
        "expect_all_or_fail": self.expect_all_or_fail,
        "expect_all_or_quarantine": self.expect_all_or_quarantine,
    }
    return {k: v for k, v in params.items() if (v is not None or "") and k not in exclude}

get_ddl

get_ddl(if_not_exists=False, or_refresh=True)
Source code in src/kelp/service/model_manager.py
def get_ddl(self, if_not_exists: bool = False, or_refresh: bool = True) -> str | None:
    mapped_type = _UC_TYPE.get(self.table_type.lower(), "TABLE") if self.table_type else "TABLE"
    return (
        ModelManager.get_spark_schema_ddl(
            self.root_model,
            table_type=mapped_type,
            if_not_exists=if_not_exists,
            or_refresh=or_refresh,
        )
        if self.root_model
        else None
    )

get_dqx_check_obj

get_dqx_check_obj()
Source code in src/kelp/service/model_manager.py
def get_dqx_check_obj(self) -> DQXQuality | None:
    """Return the DQX quality configuration attached to the root model, if any."""
    if not self.root_model:
        return None
    quality = self.root_model.quality
    if isinstance(quality, DQXQuality):
        return quality
    return None