Architecture: Workers and Worker Pools¶
This document provides a comprehensive technical overview of the worker and worker pool architecture in Concurry.
Table of Contents¶
- Overview
- Core Abstractions
- Worker Base Class
- WorkerProxy Hierarchy
- WorkerProxyPool Hierarchy
- WorkerBuilder
- Execution Modes
- Worker Lifecycle
- Pool Architecture
- Critical Implementation Details
- Adding New Worker Types
- Limitations and Gotchas
Overview¶
Concurry implements a Worker/Proxy pattern where:
- Worker: User-defined class with business logic (plain Python class)
- WorkerProxy: Wraps the Worker and manages the execution context (thread, process, Ray actor, etc.)
- WorkerProxyPool: Manages multiple WorkerProxy instances with load balancing
This separation allows the same Worker code to run in different execution contexts (sync, thread, process, asyncio, Ray) without modification.
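For example, the same worker class can be run under different modes purely through configuration. A minimal sketch (the Doubler class and its method are illustrative; the options()/init()/result() API follows the examples later in this document, and mode="ray" additionally requires an initialized Ray runtime):

```python
from concurry import Worker  # assumed top-level import

class Doubler(Worker):
    def __init__(self, multiplier: int):
        self.multiplier = multiplier

    def process(self, x: int) -> int:
        return x * self.multiplier

# Same business logic, two execution contexts:
thread_worker = Doubler.options(mode="thread").init(multiplier=2)
ray_worker = Doubler.options(mode="ray").init(multiplier=2)  # needs ray.init() first

assert thread_worker.process(5).result() == 10
assert ray_worker.process(5).result() == 10

thread_worker.stop()
ray_worker.stop()
```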
Key Design Principles¶
- Worker classes are plain Python - No inheritance requirements beyond the Worker base class
- Proxy classes handle execution - All concurrency, serialization, and communication logic
- Typed validation - All proxies and pools inherit from morphic.Typed for configuration validation
- No shared state between workers - Each worker maintains isolated state
- Unified Future API - All execution modes return BaseFuture subclasses
- No silent fallbacks - Explicit configuration, noisy failures over silent degradation
Principle 6: No Silent Fallbacks¶
Core Tenet: Concurry must fail noisily rather than silently degrade performance.
Why This Matters:
Silent fallbacks are the bane of concurrency frameworks. When a system automatically falls back to slower implementations without warning, users experience:
- Mysterious slowdowns that are hard to diagnose
- Production surprises when code suddenly runs 10-100x slower
- False confidence in development that breaks in production
- Debugging nightmares trying to find why "the same code" performs differently
Design Policy:
✅ DO: Have multiple implementations for flexibility
# Example: Multiple limit backend implementations
- InMemorySharedLimitSet (fast, thread-safe, single-process)
- MultiprocessSharedLimitSet (slower, multi-process safe)
- RaySharedLimitSet (distributed, Ray cluster)
✅ DO: Select implementation via explicit configuration
# User explicitly chooses based on their execution mode
limits = LimitSet(
limits=[...],
mode="ray" # ← Explicit choice
)
❌ DON'T: Auto-fallback when configured implementation fails
# BAD: Silent fallback
try:
return RaySharedLimitSet(...)
except RayNotAvailableError:
# ❌ Silently use slower implementation
return InMemorySharedLimitSet(...)
✅ DO: Fail loudly with actionable error
# GOOD: Noisy failure
if mode == "ray" and not ray.is_initialized():
raise RuntimeError(
"Ray mode selected but Ray is not initialized. "
"Call ray.init() before creating workers, or use mode='thread'."
)
Real-World Example:
# User configures for Ray (expects distributed performance)
pool = MyWorker.options(
mode="ray",
max_workers=100 # Expects 100 Ray actors
).init()
# WRONG: Silent fallback to thread pool
# → User thinks they have 100 distributed workers
# → Actually running 100 threads in one process
# → 10x slower, saturates single machine
# → Production outage with no clear cause
# RIGHT: Noisy failure
# RuntimeError: Ray mode selected but Ray is not initialized.
# → User immediately knows what's wrong
# → Can fix or choose different mode
# → No mysterious slowdowns
Implementation Guidelines:
- Configuration is a contract - If the user specifies mode="ray", they expect Ray
- Fail at initialization - Check requirements during .init(), not during execution
- Clear error messages - Tell the user exactly what's wrong and how to fix it
- No implicit downgrades - Don't automatically use a slower implementation
- Document requirements - Each mode's dependencies must be clear
Exceptions to This Rule:
The only acceptable "fallback" is when it's semantically equivalent and performance-neutral:
# OK: Fallback with equivalent performance
if len(my_list) == 0: # ← Could use "not my_list" but explicit is better
return default_value
This is not really a fallback - it's just handling an edge case with no performance implication.
Core Abstractions¶
Worker Base Class¶
class Worker:
"""User-facing base class for all workers."""
@classmethod
def options(cls, mode, blocking, max_workers, ...) -> WorkerBuilder:
"""Configure worker execution options."""
...
def __init__(self, *args, **kwargs):
"""User-defined initialization - completely flexible signature."""
...
Key Characteristics:
- Does NOT inherit from morphic.Typed (allows flexible __init__ signatures)
- Supports cooperative multiple inheritance with Typed/BaseModel across ALL modes
- Validation decorators (@validate, @validate_call) work with ALL modes including Ray
- User-defined workers are wrapped by _create_worker_wrapper() to inject limits and retry logic
- Typed/BaseModel workers automatically use composition pattern for seamless compatibility
Model Inheritance Support:
- ✅ Worker + morphic.Typed: Full support for ALL modes (sync, thread, process, asyncio, ray)
- ✅ Worker + pydantic.BaseModel: Full support for ALL modes (sync, thread, process, asyncio, ray)
- ✅ @morphic.validate / @pydantic.validate_call: Works with ALL modes including Ray
- ✅ Automatic Composition Wrapper: Typed/BaseModel workers transparently use composition pattern
Universal Composition Wrapper for Typed/BaseModel Workers¶
Concurry automatically applies a composition wrapper when workers inherit from morphic.Typed or pydantic.BaseModel. This provides seamless compatibility across ALL execution modes without requiring any code changes from users.
The Problems Being Solved¶
Problem 1: Infrastructure Method Wrapping
When retry logic is applied via __getattribute__, it can accidentally wrap Pydantic/Typed infrastructure methods:
class MyWorker(Worker, Typed):
name: str
value: int
def process(self, x: int) -> int:
return x * self.value
worker = MyWorker.options(
mode="thread",
retry_config=RetryConfig(
num_retries=3,
retry_until=lambda result, **ctx: validate_result(result)
)
).init(name="test", value=10)
# PROBLEM: Pydantic's post_set_validate_inputs() gets wrapped with retry logic!
# When setting attributes, retry_until is called with wrong signature
# Result: TypeError or unexpected retry behavior
Problem 2: Ray Serialization Conflicts
Ray's ray.remote() decorator conflicts with Pydantic's __setattr__ implementation:
class MyWorker(Worker, BaseModel):
name: str
value: int
def process(self, x: int) -> int:
return x * self.value
# PROBLEM: Ray wraps the class and modifies __setattr__
# This breaks Pydantic's frozen model validation
worker = MyWorker.options(mode="ray").init(name="test", value=10)
# Result: ValueError or serialization errors
Problem 3: Internal Method Calls Bypassing Retry Logic
When BaseModel workers have methods that internally call other methods with retry configuration, those internal calls could bypass retry validation:
from concurry import worker, async_gather
from pydantic import BaseModel
from typing import List, Dict, Any
@worker(mode="asyncio")
class LLM(BaseModel):
model_name: str
async def call_llm(self, prompt: str) -> Dict[str, Any]:
"""Single LLM call with validation."""
response = await litellm.acompletion(
model=self.model_name,
messages=[{"role": "user", "content": prompt}]
)
return {"response": response.choices[0].message.content}
async def call_batch(self, prompts: List[str]) -> List[str]:
"""Batch calls - internally calls call_llm()."""
tasks = [self.call_llm(prompt) for prompt in prompts]
results = await async_gather(tasks)
return [r["response"] for r in results]
# Configure with retry_until for call_llm
llm = LLM.options(
mode="asyncio",
num_retries={"*": 0, "call_llm": 0},
retry_until={
"*": None,
"call_llm": lambda result, **ctx: validate_json(result["response"])
}
).init(model_name="gpt-4")
# PROBLEM: When call_batch() internally calls call_llm(),
# the validator wasn't being invoked on internal calls!
# Result: Invalid responses slip through validation
How the Composition Wrapper Solves These Problems¶
Instead of using inheritance, the composition wrapper creates a plain Python class that holds the Typed/BaseModel worker internally and delegates only user-defined methods:
# User writes this:
class MyWorker(Worker, Typed):
name: str
value: int
def process(self, x: int) -> int:
return x * self.value
# Concurry automatically transforms it to this (conceptually):
class MyWorker_CompositionWrapper(Worker):
def __init__(self, name: str, value: int):
self._wrapped_instance = MyWorker_Original(name=name, value=value)
def process(self, x: int) -> int:
# Delegate to wrapped instance
return self._wrapped_instance.process(x)
# Infrastructure methods (model_dump, model_validate, etc.) NOT exposed
Benefits:
- Infrastructure Isolation: Pydantic methods never exposed at wrapper level → retry logic can't wrap them
- Ray Compatibility: Wrapper is plain Python → no __setattr__ conflicts with Ray
- Transparent to Users: Workers behave identically, validation still works
- Consistent Behavior: Same code path for all modes (sync, thread, process, asyncio, ray)
- Performance Optimized: Method delegation uses captured closures to avoid repeated getattr() calls
- Internal Call Validation: Methods called internally by other methods still go through retry/validation logic
When is the Composition Wrapper Applied?¶
The wrapper is applied automatically by WorkerBuilder when:
- The worker class inherits from morphic.Typed OR pydantic.BaseModel
- The check is performed at worker creation time (in .init())
- It is applied for ALL execution modes (not just Ray)
Detection Logic (_should_use_composition_wrapper):
def _should_use_composition_wrapper(worker_cls: Type) -> bool:
"""Check if worker needs composition wrapper.
Note: Check Typed FIRST as it's a subclass of BaseModel.
"""
# Check for Typed first (extends BaseModel)
try:
from morphic import Typed
if isinstance(worker_cls, type) and issubclass(worker_cls, Typed):
return True
except ImportError:
pass
# Check for BaseModel
try:
from pydantic import BaseModel
if isinstance(worker_cls, type) and issubclass(worker_cls, BaseModel):
return True
except ImportError:
pass
return False
Implementation Details¶
Step 1: Wrapper Creation (_create_composition_wrapper):
The wrapper is created dynamically at runtime:
def _create_composition_wrapper(worker_cls: Type) -> Type:
"""Create composition wrapper for BaseModel/Typed workers."""
from . import Worker as WorkerBase
class CompositionWrapper(WorkerBase):
"""Auto-generated wrapper using composition pattern.
Holds BaseModel/Typed instance internally and delegates
user-defined method calls to it.
"""
def __init__(self, *args, **kwargs):
# Create wrapped instance with user's args/kwargs
self._wrapped_instance = worker_cls(*args, **kwargs)
def __getattr__(self, name: str):
# Block infrastructure methods
if _is_infrastructure_method(name):
raise AttributeError(
f"Infrastructure method '{name}' not available. "
f"Only user-defined methods are exposed."
)
return getattr(self._wrapped_instance, name)
# Copy user-defined methods to wrapper class
import inspect
for attr_name in dir(worker_cls):
# Skip private methods and infrastructure methods
if attr_name.startswith("_"):
continue
if _is_infrastructure_method(attr_name):
continue
if attr_name not in worker_cls.__dict__:
continue # Inherited from parent
attr = getattr(worker_cls, attr_name)
if not callable(attr) or isinstance(attr, type):
continue
# Create delegating method (with performance optimization)
is_async = inspect.iscoroutinefunction(attr)
setattr(CompositionWrapper, attr_name,
make_method(attr_name, is_async, attr))
return CompositionWrapper
Step 2: Infrastructure Method Detection (_is_infrastructure_method):
To avoid wrapping Pydantic methods, we maintain a cached set of method names:
def _is_infrastructure_method(
method_name: str,
_cache: Dict[str, Set[str]] = {},  # mutable default acts as a function-level cache
) -> bool:
"""Check if method belongs to Typed or BaseModel infrastructure.
Uses function-level caching via mutable default argument for O(1) lookup.
Cache is populated on first call and reused for all subsequent calls.
"""
# Populate cache on first call
if len(_cache) == 0:
try:
from morphic import Typed
_cache["typed_methods"] = set(dir(Typed))
except ImportError:
_cache["typed_methods"] = set()
try:
from pydantic import BaseModel
_cache["basemodel_methods"] = set(dir(BaseModel))
except ImportError:
_cache["basemodel_methods"] = set()
# O(1) lookup in cached sets
return (method_name in _cache["typed_methods"] or
method_name in _cache["basemodel_methods"])
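For example, given the helper above (illustrative calls; pydantic must be installed for the BaseModel lookups to return True):

```python
_is_infrastructure_method("model_dump")      # True  - part of pydantic.BaseModel's API
_is_infrastructure_method("model_validate")  # True
_is_infrastructure_method("process")         # False - user-defined method name
```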
Step 3: Performance-Optimized Method Delegation:
Critical optimization: Capture unbound method in closure to avoid getattr() on every call:
def make_method(method_name, is_async, unbound_method):
"""Create delegating method with captured unbound method.
OPTIMIZATION: Captures unbound_method in closure to avoid slow
getattr(self._wrapped_instance, method_name) on every invocation.
This saves ~200ns per call, critical for tight loops.
"""
if is_async:
async def async_delegating_method(self, *args, **kwargs):
# Fast: Call unbound method with wrapped instance directly
return await unbound_method(self._wrapped_instance, *args, **kwargs)
async_delegating_method.__name__ = method_name
return async_delegating_method
else:
def delegating_method(self, *args, **kwargs):
# Fast: Call unbound method with wrapped instance directly
return unbound_method(self._wrapped_instance, *args, **kwargs)
delegating_method.__name__ = method_name
return delegating_method
Without optimization (slow):
def delegating_method(self, *args, **kwargs):
method = getattr(self._wrapped_instance, method_name) # ~200ns overhead!
return method(*args, **kwargs)
With optimization (fast):
def delegating_method(self, *args, **kwargs):
return unbound_method(self._wrapped_instance, *args, **kwargs) # Direct call
Step 4: Limits Injection:
The limits attribute must be accessible to user methods. Since user methods execute on _wrapped_instance, limits are set there:
# In _create_worker_wrapper:
class WorkerWithLimitsAndRetry(worker_cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# ... create limit_pool ...
# Check if this is a composition wrapper
if hasattr(self, "_wrapped_instance"):
# Set limits on wrapped instance (where user methods execute)
object.__setattr__(self._wrapped_instance, "limits", limit_pool)
else:
# Set limits on self (plain worker)
object.__setattr__(self, "limits", limit_pool)
Why object.__setattr__? Bypasses Pydantic's frozen model validation, allowing us to inject limits after construction.
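A standalone illustration of that technique (not Concurry code; assumes Pydantic v2 with a frozen model). Pydantic's frozen check lives in its __setattr__ hook, so writing through object.__setattr__ skips it, which is the same trick used to attach limits after construction:

```python
from pydantic import BaseModel

class FrozenWorkerState(BaseModel):
    model_config = {"frozen": True}
    name: str

state = FrozenWorkerState(name="test")

# state.name = "changed"                      # raises: instance is frozen
object.__setattr__(state, "name", "changed")  # bypasses Pydantic's __setattr__ hook
print(state.name)                             # "changed"
```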
Step 5: Internal Method Call Retry Wrapping:
To ensure internal method calls go through retry logic, the wrapper dynamically modifies _wrapped_instance's class:
# In WorkerWithLimitsAndRetry.__init__:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Cache composition flag for performance (critical in Ray)
_is_composition = hasattr(self, "_wrapped_instance")
object.__setattr__(self, "_is_composition_wrapper", _is_composition)
# CRITICAL FIX: Replace _wrapped_instance's class with retry-aware version
if _is_composition:
original_instance = self._wrapped_instance
original_class = type(original_instance)
class WrappedInstanceWithRetry(original_class):
def __getattribute__(self, name: str):
"""Apply retry wrapping to method calls."""
attr = super().__getattribute__(name)
# Apply retry logic (same as WorkerWithLimitsAndRetry)
if (
has_retry
and not name.startswith("_")
and callable(attr)
and not isinstance(attr, type)
):
# Get retry config for this method
if name in retry_configs:
method_config = retry_configs[name]
else:
method_config = retry_configs.get("*")
if method_config is None:
return attr
if method_config.num_retries == 0 and method_config.retry_until is None:
return attr
# Wrap with retry logic
wrapped = create_retry_wrapper(
attr, method_config, name, original_class.__name__
)
return wrapped
return attr
# Replace instance's class (Python allows this!)
original_instance.__class__ = WrappedInstanceWithRetry
Why This Matters:
When a method like call_batch() internally calls call_llm(), the call goes through:
1. CompositionWrapper.call_batch() → delegates to _wrapped_instance.call_batch()
2. Inside call_batch(): self.call_llm() → goes through WrappedInstanceWithRetry.__getattribute__
3. Retry wrapper applied → validation occurs → RetryValidationError raised if validation fails
Without this fix, internal calls would go directly to the unwrapped method, bypassing retry/validation entirely.
Example Scenario:
@worker(mode="asyncio")
class LLM(BaseModel):
async def call_llm(self, prompt: str) -> dict:
"""Validated method."""
response = await api_call(prompt)
return {"response": response}
async def call_batch(self, prompts: List[str]) -> List[dict]:
"""Internally calls call_llm()."""
tasks = [self.call_llm(p) for p in prompts] # ← Internal calls
return await async_gather(tasks)
llm = LLM.options(
num_retries={"*": 0, "call_llm": 0},
retry_until={"*": None, "call_llm": validator}
).init()
# When call_batch() calls call_llm() internally:
# ✅ With Step 5: validator runs, RetryValidationError raised if invalid
# ❌ Without Step 5: validator skipped, invalid data returned
Behavior and Edge Cases¶
User-Defined Methods:
class MyWorker(Worker, Typed):
name: str
def process(self, x: int) -> int:
return x * 2
worker = MyWorker.options(mode="thread").init(name="test")
result = worker.process(5).result() # ✅ Works - delegates to wrapped instance
Infrastructure Methods (blocked at wrapper level):
worker.model_dump() # ❌ AttributeError: Infrastructure method not available
worker.model_validate({...}) # ❌ AttributeError
worker.__pydantic_fields__ # ❌ AttributeError
Validation Still Works:
# Validation happens during __init__ of wrapped instance
worker = MyWorker.options(mode="thread").init(name=123)
# ❌ ValidationError: name must be string
Async Methods:
class AsyncWorker(Worker, Typed):
name: str
async def fetch(self, url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
worker = AsyncWorker.options(mode="asyncio").init(name="fetcher")
result = worker.fetch("https://example.com").result() # ✅ Works
Accessing Validated Fields:
class MyWorker(Worker, Typed):
multiplier: int
def process(self, x: int) -> int:
return x * self.multiplier # ✅ Accesses validated field
worker = MyWorker.options(mode="ray").init(multiplier=3)
result = worker.process(5).result() # Returns 15
Limits Integration:
class MyWorker(Worker, Typed):
name: str
def process(self, x: int) -> int:
with self.limits.acquire(requested={"tokens": 100}):
# ✅ limits accessible via self
return x * 2
worker = MyWorker.options(
mode="ray",
limits=[RateLimit(key="tokens", capacity=1000, window_seconds=1)]
).init(name="test")
Performance Characteristics¶
Method Call Overhead:
| Worker Type | Plain Worker | Composition Wrapper | Overhead |
|---|---|---|---|
| Sync | 3.2µs/call | 3.2µs/call | 0% (optimized) |
| Thread | 77.9µs/call | 77.9µs/call | 0% (optimized) |
| Asyncio | 15.6µs/submit | 15.6µs/submit | 0% (optimized) |
| Ray | ~2ms/call | ~2ms/call | 0% (network dominates) |
Why Zero Overhead?
- Unbound Method Capture: Avoids getattr() on every call (~200ns saved)
- Method Caching: Wrapper methods created once, cached forever
- Direct Call: unbound_method(instance, *args) is as fast as instance.method(*args) (see the micro-benchmark sketch below)
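The effect of capturing the unbound method can be sanity-checked with a small timeit comparison (illustrative only, not Concurry code; absolute numbers vary by machine):

```python
import timeit

class Inner:
    def process(self, x: int) -> int:
        return x * 2

class Wrapper:
    def __init__(self):
        self._wrapped_instance = Inner()

captured = Inner.process   # unbound method captured once, as in make_method()
w = Wrapper()

# Dynamic lookup on every call vs. direct call through the captured function
lookup = timeit.timeit(lambda: getattr(w._wrapped_instance, "process")(5), number=1_000_000)
direct = timeit.timeit(lambda: captured(w._wrapped_instance, 5), number=1_000_000)
print(f"per call: getattr lookup {lookup * 1e3:.0f}ns, captured call {direct * 1e3:.0f}ns")
```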
Memory Overhead:
- One extra object per worker (_wrapped_instance)
- Negligible: ~64 bytes for object header + attributes
Comparison: Composition vs Inheritance¶
With Composition (current implementation):
class MyWorker_CompositionWrapper(Worker):
def __init__(self, name: str, value: int):
self._wrapped_instance = MyWorker_Original(name, value)
def process(self, x: int) -> int:
return self._wrapped_instance.process(x)
# Retry logic applied via __getattribute__ on wrapper
# Only 'process' exposed → infrastructure methods safe
Without Composition (old approach, problematic):
class MyWorker(Worker, Typed):
name: str
value: int
def process(self, x: int) -> int:
return x * self.value
# Retry logic applied via __getattribute__ on MyWorker
# ALL methods exposed → infrastructure methods get wrapped!
# Result: post_set_validate_inputs() wrapped with retry logic → crashes
Lifecycle Integration¶
The composition wrapper is applied early in the worker lifecycle:
User calls Worker.options(mode, ...).init(args, kwargs)
↓
WorkerBuilder created
↓
WorkerBuilder.init() called
↓
_apply_composition_wrapper_if_needed()
↓
├─ Check: _should_use_composition_wrapper(worker_cls)
│ ├─ Is subclass of Typed? → YES
│ └─ Is subclass of BaseModel? → YES
│ ↓
├─ Create wrapper: worker_cls = _create_composition_wrapper(worker_cls)
│ ├─ Create CompositionWrapper class
│ ├─ Copy user-defined methods with delegation
│ ├─ Block infrastructure methods
│ └─ Return wrapper class
│ ↓
└─ Continue with wrapped class
↓
_create_worker_wrapper(worker_cls, limits, retry)
↓
Create WorkerProxy (Thread/Process/Ray/etc.)
↓
Worker instance created from wrapped class
↓
User methods work transparently
Testing and Validation¶
The composition wrapper is tested comprehensively:
- Basic functionality: Method calls work correctly
- Validation: Pydantic validation errors raised properly
- Field access: Validated fields accessible in methods
- Limits integration: self.limits works as expected
- Worker pools: Composition wrapper works with pools
- All modes: Tested across sync, thread, process, asyncio, ray
- Edge cases: Optional fields, defaults, constraints, hooks
- Performance: Meets performance targets for tight loops
See tests/core/worker/test_pydantic_integration.py for comprehensive test coverage.
Why Universal (All Modes)?¶
Initially, the composition wrapper was Ray-specific to solve the serialization issue. However, applying it universally provides significant benefits:
- Consistent Behavior: Same code path for all modes eliminates edge cases
- Simpler Logic: No mode-specific branching in _create_worker_wrapper
- Infrastructure Isolation: Prevents retry logic wrapping Pydantic methods in ALL modes
- Easier Maintenance: One implementation to test and optimize
- Future-Proof: Any mode that conflicts with Pydantic automatically works
The performance optimization (unbound method capture) ensures zero overhead, making the universal application practical.
WorkerProxy Hierarchy¶
All WorkerProxy classes inherit from WorkerProxy(Typed, ABC):
WorkerProxy (Typed, ABC)
├── SyncWorkerProxy # Direct execution in current thread
├── ThreadWorkerProxy # Thread + queue-based communication
├── ProcessWorkerProxy # Process + multiprocessing queues + cloudpickle
├── AsyncioWorkerProxy # Event loop + sync thread for mixed execution
└── RayWorkerProxy # Ray actor + ObjectRef futures
Common Interface:
class WorkerProxy(Typed, ABC):
# Public configuration (immutable after creation)
worker_cls: Type[Worker]
blocking: bool
unwrap_futures: bool
init_args: tuple
init_kwargs: dict
limits: Optional[Any] # LimitPool instance
retry_config: Optional[Any] # RetryConfig instance
max_queued_tasks: Optional[int]
mode: ClassVar[ExecutionMode] # Set by subclass
# Private attributes (mutable, not serialized)
_stopped: bool = PrivateAttr(default=False)
_options: dict = PrivateAttr(default_factory=dict)
_method_cache: dict = PrivateAttr(default_factory=dict)
_submission_semaphore: Optional[Any] = PrivateAttr(default=None)
# Abstract methods that subclasses must implement
def _execute_method(self, method_name: str, *args, **kwargs) -> BaseFuture:
"""Execute a worker method and return a future."""
...
# Common behavior
def __getattr__(self, name: str) -> Callable:
"""Intercept method calls and dispatch via _execute_method."""
...
def stop(self, timeout: float = 30) -> None:
"""Stop the worker and clean up resources."""
...
def __enter__(self) / __exit__(self):
"""Context manager support for automatic cleanup."""
...
Key Implementation Rules:
- Mode as ClassVar: Each proxy sets mode: ClassVar[ExecutionMode] at class level, NOT passed as a parameter
- Typed Configuration: All config fields are immutable public attributes validated by Pydantic
- Private Attributes: Use PrivateAttr() for mutable state, initialized in post_initialize()
- Method Caching: __getattr__ caches method wrappers in _method_cache for performance
- Submission Queue: Use _submission_semaphore (BoundedSemaphore) to limit in-flight tasks per worker
- Future Unwrapping: Automatically unwrap BaseFuture arguments before execution unless unwrap_futures=False (see the sketch below)
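For example, with the default unwrap_futures=True, a future produced by one worker can be passed straight into another worker's method and the proxy resolves it to its value before execution (sketch; the Producer/Consumer classes and their methods are hypothetical):

```python
producer = Producer.options(mode="thread").init()
consumer = Consumer.options(mode="thread").init()

intermediate = producer.compute(5)       # returns a BaseFuture immediately
final = consumer.process(intermediate)   # proxy unwraps the future before calling process()
print(final.result())

producer.stop()
consumer.stop()
```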
SyncWorkerProxy¶
Execution Model: Direct execution in current thread
Future Type: SyncFuture (caches result/exception at creation)
class SyncWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Sync
_worker: Any = PrivateAttr() # Worker instance stored directly
def _execute_method(self, method_name: str, *args, **kwargs) -> SyncFuture:
method = getattr(self._worker, method_name)
try:
result = _invoke_function(method, *args, **kwargs)
return SyncFuture(result_value=result)
except Exception as e:
return SyncFuture(exception_value=e)
Characteristics:
- No threads, no queues, no asynchronous communication
- Async functions executed via asyncio.run() (blocks until complete)
- Zero overhead for simple testing and debugging
- Submission queue bypassed (max_queued_tasks ignored)
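Illustrative usage (Doubler is the hypothetical worker from the overview example; behavior follows the characteristics above):

```python
worker = Doubler.options(mode="sync").init(multiplier=2)

future = worker.process(5)   # executed immediately in the calling thread
print(future.result())       # 10 - the SyncFuture already holds the cached result

worker.stop()                # no-op cleanup in sync mode
```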
ThreadWorkerProxy¶
Execution Model: Dedicated worker thread + command queue
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class ThreadWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Threads
command_queue_timeout: confloat(ge=0) # From global config
_command_queue: Any = PrivateAttr() # queue.Queue
_futures: Dict[str, Any] = PrivateAttr() # uuid -> ConcurrentFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
_thread: Any = PrivateAttr() # threading.Thread
Architecture:
1. Main thread (client): Submits commands to the queue, returns a future immediately
2. Worker thread: Processes commands from the queue, sets results on futures
Communication Flow:
Client Thread Worker Thread
│ │
│ 1. Create future │
│ 2. Store in _futures dict │
│ 3. Put (uuid, method, args) │
├───────────────────────────────>│
│ 4. Return future │ 5. Get command from queue
│ │ 6. Execute method
│ │ 7. Set result on future
│ │ 8. Remove from _futures dict
│ 9. future.result() blocks │
│ until worker sets result │
Characteristics:
- Async functions executed via asyncio.run() in worker thread (no concurrency)
- Command queue timeout checked via queue.get(timeout=command_queue_timeout)
- Futures tracked in dict for cancellation on stop()
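A simplified sketch of the worker-thread loop described above (illustrative only; the real implementation also routes retry/limit logic and asyncio.run() for async methods through this path):

```python
import queue

def _worker_thread_loop(worker, command_queue, command_queue_timeout):
    """Runs inside the dedicated worker thread of a ThreadWorkerProxy-like proxy."""
    while True:
        try:
            command = command_queue.get(timeout=command_queue_timeout)
        except queue.Empty:
            continue                      # wake up periodically to notice shutdown
        if command is None:               # sentinel placed by stop()
            break
        uid, future, method_name, args, kwargs = command
        try:
            result = getattr(worker, method_name)(*args, **kwargs)
            future.set_result(result)
        except Exception as exc:
            future.set_exception(exc)
```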
ProcessWorkerProxy¶
Execution Model: Separate process + multiprocessing queues + cloudpickle serialization
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class ProcessWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Processes
mp_context: Literal["fork", "spawn", "forkserver"] = "fork"
result_queue_timeout: confloat(ge=0)
result_queue_cleanup_timeout: confloat(ge=0)
_command_queue: Any = PrivateAttr() # mp.Queue
_result_queue: Any = PrivateAttr() # mp.Queue
_futures: dict = PrivateAttr() # uuid -> PyFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
_process: Any = PrivateAttr() # mp.Process
_result_thread: Any = PrivateAttr() # threading.Thread
Architecture:
1. Main process (client): Sends commands to _command_queue
2. Worker process: Executes commands, sends results to _result_queue
3. Result thread: Reads from _result_queue, sets results on futures
Communication Flow:
Main Process Worker Process Result Thread
│ │ │
│ 1. Serialize args │ │
│ 2. Put command in queue │ │
├──────────────────────────>│ │
│ 3. Return future │ 4. Get command │
│ │ 5. Deserialize │
│ │ 6. Execute method │
│ │ 7. Serialize result │
│ │ 8. Put in result_queue
│ ├─────────────────────>│
│ │ │ 9. Get result
│ │ │ 10. Deserialize
│ │ │ 11. Set on future
│ 12. future.result() │ │
Characteristics:
- Worker class serialization: Uses cloudpickle.dumps() to serialize worker class
- Async functions: Executed via asyncio.run() in worker process (no concurrency)
- Exception preservation: Original exception types preserved across process boundary
- Separate result thread: Required because Queue.get() from another process blocks
- Multiprocessing context: Supports fork, spawn, or forkserver
Critical Serialization Details:
- Worker class is serialized ONCE at proxy creation
- Limits passed as a list of Limit objects (or a LimitPool), recreated inside the worker process
- RetryConfig serialized and passed to the worker process
- Args/kwargs serialized per method call
Process Mode: Cloudpickle and Multiprocessing Context¶
Critical Design Decision: Process workers use cloudpickle for all user-provided objects and forkserver as the default multiprocessing context.
Problem Context:
Users frequently define functions, classes, and retry filters in Jupyter notebooks or test files:
# Common user workflow in Jupyter notebook
def my_function(x): # Local function - NOT picklable by standard pickle
return x * 2
def should_retry(exception, **context): # Local retry filter
return isinstance(exception, ValueError)
worker = TaskWorker.options(
mode="process",
retry_on=[should_retry] # Needs cloudpickle!
).init(fn=my_function) # Needs cloudpickle!
Standard pickle cannot serialize:
- Local functions (defined in function scope)
- Lambda functions
- Functions/classes defined in __main__ (Jupyter notebooks, test files)
- Closures with local variable capture
This would break a core user workflow, forcing users to define all functions at module level.
Solution: Cloudpickle for User-Provided Objects
ProcessWorkerProxy uses cloudpickle (not standard pickle) for:
- Worker class (worker_cls): Allows local Worker classes in notebooks/tests
- Init arguments (init_args, init_kwargs): Allows local functions as parameters
- Retry configuration (retry_config): Allows local retry filters
- Task functions (TaskWorker): Allows local functions, lambdas
# In ProcessWorkerProxy.post_initialize()
worker_cls_bytes = cloudpickle.dumps(self.worker_cls)
init_args_bytes = cloudpickle.dumps(self.init_args)
init_kwargs_bytes = cloudpickle.dumps(self.init_kwargs)
retry_config_bytes = cloudpickle.dumps(self.retry_config)
# Later unpickled in worker process
worker_cls = cloudpickle.loads(worker_cls_bytes)
init_args = cloudpickle.loads(init_args_bytes)
init_kwargs = cloudpickle.loads(init_kwargs_bytes)
retry_config = cloudpickle.loads(retry_config_bytes)
What Does NOT Use Cloudpickle:
- Multiprocessing primitives (Queues, Locks, Semaphores): Standard multiprocessing serialization
- Manager proxies (MultiprocessSharedLimitSet): Custom __getstate__/__setstate__
- Internal state (futures dict, threads): Not serialized
Multiprocessing Context: Fork vs Spawn vs Forkserver
Python's multiprocessing supports three start methods:
| Context | Startup Time | Memory | Thread Safety | Ray Client Compatible |
|---|---|---|---|---|
| fork | ~10ms | Shared (copy-on-write) | ❌ UNSAFE | ❌ BREAKS |
| spawn | ~10-20s on Linux, ~1-2s on macOS | Independent | ✅ Safe | ✅ Works |
| forkserver | ~200ms | Independent | ✅ Safe | ✅ Works |
Why Fork is UNSAFE:
fork() copies the entire process memory, including:
- Active threads (which continue running in child!)
- Open file descriptors
- gRPC connections (Ray client mode)
- Mutexes and condition variables (can be in inconsistent state)
The Ray Client Problem:
Ray client mode (common user workflow) uses gRPC for communication with remote cluster. gRPC spawns background threads. If you fork() while gRPC threads are active:
Parent Process Forked Child Process
gRPC Thread 1 ──fork()──> gRPC Thread 1 (CORRUPTED - mid-operation)
gRPC Thread 2 ──fork()──> gRPC Thread 2 (CORRUPTED - holding mutex)
Main Thread ──fork()──> Main Thread (tries to use gRPC -> DEADLOCK/SEGFAULT)
Real Error (before fix):
[mutex.cc : 2443] RAW: Check w->waitp->cond == nullptr failed
Check failed: next_worker->state == KICKED
Segmentation fault (core dumped)
Why Spawn is TOO SLOW:
spawn starts a fresh Python interpreter, which must:
1. Initialize Python runtime
2. Import all modules
3. Load dependencies
4. Reconstruct worker state
Benchmarks:
- Process mode with fork: ~10ms per worker startup
- Process mode with spawn: ~10-20s per worker startup on Linux (1000x slower!)
- Process mode with forkserver: ~200ms per worker startup (20x slower than fork, 50x faster than spawn)
Why Forkserver is the Best Balance:
forkserver works by:
1. Starting a clean server process early (before any threads)
2. Server process forks worker processes on demand
3. Forked workers inherit minimal state (no gRPC threads!)
Benefits:
- ✅ Safe: No active threads in the server process when forking
- ✅ Fast: ~200ms startup vs 10-20s for spawn
- ✅ Compatible: Works with Ray client + process workers concurrently
- ✅ Common workflow: Users can keep a Ray client connected 24/7 and use process workers
Configuration:
from concurry.config import global_config
# Default (recommended)
assert global_config.defaults.mp_context == "forkserver"
# Override if needed (not recommended)
with global_config.temp_config(mp_context="spawn"):
worker = MyWorker.options(mode="process").init()
Historical Approaches (Failed):
- Attempt 1: Use fork with Manager() - FAILED (segfaults with Ray client)
- Attempt 2: Use spawn for everything - FAILED (10-20s startup, unusable)
- Attempt 3: Use spawn for the Manager, forkserver for workers - FAILED (Manager proxy pickling issues)
- Final Solution: Use forkserver for both the Manager and workers + cloudpickle for user objects - SUCCESS
Current Architecture (October 2025):
# ProcessWorkerProxy creates worker process:
ctx = multiprocessing.get_context("forkserver") # From global_config
process = ctx.Process(
target=_process_worker_main,
args=(
cloudpickle.dumps(worker_cls), # ← cloudpickle
cloudpickle.dumps(init_args), # ← cloudpickle
cloudpickle.dumps(init_kwargs), # ← cloudpickle
limits, # ← Manager proxies (standard pickle)
cloudpickle.dumps(retry_config), # ← cloudpickle
command_queue, # ← multiprocessing.Queue
result_queue, # ← multiprocessing.Queue
)
)
# MultiprocessSharedLimitSet creates Manager:
manager_ctx = multiprocessing.get_context("forkserver") # Same context!
manager = manager_ctx.Manager()
# Manager proxies are pickled using their custom __reduce__ methods
Key Insight: Manager proxies have custom __reduce__ methods that handle their own serialization. When workers are created, the Manager proxies are pickled (using their custom methods) and unpickled in the worker process, automatically reconnecting to the Manager server. This works with any mp_context as long as both Manager and workers use the same context.
AsyncioWorkerProxy¶
Execution Model: Event loop thread + dedicated sync thread for sync methods
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class AsyncioWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Asyncio
loop_ready_timeout: confloat(ge=0)
thread_ready_timeout: confloat(ge=0)
sync_queue_timeout: confloat(ge=0)
_loop: Any = PrivateAttr(default=None) # asyncio.EventLoop
_worker: Any = PrivateAttr(default=None) # Worker instance
_loop_thread: Any = PrivateAttr() # threading.Thread (runs event loop)
_sync_thread: Any = PrivateAttr() # threading.Thread (runs sync methods)
_sync_queue: Any = PrivateAttr() # queue.Queue (for sync methods)
_futures: Dict[str, Any] = PrivateAttr() # uuid -> ConcurrentFuture
Architecture:
1. Event loop thread: Runs asyncio event loop for async methods
2. Sync worker thread: Executes sync methods without blocking event loop
3. Main thread: Routes method calls to appropriate thread
Method Routing:
def _execute_method(self, method_name, *args, **kwargs):
method = getattr(self._worker, method_name)
is_async = asyncio.iscoroutinefunction(method)
if is_async:
# Route to event loop for concurrent execution
self._loop.call_soon_threadsafe(schedule_async_task)
else:
# Route to sync thread to avoid blocking event loop
self._sync_queue.put((future, method_name, args, kwargs))
Characteristics:
- True async concurrency: Multiple async methods can run concurrently in the event loop
- Sync method isolation: Sync methods don't block the event loop
- Best for I/O-bound async work: HTTP requests, database queries, WebSocket connections
- 10-50x speedup for concurrent I/O operations vs sequential execution
- ~13% overhead for sync methods vs ThreadWorker (minimal impact)

Performance Comparison (30 HTTP requests, 50ms latency each):
- SyncWorker: 1.66s (sequential)
- ThreadWorker: 1.66s (sequential)
- ProcessWorker: 1.67s (sequential)
- AsyncioWorker: 0.16s (concurrent) ✅ 10x faster!
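Illustrative usage showing where the speedup comes from (reuses the AsyncWorker fetch example from earlier in this document; the URLs are placeholders):

```python
fetcher = AsyncWorker.options(mode="asyncio").init(name="fetcher")

# All 30 calls are scheduled on the event loop and run concurrently.
futures = [fetcher.fetch(f"https://example.com/item/{i}") for i in range(30)]
pages = [f.result() for f in futures]   # total wall time ≈ one request, not 30

fetcher.stop()
```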
RayWorkerProxy¶
Execution Model: Ray actor (distributed process) + ObjectRef futures
Future Type: RayFuture (wraps Ray ObjectRef)
class RayWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Ray
actor_options: Optional[Dict[str, Any]] = None # Ray resource options
_ray_actor: Any = PrivateAttr() # Ray actor handle
_futures: Dict[str, Any] = PrivateAttr() # uuid -> RayFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
Architecture:
1. Client process: Holds the actor handle, submits method calls
2. Ray actor: Separate process (possibly on a remote machine), executes methods
3. Ray cluster: Manages scheduling, data transfer, fault tolerance
Communication Flow:
Client Process Ray Actor Ray Cluster
│ │ │
│ 1. Get actor handle │ │
│ 2. actor.method.remote() │ │
├─────────────────────────────────────────────────>│
│ 3. Return ObjectRef │ │ 4. Schedule task
│ │<──────────────────────┤
│ │ 5. Execute method │
│ │ 6. Store result │
│ ├──────────────────────>│
│ 7. ray.get(ObjectRef) │ │ 8. Retrieve result
│<─────────────────────────────────────────────────┤
Characteristics:
- Zero-copy optimization: RayFuture → ObjectRef passed directly (no serialization)
- Cross-worker futures: Other BaseFuture types materialized before passing
- Native async support: Ray handles async methods automatically
- Resource allocation: Via actor_options={"num_cpus": 2, "num_gpus": 1, "resources": {...}}
- Distributed execution: Actor can run on any node in Ray cluster
- Fault tolerance: Ray handles actor failures and restarts
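For example, resource requirements are passed to the Ray actor via actor_options (sketch; assumes Ray is initialized and that actor_options is forwarded through .options() as the proxy's configuration field suggests):

```python
worker = Doubler.options(
    mode="ray",
    actor_options={"num_cpus": 2, "num_gpus": 1},
).init(multiplier=2)

future = worker.process(5)   # RayFuture wrapping an ObjectRef
print(future.result())       # 10, retrieved via ray.get() under the hood

worker.stop()                # ray.kill(actor)
```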
Future Unwrapping with Zero-Copy:
def _unwrap_future_for_ray(obj):
if isinstance(obj, RayFuture):
return obj._object_ref # Zero-copy: pass ObjectRef directly
elif isinstance(obj, BaseFuture):
return obj.result() # Cross-worker: materialize value
return obj
Retry Logic for Ray:
Ray actors bypass __getattribute__, so retry logic must be pre-applied to methods at class level:
worker_cls_to_use = _create_worker_wrapper(
self.worker_cls,
self.limits,
self.retry_config,
for_ray=True # Pre-wrap methods at class level
)
WorkerProxyPool Hierarchy¶
All WorkerProxyPool classes inherit from WorkerProxyPool(Typed, ABC):
WorkerProxyPool (Typed, ABC)
├── InMemoryWorkerProxyPool # Sync, Thread, Asyncio workers
├── MultiprocessWorkerProxyPool # Process workers
└── RayWorkerProxyPool # Ray workers
Common Interface:
class WorkerProxyPool(Typed, ABC):
# Public configuration (immutable after creation)
worker_cls: Type[Worker]
mode: ExecutionMode
max_workers: int
load_balancing: LoadBalancingAlgorithm
on_demand: bool
blocking: bool
unwrap_futures: bool
limits: Optional[Any] # Shared LimitPool
init_args: tuple
init_kwargs: dict
on_demand_cleanup_timeout: confloat(ge=0)
on_demand_slot_max_wait: confloat(ge=0)
max_queued_tasks: Optional[int]
retry_config: Optional[Any]
# Private attributes
_load_balancer: Any = PrivateAttr()
_workers: List[Any] = PrivateAttr()
_stopped: bool = PrivateAttr()
_method_cache: Dict[str, Callable] = PrivateAttr()
_on_demand_workers: List[Any] = PrivateAttr()
_on_demand_lock: Any = PrivateAttr()
_on_demand_counter: int = PrivateAttr()
# Abstract methods
def _initialize_pool(self) -> None:
"""Create all workers (or prepare for on-demand)."""
...
def _create_worker(self, worker_index: int) -> Any:
"""Create a single worker with unique index."""
...
def _get_on_demand_limit(self) -> Optional[int]:
"""Get max concurrent on-demand workers."""
...
# Common behavior
def __getattr__(self, name: str) -> Callable:
"""Intercept method calls and dispatch to load-balanced worker."""
...
def get_pool_stats(self) -> dict:
"""Get pool statistics."""
...
def stop(self, timeout: float = 30) -> None:
"""Stop all workers in pool."""
...
Key Architecture Decisions:
- Client-Side Pool: Pool lives on client, manages remote workers (not a remote actor itself)
- Load Balancer: Selects worker index, tracks active/total calls per worker
- Per-Worker Queues: Each worker has independent submission semaphore
- Shared Limits: All workers share same LimitSet instances
- On-Demand Workers: Created per request, destroyed after completion
- Worker Indices: Sequential indices (0, 1, 2, ...) for round-robin in LimitPool
Load Balancing¶
Implemented via BaseLoadBalancer subclasses:
- RoundRobin: Distribute requests evenly in circular fashion
- LeastActiveLoad: Select worker with fewest active (in-flight) calls
- LeastTotalLoad: Select worker with fewest total (lifetime) calls
- Random: Random worker selection (best for on-demand)
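A minimal sketch of the interface a load balancer implements, based on the select_worker / record_start / record_complete calls shown below (not the actual BaseLoadBalancer code):

```python
import threading

class RoundRobinBalancer:
    """Cycles through worker indices and tracks in-flight calls per worker."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next = 0
        self._active = {}                 # worker_idx -> in-flight call count

    def select_worker(self, num_workers: int) -> int:
        with self._lock:
            idx = self._next % num_workers
            self._next += 1
            return idx

    def record_start(self, worker_idx: int) -> None:
        with self._lock:
            self._active[worker_idx] = self._active.get(worker_idx, 0) + 1

    def record_complete(self, worker_idx: int) -> None:
        with self._lock:
            self._active[worker_idx] = self._active.get(worker_idx, 0) - 1
```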
Load Balancer Lifecycle:
def method_wrapper(*args, **kwargs):
# 1. Select worker
worker_idx = self._load_balancer.select_worker(len(self._workers))
# 2. Check if stopped
if self._stopped:
raise RuntimeError("Worker pool is stopped")
# 3. Record start
self._load_balancer.record_start(worker_idx)
# 4. Execute method (worker manages its own queue internally)
result = getattr(self._workers[worker_idx], name)(*args, **kwargs)
# 5. Wrap future to record completion
return self._wrap_future_with_tracking(result, worker_idx)
Future Wrapping for Load Balancer Tracking:
def _wrap_future_with_tracking(self, future, worker_idx):
def on_complete(f):
self._load_balancer.record_complete(worker_idx)
future.add_done_callback(on_complete)
return future
On-Demand Workers¶
Lifecycle:
1. Creation: New worker created per request
2. Execution: Single method call
3. Cleanup: Worker stopped after result available
4. Tracking: Stored in _on_demand_workers list during execution
Concurrency Limits:
- Thread: max(1, cpu_count() - 1)
- Process: max(1, cpu_count() - 1)
- Ray: Unlimited (cluster manages resources)
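A sketch of how those limits could be computed (hypothetical helper; the real hook is each pool's _get_on_demand_limit() method):

```python
import os
from typing import Optional

def on_demand_limit(mode: str) -> Optional[int]:
    if mode in ("thread", "process"):
        return max(1, (os.cpu_count() or 1) - 1)
    return None   # Ray: unlimited - the cluster manages resources
```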
Cleanup Strategy:
def _wrap_future_with_cleanup(self, future, worker):
def cleanup_callback(f):
# Schedule cleanup in separate thread to avoid deadlock
def deferred_cleanup():
worker.stop(timeout=self.on_demand_cleanup_timeout)
threading.Thread(target=deferred_cleanup, daemon=True).start()
future.add_done_callback(cleanup_callback)
return future
Critical: Cleanup must happen in separate thread to avoid deadlock. Calling worker.stop() from within a callback can cause deadlocks because stop() may try to cancel futures that are invoking this callback.
WorkerBuilder¶
WorkerBuilder is the factory that creates workers or pools based on configuration:
class WorkerBuilder(Typed):
# Public configuration
worker_cls: Type["Worker"]
mode: ExecutionMode
blocking: bool
max_workers: Optional[int]
load_balancing: Optional[LoadBalancingAlgorithm]
on_demand: bool
max_queued_tasks: Optional[int]
num_retries: int
retry_on: Optional[Any]
retry_algorithm: RetryAlgorithm
retry_wait: float
retry_jitter: float
retry_until: Optional[Any]
options: dict
def init(self, *args, **kwargs) -> Union[WorkerProxy, WorkerProxyPool]:
if self._should_create_pool():
return self._create_pool(args, kwargs)
else:
return self._create_single_worker(args, kwargs)
Responsibilities:
1. Validate configuration (max_workers, on_demand compatibility)
2. Apply defaults from the global config
3. Process the limits parameter (create a LimitPool)
4. Create a retry config if num_retries > 0
5. Check Ray + Pydantic incompatibility
6. Decide single worker vs pool
7. Instantiate the appropriate proxy/pool class
Limits Processing:
def _transform_worker_limits(limits, mode, is_pool, worker_index):
    if limits is None:
        return LimitPool([])  # empty LimitPool
    if isinstance(limits, LimitPool):
        return limits
    if isinstance(limits, list) and all(isinstance(l, Limit) for l in limits):
        return LimitPool([LimitSet(limits, shared=is_pool, mode=mode)])
    if isinstance(limits, list) and all(isinstance(l, BaseLimitSet) for l in limits):
        return LimitPool(limits)  # Multi-region limits
    if isinstance(limits, BaseLimitSet):
        if not limits.shared and is_pool:
            raise ValueError("Pool requires shared=True")
        return LimitPool([limits])
Worker Wrapping:
def _create_worker_wrapper(worker_cls, limits, retry_config, for_ray=False):
class WorkerWithLimitsAndRetry(worker_cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set limits (create LimitPool from list if needed)
if isinstance(limits, list):
limit_set = LimitSet(limits, shared=False, mode=Sync)
limit_pool = LimitPool([limit_set])
else:
limit_pool = limits
object.__setattr__(self, "limits", limit_pool)
def __getattribute__(self, name):
attr = super().__getattribute__(name)
if for_ray:
# Ray: Pre-wrap methods at class level
return attr
if has_retry and not name.startswith("_") and callable(attr):
return create_retry_wrapper(attr, retry_config)
return attr
if for_ray and has_retry:
# Pre-wrap all methods at class level for Ray
for method_name in dir(worker_cls):
if not method_name.startswith("_"):
method = getattr(worker_cls, method_name)
if callable(method):
wrapped = create_retry_wrapper(method, retry_config)
setattr(WorkerWithLimitsAndRetry, method_name, wrapped)
return WorkerWithLimitsAndRetry
Execution Modes¶
| Mode | Worker Proxy | Pool Support | Concurrency | Serialization | Best For |
|---|---|---|---|---|---|
| sync | SyncWorkerProxy | No | None | None | Testing, debugging |
| thread | ThreadWorkerProxy | Yes | Thread-level | None (shared memory) | I/O-bound tasks |
| process | ProcessWorkerProxy | Yes | Process-level | cloudpickle | CPU-bound tasks |
| asyncio | AsyncioWorkerProxy | No | Event loop | None (shared memory) | Async I/O (HTTP, DB) |
| ray | RayWorkerProxy | Yes | Distributed | Ray serialization | Distributed computing |
Default max_workers (pools):
- Sync: 1 (fixed)
- Asyncio: 1 (fixed)
- Thread: 24
- Process: 4
- Ray: 0 (unlimited on-demand)
Default load_balancing:
- Persistent pools: round_robin
- On-demand pools: random
Default max_queued_tasks (submission queue):
- Sync: None (bypassed)
- Asyncio: None (bypassed)
- Thread: 100
- Process: 5
- Ray: 2
Worker Lifecycle¶
Initialization¶
User calls Worker.options(mode, ...).init(args, kwargs)
↓
WorkerBuilder created with configuration
↓
WorkerBuilder.init() called
↓
├─ If max_workers=1 or None: _create_single_worker()
│ ↓
│ 1. Select appropriate WorkerProxy class
│ 2. Process limits → LimitPool
│ 3. Create retry config
│ 4. Instantiate proxy
│ 5. proxy.post_initialize() called
│ 6. Worker wrapper created (_create_worker_wrapper)
│ 7. Worker instance created with user args/kwargs
│ 8. Limits and retry logic injected
│ 9. Return proxy
│
└─ If max_workers>1 or on_demand: _create_pool()
↓
1. Select appropriate WorkerProxyPool class
2. Process limits → shared LimitPool
3. Create retry config
4. Instantiate pool
5. pool.post_initialize() called
6. Load balancer created
7. For persistent pools: _initialize_pool()
└─ Create N workers with sequential indices
8. Return pool
Method Execution (Single Worker)¶
user calls worker.method(args, kwargs)
↓
WorkerProxy.__getattr__("method") intercepts
↓
Check _method_cache for cached wrapper
↓
If not cached, create method_wrapper:
↓
1. Check if stopped
2. Acquire submission semaphore (if configured)
3. Check stopped again (atomic with semaphore)
4. Call _execute_method(name, args, kwargs)
5. Wrap future to release semaphore on completion
6. Return future (or result if blocking=True)
↓
Cache wrapper in _method_cache
↓
Return wrapper to user
↓
User calls wrapper(args) → future returned
↓
User calls future.result() → blocks until complete
Method Execution (Pool) - Non-Blocking Dispatch¶
Critical: Pool dispatch is ALWAYS non-blocking to the user.
user calls pool.method(args, kwargs) [NON-BLOCKING]
↓
WorkerProxyPool.__getattr__("method") intercepts [INSTANT]
↓
Check _method_cache for cached wrapper [O(1)]
↓
If not cached, create method_wrapper:
↓
├─ If on_demand:
│ 1. Wait for on-demand slot (if limit reached)
│ - Blocks internally but returns future to user immediately
│ 2. Check if stopped
│ 3. Increment counter, get worker_index [O(1), instant]
│ 4. Create worker with _create_worker(worker_index)
│ 5. Track in _on_demand_workers
│ 6. Call worker.method(args, kwargs) [Returns future instantly]
│ 7. Wrap future for cleanup after completion
│ 8. Return future (or result if blocking=True) [USER NEVER BLOCKS]
│
└─ If persistent:
1. Check workers exist and not stopped [O(1), instant]
2. Select worker: idx = load_balancer.select_worker(N) [O(1) or O(N), instant]
3. Check if stopped [O(1), instant]
4. Record start: load_balancer.record_start(idx) [O(1), instant]
5. Call worker.method(args, kwargs) [Returns future instantly]
- USER RECEIVES FUTURE IMMEDIATELY (non-blocking!)
- Worker manages its own submission queue internally
- Worker's internal semaphore may block proxy→backend flow
- User is never aware of internal blocking
6. Wrap future to record completion in load balancer [Instant]
7. Return future (or result if blocking=True) [USER NEVER BLOCKS]
Total user-facing time: ~1-10μs (microseconds)
User code never blocks on submission regardless of queue state!
Key Points:
- Load balancer selection is O(1) (round-robin) or O(N) where N = max_workers (typically <100)
- Worker method calls return a future immediately
- The worker's internal queue manages backpressure transparently
- The pool dispatch layer has NO semaphores or blocking
- All blocking happens inside WorkerProxy, hidden from the user
Shutdown¶
user calls worker.stop() or pool.stop()
↓
Set _stopped = True
↓
Cancel all pending futures
↓
├─ Single Worker:
│ └─ Mode-specific cleanup:
│ - Sync: No-op
│ - Thread: Put None in queue, join thread
│ - Process: Put None in queue, join process + result thread
│ - Asyncio: Stop sync thread, stop event loop
│ - Ray: ray.kill(actor)
│
└─ Pool:
1. Stop all persistent workers
2. Stop all on-demand workers
3. Clear worker lists
4. (Workers handle their own cleanup)
Context Manager¶
with Worker.options(mode="thread").init() as worker:
result = worker.method().result()
# worker.stop() called automatically
Pool Architecture¶
Persistent Pool¶
Client Process
│
├─ WorkerProxyPool (client-side)
│ ├─ LoadBalancer (round-robin, least-active, etc.)
│ ├─ Shared LimitPool (all workers share)
│ ├─ Worker 0 (WorkerProxy, index=0)
│ │ ├─ Submission Semaphore (max_queued_tasks)
│ │ ├─ LimitPool (copy of shared, offset by index=0)
│ │ └─ Worker instance (with limits, retry)
│ ├─ Worker 1 (WorkerProxy, index=1)
│ │ ├─ Submission Semaphore
│ │ ├─ LimitPool (copy of shared, offset by index=1)
│ │ └─ Worker instance
│ └─ Worker N-1 (WorkerProxy, index=N-1)
│ ├─ Submission Semaphore
│ ├─ LimitPool (copy of shared, offset by index=N-1)
│ └─ Worker instance
│
└─ Method calls dispatched via LoadBalancer
Key Characteristics:
- All workers created at initialization
- Load balancer distributes calls
- Each worker has its own submission semaphore
- All workers share the same LimitSet instances (via LimitPool)
- Worker indices used for round-robin in LimitPool
On-Demand Pool¶
Client Process
│
├─ WorkerProxyPool (client-side)
│ ├─ LoadBalancer (typically Random)
│ ├─ Shared LimitPool (all workers share)
│ ├─ _on_demand_workers list (tracks active ephemeral workers)
│ ├─ _on_demand_lock (thread-safe access to list)
│ └─ _on_demand_counter (sequential indices)
│
├─ On method call:
│ 1. Wait for slot (if limit enforced)
│ 2. Create worker with unique index
│ 3. Add to _on_demand_workers
│ 4. Execute method
│ 5. Wrap future for cleanup
│
└─ On future completion:
1. Callback triggers
2. Schedule deferred cleanup (separate thread)
3. Call worker.stop()
4. Remove from _on_demand_workers
Key Characteristics:
- No persistent workers
- Workers created per request
- Workers destroyed after completion
- Cleanup happens in separate thread (avoid deadlock)
- Concurrency limited by _get_on_demand_limit()
- Random load balancing (default)
Critical Implementation Details¶
1. Typed Integration¶
All proxies and pools inherit from morphic.Typed:
- Public fields are immutable and validated
- Private attributes use PrivateAttr()
- post_initialize() called after validation
- object.__setattr__() used to set private attrs during initialization
class WorkerProxy(Typed, ABC):
worker_cls: Type[Worker] # Immutable public field
_stopped: bool = PrivateAttr(default=False) # Mutable private attr
def post_initialize(self) -> None:
# Use object.__setattr__ to bypass frozen model
object.__setattr__(self, "_stopped", False)
object.__setattr__(self, "_method_cache", {})
2. Mode as ClassVar¶
Each proxy sets mode: ClassVar[ExecutionMode] at class level.
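A minimal illustration, mirroring the proxy declarations shown earlier in this document:

```python
class SyncWorkerProxy(WorkerProxy):
    mode: ClassVar[ExecutionMode] = ExecutionMode.Sync

class RayWorkerProxy(WorkerProxy):
    mode: ClassVar[ExecutionMode] = ExecutionMode.Ray
```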
This avoids passing mode as a parameter, reducing serialization size.
3. Submission Queue (max_queued_tasks) - Non-Blocking User Submissions¶
Critical Design Principle: User submissions are ALWAYS non-blocking.
The submission architecture has two distinct layers:
Layer 1: User Code → Worker Proxy (Always Non-Blocking, Unlimited)
- User calls worker.method(args) or pool.method(args)
- Returns future instantly without blocking
- No limit on number of submissions
- No semaphore at this layer
- All futures created immediately (< 1ms per submission)
Layer 2: Worker Proxy → Execution Backend (Controlled by max_queued_tasks)
- Worker proxy manages internal queue to backend (thread/process/ray)
- Semaphore limits in-flight tasks to backend
- Only max_queued_tasks tasks forwarded to backend at once
- Tasks wait in proxy's internal queue when backend is full
- User code never sees this blocking
Callback-Driven Forwarding Architecture¶
The implementation uses a callback-driven forwarding mechanism to achieve non-blocking submissions:
# In WorkerProxy.post_initialize():
self._submission_semaphore = threading.BoundedSemaphore(max_queued_tasks)
self._pending_submissions = queue.Queue() # Client-side pending queue
# In WorkerProxy.__getattr__().method_wrapper():
def method_wrapper(*args, **kwargs):
# Check if stopped
if self._stopped:
raise RuntimeError("Worker is stopped")
# Try to forward immediately (NON-BLOCKING check)
if self._submission_semaphore is None:
# No rate limiting - forward immediately
future = self._execute_method(name, *args, **kwargs)
elif self._submission_semaphore.acquire(blocking=False): # ← Non-blocking!
# Got semaphore - forward to backend now
future = self._execute_method(name, *args, **kwargs)
# Add callback to release semaphore and forward next pending
future = self._wrap_future_with_submission_tracking(future)
else:
# Semaphore unavailable - create shell future and queue for later
from concurrent.futures import Future as PyFuture
py_future = PyFuture()
future = ConcurrentFuture(future=py_future)
# Queue: (shell_future, py_future, method_name, args, kwargs)
self._pending_submissions.put((future, py_future, name, args, kwargs))
# Return future IMMEDIATELY (non-blocking!)
return future if not self.blocking else future.result()
# Callback-driven forwarding chain:
def _wrap_future_with_submission_tracking(self, future):
"""When a task completes, automatically forward next pending task."""
def on_submission_complete(f):
# Release semaphore first
self._submission_semaphore.release()
# Try to forward next pending submission (if any)
try:
shell_future, py_future, method_name, args, kwargs = \
self._pending_submissions.get_nowait()
# Try to acquire semaphore (should succeed since we just released)
if self._submission_semaphore.acquire(blocking=False):
# Forward to backend
backend_future = self._execute_method(method_name, *args, **kwargs)
# Chain backend future to shell future
def chain_result(bf):
try:
result = bf.result()
py_future.set_result(result)
except Exception as e:
py_future.set_exception(e)
backend_future.add_done_callback(chain_result)
# Recursively track this forwarded submission
backend_future.add_done_callback(on_submission_complete)
except queue.Empty:
# No pending submissions - done
pass
future.add_done_callback(on_submission_complete)
return future
Key Innovation: No blocking on semaphore during submission. Instead:
1. Try non-blocking acquire: semaphore.acquire(blocking=False)
2. If successful: forward immediately
3. If fails: create "shell" future, queue internally
4. On completion callback: automatically forward next queued task
Advantages:
- Zero blocking in user code (all futures returned instantly)
- No dedicated threads (callbacks drive forwarding)
- Automatic backpressure (semaphore limits backend load)
- Self-perpetuating chain (each completion triggers the next forward)
- Minimal overhead (< 1μs per submission when the queue is not full)
- Memory efficient (shell futures are lightweight until backed)
Design Rationale¶
Why callback-driven instead of background threads?
We considered several alternatives:
Alternative 1: Blocking Semaphore (Original Implementation)
# Simple but BLOCKS user code
self._submission_semaphore.acquire() # ← User code blocks here!
future = self._execute_method(name, *args, **kwargs)
return future
Problem: Blocks user code. With 1000 submissions and max_queued_tasks=10, the submitting loop would block 990 times waiting on the semaphore.
Alternative 2: Dedicated Forwarding Thread Pool
# Add thread pool to each worker to forward submissions
self._forwarding_pool = ThreadPoolExecutor(max_workers=1)
future = self._forwarding_pool.submit(
self._wait_for_semaphore_and_forward,
name, args, kwargs
)
Problem: Adds a dedicated forwarding thread (and its shutdown coordination) to every worker.
Alternative 3: Async Queue with Event Loop
# Use asyncio queue for pending submissions
await self._pending_queue.put((name, args, kwargs))
# Separate async task forwards to backend
Problem: Requires managing an event loop (or an extra thread to run one) in every execution mode.
Chosen: Callback-Driven Forwarding ✅
# Non-blocking semaphore check
if self._submission_semaphore.acquire(blocking=False):
forward_immediately()
else:
queue_internally() # Returns shell future instantly
# On completion callback:
future.add_done_callback(forward_next_from_queue)
Advantages: no blocking in user code, no extra threads to manage, and simple shutdown: just drain the pending queue in stop(), no thread coordination.
Key Insight: We already have threads for task completion (thread pool workers, process result handlers, Ray monitors). Reusing their callbacks for forwarding is free!
Shutdown and Cleanup¶
Critical Issue: What happens to pending futures when stop() is called?
Solution: Cancel all futures in _pending_submissions queue:
def stop(self, timeout: float = 30) -> None:
self._stopped = True
# Cancel all pending futures that were never forwarded
if self._pending_submissions is not None:
while True:
try:
shell_future, py_future, method_name, args, kwargs = \
self._pending_submissions.get_nowait()
py_future.cancel() # Mark as cancelled
except queue.Empty:
break # All pending futures cancelled
Callback Guard: Prevent forwarding during shutdown:
def on_submission_complete(f):
# Release semaphore
self._submission_semaphore.release()
# ⚠️ CRITICAL: Don't forward if stopped
if self._stopped:
return # Exit early
# Forward next task from queue...
Why this matters: Without the guard, cancelled futures could trigger callbacks that try to forward more tasks to a stopped worker, causing:
- Deadlocks (trying to acquire locks on a stopped backend)
- Race conditions (accessing worker state during teardown)
- Timeouts (tasks hang waiting for a stopped backend)
Lifecycle:
1. User calls stop() → Sets self._stopped = True
2. Cancel all pending futures in _pending_submissions → Users see CancelledError
3. Callback guard prevents forwarding new tasks → No new submissions reach backend
4. Existing in-flight tasks complete or timeout → Clean shutdown
5. Backend cleaned up (threads joined, processes terminated, ray actors killed)
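A hedged sketch of the user-visible shutdown behavior (MyWorker and slow_task are illustrative names):
worker = MyWorker.options(mode="thread", max_queued_tasks=2).init()
futures = [worker.slow_task(i) for i in range(100)]   # most tasks sit in _pending_submissions
worker.stop(timeout=10)
# Futures that were never forwarded to the backend are cancelled;
# calling futures[-1].result() raises concurrent.futures.CancelledError.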
Two-Queue Architecture¶
Why two queues?
- _pending_submissions (client-side, in WorkerProxy): Rate-limits forwarding to backend
- _command_queue (backend-side, Thread/Process/Asyncio only): Holds tasks waiting for execution
Client Code WorkerProxy Execution Backend
│ │ │
│ worker.method(1) │ │
├──────────────────────────────>│ │
│ <return future instantly> │ │
│ │ Forward task 1 │
│ ├─────────────────────────────>│
│ worker.method(2) │ (semaphore: 1/10 slots) │
├──────────────────────────────>│ │
│ <return future instantly> │ │
│ │ Forward task 2 │
│ ├─────────────────────────────>│
│ ... submit 1000 more │ (semaphore: 2/10 slots) │
│ (ALL return instantly!) │ │
│ │ Queue task 11-1000 │
│ │ (semaphore full) │
│ │ │
│ │ <task 1 completes> │
│ │<─────────────────────────────│
│ │ Release semaphore (1/10) │
│ │ Forward task 11 from queue │
│ ├─────────────────────────────>│
│ │ (semaphore: 2/10 slots) │
Purpose: Controls how many tasks can be in flight from the worker proxy to the underlying execution backend (thread/process/Ray). User submissions are always non-blocking; the proxy manages its own internal queue to the backend. This prevents overloading the backend (especially Ray actors) while keeping user submissions instant.
Example Flows¶
Single Worker Flow:
User submits 1000 tasks (instant, non-blocking):
├─ Task 1-10: Immediately forwarded to backend (semaphore allows)
│ └─ Future returned instantly
├─ Task 11-1000: Queued in _pending_submissions (semaphore full)
│ └─ "Shell" futures returned instantly (not yet backed by execution)
└─ As tasks complete:
└─ Callback chain forwards tasks 11-1000 from queue to backend
User has all 1000 futures immediately and never blocks!
Submission time: ~0.001s (instant)
Execution time: depends on task duration
Worker Pool Behavior:
Pool with 5 workers, max_queued_tasks=10 each:
├─ User submits 1000 tasks (instant, non-blocking pool dispatch)
├─ Load balancer distributes to 5 workers (instant, O(1))
│ └─ ~200 tasks per worker
├─ Each worker:
│ ├─ Forwards 10 tasks to its backend (semaphore allows)
│ └─ Queues remaining ~190 in _pending_submissions
└─ Total: 50 tasks in-flight to backends, 950 queued in proxies
Total capacity: 5 workers × 10 queue = 50 in-flight to backends
User submission time: ~0.002s for 1000 tasks (instant!)
Key Invariant:
max_queued_tasks controls the depth of the Worker Proxy → Execution Backend queue, not the User → Worker Proxy submission. The user-facing API is always non-blocking regardless of this setting.
Bypassed for:
- Sync mode: Immediate execution, no queue needed
- Asyncio mode: Event loop handles concurrency, no queue needed
- Blocking mode: Sequential execution, returns results directly
- On-demand workers: Ephemeral workers, pool already limits concurrency
Performance:
- Future creation: < 1 μs per future (instant)
- Callback overhead: ~5-10 μs per task completion
- Total submission time: O(n) where n = number of tasks, ~1 μs per task
- Example: 10,000 tasks submitted in ~10 ms (0.01 s)
4. Future Unwrapping¶
Automatically unwrap BaseFuture arguments before execution (unless unwrap_futures=False):
def _unwrap_futures_in_args(args, kwargs, unwrap_futures):
if not unwrap_futures:
return args, kwargs
# Fast-path: check if any futures or collections present
has_future_or_collection = ...
if not has_future_or_collection:
return args, kwargs # No unwrapping needed
# Recursively unwrap using morphic.map_collection
unwrapped_args = tuple(
map_collection(arg, _unwrap_future_value, recurse=True)
for arg in args
)
...
Ray Zero-Copy Optimization:
def _unwrap_future_for_ray(obj):
if isinstance(obj, RayFuture):
return obj._object_ref # Zero-copy!
elif isinstance(obj, BaseFuture):
return obj.result() # Materialize
return obj
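As a hedged usage sketch, this is what unwrapping enables from the caller's perspective; Producer, Consumer, and their methods are illustrative names, not from the codebase:
producer = Producer.options(mode="thread").init()
consumer = Consumer.options(mode="ray").init()
intermediate = producer.compute(5)          # BaseFuture
final = consumer.combine(intermediate, 3)   # intermediate is unwrapped to its value before combine() runs
print(final.result())
# In Ray mode, a RayFuture argument is forwarded as its underlying ObjectRef (zero-copy)
# rather than being materialized on the caller.
producer.stop()
consumer.stop()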
5. Worker Wrapper Creation¶
_create_worker_wrapper() injects limits and retry logic:
def _create_worker_wrapper(worker_cls, limits, retry_config, for_ray=False):
if not retry_config or retry_config.num_retries == 0:
# Only limits, no retry
class WorkerWithLimits(worker_cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Create LimitPool from list if needed
if isinstance(limits, list):
limit_set = LimitSet(limits, shared=False, mode=Sync)
limit_pool = LimitPool([limit_set])
else:
limit_pool = limits
# Use object.__setattr__ to bypass frozen Pydantic models
object.__setattr__(self, "limits", limit_pool)
return WorkerWithLimits
# With retry logic
class WorkerWithLimitsAndRetry(worker_cls):
def __init__(self, *args, **kwargs):
# Same as above for limits
...
def __getattribute__(self, name):
attr = super().__getattribute__(name)
if not for_ray and not name.startswith("_") and callable(attr):
# Wrap method with retry logic
return create_retry_wrapper(attr, retry_config, ...)
return attr
if for_ray:
# Pre-wrap methods at class level (Ray bypasses __getattribute__)
for method_name in dir(worker_cls):
if not method_name.startswith("_"):
method = getattr(worker_cls, method_name)
if callable(method):
wrapped = create_retry_wrapper(method, retry_config, ...)
setattr(WorkerWithLimitsAndRetry, method_name, wrapped)
return WorkerWithLimitsAndRetry
Key Points:
- Always sets self.limits (even if empty LimitPool)
- Uses object.__setattr__() to support frozen Pydantic models
- For Ray: Pre-wraps methods at class level (bypasses __getattribute__)
- For other modes: Wraps methods dynamically via __getattribute__
6. Load Balancing State¶
Load balancer tracks per-worker statistics:
class LeastActiveLoadBalancer(BaseLoadBalancer):
_active_calls: Dict[int, int] # worker_id -> active count
_total_dispatched: int
_lock: threading.Lock
def select_worker(self, num_workers):
with self._lock:
# Find worker with minimum active calls
min_active = min(self._active_calls.values())
for i in range(num_workers):
if self._active_calls[i] == min_active:
return i
def record_start(self, worker_id):
with self._lock:
self._active_calls[worker_id] += 1
self._total_dispatched += 1
def record_complete(self, worker_id):
with self._lock:
self._active_calls[worker_id] -= 1
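A hedged usage sketch of selecting the load balancing algorithm via .options(); the "least_total" string and get_pool_stats() appear in the gotchas section later in this document, and the exact stats format is illustrative:
pool = MyWorker.options(
    mode="thread",
    max_workers=4,
    load_balancing="least_total",   # string alias, as shown later in this document
).init()
futures = [pool.task(i) for i in range(100)]
_ = [f.result() for f in futures]
print(pool.get_pool_stats())        # includes per-worker counters tracked by the load balancer
pool.stop()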
7. Shared Limits Across Pool¶
All workers in pool share same LimitSet instances:
# In WorkerBuilder._create_pool():
limits = _transform_worker_limits(
limits=self.options.get("limits"),
mode=execution_mode,
is_pool=True, # Creates shared LimitSet
worker_index=0 # Placeholder
)
# In WorkerProxyPool._create_worker():
worker_limits = _transform_worker_limits(
limits=self.limits, # Shared LimitSet
mode=self.mode,
is_pool=False,
worker_index=i # Unique index for round-robin
)
# Each worker gets a LimitPool with:
# - Same LimitSet instances (shared state)
# - Unique worker_index (for round-robin offset)
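A hedged sketch of the user-facing effect; ResourceLimit usage mirrors the gotchas section below, and the import is omitted:
# All four workers share the same LimitSet, so the pool-wide capacity is 10 (not 10 per worker).
pool = MyWorker.options(
    mode="thread",
    max_workers=4,
    limits=[ResourceLimit(key="db_connections", capacity=10)],
).init()
# Contrast with on-demand workers (see "On-Demand Workers and Limits" below), where each
# ephemeral worker gets its own copy of the limits.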
Adding New Worker Types¶
To add a new execution mode (e.g., Dask, Celery):
1. Create WorkerProxy Subclass¶
class DaskWorkerProxy(WorkerProxy):
# Set mode at class level
mode: ClassVar[ExecutionMode] = ExecutionMode.Dask
# Add mode-specific config fields
dask_scheduler: str
# Add private attributes
_dask_future: Any = PrivateAttr()
_client: Any = PrivateAttr()
def post_initialize(self) -> None:
super().post_initialize()
# Initialize Dask client
import dask.distributed
self._client = dask.distributed.Client(self.dask_scheduler)
# Create worker wrapper
worker_cls = _create_worker_wrapper(
self.worker_cls,
self.limits,
self.retry_config
)
# Submit worker to Dask cluster
self._dask_future = self._client.submit(worker_cls, ...)
def _execute_method(self, method_name, *args, **kwargs):
# Submit method call to Dask worker
dask_future = self._client.submit(
lambda w: getattr(w, method_name)(*args, **kwargs),
self._dask_future
)
return DaskFuture(dask_future=dask_future)
def stop(self, timeout=30):
super().stop(timeout)
self._client.close()
2. Create Future Subclass¶
class DaskFuture(BaseFuture):
__slots__ = ("_dask_future",)
FUTURE_UUID_PREFIX = "dask-"
def __init__(self, dask_future):
super().__init__()
self._dask_future = dask_future
def result(self, timeout=None):
return self._dask_future.result(timeout=timeout)
def done(self):
return self._dask_future.done()
def cancel(self):
return self._dask_future.cancel()
# ... implement other BaseFuture methods
3. Create Pool Subclass (if supported)¶
class DaskWorkerProxyPool(WorkerProxyPool):
dask_scheduler: str
def _initialize_pool(self):
for i in range(self.max_workers):
worker = self._create_worker(worker_index=i)
self._workers.append(worker)
def _create_worker(self, worker_index=0):
# Process limits with worker_index
worker_limits = _transform_worker_limits(
limits=self.limits,
mode=self.mode,
is_pool=False,
worker_index=worker_index
)
return DaskWorkerProxy(
worker_cls=self.worker_cls,
dask_scheduler=self.dask_scheduler,
limits=worker_limits,
...
)
def _get_on_demand_limit(self):
return None # Dask manages resources
4. Update ExecutionMode Enum¶
class ExecutionMode(AutoEnum):
Sync = alias("sync")
Threads = alias("thread", "threads")
Processes = alias("process", "processes")
Asyncio = alias("asyncio", "async")
Ray = alias("ray")
Dask = alias("dask") # New!
5. Update WorkerBuilder¶
# In WorkerBuilder._create_single_worker():
elif execution_mode == ExecutionMode.Dask:
from .dask_worker import DaskWorkerProxy
proxy_cls = DaskWorkerProxy
# In WorkerBuilder._create_pool():
elif execution_mode == ExecutionMode.Dask:
pool_cls = DaskWorkerProxyPool
6. Update wrap_future()¶
def wrap_future(future):
# ... existing checks ...
elif hasattr(future, "_dask_future"):
return future # Already a DaskFuture
# ... try to import Dask and check type ...
7. Add Configuration Defaults¶
class GlobalDefaults(Typed):
# ... existing fields ...
class Dask(Typed):
blocking: bool = False
max_workers: int = 8
load_balancing: LoadBalancingAlgorithm = LoadBalancingAlgorithm.RoundRobin
max_queued_tasks: Optional[int] = 10
# ... other Dask-specific defaults ...
dask: Dask = Dask()
8. Add Tests¶
class TestDaskWorker:
def test_basic_execution(self):
worker = SimpleWorker.options(mode="dask").init()
result = worker.compute(5).result()
assert result == expected
worker.stop()
def test_dask_pool(self):
pool = SimpleWorker.options(
mode="dask",
max_workers=4
).init()
results = [pool.compute(i).result() for i in range(10)]
assert len(results) == 10
pool.stop()
Enhanced Worker Decorator and Auto-Initialization¶
Overview¶
The @worker decorator and __init_subclass__ mechanism provide three ways to configure workers with pre-defined options:
- Decorator Configuration: @worker(mode='thread', max_workers=4)
- Inheritance Configuration: class LLM(Worker, mode='thread', max_workers=4)
- Auto-Initialization: auto_init=True enables direct class instantiation to create workers
This feature makes worker creation more ergonomic while maintaining backward compatibility.
Design Goals¶
- Ergonomic API: Enable llm = LLM(model='gpt-4') to create workers directly
- Backward Compatible: @worker without parameters still works
- Flexible Configuration: Support decorator, inheritance, and .options() override
- Clear Precedence: Explicit .options() > Decorator > Inheritance > global_config
- No Recursion: Prevent infinite loops when auto_init=True
Implementation Architecture¶
Three Configuration Sources¶
1. Decorator Configuration (_worker_decorator_config)
Set by @worker(...) decorator:
@worker(mode='thread', max_workers=4, auto_init=True)
class LLM:
pass
# Stored as: LLM._worker_decorator_config = {'mode': 'thread', 'max_workers': 4, 'auto_init': True}
2. Inheritance Configuration (_worker_inheritance_config)
Set by __init_subclass__ when subclassing Worker:
class LLM(Worker, mode='thread', max_workers=4, auto_init=True):
pass
# Stored as: LLM._worker_inheritance_config = {'mode': 'thread', 'max_workers': 4, 'auto_init': True}
3. Explicit Configuration (.options())
Highest priority, overrides both decorator and inheritance:
llm = LLM.options(mode='process', max_workers=8).init(...)
# mode='process', max_workers=8 (overrides decorator/inheritance)
Configuration Merging in Worker.options()¶
The Worker.options() method merges all three sources:
@classmethod
def options(cls, *, mode=_NO_ARG, ...):
merged_params = {}
# 1. Start with inheritance config (lowest priority)
if hasattr(cls, '_worker_inheritance_config'):
merged_params.update(cls._worker_inheritance_config)
# 2. Override with decorator config (medium priority)
if hasattr(cls, '_worker_decorator_config'):
merged_params.update(cls._worker_decorator_config)
# 3. Override with explicit parameters (highest priority)
if mode is not _NO_ARG:
merged_params['mode'] = mode
# ... (other parameters)
# 4. Apply global_config defaults for missing values
# ...
return WorkerBuilder(worker_cls=cls, ...)
Precedence (highest to lowest):
1. Explicit .options() parameters
2. @worker decorator parameters
3. class Worker(...) inheritance parameters
4. global_config defaults
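A hedged end-to-end illustration of this precedence (mixing decorator and inheritance is supported with a warning, per the test cases later in this section):
@worker(mode='thread', max_workers=4)     # decorator config (priority 2)
class LLM(Worker, max_workers=2):         # inheritance config (priority 3, overridden by the decorator)
    pass
llm = LLM.options(max_workers=8).init()   # explicit .options() wins: max_workers=8, mode='thread'
# Anything still unspecified (e.g., load_balancing) falls back to global_config defaults.
llm.stop()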
Auto-Initialization via Worker.__new__¶
When auto_init=True, direct class instantiation creates a worker:
def __new__(cls, *args, **kwargs):
# 1. Check for _from_proxy flag (prevents recursion)
in_proxy_creation = kwargs.pop('_from_proxy', False)
if in_proxy_creation:
# Proxy is creating the actual worker instance
return super().__new__(cls)
# 2. Merge decorator and inheritance configs
merged_config = {}
if hasattr(cls, '_worker_inheritance_config'):
merged_config.update(cls._worker_inheritance_config)
if hasattr(cls, '_worker_decorator_config'):
merged_config.update(cls._worker_decorator_config)
# 3. Check if auto_init is enabled
should_auto_init = merged_config.get('auto_init', False)
if should_auto_init:
# 4. Create worker via .options().init()
options_params = {k: v for k, v in merged_config.items() if k != 'auto_init'}
builder = cls.options(**options_params)
return builder.init(*args, **kwargs)
# 5. Normal instantiation (plain Python instance)
return super().__new__(cls)
Key Points:
- _from_proxy=True flag prevents infinite recursion
- auto_init defaults to True if any config is provided
- .options().init() is called automatically when auto_init=True
Recursion Prevention with _from_proxy Flag¶
The Problem: Without recursion prevention, this would loop infinitely:
@worker(mode='thread', auto_init=True)
class LLM:
pass
llm = LLM(...) # Calls Worker.__new__
# → sees auto_init=True
# → calls cls.options().init(...)
# → WorkerBuilder creates proxy
# → Proxy calls worker_cls(...) to create worker instance
# → Calls Worker.__new__ again
# → sees auto_init=True again
# → INFINITE RECURSION!
The Solution: _from_proxy=True flag breaks the cycle:
# In all proxy classes (SyncWorkerProxy, ThreadWorkerProxy, etc.):
def post_initialize(self):
worker_cls = _create_worker_wrapper(self.worker_cls, ...)
# CRITICAL: Pass _from_proxy=True to bypass auto_init
init_kwargs = dict(self.init_kwargs)
init_kwargs['_from_proxy'] = True # ← Prevents recursion
self._worker = worker_cls(*self.init_args, **init_kwargs)
When _from_proxy=True is present, Worker.__new__ skips the auto_init logic and creates a plain instance directly.
Flow with _from_proxy:
1. User: llm = LLM(model='gpt-4')
2. Worker.__new__: sees auto_init=True, calls .options().init(model='gpt-4')
3. WorkerBuilder: creates SyncWorkerProxy(init_kwargs={'model': 'gpt-4'})
4. SyncWorkerProxy.post_initialize(): adds _from_proxy=True to init_kwargs
5. Proxy: calls worker_cls(model='gpt-4', _from_proxy=True)
6. Worker.__new__: sees _from_proxy=True, skips auto_init, returns plain instance ✅
7. No recursion!
Composition Wrapper Compatibility¶
The composition wrapper (for Typed/BaseModel workers) must NOT inherit auto_init:
def _create_composition_wrapper(worker_cls):
class CompositionWrapper(Worker):
def __init__(self, *args, **kwargs):
# Remove _from_proxy before creating wrapped instance
kwargs.pop('_from_proxy', None)
self._wrapped_instance = worker_cls(*args, _from_proxy=True, **kwargs)
# CRITICAL: Remove auto_init from config to prevent recursion
if hasattr(worker_cls, '_worker_inheritance_config'):
config = worker_cls._worker_inheritance_config.copy()
config.pop('auto_init', None) # ← Remove auto_init
if len(config) > 0:
CompositionWrapper._worker_inheritance_config = config
if hasattr(worker_cls, '_worker_decorator_config'):
config = worker_cls._worker_decorator_config.copy()
config.pop('auto_init', None) # ← Remove auto_init
if len(config) > 0:
CompositionWrapper._worker_decorator_config = config
return CompositionWrapper
Why: The composition wrapper is an internal implementation detail. It should never auto-initialize itself when instantiated by the proxy.
Worker Wrapper Compatibility¶
Similarly, worker wrappers (for limits/retries) must NOT inherit auto_init:
def _create_worker_wrapper(worker_cls, limits, retry_configs):
if limits is not None:
class WorkerWithLimits(worker_cls):
def __init__(self, *args, **kwargs):
kwargs.pop('_from_proxy', None)
super().__init__(*args, **kwargs)
# ... limits logic
# CRITICAL: Remove auto_init from inherited config
if hasattr(WorkerWithLimits, '_worker_inheritance_config'):
config = WorkerWithLimits._worker_inheritance_config.copy()
config.pop('auto_init', None)
if len(config) > 0:
WorkerWithLimits._worker_inheritance_config = config
if hasattr(WorkerWithLimits, '_worker_decorator_config'):
config = WorkerWithLimits._worker_decorator_config.copy()
config.pop('auto_init', None)
if len(config) > 0:
WorkerWithLimits._worker_decorator_config = config
return WorkerWithLimits
return worker_cls
Why: Worker wrappers are internal classes created by the proxy. They should never trigger auto_init when instantiated.
Usage Patterns¶
Pattern 1: Decorator with Auto-Init¶
@worker(mode='thread', max_workers=4, auto_init=True)
class LLM:
def __init__(self, model_name: str):
self.model_name = model_name
def call_llm(self, prompt: str) -> str:
return f"Response from {self.model_name}"
# Direct instantiation creates worker
llm = LLM(model_name='gpt-4')
future = llm.call_llm("Hello")
result = future.result()
llm.stop()
Pattern 2: Inheritance with Auto-Init¶
class LLM(Worker, mode='thread', max_workers=4, auto_init=True):
def __init__(self, model_name: str):
self.model_name = model_name
# Direct instantiation creates worker
llm = LLM(model_name='gpt-4')
llm.stop()
Pattern 3: Decorator Without Auto-Init (Backward Compatible)¶
@worker
class LLM:
pass
# Must use .options().init() (backward compatible)
llm = LLM.options(mode='thread').init(...)
llm.stop()
Pattern 4: Override at Instantiation¶
@worker(mode='thread', max_workers=4, auto_init=True)
class LLM:
pass
# Use decorator defaults
llm1 = LLM(...) # mode='thread', max_workers=4
# Override decorator config
llm2 = LLM.options(mode='process', max_workers=8).init(...)
# mode='process', max_workers=8
llm1.stop()
llm2.stop()
Testing and Validation¶
Test Coverage:
- tests/core/worker/test_enhanced_worker_decorator.py (comprehensive test suite)
- Basic decorator functionality
- Inheritance configuration
- Auto-initialization behavior
- Configuration precedence
- Options override
- All execution modes
- Typed/BaseModel workers
- Context manager support
- Error cases
- Recursion prevention
Key Test Cases:
1. Decorator with auto_init=True creates workers directly
2. Decorator with auto_init=False creates plain instances
3. Inheritance with auto_init=True creates workers directly
4. .options() overrides decorator/inheritance config
5. Mixed decorator + inheritance (decorator wins, with warning)
6. No infinite recursion with auto_init=True
7. Composition wrapper doesn't trigger auto_init
8. Worker wrapper doesn't trigger auto_init
9. Works across all execution modes (sync, thread, process, asyncio, ray)
10. Works with Typed and BaseModel workers
Design Rationale¶
Why Three Configuration Sources?
- Decorator: Convenient for standalone classes
- Inheritance: Natural for class hierarchies
- .options(): Runtime flexibility and override capability
Why auto_init Defaults to True?
- If user provides any configuration, they likely want auto-initialization
- Makes the API more ergonomic (llm = LLM(...) vs llm = LLM.options().init(...))
- Can be explicitly disabled with auto_init=False
Why _from_proxy Flag?
- Simplest solution to prevent recursion
- Minimal performance overhead (one dict lookup)
- Clear intent (flag explicitly indicates proxy creation)
- Alternative approaches (checking call stack, thread-local state) are more complex
Why Remove auto_init from Wrappers?
- Wrappers are internal implementation details
- They should never auto-initialize themselves
- Prevents infinite recursion when proxy creates worker instance
- Keeps auto_init behavior at the user-facing layer only
Backward Compatibility¶
Fully Backward Compatible:
- @worker without parameters still works
- Existing .options().init() code unchanged
- No breaking changes to existing APIs
- New features are opt-in via auto_init=True
Migration Path:
# Old code (still works)
@worker
class LLM:
pass
llm = LLM.options(mode='thread').init(...)
# New code (more ergonomic)
@worker(mode='thread', auto_init=True)
class LLM:
pass
llm = LLM(...)
Limitations and Gotchas¶
1. Typed/BaseModel Workers and Infrastructure Methods¶
Note: This is NOT a limitation anymore! As of the Universal Composition Wrapper implementation, workers inheriting from morphic.Typed or pydantic.BaseModel work seamlessly across ALL execution modes including Ray.
What Changed:
- Before: Ray + Typed/BaseModel raised ValueError due to serialization conflicts
- After: Automatic composition wrapper enables Ray support with zero code changes
Example (now works in all modes):
class MyWorker(Worker, Typed):
name: str
value: int
def process(self, x: int) -> int:
return x * self.value
# ✅ Works in ALL modes (sync, thread, process, asyncio, ray)
worker = MyWorker.options(mode="ray").init(name="test", value=10)
result = worker.process(5).result() # Returns 50
See the Universal Composition Wrapper section for implementation details.
2. Async Functions in Non-Asyncio Modes¶
Limitation: Async functions work but don't provide concurrency benefits
Why: Other modes use asyncio.run() which blocks until completion
Example:
class APIWorker(Worker):
async def fetch(self, url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
# ThreadWorker: Each fetch() blocks the thread
worker = APIWorker.options(mode="thread").init()
urls = [f"http://api.example.com/{i}" for i in range(10)]
futures = [worker.fetch(url) for url in urls] # Sequential, ~10 seconds
# AsyncioWorker: fetch() runs concurrently in event loop
worker = APIWorker.options(mode="asyncio").init()
futures = [worker.fetch(url) for url in urls] # Concurrent, ~1 second
Best Practice: Use mode="asyncio" for async I/O-bound tasks.
3. Submission Queue vs Resource Limits¶
Two separate mechanisms:
1. Submission Queue (max_queued_tasks): Client-side, limits how many tasks the proxy keeps in flight to the backend
2. Resource Limits (limits): Worker-side, limits concurrent operations
Example:
# Submission queue: Max 10 futures in-flight
# Resource limit: Max 5 concurrent executions
worker = MyWorker.options(
mode="ray",
max_queued_tasks=10,
limits=[ResourceLimit(key="slots", capacity=5)]
).init()
# Submit 100 tasks:
futures = [worker.task(i) for i in range(100)]
# - First 10 are forwarded to the backend immediately (submission queue has room)
# - Remaining 90 wait in the proxy's pending queue; their futures are still returned instantly
# - Inside the worker: at most 5 execute concurrently (resource limit)
4. On-Demand Workers and Limits¶
Issue: Each on-demand worker gets own LimitPool copy
Impact: Limits are NOT shared across on-demand workers
Example:
pool = MyWorker.options(
mode="thread",
on_demand=True,
limits=[ResourceLimit(key="connections", capacity=10)]
).init()
# If 5 on-demand workers run concurrently, each has its own capacity=10 → up to 50 total connections!
Solution: Don't use limits with on-demand workers, or use a persistent pool.
5. Method Caching and Callable Attributes¶
Issue: __getattr__ caches method wrappers by name
Problem: If worker class has callable attributes that change, cache becomes stale
Example:
class DynamicWorker(Worker):
def __init__(self):
self.processor = lambda x: x * 2
def update_processor(self, new_func):
self.processor = new_func
worker = DynamicWorker.options(mode="thread").init()
worker.processor(5) # Returns 10
worker.update_processor(lambda x: x * 3)
worker.processor(5) # Still returns 10! (cached wrapper)
Solution: Clear _method_cache when updating callable attributes, or use regular methods instead of callable attributes.
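A hedged workaround sketch; it assumes _method_cache (mentioned above) is a dict-like attribute on the proxy that can simply be cleared:
worker.update_processor(lambda x: x * 3)
worker._method_cache.clear()    # drop the stale cached wrapper (internal, assumed dict-like)
worker.processor(5)             # now returns 15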
6. Exception Handling in Pools¶
Behavior: Exceptions don't stop the pool
Example:
pool = MyWorker.options(mode="thread", max_workers=4).init()
futures = [pool.task(i) for i in range(10)]
# If task(5) raises exception, other tasks continue
# Exception stored in futures[5], not propagated
try:
results = [f.result() for f in futures]
except Exception as e:
# Only raised when accessing futures[5].result()
...
Best Practice: Use gather(return_exceptions=True) to collect all results/exceptions.
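A hedged sketch of that pattern; the gather helper is referenced above, but its import path and exact signature are assumptions:
from concurry import gather   # assumed import path
futures = [pool.task(i) for i in range(10)]
outcomes = gather(futures, return_exceptions=True)   # exceptions are returned in place of results
failures = [o for o in outcomes if isinstance(o, Exception)]
print(f"{len(failures)} task(s) failed; the rest completed normally")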
7. Worker State and Pools¶
Limitation: Worker state is per-worker, not per-pool
Example:
class Counter(Worker):
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
pool = Counter.options(mode="thread", max_workers=4).init()
# Each worker has own count
results = [pool.increment().result() for _ in range(10)]
# Results: [1, 1, 1, 1, 2, 2, 2, 2, 3, 3] (depends on load balancing)
# NOT: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Solution: Use shared state mechanisms (Redis, database, etc.) or single worker.
8. Stop Timeout and Cleanup¶
Issue: stop() timeout is per-operation, not total
Example:
pool = MyWorker.options(mode="thread", max_workers=10).init()
pool.stop(timeout=5)
# May take up to 50 seconds! (5s × 10 workers)
Best Practice: Set appropriate timeout based on pool size.
9. Cloudpickle Serialization Quirks¶
Issue: Process and Ray workers serialize worker class
Limitations:
- Local variables from the outer scope are captured by closures
- Large dependencies increase serialization time
- Some objects can't be pickled (open files, database connections)
Example:
# BAD: Captures entire DataFrame in closure
df = pd.DataFrame(...) # 1GB
class Processor(Worker):
def process(self, row_id):
return df.iloc[row_id] # Serializes entire df!
worker = Processor.options(mode="process").init()
Solution: Pass data as arguments, not via closures:
class Processor(Worker):
def __init__(self, df):
self.df = df
worker = Processor.options(mode="process").init(df)
10. Load Balancer State and Restarts¶
Issue: Load balancer state lost on pool restart
Example:
pool = MyWorker.options(
mode="thread",
max_workers=4,
load_balancing="least_total"
).init()
# After 1000 calls, load balanced across workers
stats = pool.get_pool_stats()
# {"load_balancer": {"total_calls": {0: 250, 1: 250, 2: 250, 3: 250}}}
pool.stop()
pool = MyWorker.options(...).init() # New pool
# Load balancer reset, starts from zero
Solution: Don't rely on load balancer state persisting across restarts.
This architecture document provides a comprehensive technical overview of the worker and worker pool system in Concurry. For implementation details, see the source code in src/concurry/core/worker/.