Usage Guide
Usage Guide
This page gives a brief overview of how to build workflows with fuseline.
Building Steps
Steps subclass Task and implement run_step to perform work. Steps can be connected using the >> operator.
from fuseline import Task, Workflow
class Hello(Task):
def run_step(self, _setup_res):
print("hello")
class World(Task):
def run_step(self, _setup_res):
print("world")
hello = Hello()
world = World()
hello >> world
flow = Workflow(outputs=[world])
flow.run()Typed Dependencies
Steps can depend on the output of other steps using Depends and Computed.
from fuseline import Computed, Depends, Task, Workflow
class Add(Task):
def run_step(self, x: int, y: int) -> int:
return x + y
class Multiply(Task):
add = Add()
def run_step(self, value: Computed[int] = Depends(add)) -> int:
return value * 2
mul = Multiply()
wf = Workflow(outputs=[mul])
print(wf.run({"x": 2, "y": 3})) # prints 10Asynchronous Workflows
Use AsyncTask and AsyncWorkflow to execute steps asynchronously.
import asyncio
from fuseline import AsyncTask, AsyncWorkflow
class AsyncHello(AsyncTask):
async def run_step_async(self, _setup_res):
await asyncio.sleep(0.1)
print("hello")
async def main():
step = AsyncHello()
wf = AsyncWorkflow(outputs=[step])
await wf.run_async()
asyncio.run(main())Parallel Execution
Run independent branches in parallel using an execution engine such as ProcessEngine.
from fuseline import ProcessEngine
wf.run({"a": 1, "b": 2}, execution_engine=ProcessEngine(2))Exporting and Tracing
Workflows can be exported to YAML with Workflow.export() and execution traces can be recorded using the trace parameter. Both features rely on pluggable exporters and tracers so alternative formats or backends can be added easily.
wf.export("workflow.yaml")
wf = Workflow(outputs=[step], trace="trace.log")
wf.run({})See the example scripts in the examples/ directory for more inspiration.