Flow Example

Example code for creating a Prefect Flow using Aiviro.

from prefect import Parameter, task

from aiviro import AIVIRO_DEBUG_KEY
from aiviro.modules.prefect import (
    create_aiviro_flow,
    run_aiviro_script,
    run_python_script,
    run_shell_command,
)


@task(name="aiviro-script")
def parameter_task(number: int):
    """Run the Aiviro script ``src/aiviro-script.py`` with ``--n <number>``.

    The script is launched through ``run_aiviro_script`` using the
    ``aiviro_config.yaml`` configuration file, with ``AIVIRO_DEBUG_KEY``
    set to ``"1"`` in the environment mapping handed to the helper.

    :param number: value forwarded to the script as its ``--n`` argument
    """
    cli_args = f"--n {number}"
    # presumably switches Aiviro into debug mode -- confirm against Aiviro docs
    extra_env = {AIVIRO_DEBUG_KEY: "1"}

    run_aiviro_script(
        aiviro_config_path="aiviro_config.yaml",
        script_path="src/aiviro-script.py",
        script_args=cli_args,
        env=extra_env,
    )


@task(name="test-python-task")
def python_task():
    """Run ``src/python_script.py`` with the fixed arguments ``--i input.txt``."""
    target_script = "src/python_script.py"
    cli_args = "--i input.txt"
    run_python_script(script_path=target_script, script_args=cli_args)


@task(name="test-shell-command")
def shell_task(ls_arg: str):
    """Run ``ls`` followed by ``ls_arg`` via ``run_shell_command``.

    :param ls_arg: text appended verbatim after ``ls`` (may be empty)
    """
    # NOTE(review): ls_arg is inserted unquoted; if run_shell_command hands the
    # string to a shell, untrusted values could inject commands -- confirm.
    listing_command = f"ls {ls_arg}"
    run_shell_command(command=listing_command)


if __name__ == "__main__":
    # build the flow, using the "web-test" robot environment
    flow = create_aiviro_flow(name="My Flow", robot_env="web-test")

    # flow parameters: "number" is mandatory, "ls-args" defaults to ""
    number_param = Parameter(name="number", required=True)
    ls_param = Parameter(name="ls-args", default="")

    # attach every task to the flow
    for flow_task in (parameter_task, shell_task, python_task):
        flow.add_task(flow_task)

    # edge: parameter_task is upstream of shell_task
    shell_task.set_upstream(parameter_task, flow=flow)

    # wire each parameter to the task argument that consumes it
    parameter_task.bind(number=number_param, flow=flow)
    shell_task.bind(ls_arg=ls_param, flow=flow)

    # register the finished flow under the project "My Project"
    flow.register(project_name="My Project")