# Transformations

## kelp.transformations

DataFrame transformations backed by Kelp metadata.

Currently provides `apply_schema` for schema enforcement and `apply_func` for applying Unity Catalog functions.
### apply_func

Return a transformation that applies a Unity Catalog function to a DataFrame.

The returned callable is designed for `pyspark.sql.DataFrame.transform`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func_name` | `str` | Name of the Unity Catalog function (e.g., `'normalize_email'`). The function is resolved from the Kelp metadata catalog and invoked with the fully qualified name. | *required* |
| `new_column` | `str` | Name of the new column to be created with the function result. | *required* |
| `parameters` | `dict[str, str] \| str \| None` | Function parameters or a literal column name. Can be a string (a single column name to pass to the function, e.g. `"email"`) or a dict mapping function parameter names to column names. | `None` |
Returns:

| Type | Description |
|---|---|
| `Callable[[DataFrame], DataFrame]` | A transformation that returns the DataFrame with the new column added. |
Raises:

| Type | Description |
|---|---|
| `KeyError` | If the function name is not found in the catalog. |
| `ValueError` | If `parameters` is invalid. |
Examples:

Apply a function to a single column:

```python
from kelp.transformations import apply_func

df = spark.read.table("customers")
result = df.transform(
    apply_func(
        func_name="normalize_email", new_column="email_normalized", parameters="email"
    )
)
```

Apply a function with multiple parameters:

```python
result = df.transform(
    apply_func(
        func_name="format_full_name",
        new_column="full_name",
        parameters={"first_name": "first_name", "last_name": "last_name"},
    )
)
```
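Because `apply_func` returns a plain `DataFrame -> DataFrame` callable, several transformations can be chained with successive `DataFrame.transform` calls. A minimal sketch, assuming a second Unity Catalog function `trim_whitespace` and a `name` column exist (both are illustrative, not part of this API):

```python
from kelp.transformations import apply_func

df = spark.read.table("customers")
# Chain two independent apply_func transformations; each call adds one column.
result = (
    df.transform(
        apply_func(
            func_name="normalize_email", new_column="email_normalized", parameters="email"
        )
    ).transform(
        apply_func(
            func_name="trim_whitespace", new_column="name_clean", parameters="name"
        )
    )
)
```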
Source code in src/kelp/transformations/functions.py
### apply_schema

```python
apply_schema(
    name=None,
    *,
    schema=None,
    safe_cast=False,
    drop_extra_columns=True,
    add_missing_columns=True,
    missing_column_default=None,
)
```
Return a transformation that enforces a schema on a DataFrame.

The returned callable is designed for `pyspark.sql.DataFrame.transform`.

Exactly one of `name` or `schema` must be provided.

- If `name` is given, the target schema is resolved from the Kelp metadata catalog via `kelp.models_api.columns`. When no columns are defined, the DataFrame is returned unchanged (pass-through).
- If `schema` is given as a DDL string (e.g. `"id INT, name STRING"`) or a `pyspark.sql.types.StructType`, it is used directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Kelp table name; the target schema is resolved from the metadata catalog. | `None` |
| `schema` | `str \| StructType \| None` | Explicit target schema as a DDL string or `StructType`. | `None` |
| `safe_cast` | `bool` | When `True`, values that cannot be cast to the target type become null instead of raising an error. | `False` |
| `drop_extra_columns` | `bool` | When `True`, columns not present in the target schema are dropped. | `True` |
| `add_missing_columns` | `bool` | When `True`, columns defined in the target schema but missing from the DataFrame are added. | `True` |
| `missing_column_default` | `Any` | The literal value used for newly added columns. Defaults to `None`. | `None` |
Returns:

| Type | Description |
|---|---|
| `Callable[[DataFrame], DataFrame]` | A transformation that returns the DataFrame conforming to the target schema. |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If both or neither of `name` / `schema` are provided. |
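
Examples:

The snippets below are usage sketches based on the signature above; the table name, column names, and DDL string are illustrative assumptions, not part of the documented API.

Enforce a schema registered in the Kelp metadata catalog:

```python
from kelp.transformations import apply_schema

df = spark.read.table("customers")
# "customers" is assumed to be a table defined in the Kelp metadata catalog.
result = df.transform(apply_schema(name="customers"))
```

Enforce an explicit DDL schema, spelling out the optional flags:

```python
result = df.transform(
    apply_schema(
        schema="id INT, name STRING, email STRING",
        safe_cast=True,
        drop_extra_columns=True,
        add_missing_columns=True,
        missing_column_default=None,
    )
)
```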