Usage Guide
This page gives a brief overview of how to build workflows with fuseline.
Building Steps
Steps subclass Task and implement run_step to perform work. Steps can be connected using the >> operator.

from fuseline import Task, Workflow


class Hello(Task):
    def run_step(self, _setup_res):
        print("hello")


class World(Task):
    def run_step(self, _setup_res):
        print("world")


hello = Hello()
world = World()
hello >> world  # connect hello to world

# Build the workflow from its final step and run it
flow = Workflow(outputs=[world])
flow.run()

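To make the >> chaining idea concrete, here is a minimal sketch of how such an operator can be built in plain Python with `__rshift__`. `ToyStep` and `run_chain` are hypothetical names used only for illustration; this is not fuseline's implementation, and the real Task class may behave differently.

```python
# Toy illustration only: ToyStep is hypothetical and is not fuseline's Task.
class ToyStep:
    def __init__(self, name):
        self.name = name
        self.successors = []

    def __rshift__(self, other):
        # `a >> b` records b as a successor of a and returns b,
        # so a chain such as a >> b >> c links left to right.
        self.successors.append(other)
        return other


def run_chain(start):
    """Collect step names by following the first successor of each step."""
    order = []
    step = start
    while step is not None:
        order.append(step.name)
        step = step.successors[0] if step.successors else None
    return order


a, b, c = ToyStep("a"), ToyStep("b"), ToyStep("c")
a >> b >> c
print(run_chain(a))  # ['a', 'b', 'c']
```

Returning the right-hand operand from `__rshift__` is what lets several steps be linked in a single expression.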
Typed Dependencies
Steps can depend on the output of other steps using Depends and Computed.

from fuseline import Computed, Depends, Task, Workflow


class Add(Task):
    def run_step(self, x: int, y: int) -> int:
        return x + y


class Multiply(Task):
    add = Add()

    def run_step(self, value: Computed[int] = Depends(add)) -> int:
        return value * 2


mul = Multiply()
wf = Workflow(outputs=[mul])
print(wf.run({"x": 2, "y": 3}))  # prints 10

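One way to picture dependency markers placed in default arguments is a resolver that inspects a step's signature and runs the upstream step first. The sketch below uses only the standard library; `ToyDepends`, `ToyAdd`, `ToyMultiply`, and `resolve` are hypothetical stand-ins, and fuseline's own resolution logic is assumed, not shown.

```python
import inspect


# Hypothetical stand-ins; these are not fuseline's Depends or Task.
class ToyDepends:
    def __init__(self, step):
        self.step = step


class ToyAdd:
    def run_step(self, x, y):
        return x + y


_add = ToyAdd()


class ToyMultiply:
    def run_step(self, value=ToyDepends(_add)):
        return value * 2


def resolve(step, inputs):
    """Run `step`, first running any upstream step named by a ToyDepends default."""
    kwargs = {}
    for name, param in inspect.signature(step.run_step).parameters.items():
        if isinstance(param.default, ToyDepends):
            # Dependency marker: compute the upstream result recursively.
            kwargs[name] = resolve(param.default.step, inputs)
        else:
            # Plain parameter: read it from the workflow inputs.
            kwargs[name] = inputs[name]
    return step.run_step(**kwargs)


print(resolve(ToyMultiply(), {"x": 2, "y": 3}))  # 10
```

The toy resolver re-runs an upstream step every time it is referenced; a real engine would more likely cache each result.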
Asynchronous Workflows
Use AsyncTask and AsyncWorkflow to execute steps asynchronously.

import asyncio

from fuseline import AsyncTask, AsyncWorkflow


class AsyncHello(AsyncTask):
    async def run_step_async(self, _setup_res):
        await asyncio.sleep(0.1)
        print("hello")


async def main():
    step = AsyncHello()
    wf = AsyncWorkflow(outputs=[step])
    await wf.run_async()


asyncio.run(main())

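As background on why asynchronous steps can help, the following sketch uses plain asyncio (no fuseline) to await two independent coroutines together, so their waiting time overlaps. `toy_step` is a hypothetical helper, and nothing here is meant to describe how AsyncWorkflow schedules steps internally.

```python
import asyncio


async def toy_step(name, delay):
    # Stand-in for I/O-bound work such as a network call.
    await asyncio.sleep(delay)
    return name


async def main():
    # gather() runs both coroutines concurrently and returns their
    # results in the order the coroutines were passed in.
    return await asyncio.gather(toy_step("a", 0.1), toy_step("b", 0.1))


results = asyncio.run(main())
print(results)  # ['a', 'b']
```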
Parallel Execution
Run independent branches in parallel using an execution engine such as ProcessEngine.

from fuseline import ProcessEngine

# wf is a Workflow built as in the sections above
wf.run({"a": 1, "b": 2}, execution_engine=ProcessEngine(2))

Exporting and Tracing
Workflows can be exported to YAML with Workflow.export(), and execution traces can be recorded using the trace parameter. Both features rely on pluggable exporters and tracers, so alternative formats or backends can be added easily.

# Export the workflow definition to a YAML file
wf.export("workflow.yaml")

# Pass trace= to record an execution trace while the workflow runs
wf = Workflow(outputs=[step], trace="trace.log")
wf.run({})

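The idea of pluggable exporters can be sketched as a small registry keyed by format name. Everything below (`ToyExporter`, `JsonExporter`, `TextExporter`, `EXPORTERS`, `export`) is hypothetical and standard-library only; it illustrates the general pattern and is not fuseline's exporter interface.

```python
import json


# Hypothetical interface, not fuseline's exporter API.
class ToyExporter:
    def dumps(self, steps):
        raise NotImplementedError


class JsonExporter(ToyExporter):
    def dumps(self, steps):
        return json.dumps({"steps": steps}, sort_keys=True)


class TextExporter(ToyExporter):
    def dumps(self, steps):
        # One line per step: "name -> successor, successor" or "name -> (end)".
        return "\n".join(
            f"{name} -> {', '.join(succ) or '(end)'}" for name, succ in steps.items()
        )


# Adding a format means registering one more exporter here.
EXPORTERS = {"json": JsonExporter(), "text": TextExporter()}


def export(steps, fmt):
    return EXPORTERS[fmt].dumps(steps)


graph = {"hello": ["world"], "world": []}
print(export(graph, "text"))
# hello -> world
# world -> (end)
```

Because callers only depend on `dumps`, a new backend can be added without touching the code that calls `export`.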
See the example scripts in the examples/ directory for more inspiration.