Introduction

The pipeline API turns the analysis of a single structure into a repeatable analysis across many input files. You define a source, add one or more tasks, run the pipeline once, and read the outputs from named channels. This page starts with the simplest setup and adds complexity in layers.

Start with one task that computes contacts using the default contact configuration.

In Lahuta, ContactTask() defaults to the MolStar provider with the standard interaction selection, so this is the shortest useful setup:

from lahuta.pipeline import ContactTask, Pipeline
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(name="contacts", task=ContactTask())

out = p.run(threads=2)
records = out.to_dict("contacts")

print(len(records))
print(records[0]["provider"])  # molstar

This short script has four building blocks:

  • FileSource says where items come from.
  • Pipeline(...) creates the execution graph around that source.
  • add_task(...) adds a node that emits to a channel (here, contacts).
  • run(...) executes the graph and returns a PipelineResult, which gives you decoded output for each channel.
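
The relationship between these four pieces can be pictured with a toy model in plain Python. This is only a conceptual sketch, not Lahuta's implementation: ToyPipeline and the name_length task are hypothetical, there is no threading, and the real PipelineResult is replaced by an ordinary dict keyed by channel name.

```python
from collections import defaultdict


class ToyPipeline:
    """Illustrative stand-in for the source -> task -> channel idea (not Lahuta's Pipeline)."""

    def __init__(self, source):
        # The "source" is just an iterable of items, here file names.
        self.source = list(source)
        self.tasks = {}

    def add_task(self, name, task):
        # Each task is registered under the channel name it emits to.
        self.tasks[name] = task

    def run(self):
        # Every item is visited once; each task appends one record to its channel.
        channels = defaultdict(list)
        for item in self.source:
            for name, task in self.tasks.items():
                channels[name].append(task(item))
        return dict(channels)


toy = ToyPipeline(["a.cif", "b.cif"])
toy.add_task("name_length", lambda path: {"path": path, "length": len(path)})
toy_out = toy.run()
print(toy_out["name_length"])
```

The point of the sketch is the shape of the result: one list of records per named channel, produced in a single pass over the source.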

FileSource is just the simplest starting point. Lahuta provides several source types for different kinds of input data and analyses:

from lahuta.sources import (
    DatabaseHandleSource,
    DatabaseSource,
    DirectorySource,
    FileListSource,
    FileSource,
    MdTrajectoriesSource,
    NmrSource,
)

# One or more explicit files
src_files = FileSource(["core/data/1kx2_small.cif", "core/data/fubi.cif"])

# Scan a directory recursively
src_dir = DirectorySource("core/data", extensions=[".cif", ".cif.gz"], recursive=True)

# Read a text file with one input path per line
src_list = FileListSource("inputs.txt")

# Read records from an LMDB path
src_db = DatabaseSource("demo_db", batch=256)

# Read records from an existing open DB handle (db_handle must already be defined)
src_db_handle = DatabaseHandleSource(db_handle, batch=256)

# Stream NMR multi-model inputs
src_nmr = NmrSource(["nmr_model.cif.gz"])

# Stream MD trajectories (structure + XTC files)
src_md = MdTrajectoriesSource([
    ("traj.gro", ["traj.xtc"]),
])

In practice, you keep the same task code and swap only the source constructor based on where the data lives.
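
If you use FileListSource, the list file has to exist first. Below is a minimal sketch of one way to generate it with the standard library only. The helper write_input_list is hypothetical and not part of Lahuta, and it assumes only what is stated above: one input path per line. Whether FileListSource tolerates blank lines, comments, or relative paths is not covered here.

```python
import tempfile
from pathlib import Path


def write_input_list(root, list_path, suffixes=(".cif", ".cif.gz")):
    """Write every file under root whose name ends with one of suffixes, one path per line."""
    root = Path(root)
    paths = sorted(
        str(p)
        for p in root.rglob("*")
        if p.is_file() and p.name.endswith(tuple(suffixes))
    )
    text = "\n".join(paths) + ("\n" if paths else "")
    Path(list_path).write_text(text, encoding="utf-8")
    return paths


# Demo on a throwaway directory so the example does not depend on real data.
with tempfile.TemporaryDirectory() as tmp:
    data = Path(tmp) / "data"
    (data / "sub").mkdir(parents=True)
    (data / "a.cif").touch()
    (data / "sub" / "b.cif.gz").touch()
    (data / "notes.txt").touch()  # skipped: suffix does not match

    listed = write_input_list(data, Path(tmp) / "inputs.txt")
    lines = (Path(tmp) / "inputs.txt").read_text(encoding="utf-8").splitlines()

print(len(lines))
```

Sorting the paths keeps the list stable between runs, which makes results easier to compare.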

When you want full control, make the contact task explicit. This is the same contact task with the key options written out:

from lahuta import ContactProvider, InteractionType
from lahuta.pipeline import ContactTask, InMemoryPolicy, Pipeline
from lahuta.sources import DirectorySource

p = Pipeline(DirectorySource("core/data", extensions=[".cif"], recursive=False))
p.add_task(
    name="contacts",
    task=ContactTask(
        provider=ContactProvider.MolStar,
        interaction_type=InteractionType.All,
    ),
    in_memory_policy=InMemoryPolicy.Keep,
)

Available contact providers are:

  • ContactProvider.MolStar
  • ContactProvider.Arpeggio
  • ContactProvider.GetContacts

You can add multiple contact tasks to the same pipeline run. This is useful when you want to compare how providers behave on the same set of structures without iterating over the source twice:

from lahuta import ContactProvider
from lahuta.pipeline import ContactTask, Pipeline
from lahuta.sources import FileListSource

p = Pipeline(FileListSource("inputs.txt"))
p.add_task(name="contacts_molstar",  task=ContactTask(provider=ContactProvider.MolStar))
p.add_task(name="contacts_arpeggio", task=ContactTask(provider=ContactProvider.Arpeggio))

out = p.run(threads=2)
molstar = out.to_dict("contacts_molstar")
arpeggio = out.to_dict("contacts_arpeggio")
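
A first comparison can be as simple as counting records per provider. The sketch below assumes only that each channel decodes to a list of dicts carrying a "provider" key, as in the first example. The helper count_by_provider is hypothetical, the demo records are stand-ins rather than real Lahuta output, and the "arpeggio" value is an assumed spelling.

```python
from collections import Counter


def count_by_provider(*record_lists):
    """Count records per 'provider' value across one or more channel outputs."""
    counts = Counter()
    for records in record_lists:
        for record in records:
            # Records without a provider key are grouped under "unknown".
            counts[record.get("provider", "unknown")] += 1
    return dict(counts)


# Stand-in data shaped like the decoded channel output (assumption).
molstar_demo = [{"provider": "molstar"}, {"provider": "molstar"}]
arpeggio_demo = [{"provider": "arpeggio"}]

summary = count_by_provider(molstar_demo, arpeggio_demo)
print(summary)
```

With real output you would pass the two lists returned by out.to_dict(...) instead of the demo lists.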

The same graph can include your own Python functions, not only built-in contact tasks.
Custom tasks can read the current PipelineContext, use ctx.get_system() or ctx.get_topology() when needed, and emit text or JSON payloads into channels.

Continue to the next page: Custom Python tasks.