Map Jobs

State machines halt execution at the first failure they encounter.

Sometimes, when processing map jobs, you may want all instances of the map job to complete regardless of individual errors.

We can accommodate this by passing capture_map_errors=True to our compose constructor.

Example

1
2
3
4
5
from orkestra import compose

# Minimal example: capture_map_errors=True lets every instance of this map job
# run to completion even when individual instances raise.
@compose(is_map_job=True, capture_map_errors=True)
def flaky_map_job(event, context):
    # Placeholder body; a real map job would process one item here.
    ...
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import random
from typing import *

from orkestra import compose


class Error(TypedDict):
    """Dictionary shape of an error item accepted by ``filter_division_errors``.

    NOTE(review): presumably this is what a failed ``divide_by`` instance
    yields when ``capture_map_errors=True`` is set -- confirm against the
    orkestra documentation.
    """
    Error: str  # assumed to hold the error name -- TODO confirm
    Cause: str  # assumed to hold the error details/message -- TODO confirm


@compose
def ones_and_zeros(event, context) -> List[int]:
    """Produce ten random values, each either 0 or 1.

    Neither ``event`` nor ``context`` is used. Values are drawn with
    replacement, so the list may contain any mix of zeros and ones.
    """
    population = [0, 1]
    return random.choices(population, k=10)


@compose(is_map_job=True, capture_map_errors=True)
def divide_by(n: int, context) -> float:
    """Return the reciprocal of ``n``.

    Raises:
        ZeroDivisionError: when ``n`` is 0. The job is declared with
            ``capture_map_errors=True`` so that such a failure does not
            stop the remaining instances of the map job.
    """
    reciprocal = 1 / n
    return reciprocal


@compose(is_map_job=True)
def filter_division_errors(event: Union[float, Error], context) -> float:
    """Pass floats through unchanged and replace anything else with 0.0.

    Args:
        event: either a float result or an ``Error`` dictionary.
        context: unused.

    Returns:
        ``event`` itself when it is a float, otherwise ``0.0``.
    """
    if isinstance(event, float):
        return event
    # Anything that is not a float is treated as an error item.
    return 0.0


@compose
def sum_up(numbers: List[float], context):
    """Return the sum of ``numbers``; ``context`` is unused."""
    total = sum(numbers)
    return total


@compose
def times_3(n: Union[int, float], context) -> Union[int, float]:
    """Multiply ``n`` by three.

    Args:
        n: the number to triple; must be an ``int`` or a ``float``.
        context: unused.

    Returns:
        ``n * 3``.

    Raises:
        AssertionError: when ``n`` is neither an ``int`` nor a ``float``.
    """
    accepted_types = (int, float)
    assert isinstance(n, accepted_types)
    tripled = n * 3
    return tripled


# Chain the steps with `>>`, left to right: generate the numbers, divide 1 by
# each, replace non-float (error) results with 0.0, sum, then triple the sum.
# NOTE(review): the exact `>>` semantics come from orkestra's compose -- confirm.
ones_and_zeros >> divide_by >> filter_division_errors >> sum_up >> times_3
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from aws_cdk import core as cdk

from examples.map_job import ones_and_zeros


class MapJob(cdk.Stack):
    """CDK stack for the map-job example."""

    def __init__(self, scope, id, **kwargs):
        """Initialise the stack and schedule the example pipeline on it.

        ``scope``, ``id`` and any extra keyword arguments are forwarded
        unchanged to ``cdk.Stack.__init__``.
        """
        super().__init__(scope, id, **kwargs)

        # The pipeline is addressed through its first step.
        pipeline_entry = ones_and_zeros
        pipeline_entry.schedule(self, state_machine_name="map_example")
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from math import isnan
from typing import *

import pytest
from hypothesis import given, infer, assume
from hypothesis.strategies import lists, floats

from examples.map_job import (
    ones_and_zeros,
    divide_by,
    times_3,
    sum_up,
    filter_division_errors,
    Error,
)


def test_ones_and_zeros(generic_event, generic_context):
    """The generator returns a list of more than five items, all 0 or 1."""
    bits = ones_and_zeros(generic_event, generic_context)
    assert len(bits) > 5
    assert isinstance(bits, list)
    for bit in bits:
        assert bit in [0, 1]


def test_divide_by(generic_event, generic_context):
    """Dividing by zero raises; any other generated value gives a truthy result."""
    for value in ones_and_zeros(generic_event, generic_context):
        if value != 0:
            assert divide_by(value, generic_context)
            continue
        # Zero is the only input expected to fail.
        with pytest.raises(ZeroDivisionError):
            divide_by(value, generic_context)


@given(n=infer)
def test_times_3(n: Union[int, float], generic_context):
    """``times_3`` agrees with plain multiplication for every non-NaN number."""
    # NaN never compares equal to itself, so such examples are discarded.
    is_comparable = not isnan(n)
    assume(is_comparable)
    actual = times_3(n, generic_context)
    expected = n * 3
    assert actual == expected


@given(numbers=lists(floats(min_value=0)))
def test_sum_up(numbers: List[float], generic_context):
    """``sum_up`` matches the built-in ``sum`` on lists of non-negative floats."""
    actual = sum_up(numbers, generic_context)
    expected = sum(numbers)
    assert actual == expected


@given(event=infer)
def test_division_error_filter(event: Union[float, Error], generic_context):
    """Floats pass through the filter unchanged; error items become 0.0."""
    filtered = filter_division_errors(event, generic_context)
    if not isinstance(event, float):
        assert filtered == 0.0
        return
    # NaN never compares equal to itself, so such examples are discarded.
    assume(not isnan(event))
    assert filtered == event