Skip to content

Catalog

kelp.catalog

Unity Catalog Sync

create_metric_views

create_metric_views(view_names=None, filter_by_meta=None)

Create specified metric views in the remote catalog.

Creates metric views in Databricks Unity Catalog. If no specific view names are provided, all metric views from the catalog are created.

Parameters:

Name Type Description Default
view_names list[str] | None

List of metric view names to create. If None, creates all views.

None
filter_by_meta dict[str, Any] | None

Optional dict filter applied against each object's meta field using recursive dict-subset matching.

None

Returns:

Type Description
list[str]

List of SQL queries executed to create metric views.

Raises:

Type Description
KeyError

If a specified metric view name is not found in the catalog.

Exception

If metric view creation fails (see UnityCatalogAdapter for details).

Source code in src/kelp/catalog/api.py
def create_metric_views(
    view_names: list[str] | None = None,
    filter_by_meta: dict[str, Any] | None = None,
) -> list[str]:
    """Create specified metric views in the remote catalog.

    Creates metric views in Databricks Unity Catalog. If no specific view names
    are provided, all metric views from the catalog are created.

    Args:
        view_names: List of metric view names to create. If None, creates all views.
        filter_by_meta: Optional dict filter applied against each object's ``meta``
            field using recursive dict-subset matching.

    Returns:
        List of SQL queries executed to create metric views.

    Raises:
        KeyError: If a specified metric view name is not found in the catalog.
        Exception: If metric view creation fails (see UnityCatalogAdapter for details).
    """
    metric_views = _get_objects("metric_views", names=view_names, filter_by_meta=filter_by_meta)
    uc_adapter = UnityCatalogAdapter()
    logger.info("Starting remote catalog sync for all metric views...")
    queries = uc_adapter.create_all_metric_views(metric_views)
    return queries

sync_abac_policies

sync_abac_policies(policy_names=None, filter_by_meta=None)

Synchronize specified ABAC policies to the remote catalog.

Source code in src/kelp/catalog/api.py
def sync_abac_policies(
    policy_names: list[str] | None = None,
    filter_by_meta: dict[str, Any] | None = None,
) -> list[str]:
    """Synchronize specified ABAC policies to the remote catalog."""
    policies = _get_objects("abacs", names=policy_names, filter_by_meta=filter_by_meta)
    uc_adapter = UnityCatalogAdapter()
    queries = uc_adapter.sync_abac_policies(policies)
    return queries

sync_catalog

sync_catalog(
    sync_functions=False,
    sync_metric_views=True,
    sync_tables=True,
    sync_abacs=True,
    filter_by_meta=None,
    profile=None,
)

Synchronize all tables and metric views to remote Databricks catalog.

Synchronizes the entire local catalog (tables and metric views) to the remote Databricks Unity Catalog, applying any configuration such as tags and properties based on the remote_catalog_config settings.

Parameters:

Name Type Description Default
sync_functions bool

If True, syncs all functions to remote catalog.

False
sync_metric_views bool

If True, syncs all metric views to remote catalog.

True
sync_tables bool

If True, syncs all tables to remote catalog.

True
sync_abacs bool

If True, syncs all ABAC policies to remote catalog.

True
filter_by_meta dict[str, Any] | None

Optional dict filter applied against each object's meta field using recursive dict-subset matching.

None
profile str | None

Databricks CLI profile to use for remote metadata lookups.

None

Returns:

Type Description
list[str]

List of SQL queries executed for synchronization.

Raises:

Type Description
Exception

If catalog synchronization fails (see UnityCatalogAdapter for details).

Source code in src/kelp/catalog/api.py
def sync_catalog(
    sync_functions: bool = False,
    sync_metric_views: bool = True,
    sync_tables: bool = True,
    sync_abacs: bool = True,
    filter_by_meta: dict[str, Any] | None = None,
    profile: str | None = None,
) -> list[str]:
    """Synchronize all tables and metric views to remote Databricks catalog.

    Synchronizes the entire local catalog (tables and metric views) to the remote
    Databricks Unity Catalog, applying any configuration such as tags and properties
    based on the remote_catalog_config settings.

    Args:
        sync_functions: If True, syncs all functions to remote catalog.
        sync_metric_views: If True, syncs all metric views to remote catalog.
        sync_tables: If True, syncs all tables to remote catalog.
        sync_abacs: If True, syncs all ABAC policies to remote catalog.
        filter_by_meta: Optional dict filter applied against each object's ``meta``
            field using recursive dict-subset matching.
        profile: Databricks CLI profile to use for remote metadata lookups.

    Returns:
        List of SQL queries executed for synchronization.

    Raises:
        Exception: If catalog synchronization fails (see UnityCatalogAdapter for details).
    """
    uc_adapter = UnityCatalogAdapter()
    logger.info("Starting remote catalog sync for all tables & metric views...")
    queries: list[str] = []
    if sync_functions:
        queries.extend(
            uc_adapter.sync_all_functions(_get_objects("functions", filter_by_meta=filter_by_meta))
        )
    if sync_tables:
        queries.extend(
            uc_adapter.sync_all_tables(
                _get_objects("models", filter_by_meta=filter_by_meta),
                profile=profile,
            )
        )
    if sync_metric_views:
        queries.extend(
            uc_adapter.sync_all_metric_views(
                _get_objects("metric_views", filter_by_meta=filter_by_meta),
                profile=profile,
            )
        )
    if sync_abacs:
        queries.extend(
            uc_adapter.sync_all_abac_policies(_get_objects("abacs", filter_by_meta=filter_by_meta))
        )

    return queries

sync_functions

sync_functions(function_names=None, filter_by_meta=None)

Synchronize specified functions to the remote catalog.

Functions are pre-applied entities and are synced via CREATE OR REPLACE DDL.

Source code in src/kelp/catalog/api.py
def sync_functions(
    function_names: list[str] | None = None,
    filter_by_meta: dict[str, Any] | None = None,
) -> list[str]:
    """Synchronize specified functions to the remote catalog.

    Functions are pre-applied entities and are synced via CREATE OR REPLACE DDL.
    """
    functions = _get_objects("functions", names=function_names, filter_by_meta=filter_by_meta)
    uc_adapter = UnityCatalogAdapter()
    queries = uc_adapter.sync_all_functions(functions)
    return queries

sync_metric_views

sync_metric_views(
    view_names=None, filter_by_meta=None, profile=None
)

Synchronize specified metric views to the remote catalog.

Synchronizes metric views from the local catalog to Databricks Unity Catalog. If no specific view names are provided, all metric views are synchronized.

Parameters:

Name Type Description Default
view_names list[str] | None

List of metric view names to sync. If None, syncs all views.

None
filter_by_meta dict[str, Any] | None

Optional dict filter applied against each object's meta field using recursive dict-subset matching.

None
profile str | None

Databricks CLI profile to use for remote metadata lookups.

None

Returns:

Type Description
list[str]

List of SQL queries executed for synchronization.

Raises:

Type Description
KeyError

If a specified metric view name is not found in the catalog.

Exception

If metric view sync fails (see UnityCatalogAdapter for details).

Source code in src/kelp/catalog/api.py
def sync_metric_views(
    view_names: list[str] | None = None,
    filter_by_meta: dict[str, Any] | None = None,
    profile: str | None = None,
) -> list[str]:
    """Synchronize specified metric views to the remote catalog.

    Synchronizes metric views from the local catalog to Databricks Unity Catalog.
    If no specific view names are provided, all metric views are synchronized.

    Args:
        view_names: List of metric view names to sync. If None, syncs all views.
        filter_by_meta: Optional dict filter applied against each object's ``meta``
            field using recursive dict-subset matching.
        profile: Databricks CLI profile to use for remote metadata lookups.

    Returns:
        List of SQL queries executed for synchronization.

    Raises:
        KeyError: If a specified metric view name is not found in the catalog.
        Exception: If metric view sync fails (see UnityCatalogAdapter for details).
    """
    metric_views = _get_objects("metric_views", names=view_names, filter_by_meta=filter_by_meta)
    uc_adapter = UnityCatalogAdapter()
    logger.info("Starting sync for metric views...")
    queries = uc_adapter.sync_all_metric_views(metric_views, profile=profile)
    return queries

sync_tables

sync_tables(
    model_names=None, filter_by_meta=None, profile=None
)

Synchronize specified tables to the remote catalog.

Synchronizes tables from the local catalog to Databricks Unity Catalog, applying configuration such as tags and properties based on remote_catalog_config. If no specific table names are provided, all tables are synchronized.

Parameters:

Name Type Description Default
model_names list[str] | None

List of table names to sync. If None, syncs all tables.

None
filter_by_meta dict[str, Any] | None

Optional dict filter applied against each object's meta field using recursive dict-subset matching.

None
profile str | None

Databricks CLI profile to use for remote metadata lookups.

None

Returns:

Type Description
list[str]

List of SQL queries executed for synchronization.

Raises:

Type Description
KeyError

If a specified table name is not found in the catalog.

Exception

If table sync fails (see UnityCatalogAdapter for details).

Source code in src/kelp/catalog/api.py
def sync_tables(
    model_names: list[str] | None = None,
    filter_by_meta: dict[str, Any] | None = None,
    profile: str | None = None,
) -> list[str]:
    """Synchronize specified tables to the remote catalog.

    Synchronizes tables from the local catalog to Databricks Unity Catalog,
    applying configuration such as tags and properties based on remote_catalog_config.
    If no specific table names are provided, all tables are synchronized.

    Args:
        model_names: List of table names to sync. If None, syncs all tables.
        filter_by_meta: Optional dict filter applied against each object's ``meta``
            field using recursive dict-subset matching.
        profile: Databricks CLI profile to use for remote metadata lookups.

    Returns:
        List of SQL queries executed for synchronization.

    Raises:
        KeyError: If a specified table name is not found in the catalog.
        Exception: If table sync fails (see UnityCatalogAdapter for details).
    """
    tables = _get_objects("models", names=model_names, filter_by_meta=filter_by_meta)
    uc_adapter = UnityCatalogAdapter()
    queries = uc_adapter.sync_tables(tables, profile=profile)
    return queries

init

init(
    project_file_path=None,
    target=None,
    init_vars=None,
    manifest_file_path=None,
    refresh=False,
    store_in_global=True,
    run_policy_checks=False,
    log_level=None,
)

Initialize kelp runtime context from current directory.

When manifest_file_path is provided (or resolved from KELP_MANIFEST_FILE environment variable), the context is loaded directly from a pre-built manifest JSON file, skipping all project discovery, Jinja rendering, and metadata loading.

When policy_config.enabled is True in the project settings, metadata governance policies are evaluated immediately after loading. Warn-severity violations are logged; error-severity violations raise a RuntimeError.

Parameters:

Name Type Description Default
project_file_path str | None

Path to project file or directory.

None
target str | None

Target environment name.

None
init_vars dict[str, Any] | None

Runtime variable overrides.

None
manifest_file_path str | None

Path to a manifest JSON file. When provided, skips source file loading. Also resolved from KELP_MANIFEST_FILE env var.

None
refresh bool

If True, recreate context even if one already exists.

False
store_in_global bool

Whether to store context globally.

True
run_policy_checks bool

Whether to run policy checks.

False
log_level str | None

Optional log level to configure.

None

Returns:

Type Description
MetaRuntimeContext

The initialized MetaRuntimeContext.

Source code in src/kelp/config/config.py
def init(
    project_file_path: str | None = None,
    target: str | None = None,
    init_vars: dict[str, Any] | None = None,
    manifest_file_path: str | None = None,
    refresh: bool = False,
    store_in_global: bool = True,
    run_policy_checks: bool = False,
    log_level: str | None = None,
) -> MetaRuntimeContext:
    """Initialize kelp runtime context from current directory.

    When ``manifest_file_path`` is provided (or resolved from ``KELP_MANIFEST_FILE``
    environment variable), the context is loaded directly from a pre-built
    manifest JSON file, skipping all project discovery, Jinja rendering, and
    metadata loading.

    When ``policy_config.enabled`` is True in the project settings, metadata
    governance policies are evaluated immediately after loading. Warn-severity
    violations are logged; error-severity violations raise a RuntimeError.

    Args:
        project_file_path: Path to project file or directory.
        target: Target environment name.
        init_vars: Runtime variable overrides.
        manifest_file_path: Path to a manifest JSON file. When provided, skips
            source file loading. Also resolved from KELP_MANIFEST_FILE env var.
        refresh: If True, recreate context even if one already exists.
        store_in_global: Whether to store context globally.
        run_policy_checks: Whether to run policy checks.
        log_level: Optional log level to configure.

    Returns:
        The initialized MetaRuntimeContext.
    """
    if log_level:
        configure_logging(log_level)

    ctx = KelpFramework.init(
        project_file_path=project_file_path,
        target=target,
        init_vars=init_vars,
        manifest_file_path=manifest_file_path,
        refresh=refresh,
        store_in_global=store_in_global,
    )

    _run_policy_checks(ctx, run_policy_checks)

    return ctx