compose
__init__(self, func=None, enable_powertools=False, is_map_job=False, capture_map_errors=False, log_event=None, capture_response=None, capture_error=None, raise_on_empty_metrics=None, capture_cold_start_metric=None, default_dimensions=None, model=None, envelope=None, timeout=None, runtime=None, layers=None, comment=None, input_path=None, items_path=None, max_concurrency=None, output_path=None, parameters=None, result_path=None, result_selector=None, client_context=None, invocation_type=None, payload_response_only=None, qualifier=None, retry_on_service_exceptions=None, heartbeat=None, integration_pattern=None, sfn_timeout=None, tracing=None, state_machine_type=None, state_machine_name=None, **aws_lambda_constructor_kwargs)
special
#
Container for functions meant to be composed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func |
Union[Callable, Iterable[Callable]] |
a function or list or tuple of functions |
None |
timeout |
Optional[orkestra.interfaces.Duration] |
the timeout duration of the lambda |
None |
enable_powertools |
bool |
if true, enables powertools |
False |
log_event |
Optional[bool] |
passed to aws_lambda_powertools.Logger |
None |
capture_response |
Optional[bool] |
passed to aws_lambda_powertools.Tracer |
None |
capture_error |
Optional[bool] |
passed to aws_lambda_powertools.Tracer |
None |
raise_on_empty_metrics |
Optional[bool] |
passed to aws_lambda_powertools.Metrics |
None |
capture_cold_start_metric |
Optional[bool] |
passed to aws_lambda_powertools.Metrics |
None |
default_dimensions |
Optional[dict] |
passed to aws_lambda_powertools.Metrics |
None |
model |
Optional[pydantic.BaseModel] |
passed to aws_lambda_powertools.utilities.parser.event_parser |
None |
envelope |
Optional[pydantic.BaseModel] |
passed to aws_lambda_powertools.utilities.parser.event_parser |
None |
runtime |
Optional[orkestra.interfaces.Runtime] |
the python runtime to use for the lambda |
None |
layers |
Optional[Sequence[orkestra.interfaces.PythonLayerVersion]] |
A list of layers to add to the function’s execution environment. You can configure your Lambda function to pull in additional code during initialization in the form of layers. Layers are packages of libraries or other dependencies that can be used by multiple functions. Default: - No layers. |
None |
is_map_job |
bool |
whether the lambda is a map job |
False |
capture_map_errors |
bool |
set true to guarantee successful map job execution |
False |
comment |
Optional[str] |
An optional description for this state. Default: No comment |
None |
input_path |
Optional[str] |
JSONPath expression to select part of the state to be the input to this state. May also be the special value JsonPath.DISCARD, which will cause the effective input to be the empty object {}. Default: $ |
None |
items_path |
Optional[str] |
JSONPath expression to select the array to iterate over. Default: $ |
None |
max_concurrency |
Union[int, float] |
MaxConcurrency. An upper bound on the number of iterations you want running at once. Default: - full concurrency |
None |
output_path |
Optional[str] |
JSONPath expression to select part of the state to be the output to this state. May also be the special value JsonPath.DISCARD, which will cause the effective output to be the empty object {}. Default: $ |
None |
parameters |
Optional[Mapping[str, Any]] |
The JSON that you want to override your default iteration input. Default: $ |
None |
result_path |
Optional[str] |
JSONPath expression to indicate where to inject the state’s output. May also be the special value JsonPath.DISCARD, which will cause the state’s input to become its output. Default: $ |
None |
result_selector |
Optional[Mapping[str, Any]] |
The JSON that will replace the state’s raw result and become the effective result before ResultPath is applied. You can use ResultSelector to create a payload with values that are static or selected from the state’s raw result. Default: - None |
None |
client_context |
Optional[str] |
Up to 3583 bytes of base64-encoded data about the invoking client to pass to the function. Default: - No context |
None |
invocation_type |
Optional[orkestra.interfaces.LambdaInvocationType] |
Invocation type of the Lambda function. Default: InvocationType.REQUEST_RESPONSE |
None |
payload_response_only |
Optional[bool] |
Invoke the Lambda in a way that only returns the payload response without additional metadata. The payloadResponseOnly property cannot be used if integrationPattern, invocationType, clientContext, or qualifier are specified. It always uses the REQUEST_RESPONSE behavior. Default: false |
None |
qualifier |
Optional[str] |
Version or alias to invoke a published version of the function. You only need to supply this if you want the version of the Lambda Function to depend on data in the state machine state. If not, you can pass the appropriate Alias or Version object directly as the lambdaFunction argument. Default: - Version or alias inherent to the lambdaFunction object. |
None |
retry_on_service_exceptions |
Optional[bool] |
Whether to retry on Lambda service exceptions. This handles Lambda.ServiceException, Lambda.AWSLambdaException and Lambda.SdkClientException with an interval of 2 seconds, a back-off rate of 2 and 6 maximum attempts. Default: true |
None |
heartbeat |
Optional[orkestra.interfaces.Duration] |
Timeout for the heartbeat. Default: - None |
None |
integration_pattern |
Optional[orkestra.interfaces.IntegrationPattern] |
AWS Step Functions integrates with services directly in the Amazon States Language. You can control these AWS services using service integration patterns Default: IntegrationPattern.REQUEST_RESPONSE |
None |
sfn_timeout |
Optional[orkestra.interfaces.Duration] |
Timeout for the state machine. Default: - None |
None |
tracing |
Optional[orkestra.interfaces.Tracing] |
Enable AWS X-Ray Tracing for Lambda Function. Default: Tracing.Enabled |
None |
state_machine_type |
Optional[orkestra.interfaces.StateMachineType] |
Type of the state machine. Default: StateMachineType.STANDARD |
None |
state_machine_name |
Optional[str] |
A name for the state machine. Default: A name is automatically generated |
None |
**aws_lambda_constructor_kwargs |
|
pass directly to sfn.PythonFunction |
{} |
For cdk params see docs.aws.amazon.com/cdk/api/latest/python/modules.html; for powertools params see awslabs.github.io/aws-lambda-powertools-python/latest/
Source code in orkestra/decorators.py
def __init__(
    self,
    func: OptionalFn = None,
    enable_powertools: bool = False,
    is_map_job: bool = False,
    capture_map_errors: bool = False,
    log_event: Optional[bool] = None,
    capture_response: Optional[bool] = None,
    capture_error: Optional[bool] = None,
    raise_on_empty_metrics: Optional[bool] = None,
    capture_cold_start_metric: Optional[bool] = None,
    default_dimensions: Optional[dict] = None,
    model: Optional["pydantic.BaseModel"] = None,
    envelope: Optional["pydantic.BaseModel"] = None,
    timeout: Optional[Duration] = None,
    runtime: Optional[Runtime] = None,
    layers: Optional[Sequence[PythonLayerVersion]] = None,
    comment: Optional[str] = None,
    input_path: Optional[str] = None,
    items_path: Optional[str] = None,
    max_concurrency: Optional[Union[int, float, None]] = None,
    output_path: Optional[str] = None,
    parameters: Optional[Mapping[str, Any]] = None,
    result_path: Optional[str] = None,
    result_selector: Optional[Mapping[str, Any]] = None,
    client_context: Optional[str] = None,
    invocation_type: Optional[LambdaInvocationType] = None,
    payload_response_only: Optional[bool] = None,
    qualifier: Optional[str] = None,
    retry_on_service_exceptions: Optional[bool] = None,
    heartbeat: Optional[Duration] = None,
    integration_pattern: Optional[IntegrationPattern] = None,
    sfn_timeout: Optional[Duration] = None,
    tracing: Optional[Tracing] = None,
    state_machine_type: Optional[SfnType] = None,
    state_machine_name: Optional[str] = None,
    **aws_lambda_constructor_kwargs,
):
    """
    Wrap a function (or a group of functions) so it can be composed.

    The arguments are sorted into separate option dictionaries stored on the
    instance: ``map_job_kwargs``, ``lambda_invoke_kwargs``,
    ``powertools_kwargs``, ``state_machine_kwargs`` and
    ``aws_lambda_constructor_kwargs``. ``comment``, ``input_path``,
    ``output_path``, ``result_path`` and ``result_selector`` are recorded in
    both the map-job and the lambda-invoke options.

    Args:
        func: a function, or a list or tuple of functions.
        enable_powertools: if true, enables powertools.
        is_map_job: whether the lambda is a map job.
        capture_map_errors: set true to guarantee successful map job
            execution.
        log_event: passed to aws_lambda_powertools.Logger.
        capture_response: passed to aws_lambda_powertools.Tracer.
        capture_error: passed to aws_lambda_powertools.Tracer.
        raise_on_empty_metrics: passed to aws_lambda_powertools.Metrics.
        capture_cold_start_metric: passed to aws_lambda_powertools.Metrics.
        default_dimensions: passed to aws_lambda_powertools.Metrics.
        model: passed to
            aws_lambda_powertools.utilities.parser.event_parser.
        envelope: passed to
            aws_lambda_powertools.utilities.parser.event_parser.
        timeout: the timeout duration of the lambda.
        runtime: the python runtime to use for the lambda.
        layers: layers to add to the function's execution environment.
            Layers are packages of libraries or other dependencies that can
            be used by multiple functions. Default: no layers.
        comment: an optional description for this state.
            Default: no comment.
        input_path: JSONPath expression selecting the part of the state
            used as input to this state. May be JsonPath.DISCARD, which
            makes the effective input the empty object {}. Default: $
        items_path: JSONPath expression selecting the array to iterate
            over. Default: $
        max_concurrency: upper bound on the number of iterations running
            at once. Default: full concurrency.
        output_path: JSONPath expression selecting the part of the state
            used as output of this state. May be JsonPath.DISCARD, which
            makes the effective output the empty object {}. Default: $
        parameters: the JSON that overrides the default iteration input.
            Default: $
        result_path: JSONPath expression indicating where to inject the
            state's output. May be JsonPath.DISCARD, which makes the
            state's input become its output. Default: $
        result_selector: the JSON that replaces the state's raw result
            and becomes the effective result before ResultPath is applied.
            Default: None.
        client_context: up to 3583 bytes of base64-encoded data about the
            invoking client to pass to the function. Default: no context.
        invocation_type: invocation type of the Lambda function.
            Default: InvocationType.REQUEST_RESPONSE.
        payload_response_only: invoke the Lambda so that only the payload
            response is returned, without additional metadata. Cannot be
            used if integrationPattern, invocationType, clientContext or
            qualifier are specified. Default: false.
        qualifier: version or alias used to invoke a published version of
            the function. Default: the version or alias inherent to the
            lambdaFunction object.
        retry_on_service_exceptions: whether to retry on Lambda service
            exceptions (Lambda.ServiceException,
            Lambda.AWSLambdaException and Lambda.SdkClientException) with
            an interval of 2 seconds, a back-off rate of 2 and 6 maximum
            attempts. Default: true.
        heartbeat: timeout for the heartbeat. Default: None.
        integration_pattern: the Step Functions service integration
            pattern. Default: IntegrationPattern.REQUEST_RESPONSE.
        sfn_timeout: timeout for the state machine; stored under the
            ``"timeout"`` key of the lambda-invoke options. Default: None.
        tracing: enable AWS X-Ray tracing for the Lambda function.
            Default: Tracing.Enabled.
        state_machine_type: type of the state machine.
            Default: StateMachineType.STANDARD.
        state_machine_name: a name for the state machine.
            Default: a name is automatically generated.
        **aws_lambda_constructor_kwargs: passed directly to
            sfn.PythonFunction.

    For cdk params see
    https://docs.aws.amazon.com/cdk/api/latest/python/modules.html
    For powertools params see
    https://awslabs.github.io/aws-lambda-powertools-python/latest/
    """
    # Options recorded for the Map state when this is a map job.
    map_job_options = {
        "comment": comment,
        "input_path": input_path,
        "items_path": items_path,
        "max_concurrency": max_concurrency,
        "output_path": output_path,
        "result_path": result_path,
        "result_selector": result_selector,
        "parameters": parameters,
    }

    # Overrides for the lambda-invoke task; combined with the instance's
    # task defaults by _coalesce further down.
    invoke_overrides = {
        "client_context": client_context,
        "invocation_type": invocation_type,
        "payload_response_only": payload_response_only,
        "retry_on_service_exceptions": retry_on_service_exceptions,
        "heartbeat": heartbeat,
        "integration_pattern": integration_pattern,
        "timeout": sfn_timeout,
        "comment": comment,
        "input_path": input_path,
        "output_path": output_path,
        "result_path": result_path,
        "result_selector": result_selector,
        "qualifier": qualifier,
    }

    # Overrides for powertools; combined with the instance's powertools
    # defaults by _coalesce further down.
    powertools_overrides = {
        "default_dimensions": default_dimensions,
        "model": model,
        "envelope": envelope,
        "log_event": log_event,
        "capture_response": capture_response,
        "capture_error": capture_error,
        "raise_on_empty_metrics": raise_on_empty_metrics,
        "capture_cold_start_metric": capture_cold_start_metric,
    }

    # The explicitly named lambda settings travel in the same dict as the
    # free-form constructor keyword arguments.
    aws_lambda_constructor_kwargs.update(
        {
            "timeout": timeout,
            "runtime": runtime,
            "tracing": tracing,
            "layers": layers,
        }
    )

    self.func = func
    self.downstream = []
    self.is_map_job = is_map_job
    self.capture_map_errors = capture_map_errors
    self.aws_lambda_constructor_kwargs = aws_lambda_constructor_kwargs
    self.map_job_kwargs = map_job_options
    self.lambda_invoke_kwargs = _coalesce(
        self._sfn_task_defaults,
        invoke_overrides,
    )
    self.powertools_kwargs = _coalesce(
        self._powertools_defaults,
        **powertools_overrides,
    )
    self.state_machine_kwargs = {
        "state_machine_type": state_machine_type,
        "state_machine_name": state_machine_name,
    }
    self.enable_powertools = enable_powertools
    self._lambda_function = None
    self._update_metadata()
aws_lambda(self, scope, id=None, **kwargs)
#
Return lambda cdk construct.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scope |
aws_cdk.core.Construct |
cdk construct |
required |
id |
Optional[str] |
construct id |
None |
**kwargs |
|
to be passed to aws_cdk.aws_lambda_python.PythonFunction |
{} |
Returns (aws_cdk.aws_lambda_python.PythonFunction): python lambda
Source code in orkestra/decorators.py
def aws_lambda(
    self,
    scope: "aws_cdk.core.Construct",
    id: Optional[str] = None,
    **kwargs,
):
    """
    Build the lambda cdk construct for this composable.

    The work is delegated to ``self._render_lambda``, which receives this
    instance explicitly as its first positional argument.

    Args:
        scope: cdk construct
        id: construct id
        **kwargs: to be passed to aws_cdk.aws_lambda_python.PythonFunction

    Returns (aws_cdk.aws_lambda_python.PythonFunction): python lambda
    """
    render = self._render_lambda
    return render(self, scope, id=id, **kwargs)
definition(self, scope, previous_definition=None, previously_composed=None)
#
Return automagically composed cdk state machine definition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scope |
aws_cdk.core.Construct |
cdk scope |
required |
previous_definition |
Optional[aws_cdk.aws_stepfunctions.IChainable] |
the previous definition |
None |
previously_composed |
Optional[Compose] |
the previously composed |
None |
Source code in orkestra/decorators.py
def definition(
    self,
    scope: "aws_cdk.core.Construct",
    previous_definition: Optional[
        "aws_cdk.aws_stepfunctions.IChainable"
    ] = None,
    previously_composed: Optional[Sequence["Compose"]] = None,
):
    """
    Return automagically composed cdk state machine definition.

    Builds this composable's task, chains it after ``previous_definition``
    when one is given, then recurses into every downstream composable with
    the resulting definition as their predecessor.

    Args:
        scope: cdk scope
        previous_definition: the definition this task is appended to; when
            None, this task itself is the definition
        previously_composed: the composables already visited on the path
            leading here (any sequence, e.g. list or tuple); used to detect
            cycles

    Returns:
        This composable's task when ``previous_definition`` is None,
        otherwise the result of ``previous_definition.next(task)``.

    Raises:
        CompositionError: if this composable already appears in
            ``previously_composed`` (the composition is cyclic).
    """
    # Copy into a list so callers may pass any sequence (a tuple would not
    # support concatenation with a list) and so the caller's object is never
    # shared with the recursion below.
    ancestors = list(previously_composed or ())
    if self in ancestors:
        raise CompositionError(
            f"Failed to compose {self}. Composition using >> must be acyclic."
        )

    task = self.task(scope)
    definition = (
        task
        if previous_definition is None
        else previous_definition.next(task)
    )

    if self.downstream:
        lineage = ancestors + [self]
        for child in self.downstream:
            child.definition(
                scope,
                previous_definition=definition,
                previously_composed=lineage,
            )
    return definition
schedule(self, scope, id=None, expression=None, day=None, hour=None, minute=None, month=None, week_day=None, year=None, state_machine_name=None, state_machine_type=None, **kwargs)
#
Schedule lambda or state machine to run on interval using EventBridge scheduled event rule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scope |
aws_cdk.core.Construct |
cdk scope |
required |
id |
Optional[str] |
construct id |
None |
expression |
Optional[str] |
interval at which to run. Can be cron expression or CloudWatch rate expression |
None |
day |
Optional[str] |
day of month |
None |
hour |
Optional[str] |
hour of day |
None |
minute |
Optional[str] |
minute of hour |
None |
month |
Optional[str] |
month of year |
None |
week_day |
Optional[str] |
week day |
None |
year |
Optional[str] |
year |
None |
state_machine_name |
Optional[str] |
the state machine name, if downstream |
None |
state_machine_type |
Optional[orkestra.interfaces.StateMachineType] |
type of state machine; express or standard |
None |
**kwargs |
|
{} |
Returns (tuple): EventBridge schedule rule, SFN State Machine
Source code in orkestra/decorators.py
def schedule(
    self,
    scope: "aws_cdk.core.Construct",
    id: Optional[str] = None,
    expression: Optional[str] = None,
    day: Optional[str] = None,
    hour: Optional[str] = None,
    minute: Optional[str] = None,
    month: Optional[str] = None,
    week_day: Optional[str] = None,
    year: Optional[str] = None,
    state_machine_name: Optional[str] = None,
    state_machine_type: Optional[SfnType] = None,
    **kwargs,
) -> tuple:
    """
    Run this composition on an interval via an EventBridge scheduled rule.

    A rule is created with the requested schedule, a state machine is built
    through ``self.state_machine`` and that state machine is registered as
    the rule's target.

    Args:
        scope: cdk scope
        id: construct id for the rule; when falsy, one is generated from
            the wrapped function's name with a "_sched" suffix
        expression: interval at which to run. Can be cron expression or
            CloudWatch rate expression. When given, the individual cron
            fields below are not used.
        day: day of month
        hour: hour of day
        minute: minute of hour
        month: month of year
        week_day: week day
        year: year
        state_machine_name: the state machine name, if downstream
        state_machine_type: type of state machine; express or standard
        **kwargs: forwarded to both the EventBridge rule constructor and
            ``self.state_machine``.
            NOTE(review): the same keywords reach two different
            constructs -- confirm this is intended.

    Returns (tuple): EventBridge schedule rule, SFN State Machine
    """
    from aws_cdk import aws_events as eventbridge
    from aws_cdk import aws_events_targets as eventbridge_targets

    if not id:
        id = _incremental_id(f"{self.func.__name__}_sched")

    if expression is None:
        cron_fields = {
            "day": day,
            "hour": hour,
            "minute": minute,
            "month": month,
            "week_day": week_day,
            "year": year,
        }
        when = eventbridge.Schedule.cron(**cron_fields)
    else:
        when = eventbridge.Schedule.expression(expression)

    rule = eventbridge.Rule(scope, id, schedule=when, **kwargs)

    machine = self.state_machine(
        scope,
        state_machine_name=state_machine_name,
        state_machine_type=state_machine_type,
        **kwargs,
    )

    rule.add_target(eventbridge_targets.SfnStateMachine(machine=machine))
    return rule, machine
state_machine(self, scope, id=None, tracing_enabled=True, state_machine_name=None, state_machine_type=None, **kwargs)
#
Return step functions state machine cdk construct.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scope |
aws_cdk.core.Construct |
cdk scope |
required |
id |
Optional[str] |
cdk id |
None |
tracing_enabled |
bool |
xray tracing |
True |
state_machine_name |
Optional[str] |
name of state machine |
None |
state_machine_type |
Union[orkestra.interfaces.StateMachineType, aws_cdk.aws_stepfunctions.StateMachineType] |
express or standard |
None |
**kwargs |
|
{} |
Source code in orkestra/decorators.py
def state_machine(
    self,
    scope: "aws_cdk.core.Construct",
    id: Optional[str] = None,
    tracing_enabled: bool = True,
    state_machine_name: Optional[str] = None,
    state_machine_type: Optional[
        Union[SfnType, "aws_cdk.aws_stepfunctions.StateMachineType"]
    ] = None,
    **kwargs,
):
    """
    Build the step functions state machine cdk construct.

    The options stored on the instance at construction time are combined
    (via ``_coalesce``) with the name, type and extra keywords given here,
    and the state machine definition comes from ``self.definition(scope)``.

    Args:
        scope: cdk scope
        id: cdk id; when falsy, one is generated from the wrapped
            function's name with a "_sfn" suffix
        tracing_enabled: xray tracing
        state_machine_name: name of state machine
        state_machine_type: express or standard
        **kwargs: extra options combined with the stored state machine
            options and passed to the StateMachine constructor

    Returns:
        The ``aws_cdk.aws_stepfunctions.StateMachine`` construct.
    """
    from aws_cdk.aws_stepfunctions import StateMachine

    construct_id = id if id else _incremental_id(f"{self.func.__name__}_sfn")

    overrides = dict(
        state_machine_name=state_machine_name,
        state_machine_type=state_machine_type,
        **kwargs,
    )
    options = _coalesce(self.state_machine_kwargs, **overrides)

    composed = self.definition(scope)
    return StateMachine(
        scope,
        construct_id,
        definition=composed,
        tracing_enabled=tracing_enabled,
        **options,
    )
task(self, scope, id=None, payload_response_only=True, function_name=None, **kwargs)
#
Return cdk step function task construct.
Source code in orkestra/decorators.py
def task(
    self,
    scope: "aws_cdk.core.Construct",
    id: Optional[str] = None,
    payload_response_only: bool = True,
    function_name: Optional[str] = None,
    **kwargs,
):
    """
    Build the cdk step function task construct for this composable.

    Three shapes are produced, depending on the instance:

    * map job (``self.is_map_job``): an ``sfn.Map`` state whose iterator is
      a ``LambdaInvoke`` of this composable's lambda; when
      ``self.capture_map_errors`` is set, the invoke gets a catch that
      routes to a ``Pass`` state.
    * list or tuple of composables in ``self.func``: an ``sfn.Parallel``
      state with one ``LambdaInvoke`` branch per member; for a tuple, every
      branch gets a catch that routes to a ``Pass`` state. ``id`` and
      ``function_name`` are not used in this case.
    * anything else: a single ``LambdaInvoke`` of this composable's lambda.

    Args:
        scope: cdk scope
        id: construct id; when falsy, one is generated from the wrapped
            function's name
        payload_response_only: passed to every ``LambdaInvoke`` that is
            created
        function_name: passed to ``self.aws_lambda`` when the lambda is
            built
        **kwargs: extra options combined with the stored lambda-invoke
            options for every ``LambdaInvoke`` that is created

    Returns:
        The result of ``coerce`` applied to the construct built above.
    """
    from aws_cdk import aws_stepfunctions as sfn
    from aws_cdk import aws_stepfunctions_tasks as sfn_tasks

    def invoke_options(target):
        # Stored invoke options combined with the per-call values.
        return _coalesce(
            self.lambda_invoke_kwargs,
            lambda_function=target,
            payload_response_only=payload_response_only,
            **kwargs,
        )

    if self.is_map_job:
        id = id or _incremental_id(self.func.__name__)
        map_state = sfn.Map(scope, id, **_coalesce(self.map_job_kwargs))

        lambda_function = self.aws_lambda(scope, function_name=function_name)
        options = invoke_options(lambda_function)
        invoke_id = f"invoke_{id}"
        invoke_lambda = sfn_tasks.LambdaInvoke(scope, invoke_id, **options)
        invoke_lambda.lambda_function = lambda_function

        if self.capture_map_errors:
            invoke_lambda.add_catch(sfn.Pass(scope, f"{invoke_id}_failed"))

        map_state.iterator(invoke_lambda)
        return coerce(map_state)

    if isinstance(self.func, (list, tuple)):
        member_names = "".join(member.func.__name__ for member in self.func)
        parallel = sfn.Parallel(
            scope,
            _incremental_id("parallelize " + member_names),
        )
        # Only a tuple of functions gets the per-branch catch.
        catch_failures = isinstance(self.func, tuple)

        for member in self.func:
            member_lambda = member.aws_lambda(scope)
            options = invoke_options(member_lambda)
            branch_id = _incremental_id(member.func.__name__)
            branch = sfn_tasks.LambdaInvoke(scope, branch_id, **options)
            if catch_failures:
                branch.add_catch(
                    sfn.Pass(scope, f"{member.func.__name__}_failed")
                )
            parallel.branch(branch)
        return coerce(parallel)

    id = id or _incremental_id(self.func.__name__)
    lambda_function = self.aws_lambda(scope, function_name=function_name)
    options = invoke_options(lambda_function)
    invoke = sfn_tasks.LambdaInvoke(scope, id, **options)
    invoke.lambda_function = lambda_function
    return coerce(invoke)