Advanced usage¶
1) Dependency strategy in larger graphs¶
In advanced graphs, be explicit about dependencies whenever ordering is part of the design. Inference is great for simple built-in access (ctx.get_system() / ctx.get_topology()), but explicit depends=[...] makes multi-stage logic easier to reason about.
Use this rule:
- rely on inference for small, local tasks,
- use explicit dependencies for cross-stage data flow and long-lived pipelines (see the sketch below).
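A minimal sketch of the explicit style. The depends parameter comes from the rule above; the list-of-task-names shape is an assumption, so check your version's signature:
from lahuta.pipeline import Pipeline
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(name="stage_a", task=lambda ctx: {"file": ctx.path}, channel="a")
# Name the upstream stage explicitly instead of relying on inference,
# so the ordering survives later refactors (depends shape assumed).
p.add_task(
    name="stage_b",
    task=lambda ctx: {"derived": True},
    depends=["stage_a"],
    channel="b",
)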
2) Performance tuning: where runtime really goes¶
The main tuning knobs are:
- threads in run(...) for item-level parallelism,
- thread_safe per Python task,
- writer_threads for sink throughput.
from lahuta.pipeline import InMemoryPolicy, Pipeline
from lahuta.sources import FileSource
def heavy_py(ctx):
    # imagine CPU-heavy Python logic here
    return {"path": ctx.path}
p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(
    name="heavy",
    task=heavy_py,
    in_memory_policy=InMemoryPolicy.Keep,
    thread_safe=True,    # task may be invoked concurrently across items
    writer_threads=4,    # sink write throughput for this task's output
)
out = p.run(threads=8)   # item-level parallelism across inputs
Keep in mind that Lahuta parallelizes across items, not inside one Python function body. If one Python task is very slow, that stage dominates runtime even when C++ stages are fast.
3) Backpressure and sink tuning¶
When output sinks are slower than compute stages, tune backpressure instead of letting queues grow uncontrolled.
from lahuta.pipeline import BackpressureConfig, OnFull, set_default_backpressure_config
cfg = BackpressureConfig()
cfg.max_queue_msgs = 512                 # cap on queued messages per sink
cfg.max_queue_bytes = 8 * 1024 * 1024    # cap on queued bytes per sink
cfg.max_batch_bytes = 2 * 1024 * 1024    # flush write batches at this size
cfg.writer_threads = 4                   # parallel writer threads per sink
cfg.on_full = OnFull.Block               # block producers when queues are full
cfg.required = True                      # a failing sink fails the run
set_default_backpressure_config(cfg)
Use required=False for best-effort sinks that should not fail the run. Use required=True for sinks that are part of correctness, such as an authoritative output file set.
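For runs where no sink is authoritative, the same default-config hook shown above can install a best-effort configuration:
# Best-effort default: queues stay bounded, but a failing sink does
# not abort the run (required=False, per the semantics above).
cfg_best_effort = BackpressureConfig()
cfg_best_effort.on_full = OnFull.Block
cfg_best_effort.required = False
set_default_backpressure_config(cfg_best_effort)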
4) Result decoding tradeoffs¶
Choose decode paths by data shape and cost:
- to_numpy(channel) for binary contact channels and analytics workflows,
- to_dict(channel) for Python-native records,
- json(channel) for direct JSON payload streams,
- raw(channel) when you need exact emitted payloads.
contacts_np = out.to_numpy("contacts")
contacts_py = out.to_dict("contacts")
events = out.json("events")
payloads = out.raw("events")
For large runs, emitting binary and decoding late (after the run, only for the channels you need) is usually the best performance path.
5) Model-data pipelines and requires_fields¶
For model/LMDB workflows, keep payload loading intentional with requires_fields.
from lahuta.pipeline import DataField, InMemoryPolicy, Pipeline
from lahuta.sources import DatabaseSource
p = Pipeline(DatabaseSource("demo_db", batch=256))
p.params("system").is_model = True
def capture(ctx):
    payload = ctx.model_payload
    view = payload.positions_view if payload is not None else None
    return {"has_view": view is not None}

p.add_task(
    name="capture",
    task=capture,
    requires_fields=[DataField.PositionsView],
    in_memory_policy=InMemoryPolicy.Keep,
)
Views are efficient but lifetime-sensitive. Copy data inside the task if it must survive beyond that task's execution.
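A minimal sketch of the copy-out pattern; treating positions_view as array-like (convertible to a numpy array) is an assumption here:
import numpy as np

def capture_owned(ctx):
    payload = ctx.model_payload
    if payload is None:
        return {"n_positions": 0}
    # Copy the view into an owned array so the data survives after this
    # task returns; the view itself must not be stored or returned.
    positions = np.array(payload.positions_view, copy=True)
    return {"n_positions": int(positions.shape[0])}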
6) Graph patterns: fan-in, fan-out, and channel design¶
Use channel names to separate “what computes” from “where output goes.” Multiple tasks can emit to one channel (fan-in), and one channel can have multiple sinks.
from lahuta.pipeline import Pipeline
from lahuta.sources import FileSource
p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
p.add_task(name="meta_file", task=lambda ctx: {"file": ctx.path}, channel="meta")
p.add_task(name="meta_size", task=lambda ctx: {"size": 1}, channel="meta")
p.to_files("meta", path="meta.ndjson")
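For the fan-out side, a sketch that attaches a second sink to the same channel; whether repeated sink registrations on one channel are additive is an assumption to verify:
# Hypothetical fan-out: a second file sink on the "meta" channel.
p.to_files("meta", path="meta_backup.ndjson")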
7) Streaming sources (NMR and MD)¶
Use streaming sources when each logical input expands into many frames.
from lahuta.pipeline import Pipeline
from lahuta.sources import MdTrajectoriesSource, NmrSource
nmr = Pipeline(NmrSource(["nmr_models.cif.gz"]))
md = Pipeline(MdTrajectoriesSource([("traj.gro", ["traj.xtc"])]))
Frame-level metadata is available through PipelineContext (session_id, conformer_id, timestamp_ps) so custom tasks can produce time-aware outputs.
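A sketch of a time-aware task built from those context fields; attribute-style access is assumed to mirror ctx.path above:
def tag_frame(ctx):
    # Per-frame record from the streaming metadata fields named above.
    return {
        "session": ctx.session_id,
        "conformer": ctx.conformer_id,
        "t_ps": ctx.timestamp_ps,
    }

md.add_task(name="tag_frame", task=tag_frame, channel="frames")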
For a focused discussion of why MD/NMR streaming behaves differently from single-structure analysis, see MD/NMR analysis.
8) Observability and debugging¶
Use the graph and report APIs to inspect behavior instead of optimizing blindly.
from lahuta.pipeline import Pipeline, ReportingLevel
from lahuta.sources import FileSource
p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
print(p.describe())
p.set_reporting_level(ReportingLevel.DEBUG)
p.run(threads=2)
print(p.get_run_report())
DEBUG reports are especially useful for diagnosing queue pressure, permit wait times, stage timing, and sink multiplexer behavior.
9) Failure handling patterns¶
Pipeline runs are designed to continue across item-level failures. For Python tasks, exceptions are captured into structured error records rather than aborting the whole run.
Combine this with channel isolation and post-run checks (a sketch follows the list):
- keep critical outputs in dedicated channels,
- inspect error channels explicitly,
- decide in application code whether non-zero errors should fail your job.
For long-running production pipelines, this “collect errors, then decide policy” model is usually more robust than failing fast on the first bad item.
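A post-run policy sketch; the "errors" channel name is hypothetical, and decoding error records with to_dict is an assumption:
from lahuta.pipeline import Pipeline
from lahuta.sources import FileSource

p = Pipeline(FileSource(["core/data/1kx2_small.cif"]))
out = p.run(threads=4)
# Inspect the (hypothetical) error channel, then decide the job's fate
# in application code rather than aborting on the first bad item.
errors = out.to_dict("errors")
if errors:
    for err in errors[:5]:
        print("item failed:", err)
    raise SystemExit(f"{len(errors)} item(s) failed; failing by policy")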
Continue to MD/NMR analysis for streaming-specific behavior and caveats.