Functions¶
This guide explains how to define and organize kelp_functions in your Kelp project. Functions are reusable SQL or Python code that can be registered in Unity Catalog and referenced by transformations and ABAC policies.
Configure Paths and Defaults¶
Add dedicated paths and inheritance blocks to kelp_project.yml to manage function metadata and defaults:
kelp_project:
  functions_path: "./kelp_metadata/functions"
  functions:
    +catalog: ${ function_catalog }
    +schema: ${ function_schema }

vars:
  function_catalog: security_catalog
  function_schema: security_schema
The + prefix applies the values as defaults to all functions in the functions_path. You can also create sub-paths with their own defaults:
kelp_project:
  functions_path: "./kelp_metadata/functions"
  functions:
    +catalog: ${ function_catalog }
    +schema: ${ default_schema }
    abac:
      +schema: ${ abac_schema }

vars:
  function_catalog: security_catalog
  default_schema: public_functions
  abac_schema: abac_functions
This allows grouping functions by domain while maintaining consistent naming and organization.
Define SQL Functions¶
SQL functions are lightweight, deterministic functions suitable for type conversions, calculations, and data transformations.
Inline Function Body¶
Define the function body directly in the YAML metadata:
kelp_functions:
  - name: normalize_email
    language: SQL
    description: Normalize customer email addresses
    parameters:
      - name: email
        data_type: STRING
    returns_data_type: STRING
    body: lower(trim(email))
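When synced, a definition like this is registered as a Unity Catalog SQL UDF. The generated DDL would look roughly like the following (illustrative only; the exact statement Kelp emits may differ):

```sql
-- Illustrative DDL, assuming the inherited defaults
-- security_catalog / security_schema from kelp_project.yml.
CREATE OR REPLACE FUNCTION security_catalog.security_schema.normalize_email(email STRING)
  RETURNS STRING
  COMMENT 'Normalize customer email addresses'
  RETURN lower(trim(email));
```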
External Body via body_path¶
For complex functions, store the SQL in a separate file and reference it using body_path:
kelp_functions:
  - name: standardize_phone
    language: SQL
    description: Standardize phone number format using regex
    parameters:
      - name: phone
        data_type: STRING
    returns_data_type: STRING
    body_path: ./kelp_metadata/functions/sql/standardize_phone.sql
The body_path is resolved relative to the project root.
Example file (kelp_metadata/functions/sql/standardize_phone.sql):
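A plausible body for this file (a sketch only, assuming 10-digit US numbers and Spark SQL's regexp_replace):

```sql
-- Strip non-digits, then format 10-digit numbers as (XXX) XXX-XXXX.
regexp_replace(
  regexp_replace(phone, '[^0-9]', ''),
  '^([0-9]{3})([0-9]{3})([0-9]{4})$',
  '($1) $2-$3'
)
```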
Define Python Functions¶
Python functions allow more complex logic and can leverage external libraries (with appropriate dependencies).
Python Function with Inline Body¶
kelp_functions:
  - name: format_full_name
    language: PYTHON
    description: Format full name from first and last name
    parameters:
      - name: first_name
        data_type: STRING
      - name: last_name
        data_type: STRING
    returns_data_type: STRING
    body: |
      def format_full_name(first, last):
          if first is None:
              return last
          if last is None:
              return first
          return f"{first} {last}".strip()
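Because the body is plain Python, it can be unit-tested locally before it is ever registered. A minimal check (the function copied from the YAML body above; NULL arguments arrive as None):

```python
def format_full_name(first, last):
    # Handle NULLs, which Spark passes through as None.
    if first is None:
        return last
    if last is None:
        return first
    return f"{first} {last}".strip()

# Exercise the normal path and both NULL edge cases.
print(format_full_name("Ada", "Lovelace"))
print(format_full_name(None, "Lovelace"))
print(format_full_name("Ada", None))
```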
Python Function with External File¶
For better maintainability, store Python function bodies in separate files:
kelp_functions:
  - name: classify_customer
    language: PYTHON
    description: Classify customers into segments based on spending
    parameters:
      - name: total_spent
        data_type: DECIMAL
      - name: transaction_count
        data_type: INT
    returns_data_type: STRING
    body_path: ./kelp_metadata/functions/python/classify_customer.py
Example file (kelp_metadata/functions/python/classify_customer.py):
def classify_customer(spent, count):
    if spent is None or count is None:
        return "Unknown"
    avg_transaction = spent / count if count > 0 else 0
    if avg_transaction > 1000:
        return "Premium"
    elif count > 100:
        return "Frequent"
    elif spent > 5000:
        return "High Value"
    else:
        return "Regular"
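The branching above is easy to get subtly wrong (the Premium check wins even for low transaction counts), so it is worth verifying the segment boundaries locally before syncing. A quick check of each branch:

```python
def classify_customer(spent, count):
    # NULL inputs arrive as None.
    if spent is None or count is None:
        return "Unknown"
    avg_transaction = spent / count if count > 0 else 0
    if avg_transaction > 1000:
        return "Premium"
    elif count > 100:
        return "Frequent"
    elif spent > 5000:
        return "High Value"
    else:
        return "Regular"

# One input per branch: avg 1250 -> Premium; 150 txns -> Frequent;
# spent 6000 -> High Value; small account -> Regular; NULL -> Unknown.
for args in [(50000, 40), (2000, 150), (6000, 10), (100, 5), (None, 5)]:
    print(args, classify_customer(*args))
```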
Python with Dependencies¶
For Python functions that require external packages, use the environment clause:
kelp_functions:
  - name: calculate_hash
    language: PYTHON
    description: Calculate SHA-256 hash of input string
    parameters:
      - name: input_string
        data_type: STRING
    returns_data_type: STRING
    environment:
      dependencies:
        - hashlib  # Built-in, no version needed
      environment_version: null  # Use Databricks default
    body_path: ./kelp_metadata/functions/python/calculate_hash.py
Example file (kelp_metadata/functions/python/calculate_hash.py):
import hashlib

def calculate_hash(value):
    if value is None:
        return None
    return hashlib.sha256(value.encode()).hexdigest()
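As with the other Python bodies, this file runs unmodified outside Databricks, so the NULL path and digest shape can be checked locally:

```python
import hashlib

def calculate_hash(value):
    # Preserve NULL semantics: None in, None out.
    if value is None:
        return None
    return hashlib.sha256(value.encode()).hexdigest()

# A SHA-256 hex digest is always 64 characters.
print(calculate_hash("hello"))
print(calculate_hash(None))
```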
Table-Valued Functions¶
Define functions that return multiple columns using the returns_table clause:
kelp_functions:
  - name: parse_json_payload
    language: SQL
    description: Parse nested JSON payment payload into columns
    parameters:
      - name: json_payload
        data_type: STRING
    returns_table:
      - name: payment_id
        data_type: STRING
      - name: amount
        data_type: DECIMAL
      - name: currency
        data_type: STRING
      - name: timestamp
        data_type: TIMESTAMP
    body: |
      SELECT
        json_payload:payment_id::STRING as payment_id,
        json_payload:amount::DECIMAL as amount,
        json_payload:currency::STRING as currency,
        json_payload:timestamp::TIMESTAMP as timestamp
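Once registered, a table-valued function is invoked in the FROM clause rather than the SELECT list. An illustrative query (the payments_raw table and its raw_json column are hypothetical):

```sql
-- Expand each JSON payload row into typed columns via a lateral call.
SELECT p.payment_id, p.amount, p.currency
FROM payments_raw,
     LATERAL parse_json_payload(payments_raw.raw_json) AS p;
```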
Function Properties¶
Basic Properties¶
- name - Unique function identifier
- language - SQL or PYTHON
- description - Human-readable documentation
- parameters - List of input parameters with names and data types
- returns_data_type - For scalar functions, the return type
- returns_table - For table-valued functions, the output column schema
- body - Inline function body (string)
- body_path - Path to external function body file
Advanced Properties¶
kelp_functions:
  - name: my_deterministic_function
    language: SQL
    catalog: security_catalog    # Override inherited catalog
    schema: custom_schema        # Override inherited schema
    temporary: false             # Set true for a session-scoped function
    if_not_exists: false         # Fail if function exists
    or_replace: true             # Use CREATE OR REPLACE
    deterministic: true          # Function is deterministic
    data_access: "CONTAINS SQL"  # SQL data access level
    default_collation: "as_cast" # Collation for comparisons
    parameters:
      - name: value
        data_type: STRING
        default_expression: "'default'"
        comment: "Input value with default"
    returns_data_type: STRING
    body: upper(trim(value))
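On Databricks, these properties correspond to clauses of the CREATE FUNCTION statement. An illustrative DDL for the definition above (a sketch; the exact statement Kelp generates may differ):

```sql
CREATE OR REPLACE FUNCTION security_catalog.custom_schema.my_deterministic_function(
  value STRING DEFAULT 'default' COMMENT 'Input value with default'
)
RETURNS STRING
DETERMINISTIC
CONTAINS SQL
RETURN upper(trim(value));
```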
Organizing Functions¶
Create a clear directory structure for your functions:
kelp_metadata/functions/
├── functions.yml            # Core functions
├── sql/
│   ├── normalize_email.sql
│   ├── standardize_phone.sql
│   └── encode_pii.sql
├── python/
│   ├── classify_customer.py
│   ├── calculate_hash.py
│   └── segment_users.py
└── abac/                    # Functions for ABAC policies
    ├── abac_functions.yml
    └── sql/
        ├── mask_ssn.sql
        └── mask_ccn.sql
Keep different categories of functions organized by purpose and layer.
Using Functions in Transformations¶
Reference functions by their fully qualified name using kp.func():
from pyspark.sql import functions as F
from pyspark.sql.functions import col
import kelp.pipelines as kp

@kp.table()
def customer_normalized():
    """Use Kelp functions in transformations."""
    df = spark.readStream.table(kp.ref("bronze_customers"))
    return df.select(
        col("id"),
        # Call the function by its fully qualified name
        F.expr(f"{kp.func('normalize_email')}(email)").alias("normalized_email"),
        col("name"),
    )
Syncing Functions to Catalog¶
Functions must be synced to Unity Catalog before they can be used in ABAC policies or other metadata.
Sync All Functions¶
import kelp.catalog as kc

kc.init("kelp_project.yml", target="prod")

for query in kc.sync_functions():
    print(f"Executing: {query}")
    spark.sql(query)
Sync Specific Functions¶
for query in kc.sync_functions(function_names=["normalize_email", "standardize_phone"]):
    spark.sql(query)
Syncing with Other Metadata¶
Functions are not included in sync_catalog() by default. Sync them explicitly before your pipeline runs:
import kelp.catalog as kc

# Before pipeline runs
for query in kc.sync_functions():
    spark.sql(query)

# After pipeline runs (tables/metrics exist)
for query in kc.sync_catalog():
    spark.sql(query)
Best Practices¶
- Use SQL for simple transformations - SQL functions are lighter and faster than Python.
- Document your functions - Provide clear descriptions and parameter comments.
- Keep functions focused - Each function should do one thing well.
- Test with typical data - Ensure your functions handle NULL values and edge cases.
- Use external files for complex logic - Keep YAML readable by moving large function bodies to separate files.
- Organize hierarchically - Use subdirectories and inheritance blocks to group related functions.
- Avoid hardcoding values - Use parameters instead of hardcoded constants in function logic.
- Version your Python functions - Pin dependency versions in the environment.dependencies list.
- Document parameters - Add comment fields to parameters for clarity:
kelp_functions:
  - name: mask_value
    language: SQL
    parameters:
      - name: value
        data_type: STRING
        comment: "Value to mask - will be truncated to 'XX****'"
      - name: mask_char
        data_type: STRING
        default_expression: "'X'"
        comment: "Character to use for masking (default 'X')"
    returns_data_type: STRING
    body: CONCAT(REPEAT(mask_char, 2), REPEAT('*', LENGTH(value) - 2))
See Also¶
- Transformations - Using functions in transformations
- ABAC Policies - Using functions for row/column access control
- Sync Metadata with Your Catalog - Registering functions in Unity Catalog