Custom Python tasks

This page continues from the Introduction. The focus here is not on the built-in contact tasks but on your own Python functions and how they become pipeline stages.

Start with the smallest custom task

The simplest task only needs ctx.path. In this example, a DirectorySource supplies the inputs and the task prints each path as the pipeline processes it.

from lahuta.pipeline import Pipeline, PipelineContext
from lahuta.sources import DirectorySource


def print_path(ctx: PipelineContext) -> None:
    print(ctx.path)


p = Pipeline(DirectorySource("core/data", extensions=[".cif"], recursive=False))
p.add_task(name="print_path", task=print_path, store=False)
p.run(threads=2)

This task returns None and is registered with store=False, so it emits no channel output. Tasks like this are useful for lightweight checks, logging, and validating what the source is producing.
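
Because such a task only causes side effects, it also makes a cheap input gate. As a minimal sketch (the 1 KiB threshold and the warn_if_tiny name are illustrative, not part of Lahuta), a None-returning task can flag suspiciously small files before heavier stages run:

import os

from lahuta.pipeline import Pipeline, PipelineContext
from lahuta.sources import DirectorySource


def warn_if_tiny(ctx: PipelineContext) -> None:
    # Side-effect only: no return value, so nothing is stored in a channel.
    size = os.path.getsize(ctx.path)
    if size < 1024:  # illustrative threshold; tune for your data
        print(f"warning: {ctx.path} is only {size} bytes")


p = Pipeline(DirectorySource("core/data", extensions=[".cif"], recursive=False))
p.add_task(name="warn_if_tiny", task=warn_if_tiny, store=False)
p.run(threads=2)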

A richer custom task

Most real tasks need structural context. You request it by declaring a dependency on the built-in topology task, then reading both the parsed system and the computed topology inside your function.

from pathlib import Path

from lahuta.pipeline import InMemoryPolicy, Pipeline, PipelineContext
from lahuta.sources import DirectorySource


def summarize_structure(ctx: PipelineContext) -> dict:
    system = ctx.get_system()
    top = ctx.get_topology()

    atom_ids = list(top.get_atom_ids()) if top is not None else []
    return {
        "file": Path(ctx.path).name,
        "n_atoms": int(system.n_atoms) if system is not None else 0,
        "n_topology_atoms": len(atom_ids),
        "first_atom_id": int(atom_ids[0]) if atom_ids else None,
    }


p = Pipeline(DirectorySource("core/data", extensions=[".cif"], recursive=False))
p.add_task(
    name="summary",
    task=summarize_structure,
    depends=["topology"],
    in_memory_policy=InMemoryPolicy.Keep,
)

out = p.run(threads=2)
print(out.to_dict("summary")[:2])
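
Because summarize_structure returns plain dicts, the collected channel plugs straight into ordinary Python tooling. A minimal sketch, assuming out.to_dict("summary") yields one dict per processed file (as the slice above suggests) and that pandas is available:

import pandas as pd

# Tabulate the per-file summaries for quick inspection.
df = pd.DataFrame(out.to_dict("summary"))
print(df.sort_values("n_atoms", ascending=False).head())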

Four tasks with interdependencies

Custom tasks become more powerful when they depend on each other. The example below adds four stages and passes intermediate results through the per-item context.

from pathlib import Path

from lahuta.pipeline import InMemoryPolicy, Pipeline, PipelineContext
from lahuta.sources import DirectorySource


def file_info(ctx: PipelineContext) -> dict:
    return {"file": Path(ctx.path).name}


def counts(ctx: PipelineContext) -> dict:
    system = ctx.get_system()
    top = ctx.get_topology()
    return {
        "atoms": int(system.n_atoms) if system is not None else 0,
        "topology_ready": top is not None,
    }


def size_class(ctx: PipelineContext) -> dict:
    c = ctx.get_json("counts")
    atoms = int(c["atoms"]) if c is not None else 0
    label = "large" if atoms >= 2000 else "small"
    return {"label": label}


def report(ctx: PipelineContext) -> str:
    info = ctx.get_json("file_info") or {}
    cls = ctx.get_json("size_class") or {}
    return f'{info.get("file", "unknown")} -> {cls.get("label", "unknown")}'


p = Pipeline(DirectorySource("core/data", extensions=[".cif"], recursive=False))
p.add_task(name="file_info", task=file_info, in_memory_policy=InMemoryPolicy.Keep)
p.add_task(name="counts", task=counts, depends=["topology"], in_memory_policy=InMemoryPolicy.Keep)
p.add_task(name="size_class", task=size_class, depends=["counts"], in_memory_policy=InMemoryPolicy.Keep)
p.add_task(name="report", task=report, depends=["file_info", "size_class"], in_memory_policy=InMemoryPolicy.Keep)

out = p.run(threads=2)
print(out.to_dict("report")[:5])

In this graph, counts requires the built-in topology task, size_class depends on counts, and report depends on both file_info and size_class.
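
Because every stage here was registered with InMemoryPolicy.Keep, the intermediate channels should remain queryable alongside the final one. A quick, illustrative sanity check (assuming to_dict accepts any stored task name, not just the last stage):

# Compare an intermediate channel against the final report.
for c, r in zip(out.to_dict("counts")[:3], out.to_dict("report")[:3]):
    print(c["atoms"], "atoms ->", r)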

This is the core pipeline pattern: compose small task functions, connect them with clear dependencies, and keep each stage easy to test.
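
Testability follows from each task touching only the context methods it needs. Since size_class calls nothing but ctx.get_json, it can be exercised with a duck-typed stub in place of a real PipelineContext; the StubContext class below is illustrative, not a Lahuta API:

class StubContext:
    """Illustrative stand-in: implements only the one method size_class uses."""

    def __init__(self, channels: dict):
        self._channels = channels

    def get_json(self, name: str):
        return self._channels.get(name)


assert size_class(StubContext({"counts": {"atoms": 5000}})) == {"label": "large"}
assert size_class(StubContext({"counts": {"atoms": 10}})) == {"label": "small"}
assert size_class(StubContext({})) == {"label": "small"}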

Continue to Architecture details for dependency inference, runtime internals, and performance caveats.