flytekit.dynamic
- flytekit.dynamic(_task_function=None, task_config=None, cache=False, retries=0, interruptible=None, deprecated='', timeout=0, container_image=None, environment=None, requests=None, limits=None, secret_requests=None, *, execution_mode=ExecutionBehavior.DYNAMIC, node_dependency_hints=None, task_resolver=None, docs=None, disable_deck=None, enable_deck=None, deck_fields=(DeckField.SOURCE_CODE, DeckField.DEPENDENCIES, DeckField.TIMELINE, DeckField.INPUT, DeckField.OUTPUT), pod_template=None, pod_template_name=None, accelerator=None, pickle_untyped=False, shared_memory=None, resources=None, **kwargs)
Please first see the comments for flytekit.task() and flytekit.workflow(). This dynamic concept is an amalgamation of both and enables the user to pursue some pretty incredible constructs.

In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time, the function body is run to produce a workflow. It is almost as if the decorator changed from @task to @workflow, except workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a subworkflow.

Simple usage:

    @dynamic
    def my_dynamic_subwf(a: int) -> (typing.List[str], int):
        s = []
        for i in range(a):
            s.append(t1(a=i))
        return s, 5
Note that the code block calls Python's built-in range function on the input. This is typically not allowed in a workflow, but it is here. You can even express dependencies between tasks:

    @dynamic
    def my_dynamic_subwf(a: int, b: int) -> int:
        x = t1(a=a)
        return t2(b=b, x=x)
See the cookbook for a longer discussion.
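The difference between compile-time and execution-time evaluation described above can be illustrated with a toy model in plain Python. This is a conceptual sketch only, not how flytekit is implemented: Promise, Graph, Node, compile_static, and expand_dynamic are all hypothetical names invented for the illustration. The same function body fails when it is handed a placeholder input up front (workflow-style), but succeeds when it is handed the real value at execution time (dynamic-style).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Promise:
    """Hypothetical placeholder for a value that only exists at execution time."""
    name: str


@dataclass
class Node:
    """One recorded task call in the toy graph."""
    task_name: str
    inputs: Dict[str, Any]


@dataclass
class Graph:
    """Toy workflow graph: calling a task records a node and returns a Promise."""
    nodes: List[Node] = field(default_factory=list)

    def call(self, task_name: str, **inputs: Any) -> Promise:
        self.nodes.append(Node(task_name, inputs))
        return Promise(f"n{len(self.nodes) - 1}.out")


def body(graph: Graph, a: Any) -> List[Promise]:
    # Same shape as the "Simple usage" example: loop over the input value.
    return [graph.call("t1", a=i) for i in range(a)]


def compile_static(fn: Callable[[Graph, Any], Any]) -> Graph:
    """Workflow-style: run the body once, up front, with a placeholder input."""
    graph = Graph()
    fn(graph, Promise("a"))
    return graph


def expand_dynamic(fn: Callable[[Graph, Any], Any], a: int) -> Graph:
    """Dynamic-style: run the body at execution time with the real input."""
    graph = Graph()
    fn(graph, a)
    return graph


try:
    compile_static(body)
    static_ok = True
except TypeError:
    # range() cannot iterate over a placeholder, so static compilation fails.
    static_ok = False

dynamic_graph = expand_dynamic(body, 3)
```

In this toy model, the dynamic expansion records one node per loop iteration because the real integer is available when the body runs, which mirrors why range(a) is usable inside a dynamic workflow.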
- Parameters:
_task_function (Optional[Callable[P, FuncOut]])
task_config (Optional[T])
retries (int)
interruptible (Optional[bool])
deprecated (str)
timeout (Union[datetime.timedelta, int])
requests (Optional[Resources])
limits (Optional[Resources])
secret_requests (Optional[List[Secret]])
execution_mode (PythonFunctionTask.ExecutionBehavior)
node_dependency_hints (Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]])
task_resolver (Optional[TaskResolverMixin])
docs (Optional[Documentation])
disable_deck (Optional[bool])
enable_deck (Optional[bool])
deck_fields (Optional[Tuple[DeckField, ...]])
pod_template (Optional['PodTemplate'])
pod_template_name (Optional[str])
accelerator (Optional[BaseAccelerator])
pickle_untyped (bool)
shared_memory (Optional[Union[Literal[True], str]])
resources (Optional[Resources])
- Return type:
Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]
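The Union return type above is consistent with a decorator that can be applied either bare or with arguments. The following is a minimal sketch of that general pattern in plain Python, assuming nothing about flytekit internals: toy_dynamic and ToyTask are hypothetical stand-ins, and only the retries option is modeled.

```python
from typing import Any, Callable, Optional, Union


class ToyTask:
    """Hypothetical stand-in for the task object a decorator returns."""

    def __init__(self, fn: Callable[..., Any], retries: int) -> None:
        self.fn = fn
        self.retries = retries

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def toy_dynamic(
    _fn: Optional[Callable[..., Any]] = None, retries: int = 0
) -> Union[ToyTask, Callable[[Callable[..., Any]], ToyTask]]:
    """Return a ToyTask when used bare, or a decorator when called with options."""

    def wrap(fn: Callable[..., Any]) -> ToyTask:
        return ToyTask(fn, retries)

    if _fn is not None:
        # Bare usage: @toy_dynamic passes the function in directly.
        return wrap(_fn)
    # Parameterized usage: @toy_dynamic(retries=3) returns the real decorator.
    return wrap


@toy_dynamic
def bare(a: int) -> int:
    return a + 1


@toy_dynamic(retries=3)
def configured(a: int) -> int:
    return a * 2
```

Both forms end up producing the same kind of object; the first positional parameter (here _fn, analogous in role to _task_function in the signature above) is what distinguishes the two call styles.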