Transformations

kelp.transformations

DataFrame transformations backed by Kelp metadata.

Currently provides apply_schema for schema enforcement and apply_func for applying Unity Catalog functions.
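
The two transformations compose through DataFrame.transform. A minimal sketch, assuming a customers table and a normalize_email function are registered in the Kelp metadata catalog::

from kelp.transformations import apply_func, apply_schema

result = (
    spark.read.table("raw_customers")
    .transform(apply_schema(name="customers"))
    .transform(
        apply_func(
            func_name="normalize_email",
            new_column="email_normalized",
            parameters="email",
        )
    )
)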

apply_func

apply_func(func_name, new_column, parameters=None)

Return a transformation that applies a Unity Catalog function to a DataFrame.

The returned callable is designed for pyspark.sql.DataFrame.transform.

Parameters:

    func_name (str, required)
        Name of the Unity Catalog function (e.g., 'normalize_email'). The
        function is resolved from the Kelp metadata catalog and invoked with
        the fully qualified name.

    new_column (str, required)
        Name of the new column to be created with the function result.

    parameters (dict[str, str] | str | None, default None)
        Function parameters or a literal column name. Can be:

          • A string (literal): a single column name to pass to the function
            (e.g., "customer_id" passes the customer_id column).
          • A dict: a mapping of function parameter names to DataFrame column
            names (e.g., {"id": "customer_id"} maps function parameter 'id'
            to column 'customer_id').
          • None: no explicit mapping; the function is invoked with every
            DataFrame column.

Returns:

    Callable[[DataFrame], DataFrame]
        A transformation suitable for df.transform(…).
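
Because the result is an ordinary callable, it can be built once and reused, or invoked directly; a small sketch (normalize_email as in the examples below)::

normalize = apply_func(
    func_name="normalize_email", new_column="email_normalized", parameters="email"
)
result = normalize(df)  # equivalent to df.transform(normalize)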

Raises:

    KeyError
        If the function name is not found in the catalog.

    ValueError
        If parameters is invalid.

Examples:

Apply a function to a single column::

from kelp.transformations import apply_func

df = spark.read.table("customers")
result = df.transform(
    apply_func(
        func_name="normalize_email", new_column="email_normalized", parameters="email"
    )
)

Apply a function with multiple parameters::

result = df.transform(
    apply_func(
        func_name="format_full_name",
        new_column="full_name",
        parameters={"first_name": "first_name", "last_name": "last_name"},
    )
)
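
Apply a function with no parameter mapping; per the source below, parameters=None builds the expression fqn(*), passing every DataFrame column (row_hash is an illustrative function name)::

result = df.transform(apply_func(func_name="row_hash", new_column="row_hash"))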
Source code in src/kelp/transformations/functions.py
def apply_func(
    func_name: str,
    new_column: str,
    parameters: dict[str, str] | str | None = None,
) -> Callable[[DataFrame], DataFrame]:
    """Return a transformation that applies a Unity Catalog function to a DataFrame.

    The returned callable is designed for :pyspark:`DataFrame.transform
    <pyspark.sql.DataFrame.transform>`.

    Args:
        func_name: Name of the Unity Catalog function (e.g., 'normalize_email').
            The function is resolved from the Kelp metadata catalog and invoked
            with the fully qualified name.
        new_column: Name of the new column to be created with the function result.
        parameters: Function parameters or literal column name.
            Can be:
            - A string (literal): single column name to pass to the function
              (e.g., ``"customer_id"`` passes the customer_id column).
            - A dict: mapping of function parameter names to DataFrame column names
              (e.g., ``{"id": "customer_id"}`` maps function param 'id' to column 'customer_id').
            - None: no explicit mapping; the function is invoked with every
              DataFrame column (``fqn(*)``).

    Returns:
        A ``Callable[[DataFrame], DataFrame]`` suitable for ``df.transform(…)``.

    Raises:
        KeyError: If the function name is not found in the catalog.
        ValueError: If parameters is invalid.

    Examples:
        Apply a function to a single column::

            from kelp.transformations import apply_func

            df = spark.read.table("customers")
            result = df.transform(
                apply_func(
                    func_name="normalize_email", new_column="email_normalized", parameters="email"
                )
            )

        Apply a function with multiple parameters::

            result = df.transform(
                apply_func(
                    func_name="format_full_name",
                    new_column="full_name",
                    parameters={"first_name": "first_name", "last_name": "last_name"},
                )
            )
    """
    from kelp.tables import func as get_func_fqn

    # Resolve the fully qualified function name from the catalog
    fqn = get_func_fqn(func_name)

    logger.debug(
        "Creating apply_func transformation: %s -> %s with parameters: %s",
        func_name,
        new_column,
        parameters,
    )

    def _apply(df: DataFrame) -> DataFrame:
        """Apply the function to the DataFrame."""
        # Handle different parameters types
        if isinstance(parameters, str):
            # Literal column name - pass directly to function
            func_expr = functions.expr(f"{fqn}({parameters})")
        elif isinstance(parameters, dict):
            # Parameter mapping - the dict's values (column names) are passed
            # to the function positionally, in insertion order
            col_names = [f"`{col}`" for col in parameters.values()]
            func_expr = functions.expr(f"{fqn}({', '.join(col_names)})")
        else:
            # No mapping - pass every DataFrame column to the function
            func_expr = functions.expr(f"{fqn}(*)")

        result_df = df.withColumn(new_column, func_expr)

        logger.debug(
            "Applied function %s to create column '%s'",
            fqn,
            new_column,
        )

        return result_df

    return _apply

apply_schema

apply_schema(
    name=None,
    *,
    schema=None,
    safe_cast=False,
    drop_extra_columns=True,
    add_missing_columns=True,
    missing_column_default=None,
)

Return a transformation that enforces schema on a DataFrame.

The returned callable is designed for pyspark.sql.DataFrame.transform.

Exactly one of name or schema must be provided.

  • If name is given, the target schema is resolved from the Kelp metadata catalog via kelp.models_api.columns. When no columns are defined, the DataFrame is returned unchanged (pass-through).
  • If schema is given as a DDL string (e.g. "id INT, name STRING") or a pyspark.sql.types.StructType, it is used directly (see the examples below).

Parameters:

    name (str | None, default None)
        Kelp table name — resolves schema from the metadata catalog.

    schema (str | StructType | None, default None)
        Explicit target schema as a DDL string or StructType.

    safe_cast (bool, default False)
        When True, uses try_cast so incompatible values become NULL instead
        of raising.

    drop_extra_columns (bool, default True)
        When True, columns present in the DataFrame but absent from the
        target schema are removed.

    add_missing_columns (bool, default True)
        When True, columns defined in the target schema but absent from the
        DataFrame are added with missing_column_default as their value.

    missing_column_default (Any, default None)
        The literal value used for newly added columns; None yields SQL NULL.

Returns:

    Callable[[DataFrame], DataFrame]
        A transformation suitable for df.transform(…).

Raises:

    ValueError
        If both or neither of name / schema are provided.
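
Examples:

Enforce a schema resolved from the Kelp metadata catalog; a minimal sketch, assuming a customers table is defined there::

from kelp.transformations import apply_schema

df = spark.read.table("raw_customers")
result = df.transform(apply_schema(name="customers"))

Enforce an explicit DDL schema, tolerating incompatible values and backfilling any missing column (the schema and default are illustrative)::

result = df.transform(
    apply_schema(
        schema="id INT, name STRING, country STRING",
        safe_cast=True,  # incompatible values become NULL via try_cast
        missing_column_default="unknown",
    )
)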

Source code in src/kelp/transformations/schema.py
def apply_schema(
    name: str | None = None,
    *,
    schema: str | StructType | None = None,
    safe_cast: bool = False,
    drop_extra_columns: bool = True,
    add_missing_columns: bool = True,
    missing_column_default: Any = None,
) -> Callable[[DataFrame], DataFrame]:
    """Return a transformation that enforces *schema* on a DataFrame.

    The returned callable is designed for :pyspark:`DataFrame.transform
    <pyspark.sql.DataFrame.transform>`.

    Exactly **one** of *name* or *schema* must be provided.

    * If *name* is given the target schema is resolved from the Kelp metadata
      catalog via :func:`kelp.models_api.columns`.  When no columns are defined
      the DataFrame is returned unchanged (pass-through).
    * If *schema* is given as a DDL string (e.g. ``"id INT, name STRING"``) or
      a :class:`~pyspark.sql.types.StructType` it is used directly.

    Args:
        name: Kelp table name — resolves schema from the metadata catalog.
        schema: Explicit target schema as DDL string or ``StructType``.
        safe_cast: When ``True``, uses ``try_cast`` so incompatible values
            become ``NULL`` instead of raising.  Defaults to ``False``.
        drop_extra_columns: When ``True`` (default), columns present in the
            DataFrame but absent from the target schema are removed.
        add_missing_columns: When ``True`` (default), columns defined in the
            target schema but absent from the DataFrame are added with
            *missing_column_default* as their value.
        missing_column_default: The literal value used for newly added columns.
            Defaults to ``None`` (SQL ``NULL``).

    Returns:
        A ``Callable[[DataFrame], DataFrame]`` suitable for
        ``df.transform(…)``.

    Raises:
        ValueError: If both or neither of *name* / *schema* are provided.
    """
    if name is not None and schema is not None:
        msg = "Provide either 'name' or 'schema', not both."
        raise ValueError(msg)
    if name is None and schema is None:
        msg = "Provide either 'name' (Kelp table name) or 'schema' (DDL / StructType)."
        raise ValueError(msg)

    target_schema = _resolve_schema(name=name, schema=schema)

    if target_schema is None:
        logger.debug("No schema found for '%s' — returning pass-through.", name)
        return lambda df: df

    logger.debug(
        "apply_schema target: %d columns (%s)",
        len(target_schema.fields),
        ", ".join(f.name for f in target_schema.fields),
    )

    def _enforce(df: DataFrame) -> DataFrame:
        return _enforce_schema(
            df,
            target_schema=target_schema,
            safe_cast=safe_cast,
            drop_extra_columns=drop_extra_columns,
            add_missing_columns=add_missing_columns,
            missing_column_default=missing_column_default,
        )

    return _enforce