Advanced usage

1) Dependency strategy in larger graphs

In advanced graphs, be explicit about dependencies whenever ordering is part of the design. Inference is great for simple built-in access (ctx.get_system() / ctx.get_topology()), but explicit depends=[...] makes multi-stage logic easier to reason about.

Use this rule of thumb:

  • rely on inference for small, local tasks,
  • use explicit dependencies for cross-stage data flow and long-lived pipelines, as sketched below.
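
A minimal sketch of the explicit style. The task names and return values here are illustrative; depends=[...] is the mechanism described above.

from lahuta.pipeline import Pipeline
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
# Upstream stage: inference alone would suffice for this simple access.
p.add_task(name="extract", task=lambda ctx: {"path": ctx.path})
# Downstream stage: ordering is part of the design, so state it explicitly.
p.add_task(
    name="summarize",
    task=lambda ctx: {"done": True},
    depends=["extract"],
)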

2) Performance tuning: where runtime really goes

The main tuning knobs are:

  • threads in run(...) for item-level parallelism,
  • thread_safe per Python task,
  • writer_threads for sink throughput.

from lahuta.pipeline import InMemoryPolicy, Pipeline
from lahuta.sources import FileSource


def heavy_py(ctx):
    # imagine CPU-heavy Python logic here
    return {"path": ctx.path}


p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(
    name="heavy",
    task=heavy_py,
    in_memory_policy=InMemoryPolicy.Keep,  # keep results available in memory
    thread_safe=True,    # allow this task to run concurrently across items
    writer_threads=4,    # sink-writer threads for this task's output
)
out = p.run(threads=8)   # item-level parallelism across inputs

Keep in mind that Lahuta parallelizes across items, not inside one Python function body. If one Python task is very slow, that stage dominates runtime even when C++ stages are fast.
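
A quick back-of-envelope (illustrative numbers only) shows why:

# With item-level parallelism, a slow Python stage sets a wall-time floor
# of items * seconds_per_item / threads, regardless of the other stages.
items, per_item_s, threads = 100, 2.0, 8
print(f"lower bound: {items * per_item_s / threads:.0f} s")  # -> 25 s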

3) Backpressure and sink tuning

When output sinks are slower than compute stages, tune backpressure rather than letting queues grow without bound.

from lahuta.pipeline import BackpressureConfig, OnFull, set_default_backpressure_config

cfg = BackpressureConfig()
cfg.max_queue_msgs = 512
cfg.max_queue_bytes = 8 * 1024 * 1024
cfg.max_batch_bytes = 2 * 1024 * 1024
cfg.writer_threads = 4
cfg.on_full = OnFull.Block  # block producers when the queue is full
cfg.required = True         # sink failures fail the run
set_default_backpressure_config(cfg)

Use required=False for best-effort sinks that should not fail the run. Use required=True for sinks that are part of correctness, such as an authoritative output file set.
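
As a sketch, a best-effort default could look like this, using the same API as above (whether best-effort is better expressed per sink depends on your setup):

from lahuta.pipeline import BackpressureConfig, OnFull, set_default_backpressure_config

best_effort = BackpressureConfig()
best_effort.on_full = OnFull.Block
best_effort.required = False  # per the note above: does not fail the run
set_default_backpressure_config(best_effort)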

4) Result decoding tradeoffs

Choose decode paths by data shape and cost:

  • to_numpy(channel) for binary contact channels and analytics workflows,
  • to_dict(channel) for Python-native records,
  • json(channel) for direct JSON payload streams,
  • raw(channel) when you need exact emitted payloads.

contacts_np = out.to_numpy("contacts")
contacts_py = out.to_dict("contacts")
events = out.json("events")
payloads = out.raw("events")

For large runs, keeping results binary and decoding late is usually the fastest path.

5) Model-data pipelines and requires_fields

For model/LMDB workflows, keep payload loading intentional with requires_fields.

from lahuta.pipeline import DataField, InMemoryPolicy, Pipeline
from lahuta.sources import DatabaseSource

p = Pipeline(DatabaseSource("demo_db", batch=256))
p.params("system").is_model = True

def capture(ctx):
    payload = ctx.model_payload
    view = payload.positions_view if payload is not None else None
    return {"has_view": view is not None}

p.add_task(
    name="capture",
    task=capture,
    requires_fields=[DataField.PositionsView],
    in_memory_policy=InMemoryPolicy.Keep,
)

Views are efficient but lifetime-sensitive. Copy data inside the task if it must survive beyond that task's execution, as sketched below.
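
A minimal sketch of the copy-out pattern, assuming positions_view is array-like; the numpy copy and the returned fields are illustrative.

import numpy as np

def capture_copy(ctx):
    payload = ctx.model_payload
    if payload is None:
        return {"has_view": False}
    # Copy while the view is still valid so the data outlives this task.
    positions = np.array(payload.positions_view, copy=True)
    return {"has_view": True, "n_rows": int(positions.shape[0])}

# Register with requires_fields=[DataField.PositionsView] as shown above.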

6) Graph patterns: fan-in, fan-out, and channel design

Use channel names to separate “what computes” from “where output goes.” Multiple tasks can emit to one channel (fan-in), and one channel can have multiple sinks.

from lahuta.pipeline import Pipeline
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(name="meta_file", task=lambda ctx: {"file": ctx.path}, channel="meta")
p.add_task(name="meta_size", task=lambda ctx: {"size": 1}, channel="meta")
p.to_files("meta", path="meta.ndjson")
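
Continuing the example, fan-out would add a second sink on the same channel. This assumes to_files(...) can be registered more than once per channel, per the "one channel can have multiple sinks" note above.

p.to_files("meta", path="meta_copy.ndjson")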

7) Streaming sources (NMR and MD)

Use streaming sources when each logical input expands into many frames.

from lahuta.pipeline import Pipeline
from lahuta.sources import MdTrajectoriesSource, NmrSource

nmr = Pipeline(NmrSource(["nmr_models.cif.gz"]))
md = Pipeline(MdTrajectoriesSource([("traj.gro", ["traj.xtc"])]))

Frame-level metadata is available through PipelineContext (session_id, conformer_id, timestamp_ps) so custom tasks can produce time-aware outputs.
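
A minimal time-aware task sketch, assuming those fields are plain attributes on the context; the task and channel names are illustrative.

def frame_meta(ctx):
    # session_id / conformer_id / timestamp_ps are the frame-level fields
    # listed above; attribute access is assumed here.
    return {
        "session": ctx.session_id,
        "conformer": ctx.conformer_id,
        "t_ps": ctx.timestamp_ps,
    }

md.add_task(name="frame_meta", task=frame_meta, channel="frames")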

For a focused discussion of why MD/NMR streaming behaves differently from single-structure analysis, see MD/NMR analysis.

8) Observability and debugging

Use the graph and report APIs to inspect behavior instead of optimizing blindly.

from lahuta.pipeline import Pipeline, ReportingLevel
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
print(p.describe())

p.set_reporting_level(ReportingLevel.DEBUG)
p.run(threads=2)
print(p.get_run_report())

DEBUG reports are especially useful for queue pressure, permit wait time, stage timing, and sink multiplexer behavior.

9) Failure handling patterns

Pipeline runs are designed to continue across item-level failures. For Python tasks, exceptions are captured into structured error records rather than aborting the whole run.

Combine this with channel isolation and post-run checks:

  • keep critical outputs in dedicated channels,
  • inspect error channels explicitly,
  • decide in application code whether non-zero errors should fail your job.

For long-running production pipelines, this “collect errors, then decide policy” model is usually more robust than failing fast on the first bad item, as in the sketch below.
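
A hypothetical post-run policy check; the "errors" channel name and the to_dict access are illustrative, not a documented error API.

out = p.run(threads=4)
errors = out.to_dict("errors")  # hypothetical error channel name
if errors:
    for rec in errors[:5]:
        print("item failed:", rec)
    # Application-level policy: decide here whether errors fail the job.
    raise SystemExit(f"{len(errors)} items failed")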

Continue to MD/NMR analysis for streaming-specific behavior and caveats.