Skip to content

Orkestra

Docs Codecov

Discord PyPI PyPI - Downloads PyPI - License PyPI - Python Version GitHub issues Mentioned in Awesome CDK

  • deploy Lambda functions, compose them into workflows, and trigger them on a schedule or from cloud events with ease (at a tiny fraction of the cost of Airflow)
  • render your diagrams dynamically from code (like Airflow)
  • no more wondering about the status of your jobs (and how they broke)
  • no more struggling with the operational maintenance of always-on infrastructure to run your jobs

What is Orkestra?

Orkestra is a lightweight abstraction layer on top of the AWS CDK, AWS Lambda, AWS Step Functions, and Amazon EventBridge that provides a seamless way of building observable (scheduled or event-driven) cloud-native workflows.

It aims to offer a development experience similar to Airflow's while leveraging the full power of AWS.

Features

  • simple, intuitive developer experience
  • scheduled (ETL) workflows
  • event-driven workflows
  • simplified local testing
  • natively integrated with AWS
  • cost-effective
  • highly scalable

Example

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import random
from typing import *
from uuid import uuid4

from aws_lambda_powertools import Logger, Tracer
from pydantic import BaseModel

from orkestra import compose
from orkestra.interfaces import Duration


def dag():
    """Wire the composed handlers into a single workflow graph.

    The expression's value is discarded; presumably ``>>`` (provided by the
    objects that ``@compose`` returns) records each link as a side effect.
    This function is invoked at import time by the bare ``dag()`` call at
    the end of this module.
    """
    (
        generate_item
        >> add_price
        >> copy_item
        # copy_item returns a list and double_price is declared with
        # is_map_job=True, so it presumably runs once per list element.
        >> double_price
        # NOTE(review): a tuple of steps looks like parallel branches
        # (assert_false always fails) — confirm against orkestra docs.
        >> (do_nothing, assert_false)
        >> say_hello
        # NOTE(review): a list also appears to declare parallel branches —
        # confirm how it differs from the tuple form above.
        >> [random_int, random_float]
        >> say_goodbye
    )


class Item(BaseModel):
    """A named item with a string id and an optional price."""

    id: str
    name: str
    # Left unset by ``random``; filled in later by a pricing step.
    price: Optional[float] = None

    @classmethod
    def random(cls):
        """Build an ``Item`` with a fresh UUID4 id and a randomly picked name.

        ``price`` keeps its default of ``None``.
        """
        new_id = str(uuid4())
        chosen_name = random.choice(["potato", "moon rock", "hat"])
        return cls(id=new_id, name=chosen_name)


# Shared aws_lambda_powertools helpers used by the handlers below.
logger = Logger()

tracer = Tracer()


# Keyword arguments splatted into every ``@compose`` decorator in this module.
default_args = {
    "enable_powertools": True,
    "timeout": Duration.seconds(6),
}


@compose(**default_args)
def generate_item(event, context):
    """Create a random ``Item`` and return it as a plain dict.

    ``event`` is not read. The payload is logged, and tracer metadata
    ``"GenerateItem"`` is set to ``"SUCCESS"`` before returning.
    """
    logger.info("generating random item")
    new_item = Item.random()
    payload = new_item.dict()
    logger.info(payload)
    tracer.put_metadata("GenerateItem", "SUCCESS")
    return payload


@compose(model=Item, **default_args)
def add_price(item: Item, context):
    """Set a fixed price of 3.14 on ``item`` and return it as a dict.

    ``item`` is an ``Item`` instance (presumably parsed from the incoming
    event by ``compose(model=Item)``); the tests call this with a dict.
    """
    fixed_price = 3.14
    log_context = {"item": item.dict(), "price": fixed_price}
    logger.info("adding price to item", extra=log_context)
    item.price = fixed_price
    return item.dict()


@compose(model=Item, **default_args)
def copy_item(item: Item, context) -> list:
    """Fan ``item`` out into a list of ten independent dict copies.

    In ``dag`` the result feeds ``double_price``, which is declared as a
    map job (``is_map_job=True``).

    Returns:
        A list of 10 dicts, each equal to ``item.dict()`` but distinct objects.
    """
    logger.info(item.dict())
    # Build a fresh dict per entry: ``[d] * 10`` repeats ONE shared dict
    # object, so mutating any element would silently change all ten.
    return [item.dict() for _ in range(10)]


@compose(model=Item, is_map_job=True, **default_args)
def double_price(item: Item, context):
    """Double ``item.price`` and return the updated item as a dict.

    Declared with ``is_map_job=True`` (presumably invoked once per element
    of the upstream list). Raises ``TypeError`` if ``price`` is ``None``.
    """
    item.price *= 2
    return item.dict()


@compose(**default_args)
def assert_false(event, context):
    """Always fail; this step appears intended to exercise failure handling.

    Raises:
        AssertionError: unconditionally.
    """
    # ``assert False`` is removed when Python runs with ``-O``, which would
    # turn this deliberately failing step into a silent no-op. Raise instead.
    raise AssertionError


@compose(**default_args)
def do_nothing(event, context):
    """Log a placeholder message and return ``None``."""
    message = {"doing": "nothing"}
    logger.info(message)
    return None


@compose(**default_args)
def say_hello(event, context):
    """Return a fixed greeting; ``event`` and ``context`` are unused."""
    greeting = "hello, world"
    return greeting


@compose(**default_args)
def say_goodbye(event, context):
    """Return a fixed farewell; ``event`` and ``context`` are unused."""
    farewell = "goodbye"
    return farewell


@compose(**default_args)
def random_int(event, context):
    """Return a pseudo-random integer in the half-open range ``[0, 100)``."""
    lower, upper = 0, 100
    return random.randrange(lower, upper)


@compose(**default_args)
def random_float(event, context):
    """Return ``random_int``'s result converted to ``float``.

    The value is therefore always a whole number.
    """
    as_int = random_int(event, context)
    return float(as_int)


# Link the workflow steps as soon as this module is imported, so any
# importer (e.g. a CDK app scheduling ``generate_item``) sees the full graph.
# NOTE(review): relies on ``>>`` recording links as a side effect — confirm.
dag()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from aws_cdk import core as cdk
from examples.hello_orkestra import generate_item

class HelloOrkestra(cdk.Stack):
    """Stack that schedules ``generate_item`` as the ``hello_orkestra`` workflow.

    The schedule expression is ``rate(5 minutes)``.
    """

    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)
        schedule_options = dict(
            expression="rate(5 minutes)",
            state_machine_name="hello_orkestra",
        )
        generate_item.schedule(self, **schedule_options)

app = cdk.App()

HelloOrkestra(app, "helloOrkestra")

# Emit the cloud assembly explicitly, as the standard CDK Python app template
# does; without this call the CDK CLI may not find any synthesized stacks.
app.synth()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from dataclasses import dataclass

import pytest

from examples.hello_orkestra import (
    generate_item,
    add_price,
    copy_item,
    double_price,
    Item,
    assert_false,
    do_nothing,
    say_hello,
    say_goodbye,
    random_int,
    random_float,
)


@pytest.fixture
def context():
    """Provide a minimal stand-in for the AWS Lambda context object."""

    @dataclass
    class LambdaContext:
        # Field names match attributes of the real Lambda context object.
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    fake_context = LambdaContext()
    return fake_context


@pytest.fixture
def item():
    """Provide a freshly generated random ``Item`` serialized as a dict."""
    model = Item.random()
    return model.dict()


class TestMethods:
    """Call each example handler directly with a dict event and a fake context."""

    @staticmethod
    def test_generate_item(item, context):
        generated = generate_item(item, context)
        # Constructing the model validates the payload's fields.
        assert isinstance(Item(**generated), Item)

    @staticmethod
    def test_add_price(item, context):
        result = add_price(item, context)
        assert result["price"] == 3.14
        assert result["id"] == item["id"]

    @staticmethod
    def test_copy_item(item, context):
        result = copy_item(item, context)
        # ``all`` is vacuously true on an empty list, so pin the size too.
        assert len(result) == 10
        assert all(i == item for i in result)

    @staticmethod
    def test_double_price(item, context):
        item["price"] = 1
        result = double_price(item, context)
        # Compare with a literal so the check cannot be skewed if the handler
        # ever mutates its input dict in place.
        assert result["price"] == 2

    @staticmethod
    def test_assert_false(item, context):
        with pytest.raises(AssertionError):
            assert_false(item, context)

    @staticmethod
    def test_do_nothing(item, context):
        assert do_nothing(item, context) is None

    @staticmethod
    def test_say_hello(item, context):
        assert say_hello(item, context) == "hello, world"

    @staticmethod
    def test_goodbye(item, context):
        assert say_goodbye(item, context) == "goodbye"

    @staticmethod
    def test_random_int(item, context):
        result = random_int(item, context)
        assert isinstance(result, int)
        assert 0 <= result < 100

    @staticmethod
    def test_random_float(item, context):
        result = random_float(item, context)
        assert isinstance(result, float)
        assert 0.0 <= result < 100.0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@compose(model=Item, **default_args)
def add_price(item: Item, context):
    """Set a fixed price of 3.14 on ``item`` and return it as a dict.

    ``item`` is an ``Item`` instance (presumably parsed from the incoming
    event by ``compose(model=Item)``).
    """
    fixed_price = 3.14
    log_context = {"item": item.dict(), "price": fixed_price}
    logger.info("adding price to item", extra=log_context)
    item.price = fixed_price
    return item.dict()