API Reference¶
Welcome to the Concurry API reference documentation. This section provides detailed information about all classes, functions, and modules in the Concurry library.
Core Modules¶
Workers¶
Actor pattern implementation for stateful concurrent operations.
Key Classes:
- Worker
- Base class for creating stateful workers
- TaskWorker
- Concrete worker for submitting arbitrary tasks
Key Methods:
- Worker.options()
- Configure worker execution mode and options
- Worker.init()
- Initialize the worker
- TaskWorker.submit()
- Submit arbitrary functions for execution
- TaskWorker.map()
- Map function over iterables
Futures¶
Unified future interface for working with futures from any concurrency framework.
Key Classes:
- BaseFuture
- Abstract base class providing unified interface
- SyncFuture
- For immediately available results
- ConcurrentFuture
- Wraps concurrent.futures.Future
- AsyncioFuture
- Wraps asyncio.Future
- RayFuture
- Wraps Ray's ObjectRef
Key Functions:
- wrap_future()
- Automatically wrap any future-like object
Limits¶
Flexible resource protection and rate limiting with composable limit types.
Key Classes:
- Limit
- Abstract base class for all limits
- RateLimit
- Time-based rate limiting with multiple algorithms
- CallLimit
- Call counting (special case of RateLimit)
- ResourceLimit
- Semaphore-based resource limiting
- LimitSet
- Factory function for thread-safe limit executors
Key Methods:
- LimitSet.acquire()
- Acquire limits for execution
- Acquisition.update()
- Update usage for rate limits
Retry Mechanisms¶
Automatic retry with configurable strategies, exception filtering, and output validation.
Key Classes:
- RetryConfig
- Configuration for retry behavior
- RetryValidationError
- Raised when output validation fails after all retries
Key Functions:
- calculate_retry_wait()
- Calculate wait time for retry attempt
Progress Tracking¶
Beautiful, feature-rich progress bars with state tracking.
Key Classes:
- ProgressBar
- Feature-rich progress tracking with tqdm integration
Module Overview¶
The Concurry library is organized into focused modules:
concurry/
├── __init__.py # Main exports
├── core/
│ ├── future.py # Unified future interface
│ ├── config.py # Configuration enums
│ ├── retry.py # Retry mechanisms
│ ├── limit/ # Resource and rate limiting
│ │ ├── limit.py # Limit definitions
│ │ └── limit_set.py # LimitSet factory
│ └── worker/ # Worker pattern implementation
│ ├── base_worker.py # Worker base classes
│ ├── task_worker.py # TaskWorker implementation
│ ├── sync_worker.py # Synchronous worker
│ ├── thread_worker.py # Thread-based worker
│ ├── process_worker.py # Process-based worker
│ ├── asyncio_worker.py # Asyncio-based worker
│ ├── ray_worker.py # Ray-based worker
│ └── worker_pool.py # Worker pool implementation
├── executor.py # Executor function
└── utils/
└── progress.py # Progress bar implementation
Quick Reference¶
Import Patterns¶
# Import main classes
from concurry import Worker, TaskWorker, worker
# Import futures
from concurry import BaseFuture, wrap_future
# Import limits
from concurry import LimitSet, RateLimit, CallLimit, ResourceLimit
# Import retry
from concurry import RetryConfig, RetryValidationError, RetryAlgorithm
# Import config
from concurry import ExecutionMode, LoadBalancingAlgorithm, RateLimitAlgorithm
# Import progress
from concurry import ProgressBar
Common Usage Patterns¶
# Worker pattern
class MyWorker(Worker):
def process(self, x: int) -> int:
return x * 2
worker = MyWorker.options(mode="thread").init()
result = worker.process(10).result()
worker.stop()
# TaskWorker pattern
worker = TaskWorker.options(mode="process").init()
result = worker.submit(lambda x: x ** 2, 5).result()
worker.stop()
# Worker with retry
worker = MyWorker.options(
mode="thread",
num_retries=3,
retry_algorithm="exponential"
).init()
# Worker with limits
from concurry import RateLimit
worker = MyWorker.options(
mode="thread",
limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()
# Worker pool
pool = MyWorker.options(
mode="thread",
max_workers=10,
load_balancing="round_robin"
).init()
Type Information¶
Concurry is fully typed and supports static type checking with mypy. All public APIs include comprehensive type annotations.
Type Hints¶
from typing import Any, Optional
from concurry import Worker, BaseFuture
class TypedWorker(Worker):
def __init__(self, value: int) -> None:
self.value = value
def process(self, x: int, multiplier: float = 1.0) -> float:
return (x + self.value) * multiplier
# Type-safe factory
def create_worker(mode: str = "thread") -> TypedWorker:
return TypedWorker.options(mode=mode).init(value=10)
# Type-safe result handling
def process_future(future: BaseFuture[int], timeout: Optional[float] = None) -> int:
return future.result(timeout=timeout)
Error Handling¶
All Concurry APIs use standard Python exceptions:
TimeoutError
- Future operation exceeds timeoutValueError
- Invalid arguments or configurationTypeError
- Type validation failuresRuntimeError
- Runtime errors (e.g., worker stopped)RetryValidationError
- Output validation failed after retries
Next Steps¶
- Browse the detailed API documentation for each module
- Check out the examples for practical usage patterns
- Review the user guide for comprehensive tutorials
- Explore Workers Guide for worker patterns
- Learn about Retry Mechanisms for fault tolerance