Pipelines

kelp.pipelines

init

init(
    project_file_path=None,
    target=None,
    init_vars=None,
    manifest_file_path=None,
    refresh=False,
    store_in_global=True,
    run_policy_checks=False,
    log_level=None,
)

Initialize kelp runtime context from current directory.

When manifest_file_path is provided (or resolved from KELP_MANIFEST_FILE environment variable), the context is loaded directly from a pre-built manifest JSON file, skipping all project discovery, Jinja rendering, and metadata loading.

When policy_config.enabled is True in the project settings, metadata governance policies are evaluated immediately after loading. Warn-severity violations are logged; error-severity violations raise a RuntimeError.

Parameters:

    project_file_path (str | None): Path to project file or directory. Default: None.
    target (str | None): Target environment name. Default: None.
    init_vars (dict[str, Any] | None): Runtime variable overrides. Default: None.
    manifest_file_path (str | None): Path to a manifest JSON file. When provided, skips source file loading. Also resolved from the KELP_MANIFEST_FILE environment variable. Default: None.
    refresh (bool): If True, recreate the context even if one already exists. Default: False.
    store_in_global (bool): Whether to store the context globally. Default: True.
    run_policy_checks (bool): Whether to run policy checks. Default: False.
    log_level (str | None): Optional log level to configure. Default: None.

Returns:

    MetaRuntimeContext: The initialized MetaRuntimeContext.

Source code in src/kelp/config/config.py
def init(
    project_file_path: str | None = None,
    target: str | None = None,
    init_vars: dict[str, Any] | None = None,
    manifest_file_path: str | None = None,
    refresh: bool = False,
    store_in_global: bool = True,
    run_policy_checks: bool = False,
    log_level: str | None = None,
) -> MetaRuntimeContext:
    """Initialize kelp runtime context from current directory.

    When ``manifest_file_path`` is provided (or resolved from ``KELP_MANIFEST_FILE``
    environment variable), the context is loaded directly from a pre-built
    manifest JSON file, skipping all project discovery, Jinja rendering, and
    metadata loading.

    When ``policy_config.enabled`` is True in the project settings, metadata
    governance policies are evaluated immediately after loading. Warn-severity
    violations are logged; error-severity violations raise a RuntimeError.

    Args:
        project_file_path: Path to project file or directory.
        target: Target environment name.
        init_vars: Runtime variable overrides.
        manifest_file_path: Path to a manifest JSON file. When provided, skips
            source file loading. Also resolved from KELP_MANIFEST_FILE env var.
        refresh: If True, recreate context even if one already exists.
        store_in_global: Whether to store context globally.
        run_policy_checks: Whether to run policy checks.
        log_level: Optional log level to configure.

    Returns:
        The initialized MetaRuntimeContext.
    """
    if log_level:
        configure_logging(log_level)

    ctx = KelpFramework.init(
        project_file_path=project_file_path,
        target=target,
        init_vars=init_vars,
        manifest_file_path=manifest_file_path,
        refresh=refresh,
        store_in_global=store_in_global,
    )

    _run_policy_checks(ctx, run_policy_checks)

    return ctx
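
Example (a minimal usage sketch, assuming init is importable from kelp.pipelines as this page suggests; the project path, target name, and variable values are placeholders):

from kelp.pipelines import init

ctx = init(
    project_file_path="projects/sales",    # project file or directory (placeholder)
    target="dev",                          # target environment name (placeholder)
    init_vars={"run_date": "2025-01-01"},  # runtime variable overrides
    run_policy_checks=True,                # evaluate governance policies after loading
    log_level="INFO",
)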

func

func(name)

Get the fully qualified name for a Unity Catalog function.

Returns the fully qualified name (catalog.schema.function_name) of the function for use in PySpark expressions and SQL queries.

Parameters:

    name (str): Function name. Required.

Returns:

    str: Fully qualified function name.

Raises:

    KeyError: If the function name is not found in the catalog.

Source code in src/kelp/pipelines/api.py
def func(name: str) -> str:
    """Get the fully qualified name for a Unity Catalog function.

    Returns the fully qualified name (``catalog.schema.function_name``) of the
    function for use in PySpark expressions and SQL queries.

    Args:
        name: Function name.

    Returns:
        Fully qualified function name.

    Raises:
        KeyError: If the function name is not found in the catalog.
    """
    from kelp.tables import func as tables_func

    return tables_func(name)
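
Example (a sketch of using the returned name inside a SQL expression; "mask_email" is a hypothetical Unity Catalog function name, not part of the library):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

from kelp.pipelines import func

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a@example.com",)], "email STRING")

fq_name = func("mask_email")  # e.g. "my_catalog.my_schema.mask_email"
masked = df.withColumn("email_masked", expr(f"{fq_name}(email)"))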

get_model

get_model(name)

Get the KelpSdpModel object for a given table name.

Retrieves a KelpSdpModel instance with all computed properties including fully qualified name, schema, quality checks, and partition information.

Parameters:

    name (str): Table name to retrieve. Required.

Returns:

    KelpSdpModel: Table object with all properties and computed values.

Raises:

    KeyError: If the table name is not found in the catalog.

Source code in src/kelp/pipelines/api.py
def get_model(name: str) -> KelpSdpModel:
    """Get the KelpSdpModel object for a given table name.

    Retrieves a KelpSdpModel instance with all computed properties including
    fully qualified name, schema, quality checks, and partition information.

    Args:
        name: Table name to retrieve.

    Returns:
        KelpSdpModel: Table object with all properties and computed values.

    Raises:
        KeyError: If the table name is not found in the catalog.
    """
    return ModelManager.build_sdp_model(name)
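
Example (a short sketch; "sales_orders" is a placeholder table name assumed to exist in the project metadata):

from kelp.pipelines import get_model

model = get_model("sales_orders")
print(model.fqn)     # fully qualified name, as returned by `ref` below
print(model.schema)  # Spark schema DDL, as returned by `schema` below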

params

params(name, exclude=None)

Get the streaming table parameters as a dictionary.

Returns all table parameters suitable for the @dp.table decorator, excluding quality expectations (expect_all, expect_all_or_drop, expect_all_or_fail, expect_all_or_quarantine).

Parameters:

    name (str): Table name. Required.
    exclude (list[str] | None): List of parameter keys to additionally exclude from the result. Default: None.

Returns:

    dict[str, str]: Dictionary of streaming table parameters.

Raises:

    KeyError: If the table name is not found in the catalog.

Source code in src/kelp/pipelines/api.py
def params(name: str, exclude: list[str] | None = None) -> dict[str, str]:
    """Get the streaming table parameters as a dictionary.

    Returns all table parameters suitable for the @dp.table decorator, excluding
    quality expectations (expect_all, expect_all_or_drop, expect_all_or_fail,
    expect_all_or_quarantine).

    Args:
        name: Table name.
        exclude: List of parameter keys to additionally exclude from the result.

    Returns:
        Dictionary of streaming table parameters.

    Raises:
        KeyError: If the table name is not found in the catalog.
    """
    exclude = exclude or []
    return ModelManager.build_sdp_model(name).params(exclude=exclude)

params_cst

params_cst(name, exclude=None)

Get the create_streaming_table API parameters.

Returns table parameters suitable for the Databricks create_streaming_table API, excluding expect_all_or_quarantine which is not supported by the API.

Parameters:

    name (str): Table name. Required.
    exclude (list[str] | None): List of parameter keys to additionally exclude from the result. Default: None.

Returns:

    dict[str, str]: Dictionary of create_streaming_table parameters.

Raises:

    KeyError: If the table name is not found in the catalog.

Source code in src/kelp/pipelines/api.py
def params_cst(name: str, exclude: list[str] | None = None) -> dict[str, str]:
    """Get the create_streaming_table API parameters.

    Returns table parameters suitable for the Databricks create_streaming_table API,
    excluding expect_all_or_quarantine which is not supported by the API.

    Args:
        name: Table name.
        exclude: List of parameter keys to additionally exclude from the result.

    Returns:
        Dictionary of create_streaming_table parameters.

    Raises:
        KeyError: If the table name is not found in the catalog.
    """
    exclude = exclude or []
    return ModelManager.build_sdp_model(name).params_cst(exclude=exclude)
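
Example (a combined sketch for params above and params_cst; "sales_orders" is a placeholder table name, and the dp calls are shown only as comments because the dp import depends on your pipeline runtime):

from kelp.pipelines import params, params_cst

table_kwargs = params("sales_orders", exclude=["path"])  # for the @dp.table decorator
cst_kwargs = params_cst("sales_orders")                  # for dp.create_streaming_table

# The dictionaries are intended to be unpacked into the corresponding Databricks calls,
# e.g. @dp.table(**table_kwargs) or dp.create_streaming_table(**cst_kwargs).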

ref

ref(name)

Get the source reference name for a table.

Returns the fully qualified name of the source table. This is always the main table name, regardless of quality configuration.

Used to reference the source table in SQL queries and pipeline definitions.

Parameters:

    name (str): Table name. Required.

Returns:

    str: Fully qualified source table name (catalog.schema.name).

Source code in src/kelp/pipelines/api.py
def ref(name: str) -> str:
    """Get the source reference name for a table.

    Returns the fully qualified name of the source table. This is always the
    main table name, regardless of quality configuration.

    Used to reference the source table in SQL queries and pipeline definitions.

    Args:
        name: Table name.

    Returns:
        Fully qualified source table name (catalog.schema.name).
    """
    return ModelManager.build_sdp_model(name).fqn or name
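
Example (a sketch; "sales_orders_raw" is a placeholder upstream table name):

from kelp.pipelines import ref

upstream = ref("sales_orders_raw")  # e.g. "my_catalog.bronze.sales_orders_raw"
query = f"SELECT * FROM {upstream} WHERE order_status = 'OPEN'"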

schema

schema(name, exclude=None)

Get the Spark schema DDL for a table.

Returns the full Spark schema definition including constraints and generated columns, suitable for use with the @dp.table decorator.

Parameters:

    name (str): Table name. Required.
    exclude (list[str] | None): Column names to exclude from the schema. Default: None.

Returns:

    str | None: Spark schema DDL string, or None if not available.

Source code in src/kelp/pipelines/api.py
def schema(name: str, exclude: list[str] | None = None) -> str | None:
    """Get the Spark schema DDL for a table.

    Returns the full Spark schema definition including constraints and generated
    columns, suitable for use with the @dp.table decorator.

    Args:
        name: Table name.
        exclude: Column names to exclude from the schema.

    Returns:
        Spark schema DDL string, or None if not available.
    """
    return ModelManager.build_sdp_model(name, exclude=exclude).schema

schema_lite

schema_lite(name, exclude=None)

Get the raw Spark schema without constraints or generated columns.

Returns the basic Spark schema definition without any modifications, suitable for use with Struct operations or type references.

Parameters:

    name (str): Table name. Required.
    exclude (list[str] | None): Column names to exclude from the schema. Default: None.

Returns:

    str | None: Raw Spark schema DDL string, or None if not available.

Source code in src/kelp/pipelines/api.py
def schema_lite(name: str, exclude: list[str] | None = None) -> str | None:
    """Get the raw Spark schema without constraints or generated columns.

    Returns the basic Spark schema definition without any modifications,
    suitable for use with Struct operations or type references.

    Args:
        name: Table name.
        exclude: Column names to exclude from the schema.

    Returns:
        Raw Spark schema DDL string, or None if not available.
    """
    return ModelManager.build_sdp_model(name, exclude=exclude).schema_lite
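
Example (a sketch covering schema above and schema_lite; the table name, volume path, and "_ingest_ts" column are placeholders, not values defined by the library):

from pyspark.sql import SparkSession

from kelp.pipelines import schema, schema_lite

spark = SparkSession.builder.getOrCreate()

full_ddl = schema("sales_orders")                                # with constraints/generated columns
plain_ddl = schema_lite("sales_orders", exclude=["_ingest_ts"])  # raw column types only

if plain_ddl is not None:
    df = spark.read.schema(plain_ddl).json("/Volumes/raw/landing/sales_orders")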

source

source(name)

Get the path for a data source.

Returns the path for a source (volume, table, or raw path) that can be used in pipelines for reading or writing data.

- For table sources: returns the fully qualified name (catalog.schema.model_name).
- For volume sources: returns the volume path (/Volumes/catalog/schema/volume).
- For raw_path sources: returns the configured path.

Parameters:

    name (str): Source name. Required.

Returns:

    str: Path string suitable for use with spark.read or spark.write.

Raises:

    KeyError: If the source is not found in the catalog.
    ValueError: If the source configuration is incomplete.

Source code in src/kelp/pipelines/api.py
def source(name: str) -> str:
    """Get the path for a data source.

    Returns the path for a source (volume, table, or raw path) that can be used
    in pipelines for reading or writing data.

    For table sources: returns the fully qualified name (catalog.schema.model_name)
    For volume sources: returns the volume path (/Volumes/catalog/schema/volume)
    For raw_path sources: returns the configured path

    Args:
        name: Source name.

    Returns:
        Path string suitable for use with spark.read or spark.write.

    Raises:
        KeyError: If the source is not found in the catalog.
        ValueError: If the source configuration is incomplete.
    """
    return SourceManager.get_path(name)

source_options

source_options(name)

Get the options dictionary for a data source.

Returns source-specific options that can be passed to Spark readers/writers or used for configuring the source behavior.

Parameters:

    name (str): Source name. Required.

Returns:

    dict: Dictionary of source-specific options.

Raises:

    KeyError: If the source is not found in the catalog.

Source code in src/kelp/pipelines/api.py
def source_options(name: str) -> dict:
    """Get the options dictionary for a data source.

    Returns source-specific options that can be passed to Spark readers/writers
    or used for configuring the source behavior.

    Args:
        name: Source name.

    Returns:
        Dictionary of source-specific options.

    Raises:
        KeyError: If the source is not found in the catalog.
    """
    return SourceManager.get_options(name)
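
Example (a sketch covering source above and source_options; "landing_files" is a hypothetical source name defined in the project metadata, and the "json" format is likewise an assumption for illustration):

from pyspark.sql import SparkSession

from kelp.pipelines import source, source_options

spark = SparkSession.builder.getOrCreate()

path = source("landing_files")          # volume path, table FQN, or raw path
opts = source_options("landing_files")  # reader options configured for the source

df = spark.read.options(**opts).format("json").load(path)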

target

target(name)

Get the target table name for a given table.

Returns the appropriate target table name based on quality configuration:

- If quarantine is enabled, returns the quarantine/validation table.
- Otherwise, returns the main table name (FQN).

Used to determine which table should be written to in data pipelines based on quality configuration.

Parameters:

    name (str): Table name. Required.

Returns:

    str: Fully qualified target table name.

Source code in src/kelp/pipelines/api.py
def target(name: str) -> str:
    """Get the target table name for a given table.

    Returns the appropriate target table name based on quality configuration:
    - If quarantine is enabled, returns the quarantine/validation table
    - Otherwise returns the main table name (FQN)

    Used to determine which table should be written to in data pipelines based on
    quality configuration.

    Args:
        name: Table name.

    Returns:
        Fully qualified target table name.
    """
    return ModelManager.build_sdp_model(name).target_table or name
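
Example (a sketch; "sales_orders" is a placeholder table name; with quarantine enabled, target() points at the quarantine/validation table, otherwise it matches ref()):

from kelp.pipelines import ref, target

read_from = ref("sales_orders")    # always the main table FQN
write_to = target("sales_orders")  # quality-aware write destination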

create_streaming_table

create_streaming_table(
    *,
    name,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=None,
    cluster_by=None,
    schema=None,
    row_filter=None,
    expect_all=None,
    expect_all_or_drop=None,
    expect_all_or_fail=None,
    exclude_params=None,
    **kwargs,
)

Enhanced version of dp.create_streaming_table with Kelp metadata discovery.

Creates a Databricks streaming table with automatic parameter discovery from table metadata. This is useful for programmatic table creation where table configuration is defined in YAML.

Parameters:

    name (str): Table name (required). Used to look up configuration in metadata.
    comment (str | None): Table description/comment. Default: None.
    spark_conf (dict[str, str] | None): Spark configuration properties. Default: None.
    table_properties (dict[str, str] | None): Databricks table properties. Default: None.
    path (str | None): Physical path for external tables or custom locations. Default: None.
    partition_cols (list[str] | None): List of column names for partitioning. Default: None.
    cluster_by_auto (bool | None): Enable automatic clustering optimization. Default: None.
    cluster_by (list[str] | None): List of column names for explicit clustering (max 4). Default: None.
    schema (str | Any | None): Table schema definition (DDL or StructType). Default: None.
    row_filter (str | None): SQL row filter expression. Default: None.
    expect_all (dict[str, str] | None): Dictionary of SQL expressions that must all pass. Default: None.
    expect_all_or_drop (dict[str, str] | None): Dictionary of SQL expressions; failing rows are dropped. Default: None.
    expect_all_or_fail (dict[str, str] | None): Dictionary of SQL expressions; the job fails if any fail. Default: None.
    exclude_params (list[str] | None): List of parameter keys to exclude from metadata discovery. Default: None.
    **kwargs: Additional arguments passed to dp.create_streaming_table.

Raises:

    KeyError: If the table name is not found in the catalog.

Source code in src/kelp/pipelines/streaming_tables.py
def create_streaming_table(
    *,
    name: str,
    comment: str | None = None,
    spark_conf: dict[str, str] | None = None,
    table_properties: dict[str, str] | None = None,
    path: str | None = None,
    partition_cols: list[str] | None = None,
    cluster_by_auto: bool | None = None,
    cluster_by: list[str] | None = None,
    schema: str | Any | None = None,
    row_filter: str | None = None,
    # Databricks supports expectation params directly on create_streaming_table
    expect_all: dict[str, str] | None = None,
    expect_all_or_drop: dict[str, str] | None = None,
    expect_all_or_fail: dict[str, str] | None = None,
    # Exclude
    exclude_params: list[str] | None = None,
    # Future extension point
    **kwargs,
) -> None:
    """Enhanced version of dp.create_streaming_table with Kelp metadata discovery.

    Creates a Databricks streaming table with automatic parameter discovery from
    table metadata. This is useful for programmatic table creation where table
    configuration is defined in YAML.

    Args:
        name: Table name (required). Used to look up configuration in metadata.
        comment: Table description/comment.
        spark_conf: Spark configuration properties.
        table_properties: Databricks table properties.
        path: Physical path for external tables or custom locations.
        partition_cols: List of column names for partitioning.
        cluster_by_auto: Enable automatic clustering optimization.
        cluster_by: List of column names for explicit clustering (max 4).
        schema: Table schema definition (DDL or StructType).
        row_filter: SQL row filter expression.
        expect_all: Dictionary of SQL expressions that must all pass.
        expect_all_or_drop: Dictionary of SQL expressions; failing rows are dropped.
        expect_all_or_fail: Dictionary of SQL expressions; job fails if any fail.
        exclude_params: List of parameter keys to exclude from metadata discovery.
        **kwargs: Additional arguments passed to dp.create_streaming_table.

    Raises:
        KeyError: If the table name is not found in the catalog.
    """

    params = dict(
        name=name,
        comment=comment,
        spark_conf=spark_conf,
        table_properties=table_properties,
        path=path,
        partition_cols=partition_cols,
        cluster_by_auto=cluster_by_auto,
        cluster_by=cluster_by,
        schema=schema,
        row_filter=row_filter,
        expect_all=expect_all,
        expect_all_or_drop=expect_all_or_drop,
        expect_all_or_fail=expect_all_or_fail,
    )

    meta_params = ModelManager.build_sdp_model(name, soft_handle=True).params_cst(
        exclude=exclude_params or []
    )

    params = merge_params(params, meta_params, kwargs)

    dp.create_streaming_table(**params)
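
Example (a sketch; "sales_orders" is a placeholder table name, and parameters not passed explicitly, such as schema, table properties, and expectations, are discovered from its metadata and merged with the values given here):

from kelp.pipelines import create_streaming_table

create_streaming_table(
    name="sales_orders",
    comment="Curated sales orders",     # explicit value, merged with metadata
    exclude_params=["partition_cols"],  # metadata keys to leave out of the merge
)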

materialized_view

materialized_view(
    query_function=None,
    *,
    name=None,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=False,
    cluster_by=None,
    schema=None,
    row_filter=None,
    private=None,
    exclude_params=None,
    **kwargs,
)

Drop-in wrapper for @dp.materialized_view with Kelp metadata parameter injection.

This decorator mirrors the parameter style of table() and resolves missing values from Kelp model metadata, but it does not apply SDP expectations, DQX checks, or quarantine flows.

Parameters:

    query_function (Callable[..., DataFrame] | None): The decorated function returning a DataFrame. Default: None.
    name (str | None): Materialized view name. Defaults to the function name.
    comment (str | None): Object description/comment. Default: None.
    spark_conf (dict[str, str] | None): Spark configuration properties. Default: None.
    table_properties (dict[str, str] | None): Databricks table properties. Default: None.
    path (str | None): Physical path for custom locations. Default: None.
    partition_cols (list[str] | None): List of column names for partitioning. Default: None.
    cluster_by_auto (bool): Enable automatic clustering optimization. Default: False.
    cluster_by (list[str] | None): List of column names for explicit clustering. Default: None.
    schema (str | Any | None): Schema definition (DDL or StructType). Default: None.
    row_filter (str | None): SQL row filter expression. Default: None.
    private (bool | None): Whether the materialized view should be private. Default: None.
    exclude_params (list[str] | None): List of metadata keys to exclude from auto-injection. Default: None.
    **kwargs: Additional arguments forwarded to @dp.materialized_view.

Returns:

    Callable[[Callable[..., DataFrame]], None] | None: Decorator function, or None when used as @materialized_view.

Source code in src/kelp/pipelines/streaming_tables.py
def materialized_view(
    query_function: Callable[..., DataFrame] | None = None,
    *,
    name: str | None = None,
    comment: str | None = None,
    spark_conf: dict[str, str] | None = None,
    table_properties: dict[str, str] | None = None,
    path: str | None = None,
    partition_cols: list[str] | None = None,
    cluster_by_auto: bool = False,
    cluster_by: list[str] | None = None,
    schema: str | Any | None = None,
    row_filter: str | None = None,
    private: bool | None = None,
    exclude_params: list[str] | None = None,
    **kwargs,
) -> Callable[[Callable[..., DataFrame]], None] | None:
    """Drop-in wrapper for @dp.materialized_view with Kelp metadata parameter injection.

    This decorator mirrors the parameter style of :func:`table` and resolves
    missing values from Kelp model metadata, but it does not apply SDP
    expectations, DQX checks, or quarantine flows.

    Args:
        query_function: The decorated function returning a DataFrame.
        name: Materialized view name. Defaults to function name.
        comment: Object description/comment.
        spark_conf: Spark configuration properties.
        table_properties: Databricks table properties.
        path: Physical path for custom locations.
        partition_cols: List of column names for partitioning.
        cluster_by_auto: Enable automatic clustering optimization.
        cluster_by: List of column names for explicit clustering.
        schema: Schema definition (DDL or StructType).
        row_filter: SQL row filter expression.
        private: Whether the materialized view should be private.
        exclude_params: List of metadata keys to exclude from auto-injection.
        **kwargs: Additional arguments forwarded to @dp.materialized_view.

    Returns:
        Decorator function or None when used as ``@materialized_view``.
    """
    params_passed = dict(
        name=name,
        comment=comment,
        spark_conf=spark_conf,
        table_properties=table_properties,
        path=path,
        partition_cols=partition_cols,
        cluster_by_auto=cluster_by_auto,
        cluster_by=cluster_by,
        schema=schema,
        row_filter=row_filter,
        private=private,
    )
    passed_kwargs = kwargs

    def outer(decorated: Callable[..., DataFrame]) -> None:
        model_name = name or getattr(decorated, "__name__", "unknown")
        sdp_table = ModelManager.build_sdp_model(model_name, soft_handle=True)
        meta_params = sdp_table.params(exclude=exclude_params or [])
        params = merge_params(params_passed, meta_params, passed_kwargs)
        dp.materialized_view(**params)(decorated)

    if query_function is not None and callable(query_function):
        outer(query_function)
        return None
    return outer
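
Example (a sketch; "daily_order_totals" and "sales_orders" are placeholder model names, and comment, schema, and other table parameters are resolved from Kelp metadata when present):

from pyspark.sql import SparkSession

from kelp.pipelines import materialized_view, ref

spark = SparkSession.builder.getOrCreate()

@materialized_view(name="daily_order_totals")
def daily_order_totals():
    return spark.read.table(ref("sales_orders")).groupBy("order_date").count()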

table

table(
    query_function=None,
    *,
    name=None,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=False,
    cluster_by=None,
    schema=None,
    row_filter=None,
    private=None,
    expect_all=None,
    expect_all_or_drop=None,
    expect_all_or_fail=None,
    expect_all_or_quarantine=None,
    exclude_params=None,
    **kwargs,
)

Drop-in replacement for @dp.table with built-in data quality and quarantine support.

This decorator extends Databricks' standard @dp.table with enhanced data quality processing. It supports both SDP (Spark Declarative Pipelines) expectations and DQX checks, with automatic quarantine table creation for failed records.

Features:

- Automatic schema and parameter discovery from table metadata
- SDP expectations (expect_all, expect_all_or_drop, expect_all_or_fail)
- Quarantine table support for failed records
- DQX integration for complex quality checks
- Parameter override support for flexibility

Parameters:

    query_function (Callable[..., DataFrame] | None): The decorated function returning a DataFrame (for the @decorator form). Default: None.
    name (str | None): Table name. If not provided, uses the function name. Default: None.
    comment (str | None): Table description/comment. Default: None.
    spark_conf (dict[str, str] | None): Spark configuration properties. Default: None.
    table_properties (dict[str, str] | None): Databricks table properties. Default: None.
    path (str | None): Physical path for external tables or custom locations. Default: None.
    partition_cols (list[str] | None): List of column names for partitioning. Default: None.
    cluster_by_auto (bool): Enable automatic clustering optimization. Default: False.
    cluster_by (list[str] | None): List of column names for explicit clustering (max 4). Default: None.
    schema (str | Any | None): Table schema definition (DDL or StructType). Default: None.
    row_filter (str | None): SQL row filter expression. Default: None.
    private (bool | None): Whether the table should be private. Default: None.
    expect_all (dict[str, str] | None): Dictionary of SQL expressions that must all pass. Default: None.
    expect_all_or_drop (dict[str, str] | None): Dictionary of SQL expressions; failing rows are dropped. Default: None.
    expect_all_or_fail (dict[str, str] | None): Dictionary of SQL expressions; the job fails if any fail. Default: None.
    expect_all_or_quarantine (dict[str, str] | None): Dictionary of SQL expressions; failing rows are quarantined. Default: None.
    exclude_params (list[str] | None): List of parameter keys to exclude from metadata discovery. Default: None.
    **kwargs: Additional arguments passed to the underlying @dp.table.

Returns:

    Callable[[Callable[..., DataFrame]], None] | None: Decorator function, or None if called with a query function directly.

Raises:

    ValueError: If validation or quarantine table names cannot be determined.
    ImportError: If DQX checks are configured but databricks-labs-dqx is not installed.

Example:

    @streaming_table(name="my_table", expect_all={"non_null": "column1 IS NOT NULL"})
    def get_my_data():
        return spark.read.table("source_table")

Source code in src/kelp/pipelines/streaming_tables.py
def table(
    # --- standard @dp.table parameters (keep signature compatible) ---
    query_function: Callable[..., DataFrame] | None = None,
    *,
    name: str | None = None,
    comment: str | None = None,
    spark_conf: dict[str, str] | None = None,
    table_properties: dict[str, str] | None = None,
    path: str | None = None,
    partition_cols: list[str] | None = None,
    cluster_by_auto: bool = False,
    cluster_by: list[str] | None = None,
    schema: str | Any | None = None,
    row_filter: str | None = None,
    private: bool | None = None,
    # Kelp specific expectation parameters
    expect_all: dict[str, str] | None = None,
    expect_all_or_drop: dict[str, str] | None = None,
    expect_all_or_fail: dict[str, str] | None = None,
    expect_all_or_quarantine: dict[str, str] | None = None,
    # Exclude
    exclude_params: list[str] | None = None,
    # Future extension point
    **kwargs,
) -> Callable[[Callable[..., DataFrame]], None] | None:
    """Drop-in replacement for @dp.table with built-in data quality and quarantine support.

    This decorator extends Databricks' standard @dp.table with enhanced data quality
    processing. It supports both SDP (Spark Declarative Pipelines) expectations and DQX checks,
    with automatic quarantine table creation for failed records.

    Features:
    - Automatic schema and parameter discovery from table metadata
    - SDP expectations (expect_all, expect_all_or_drop, expect_all_or_fail)
    - Quarantine table support for failed records
    - DQX integration for complex quality checks
    - Parameter override support for flexibility

    Args:
        query_function: The decorated function returning a DataFrame (for @decorator form).
        name: Table name. If not provided, uses function name.
        comment: Table description/comment.
        spark_conf: Spark configuration properties.
        table_properties: Databricks table properties.
        path: Physical path for external tables or custom locations.
        partition_cols: List of column names for partitioning.
        cluster_by_auto: Enable automatic clustering optimization.
        cluster_by: List of column names for explicit clustering (max 4).
        schema: Table schema definition (DDL or StructType).
        row_filter: SQL row filter expression.
        private: Whether the table should be private.
        expect_all: Dictionary of SQL expressions that must all pass.
        expect_all_or_drop: Dictionary of SQL expressions; failing rows are dropped.
        expect_all_or_fail: Dictionary of SQL expressions; job fails if any fail.
        expect_all_or_quarantine: Dictionary of SQL expressions; failing rows quarantined.
        exclude_params: List of parameter keys to exclude from metadata discovery.
        **kwargs: Additional arguments passed to the underlying @dp.table.

    Returns:
        Decorator function or None if called with a query function directly.

    Raises:
        ValueError: If validation or quarantine table names cannot be determined.
        ImportError: If DQX checks are configured but databricks-labs-dqx is not installed.

    Example:
        @streaming_table(name="my_table", expect_all={"non_null": "column1 IS NOT NULL"})
        def get_my_data():
            return spark.read.table("source_table")
    """
    # Build a dict of explicit overwrites provided at callsite.
    spark = SparkSession.active()
    params_passed = dict(
        name=name,
        comment=comment,
        spark_conf=spark_conf,
        table_properties=table_properties,
        path=path,
        partition_cols=partition_cols,
        cluster_by_auto=cluster_by_auto,
        cluster_by=cluster_by,
        schema=schema,
        row_filter=row_filter,
        private=private,
        expect_all=expect_all,
        expect_all_or_drop=expect_all_or_drop,
        expect_all_or_fail=expect_all_or_fail,
        expect_all_or_quarantine=expect_all_or_quarantine,
    )
    passed_kwargs = kwargs

    def outer(decorated: Callable[..., DataFrame]) -> None:

        model_name = name or getattr(decorated, "__name__", "unknown")
        sdp_table = ModelManager.build_sdp_model(model_name, soft_handle=True)
        meta_params = sdp_table.params_raw(exclude=exclude_params or [])

        params = merge_params(params_passed, meta_params, passed_kwargs)

        fqn = str(params.get("name") or model_name)
        expect_all = params.pop("expect_all", None)
        expect_all_or_drop = params.pop("expect_all_or_drop", None)
        expect_all_or_fail = params.pop("expect_all_or_fail", None)
        expect_all_or_quarantine = params.pop("expect_all_or_quarantine", None)

        # validation_model_name = table_def.get_validation_model_name()
        # quarantine_model_name = table_def.get_quarantine_model_name()
        validation_model_name = sdp_table.validation_table
        quarantine_model_name = sdp_table.quarantine_table
        if validation_model_name is None or quarantine_model_name is None:
            raise ValueError("Validation or quarantine table name is missing.")

        dqx_obj = sdp_table.get_dqx_check_obj()

        if dqx_obj:
            try:
                from databricks.labs.dqx.engine import DQEngine
            except ImportError as e:
                raise ImportError(
                    "DQX is required for using DQX checks in @streaming_table. Please install databricks-labs-dqx."
                    "For more information check https://databrickslabs.github.io/dqx/",
                ) from e

            from databricks.sdk import WorkspaceClient

            dq_engine = DQEngine(WorkspaceClient())
            dqx_checks = dqx_obj.checks

            def _apply_dqx_checks() -> DataFrame:
                df = decorated()
                result = dq_engine.apply_checks_by_metadata(df, dqx_checks)
                # result = df
                if isinstance(result, tuple):
                    # DQX may return (df, observation)
                    return result[0]
                return result

            sdp_level = dqx_obj.sdp_expect_level
            if sdp_level != "deactivate":
                if sdp_level == "warn":
                    expect_all = {"dqx_error": "_errors IS NULL"}
                elif sdp_level == "fail":
                    expect_all_or_fail = {"dqx_error": "_errors IS NULL"}
                elif sdp_level == "drop":
                    expect_all_or_drop = {"dqx_error": "_errors IS NULL"}

            validty_func = _apply_expectations(
                _apply_dqx_checks,
                expect_all,
                expect_all_or_drop,
                expect_all_or_fail,
            )

            if dqx_obj.sdp_quarantine:
                dp.table(
                    name=validation_model_name,
                    private=True,
                )(validty_func)  # ty:ignore[no-matching-overload]

                @dp.table(**params)
                def valid_table():
                    df = spark.readStream.table(validation_model_name)
                    return dq_engine.get_valid(df)

                @dp.table(
                    name=quarantine_model_name,
                    comment=f"Quarantined rows from {fqn}",
                )
                def invalid_table():
                    df = spark.readStream.table(validation_model_name)
                    return dq_engine.get_invalid(df)
            else:
                dp.table(**params)(validty_func)

        elif expect_all_or_quarantine:
            not_combined = _combine_rules(expect_all_or_quarantine, apply_not=True)
            quarantine_col = "is_quarantined"

            def validity_wrapper():
                return decorated().withColumn(quarantine_col, expr(not_combined))

            if not expect_all:
                expect_all = {}
            expect_all.update(expect_all_or_quarantine)
            # expect_all.update({"quarantine_col": f"{quarantine_col} = false"})

            validty_func = _apply_expectations(
                validity_wrapper,
                expect_all,
                expect_all_or_drop,
                expect_all_or_fail,
            )

            dp.table(
                name=validation_model_name,
                private=True,
                partition_cols=[quarantine_col],
            )(validty_func)  # ty:ignore[no-matching-overload]

            # Create two table one quarantined and one valid rows only.
            # Original table now only has valid rows.

            @dp.table(**params)
            def valid_table():
                return (
                    spark.readStream.table(validation_model_name)
                    .filter(f"{quarantine_col} = false")
                    .drop(quarantine_col)
                )

            # Quarantined table with invalid rows only.

            @dp.table(
                name=quarantine_model_name,
                comment=f"Quarantined rows from {fqn}",
            )
            def invalid_table():
                return (
                    spark.readStream.table(validation_model_name)
                    .filter(f"{quarantine_col} = true")
                    .drop(quarantine_col)
                )

        else:
            dp.table(**params)(
                _apply_expectations(decorated, expect_all, expect_all_or_drop, expect_all_or_fail),
            )

    # Support both @decorator and @decorator(...) forms, like @dp.table.
    if query_function is not None and callable(query_function):
        outer(query_function)
        return None
    return outer
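
Example (a quarantine-flow sketch; "sales_orders" and "sales_orders_raw" are placeholder names, and the model's validation and quarantine table names are assumed to be defined in the Kelp metadata catalog):

from pyspark.sql import SparkSession

from kelp.pipelines import ref, table

spark = SparkSession.builder.getOrCreate()

@table(
    name="sales_orders",
    expect_all_or_quarantine={"valid_amount": "amount >= 0"},  # failing rows are quarantined
)
def sales_orders():
    return spark.readStream.table(ref("sales_orders_raw"))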

apply_schema

apply_schema(
    name=None,
    *,
    schema=None,
    safe_cast=False,
    drop_extra_columns=True,
    add_missing_columns=True,
    missing_column_default=None,
)

Return a transformation that enforces schema on a DataFrame.

The returned callable is designed for use with DataFrame.transform (pyspark.sql.DataFrame.transform).

Exactly one of name or schema must be provided.

- If name is given, the target schema is resolved from the Kelp metadata catalog via kelp.models_api.columns. When no columns are defined, the DataFrame is returned unchanged (pass-through).
- If schema is given as a DDL string (e.g. "id INT, name STRING") or a pyspark.sql.types.StructType, it is used directly.

Parameters:

    name (str | None): Kelp table name; resolves the schema from the metadata catalog. Default: None.
    schema (str | StructType | None): Explicit target schema as a DDL string or StructType. Default: None.
    safe_cast (bool): When True, uses try_cast so incompatible values become NULL instead of raising. Default: False.
    drop_extra_columns (bool): When True, columns present in the DataFrame but absent from the target schema are removed. Default: True.
    add_missing_columns (bool): When True, columns defined in the target schema but absent from the DataFrame are added with missing_column_default as their value. Default: True.
    missing_column_default (Any): The literal value used for newly added columns. Default: None (SQL NULL).

Returns:

    Callable[[DataFrame], DataFrame]: A transformation suitable for df.transform(...).

Raises:

    ValueError: If both or neither of name / schema are provided.

Source code in src/kelp/transformations/schema.py
def apply_schema(
    name: str | None = None,
    *,
    schema: str | StructType | None = None,
    safe_cast: bool = False,
    drop_extra_columns: bool = True,
    add_missing_columns: bool = True,
    missing_column_default: Any = None,
) -> Callable[[DataFrame], DataFrame]:
    """Return a transformation that enforces *schema* on a DataFrame.

    The returned callable is designed for :pyspark:`DataFrame.transform
    <pyspark.sql.DataFrame.transform>`.

    Exactly **one** of *name* or *schema* must be provided.

    * If *name* is given the target schema is resolved from the Kelp metadata
      catalog via :func:`kelp.models_api.columns`.  When no columns are defined
      the DataFrame is returned unchanged (pass-through).
    * If *schema* is given as a DDL string (e.g. ``"id INT, name STRING"``) or
      a :class:`~pyspark.sql.types.StructType` it is used directly.

    Args:
        name: Kelp table name — resolves schema from the metadata catalog.
        schema: Explicit target schema as DDL string or ``StructType``.
        safe_cast: When ``True``, uses ``try_cast`` so incompatible values
            become ``NULL`` instead of raising.  Defaults to ``False``.
        drop_extra_columns: When ``True`` (default), columns present in the
            DataFrame but absent from the target schema are removed.
        add_missing_columns: When ``True`` (default), columns defined in the
            target schema but absent from the DataFrame are added with
            *missing_column_default* as their value.
        missing_column_default: The literal value used for newly added columns.
            Defaults to ``None`` (SQL ``NULL``).

    Returns:
        A ``Callable[[DataFrame], DataFrame]`` suitable for
        ``df.transform(…)``.

    Raises:
        ValueError: If both or neither of *name* / *schema* are provided.
    """
    if name is not None and schema is not None:
        msg = "Provide either 'name' or 'schema', not both."
        raise ValueError(msg)
    if name is None and schema is None:
        msg = "Provide either 'name' (Kelp table name) or 'schema' (DDL / StructType)."
        raise ValueError(msg)

    target_schema = _resolve_schema(name=name, schema=schema)

    if target_schema is None:
        logger.debug("No schema found for '%s' — returning pass-through.", name)
        return lambda df: df

    logger.debug(
        "apply_schema target: %d columns (%s)",
        len(target_schema.fields),
        ", ".join(f.name for f in target_schema.fields),
    )

    def _enforce(df: DataFrame) -> DataFrame:
        return _enforce_schema(
            df,
            target_schema=target_schema,
            safe_cast=safe_cast,
            drop_extra_columns=drop_extra_columns,
            add_missing_columns=add_missing_columns,
            missing_column_default=missing_column_default,
        )

    return _enforce
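
Example (a sketch using an explicit DDL schema; the import from kelp.pipelines assumes apply_schema is re-exported there as this page suggests, since the source lives in kelp/transformations/schema.py):

from pyspark.sql import SparkSession

from kelp.pipelines import apply_schema

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1", "a", "x")], "id STRING, name STRING, extra STRING")

# With the DDL below: `id` is cast to INT, `extra` is dropped (not in the target
# schema), and the missing `created_at` column is added as NULL.
enforced = df.transform(
    apply_schema(schema="id INT, name STRING, created_at TIMESTAMP", safe_cast=True)
)
enforced.printSchema()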