Skip to content

Multi-step Pipeline

This example builds a three-node pipeline: a data source, a transformer, and a display. It demonstrates chained propagation and runtime error handling.


Components

Data Source

Produces a list of numbers:

Pipeline/source.py
from panel_flowdash import register

@register(
    page=False,
    component=True,
    title="Number Source",
    provides=[{"key": "numbers", "type": "list"}],
)
def app(config):
    """Build the widget for the "Number Source" component.

    The component is registered as a component (not a page) and declares
    a single provided key, ``numbers``, of type ``list``.

    Args:
        config: Passed in by the caller; not read by this component.

    Returns:
        An integer input widget named "Count", created with ``value=5``,
        ``start=1`` and ``end=100``.
    """
    # NOTE(review): the declared output is a list under "numbers", while the
    # widget holds a single integer; how the value is published is not shown
    # here -- confirm against the framework.
    from panel_material_ui import IntInput

    count_input = IntInput(name="Count", value=5, start=1, end=100)
    return count_input

Transformer

Receives a list and applies a transformation:

Pipeline/transform.py
from panel_flowdash import register

@register(
    page=False,
    component=True,
    title="Squarer",
    requires=[{"key": "numbers", "type": "list"}],
    provides=[{"key": "result", "type": "list"}],
)
def app(config):
    """Render the squares of the incoming ``numbers`` as text.

    The component declares a required ``numbers`` key and a provided
    ``result`` key, both of type ``list``.

    Args:
        config: Mapping-like object; ``numbers`` is read with ``.get`` and
            treated as an empty list when the key is absent.

    Returns:
        A ``Str`` pane showing the string form of the squared list.
    """
    # NOTE(review): "result" is declared as an output, but this function only
    # returns a pane; how the squared list is published is not visible here.
    from panel.pane import Str

    squares = [value**2 for value in config.get("numbers", [])]
    rendered = str(squares)
    return Str(rendered)

Display

Shows the final result:

Pipeline/display.py
from panel_flowdash import register

@register(
    page=False,
    component=True,
    title="Result Display",
    requires=[{"key": "result", "type": "list"}],
)
def app(config):
    """Show the sum of the incoming ``result`` list.

    The component declares one required key, ``result``, of type ``list``
    and provides nothing.

    Args:
        config: Mapping-like object; ``result`` is read with ``.get`` and
            treated as an empty list when the key is absent (sum is 0).

    Returns:
        A Markdown pane with the total rendered after a bold "Sum:" label.
    """
    from panel.pane import Markdown

    total = sum(config.get("result", []))
    return Markdown(f"**Sum:** {total}")

Wiring with the DataflowGraph

from panel_flowdash import DataflowGraph, ComponentSpec, OutputPort, InputPort

# One port object per endpoint; every port in this pipeline is typed "list".
numbers_out = OutputPort(name="numbers", type="list")
numbers_in = InputPort(name="numbers", type="list")
result_out = OutputPort(name="result", type="list")
result_in = InputPort(name="result", type="list")

# Component specs keyed by component id: source -> transform -> display.
specs = {
    # Produces "numbers"; takes no inputs.
    "source": ComponentSpec(
        component_id="source",
        title="Source",
        description=None,
        icon=None,
        tags=[],
        inputs=[],
        outputs=[numbers_out],
        default_size=None,
    ),
    # Consumes "numbers" and produces "result".
    "transform": ComponentSpec(
        component_id="transform",
        title="Transform",
        description=None,
        icon=None,
        tags=[],
        inputs=[numbers_in],
        outputs=[result_out],
        default_size=None,
    ),
    # Consumes "result"; produces nothing.
    "display": ComponentSpec(
        component_id="display",
        title="Display",
        description=None,
        icon=None,
        tags=[],
        inputs=[result_in],
        outputs=[],
        default_size=None,
    ),
}

errors = []


def _record_error(*args):
    """Store the positional arguments of each on_error call as one tuple."""
    errors.append(args)


graph = DataflowGraph(specs, on_error=_record_error)

# Instantiate one node per component: (node id, component id).
for node_id, component_id in (
    ("src_1", "source"),
    ("tfm_1", "transform"),
    ("dsp_1", "display"),
):
    graph.add_node(node_id, component_id)

# Wire: source -> transform -> display
# Each entry is (source node, output port, target node, input port).
for edge in (
    ("src_1", "numbers", "tfm_1", "numbers"),
    ("tfm_1", "result", "dsp_1", "result"),
):
    assert graph.add_edge(*edge) is True

# Propagate values through the chain by assigning to the source's output.
src_state = graph.get_state("src_1")
src_state.numbers = [1, 2, 3, 4, 5]

# After propagation the transformer's "numbers" input mirrors the source.
tfm_state = graph.get_state("tfm_1")
assert tfm_state.numbers == [1, 2, 3, 4, 5]

Error handling

Invalid connections are rejected when an edge is added (at wire time). If a type mismatch slips past that check (e.g. untyped ports), runtime validation catches it and reports it through `on_error`:

# dsp_1's "result" input is already fed by tfm_1 (wired above), so a second
# connection to it is rejected at wire time. Both ports are declared as
# "list", so this rejection is not a type mismatch:
result = graph.add_edge("src_1", "numbers", "dsp_1", "result")
# Returns error: "Input 'result' already has a connection..."

# Runtime errors on value propagation are caught by on_error:
assert not errors  # no errors if types are compatible

Serving the pipeline

flowdash serve Pipeline/ --title "Data Pipeline"