Skip to content

CDK Orchestration

So far, we've seen examples of lambdas composed together similar to Airflow operators, where the composition of those functions is defined in the same module as those functions.

This is simple and intuitive, but there are many other powerful operations besides lambdas that can be composed using AWS Step Functions.

Example Step Functions Tasks

Example#

Does Orkestra provide a way of helping us compose arbitrary step function tasks more intuitively?

Yes. Orkestra has a function, coerce, that takes any object with a .next method (such as those in the CDK Step Functions library), so that object_1 >> object_2 is equivalent to object_1.next(object_2).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import random

from orkestra import compose


@compose
def random_int(event, context) -> int:
    """Return a pseudo-random integer in the half-open range ``[0, 100)``.

    ``event`` and ``context`` are accepted but not read.
    """
    upper_bound = 100  # exclusive
    return random.randrange(upper_bound)


@compose
def random_shape(event, context):
    """Return one shape name picked at random.

    The result is one of ``"triangle"``, ``"circle"`` or ``"square"``.
    ``event`` and ``context`` are accepted but not read.
    """
    shapes = ["triangle", "circle", "square"]
    return random.choice(shapes)


@compose
def random_animal(event, context):
    """Return one animal name picked at random.

    The result is one of ``"cat"``, ``"dog"`` or ``"goat"``.
    ``event`` and ``context`` are accepted but not read.
    """
    animals = ["cat", "dog", "goat"]
    return random.choice(animals)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import core as cdk

from examples.orchestration import (
    random_int,
    random_shape,
    random_animal,
)
from orkestra import coerce


class CdkComposition(cdk.Stack):
    """CDK stack that declares two Step Functions state machines.

    The first chains three composed functions' tasks with ``>>``; the second
    starts from a plain CDK ``Wait`` state wrapped in ``coerce`` and chains it
    with a composed task and a ``Succeed`` state.
    """

    def __init__(self, scope, id, **kwargs):
        """Build both state machines inside this stack.

        ``scope``, ``id`` and any extra keyword arguments are forwarded
        unchanged to ``cdk.Stack.__init__``.
        """
        super().__init__(scope, id, **kwargs)

        # State machine 1: random_int -> random_shape -> random_animal.
        lambda_chain = (
            random_int.task(self)
            >> random_shape.task(self)
            >> random_animal.task(self)
        )
        sfn.StateMachine(
            self,
            "composed_task_sfn",
            definition=lambda_chain,
            state_machine_name="cdk_task_composition_example",
        )

        # State machine 2: a one-second wait, then random_int, then success.
        one_second = cdk.Duration.seconds(1)
        pause = sfn.Wait(
            self,
            "wait1",
            time=sfn.WaitTime.duration(one_second),
        )
        # coerce() wraps the raw CDK state so it can take part in the
        # ``>>`` chain alongside the composed tasks.
        coerced_chain = (
            coerce(pause)
            >> random_int.task(self)
            >> sfn.Succeed(self, "great_success")
        )
        sfn.StateMachine(
            self,
            "simple_coercion_sfn",
            definition=coerced_chain,
            state_machine_name="simple_coercion_example",
        )