Raw containers
This example demonstrates how to use arbitrary containers in five different languages, all orchestrated seamlessly by flytekit. Flyte mounts an input data volume, where all the data needed by the container is available, and an output data volume, to which the container writes all the data that Flyte should store.
The data is written as separate files, one per input variable, and each file contains the variable's value serialized as a string. Refer to the raw protocol to understand how to leverage this.
Note
To clone and run the example code on this page, see the Flytesnacks repo.
import logging
from flytekit import ContainerTask, ImageSpec, kwtypes, task, workflow
from flytekit.core.base_task import TaskMetadata
# Module-level logger named after the module (the standard logging convention),
# so it fits into the dotted logger hierarchy instead of being keyed by the
# source file's filesystem path.
logger = logging.getLogger(__name__)
Container tasks
A `flytekit.ContainerTask` denotes an arbitrary container. In the following example, the first task is assigned to the variable `calculate_ellipse_area_shell` and named `ellipse-area-metadata-shell`; the task `name` has to be unique in the entire project. Users can specify:

- `input_data_dir` -> where inputs will be written to.
- `output_data_dir` -> where Flyte will expect the outputs to exist.
`inputs` and `outputs` specify the interface for the task; each is an ordered dictionary of typed input or output variables (built here with `kwtypes`).
The `image` field specifies the container image for the task, either as an image name or as an `ImageSpec`.
To access files that are not included in the image, use `ImageSpec` to copy files or directories into the container's `/root` directory.
Caching can be enabled for a `ContainerTask` by configuring the cache settings in the `TaskMetadata` passed as the `metadata` parameter.
# Shell variant: the container runs calculate-ellipse-area.sh, which receives
# both templated inputs and the output directory as positional arguments and
# writes the `area` and `metadata` outputs there.
calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-shell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-shell:v2",
    # Executable first, then the templated inputs and the output directory.
    command=(
        ["./calculate-ellipse-area.sh"]
        + ["{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"]
    ),
    # Cached; bump cache_version to invalidate previously cached results.
    metadata=TaskMetadata(cache_version="1.0", cache=True),
)
# Python variant. `ImageSpec` copies files or directories into the container's
# `/root`; here it copies the script that the command below executes.
calculate_ellipse_area_python = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image=ImageSpec(
        base_image="ghcr.io/flyteorg/rawcontainers-python:v2",
        # NOTE(review): presumably the local sandbox registry — adjust per cluster.
        registry="localhost:30000",
        builder="default",
        copy=["calculate-ellipse-area-new.py"],
    ),
    # Interpreter + script, then the templated inputs and the output directory.
    command=(
        ["python", "calculate-ellipse-area-new.py"]
        + ["{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"]
    ),
    # Cached; bump cache_version to invalidate previously cached results.
    metadata=TaskMetadata(cache_version="1.0", cache=True),
)
# R variant: runs calculate-ellipse-area.R through Rscript with the same
# positional argument layout as the other languages.
calculate_ellipse_area_r = ContainerTask(
    name="ellipse-area-metadata-r",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-r:v2",
    # Interpreter invocation, then the templated inputs and the output directory.
    command=(
        ["Rscript", "--vanilla", "calculate-ellipse-area.R"]
        + ["{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"]
    ),
    # Cached; bump cache_version to invalidate previously cached results.
    metadata=TaskMetadata(cache_version="1.0", cache=True),
)
# Haskell variant: runs the compiled calculate-ellipse-area binary shipped in
# the image, passing the inputs and output directory positionally.
calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    # Binary first, then the templated inputs and the output directory.
    command=(
        ["./calculate-ellipse-area"]
        + ["{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"]
    ),
    # Cached; bump cache_version to invalidate previously cached results.
    metadata=TaskMetadata(cache_version="1.0", cache=True),
)
# Julia variant: runs calculate-ellipse-area.jl with the julia interpreter,
# passing the inputs and output directory positionally.
calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    # Interpreter + script, then the templated inputs and the output directory.
    command=(
        ["julia", "calculate-ellipse-area.jl"]
        + ["{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"]
    ),
    # Cached; bump cache_version to invalidate previously cached results.
    metadata=TaskMetadata(cache_version="1.0", cache=True),
)
@task
def report_all_calculated_areas(
    area_shell: float,
    metadata_shell: str,
    area_python: float,
    metadata_python: str,
    area_r: float,
    metadata_r: str,
    area_haskell: float,
    metadata_haskell: str,
    area_julia: float,
    metadata_julia: str,
):
    """Log the area and metadata produced by each language's raw container.

    Receives both outputs (``area`` and ``metadata``) of the shell, Python, R,
    Haskell and Julia ellipse-area container tasks so that the results can be
    compared side by side in a single task's logs. Returns nothing.
    """
    # One line per language keeps the results easy to compare.
    logger.info(f"shell: area={area_shell}, metadata={metadata_shell}")
    logger.info(f"python: area={area_python}, metadata={metadata_python}")
    logger.info(f"r: area={area_r}, metadata={metadata_r}")
    logger.info(f"haskell: area={area_haskell}, metadata={metadata_haskell}")
    logger.info(f"julia: area={area_julia}, metadata={metadata_julia}")
As can be seen in this example, `ContainerTask`s can be interacted with like normal Python functions, whose keyword arguments correspond to the declared input variables. All data returned by the tasks is consumed and logged by a single Flyte task.
@workflow
def wf(a: float, b: float):
    """Compute the ellipse area in every language, then report all results together."""
    # (suffix, task) pairs; each suffix forms the `area_<suffix>` /
    # `metadata_<suffix>` keyword arguments of report_all_calculated_areas.
    language_tasks = (
        ("shell", calculate_ellipse_area_shell),
        ("python", calculate_ellipse_area_python),
        ("r", calculate_ellipse_area_r),
        ("haskell", calculate_ellipse_area_haskell),
        ("julia", calculate_ellipse_area_julia),
    )
    report_kwargs = {}
    for suffix, area_task in language_tasks:
        area, metadata = area_task(a=a, b=b)
        report_kwargs[f"area_{suffix}"] = area
        report_kwargs[f"metadata_{suffix}"] = metadata
    # Report on all results in a single task to simplify comparison.
    report_all_calculated_areas(**report_kwargs)
One of the benefits of raw container tasks is that Flytekit does not need to be installed in the target container.
Note
Raw containers can be run locally with flytekit version 1.11.0 or later.
Raise User Error
Raw containers handle errors by checking for the presence of an `_ERROR` file in the `output_data_dir` after the container’s execution. If this file exists, Flyte treats it as a user-defined error and retries the task if the `retries` parameter is set in the task metadata.
Scripts
The contents of each script referenced by the `ContainerTask`s above are as follows:
calculate-ellipse-area.sh
#!/usr/bin/env sh
# usage: calculate-ellipse-area.sh A B OUTPUT_DIR
a=$1
b=$2
out_dir=$3

# 4*a(1) is 4*atan(1) = pi; `bc -l` loads the math library that provides a().
# tee also echoes each value to stdout for the task logs.
echo "4*a(1) * $a * $b" | bc -l | tee "$out_dir/area"
echo "[from shell rawcontainer]" | tee "$out_dir/metadata"
calculate-ellipse-area.py
import math
import sys
def write_output(output_dir, output_file, v):
    """Write ``str(v)`` (no trailing newline) to ``output_dir/output_file``, overwriting it."""
    path = f"{output_dir}/{output_file}"
    with open(path, "w") as handle:
        print(v, end="", file=handle)
def calculate_area(a, b):
    """Return the area of an ellipse with semi-axes ``a`` and ``b``, i.e. pi * a * b."""
    area = math.pi * a * b
    return area
def main(a, b, output_dir):
    """Convert ``a`` and ``b`` to floats, then write the ellipse area and a metadata tag into ``output_dir``."""
    area = calculate_area(float(a), float(b))
    write_output(output_dir, "area", area)
    write_output(output_dir, "metadata", "[from python rawcontainer]")
if __name__ == "__main__":
    # Positional CLI arguments: A B OUTPUT_DIR (as passed by the ContainerTask command).
    a, b, output_dir = sys.argv[1], sys.argv[2], sys.argv[3]
    main(a, b, output_dir)
calculate-ellipse-area.R
#!/usr/bin/env Rscript
# usage: Rscript calculate-ellipse-area.R A B OUTPUT_DIR
# Writes pi * A * B to OUTPUT_DIR/area and a metadata tag to OUTPUT_DIR/metadata.
args <- commandArgs(trailingOnly = TRUE)
a <- args[1]
b <- args[2]
output_dir <- args[3]

area <- pi * as.double(a) * as.double(b)
print(area)
# file.path joins with "/", producing the same path as "<dir>/<name>".
writeLines(as.character(area), file.path(output_dir, "area"))
writeLines("[from R rawcontainer]", file.path(output_dir, "metadata"))
calculate-ellipse-area.hs
import System.IO
import System.Environment
import Text.Read
import Text.Printf
-- | Area of an ellipse with semi-axes a and b: pi * a * b.
-- Polymorphic over 'Floating', so it works at both 'Float' and 'Double'.
calculateEllipseArea :: Floating t => t -> t -> t
calculateEllipseArea a b = pi * a * b

-- | Parse a command-line argument as a 'Double', failing with a descriptive
-- error instead of the bare "Prelude.read: no parse".
parseArg :: String -> String -> IO Double
parseArg label raw =
  case readMaybe raw of
    Just value -> return value
    Nothing ->
      ioError (userError ("cannot parse " ++ label ++ " as a number: " ++ show raw))

-- | usage: calculate-ellipse-area A B OUTPUT_DIR
-- Writes the area to OUTPUT_DIR/area and a metadata tag to OUTPUT_DIR/metadata.
-- Computes in 'Double' (not single-precision 'Float') so the area matches the
-- 64-bit float precision of the Python, R and Julia implementations.
main :: IO ()
main = do
  args <- getArgs
  case args of
    (rawA : rawB : outputDir : _) -> do
      a <- parseArg "a" rawA
      b <- parseArg "b" rawB
      let area = calculateEllipseArea a b
      writeFile (outputDir ++ "/area") (show area)
      writeFile (outputDir ++ "/metadata") "[from haskell rawcontainer]"
    _ -> ioError (userError "usage: calculate-ellipse-area A B OUTPUT_DIR")
calculate-ellipse-area.jl
using Printf
"""
    calculate_area(a, b)

Return the area of an ellipse with semi-axes `a` and `b`, i.e. `π * a * b`.
"""
calculate_area(a, b) = π * a * b
"""
    write_output(output_dir, output_file, v)

Write `string(v)` to the file `output_file` inside `output_dir`, replacing any
existing content. Returns the number of bytes written.
"""
function write_output(output_dir, output_file, v)
    # joinpath inserts exactly one separator, even if output_dir ends in "/".
    output_path = joinpath(output_dir, output_file)
    # write(path, x) opens, writes and closes the file in one call.
    return write(output_path, string(v))
end
"""
    main(a, b, output_dir)

Parse the strings `a` and `b` as `Float64`, compute the ellipse area and write
the `area` and `metadata` outputs into `output_dir`.
"""
function main(a, b, output_dir)
    # Each argument is a single string, so call `parse` directly (no broadcast
    # `parse.`); fresh names keep `a`/`b` from changing type String -> Float64.
    a_value = parse(Float64, a)
    b_value = parse(Float64, b)
    area = calculate_area(a_value, b_value)
    write_output(output_dir, "area", area)
    write_output(output_dir, "metadata", "[from julia rawcontainer]")
end
# Command-line arguments arrive in the global constant `ARGS` (a 1-indexed
# vector of strings): ARGS[1] = a, ARGS[2] = b, ARGS[3] = output directory.
a, b, output_dir = ARGS[1], ARGS[2], ARGS[3]
main(a, b, output_dir)