Worker Pools¶
Worker pools allow you to distribute work across multiple worker instances automatically. Instead of managing multiple workers manually, a pool provides a single interface that dispatches method calls to available workers using configurable load balancing strategies.
Overview¶
Worker pools provide:
- Automatic Load Balancing: Distribute work across workers using different algorithms
- Shared Resource Limits: Enforce rate limits and resource constraints across the entire pool
- On-Demand Workers: Create workers dynamically for bursty workloads
- Transparent API: Use pools exactly like single workers
- Pool Statistics: Monitor worker utilization and load distribution
Basic Usage¶
Creating a Pool¶
Create a worker pool by specifying max_workers when calling .options():
from concurry import Worker
class DataProcessor(Worker):
def __init__(self, multiplier: int):
self.multiplier = multiplier
self.processed = 0
def process(self, value: int) -> int:
self.processed += 1
return value * self.multiplier
# Create a pool with 5 workers
pool = DataProcessor.options(
mode="thread",
max_workers=5
).init(multiplier=10)
# Use exactly like a single worker
future = pool.process(42)
result = future.result() # 420
# Check pool statistics
stats = pool.get_pool_stats()
print(f"Pool has {stats['total_workers']} workers")
pool.stop()
Context Manager (Recommended)¶
Pools support the context manager protocol for automatic cleanup of all workers:
# Context manager automatically stops all workers
with DataProcessor.options(
mode="thread",
max_workers=5
).init(multiplier=10) as pool:
future = pool.process(42)
result = future.result() # 420
# All 5 workers automatically stopped here
# Works with blocking mode
with DataProcessor.options(
mode="thread",
max_workers=5,
blocking=True
).init(multiplier=10) as pool:
results = [pool.process(i) for i in range(10)]
# Pool automatically stopped
# Cleanup happens even on exceptions
with DataProcessor.options(mode="thread", max_workers=3).init(multiplier=2) as pool:
    raise ValueError("Error occurred")  # simulate a failure
# All workers still stopped despite exception
Benefits:
- ✅ Automatic cleanup of all workers - no need to remember .stop()
- ✅ Exception safe - all workers stopped even on errors
- ✅ Cleaner code - follows Python best practices
- ✅ Works with all pool types (thread, process, ray)
- ✅ Works with on-demand pools (see the sketch below)
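For example, a minimal sketch of the last point, reusing the DataProcessor class from above:
# On-demand pool managed by a context manager
with DataProcessor.options(
    mode="thread",
    on_demand=True,
    max_workers=5
).init(multiplier=10) as pool:
    futures = [pool.process(i) for i in range(10)]
    results = [f.result() for f in futures]
# Any remaining on-demand workers are cleaned up when the block exits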
Supported Modes¶
Different execution modes support different pool configurations:
Mode | Default max_workers | Supports Pools | Notes |
---|---|---|---|
sync | 1 (fixed) | ❌ No | Single-threaded execution only |
asyncio | 1 (fixed) | ❌ No | Single event loop only |
thread | 24 | ✅ Yes | Thread-based concurrency |
process | 4 | ✅ Yes | Process-based concurrency |
ray | 0 (unlimited) | ✅ Yes | Distributed execution |
# Thread pool - good for I/O-bound tasks
thread_pool = MyWorker.options(mode="thread", max_workers=10).init()
# Process pool - good for CPU-bound tasks
process_pool = MyWorker.options(mode="process", max_workers=4).init()
# Ray pool - good for distributed computing
import ray
ray.init()
ray_pool = MyWorker.options(
mode="ray",
max_workers=20,
actor_options={"num_cpus": 0.5} # Each worker uses 0.5 CPU
).init()
Load Balancing Algorithms¶
Worker pools use load balancing algorithms to decide which worker handles each request.
Round Robin (Default)¶
Distributes requests evenly in a circular fashion. Simple and fair for homogeneous workers.
pool = MyWorker.options(
mode="thread",
max_workers=5,
load_balancing="round_robin" # or "rr"
).init()
# Calls go to: worker 0, 1, 2, 3, 4, 0, 1, 2, ...
for i in range(10):
pool.process(i)
Best for:
- Workers with similar capabilities
- Tasks with similar execution times
- When simplicity is preferred
Least Active Load¶
Selects the worker with the fewest currently active (in-flight) requests. Adapts dynamically to worker load.
pool = MyWorker.options(
mode="thread",
max_workers=5,
load_balancing="active" # or "least_active"
).init()
# Always selects the worker with fewest active calls
# Good for tasks with varying execution times
Best for:
- Tasks with variable execution times
- Heterogeneous workers
- Avoiding overloading slow workers
Least Total Load¶
Selects the worker with the fewest total calls over its lifetime. Ensures even distribution of total work.
pool = MyWorker.options(
mode="thread",
max_workers=5,
load_balancing="total" # or "least_total"
).init()
# Ensures all workers get equal number of tasks long-term
Best for:
- Monitoring total work distribution
- Ensuring even wear on workers
- Tasks with similar execution times
Random¶
Randomly selects a worker for each request. Simple and effective for stateless workers.
pool = MyWorker.options(
mode="thread",
max_workers=5,
load_balancing="random" # or "rand"
).init()
# Each request goes to a random worker
# Default for on-demand pools
Best for:
- Stateless workers
- On-demand pools
- High-throughput scenarios
Comparing Load Balancers¶
import time
from concurry import Worker
class SlowWorker(Worker):
def process(self, duration: float) -> str:
time.sleep(duration)
return f"Processed for {duration}s"
# With round-robin, all workers might be busy
rr_pool = SlowWorker.options(
mode="thread",
max_workers=3,
load_balancing="round_robin"
).init()
# With least-active, new calls go to idle workers
la_pool = SlowWorker.options(
mode="thread",
max_workers=3,
load_balancing="active"
).init()
# Submit mixed workload
for duration in [5.0, 0.1, 0.1, 0.1]: # 1 slow, 3 fast
rr_pool.process(duration) # May queue behind slow task
la_pool.process(duration) # Fast tasks avoid slow worker
rr_pool.stop()
la_pool.stop()
Resource Limits with Pools¶
Worker pools can enforce shared resource limits across all workers, ensuring the entire pool respects rate limits and resource constraints.
Shared Rate Limiting¶
from concurry import Worker, CallLimit, RateLimit
class APIWorker(Worker):
def __init__(self, api_key: str):
self.api_key = api_key
def call_api(self, prompt: str) -> str:
# Automatically rate-limited across all workers in pool
with self.limits.acquire(requested={"tokens": 100}):
response = external_api_call(prompt)
return response
# Create pool with shared limits
# All 10 workers share the same 1000 tokens/min budget
pool = APIWorker.options(
mode="thread",
max_workers=10,
limits=[
CallLimit(window_seconds=60, capacity=100), # 100 calls/min
RateLimit(
key="tokens",
window_seconds=60,
capacity=1000 # 1000 tokens/min shared across pool
)
]
).init(api_key="my-key")
# All workers share the limit pool
futures = [pool.call_api(f"Request {i}") for i in range(200)]
results = [f.result() for f in futures]
pool.stop()
Resource Pooling¶
from concurry import Worker, ResourceLimit
class DatabaseWorker(Worker):
def __init__(self, db_config: dict):
self.db_config = db_config
def query(self, sql: str) -> list:
# Limit concurrent database connections across all workers
with self.limits.acquire(requested={"connections": 1}):
return execute_query(sql)
# Pool of 20 workers sharing 5 database connections
pool = DatabaseWorker.options(
mode="thread",
max_workers=20,
limits=[
ResourceLimit(key="connections", capacity=5)
]
).init(db_config={...})
# Even with 20 workers, only 5 queries run concurrently
Per-Worker vs Shared Limits¶
from concurry import Worker, LimitSet, CallLimit
# Per-worker limits: Each worker has its own 10 calls/sec
# Total pool capacity: 50 calls/sec (5 workers × 10)
pool1 = MyWorker.options(
mode="thread",
max_workers=5,
limits=[CallLimit(window_seconds=1, capacity=10)] # List creates shared LimitSet
).init()
# Pre-create a shared LimitSet for explicit sharing
shared_limits = LimitSet(
limits=[CallLimit(window_seconds=1, capacity=10)],
shared=True,
mode="thread"
)
# Shared limits: All workers share 10 calls/sec
# Total pool capacity: 10 calls/sec (shared across all workers)
pool2 = MyWorker.options(
mode="thread",
max_workers=5,
limits=shared_limits # Pass LimitSet instance
).init()
On-Demand Workers¶
On-demand pools create workers dynamically for each request and destroy them after completion. Useful for bursty workloads or resource-constrained environments.
Basic On-Demand Pool¶
from concurry import Worker
class BatchProcessor(Worker):
def process_batch(self, data: list) -> dict:
# Heavy processing
return {"processed": len(data), "result": sum(data)}
# Create on-demand pool
pool = BatchProcessor.options(
mode="thread",
on_demand=True,
max_workers=0 # Unlimited (up to cpu_count()-1 for threads)
).init()
# Each call creates a new worker
future1 = pool.process_batch([1, 2, 3])
future2 = pool.process_batch([4, 5, 6])
# Workers are automatically cleaned up after results are retrieved
result1 = future1.result()
result2 = future2.result()
pool.stop()
On-Demand with Limits¶
# Limit concurrent on-demand workers
pool = MyWorker.options(
mode="ray",
on_demand=True,
max_workers=10 # Max 10 concurrent on-demand workers
).init()
# On-demand pools use 'random' load balancing by default
stats = pool.get_pool_stats()
print(stats["load_balancer"]["algorithm"]) # "Random"
When to Use On-Demand¶
Use on-demand pools for:
- Bursty workloads with idle periods
- Resource-constrained environments
- Workloads where cold-start latency is acceptable
- Workers that hold significant memory

Use persistent pools for (contrasted in the sketch below):
- Steady workloads
- Workloads where a warm start matters
- Low per-request overhead
- Lightweight workers
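A minimal side-by-side sketch, reusing the BatchProcessor worker from the example above:
# Persistent pool: workers are created once at .init() and reused (warm start)
persistent_pool = BatchProcessor.options(
    mode="thread",
    max_workers=4
).init()

# On-demand pool: a fresh worker per call, destroyed after its result is
# retrieved, so nothing sits idle between bursts
bursty_pool = BatchProcessor.options(
    mode="thread",
    on_demand=True,
    max_workers=10  # Cap on concurrent on-demand workers
).init()

persistent_pool.stop()
bursty_pool.stop()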
Worker Composition¶
You can use workers and pools inside other workers, enabling powerful composition patterns.
Pool Inside Worker¶
from concurry import Worker
class ComputeWorker(Worker):
"""Worker that does heavy computation."""
def compute(self, x: int) -> int:
return x ** 2
class CoordinatorWorker(Worker):
"""Coordinator that manages a pool of compute workers."""
def __init__(self):
# Create internal pool
self.compute_pool = ComputeWorker.options(
mode="process", # CPU-bound
max_workers=4
).init()
def process_batch(self, values: list) -> list:
# Distribute work across internal pool
futures = [self.compute_pool.compute(x) for x in values]
return [f.result() for f in futures]
def __del__(self):
# Cleanup internal pool
if hasattr(self, 'compute_pool'):
self.compute_pool.stop()
# Use coordinator in thread mode
coordinator = CoordinatorWorker.options(mode="thread").init()
results = coordinator.process_batch([1, 2, 3, 4, 5]).result()
print(results) # [1, 4, 9, 16, 25]
coordinator.stop()
Pipeline with Multiple Pools¶
from concurry import Worker
class Fetcher(Worker):
"""Fetch data from external sources."""
def fetch(self, url: str) -> bytes:
return download(url)
class Processor(Worker):
"""Process fetched data."""
def process(self, data: bytes) -> dict:
return parse_and_transform(data)
class Storer(Worker):
"""Store processed data."""
def store(self, data: dict) -> str:
return save_to_database(data)
# Create pipeline with three pools
fetcher_pool = Fetcher.options(mode="thread", max_workers=10).init()
processor_pool = Processor.options(mode="process", max_workers=4).init()
storer_pool = Storer.options(mode="thread", max_workers=5).init()
# Process pipeline with automatic future unwrapping
urls = ["http://example.com/1", "http://example.com/2"]
for url in urls:
# Chain workers - futures are automatically unwrapped
fetched = fetcher_pool.fetch(url)
processed = processor_pool.process(fetched) # Auto-unwraps future
stored = storer_pool.store(processed) # Auto-unwraps future
print(f"Stored: {stored.result()}")
# Cleanup
fetcher_pool.stop()
processor_pool.stop()
storer_pool.stop()
Nested Pools with Ray¶
from concurry import Worker
import ray
ray.init()
class LeafWorker(Worker):
"""Leaf worker that does actual work."""
def work(self, x: int) -> int:
return x * 2
class BranchWorker(Worker):
"""Branch worker that manages leaf workers."""
def __init__(self):
# Each branch manages its own leaf pool
self.leaf_pool = LeafWorker.options(
mode="ray",
max_workers=5,
actor_options={"num_cpus": 0.1}
).init()
def process_group(self, values: list) -> list:
futures = [self.leaf_pool.work(x) for x in values]
return [f.result() for f in futures]
# Create pool of branch workers (each with internal leaf pool)
branch_pool = BranchWorker.options(
mode="ray",
max_workers=3,
actor_options={"num_cpus": 0.2}
).init()
# Distribute work across branch pool
# Each branch distributes to its leaf pool
result = branch_pool.process_group([1, 2, 3, 4, 5]).result()
print(result) # [2, 4, 6, 8, 10]
branch_pool.stop()
Handling Exceptions¶
Worker pools handle exceptions gracefully, allowing you to catch and handle errors from any worker in the pool.
Basic Exception Handling¶
from concurry import Worker
class RiskyWorker(Worker):
def risky_operation(self, value: int) -> int:
if value < 0:
raise ValueError(f"Negative value not allowed: {value}")
return value * 2
pool = RiskyWorker.options(mode="thread", max_workers=5).init()
# Submit mixed good/bad values
values = [1, 2, -3, 4, -5]
futures = [pool.risky_operation(v) for v in values]
# Handle each result
for i, future in enumerate(futures):
try:
result = future.result()
print(f"Success: {values[i]} -> {result}")
except ValueError as e:
print(f"Error: {values[i]} -> {e}")
pool.stop()
Partial Failure Handling¶
from concurry import Worker
from concurrent.futures import TimeoutError
class UnreliableWorker(Worker):
def process(self, item: dict) -> dict:
if item.get("fail"):
raise RuntimeError("Processing failed")
return {"result": item["value"] * 2}
pool = UnreliableWorker.options(
mode="process",
max_workers=4
).init()
# Process batch with some failures
items = [
{"value": 1},
{"value": 2, "fail": True}, # This will fail
{"value": 3},
{"value": 4, "fail": True}, # This will fail
{"value": 5},
]
futures = [pool.process(item) for item in items]
# Collect results, handling failures
results = []
errors = []
for i, future in enumerate(futures):
try:
result = future.result(timeout=5)
results.append(result)
except RuntimeError as e:
errors.append((i, str(e)))
except TimeoutError:
errors.append((i, "Timeout"))
print(f"Successful: {len(results)}")
print(f"Failed: {len(errors)}")
pool.stop()
Retry Logic with Pools¶
from concurry import Worker
import random
class RetryableWorker(Worker):
def flaky_operation(self, data: str) -> str:
if random.random() < 0.3: # 30% failure rate
raise ConnectionError("Temporary failure")
return data.upper()
pool = RetryableWorker.options(mode="thread", max_workers=5).init()
def process_with_retry(item: str, max_retries: int = 3) -> str:
"""Process item with automatic retries."""
for attempt in range(max_retries):
try:
future = pool.flaky_operation(item)
return future.result(timeout=5)
except ConnectionError as e:
if attempt == max_retries - 1:
raise # Last attempt, give up
print(f"Attempt {attempt + 1} failed, retrying...")
# Process with retries
items = ["hello", "world", "foo", "bar"]
results = [process_with_retry(item) for item in items]
print(results)
pool.stop()
Long-Running Tasks¶
Worker pools can handle long-running tasks efficiently with proper timeout and cancellation handling.
Timeout Handling¶
from concurry import Worker
from concurrent.futures import TimeoutError
import time
class SlowWorker(Worker):
def slow_task(self, duration: float) -> str:
time.sleep(duration)
return f"Completed after {duration}s"
pool = SlowWorker.options(mode="thread", max_workers=3).init()
# Submit mix of fast and slow tasks
tasks = [
pool.slow_task(0.5),
pool.slow_task(10.0), # This will timeout
pool.slow_task(0.5),
]
# Get results with timeout
for i, future in enumerate(tasks):
try:
result = future.result(timeout=2.0) # 2 second timeout
print(f"Task {i}: {result}")
except TimeoutError:
print(f"Task {i}: Timed out (worker continues in background)")
pool.stop(timeout=15) # Wait for workers to finish
Progress Tracking¶
from concurry import Worker, ProgressBar
import time
class ProgressWorker(Worker):
def process_items(self, items: list) -> list:
results = []
# Track progress across workers
for item in ProgressBar(items, desc="Processing", style="ray"):
time.sleep(0.1) # Simulate work
results.append(item * 2)
return results
pool = ProgressWorker.options(
mode="ray",
max_workers=4,
actor_options={"num_cpus": 0.25}
).init()
# Submit multiple batches (each tracked separately)
batches = [list(range(10)) for _ in range(4)]
futures = [pool.process_items(batch) for batch in batches]
# Wait for all to complete
results = [f.result() for f in futures]
pool.stop()
Graceful Shutdown¶
from concurry import Worker
import time
import signal
class GracefulWorker(Worker):
def __init__(self):
self.should_stop = False
def long_task(self, items: list) -> list:
results = []
for item in items:
if self.should_stop:
break
time.sleep(0.5)
results.append(item * 2)
return results
def shutdown(self):
self.should_stop = True
pool = GracefulWorker.options(mode="thread", max_workers=5).init()
# Submit long-running tasks
futures = [pool.long_task(list(range(100))) for _ in range(5)]
# Setup signal handler for graceful shutdown
def signal_handler(sig, frame):
print("Shutting down gracefully...")
# Tell workers to stop
for _ in range(5): # For each worker
pool.shutdown()
# Wait for completion
pool.stop(timeout=10)
print("Shutdown complete")
signal.signal(signal.SIGINT, signal_handler)
# Wait for results or interrupt
try:
results = [f.result() for f in futures]
except KeyboardInterrupt:
pass
Pool Statistics and Monitoring¶
Worker pools provide detailed statistics for monitoring performance and debugging.
Basic Statistics¶
from concurry import Worker
class MonitoredWorker(Worker):
def process(self, x: int) -> int:
return x * 2
pool = MonitoredWorker.options(
mode="thread",
max_workers=5,
load_balancing="active"
).init()
# Submit some work
futures = [pool.process(i) for i in range(100)]
results = [f.result() for f in futures]
# Get pool statistics
stats = pool.get_pool_stats()
print(f"Total workers: {stats['total_workers']}")
print(f"Max workers: {stats['max_workers']}")
print(f"On-demand: {stats['on_demand']}")
print(f"Stopped: {stats['stopped']}")
# Load balancer statistics
lb_stats = stats['load_balancer']
print(f"Algorithm: {lb_stats['algorithm']}")
print(f"Total dispatched: {lb_stats['total_dispatched']}")
if lb_stats['algorithm'] == 'LeastActiveLoad':
print(f"Active calls: {lb_stats['active_calls']}")
print(f"Total active: {lb_stats['total_active']}")
pool.stop()
Monitoring Load Distribution¶
from concurry import Worker
import time
class StatefulWorker(Worker):
def __init__(self):
self.processed_count = 0
def process(self, x: int) -> int:
self.processed_count += 1
time.sleep(0.01)
return x * 2
def get_count(self) -> int:
return self.processed_count
# Create pool with different algorithms
for algorithm in ["round_robin", "active", "total", "random"]:
pool = StatefulWorker.options(
mode="thread",
max_workers=3,
load_balancing=algorithm
).init()
# Submit work
futures = [pool.process(i) for i in range(30)]
results = [f.result() for f in futures]
# Check statistics
stats = pool.get_pool_stats()
print(f"\nAlgorithm: {algorithm}")
print(f"Total dispatched: {stats['load_balancer']['total_dispatched']}")
if algorithm == "total":
total_calls = stats['load_balancer']['total_calls']
print(f"Per-worker calls: {total_calls}")
pool.stop()
Custom Metrics¶
from concurry import Worker
import time
from collections import defaultdict
class MetricsWorker(Worker):
def __init__(self):
self.metrics = defaultdict(int)
self.start_time = time.time()
def process(self, task_type: str, data: any) -> any:
start = time.time()
# Process based on type
if task_type == "fast":
result = data * 2
elif task_type == "slow":
time.sleep(0.1)
result = data ** 2
else:
result = None
# Record metrics
duration = time.time() - start
self.metrics[f"{task_type}_count"] += 1
self.metrics[f"{task_type}_total_time"] += duration
return result
def get_metrics(self) -> dict:
uptime = time.time() - self.start_time
return {
"metrics": dict(self.metrics),
"uptime": uptime
}
pool = MetricsWorker.options(mode="thread", max_workers=3).init()
# Submit mixed workload
tasks = [("fast", i) for i in range(50)] + [("slow", i) for i in range(10)]
futures = [pool.process(task_type, data) for task_type, data in tasks]
results = [f.result() for f in futures]
# Individual worker metrics are not directly accessible through the pool:
# get_pool_stats() reports load-balancer information, while per-worker
# metrics need aggregation logic on the caller side (see the sketch below)
pool.stop()
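One hedged approach, using only the pool API shown above: sample get_metrics() through the pool several times and merge the returned dictionaries. Which workers answer each call depends on the load-balancing algorithm (round-robin, the default, is the most predictable), so treat the result as approximate. The merge_metrics helper below is hypothetical, not part of concurry.
# Sketch: sample per-worker metrics through the pool and merge them
def merge_metrics(snapshots: list) -> dict:
    """Hypothetical helper: sum the 'metrics' dicts returned by several workers."""
    merged = {}
    for snap in snapshots:
        for key, value in snap["metrics"].items():
            merged[key] = merged.get(key, 0) + value
    return merged

pool = MetricsWorker.options(mode="thread", max_workers=3).init()
futures = [pool.process("fast", i) for i in range(30)]
_ = [f.result() for f in futures]

# Each call is load-balanced; with the default round-robin algorithm,
# three consecutive calls should reach all three workers
snapshots = [pool.get_metrics().result() for _ in range(3)]
print(merge_metrics(snapshots))
pool.stop()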
Model Inheritance with Pools¶
Worker pools support the same model inheritance and validation features as single workers. All the patterns from the Workers guide apply to pools as well.
Typed/BaseModel Pools (Not Ray-Compatible)¶
from concurry import Worker
from morphic import Typed
from pydantic import BaseModel, Field
from typing import List
# Typed worker pool (works with thread, process, asyncio)
class TypedWorker(Worker, Typed):
name: str
multiplier: int = Field(default=2, ge=1)
def process(self, x: int) -> int:
return x * self.multiplier
# ✅ Works with thread, process, asyncio
pool = TypedWorker.options(
mode="thread",
max_workers=5
).init(name="processor", multiplier=3)
# All workers in pool share the same validated configuration
futures = [pool.process(i) for i in range(10)]
results = [f.result() for f in futures]
print(results) # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
pool.stop()
# ❌ Does NOT work with Ray
try:
pool = TypedWorker.options(
mode="ray",
max_workers=5
).init(name="processor", multiplier=3)
except ValueError as e:
print("Ray mode not supported with Typed workers")
# ValueError: Cannot create Ray worker with Pydantic-based class
Validation Decorators with Ray Pools¶
Use the @validate or @validate_call decorators for Ray-compatible validation:
from concurry import Worker
from morphic import validate
from pydantic import validate_call
# Option 1: @validate decorator (Ray-compatible)
class ValidatedWorker(Worker):
@validate
def __init__(self, multiplier: int = 2):
self.multiplier = multiplier
@validate
def process(self, x: int, offset: float = 0.0) -> float:
return (x * self.multiplier) + offset
# ✅ Works with Ray!
pool = ValidatedWorker.options(
mode="ray",
max_workers=10
).init(multiplier="5") # String coerced to int
# Strings are coerced for all method calls
futures = [pool.process(str(i), offset=str(i * 0.5)) for i in range(5)]
results = [f.result() for f in futures]
print(results) # [0.0, 5.5, 11.0, 16.5, 22.0]
pool.stop()
# Option 2: @validate_call decorator (Ray-compatible)
class PydanticValidatedWorker(Worker):
@validate_call
def __init__(self, base: int):
self.base = base
self.call_count = 0
@validate_call
def compute(self, x: int, y: int = 0) -> int:
self.call_count += 1
return (x + y) * self.base
# ✅ Also works with Ray!
pool = PydanticValidatedWorker.options(
mode="ray",
max_workers=10
).init(base=3)
futures = [pool.compute("10", y=str(i)) for i in range(5)]
results = [f.result() for f in futures]
print(results) # [30, 33, 36, 39, 42]
pool.stop()
Pool-Specific Considerations¶
State Isolation: Each worker in a pool maintains its own state, even with validation:
from concurry import Worker
from morphic import validate
class StatefulWorker(Worker):
@validate
def __init__(self, multiplier: int):
self.multiplier = multiplier
self.count = 0
@validate
def process(self, x: int) -> dict:
self.count += 1
return {"result": x * self.multiplier, "count": self.count}
# Create pool of 3 workers
pool = StatefulWorker.options(
mode="thread",
max_workers=3,
load_balancing="round_robin"
).init(multiplier=2)
# Each worker maintains separate count
results = [pool.process(10).result() for _ in range(9)]
# With round-robin, each worker processes 3 times
# results[0], [3], [6]: worker 0 (count: 1, 2, 3)
# results[1], [4], [7]: worker 1 (count: 1, 2, 3)
# results[2], [5], [8]: worker 2 (count: 1, 2, 3)
for r in results:
print(r) # {'result': 20, 'count': 1|2|3}
pool.stop()
Shared Limits with Validated Workers:
from concurry import Worker, RateLimit
from morphic import validate
class APIWorker(Worker):
@validate
def __init__(self, api_key: str):
self.api_key = api_key
@validate
def call_api(self, endpoint: str, tokens: int = 100) -> dict:
# Use limits.acquire() to enforce rate limits
with self.limits.acquire(requested={"tokens": tokens}) as acq:
response = {"endpoint": endpoint, "tokens": tokens}
acq.update(usage={"tokens": tokens})
return response
# Pool of 10 workers sharing 1000 tokens/min
pool = APIWorker.options(
mode="thread",
max_workers=10,
limits=[
RateLimit(key="tokens", window_seconds=60, capacity=1000)
]
).init(api_key="my-key")
# All workers share the token budget
futures = [pool.call_api("/users", tokens=100) for _ in range(20)]
# Only 10 complete immediately, rest wait for token refresh
results = [f.result() for f in futures]
pool.stop()
Ray Pool Compatibility Summary¶
Worker Type | Thread Pool | Process Pool | Asyncio Pool | Ray Pool |
---|---|---|---|---|
Plain Worker | ✅ | ✅ | ✅ | ✅ |
Worker + Typed | ✅ | ✅ | ✅ | ❌ |
Worker + BaseModel | ✅ | ✅ | ✅ | ❌ |
Worker + @validate | ✅ | ✅ | ✅ | ✅ |
Worker + @validate_call | ✅ | ✅ | ✅ | ✅ |
For Ray pools:
- ✅ Use plain Worker classes
- ✅ Use @validate or @validate_call decorators for validation
- ❌ Don't inherit from Typed or BaseModel
Example: Ray Pool with Validation
import ray
from concurry import Worker
from morphic import validate
ray.init()
class DistributedWorker(Worker):
"""Ray-compatible worker with validation."""
@validate
def __init__(self, model_name: str, batch_size: int = 32):
self.model_name = model_name
self.batch_size = batch_size
# Load model, etc.
@validate
def predict(self, data: list, threshold: float = 0.5) -> list:
# Process batch with validation
return [x * threshold for x in data]
# Create Ray pool with validated workers
pool = DistributedWorker.options(
mode="ray",
max_workers=20,
actor_options={"num_cpus": 0.5}
).init(model_name="bert-base", batch_size="64") # Coerced to int
# Distribute work across Ray cluster
futures = [
pool.predict([1.0, 2.0, 3.0], threshold=str(0.8 + i*0.1)) # Strings coerced
for i in range(10)
]
results = [f.result() for f in futures]
pool.stop()
ray.shutdown()
When to Use Each Approach¶
For Non-Ray Pools (thread, process, asyncio):
Use Typed/BaseModel when:
- You want full model validation and lifecycle hooks
- You need immutable configuration
- You want the richest feature set

Use @validate/@validate_call when:
- You want flexibility to switch to Ray later
- You only need validation on specific methods
- You prefer decorator-based validation
For Ray Pools:
Use the @validate decorator when:
- You want morphic's validation style
- You need type coercion (strings → numbers)
- You want minimal overhead

Use the @validate_call decorator when:
- You want Pydantic's validation features
- You need Field constraints
- You prefer strict validation

Use a plain Worker when:
- You don't need validation
- You want maximum performance
Retry Mechanisms with Pools¶
Worker pools fully support retry configuration, with each worker in the pool using the same retry settings.
Basic Pool with Retry¶
import requests

from concurry import Worker
class APIWorker(Worker):
def fetch(self, id: int) -> dict:
return requests.get(f"https://api.example.com/{id}").json()
# Pool of 10 workers, each with retry configuration
pool = APIWorker.options(
mode="thread",
max_workers=10,
num_retries=3,
retry_algorithm="exponential",
retry_on=[ConnectionError, TimeoutError]
).init()
# Each request to the pool will retry on failure
futures = [pool.fetch(i) for i in range(100)]
results = [f.result() for f in futures]
pool.stop()
How Retries Work in Pools¶
Key behaviors:
- Per-Worker Configuration: Each worker in the pool has the same retry configuration
- Worker-Side Retries: Retries happen on the worker that received the request
- Load Balancing Before Retry: Load balancer selects a worker once; retries stay on that worker
- No Retry Statistics: Pool statistics track successful dispatches, not retry attempts
# Example: Pool with 5 workers and retries
pool = MyWorker.options(
mode="thread",
max_workers=5,
num_retries=3,
load_balancing="round_robin"
).init()
# Request goes to worker 0, retries happen on worker 0
future1 = pool.process(1)
# Request goes to worker 1, retries happen on worker 1
future2 = pool.process(2)
Pool with Shared Limits and Retry¶
Retries automatically coordinate with shared limits:
from concurry import LimitSet, ResourceLimit
# Create shared limit
shared_limits = LimitSet(
limits=[ResourceLimit(key="db_connections", capacity=10)],
shared=True,
mode="thread"
)
# Pool shares the limit across all workers
pool = DatabaseWorker.options(
mode="thread",
max_workers=20, # 20 workers share 10 connections
num_retries=3,
retry_on=[DatabaseError],
limits=shared_limits
).init()
# Each worker's retries properly release/acquire shared limits
# No deadlocks - limits are released between retry attempts
On-Demand Pools with Retry¶
On-demand pools create and destroy workers dynamically, with retry configuration:
pool = MyWorker.options(
mode="thread",
on_demand=True,
max_workers=10,
num_retries=3,
retry_algorithm="exponential"
).init()
# Each on-demand worker is created with retry configuration
future = pool.process(data)
result = future.result() # Worker retries if needed, then is destroyed
pool.stop()
TaskWorker Pools with Retry¶
import random

from concurry import TaskWorker
def flaky_function(x):
if random.random() < 0.5:
raise ConnectionError("Transient error")
return x * 2
# Pool of task workers with retry
pool = TaskWorker.options(
mode="process",
max_workers=4,
num_retries=3,
retry_on=[ConnectionError]
).init()
# Each submit/map call can retry
results = list(pool.map(flaky_function, range(100)))
pool.stop()
Retry with Different Load Balancing¶
Retry behavior is independent of load balancing:
# Least Active Load with Retry
pool = MyWorker.options(
mode="thread",
max_workers=10,
load_balancing="active", # Routes to least busy worker
num_retries=3 # Each worker retries its own tasks
).init()
# If a worker receives a task and fails:
# - It retries locally (doesn't re-dispatch to a different worker)
# - Load balancer only selects worker for initial dispatch
Best Practices for Pool Retries¶
1. Use Retries for Transient Errors
# ✅ Good: Retry on network errors
pool = APIWorker.options(
mode="thread",
max_workers=10,
num_retries=3,
retry_on=[ConnectionError, TimeoutError]
).init()
# ❌ Bad: Retry on all exceptions (including bugs)
pool = APIWorker.options(
mode="thread",
max_workers=10,
num_retries=3,
retry_on=[Exception] # Too broad
).init()
2. Consider Pool Size vs Retry Count
# For high-availability: More workers, fewer retries
pool = MyWorker.options(
mode="thread",
max_workers=20, # More workers available
num_retries=2 # Quick failover
).init()
# For resource-constrained: Fewer workers, more retries
pool = MyWorker.options(
mode="process",
max_workers=4, # Limited workers
num_retries=5 # More retries per worker
).init()
3. Combine with Shared Limits
# Ensure fair resource distribution across pool
from concurry import RateLimit
pool = APIWorker.options(
mode="thread",
max_workers=10,
num_retries=3,
limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()
# All workers share 100 requests/min budget
# Retries count against the budget but are auto-managed
4. Monitor Retry Behavior
import logging
def retry_logger(exception, attempt, worker_class, **ctx):
logging.warning(
f"Worker {worker_class} retry {attempt}: {exception}"
)
return isinstance(exception, (ConnectionError, TimeoutError))
pool = MyWorker.options(
mode="thread",
max_workers=5,
num_retries=3,
retry_on=retry_logger
).init()
For comprehensive retry documentation, see the Retry Mechanisms Guide.
Best Practices¶
Choosing Pool Size¶
import multiprocessing as mp
# For CPU-bound tasks (process mode)
cpu_pool_size = mp.cpu_count()
# For I/O-bound tasks (thread mode)
io_pool_size = mp.cpu_count() * 4 # Or higher
# For Ray distributed tasks
ray_pool_size = 100 # Based on cluster size
pool = MyWorker.options(
mode="process",
max_workers=cpu_pool_size
).init()
Initialization Costs¶
from concurry import Worker
class ExpensiveInitWorker(Worker):
def __init__(self, model_path: str):
# Expensive: Load ML model
self.model = load_model(model_path)
def predict(self, data: list) -> list:
return self.model.predict(data)
# Use persistent pool - initialization happens once per worker
pool = ExpensiveInitWorker.options(
mode="process",
max_workers=4 # Init 4 times total
).init(model_path="/path/to/model")
# DON'T use on-demand for expensive init
# Each call would reload the model!
Resource Cleanup¶
from concurry import Worker
class ResourceWorker(Worker):
def __init__(self):
self.connection = create_connection()
def process(self, data: any) -> any:
return self.connection.query(data)
def __del__(self):
# Cleanup connection
if hasattr(self, 'connection'):
self.connection.close()
# Use the pool's context manager for automatic cleanup
with ResourceWorker.options(mode="thread", max_workers=5).init() as pool:
    results = [pool.process(i).result() for i in range(10)]
# Pool automatically stopped and resources cleaned
Error Isolation¶
from concurry import Worker
# Bad: Shared mutable state
bad_shared = {"counter": 0}
class BadWorker(Worker):
def process(self, x: int) -> int:
# Race condition!
bad_shared["counter"] += x
return bad_shared["counter"]
# Good: Worker-local state
class GoodWorker(Worker):
def __init__(self):
self.counter = 0 # Each worker has its own
def process(self, x: int) -> int:
self.counter += x
return self.counter
pool = GoodWorker.options(mode="thread", max_workers=5).init()
Advanced Patterns¶
Dynamic Pool Resizing (Future)¶
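# TODO: Not yet implemented
# Future API for resizing a running pool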
Priority Queues (Future)¶
# TODO: Not yet implemented
# Future API for priority-based dispatch
# pool.process(data, priority=10)
Health Checking¶
from concurry import Worker
class HealthCheckedWorker(Worker):
def __init__(self):
self.healthy = True
def process(self, data: any) -> any:
if not self.healthy:
raise RuntimeError("Worker unhealthy")
return data * 2
def health_check(self) -> bool:
return self.healthy
# Periodically check worker health
# (Manual implementation - not built-in)
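A minimal sketch of such a periodic check, using only the pool API shown above; the monitor_pool helper is illustrative, not part of concurry. Because each call is load-balanced, several probes per round are used to raise the chance of reaching every worker.
import time

def monitor_pool(pool, probes: int = 5, interval: float = 30.0) -> None:
    """Illustrative helper: periodically probe worker health through the pool."""
    while True:
        futures = [pool.health_check() for _ in range(probes)]
        if not all(f.result() for f in futures):
            print("Unhealthy worker detected - consider recreating the pool")
        time.sleep(interval)

pool = HealthCheckedWorker.options(mode="thread", max_workers=5).init()
# monitor_pool(pool)  # e.g., run from a dedicated monitoring thread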
See Also¶
- Workers Guide - Detailed worker documentation
- Retry Mechanisms Guide - Using retries with pools
- Limits Guide - Resource limits and rate limiting
- Futures Guide - Working with futures
- Getting Started - Basic concepts