Skip to content

Retry API Reference

concurry.core.retry.RetryConfig

Bases: Typed

Configuration for retry behavior.

This class encapsulates all retry-related settings for worker method calls.

Attributes:

Name Type Description
num_retries Union[conint(ge=0), _NO_ARG_TYPE]

Maximum number of retry attempts after initial failure. Total attempts = num_retries + 1 (initial attempt). Default value is determined by global_config.defaults.num_retries

retry_on Union[type, Callable, List[Union[type, Callable]]]

Exception types or callables that trigger retries. Can be a single exception class, a callable, or a list of either. Callables receive context as kwargs and should return bool. Default: [Exception] (retry on all exceptions).

retry_algorithm Union[RetryAlgorithm, _NO_ARG_TYPE]

Backoff strategy for wait times. Default value is determined by global_config.defaults.retry_algorithm

retry_wait Union[confloat(gt=0), _NO_ARG_TYPE]

Minimum wait time between retries in seconds. This is the base wait time before applying strategy and jitter. Default value is determined by global_config.defaults.retry_wait

retry_jitter Union[confloat(ge=0, le=1), _NO_ARG_TYPE]

Jitter factor between 0 and 1. Uses Full Jitter algorithm from AWS: sleep = random(0, calculated_wait). Set to 0 to disable jitter. Default value is determined by global_config.defaults.retry_jitter

retry_until Optional[Union[Callable, List[Callable]]]

Validation functions for output (default: None). Can be a single callable or list of callables. All must return True. Callables receive result and context as kwargs. If validation fails, triggers retry even without exception.

Example
from concurry import RetryConfig, RetryAlgorithm

# Basic exponential backoff
config = RetryConfig(
    num_retries=3,
    retry_algorithm=RetryAlgorithm.Exponential,
    retry_wait=1.0,
    retry_jitter=0.3
)

# Retry only on specific exceptions
config = RetryConfig(
    num_retries=5,
    retry_on=[ConnectionError, TimeoutError],
    retry_algorithm=RetryAlgorithm.Linear
)

# Custom exception filter
config = RetryConfig(
    num_retries=3,
    retry_on=lambda exception, **ctx: (
        isinstance(exception, ValueError) and "retry" in str(exception)
    )
)

# Output validation (e.g., for LLM responses)
config = RetryConfig(
    num_retries=5,
    retry_until=lambda result, **ctx: (
        isinstance(result, dict) and "data" in result
    )
)
Source code in src/concurry/core/retry.py
class RetryConfig(Typed):
    """Configuration for retry behavior.

    This class encapsulates all retry-related settings for worker method calls.

    Attributes:
        num_retries: Maximum number of retry attempts after initial failure.
            Total attempts = num_retries + 1 (initial attempt).
            Default value is determined by global_config.defaults.num_retries
        retry_on: Exception types or callables that trigger retries.
            Can be a single exception class, a callable, or a list of either.
            Callables receive context as kwargs and should return bool.
            Default: [Exception] (retry on all exceptions).
        retry_algorithm: Backoff strategy for wait times.
            Default value is determined by global_config.defaults.retry_algorithm
        retry_wait: Minimum wait time between retries in seconds.
            This is the base wait time before applying strategy and jitter.
            Default value is determined by global_config.defaults.retry_wait
        retry_jitter: Jitter factor between 0 and 1.
            Uses Full Jitter algorithm from AWS: sleep = random(0, calculated_wait).
            Set to 0 to disable jitter.
            Default value is determined by global_config.defaults.retry_jitter
        retry_until: Validation functions for output (default: None).
            Can be a single callable or list of callables. All must return True.
            Callables receive result and context as kwargs.
            If validation fails, triggers retry even without exception.

    Example:
        ```python
        from concurry import RetryConfig, RetryAlgorithm

        # Basic exponential backoff
        config = RetryConfig(
            num_retries=3,
            retry_algorithm=RetryAlgorithm.Exponential,
            retry_wait=1.0,
            retry_jitter=0.3
        )

        # Retry only on specific exceptions
        config = RetryConfig(
            num_retries=5,
            retry_on=[ConnectionError, TimeoutError],
            retry_algorithm=RetryAlgorithm.Linear
        )

        # Custom exception filter
        config = RetryConfig(
            num_retries=3,
            retry_on=lambda exception, **ctx: (
                isinstance(exception, ValueError) and "retry" in str(exception)
            )
        )

        # Output validation (e.g., for LLM responses)
        config = RetryConfig(
            num_retries=5,
            retry_until=lambda result, **ctx: (
                isinstance(result, dict) and "data" in result
            )
        )
        ```
    """

    num_retries: Union[conint(ge=0), _NO_ARG_TYPE] = _NO_ARG
    retry_on: Union[type, Callable, List[Union[type, Callable]]] = Field(default_factory=lambda: [Exception])
    retry_algorithm: Union[RetryAlgorithm, _NO_ARG_TYPE] = _NO_ARG
    retry_wait: Union[confloat(gt=0), _NO_ARG_TYPE] = _NO_ARG
    retry_jitter: Union[confloat(ge=0, le=1), _NO_ARG_TYPE] = _NO_ARG
    retry_until: Optional[Union[Callable, List[Callable]]] = None

    def post_initialize(self) -> None:
        """Set defaults from global config for _NO_ARG values."""
        from ..config import global_config

        # Clone config
        local_config = global_config.clone()
        defaults = local_config.defaults

        # Set defaults if not provided (use object.__setattr__ for frozen instances)
        if self.num_retries is _NO_ARG:
            object.__setattr__(self, "num_retries", defaults.num_retries)
        if self.retry_algorithm is _NO_ARG:
            object.__setattr__(self, "retry_algorithm", defaults.retry_algorithm)
        if self.retry_wait is _NO_ARG:
            object.__setattr__(self, "retry_wait", defaults.retry_wait)
        if self.retry_jitter is _NO_ARG:
            object.__setattr__(self, "retry_jitter", defaults.retry_jitter)

    @field_validator("num_retries")
    @classmethod
    def validate_num_retries(cls, v):
        """Validate num_retries is non-negative or _NO_ARG."""
        if v is _NO_ARG:
            return v
        if not isinstance(v, int) or v < 0:
            raise ValueError(f"num_retries must be >= 0, got {v}")
        return v

    @field_validator("retry_wait")
    @classmethod
    def validate_retry_wait(cls, v):
        """Validate retry_wait is positive or _NO_ARG."""
        if v is _NO_ARG:
            return v
        if not isinstance(v, (int, float)) or v <= 0:
            raise ValueError(f"retry_wait must be > 0, got {v}")
        return v

    @field_validator("retry_jitter")
    @classmethod
    def validate_retry_jitter(cls, v):
        """Validate retry_jitter is in [0, 1] or _NO_ARG."""
        if v is _NO_ARG:
            return v
        if not isinstance(v, (int, float)) or not (0 <= v <= 1):
            raise ValueError(f"retry_jitter must be in [0, 1], got {v}")
        return v

    @field_validator("retry_on")
    @classmethod
    def validate_retry_on(cls, v):
        """Ensure retry_on is a list of exception types or callables."""
        if not isinstance(v, list):
            v = [v]

        for item in v:
            if isinstance(item, type):
                if not issubclass(item, BaseException):
                    raise ValueError(
                        f"retry_on exception types must be subclasses of BaseException, got {item}"
                    )
            elif not callable(item):
                raise ValueError(f"retry_on items must be exception types or callables, got {type(item)}")

        return v

    @field_validator("retry_until")
    @classmethod
    def validate_retry_until(cls, v):
        """Ensure retry_until is a list of callables."""
        if v is None:
            return None

        if not isinstance(v, list):
            v = [v]

        for item in v:
            if not callable(item):
                raise ValueError(f"retry_until items must be callables, got {type(item)}")

        return v

post_initialize() -> None

Set defaults from global config for _NO_ARG values.

Source code in src/concurry/core/retry.py
def post_initialize(self) -> None:
    """Set defaults from global config for _NO_ARG values."""
    from ..config import global_config

    # Clone config
    local_config = global_config.clone()
    defaults = local_config.defaults

    # Set defaults if not provided (use object.__setattr__ for frozen instances)
    if self.num_retries is _NO_ARG:
        object.__setattr__(self, "num_retries", defaults.num_retries)
    if self.retry_algorithm is _NO_ARG:
        object.__setattr__(self, "retry_algorithm", defaults.retry_algorithm)
    if self.retry_wait is _NO_ARG:
        object.__setattr__(self, "retry_wait", defaults.retry_wait)
    if self.retry_jitter is _NO_ARG:
        object.__setattr__(self, "retry_jitter", defaults.retry_jitter)

validate_num_retries(v) classmethod

Validate num_retries is non-negative or _NO_ARG.

Source code in src/concurry/core/retry.py
@field_validator("num_retries")
@classmethod
def validate_num_retries(cls, v):
    """Validate num_retries is non-negative or _NO_ARG."""
    if v is _NO_ARG:
        return v
    if not isinstance(v, int) or v < 0:
        raise ValueError(f"num_retries must be >= 0, got {v}")
    return v

validate_retry_wait(v) classmethod

Validate retry_wait is positive or _NO_ARG.

Source code in src/concurry/core/retry.py
@field_validator("retry_wait")
@classmethod
def validate_retry_wait(cls, v):
    """Validate retry_wait is positive or _NO_ARG."""
    if v is _NO_ARG:
        return v
    if not isinstance(v, (int, float)) or v <= 0:
        raise ValueError(f"retry_wait must be > 0, got {v}")
    return v

validate_retry_jitter(v) classmethod

Validate retry_jitter is in [0, 1] or _NO_ARG.

Source code in src/concurry/core/retry.py
@field_validator("retry_jitter")
@classmethod
def validate_retry_jitter(cls, v):
    """Validate retry_jitter is in [0, 1] or _NO_ARG."""
    if v is _NO_ARG:
        return v
    if not isinstance(v, (int, float)) or not (0 <= v <= 1):
        raise ValueError(f"retry_jitter must be in [0, 1], got {v}")
    return v

validate_retry_on(v) classmethod

Ensure retry_on is a list of exception types or callables.

Source code in src/concurry/core/retry.py
@field_validator("retry_on")
@classmethod
def validate_retry_on(cls, v):
    """Ensure retry_on is a list of exception types or callables."""
    if not isinstance(v, list):
        v = [v]

    for item in v:
        if isinstance(item, type):
            if not issubclass(item, BaseException):
                raise ValueError(
                    f"retry_on exception types must be subclasses of BaseException, got {item}"
                )
        elif not callable(item):
            raise ValueError(f"retry_on items must be exception types or callables, got {type(item)}")

    return v

validate_retry_until(v) classmethod

Ensure retry_until is a list of callables.

Source code in src/concurry/core/retry.py
@field_validator("retry_until")
@classmethod
def validate_retry_until(cls, v):
    """Ensure retry_until is a list of callables."""
    if v is None:
        return None

    if not isinstance(v, list):
        v = [v]

    for item in v:
        if not callable(item):
            raise ValueError(f"retry_until items must be callables, got {type(item)}")

    return v

concurry.core.retry.RetryValidationError

Bases: Exception

Raised when retry_until validation fails after all retries.

This exception contains all results from retry attempts and validation failure reasons, useful for debugging why validation failed.

Attributes:

Name Type Description
attempts

Number of attempts made (including initial attempt)

all_results

List of all output values from each attempt

validation_errors

List of validation failure reasons for each attempt

method_name

Name of the method that was retried

Example
try:
    result = worker.generate_json(prompt).result()
except RetryValidationError as e:
    print(f"Failed after {e.attempts} attempts")
    print(f"All results: {e.all_results}")
    print(f"Errors: {e.validation_errors}")
    # Use the last result even though validation failed
    last_output = e.all_results[-1]
Source code in src/concurry/core/retry.py
class RetryValidationError(Exception):
    """Raised when retry_until validation fails after all retries.

    This exception contains all results from retry attempts and validation
    failure reasons, useful for debugging why validation failed.

    Attributes:
        attempts: Number of attempts made (including initial attempt)
        all_results: List of all output values from each attempt
        validation_errors: List of validation failure reasons for each attempt
        method_name: Name of the method that was retried

    Example:
        ```python
        try:
            result = worker.generate_json(prompt).result()
        except RetryValidationError as e:
            print(f"Failed after {e.attempts} attempts")
            print(f"All results: {e.all_results}")
            print(f"Errors: {e.validation_errors}")
            # Use the last result even though validation failed
            last_output = e.all_results[-1]
        ```
    """

    def __init__(
        self,
        attempts: int,
        all_results: List[Any],
        validation_errors: List[str],
        method_name: str = "unknown",
    ):
        self.attempts = attempts
        self.all_results = all_results
        self.validation_errors = validation_errors
        self.method_name = method_name

        # Create informative error message
        message = (
            f"Validation failed for method '{method_name}' after {attempts} attempts.\n"
            f"Validation errors: {validation_errors}\n"
            f"Results from all attempts: {all_results}"
        )
        super().__init__(message)

    def __reduce__(self):
        """Support pickling for multiprocessing."""
        return (
            self.__class__,
            (self.attempts, self.all_results, self.validation_errors, self.method_name),
        )

concurry.core.retry.calculate_retry_wait(attempt: int, config: RetryConfig) -> float

Calculate wait time for a retry attempt with strategy and jitter.

This function implements three backoff strategies and applies Full Jitter as described in the AWS blog post on exponential backoff: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

Parameters:

Name Type Description Default
attempt int

Retry attempt number (1-indexed, 1 = first retry)

required
config RetryConfig

Retry configuration with strategy, wait time, and jitter settings

required

Returns:

Type Description
float

Wait time in seconds (always >= 0)

Example
config = RetryConfig(
    retry_wait=1.0,
    retry_algorithm=RetryAlgorithm.Exponential,
    retry_jitter=0.3
)

wait = calculate_retry_wait(1, config)  # ~0-2 seconds
wait = calculate_retry_wait(2, config)  # ~0-4 seconds
wait = calculate_retry_wait(3, config)  # ~0-8 seconds
Source code in src/concurry/core/retry.py
def calculate_retry_wait(attempt: int, config: RetryConfig) -> float:
    """Calculate wait time for a retry attempt with strategy and jitter.

    This function implements three backoff strategies and applies Full Jitter
    as described in the AWS blog post on exponential backoff:
    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Args:
        attempt: Retry attempt number (1-indexed, 1 = first retry)
        config: Retry configuration with strategy, wait time, and jitter settings

    Returns:
        Wait time in seconds (always >= 0)

    Example:
        ```python
        config = RetryConfig(
            retry_wait=1.0,
            retry_algorithm=RetryAlgorithm.Exponential,
            retry_jitter=0.3
        )

        wait = calculate_retry_wait(1, config)  # ~0-2 seconds
        wait = calculate_retry_wait(2, config)  # ~0-4 seconds
        wait = calculate_retry_wait(3, config)  # ~0-8 seconds
        ```
    """
    base_wait = config.retry_wait

    # Apply backoff strategy
    if config.retry_algorithm == RetryAlgorithm.Linear:
        calculated_wait = base_wait * attempt
    elif config.retry_algorithm == RetryAlgorithm.Exponential:
        calculated_wait = base_wait * (2 ** (attempt - 1))
    elif config.retry_algorithm == RetryAlgorithm.Fibonacci:
        calculated_wait = base_wait * _fibonacci(attempt)
    else:
        # Fallback to exponential
        calculated_wait = base_wait * (2 ** (attempt - 1))

    # Apply Full Jitter: sleep = random_between(0, calculated_wait)
    if config.retry_jitter > 0:
        wait = random.uniform(0, calculated_wait)
    else:
        wait = calculated_wait

    return max(0, wait)