Retry Mechanisms¶
Concurry provides a comprehensive retry system for handling transient failures in worker method calls. The retry mechanism supports multiple backoff strategies, exception filtering, output validation, and seamless integration with all worker features.
Overview¶
The retry system allows you to automatically retry failed operations with:
- Multiple backoff strategies: Exponential, Linear, and Fibonacci with configurable jitter
- Exception filtering: Retry on specific exception types or using custom logic
- Output validation: Retry when output doesn't meet requirements (e.g., LLM response validation)
- Full context: Retry filters receive attempt number, elapsed time, and more
- Actor-side execution: Retries happen on the worker side for efficiency
- Automatic limit release: Resource limits are automatically released between retry attempts
Basic Usage¶
Simple Retry Configuration¶
Configure retries when creating a worker:
```python
import requests

from concurry import Worker

class APIWorker(Worker):
    def __init__(self, endpoint: str):
        self.endpoint = endpoint

    def fetch_data(self, id: int) -> dict:
        # May raise ConnectionError, TimeoutError, etc.
        response = requests.get(f"{self.endpoint}/{id}")
        return response.json()

# Retry up to 3 times on any exception
worker = APIWorker.options(
    mode="thread",
    num_retries=3,
    retry_algorithm="exponential",
    retry_wait=1.0,
    retry_jitter=0.3
).init(endpoint="https://api.example.com")

result = worker.fetch_data(123).result()
worker.stop()
```
Retry Parameters¶
All retry parameters are passed to `Worker.options()`:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `num_retries` | int | 0 | Maximum number of retry attempts after the initial failure |
| `retry_on` | type, callable, or list | [Exception] | Exception types or filters that trigger retries |
| `retry_algorithm` | str | "exponential" | Backoff strategy: "exponential", "linear", or "fibonacci" |
| `retry_wait` | float | 1.0 | Base wait time in seconds between retries |
| `retry_jitter` | float | 0.3 | Jitter factor (0-1) for randomizing wait times |
| `retry_until` | callable or list | None | Validation functions applied to the output |
Retry Algorithms¶
Exponential Backoff (Default)¶
Doubles the wait time with each retry attempt:
```python
worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",
    retry_wait=1.0,   # Base wait time
    retry_jitter=0.3  # Add randomness
).init()

# Wait times (with jitter):
# Attempt 1: random(0, 1.0) seconds (base_wait * 2^0)
# Attempt 2: random(0, 2.0) seconds (base_wait * 2^1)
# Attempt 3: random(0, 4.0) seconds (base_wait * 2^2)
# Attempt 4: random(0, 8.0) seconds (base_wait * 2^3)
# Attempt 5: random(0, 16.0) seconds (base_wait * 2^4)
```
Best for: Network requests, API calls, distributed systems
Linear Backoff¶
Increases wait time linearly:
```python
worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="linear",
    retry_wait=2.0,
    retry_jitter=0.3
).init()

# Wait times (with jitter):
# Attempt 1: random(0, 2.0) seconds (base_wait * 1)
# Attempt 2: random(0, 4.0) seconds (base_wait * 2)
# Attempt 3: random(0, 6.0) seconds (base_wait * 3)
# Attempt 4: random(0, 8.0) seconds (base_wait * 4)
# Attempt 5: random(0, 10.0) seconds (base_wait * 5)
```
Best for: Rate-limited APIs, predictable backoff patterns
Fibonacci Backoff¶
Wait times follow the Fibonacci sequence:
```python
worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="fibonacci",
    retry_wait=1.0,
    retry_jitter=0.3
).init()

# Wait times (with jitter):
# Attempt 1: random(0, 1.0) seconds (base_wait * fib(1) = 1)
# Attempt 2: random(0, 1.0) seconds (base_wait * fib(2) = 1)
# Attempt 3: random(0, 2.0) seconds (base_wait * fib(3) = 2)
# Attempt 4: random(0, 3.0) seconds (base_wait * fib(4) = 3)
# Attempt 5: random(0, 5.0) seconds (base_wait * fib(5) = 5)
```
Best for: Balancing aggressive and conservative retry strategies
Full Jitter¶
All strategies use the "Full Jitter" algorithm from AWS, which draws each wait uniformly at random between zero and the backoff ceiling for that attempt. This prevents thundering-herd problems by desynchronizing retries across clients. Set retry_jitter=0 to disable jitter.
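As a rough illustration (a sketch of the AWS full-jitter idea, not concurry's internal code), the wait for an exponential backoff can be computed like this:

```python
import random

def full_jitter_wait(base_wait: float, attempt: int) -> float:
    """Illustrative sketch of AWS-style Full Jitter for exponential backoff.

    The deterministic ceiling doubles with each attempt (attempt is 1-indexed),
    and the actual wait is drawn uniformly from [0, ceiling].
    """
    ceiling = base_wait * (2 ** (attempt - 1))
    return random.uniform(0, ceiling)
```

Because every client draws its own random wait, simultaneous failures spread their retries across the whole backoff window instead of retrying in lockstep.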
Exception Filtering¶
Retry on Specific Exceptions¶
Specify which exceptions should trigger retries:
```python
import requests

class NetworkWorker(Worker):
    def fetch(self, url: str) -> str:
        # May raise ConnectionError, TimeoutError, HTTPError, etc.
        return requests.get(url).text

# Only retry on network-related errors
worker = NetworkWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=[ConnectionError, TimeoutError]
).init()

# ConnectionError or TimeoutError → retry
# HTTPError or other exceptions → fail immediately
```
Custom Exception Filters¶
Use callables for complex exception filtering logic:
```python
import requests

def should_retry_api_error(exception, **context):
    """Retry only on specific API error codes."""
    if isinstance(exception, requests.HTTPError):
        # Retry on 429 (rate limit) or 503 (service unavailable)
        return exception.response.status_code in [429, 503]
    return isinstance(exception, (ConnectionError, TimeoutError))

worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=should_retry_api_error
).init()
```
Multiple Filters¶
Combine exception types and custom filters:
```python
worker = MyWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=[
        ConnectionError,  # Always retry on connection errors
        lambda exception, **ctx: (
            isinstance(exception, ValueError) and "retry" in str(exception)
        )  # Retry ValueError only if its message contains "retry"
    ]
).init()
```
Context in Filters¶
Exception filters receive rich context:
```python
def smart_retry_filter(exception, attempt, elapsed_time, method_name, **kwargs):
    """Advanced retry logic using context."""
    # Don't retry after 30 seconds
    if elapsed_time > 30:
        return False
    # Give up after 3 attempts for certain errors
    if isinstance(exception, ValueError) and attempt >= 3:
        return False
    # Always retry network errors
    if isinstance(exception, ConnectionError):
        return True
    return False

worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=smart_retry_filter
).init()
```
Available context:

- exception: The exception that was raised
- attempt: Current attempt number (1-indexed)
- elapsed_time: Seconds since the first attempt
- method_name: Name of the method being called
- args: Original positional arguments
- kwargs: Original keyword arguments
Output Validation¶
Use retry_until to retry when the output doesn't meet requirements, even when no exception is raised.
Simple Validation¶
```python
import json

class LLMWorker(Worker):
    def generate_json(self, prompt: str) -> dict:
        """Generate JSON from an LLM (may return invalid JSON)."""
        response = self.llm.generate(prompt)
        return json.loads(response)  # May fail or return the wrong structure

def is_valid_json(result, **context):
    """Check if the result has the required fields."""
    return isinstance(result, dict) and "data" in result and "status" in result

worker = LLMWorker.options(
    mode="thread",
    num_retries=5,
    retry_until=is_valid_json  # Retry until validation passes
).init()

# Will retry up to 5 times until the result has the required fields
result = worker.generate_json("Generate user data").result()
```
Multiple Validators¶
All validators must pass for the result to be accepted:
```python
def has_required_fields(result, **ctx):
    return "id" in result and "name" in result

def has_valid_values(result, **ctx):
    return result.get("id", 0) > 0 and len(result.get("name", "")) > 0

worker = DataWorker.options(
    mode="thread",
    num_retries=3,
    retry_until=[has_required_fields, has_valid_values]
).init()
```
Combining Exceptions and Validation¶
```python
import json

worker = LLMWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=[json.JSONDecodeError, KeyError],  # Retry on parsing errors
    retry_until=lambda result, **ctx: len(result.get("text", "")) > 100  # ...and until long enough
).init()
```
RetryValidationError¶
When validation fails after all retries, RetryValidationError is raised:

```python
from concurry import RetryValidationError

try:
    result = worker.generate_json(prompt).result()
except RetryValidationError as e:
    print(f"Failed after {e.attempts} attempts")
    print(f"All results: {e.all_results}")
    print(f"Validation errors: {e.validation_errors}")
    # Use the last result even though validation failed
    last_output = e.all_results[-1]
```
Integration with Workers¶
Custom Workers with Retry¶
Retries work with all worker types:
```python
class DataProcessor(Worker):
    def __init__(self, multiplier: int):
        self.multiplier = multiplier

    def process(self, value: int) -> int:
        # May fail transiently
        return self.fetch_and_multiply(value)

# The retry configuration applies to all methods
worker = DataProcessor.options(
    mode="thread",
    num_retries=3,
    retry_algorithm="exponential"
).init(multiplier=10)

result = worker.process(5).result()
```
TaskWorker with Retry¶
TaskWorker.submit() and TaskWorker.map() support retries:
```python
import random

from concurry import TaskWorker

def flaky_function(x):
    if random.random() < 0.5:
        raise ConnectionError("Transient error")
    return x * 2

# Configure retry for task submissions
worker = TaskWorker.options(
    mode="process",
    num_retries=3,
    retry_on=[ConnectionError]
).init()

# Automatically retries on failure
future = worker.submit(flaky_function, 10)
result = future.result()  # Will retry up to 3 times

# Works with map() too
results = list(worker.map(flaky_function, range(10)))
worker.stop()
```
Async Functions with Retry¶
Retries work seamlessly with async functions:
```python
import aiohttp

class AsyncAPIWorker(Worker):
    async def fetch_data(self, url: str) -> dict:
        """Async method with retry."""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

worker = AsyncAPIWorker.options(
    mode="asyncio",  # Use asyncio mode for best performance
    num_retries=3,
    retry_on=[aiohttp.ClientError]
).init()

result = worker.fetch_data("https://api.example.com/data").result()
worker.stop()
```
All Execution Modes¶
Retries work across all execution modes:
```python
# Thread mode - good for I/O-bound work with retries
worker = MyWorker.options(mode="thread", num_retries=3).init()

# Process mode - good for CPU-bound work with retries
worker = MyWorker.options(mode="process", num_retries=3).init()

# Asyncio mode - best for async I/O with retries
worker = MyWorker.options(mode="asyncio", num_retries=3).init()

# Ray mode - distributed execution with retries
worker = MyWorker.options(mode="ray", num_retries=3).init()
```
Integration with Worker Pools¶
Retries work transparently with worker pools:
```python
import requests

class APIWorker(Worker):
    def fetch(self, id: int) -> dict:
        return requests.get(f"https://api.example.com/{id}").json()

# Pool of 10 workers, each with the same retry configuration
pool = APIWorker.options(
    mode="thread",
    max_workers=10,
    num_retries=3,
    retry_algorithm="exponential",
    retry_on=[ConnectionError, TimeoutError]
).init()

# Each request to the pool will retry on failure
futures = [pool.fetch(i) for i in range(100)]
results = [f.result() for f in futures]
pool.stop()
```
Key points:

- Each worker in the pool has the same retry configuration
- Retries happen on the worker that received the request
- Load balancing happens before retry logic (not during retries)
- Pool statistics don't include retry attempts (only successful dispatches)
Integration with Limits¶
Retries automatically release and reacquire limits between attempts:
Resource Limits with Retry¶
```python
from concurry import ResourceLimit

class DatabaseWorker(Worker):
    def query(self, sql: str) -> list:
        # Acquire a connection from the limit
        with self.limits.acquire(requested={"connections": 1}) as acq:
            result = execute_query(sql)
            acq.update(usage={"connections": 1})
            return result

worker = DatabaseWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=[DatabaseError],
    limits=[ResourceLimit(key="connections", capacity=5)]
).init()

# If the query fails, the limit is automatically released before the retry
result = worker.query("SELECT * FROM users").result()
```
How it works:

1. The limit is acquired before method execution
2. The method executes
3. If it fails and should retry:
   - The limit is automatically released
   - The worker waits for the retry delay
   - The limit is reacquired for the next attempt
4. If the method succeeds or retries are exhausted, the limit is released normally
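The cycle can be sketched as a plain Python loop (an illustrative model only, not concurry's actual internals; `limit` here stands for any object with acquire/release methods):

```python
import random
import time

def run_with_retry_and_limit(limit, method, num_retries: int, base_wait: float):
    """Illustrative model of the release/reacquire cycle around retries."""
    for attempt in range(num_retries + 1):
        limit.acquire()                   # 1. acquire before execution
        try:
            result = method()             # 2. execute
        except Exception:
            limit.release()               # 3. failure: release before waiting
            if attempt == num_retries:
                raise                     # retries exhausted
            # Full-jitter exponential backoff between attempts
            time.sleep(random.uniform(0, base_wait * 2 ** attempt))
        else:
            limit.release()               # 4. success: release normally
            return result
```

Releasing before the backoff sleep matters: other callers can use the limited resource while this call waits, instead of the slot sitting idle for the whole retry delay.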
Rate Limits with Retry¶
```python
import requests

from concurry import RateLimit

class APIWorker(Worker):
    def call_api(self, endpoint: str) -> dict:
        with self.limits.acquire(requested={"requests": 1}) as acq:
            response = requests.get(f"{self.base_url}/{endpoint}")
            acq.update(usage={"requests": 1})
            return response.json()

worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",
    limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()

# Retries will respect the rate limit (released between attempts)
```
Shared Limits with Retry¶
When using shared limits across a pool, retries automatically coordinate:
```python
from concurry import LimitSet, ResourceLimit

# Create a shared limit
shared_limits = LimitSet(
    limits=[ResourceLimit(key="db_connections", capacity=10)],
    shared=True,
    mode="thread"
)

# The pool shares the limit
pool = DatabaseWorker.options(
    mode="thread",
    max_workers=20,  # 20 workers share 10 connections
    num_retries=3,
    limits=shared_limits
).init()

# Each worker's retries properly release/acquire the shared limits
```
Call Limits with Retry¶
```python
from concurry import CallLimit

# Limit total calls per worker
worker = MyWorker.options(
    mode="thread",
    num_retries=3,
    limits=[CallLimit(window_seconds=1, capacity=10)]  # Max 10 calls/sec
).init()

# Retry attempts don't count against the call limit (automatically managed)
```
Advanced Patterns¶
Retry with Context-Aware Validation¶
```python
def validate_result(result, attempt, elapsed_time, **ctx):
    """Accept lower-quality results after multiple attempts."""
    if attempt <= 2:
        # First 2 attempts: strict validation
        return result.get("confidence", 0) > 0.9
    else:
        # Later attempts: relaxed validation
        return result.get("confidence", 0) > 0.7

worker = MLWorker.options(
    mode="thread",
    num_retries=5,
    retry_until=validate_result
).init()
```
Conditional Retry Based on Method Arguments¶
```python
def should_retry_depending_on_args(exception, args, kwargs, **ctx):
    """Retry logic that considers the original call arguments."""
    # Don't retry for premium users (user_type is passed as a keyword argument)
    if "premium" in kwargs.get("user_type", ""):
        return False
    # Retry for standard users on network errors
    return isinstance(exception, ConnectionError)

worker = UserDataWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=should_retry_depending_on_args
).init()
```
Exponential Backoff with Max Wait¶
```python
from concurry.core.retry import calculate_retry_wait

def calculate_wait_with_cap(attempt, config):
    """Custom wait calculator with a maximum."""
    wait = calculate_retry_wait(attempt, config)
    return min(wait, 30.0)  # Cap at 30 seconds

# Use standard exponential backoff, applying your own cap logic on top
worker = MyWorker.options(
    mode="thread",
    num_retries=10,
    retry_algorithm="exponential",
    retry_wait=1.0
).init()
```
Retry with Fallback Values¶
```python
from concurry import RetryValidationError

def fetch_with_fallback(worker, key):
    """Fetch data with an automatic fallback on validation failure."""
    try:
        return worker.fetch(key).result()
    except RetryValidationError as e:
        # Use the best result from all attempts
        return max(e.all_results, key=lambda r: r.get("score", 0))

worker = DataWorker.options(
    mode="thread",
    num_retries=3,
    retry_until=lambda r, **ctx: r.get("score", 0) > 0.8
).init()

result = fetch_with_fallback(worker, "data_key")
```
Performance Considerations¶
Retry Overhead¶
- No overhead when disabled (num_retries=0, the default)
- Minimal overhead on success (~microseconds for the retry config check)
- Overhead on retry: wait time plus re-execution time
- Actor-side retries: no round-trip overhead between attempts
Choosing Retry Parameters¶
```python
# Fast-fail for non-critical operations
worker = MyWorker.options(
    mode="thread",
    num_retries=1,
    retry_algorithm="linear",
    retry_wait=0.1
).init()

# Aggressive retry for critical operations
worker = CriticalWorker.options(
    mode="thread",
    num_retries=10,
    retry_algorithm="exponential",
    retry_wait=1.0,
    retry_jitter=0.5  # More randomness
).init()
```
Retry vs Circuit Breaker¶
Consider using a circuit breaker pattern for:

- Cascading failures
- Protecting downstream services
- Fast failure when the system is down

Retries are best for:

- Transient network errors
- Rate limiting
- Eventually consistent operations
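For illustration, a minimal circuit breaker can be sketched in a few lines (this is the generic pattern, not a concurry API; the class name and thresholds are hypothetical):

```python
import time

class CircuitBreaker:
    """Minimal illustrative circuit breaker: after `threshold` consecutive
    failures, fail fast for `cooldown` seconds before allowing a trial call."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # a success resets the failure count
        return result
```

Unlike a retry, the breaker stops sending traffic altogether once the downstream service looks unhealthy, which protects it while it recovers.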
Best Practices¶
1. Be Specific with Exception Types¶
```python
from requests import HTTPError

# ❌ Too broad - will retry on bugs
worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=[Exception]  # Catches everything
).init()

# ✅ Specific - only retries transient errors
worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=[ConnectionError, TimeoutError, HTTPError]
).init()
```
2. Use Exponential Backoff for Network Calls¶
```python
# ✅ Good for network operations
worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",
    retry_wait=1.0
).init()
```
3. Set Reasonable Retry Limits¶
```python
# ❌ Too many retries - wastes time
worker = MyWorker.options(num_retries=100).init()

# ✅ Reasonable for most use cases
worker = MyWorker.options(num_retries=3).init()

# ✅ More for critical operations
worker = CriticalWorker.options(num_retries=7).init()
```
4. Combine Retries with Timeouts¶
```python
worker = APIWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=[TimeoutError]
).init()

# Set a timeout when calling
future = worker.fetch_data(url)
try:
    result = future.result(timeout=30)  # Total timeout including retries
except TimeoutError:
    print("Operation timed out after retries")
```
5. Log Retry Attempts¶
```python
import logging

def retry_with_logging(exception, attempt, **ctx):
    """Log retry attempts for monitoring."""
    logging.warning(
        f"Retry attempt {attempt} for {ctx['method_name']}: {exception}"
    )
    return isinstance(exception, (ConnectionError, TimeoutError))

worker = MyWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=retry_with_logging
).init()
```
6. Test Retry Logic¶
```python
import pytest

from concurry import RetryValidationError

def test_retry_on_transient_error():
    """Test that the worker retries on transient errors."""
    worker = MyWorker.options(
        mode="sync",  # Use sync mode for testing
        num_retries=3,
        retry_on=[ConnectionError]
    ).init()
    # Should succeed after retries
    result = worker.fetch_data().result()
    assert result is not None

def test_retry_exhaustion():
    """Test that retries eventually give up."""
    worker = MyWorker.options(
        mode="sync",
        num_retries=2,
        retry_until=lambda r, **ctx: False  # Always fails validation
    ).init()
    with pytest.raises(RetryValidationError) as exc_info:
        worker.process().result()
    assert exc_info.value.attempts == 3  # Initial attempt + 2 retries
```
API Reference¶
RetryConfig¶
Complete configuration for retry behavior (automatically created from Worker.options()):

```python
from concurry import RetryConfig, RetryAlgorithm

config = RetryConfig(
    num_retries=3,
    retry_on=[ConnectionError, TimeoutError],
    retry_algorithm=RetryAlgorithm.Exponential,
    retry_wait=1.0,
    retry_jitter=0.3,
    retry_until=lambda result, **ctx: result.get("status") == "ok"
)
```
RetryValidationError¶
Exception raised when output validation fails:
```python
class RetryValidationError(Exception):
    attempts: int                 # Number of attempts made
    all_results: List[Any]        # Results from all attempts
    validation_errors: List[str]  # Error messages from the validators
    method_name: str              # Name of the method that was retried
```
Retry Algorithms¶
```python
from concurry import RetryAlgorithm

RetryAlgorithm.Exponential  # Default: 1, 2, 4, 8, 16, ...
RetryAlgorithm.Linear       # 1, 2, 3, 4, 5, ...
RetryAlgorithm.Fibonacci    # 1, 1, 2, 3, 5, 8, ...
```
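To make the three sequences concrete, the wait-multiplier for each attempt can be computed with a small helper (an illustration of the documented sequences, not concurry's own code):

```python
def backoff_multiplier(algorithm: str, attempt: int) -> int:
    """Multiplier applied to retry_wait on the given (1-indexed) attempt."""
    if algorithm == "exponential":
        return 2 ** (attempt - 1)       # 1, 2, 4, 8, 16, ...
    if algorithm == "linear":
        return attempt                  # 1, 2, 3, 4, 5, ...
    if algorithm == "fibonacci":
        a, b = 1, 1
        for _ in range(attempt - 1):    # 1, 1, 2, 3, 5, 8, ...
            a, b = b, a + b
        return a
    raise ValueError(f"unknown algorithm: {algorithm}")
```

With jitter enabled, the actual wait is drawn randomly between zero and retry_wait times this multiplier.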
See Also¶
- Workers Guide - Worker basics and configuration
- Worker Pools Guide - Pool management and load balancing
- Limits Guide - Resource and rate limiting
- Futures Guide - Working with futures and results