# Pipelines

## kelp.pipelines

### init

```python
init(
    project_file_path=None,
    target=None,
    init_vars=None,
    manifest_file_path=None,
    refresh=False,
    store_in_global=True,
    run_policy_checks=False,
    log_level=None,
)
```
Initialize the kelp runtime context from the current directory.

When `manifest_file_path` is provided (or resolved from the `KELP_MANIFEST_FILE` environment variable), the context is loaded directly from a pre-built manifest JSON file, skipping project discovery, Jinja rendering, and metadata loading.

When `policy_config.enabled` is `True` in the project settings, metadata governance policies are evaluated immediately after loading. Warn-severity violations are logged; error-severity violations raise a `RuntimeError`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `project_file_path` | `str \| None` | Path to the project file or directory. | `None` |
| `target` | `str \| None` | Target environment name. | `None` |
| `init_vars` | `dict[str, Any] \| None` | Runtime variable overrides. | `None` |
| `manifest_file_path` | `str \| None` | Path to a manifest JSON file. When provided, skips source file loading. Also resolved from the `KELP_MANIFEST_FILE` env var. | `None` |
| `refresh` | `bool` | If `True`, recreate the context even if one already exists. | `False` |
| `store_in_global` | `bool` | Whether to store the context globally. | `True` |
| `run_policy_checks` | `bool` | Whether to run policy checks. | `False` |
| `log_level` | `str \| None` | Optional log level to configure. | `None` |

Returns:

| Type | Description |
|---|---|
| `MetaRuntimeContext` | The initialized `MetaRuntimeContext`. |
Source code in src/kelp/config/config.py
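A minimal usage sketch; the `kp` alias, the `dev` target name, and the variable values are illustrative, not part of the API:

```python
import kelp.pipelines as kp

# Build the runtime context once at pipeline start, overriding the
# target environment and one runtime variable.
ctx = kp.init(target="dev", init_vars={"run_date": "2024-01-01"})
```

Later examples on this page reuse the `kp` alias and assume an ambient Databricks `spark` session.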
### func

Get the fully qualified name of a Unity Catalog function.

Returns the fully qualified name (`catalog.schema.function_name`) of the function, for use in PySpark expressions and SQL queries.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Function name. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Fully qualified function name. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the function name is not found in the catalog. |
Source code in src/kelp/pipelines/api.py
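For example, the resolved name can be spliced into a PySpark expression; the UC function `mask_email`, the `email` column, and the input `df` are hypothetical:

```python
from pyspark.sql import functions as F

# kp.func("mask_email") resolves to e.g. "catalog.schema.mask_email".
df = df.withColumn("email_masked", F.expr(f"{kp.func('mask_email')}(email)"))
```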
### get_model

Get the `KelpSdpModel` object for a given table name.

Retrieves a `KelpSdpModel` instance with all computed properties, including the fully qualified name, schema, quality checks, and partition information.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name to retrieve. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `KelpSdpModel` | `KelpSdpModel` | Table object with all properties and computed values. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the table name is not found in the catalog. |
Source code in src/kelp/pipelines/api.py
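A brief sketch; the table name is hypothetical, and the exact attributes exposed by the model are not enumerated here:

```python
# Fetch the model and inspect its computed properties
# (fully qualified name, schema, quality checks, partitions).
model = kp.get_model("orders")
print(model)
```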
### params

Get the streaming table parameters as a dictionary.

Returns all table parameters suitable for the `@dp.table` decorator, excluding quality expectations (`expect_all`, `expect_all_or_drop`, `expect_all_or_fail`, `expect_all_or_quarantine`).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |
| `exclude` | `list[str] \| None` | Parameter keys to additionally exclude from the result. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, str]` | Dictionary of streaming table parameters. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the table name is not found in the catalog. |
Source code in src/kelp/pipelines/api.py
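A sketch of unpacking the discovered parameters into the standard decorator; the `dp` import path and table names are assumptions:

```python
from pyspark import pipelines as dp  # import path assumed

@dp.table(**kp.params("orders", exclude=["path"]))
def orders():
    return spark.readStream.table(kp.ref("orders_bronze"))
```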
### params_cst

Get the `create_streaming_table` API parameters.

Returns table parameters suitable for the Databricks `create_streaming_table` API, excluding `expect_all_or_quarantine`, which that API does not support.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |
| `exclude` | `list[str] \| None` | Parameter keys to additionally exclude from the result. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, str]` | Dictionary of `create_streaming_table` parameters. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the table name is not found in the catalog. |
Source code in src/kelp/pipelines/api.py
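The analogous sketch for the imperative API, under the same assumptions as the `params` example above:

```python
# Creates the streaming table from metadata-discovered parameters,
# minus the unsupported expect_all_or_quarantine key.
dp.create_streaming_table(**kp.params_cst("orders"))
```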
### ref

Get the source reference name for a table.

Returns the fully qualified name of the source table. This is always the main table name, regardless of quality configuration. Use it to reference the source table in SQL queries and pipeline definitions.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Fully qualified source table name (`catalog.schema.name`). |
Source code in src/kelp/pipelines/api.py
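For instance, with a hypothetical table name:

```python
# Always resolves to the main table FQN, e.g. "catalog.schema.orders_bronze".
df = spark.readStream.table(kp.ref("orders_bronze"))
```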
### schema

Get the Spark schema DDL for a table.

Returns the full Spark schema definition, including constraints and generated columns, suitable for use with the `@dp.table` decorator.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |
| `exclude` | `list[str] \| None` | Column names to exclude from the schema. | `None` |

Returns:

| Type | Description |
|---|---|
| `str \| None` | Spark schema DDL string, or `None` if not available. |
Source code in src/kelp/pipelines/api.py
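A sketch pairing the DDL with the decorator; names and the `dp` import are the same assumptions as in the `params` example:

```python
@dp.table(name="orders", schema=kp.schema("orders"))
def orders():
    return spark.readStream.table(kp.ref("orders_bronze"))
```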
### schema_lite

Get the raw Spark schema without constraints or generated columns.

Returns the basic Spark schema definition without any modifications, suitable for use with struct operations or type references.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |
| `exclude` | `list[str] \| None` | Column names to exclude from the schema. | `None` |

Returns:

| Type | Description |
|---|---|
| `str \| None` | Raw Spark schema DDL string, or `None` if not available. |
Source code in src/kelp/pipelines/api.py
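Because the lite DDL carries no constraints or generated columns, it can be handed to a plain reader; the table name and landing path are illustrative:

```python
raw = (
    spark.read
    .schema(kp.schema_lite("orders"))  # plain DDL string, no constraints
    .json("/Volumes/main/landing/orders")
)
```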
### source

Get the path for a data source.

Returns the path for a source (volume, table, or raw path) that can be used in pipelines for reading or writing data:

- Table sources: the fully qualified name (`catalog.schema.model_name`)
- Volume sources: the volume path (`/Volumes/catalog/schema/volume`)
- `raw_path` sources: the configured path

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Source name. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Path string suitable for use with `spark.read` or `spark.write`. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the source is not found in the catalog. |
| `ValueError` | If the source configuration is incomplete. |
Source code in src/kelp/pipelines/api.py
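A sketch with a hypothetical source name:

```python
# Resolve the configured location for a source defined in metadata;
# works whether "orders_landing" is a volume, table, or raw path.
landing = kp.source("orders_landing")
df = spark.read.format("json").load(landing)
```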
### source_options

Get the options dictionary for a data source.

Returns source-specific options that can be passed to Spark readers/writers or used to configure the source behavior.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Source name. | required |

Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary of source-specific options. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the source is not found in the catalog. |
Source code in src/kelp/pipelines/api.py
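Combining both lookups in one reader; the Auto Loader format and whatever option keys the metadata carries are assumptions for illustration:

```python
df = (
    spark.readStream
    .format("cloudFiles")  # Databricks Auto Loader, for illustration
    .options(**kp.source_options("orders_landing"))
    .load(kp.source("orders_landing"))
)
```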
### target

Get the target table name for a given table.

Returns the appropriate target table name based on quality configuration:

- If quarantine is enabled, returns the quarantine/validation table.
- Otherwise, returns the main table name (FQN).

Use it to determine which table a data pipeline should write to, based on quality configuration.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name. | required |

Returns:

| Type | Description |
|---|---|
| `str` | Fully qualified target table name. |
Source code in src/kelp/pipelines/api.py
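A sketch of writing to whichever table the quality configuration designates; the table name and `df` are illustrative:

```python
# Quarantine enabled -> validation table; otherwise -> main table FQN.
df.write.mode("append").saveAsTable(kp.target("orders"))
```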
### create_streaming_table

```python
create_streaming_table(
    *,
    name,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=None,
    cluster_by=None,
    schema=None,
    row_filter=None,
    expect_all=None,
    expect_all_or_drop=None,
    expect_all_or_fail=None,
    exclude_params=None,
    **kwargs,
)
```

Enhanced version of `dp.create_streaming_table` with Kelp metadata discovery.

Creates a Databricks streaming table with automatic parameter discovery from table metadata. This is useful for programmatic table creation where the table configuration is defined in YAML.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Table name (required). Used to look up configuration in metadata. | required |
| `comment` | `str \| None` | Table description/comment. | `None` |
| `spark_conf` | `dict[str, str] \| None` | Spark configuration properties. | `None` |
| `table_properties` | `dict[str, str] \| None` | Databricks table properties. | `None` |
| `path` | `str \| None` | Physical path for external tables or custom locations. | `None` |
| `partition_cols` | `list[str] \| None` | Column names for partitioning. | `None` |
| `cluster_by_auto` | `bool \| None` | Enable automatic clustering optimization. | `None` |
| `cluster_by` | `list[str] \| None` | Column names for explicit clustering (max 4). | `None` |
| `schema` | `str \| Any \| None` | Table schema definition (DDL or `StructType`). | `None` |
| `row_filter` | `str \| None` | SQL row filter expression. | `None` |
| `expect_all` | `dict[str, str] \| None` | SQL expressions that must all pass. | `None` |
| `expect_all_or_drop` | `dict[str, str] \| None` | SQL expressions; failing rows are dropped. | `None` |
| `expect_all_or_fail` | `dict[str, str] \| None` | SQL expressions; the job fails if any fail. | `None` |
| `exclude_params` | `list[str] \| None` | Parameter keys to exclude from metadata discovery. | `None` |
| `**kwargs` | | Additional arguments passed to `dp.create_streaming_table`. | `{}` |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the table name is not found in the catalog. |
Source code in src/kelp/pipelines/streaming_tables.py
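A sketch of programmatic creation; the table name and clustering override are illustrative:

```python
from kelp.pipelines import create_streaming_table

# Comment, schema, properties, etc. are discovered from metadata;
# explicitly passed values take precedence.
create_streaming_table(name="orders", cluster_by=["order_date"])
```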
### materialized_view

```python
materialized_view(
    query_function=None,
    *,
    name=None,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=False,
    cluster_by=None,
    schema=None,
    row_filter=None,
    private=None,
    exclude_params=None,
    **kwargs,
)
```

Drop-in wrapper for `@dp.materialized_view` with Kelp metadata parameter injection.

This decorator mirrors the parameter style of `table` and resolves missing values from Kelp model metadata, but it does not apply SDP expectations, DQX checks, or quarantine flows.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_function` | `Callable[..., DataFrame] \| None` | The decorated function returning a DataFrame. | `None` |
| `name` | `str \| None` | Materialized view name. Defaults to the function name. | `None` |
| `comment` | `str \| None` | Object description/comment. | `None` |
| `spark_conf` | `dict[str, str] \| None` | Spark configuration properties. | `None` |
| `table_properties` | `dict[str, str] \| None` | Databricks table properties. | `None` |
| `path` | `str \| None` | Physical path for custom locations. | `None` |
| `partition_cols` | `list[str] \| None` | Column names for partitioning. | `None` |
| `cluster_by_auto` | `bool` | Enable automatic clustering optimization. | `False` |
| `cluster_by` | `list[str] \| None` | Column names for explicit clustering. | `None` |
| `schema` | `str \| Any \| None` | Schema definition (DDL or `StructType`). | `None` |
| `row_filter` | `str \| None` | SQL row filter expression. | `None` |
| `private` | `bool \| None` | Whether the materialized view should be private. | `None` |
| `exclude_params` | `list[str] \| None` | Metadata keys to exclude from auto-injection. | `None` |
| `**kwargs` | | Additional arguments forwarded to `@dp.materialized_view`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Callable[[Callable[..., DataFrame]], None] \| None` | Decorator function, or `None` when used as a direct decorator. |
Source code in src/kelp/pipelines/streaming_tables.py
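A sketch of the decorator form; the view name and query are illustrative:

```python
from kelp.pipelines import materialized_view

@materialized_view(name="orders_daily")  # remaining params injected from metadata
def orders_daily():
    return spark.read.table(kp.ref("orders")).groupBy("order_date").count()
```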
### table

```python
table(
    query_function=None,
    *,
    name=None,
    comment=None,
    spark_conf=None,
    table_properties=None,
    path=None,
    partition_cols=None,
    cluster_by_auto=False,
    cluster_by=None,
    schema=None,
    row_filter=None,
    private=None,
    expect_all=None,
    expect_all_or_drop=None,
    expect_all_or_fail=None,
    expect_all_or_quarantine=None,
    exclude_params=None,
    **kwargs,
)
```

Drop-in replacement for `@dp.table` with built-in data quality and quarantine support.

This decorator extends the standard Databricks `@dp.table` with enhanced data quality processing. It supports both SDP (Spark Declarative Pipelines) expectations and DQX checks, with automatic quarantine table creation for failed records.

Features:

- Automatic schema and parameter discovery from table metadata
- SDP expectations (`expect_all`, `expect_all_or_drop`, `expect_all_or_fail`)
- Quarantine table support for failed records
- DQX integration for complex quality checks
- Parameter override support for flexibility
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query_function` | `Callable[..., DataFrame] \| None` | The decorated function returning a DataFrame (for the `@decorator` form). | `None` |
| `name` | `str \| None` | Table name. Defaults to the function name. | `None` |
| `comment` | `str \| None` | Table description/comment. | `None` |
| `spark_conf` | `dict[str, str] \| None` | Spark configuration properties. | `None` |
| `table_properties` | `dict[str, str] \| None` | Databricks table properties. | `None` |
| `path` | `str \| None` | Physical path for external tables or custom locations. | `None` |
| `partition_cols` | `list[str] \| None` | Column names for partitioning. | `None` |
| `cluster_by_auto` | `bool` | Enable automatic clustering optimization. | `False` |
| `cluster_by` | `list[str] \| None` | Column names for explicit clustering (max 4). | `None` |
| `schema` | `str \| Any \| None` | Table schema definition (DDL or `StructType`). | `None` |
| `row_filter` | `str \| None` | SQL row filter expression. | `None` |
| `private` | `bool \| None` | Whether the table should be private. | `None` |
| `expect_all` | `dict[str, str] \| None` | SQL expressions that must all pass. | `None` |
| `expect_all_or_drop` | `dict[str, str] \| None` | SQL expressions; failing rows are dropped. | `None` |
| `expect_all_or_fail` | `dict[str, str] \| None` | SQL expressions; the job fails if any fail. | `None` |
| `expect_all_or_quarantine` | `dict[str, str] \| None` | SQL expressions; failing rows are quarantined. | `None` |
| `exclude_params` | `list[str] \| None` | Parameter keys to exclude from metadata discovery. | `None` |
| `**kwargs` | | Additional arguments passed to the underlying `@dp.table`. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Callable[[Callable[..., DataFrame]], None] \| None` | Decorator function, or `None` if called with a query function directly. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If validation or quarantine table names cannot be determined. |
| `ImportError` | If DQX checks are configured but `databricks-labs-dqx` is not installed. |

Example:

```python
@streaming_table(name="my_table", expect_all={"non_null": "column1 IS NOT NULL"})
def get_my_data():
    return spark.read.table("source_table")
```
Source code in src/kelp/pipelines/streaming_tables.py
### apply_schema

```python
apply_schema(
    name=None,
    *,
    schema=None,
    safe_cast=False,
    drop_extra_columns=True,
    add_missing_columns=True,
    missing_column_default=None,
)
```

Return a transformation that enforces a schema on a DataFrame.

The returned callable is designed for `pyspark.sql.DataFrame.transform`.

Exactly one of `name` or `schema` must be provided:

- If `name` is given, the target schema is resolved from the Kelp metadata catalog via `kelp.models_api.columns`. When no columns are defined, the DataFrame is returned unchanged (pass-through).
- If `schema` is given as a DDL string (e.g. `"id INT, name STRING"`) or a `pyspark.sql.types.StructType`, it is used directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Kelp table name; resolves the schema from the metadata catalog. | `None` |
| `schema` | `str \| StructType \| None` | Explicit target schema as a DDL string or `StructType`. | `None` |
| `safe_cast` | `bool` | When `True`, cast values safely rather than strictly. | `False` |
| `drop_extra_columns` | `bool` | When `True`, drop columns not present in the target schema. | `True` |
| `add_missing_columns` | `bool` | When `True`, add target-schema columns missing from the DataFrame. | `True` |
| `missing_column_default` | `Any` | The literal value used for newly added columns. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|---|---|
| `Callable[[DataFrame], DataFrame]` | A transformation callable suitable for `DataFrame.transform`. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If both or neither of `name` / `schema` are provided. |
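A sketch of the transform in use; the table name, DDL, and input DataFrames are illustrative:

```python
# Enforce the metadata-defined schema for "orders": extra columns are
# dropped and missing columns are added with the default value.
clean_df = raw_df.transform(kp.apply_schema("orders"))

# Or enforce an explicit DDL schema directly.
typed_df = raw_df.transform(kp.apply_schema(schema="id INT, name STRING"))
```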