FlyteRemote: A Programmatic Control Plane Interface
For those who require programmatic access to the control plane, the remote module enables you to perform
certain operations in a Python runtime environment.
Since this section naturally deals with the control plane, this discussion is only relevant for those who have a Flyte backend set up and have access to it (a local demo cluster will suffice as well).
Creating a FlyteRemote Object
The FlyteRemote class is the entrypoint for programmatically performing operations in a Python
runtime. It can be initialized with the following arguments:
Config object: the parent configuration object that holds all the configuration information to connect to the Flyte backend.
default_project: the default project to use when fetching or executing Flyte entities.
default_domain: the default domain to use when fetching or executing Flyte entities.
file_access: the file access provider to use for offloading non-literal inputs/outputs.
kwargs: additional arguments that need to be passed to create a SynchronousFlyteClient.
A FlyteRemote object can be created in various ways:
Auto
The FlyteRemote class’s auto() method can be used to automatically construct the Config object.
from flytekit import FlyteRemote
remote = FlyteRemote.auto()
auto also accepts a config_file argument, which is the path to the configuration file to use.
The order of precedence that auto follows for each setting is:
1. It uses the environment variable that matches the configuration variable, if one is set.
2. If no matching environment variable is set, it looks for the value in the configuration file at the path specified by the config_file argument.
3. If no configuration file is found (or the file does not set the value), it uses the default value.
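The lookup order above can be sketched in plain Python. This is a minimal, hypothetical model of per-setting resolution, not flytekit's implementation: the function resolve_setting, the environment variable name FLYTE_PLATFORM_URL, and the config-file key admin.endpoint are used only for illustration, and the configuration file is assumed to be already parsed into a dictionary.

```python
import os
from typing import Mapping, Optional


def resolve_setting(
    env_var: str,
    file_key: str,
    file_values: Optional[Mapping[str, str]],
    default: str,
    environ: Mapping[str, str] = os.environ,
) -> str:
    """Hypothetical sketch: environment variable, then config file, then default."""
    # 1. A matching environment variable wins if it is set.
    if env_var in environ:
        return environ[env_var]
    # 2. Otherwise, use the value from the (already parsed) config file, if present.
    if file_values is not None and file_key in file_values:
        return file_values[file_key]
    # 3. Otherwise, fall back to the built-in default.
    return default


# Hypothetical parsed config file; environ={} simulates "no environment variable set".
file_values = {"admin.endpoint": "flyte.example.net"}
print(resolve_setting("FLYTE_PLATFORM_URL", "admin.endpoint", file_values,
                      "localhost:8080", environ={}))  # → flyte.example.net
```

Passing environ explicitly keeps the sketch deterministic; by default it reads the real process environment.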
Sandbox
The FlyteRemote class’s for_sandbox() method can be used to construct the Config object, specifically to connect to a Flyte sandbox (local demo) cluster.
from flytekit import FlyteRemote
remote = FlyteRemote.for_sandbox()
The initialization is as simple as calling for_sandbox() on the FlyteRemote class!
By default, this uses localhost:30081 as the endpoint, along with the default MinIO credentials.
If the sandbox runs in a hosted environment, you may need to set up port forwarding or use ingress URLs instead.
Any Endpoint
The FlyteRemote class’s for_endpoint() method can be used to construct the FlyteRemote object to connect to a specific endpoint.
from flytekit import FlyteRemote
remote = FlyteRemote.for_endpoint(
endpoint="flyte.example.net",
default_project="flytesnacks",
default_domain="development",
)
The for_endpoint method also accepts:
insecure: whether to use insecure connections. Defaults to False.
data_config: can be used to configure how data is downloaded from or uploaded to a specific blob storage like S3, GCS, etc.
config_file: the path to the configuration file to use.
Generalized Initialization
If additional configuration is needed, you can construct the Config object directly.
You can pass PlatformConfig, DataConfig, SecretsConfig, and StatsConfig objects to the Config class:
PlatformConfig: settings to talk to a Flyte backend.
DataConfig: any data storage specific configuration.
SecretsConfig: configuration for secrets.
StatsConfig: configuration for sending statsd metrics.
For example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig, SecretsConfig
remote = FlyteRemote(
config=Config(
platform=PlatformConfig(
endpoint="flyte.example.net",
insecure=False,
client_id="my-client-id",
client_credentials_secret="my-client-secret",
auth_mode="client_credentials",
),
secrets=SecretsConfig(default_dir="/etc/secrets"),
)
)
Fetching Entities
Tasks, workflows, launch plans, and executions can be fetched using FlyteRemote.
flyte_task = remote.fetch_task(name="my_task", version="v1")
flyte_workflow = remote.fetch_workflow(name="my_workflow", version="v1")
flyte_launch_plan = remote.fetch_launch_plan(name="my_launch_plan", version="v1")
flyte_execution = remote.fetch_execution(name="my_execution")
project and domain can also be specified in all the fetch_* calls.
If not specified, the default values given during the creation of the FlyteRemote object will be used.
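The fallback rule can be illustrated with a small sketch. RemoteDefaults and its resolve method are hypothetical stand-ins for the defaults stored on a FlyteRemote object, not part of flytekit. They only show that an explicitly passed project or domain takes priority over the default given at creation time.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class RemoteDefaults:
    """Hypothetical stand-in for the defaults passed when creating a FlyteRemote object."""

    default_project: Optional[str] = None
    default_domain: Optional[str] = None

    def resolve(
        self, project: Optional[str] = None, domain: Optional[str] = None
    ) -> Tuple[Optional[str], Optional[str]]:
        # A value passed explicitly to a fetch_* call takes priority;
        # otherwise the default given at creation time is used.
        resolved_project = project if project is not None else self.default_project
        resolved_domain = domain if domain is not None else self.default_domain
        return resolved_project, resolved_domain


defaults = RemoteDefaults(default_project="flytesnacks", default_domain="development")
print(defaults.resolve())  # → ('flytesnacks', 'development')
print(defaults.resolve(domain="production"))  # → ('flytesnacks', 'production')
```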
The following example fetches two tasks and composes them into a new workflow using the workflow() decorator:
from flytekit import workflow
task_1 = remote.fetch_task(name="core.basic.hello_world.say_hello", version="v1")
task_2 = remote.fetch_task(
name="core.basic.lp.greet",
version="v13",
project="flytesnacks",
domain="development",
)
@workflow
def my_remote_wf(name: str) -> int:
    return task_2(task_1(name=name))
Another example dynamically creates a launch plan for a workflow fetched from the backend:
from flytekit import LaunchPlan
flyte_workflow = remote.fetch_workflow(
name="my_workflow", version="v1", project="flytesnacks", domain="development"
)
launch_plan = LaunchPlan.get_or_create(name="my_launch_plan", workflow=flyte_workflow)
Registering Entities
Tasks, workflows, and launch plans can be registered using FlyteRemote.
from flytekit.configuration import SerializationSettings
flyte_entity = ...
flyte_task = remote.register_task(
entity=flyte_entity,
serialization_settings=SerializationSettings(image_config=None),
version="v1",
)
flyte_workflow = remote.register_workflow(
entity=flyte_entity,
serialization_settings=SerializationSettings(image_config=None),
version="v1",
)
flyte_launch_plan = remote.register_launch_plan(entity=flyte_entity, version="v1")
entity: the entity to register.
version: the version that will be used to register. If not specified, the version used in the serialization settings will be used.
serialization_settings: the serialization settings to use. Refer to SerializationSettings for all the acceptable parameters.
All the additional parameters which can be sent to the register_* methods can be found in the documentation for the corresponding method:
register_task(), register_workflow(),
and register_launch_plan().
The SerializationSettings class accepts ImageConfig which
holds the available images to use for the registration.
The following example showcases how to register a workflow using an existing image if the workflow is created locally:
from flytekit.configuration import ImageConfig, SerializationSettings
img = ImageConfig.from_images(
"docker.io/xyz:latest", {"spark": "docker.io/spark:latest"}
)
wf2 = remote.register_workflow(
my_remote_wf,
serialization_settings=SerializationSettings(image_config=img),
version="v1",
)
Executing Entities
You can execute a task, workflow, or launch plan using the execute() method,
which returns a FlyteWorkflowExecution object.
For more information on Flyte entities, see the remote flyte entities reference.
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
flyte_entity, inputs={...}, execution_name="my_execution", wait=True
)
inputs: the inputs to the entity.
execution_name: the name of the execution. Setting it explicitly is useful for avoiding duplicate executions.
wait: whether to synchronously wait for the execution to complete.
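Why a fixed execution name helps can be shown with a toy model. ToyExecutionStore is hypothetical and is not the Flyte API. It assumes that reusing an existing name hands back the already existing execution instead of launching a new one, and that omitting the name produces a fresh random name each time.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ToyExecutionStore:
    """Toy, in-memory model of executions keyed by name (hypothetical, not the Flyte API)."""

    executions: Dict[str, dict] = field(default_factory=dict)

    def execute(self, entity: str, inputs: dict, execution_name: Optional[str] = None) -> str:
        # Without an explicit name, this toy generates a random one,
        # so repeated calls always create new executions.
        name = execution_name or uuid.uuid4().hex[:20]
        if name in self.executions:
            # Name already taken: return the existing execution rather than
            # launching a duplicate (the behavior assumed in this sketch).
            return name
        self.executions[name] = {"entity": entity, "inputs": inputs}
        return name


store = ToyExecutionStore()
store.execute("my_workflow", {"x": 1}, execution_name="my_execution")
store.execute("my_workflow", {"x": 1}, execution_name="my_execution")  # retry, no new run
print(len(store.executions))  # → 1
```

A retried script that reuses the same name therefore does not start a second run, while unnamed calls always do.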
Additional arguments include:
project: the project on which to execute the entity.
domain: the domain on which to execute the entity.
type_hints: a dictionary mapping input names to Python types, used to convert the corresponding input values into Flyte literals.
options: options can be configured for a launch plan during registration or overridden during execution. Refer to Options for all the acceptable parameters.
The following is an example demonstrating how to use the Options class to configure a Flyte entity:
from flytekit.models.common import AuthRole, Labels
from flytekit.tools.translator import Options
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
flyte_entity,
inputs={...},
execution_name="my_execution",
wait=True,
options=Options(
raw_data_prefix="s3://my-bucket/my-prefix",
auth_role=AuthRole(assumable_iam_role="my-role"),
labels=Labels({"my-label": "my-value"}),
),
)
Retrieving & Inspecting Executions
After an execution is completed, you can retrieve the execution using the fetch_execution() method.
The fetched execution can be used to retrieve the inputs and outputs of an execution.
execution = remote.fetch_execution(
name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()
The inputs and outputs correspond to the top-level execution or the workflow itself.
To fetch a specific output, say, a model file:
model_file = execution.outputs["model_file"]
with open(model_file) as f:
    # use the model
    ...
You can use sync() to sync the entity object’s state with the remote state during the execution run:
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()
Note
During the sync, you may encounter a Received message larger than max (xxx vs. 4194304) error if the message size is too large. In that case, edit the flyte-admin-base-config config map using the command kubectl edit cm flyte-admin-base-config -n flyte to increase the maxMessageSizeBytes value. Refer to the troubleshooting guide if you have questions about the command’s usage.
node_executions will fetch all the underlying node executions recursively.
To fetch the output of a specific node execution:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]
Here, a node can correspond to a task, workflow, or branch node.
Reference Launch Plan Executions
When retrieving and inspecting an execution which calls a launch plan, the launch plan manifests as a sub-workflow which
can be found within the workflow_executions of a given node execution. Note that the workflow execution of interest
must again be synced in order to inspect the input and output of the contained tasks.
import random
from flytekit import LaunchPlan, task, workflow
@task
def add_random(x: int) -> int:
    return x + random.randint(1, 100)
@workflow
def sub_wf(x: int) -> int:
    x = add_random(x=x)
    return add_random(x=x)
sub_wf_lp = LaunchPlan.get_or_create(
    name="sub_wf_lp",
    workflow=sub_wf,
)
@workflow
def parent_wf(x: int = 1) -> int:
    x = add_random(x=x)
    return sub_wf_lp(x=x)
To get the output of the first add_random call in sub_wf, you can do the following with the execution from the
parent_wf:
execution = remote.fetch_execution(name="adgswtrzfn99k2cws49q", project="flytesnacks", domain="development")
remote.sync_execution(execution, sync_nodes=True)
remote.sync_execution(execution.node_executions['n1'].workflow_executions[0], sync_nodes=True)
out = execution.node_executions['n1'].workflow_executions[0].node_executions['n0'].outputs['o0']
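The attribute chain in the last line can be wrapped in a small helper. The sketch below uses hypothetical stand-in dataclasses (ToyWorkflowExecution, ToyNodeExecution) that only mirror the node_executions, workflow_executions, and outputs attributes used above. The helper get_nested_output is also hypothetical. The sketch makes two simplifying assumptions:

- Every intermediate node is a launch plan node whose sub-workflow appears as workflow_executions[0].
- Syncing is skipped, although real executions need it at each level, as described above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyWorkflowExecution:
    """Hypothetical stand-in exposing only node_executions."""

    node_executions: Dict[str, "ToyNodeExecution"] = field(default_factory=dict)


@dataclass
class ToyNodeExecution:
    """Hypothetical stand-in exposing only outputs and workflow_executions."""

    outputs: Dict[str, Any] = field(default_factory=dict)
    workflow_executions: List[ToyWorkflowExecution] = field(default_factory=list)


def get_nested_output(execution: ToyWorkflowExecution, node_path: List[str], output_name: str) -> Any:
    """Follow node IDs through launch-plan sub-workflows, then read one output."""
    current = execution
    for depth, node_id in enumerate(node_path):
        node = current.node_executions[node_id]
        if depth == len(node_path) - 1:
            # Last node in the path: read the requested output.
            return node.outputs[output_name]
        # Intermediate node: descend into its (first) sub-workflow execution.
        current = node.workflow_executions[0]
    raise ValueError("node_path must not be empty")


# Mirrors parent_wf -> n1 (sub_wf_lp) -> n0 (first add_random) -> o0.
sub = ToyWorkflowExecution({"n0": ToyNodeExecution(outputs={"o0": 42})})
parent = ToyWorkflowExecution({"n1": ToyNodeExecution(workflow_executions=[sub])})
print(get_nested_output(parent, ["n1", "n0"], "o0"))  # → 42
```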
Listing Entities
To list the recent executions, use the recent_executions() method.
recent_executions = remote.recent_executions(project="flytesnacks", domain="development", limit=10)
The limit parameter is optional and defaults to 100.
To list tasks by version, use the list_tasks_by_version() method.
tasks = remote.list_tasks_by_version(project="flytesnacks", domain="development", version="v1")
Terminating an Execution
To terminate an execution, use the terminate() method.
execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development")
remote.terminate(execution, cause="Code needs to be updated")