Schedules#

Tags: Basic

Launch plans can be set to run automatically on a schedule using the Flyte Native Scheduler. For workflows that depend on knowing the kick-off time, Flyte supports passing in the scheduled time (not the actual time, which may be a few seconds off) as an argument to the workflow.

Check out a demo of how the Native Scheduler works:

Note

Native scheduler doesn’t support AWS syntax.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

Consider the following example workflow:

productionizing/lp_schedules.py#
from datetime import datetime

from flytekit import task, workflow


@task
def format_date(run_date: datetime) -> str:
    return run_date.strftime("%Y-%m-%d %H:%M")


@workflow
def date_formatter_wf(kickoff_time: datetime):
    formatted_kickoff_time = format_date(run_date=kickoff_time)
    print(formatted_kickoff_time)

The date_formatter_wf workflow can be scheduled using either the CronSchedule or the FixedRate object.

Cron schedules#

Cron expression strings use this syntax. An incorrect cron schedule expression would lead to failure in triggering the schedule.

productionizing/lp_schedules.py#
from flytekit import CronSchedule, LaunchPlan  # noqa: E402

# creates a launch plan that runs every minute.
cron_lp = LaunchPlan.get_or_create(
    name="my_cron_scheduled_lp",
    workflow=date_formatter_wf,
    schedule=CronSchedule(
        # Note that the ``kickoff_time_input_arg`` matches the workflow input we defined above: kickoff_time
        # But in case you are using the AWS scheme of schedules and not using the native scheduler then switch over the schedule parameter with cron_expression
        schedule="*/1 * * * *",  # Following schedule runs every min
        kickoff_time_input_arg="kickoff_time",
    ),
)

The kickoff_time_input_arg corresponds to the workflow input kickoff_time. Specifying this argument means that Flyte will pass in the kick-off time of the cron schedule into the kickoff_time argument of the date_formatter_wf workflow.

Fixed rate intervals#

If you prefer to use an interval rather than a cron scheduler to schedule your workflows, you can use the fixed-rate scheduler. A fixed-rate scheduler runs at the specified interval.

Here’s an example:

productionizing/lp_schedules.py#
from datetime import timedelta  # noqa: E402

from flytekit import FixedRate, LaunchPlan  # noqa: E402


@task
def be_positive(name: str) -> str:
    return f"You're awesome, {name}"


@workflow
def positive_wf(name: str):
    reminder = be_positive(name=name)
    print(f"{reminder}")


fixed_rate_lp = LaunchPlan.get_or_create(
    name="my_fixed_rate_lp",
    workflow=positive_wf,
    # Note that the workflow above doesn't accept any kickoff time arguments.
    # We just omit the ``kickoff_time_input_arg`` from the FixedRate schedule invocation
    schedule=FixedRate(duration=timedelta(minutes=10)),
    fixed_inputs={"name": "you"},
)

This fixed-rate scheduler runs every ten minutes. Similar to a cron scheduler, a fixed-rate scheduler also accepts kickoff_time_input_arg (which is omitted in this example).

Activating a schedule#

After initializing your launch plan, activate the specific version of the launch plan so that the schedule runs.

flytectl update launchplan -p flyteexamples -d development {{ name_of_lp }} --version <foo> --activate

Verify if your launch plan was activated:

flytectl get launchplan -p flytesnacks -d development

Deactivating a schedule#

You can archive/deactivate the launch plan to deschedule any scheduled job associated with it.

flytectl update launchplan -p flyteexamples -d development {{ name_of_lp }} --version <foo> --archive