Sources

kelp.models.source

Source pydantic-model

Bases: BaseModel

Source definition for Kelp projects.

Represents a data source (volume, table, or raw path) that can be referenced in pipelines using kp.source(). Sources provide a single point of configuration for data locations and connection options.

Attributes:

Name Type Description
origin_file_path SkipJsonSchema[str] | None

Path to the source YAML file defining this source.

name str

Source name (unique identifier).

source_type Literal['volume', 'table', 'raw_path']

Type of source ('volume', 'table', 'raw_path').

path str | None

Physical path for volume or raw_path sources (e.g., '/Volumes/catalog/schema/volume').

catalog str | None

Catalog name for table or volume sources.

schema_ str | None

Schema name for table or volume sources.

table_name str | None

Name for table or view sources.

volume_name str | None

Volume name for volume sources (constructs path as /Volumes/catalog/schema/volume_name).

options dict[str, Any]

Dictionary of options specific to this source (e.g., format, headers).

description str | None

Human-readable description of the source.

meta dict[str, Any]

Generic user-defined metadata for filtering and grouping.

JSON schema:
{
  "description": "Source definition for Kelp projects.\n\nRepresents a data source (volume, table, or raw path) that can be referenced\nin pipelines using kp.source(). Sources provide a single point of configuration\nfor data locations and connection options.\n\nAttributes:\n    origin_file_path: Path to the source YAML file defining this source.\n    name: Source name (unique identifier).\n    source_type: Type of source ('volume', 'table', 'raw_path').\n    path: Physical path for volume or raw_path sources (e.g., '/Volumes/catalog/schema/volume').\n    catalog: Catalog name for table or volume sources.\n    schema_: Schema name for table or volume sources.\n    table_name: Name for table or view sources.\n    volume_name: Volume name for volume sources (constructs path as /Volumes/catalog/schema/volume_name).\n    options: Dictionary of options specific to this source (e.g., format, headers).\n    description: Human-readable description of the source.",
  "properties": {
    "name": {
      "description": "Source name (unique identifier)",
      "title": "Name",
      "type": "string"
    },
    "source_type": {
      "default": "table",
      "description": "Type of source: volume, table, or raw_path",
      "enum": [
        "volume",
        "table",
        "raw_path"
      ],
      "title": "Source Type",
      "type": "string"
    },
    "path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Physical path for volume or raw_path sources",
      "title": "Path"
    },
    "catalog": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Catalog name for table or volume sources",
      "title": "Catalog"
    },
    "schema": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Schema name for table or volume sources",
      "title": "Schema"
    },
    "table_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Name for table or view sources",
      "title": "Table Name"
    },
    "volume_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Volume name for volume sources (constructs /Volumes/catalog/schema/volume_name)",
      "title": "Volume Name"
    },
    "options": {
      "additionalProperties": true,
      "description": "Source-specific options (e.g., format, headers, encoding)",
      "title": "Options",
      "type": "object"
    },
    "description": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Human-readable description of the source",
      "title": "Description"
    },
    "meta": {
      "additionalProperties": true,
      "description": "Generic user-defined metadata for filtering and grouping",
      "title": "Meta",
      "type": "object"
    }
  },
  "required": [
    "name"
  ],
  "title": "Source",
  "type": "object"
}

Config:

  • validate_by_name: True
  • validate_by_alias: True
  • serialize_by_alias: True
  • use_enum_values: True

Fields:

origin_file_path pydantic-field

origin_file_path = None

Path to the source YAML file defining this source

name pydantic-field

name

Source name (unique identifier)

source_type pydantic-field

source_type = 'table'

Type of source: volume, table, or raw_path

path pydantic-field

path = None

Physical path for volume or raw_path sources

catalog pydantic-field

catalog = None

Catalog name for table or volume sources

schema_ pydantic-field

schema_ = None

Schema name for table or volume sources

table_name pydantic-field

table_name = None

Name for table or view sources

volume_name pydantic-field

volume_name = None

Volume name for volume sources (constructs /Volumes/catalog/schema/volume_name)

options pydantic-field

options

Source-specific options (e.g., format, headers, encoding)

description pydantic-field

description = None

Human-readable description of the source

meta pydantic-field

meta

Generic user-defined metadata for filtering and grouping

model_config class-attribute instance-attribute

model_config = ConfigDict(
    validate_by_name=True,
    validate_by_alias=True,
    serialize_by_alias=True,
    use_enum_values=True,
)

fqn property

fqn

Get the fully qualified name for table sources.

Returns:

Type Description
str | None

Fully qualified name (catalog.schema.table_name) for table sources, or None for other source types.

volume_fqn property

volume_fqn

Get the fully qualified path for volume sources.

Constructs the volume path as /Volumes/catalog/schema/volume_name.

Returns:

Type Description
str | None

Volume path (/Volumes/catalog/schema/volume_name) for volume sources, or None for other source types.
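
The implementations of fqn and volume_fqn are not listed on this page, so the following is only a minimal sketch of the naming rules described above. The helper names table_fqn and volume_path are hypothetical, and the sketch assumes that all three parts must be set and that the parts are joined without quoting. The real properties additionally return None when the source type does not match.

```python
from __future__ import annotations


def table_fqn(catalog: str | None, schema: str | None, table_name: str | None) -> str | None:
    """Join the three parts with dots, or return None if any part is missing."""
    if not (catalog and schema and table_name):
        return None
    return f"{catalog}.{schema}.{table_name}"


def volume_path(catalog: str | None, schema: str | None, volume_name: str | None) -> str | None:
    """Build /Volumes/catalog/schema/volume_name, or return None if any part is missing."""
    if not (catalog and schema and volume_name):
        return None
    return f"/Volumes/{catalog}/{schema}/{volume_name}"


print(table_fqn("main", "sales", "orders"))
print(volume_path("main", "raw", "landing"))
print(table_fqn("main", None, "orders"))
```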

get_path

get_path()

Get the path for this source.

For table sources, returns the fully qualified name. For volume sources, returns the constructed volume path (/Volumes/catalog/schema/volume_name) when catalog, schema, and volume_name are all defined; otherwise, falls back to the explicit path field. For raw_path sources, returns the path directly.

Returns:

Type Description
str

The path or fully qualified name for this source.

Raises:

Type Description
ValueError

If the source configuration is incomplete.

Source code in src/kelp/models/source.py
def get_path(self) -> str:
    """Get the path for this source.

    For table sources, returns the fully qualified name.
    For volume sources with catalog, schema, and volume_name defined,
    returns the constructed volume path (/Volumes/catalog/schema/volume_name).
    Otherwise, returns the explicit path field.
    For raw_path sources, returns the path directly.

    Returns:
        The path or fully qualified name for this source.

    Raises:
        ValueError: If the source configuration is incomplete.
    """
    if self.source_type == "table":
        fqn = self.fqn
        if not fqn:
            raise ValueError(
                f"Table source '{self.name}' requires catalog, schema, and table_name"
            )
        return fqn
    if self.source_type == "volume":
        # Try constructed path first (catalog + schema + volume_name)
        volume_path = self.volume_fqn
        if volume_path:
            return volume_path
        # Fall back to explicit path
        if self.path:
            return self.path
        raise ValueError(
            f"Volume source '{self.name}' requires either path or (catalog, schema, volume_name)"
        )
    if self.source_type == "raw_path":
        if not self.path:
            raise ValueError(f"Source '{self.name}' requires a path")
        return self.path
    raise ValueError(f"Unknown source_type: {self.source_type}")
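
The requirements that get_path() enforces can also be checked before a source is loaded, for example when linting definitions. The sketch below is not part of Kelp: missing_fields is a hypothetical helper that takes a plain dict keyed the way the JSON schema above names its properties (so schema, not schema_) and reports which fields would still be needed.

```python
from typing import Any


def missing_fields(source: dict[str, Any]) -> list[str]:
    """Return the fields a definition still needs before its path can be resolved."""
    kind = source.get("source_type", "table")  # 'table' is the documented default
    if kind == "table":
        required = ("catalog", "schema", "table_name")
        return [field for field in required if not source.get(field)]
    if kind == "volume":
        if source.get("path"):
            return []  # an explicit path is sufficient on its own
        required = ("catalog", "schema", "volume_name")
        return [field for field in required if not source.get(field)]
    if kind == "raw_path":
        return [] if source.get("path") else ["path"]
    raise ValueError(f"Unknown source_type: {kind}")


incomplete_table = {"name": "orders", "catalog": "main", "schema": "sales"}
print(missing_fields(incomplete_table))
```

An empty list means get_path() should be able to resolve the source; a non-empty list names the fields behind the ValueError it would raise.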

kelp.service.source_manager.SourceManager

Manager for accessing and resolving sources in Kelp projects.

get_source classmethod

get_source(name)

Get a source by name.

Retrieves a source definition from the project catalog.

Parameters:

Name Type Description Default
name str

Source name to retrieve.

required

Returns:

Type Description
Source

Source object with all configuration.

Raises:

Type Description
KeyError

If the source name is not found in the catalog.

Source code in src/kelp/service/source_manager.py
@classmethod
def get_source(cls, name: str) -> Source:
    """Get a source by name.

    Retrieves a source definition from the project catalog.

    Args:
        name: Source name to retrieve.

    Returns:
        Source object with all configuration.

    Raises:
        KeyError: If the source name is not found in the catalog.
    """
    ctx = get_context()
    return ctx.catalog_index.get("sources", name)

get_path classmethod

get_path(name)

Get the path for a source.

Returns the fully qualified path for the source, which differs based on the source type:

  • For table sources: returns catalog.schema.table_name
  • For volume sources: returns the volume path
  • For raw_path sources: returns the path value

Parameters:

Name Type Description Default
name str

Source name.

required

Returns:

Type Description
str

Path string suitable for use in pipelines.

Raises:

Type Description
KeyError

If the source is not found in the catalog.

ValueError

If the source configuration is incomplete.

Source code in src/kelp/service/source_manager.py
@classmethod
def get_path(cls, name: str) -> str:
    """Get the path for a source.

    Returns the fully qualified path for the source, which differs based on
    the source type:
    - For table sources: returns catalog.schema.model_name
    - For volume sources: returns the volume path
    - For raw_path sources: returns the path value

    Args:
        name: Source name.

    Returns:
        Path string suitable for use in pipelines.

    Raises:
        KeyError: If the source is not found in the catalog.
        ValueError: If the source configuration is incomplete.
    """
    source = cls.get_source(name)
    return source.get_path()

get_options classmethod

get_options(name)

Get the options dictionary for a source.

Options contain source-specific configuration such as format, headers, encoding, or other DataFrame reader/writer options.

Parameters:

Name Type Description Default
name str

Source name.

required

Returns:

Type Description
dict

Dictionary of source-specific options.

Raises:

Type Description
KeyError

If the source is not found in the catalog.

Source code in src/kelp/service/source_manager.py
@classmethod
def get_options(cls, name: str) -> dict:
    """Get the options dictionary for a source.

    Options contain source-specific configuration such as format, headers,
    encoding, or other DataFrame reader/writer options.

    Args:
        name: Source name.

    Returns:
        Dictionary of source-specific options.

    Raises:
        KeyError: If the source is not found in the catalog.
    """
    source = cls.get_source(name)
    return source.options
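
As the listing shows, get_options() returns the source's own options dictionary rather than a copy, so callers that modify it should copy it first. The sketch below shows one way the path and options might be handed to a Spark-style reader. The load_source helper and the RecordingReader stand-in are hypothetical, and treating format as a key stored alongside the other reader options is an assumption; how kp.source() consumes these values is not shown on this page.

```python
from typing import Any


class RecordingReader:
    """Hypothetical stand-in for a Spark DataFrameReader; it only records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, Any]] = []

    def format(self, fmt: str) -> "RecordingReader":
        self.calls.append(("format", fmt))
        return self

    def options(self, **opts: Any) -> "RecordingReader":
        self.calls.append(("options", opts))
        return self

    def load(self, path: str) -> "RecordingReader":
        self.calls.append(("load", path))
        return self


def load_source(reader: Any, path: str, options: dict[str, Any]) -> Any:
    """Apply a source's options to a reader and load its path."""
    opts = dict(options)            # copy so the caller's dictionary is not mutated
    fmt = opts.pop("format", None)  # assumption: 'format' sits next to the reader options
    if fmt is not None:
        reader = reader.format(fmt)
    return reader.options(**opts).load(path)


catalog_options = {"format": "csv", "header": "true"}
result = load_source(RecordingReader(), "/Volumes/main/raw/landing", catalog_options)
for call in result.calls:
    print(call)
```

With a real session, the same helper would presumably be called as load_source(spark.read, SourceManager.get_path(name), SourceManager.get_options(name)).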