Architecture: Workers and Worker Pools¶
This document provides a comprehensive technical overview of the worker and worker pool architecture in Concurry.
Table of Contents¶
- Overview
- Core Abstractions
- Worker Base Class
- WorkerProxy Hierarchy
- WorkerProxyPool Hierarchy
- WorkerBuilder
- Execution Modes
- Worker Lifecycle
- Pool Architecture
- Critical Implementation Details
- Adding New Worker Types
- Limitations and Gotchas
Overview¶
Concurry implements a Worker/Proxy pattern where:

- Worker: User-defined class containing business logic (a plain Python class)
- WorkerProxy: Wraps a Worker and manages its execution context (thread, process, Ray actor, etc.)
- WorkerProxyPool: Manages multiple WorkerProxy instances with load balancing
This separation allows the same Worker code to run in different execution contexts (sync, thread, process, asyncio, Ray) without modification.
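For orientation, here is a minimal sketch of the pattern. The Summarizer class and the top-level import path are illustrative assumptions; the options(...).init(...) API is covered under WorkerBuilder below.

from concurry import Worker  # Import path assumed for illustration

class Summarizer(Worker):  # Hypothetical worker: plain Python, no special base requirements
    def __init__(self, prefix: str):
        self.prefix = prefix

    def summarize(self, text: str) -> str:
        return f"{self.prefix}: {text[:20]}"

# The same class runs unchanged in any execution mode:
worker = Summarizer.options(mode="thread").init("doc")
future = worker.summarize("hello world")  # Returns a BaseFuture immediately
print(future.result())                    # Blocks until the worker thread finishes
worker.stop()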
Key Design Principles¶
- Worker classes are plain Python - No inheritance requirements beyond the Worker base class
- Proxy classes handle execution - All concurrency, serialization, and communication logic lives in the proxies
- Typed validation - All proxies and pools inherit from morphic.Typed for configuration validation
- No shared state between workers - Each worker maintains isolated state
- Unified Future API - All execution modes return BaseFuture subclasses
Core Abstractions¶
Worker Base Class¶
class Worker:
"""User-facing base class for all workers."""
@classmethod
def options(cls, mode, blocking, max_workers, ...) -> WorkerBuilder:
"""Configure worker execution options."""
...
def __init__(self, *args, **kwargs):
"""User-defined initialization - completely flexible signature."""
...
Key Characteristics:

- Does NOT inherit from morphic.Typed (allows flexible __init__ signatures)
- Supports cooperative multiple inheritance with Typed/BaseModel (except in Ray mode)
- Validation decorators (@validate, @validate_call) work with ALL modes, including Ray
- User-defined workers are wrapped by _create_worker_wrapper() to inject limits and retry logic
Model Inheritance Support:

- ✅ Worker + morphic.Typed: Full support for sync, thread, process, asyncio (NOT Ray)
- ✅ Worker + pydantic.BaseModel: Full support for sync, thread, process, asyncio (NOT Ray)
- ✅ @morphic.validate / @pydantic.validate_call: Work with ALL modes, including Ray
- ❌ Ray + Typed/BaseModel: Incompatible because Ray's actor wrapping conflicts with Pydantic's __setattr__
Ray + Pydantic Limitation:
Ray's ray.remote() wraps classes as actors and modifies their __setattr__ behavior, which conflicts with Pydantic's frozen model implementation. Concurry automatically detects this and raises a ValueError with clear guidance to use composition instead of inheritance, or to use validation decorators.
WorkerProxy Hierarchy¶
All WorkerProxy classes inherit from WorkerProxy(Typed, ABC):
WorkerProxy (Typed, ABC)
├── SyncWorkerProxy # Direct execution in current thread
├── ThreadWorkerProxy # Thread + queue-based communication
├── ProcessWorkerProxy # Process + multiprocessing queues + cloudpickle
├── AsyncioWorkerProxy # Event loop + sync thread for mixed execution
└── RayWorkerProxy # Ray actor + ObjectRef futures
Common Interface:
class WorkerProxy(Typed, ABC):
# Public configuration (immutable after creation)
worker_cls: Type[Worker]
blocking: bool
unwrap_futures: bool
init_args: tuple
init_kwargs: dict
limits: Optional[Any] # LimitPool instance
retry_config: Optional[Any] # RetryConfig instance
max_queued_tasks: Optional[int]
mode: ClassVar[ExecutionMode] # Set by subclass
# Private attributes (mutable, not serialized)
_stopped: bool = PrivateAttr(default=False)
_options: dict = PrivateAttr(default_factory=dict)
_method_cache: dict = PrivateAttr(default_factory=dict)
_submission_semaphore: Optional[Any] = PrivateAttr(default=None)
# Abstract methods that subclasses must implement
def _execute_method(self, method_name: str, *args, **kwargs) -> BaseFuture:
"""Execute a worker method and return a future."""
...
# Common behavior
def __getattr__(self, name: str) -> Callable:
"""Intercept method calls and dispatch via _execute_method."""
...
def stop(self, timeout: float = 30) -> None:
"""Stop the worker and clean up resources."""
...
def __enter__(self) / __exit__(self):
"""Context manager support for automatic cleanup."""
...
Key Implementation Rules:

- Mode as ClassVar: Each proxy sets mode: ClassVar[ExecutionMode] at class level; it is NOT passed as a parameter
- Typed Configuration: All config fields are immutable public attributes validated by Pydantic
- Private Attributes: Use PrivateAttr() for mutable state, initialized in post_initialize()
- Method Caching: __getattr__ caches method wrappers in _method_cache for performance
- Submission Queue: Use _submission_semaphore (a BoundedSemaphore) to limit in-flight tasks per worker
- Future Unwrapping: Automatically unwrap BaseFuture arguments before execution (unless unwrap_futures=False)
SyncWorkerProxy¶
Execution Model: Direct execution in current thread
Future Type: SyncFuture (caches result/exception at creation)
class SyncWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Sync
_worker: Any = PrivateAttr() # Worker instance stored directly
def _execute_method(self, method_name: str, *args, **kwargs) -> SyncFuture:
method = getattr(self._worker, method_name)
try:
result = _invoke_function(method, *args, **kwargs)
return SyncFuture(result_value=result)
except Exception as e:
return SyncFuture(exception_value=e)
Characteristics:

- No threads, no queues, no asynchronous communication
- Async functions executed via asyncio.run() (blocks until complete)
- Zero overhead for simple testing and debugging
- Submission queue bypassed (max_queued_tasks ignored)
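For illustration, a sync-mode call behaves like a direct function call while keeping the uniform future API (SimpleWorker and compute() are hypothetical):

worker = SimpleWorker.options(mode="sync").init()
fut = worker.compute(5)  # Executes immediately in the current thread
assert fut.done()        # SyncFuture caches the result at creation, so this is already True
print(fut.result())      # No waiting: the result (or exception) is already stored
worker.stop()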
ThreadWorkerProxy¶
Execution Model: Dedicated worker thread + command queue
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class ThreadWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Threads
command_queue_timeout: confloat(ge=0) # From global config
_command_queue: Any = PrivateAttr() # queue.Queue
_futures: Dict[str, Any] = PrivateAttr() # uuid -> ConcurrentFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
_thread: Any = PrivateAttr() # threading.Thread
Architecture:
1. Main thread (client): Submits commands to the queue, returns a future immediately
2. Worker thread: Processes commands from the queue, sets results on futures
Communication Flow:
Client Thread Worker Thread
│ │
│ 1. Create future │
│ 2. Store in _futures dict │
│ 3. Put (uuid, method, args) │
├───────────────────────────────>│
│ 4. Return future │ 5. Get command from queue
│ │ 6. Execute method
│ │ 7. Set result on future
│ │ 8. Remove from _futures dict
│ 9. future.result() blocks │
│ until worker sets result │
Characteristics:

- Async functions executed via asyncio.run() in the worker thread (no concurrency)
- Command queue timeout checked via queue.get(timeout=command_queue_timeout)
- Futures tracked in a dict for cancellation on stop()
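A condensed sketch of the worker-thread loop implied by the flow above. This is illustrative, not the actual source; it assumes the command tuple layout shown in the diagram, the None sentinel used by stop(), and that the worker instance is built inside the thread:

import queue

def _worker_loop(self):
    # Assumed: worker instance is constructed inside the dedicated thread
    worker = self.worker_cls(*self.init_args, **self.init_kwargs)
    while True:
        try:
            command = self._command_queue.get(timeout=self.command_queue_timeout)
        except queue.Empty:
            continue  # Wake periodically so a stop() request can be observed
        if command is None:  # Sentinel enqueued by stop()
            break
        uuid, method_name, args, kwargs = command
        try:
            result = getattr(worker, method_name)(*args, **kwargs)
            self._futures[uuid].set_result(result)  # Step 7: set result on the future
        except Exception as e:
            self._futures[uuid].set_exception(e)
        with self._futures_lock:
            self._futures.pop(uuid, None)  # Step 8: remove from the _futures dict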
ProcessWorkerProxy¶
Execution Model: Separate process + multiprocessing queues + cloudpickle serialization
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class ProcessWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Processes
mp_context: Literal["fork", "spawn", "forkserver"] = "fork"
result_queue_timeout: confloat(ge=0)
result_queue_cleanup_timeout: confloat(ge=0)
_command_queue: Any = PrivateAttr() # mp.Queue
_result_queue: Any = PrivateAttr() # mp.Queue
_futures: dict = PrivateAttr() # uuid -> PyFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
_process: Any = PrivateAttr() # mp.Process
_result_thread: Any = PrivateAttr() # threading.Thread
Architecture:
1. Main process (client): Sends commands to _command_queue
2. Worker process: Executes commands, sends results to _result_queue
3. Result thread: Reads from _result_queue, sets results on futures
Communication Flow:
Main Process Worker Process Result Thread
│ │ │
│ 1. Serialize args │ │
│ 2. Put command in queue │ │
├──────────────────────────>│ │
│ 3. Return future │ 4. Get command │
│ │ 5. Deserialize │
│ │ 6. Execute method │
│ │ 7. Serialize result │
│ │ 8. Put in result_queue
│ ├─────────────────────>│
│ │ │ 9. Get result
│ │ │ 10. Deserialize
│ │ │ 11. Set on future
│ 12. future.result() │ │
Characteristics:

- Worker class serialization: Uses cloudpickle.dumps() to serialize the worker class
- Async functions: Executed via asyncio.run() in the worker process (no concurrency)
- Exception preservation: Original exception types preserved across the process boundary
- Separate result thread: Required because Queue.get() from another process blocks
- Multiprocessing context: Supports fork, spawn, or forkserver
Critical Serialization Details:

- Worker class is serialized ONCE at proxy creation
- Limits passed as a list of Limit objects (or a LimitPool), recreated inside the worker process
- RetryConfig serialized and passed to the worker process
- Args/kwargs serialized per method call
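A usage sketch: mp_context selects how the worker process is started, and exceptions cross the process boundary with their original type. Cruncher is hypothetical, and passing mp_context through options(...) is assumed here:

class Cruncher(Worker):  # Hypothetical CPU-bound worker
    def divide(self, a: float, b: float) -> float:
        return a / b

worker = Cruncher.options(mode="process", mp_context="spawn").init()
try:
    worker.divide(1, 0).result()
except ZeroDivisionError:
    print("Original exception type preserved across the process boundary")
worker.stop()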
AsyncioWorkerProxy¶
Execution Model: Event loop thread + dedicated sync thread for sync methods
Future Type: ConcurrentFuture (wraps concurrent.futures.Future)
class AsyncioWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Asyncio
loop_ready_timeout: confloat(ge=0)
thread_ready_timeout: confloat(ge=0)
sync_queue_timeout: confloat(ge=0)
_loop: Any = PrivateAttr(default=None) # asyncio.EventLoop
_worker: Any = PrivateAttr(default=None) # Worker instance
_loop_thread: Any = PrivateAttr() # threading.Thread (runs event loop)
_sync_thread: Any = PrivateAttr() # threading.Thread (runs sync methods)
_sync_queue: Any = PrivateAttr() # queue.Queue (for sync methods)
_futures: Dict[str, Any] = PrivateAttr() # uuid -> ConcurrentFuture
Architecture:
1. Event loop thread: Runs the asyncio event loop for async methods
2. Sync worker thread: Executes sync methods without blocking the event loop
3. Main thread: Routes method calls to the appropriate thread
Method Routing:
def _execute_method(self, method_name, *args, **kwargs):
method = getattr(self._worker, method_name)
is_async = asyncio.iscoroutinefunction(method)
if is_async:
# Route to event loop for concurrent execution
self._loop.call_soon_threadsafe(schedule_async_task)
else:
# Route to sync thread to avoid blocking event loop
self._sync_queue.put((future, method_name, args, kwargs))
Characteristics:

- True async concurrency: Multiple async methods can run concurrently in the event loop
- Sync method isolation: Sync methods don't block the event loop
- Best for I/O-bound async: HTTP requests, database queries, WebSocket connections
- 10-50x speedup: For concurrent I/O operations vs sequential execution
- ~13% overhead: For sync methods vs ThreadWorker (minimal impact)
Performance Comparison (30 HTTP requests, 50ms latency each):

- SyncWorker: 1.66s (sequential)
- ThreadWorker: 1.66s (sequential)
- ProcessWorker: 1.67s (sequential)
- AsyncioWorker: 0.16s (concurrent) ✅ 10x faster!
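A sketch reproducing the comparison above, with asyncio.sleep() standing in for network latency (Pinger is hypothetical):

import asyncio

class Pinger(Worker):
    async def ping(self, i: int) -> int:
        await asyncio.sleep(0.05)  # Stand-in for a 50ms HTTP call
        return i

worker = Pinger.options(mode="asyncio").init()
futures = [worker.ping(i) for i in range(30)]  # All 30 coroutines scheduled on the event loop
results = [f.result() for f in futures]        # Completes in roughly one latency period, not thirty
worker.stop()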
RayWorkerProxy¶
Execution Model: Ray actor (distributed process) + ObjectRef futures
Future Type: RayFuture (wraps a Ray ObjectRef)
class RayWorkerProxy(WorkerProxy):
mode: ClassVar[ExecutionMode] = ExecutionMode.Ray
actor_options: Optional[Dict[str, Any]] = None # Ray resource options
_ray_actor: Any = PrivateAttr() # Ray actor handle
_futures: Dict[str, Any] = PrivateAttr() # uuid -> RayFuture
_futures_lock: Any = PrivateAttr() # threading.Lock
Architecture:
1. Client process: Holds the actor handle, submits method calls
2. Ray actor: Separate process (possibly on a remote machine), executes methods
3. Ray cluster: Manages scheduling, data transfer, fault tolerance
Communication Flow:
Client Process Ray Actor Ray Cluster
│ │ │
│ 1. Get actor handle │ │
│ 2. actor.method.remote() │ │
├─────────────────────────────────────────────────>│
│ 3. Return ObjectRef │ │ 4. Schedule task
│ │<──────────────────────┤
│ │ 5. Execute method │
│ │ 6. Store result │
│ ├──────────────────────>│
│ 7. ray.get(ObjectRef) │ │ 8. Retrieve result
│<─────────────────────────────────────────────────┤
Characteristics:
- Zero-copy optimization: RayFuture → ObjectRef passed directly (no serialization)
- Cross-worker futures: Other BaseFuture types materialized before passing
- Native async support: Ray handles async methods automatically
- Resource allocation: Via actor_options={"num_cpus": 2, "num_gpus": 1, "resources": {...}}
- Distributed execution: Actor can run on any node in Ray cluster
- Fault tolerance: Ray handles actor failures and restarts
Future Unwrapping with Zero-Copy:
def _unwrap_future_for_ray(obj):
if isinstance(obj, RayFuture):
return obj._object_ref # Zero-copy: pass ObjectRef directly
elif isinstance(obj, BaseFuture):
return obj.result() # Cross-worker: materialize value
return obj
Retry Logic for Ray:
Ray actors bypass __getattribute__, so retry logic must be pre-applied to methods at class level:
worker_cls_to_use = _create_worker_wrapper(
self.worker_cls,
self.limits,
self.retry_config,
for_ray=True # Pre-wrap methods at class level
)
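For reference, a sketch of requesting cluster resources through actor_options (HeavyWorker and train() are hypothetical; forwarding actor_options through options(...) is assumed):

worker = HeavyWorker.options(
    mode="ray",
    actor_options={"num_cpus": 2, "num_gpus": 1},  # Passed to ray.remote() for the actor
).init()
fut = worker.train(step=1)  # Dispatched to the Ray actor; returns a RayFuture
print(fut.result())         # Resolves the underlying ObjectRef (ray.get)
worker.stop()               # ray.kill(actor), per the Shutdown section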
WorkerProxyPool Hierarchy¶
All WorkerProxyPool classes inherit from WorkerProxyPool(Typed, ABC):
WorkerProxyPool (Typed, ABC)
├── InMemoryWorkerProxyPool # Sync, Thread, Asyncio workers
├── MultiprocessWorkerProxyPool # Process workers
└── RayWorkerProxyPool # Ray workers
Common Interface:
class WorkerProxyPool(Typed, ABC):
# Public configuration (immutable after creation)
worker_cls: Type[Worker]
mode: ExecutionMode
max_workers: int
load_balancing: LoadBalancingAlgorithm
on_demand: bool
blocking: bool
unwrap_futures: bool
limits: Optional[Any] # Shared LimitPool
init_args: tuple
init_kwargs: dict
on_demand_cleanup_timeout: confloat(ge=0)
on_demand_slot_max_wait: confloat(ge=0)
max_queued_tasks: Optional[int]
retry_config: Optional[Any]
# Private attributes
_load_balancer: Any = PrivateAttr()
_workers: List[Any] = PrivateAttr()
_stopped: bool = PrivateAttr()
_method_cache: Dict[str, Callable] = PrivateAttr()
_on_demand_workers: List[Any] = PrivateAttr()
_on_demand_lock: Any = PrivateAttr()
_on_demand_counter: int = PrivateAttr()
_worker_semaphores: List[Any] = PrivateAttr()
# Abstract methods
def _initialize_pool(self) -> None:
"""Create all workers (or prepare for on-demand)."""
...
def _create_worker(self, worker_index: int) -> Any:
"""Create a single worker with unique index."""
...
def _get_on_demand_limit(self) -> Optional[int]:
"""Get max concurrent on-demand workers."""
...
# Common behavior
def __getattr__(self, name: str) -> Callable:
"""Intercept method calls and dispatch to load-balanced worker."""
...
def get_pool_stats(self) -> dict:
"""Get pool statistics."""
...
def stop(self, timeout: float = 30) -> None:
"""Stop all workers in pool."""
...
Key Architecture Decisions:
- Client-Side Pool: Pool lives on client, manages remote workers (not a remote actor itself)
- Load Balancer: Selects worker index, tracks active/total calls per worker
- Per-Worker Queues: Each worker has independent submission semaphore
- Shared Limits: All workers share same LimitSet instances
- On-Demand Workers: Created per request, destroyed after completion
- Worker Indices: Sequential indices (0, 1, 2, ...) for round-robin in LimitPool
Load Balancing¶
Implemented via BaseLoadBalancer subclasses:
- RoundRobin: Distribute requests evenly in circular fashion
- LeastActiveLoad: Select worker with fewest active (in-flight) calls
- LeastTotalLoad: Select worker with fewest total (lifetime) calls
- Random: Random worker selection (best for on-demand)
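A configuration sketch (task() is hypothetical, and the "least_active" string alias is an assumption by analogy with the "least_total" alias used later in this document):

pool = MyWorker.options(
    mode="thread",
    max_workers=4,
    load_balancing="least_active",  # Assumed alias for LeastActiveLoad
).init()
futures = [pool.task(i) for i in range(100)]  # Each call routed to the currently idlest worker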
Load Balancer Lifecycle:
def method_wrapper(*args, **kwargs):
# 1. Select worker
worker_idx = self._load_balancer.select_worker(len(self._workers))
# 2. Acquire worker's submission semaphore (blocks if queue full)
self._worker_semaphores[worker_idx].acquire()
# 3. Record start
self._load_balancer.record_start(worker_idx)
# 4. Execute method
result = getattr(self._workers[worker_idx], name)(*args, **kwargs)
# 5. Wrap future to release semaphore and record completion
return self._wrap_future_with_tracking(result, worker_idx)
Future Wrapping for Semaphore Release:
def _wrap_future_with_tracking(self, future, worker_idx):
def on_complete(f):
self._load_balancer.record_complete(worker_idx)
self._worker_semaphores[worker_idx].release()
future.add_done_callback(on_complete)
return future
On-Demand Workers¶
Lifecycle:

1. Creation: New worker created per request
2. Execution: Single method call
3. Cleanup: Worker stopped after the result is available
4. Tracking: Stored in the _on_demand_workers list during execution
Concurrency Limits:
- Thread: max(1, cpu_count() - 1)
- Process: max(1, cpu_count() - 1)
- Ray: Unlimited (cluster manages resources)
Cleanup Strategy:
def _wrap_future_with_cleanup(self, future, worker):
def cleanup_callback(f):
# Schedule cleanup in separate thread to avoid deadlock
def deferred_cleanup():
worker.stop(timeout=self.on_demand_cleanup_timeout)
threading.Thread(target=deferred_cleanup, daemon=True).start()
future.add_done_callback(cleanup_callback)
return future
Critical: Cleanup must happen in a separate thread to avoid deadlock. Calling worker.stop() from within a callback can deadlock because stop() may try to cancel futures that are currently invoking that callback.
WorkerBuilder¶
WorkerBuilder is the factory that creates workers or pools based on configuration:
class WorkerBuilder(Typed):
# Public configuration
worker_cls: Type["Worker"]
mode: ExecutionMode
blocking: bool
max_workers: Optional[int]
load_balancing: Optional[LoadBalancingAlgorithm]
on_demand: bool
max_queued_tasks: Optional[int]
num_retries: int
retry_on: Optional[Any]
retry_algorithm: RetryAlgorithm
retry_wait: float
retry_jitter: float
retry_until: Optional[Any]
options: dict
def init(self, *args, **kwargs) -> Union[WorkerProxy, WorkerProxyPool]:
if self._should_create_pool():
return self._create_pool(args, kwargs)
else:
return self._create_single_worker(args, kwargs)
Responsibilities:

1. Validate configuration (max_workers, on_demand compatibility)
2. Apply defaults from the global config
3. Process the limits parameter (create a LimitPool)
4. Create a retry config if num_retries > 0
5. Check Ray + Pydantic incompatibility
6. Decide single worker vs pool
7. Instantiate the appropriate proxy/pool class
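Putting those responsibilities together, a single options(...).init(...) chain configures everything (a sketch; MyWorker and its init arguments are hypothetical, and ResourceLimit is used as in the gotchas section below):

pool = MyWorker.options(
    mode="process",
    max_workers=4,                                 # Step 6: >1 worker, so a pool is created
    num_retries=3,                                 # Step 4: builds a retry config
    limits=[ResourceLimit(key="db", capacity=8)],  # Step 3: processed into a LimitPool
).init("arg1", key="value")                        # Forwarded to MyWorker.__init__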
Limits Processing:
def _transform_worker_limits(limits, mode, is_pool, worker_index):
    if limits is None:
        return LimitPool([])  # Empty LimitPool
    if isinstance(limits, LimitPool):
        return limits
    if isinstance(limits, list) and all(isinstance(lim, Limit) for lim in limits):
        return LimitPool([LimitSet(limits, shared=is_pool, mode=mode)])
    if isinstance(limits, list) and all(isinstance(lim, BaseLimitSet) for lim in limits):
        return LimitPool(limits)  # Multi-region limits
    if isinstance(limits, BaseLimitSet):
        if not limits.shared and is_pool:
            raise ValueError("Pool requires shared=True")
        return LimitPool([limits])
Worker Wrapping:
def _create_worker_wrapper(worker_cls, limits, retry_config, for_ray=False):
class WorkerWithLimitsAndRetry(worker_cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set limits (create LimitPool from list if needed)
if isinstance(limits, list):
limit_set = LimitSet(limits, shared=False, mode=Sync)
limit_pool = LimitPool([limit_set])
else:
limit_pool = limits
object.__setattr__(self, "limits", limit_pool)
def __getattribute__(self, name):
attr = super().__getattribute__(name)
if for_ray:
# Ray: Pre-wrap methods at class level
return attr
if has_retry and not name.startswith("_") and callable(attr):
return create_retry_wrapper(attr, retry_config)
return attr
if for_ray and has_retry:
# Pre-wrap all methods at class level for Ray
for method_name in dir(worker_cls):
if not method_name.startswith("_"):
method = getattr(worker_cls, method_name)
if callable(method):
wrapped = create_retry_wrapper(method, retry_config)
setattr(WorkerWithLimitsAndRetry, method_name, wrapped)
return WorkerWithLimitsAndRetry
Execution Modes¶
| Mode | Worker Proxy | Pool Support | Concurrency | Serialization | Best For |
|---|---|---|---|---|---|
| sync | SyncWorkerProxy | No | None | None | Testing, debugging |
| thread | ThreadWorkerProxy | Yes | Thread-level | None (shared memory) | I/O-bound tasks |
| process | ProcessWorkerProxy | Yes | Process-level | cloudpickle | CPU-bound tasks |
| asyncio | AsyncioWorkerProxy | No | Event loop | None (shared memory) | Async I/O (HTTP, DB) |
| ray | RayWorkerProxy | Yes | Distributed | Ray serialization | Distributed computing |
Default max_workers (pools):

- Sync: 1 (fixed)
- Asyncio: 1 (fixed)
- Thread: 24
- Process: 4
- Ray: 0 (unlimited on-demand)
Default load_balancing:
- Persistent pools: round_robin
- On-demand pools: random
Default max_queued_tasks (submission queue):

- Sync: None (bypassed)
- Asyncio: None (bypassed)
- Thread: 100
- Process: 5
- Ray: 2
Worker Lifecycle¶
Initialization¶
User calls Worker.options(mode, ...).init(args, kwargs)
↓
WorkerBuilder created with configuration
↓
WorkerBuilder.init() called
↓
├─ If max_workers=1 or None: _create_single_worker()
│ ↓
│ 1. Select appropriate WorkerProxy class
│ 2. Process limits → LimitPool
│ 3. Create retry config
│ 4. Instantiate proxy
│ 5. proxy.post_initialize() called
│ 6. Worker wrapper created (_create_worker_wrapper)
│ 7. Worker instance created with user args/kwargs
│ 8. Limits and retry logic injected
│ 9. Return proxy
│
└─ If max_workers>1 or on_demand: _create_pool()
↓
1. Select appropriate WorkerProxyPool class
2. Process limits → shared LimitPool
3. Create retry config
4. Instantiate pool
5. pool.post_initialize() called
6. Load balancer created
7. For persistent pools: _initialize_pool()
└─ Create N workers with sequential indices
8. Return pool
Method Execution (Single Worker)¶
user calls worker.method(args, kwargs)
↓
WorkerProxy.__getattr__("method") intercepts
↓
Check _method_cache for cached wrapper
↓
If not cached, create method_wrapper:
↓
1. Check if stopped
2. Acquire submission semaphore (if configured)
3. Check stopped again (atomic with semaphore)
4. Call _execute_method(name, args, kwargs)
5. Wrap future to release semaphore on completion
6. Return future (or result if blocking=True)
↓
Cache wrapper in _method_cache
↓
Return wrapper to user
↓
User calls wrapper(args) → future returned
↓
User calls future.result() → blocks until complete
Method Execution (Pool)¶
user calls pool.method(args, kwargs)
↓
WorkerProxyPool.__getattr__("method") intercepts
↓
Check _method_cache for cached wrapper
↓
If not cached, create method_wrapper:
↓
├─ If on_demand:
│ 1. Wait for on-demand slot (blocks if limit reached)
│ 2. Check if stopped
│ 3. Increment counter, get worker_index
│ 4. Create worker with _create_worker(worker_index)
│ 5. Track in _on_demand_workers
│ 6. Call worker.method(args, kwargs)
│ 7. Wrap future for cleanup after completion
│ 8. Return future (or result if blocking=True)
│
└─ If persistent:
1. Check workers exist and not stopped
2. Select worker: idx = load_balancer.select_worker(N)
3. Acquire worker's submission semaphore (blocks if full)
4. Check stopped again (atomic with semaphore)
5. Record start: load_balancer.record_start(idx)
6. Call worker.method(args, kwargs)
7. Wrap future to:
- Release worker's semaphore
- Record completion in load balancer
8. Return future (or result if blocking=True)
Shutdown¶
user calls worker.stop() or pool.stop()
↓
Set _stopped = True
↓
Cancel all pending futures
↓
├─ Single Worker:
│ └─ Mode-specific cleanup:
│ - Sync: No-op
│ - Thread: Put None in queue, join thread
│ - Process: Put None in queue, join process + result thread
│ - Asyncio: Stop sync thread, stop event loop
│ - Ray: ray.kill(actor)
│
└─ Pool:
1. Stop all persistent workers
2. Stop all on-demand workers
3. Clear worker lists
4. (Workers handle their own cleanup)
Context Manager¶
with Worker.options(mode="thread").init() as worker:
result = worker.method().result()
# worker.stop() called automatically
Pool Architecture¶
Persistent Pool¶
Client Process
│
├─ WorkerProxyPool (client-side)
│ ├─ LoadBalancer (round-robin, least-active, etc.)
│ ├─ Shared LimitPool (all workers share)
│ ├─ Worker 0 (WorkerProxy, index=0)
│ │ ├─ Submission Semaphore (max_queued_tasks)
│ │ ├─ LimitPool (copy of shared, offset by index=0)
│ │ └─ Worker instance (with limits, retry)
│ ├─ Worker 1 (WorkerProxy, index=1)
│ │ ├─ Submission Semaphore
│ │ ├─ LimitPool (copy of shared, offset by index=1)
│ │ └─ Worker instance
│ └─ Worker N-1 (WorkerProxy, index=N-1)
│ ├─ Submission Semaphore
│ ├─ LimitPool (copy of shared, offset by index=N-1)
│ └─ Worker instance
│
└─ Method calls dispatched via LoadBalancer
Key Characteristics:

- All workers created at initialization
- Load balancer distributes calls
- Each worker has its own submission semaphore
- All workers share the same LimitSet instances (via LimitPool)
- Worker indices used for round-robin in LimitPool
On-Demand Pool¶
Client Process
│
├─ WorkerProxyPool (client-side)
│ ├─ LoadBalancer (typically Random)
│ ├─ Shared LimitPool (all workers share)
│ ├─ _on_demand_workers list (tracks active ephemeral workers)
│ ├─ _on_demand_lock (thread-safe access to list)
│ └─ _on_demand_counter (sequential indices)
│
├─ On method call:
│ 1. Wait for slot (if limit enforced)
│ 2. Create worker with unique index
│ 3. Add to _on_demand_workers
│ 4. Execute method
│ 5. Wrap future for cleanup
│
└─ On future completion:
1. Callback triggers
2. Schedule deferred cleanup (separate thread)
3. Call worker.stop()
4. Remove from _on_demand_workers
Key Characteristics:
- No persistent workers
- Workers created per request
- Workers destroyed after completion
- Cleanup happens in separate thread (avoid deadlock)
- Concurrency limited by _get_on_demand_limit()
- Random load balancing (default)
Critical Implementation Details¶
1. Typed Integration¶
All proxies and pools inherit from morphic.Typed:

- Public fields are immutable and validated
- Private attributes use PrivateAttr()
- post_initialize() is called after validation
- object.__setattr__() is used to set private attrs during initialization
class WorkerProxy(Typed, ABC):
worker_cls: Type[Worker] # Immutable public field
_stopped: bool = PrivateAttr(default=False) # Mutable private attr
def post_initialize(self) -> None:
# Use object.__setattr__ to bypass frozen model
object.__setattr__(self, "_stopped", False)
object.__setattr__(self, "_method_cache", {})
2. Mode as ClassVar¶
Each proxy sets mode: ClassVar[ExecutionMode] at class level, as in the snippet below. This avoids passing mode as a parameter, reducing serialization size.
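For example, from the ThreadWorkerProxy definition above:

class ThreadWorkerProxy(WorkerProxy):
    mode: ClassVar[ExecutionMode] = ExecutionMode.Threads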
3. Submission Queue (max_queued_tasks)¶
Client-side semaphore limits in-flight tasks per worker:
# In WorkerProxy.post_initialize():
if self.max_queued_tasks is not None:
self._submission_semaphore = threading.BoundedSemaphore(self.max_queued_tasks)
# In WorkerProxy.__getattr__():
if self._submission_semaphore:
self._submission_semaphore.acquire() # Blocks if queue full
# Wrap future to release on completion:
def on_complete(f):
self._submission_semaphore.release()
future.add_done_callback(on_complete)
Purpose: Prevent memory exhaustion from thousands of pending futures, especially for Ray actors.
Bypassed for:

- Sync mode (immediate execution)
- Asyncio mode (event loop handles concurrency)
- Blocking mode (sequential execution)
- On-demand workers (the pool already limits concurrency)
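The resulting backpressure, sketched with the Ray default of max_queued_tasks=2 (task() is hypothetical):

worker = MyWorker.options(mode="ray", max_queued_tasks=2).init()
f1 = worker.task(1)  # Acquires slot 1 of the BoundedSemaphore, returns immediately
f2 = worker.task(2)  # Acquires slot 2, returns immediately
f3 = worker.task(3)  # Blocks here until f1 or f2 completes and releases a slot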
4. Future Unwrapping¶
Automatically unwrap BaseFuture arguments before execution (unless unwrap_futures=False):
def _unwrap_futures_in_args(args, kwargs, unwrap_futures):
if not unwrap_futures:
return args, kwargs
# Fast-path: check if any futures or collections present
has_future_or_collection = ...
if not has_future_or_collection:
return args, kwargs # No unwrapping needed
# Recursively unwrap using morphic.map_collection
unwrapped_args = tuple(
map_collection(arg, _unwrap_future_value, recurse=True)
for arg in args
)
...
Ray Zero-Copy Optimization:
def _unwrap_future_for_ray(obj):
if isinstance(obj, RayFuture):
return obj._object_ref # Zero-copy!
elif isinstance(obj, BaseFuture):
return obj.result() # Materialize
return obj
5. Worker Wrapper Creation¶
_create_worker_wrapper() injects limits and retry logic:
def _create_worker_wrapper(worker_cls, limits, retry_config, for_ray=False):
if not retry_config or retry_config.num_retries == 0:
# Only limits, no retry
class WorkerWithLimits(worker_cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Create LimitPool from list if needed
if isinstance(limits, list):
limit_set = LimitSet(limits, shared=False, mode=Sync)
limit_pool = LimitPool([limit_set])
else:
limit_pool = limits
# Use object.__setattr__ to bypass frozen Pydantic models
object.__setattr__(self, "limits", limit_pool)
return WorkerWithLimits
# With retry logic
class WorkerWithLimitsAndRetry(worker_cls):
def __init__(self, *args, **kwargs):
# Same as above for limits
...
def __getattribute__(self, name):
attr = super().__getattribute__(name)
if not for_ray and not name.startswith("_") and callable(attr):
# Wrap method with retry logic
return create_retry_wrapper(attr, retry_config, ...)
return attr
if for_ray:
# Pre-wrap methods at class level (Ray bypasses __getattribute__)
for method_name in dir(worker_cls):
if not method_name.startswith("_"):
method = getattr(worker_cls, method_name)
if callable(method):
wrapped = create_retry_wrapper(method, retry_config, ...)
setattr(WorkerWithLimitsAndRetry, method_name, wrapped)
return WorkerWithLimitsAndRetry
Key Points:

- Always sets self.limits (even if an empty LimitPool)
- Uses object.__setattr__() to support frozen Pydantic models
- For Ray: Pre-wraps methods at class level (Ray bypasses __getattribute__)
- For other modes: Wraps methods dynamically via __getattribute__
6. Load Balancing State¶
Load balancer tracks per-worker statistics:
class LeastActiveLoadBalancer(BaseLoadBalancer):
_active_calls: Dict[int, int] # worker_id -> active count
_total_dispatched: int
_lock: threading.Lock
def select_worker(self, num_workers):
with self._lock:
# Find worker with minimum active calls
min_active = min(self._active_calls.values())
for i in range(num_workers):
if self._active_calls[i] == min_active:
return i
def record_start(self, worker_id):
with self._lock:
self._active_calls[worker_id] += 1
self._total_dispatched += 1
def record_complete(self, worker_id):
with self._lock:
self._active_calls[worker_id] -= 1
7. Shared Limits Across Pool¶
All workers in a pool share the same LimitSet instances:
# In WorkerBuilder._create_pool():
limits = _transform_worker_limits(
limits=self.options.get("limits"),
mode=execution_mode,
is_pool=True, # Creates shared LimitSet
worker_index=0 # Placeholder
)
# In WorkerProxyPool._create_worker():
worker_limits = _transform_worker_limits(
limits=self.limits, # Shared LimitSet
mode=self.mode,
is_pool=False,
worker_index=i # Unique index for round-robin
)
# Each worker gets a LimitPool with:
# - Same LimitSet instances (shared state)
# - Unique worker_index (for round-robin offset)
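The practical effect is a pool-wide cap rather than a per-worker one (a sketch; MyWorker is hypothetical):

pool = MyWorker.options(
    mode="thread",
    max_workers=8,
    limits=[ResourceLimit(key="api_tokens", capacity=10)],
).init()
# All 8 workers draw from the SAME shared capacity of 10: at most 10
# concurrent acquisitions across the whole pool, not 10 per worker.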
Adding New Worker Types¶
To add a new execution mode (e.g., Dask, Celery):
1. Create WorkerProxy Subclass¶
class DaskWorkerProxy(WorkerProxy):
# Set mode at class level
mode: ClassVar[ExecutionMode] = ExecutionMode.Dask
# Add mode-specific config fields
dask_scheduler: str
# Add private attributes
_dask_future: Any = PrivateAttr()
_client: Any = PrivateAttr()
def post_initialize(self) -> None:
super().post_initialize()
# Initialize Dask client
import dask.distributed
        # Use object.__setattr__ for private attrs, per the Typed integration rules above
        object.__setattr__(self, "_client", dask.distributed.Client(self.dask_scheduler))
# Create worker wrapper
worker_cls = _create_worker_wrapper(
self.worker_cls,
self.limits,
self.retry_config
)
# Submit worker to Dask cluster
        object.__setattr__(self, "_dask_future", self._client.submit(worker_cls, ...))
def _execute_method(self, method_name, *args, **kwargs):
# Submit method call to Dask worker
dask_future = self._client.submit(
lambda w: getattr(w, method_name)(*args, **kwargs),
self._dask_future
)
return DaskFuture(dask_future=dask_future)
def stop(self, timeout=30):
super().stop(timeout)
self._client.close()
2. Create Future Subclass¶
class DaskFuture(BaseFuture):
__slots__ = ("_dask_future",)
FUTURE_UUID_PREFIX = "dask-"
def __init__(self, dask_future):
super().__init__()
self._dask_future = dask_future
def result(self, timeout=None):
return self._dask_future.result(timeout=timeout)
def done(self):
return self._dask_future.done()
def cancel(self):
return self._dask_future.cancel()
# ... implement other BaseFuture methods
3. Create Pool Subclass (if supported)¶
class DaskWorkerProxyPool(WorkerProxyPool):
dask_scheduler: str
def _initialize_pool(self):
for i in range(self.max_workers):
worker = self._create_worker(worker_index=i)
self._workers.append(worker)
def _create_worker(self, worker_index=0):
# Process limits with worker_index
worker_limits = _transform_worker_limits(
limits=self.limits,
mode=self.mode,
is_pool=False,
worker_index=worker_index
)
return DaskWorkerProxy(
worker_cls=self.worker_cls,
dask_scheduler=self.dask_scheduler,
limits=worker_limits,
...
)
def _get_on_demand_limit(self):
return None # Dask manages resources
4. Update ExecutionMode Enum¶
class ExecutionMode(AutoEnum):
Sync = alias("sync")
Threads = alias("thread", "threads")
Processes = alias("process", "processes")
Asyncio = alias("asyncio", "async")
Ray = alias("ray")
Dask = alias("dask") # New!
5. Update WorkerBuilder¶
# In WorkerBuilder._create_single_worker():
elif execution_mode == ExecutionMode.Dask:
from .dask_worker import DaskWorkerProxy
proxy_cls = DaskWorkerProxy
# In WorkerBuilder._create_pool():
elif execution_mode == ExecutionMode.Dask:
pool_cls = DaskWorkerProxyPool
6. Update wrap_future()¶
def wrap_future(future):
# ... existing checks ...
elif hasattr(future, "_dask_future"):
return future # Already a DaskFuture
# ... try to import Dask and check type ...
7. Add Configuration Defaults¶
class GlobalDefaults(Typed):
# ... existing fields ...
class Dask(Typed):
blocking: bool = False
max_workers: int = 8
load_balancing: LoadBalancingAlgorithm = LoadBalancingAlgorithm.RoundRobin
max_queued_tasks: Optional[int] = 10
# ... other Dask-specific defaults ...
dask: Dask = Dask()
8. Add Tests¶
class TestDaskWorker:
def test_basic_execution(self):
worker = SimpleWorker.options(mode="dask").init()
result = worker.compute(5).result()
assert result == expected
worker.stop()
def test_dask_pool(self):
pool = SimpleWorker.options(
mode="dask",
max_workers=4
).init()
results = [pool.compute(i).result() for i in range(10)]
assert len(results) == 10
pool.stop()
Limitations and Gotchas¶
1. Ray + Pydantic Incompatibility¶
Problem: Ray's ray.remote() conflicts with Pydantic's __setattr__.
Symptoms:
class MyWorker(Worker, Typed):
name: str
value: int
worker = MyWorker.options(mode="ray").init(name="test", value=10)
# ValueError: Cannot create Ray worker with Pydantic-based class
Why: Ray wraps classes as actors and modifies __setattr__, which breaks Pydantic's frozen model implementation.
Workarounds:

1. Use composition instead of inheritance:
class MyWorker(Worker):
def __init__(self, name: str, value: int):
self.name = name
self.value = value
2. Use validation decorators (Ray-compatible), as sketched below:
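A minimal sketch (process() and its signature are illustrative; @validate_call is pydantic's standard decorator):

from pydantic import validate_call

class MyWorker(Worker):
    @validate_call
    def process(self, name: str, value: int) -> str:
        # Arguments are validated/coerced on every call, even inside a Ray actor
        return f"{name}={value}"

worker = MyWorker.options(mode="ray").init()
print(worker.process("test", 10).result())  # "test=10"
worker.stop()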
Detection: Concurry automatically detects this and:

- Raises ValueError when creating a Ray worker with Typed/BaseModel
- Warns when creating a non-Ray worker (if Ray is installed)
2. Async Functions in Non-Asyncio Modes¶
Limitation: Async functions work but don't provide concurrency benefits
Why: Other modes use asyncio.run(), which blocks until completion.
Example:
class APIWorker(Worker):
async def fetch(self, url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
# ThreadWorker: Each fetch() blocks the thread
worker = APIWorker.options(mode="thread").init()
urls = [f"http://api.example.com/{i}" for i in range(10)]
futures = [worker.fetch(url) for url in urls] # Sequential, ~10 seconds
# AsyncioWorker: fetch() runs concurrently in event loop
worker = APIWorker.options(mode="asyncio").init()
futures = [worker.fetch(url) for url in urls] # Concurrent, ~1 second
Best Practice: Use mode="asyncio" for async I/O-bound tasks.
3. Submission Queue vs Resource Limits¶
Two separate mechanisms:

1. Submission Queue (max_queued_tasks): Client-side, limits pending futures
2. Resource Limits (limits): Worker-side, limits concurrent operations
Example:
# Submission queue: Max 10 futures in-flight
# Resource limit: Max 5 concurrent executions
worker = MyWorker.options(
mode="ray",
max_queued_tasks=10,
limits=[ResourceLimit(key="slots", capacity=5)]
).init()
# Submit 100 tasks:
futures = [worker.task(i) for i in range(100)]
# - First 10 submit immediately (submission queue)
# - Next 90 block on submission queue
# - Inside worker: Max 5 execute concurrently (resource limit)
4. On-Demand Workers and Limits¶
Issue: Each on-demand worker gets its own LimitPool copy
Impact: Limits are NOT shared across on-demand workers
Example:
pool = MyWorker.options(
mode="thread",
on_demand=True,
limits=[ResourceLimit(key="connections", capacity=10)]
).init()
# With 5 concurrent requests, 5 ephemeral workers run at once, each with
# its own capacity=10 → up to 50 total connections!
Solution: Don't use limits with on-demand workers, or use a persistent pool.
5. Method Caching and Callable Attributes¶
Issue: __getattr__ caches method wrappers by name
Problem: If the worker class has callable attributes that change, the cache becomes stale
Example:
class DynamicWorker(Worker):
def __init__(self):
self.processor = lambda x: x * 2
def update_processor(self, new_func):
self.processor = new_func
worker = DynamicWorker.options(mode="thread").init()
worker.processor(5) # Returns 10
worker.update_processor(lambda x: x * 3)
worker.processor(5) # Still returns 10! (cached wrapper)
Solution: Clear _method_cache when updating callable attributes, or use regular methods instead of callable attributes.
6. Exception Handling in Pools¶
Behavior: Exceptions don't stop the pool
Example:
pool = MyWorker.options(mode="thread", max_workers=4).init()
futures = [pool.task(i) for i in range(10)]
# If task(5) raises exception, other tasks continue
# Exception stored in futures[5], not propagated
try:
results = [f.result() for f in futures]
except Exception as e:
# Only raised when accessing futures[5].result()
...
Best Practice: Use gather(return_exceptions=True) to collect all results/exceptions.
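The gather() signature is not shown in this document; the sketch below assumes it accepts a list of futures and mirrors asyncio.gather's return_exceptions semantics:

futures = [pool.task(i) for i in range(10)]
results = gather(futures, return_exceptions=True)  # Assumed signature
for i, r in enumerate(results):
    if isinstance(r, Exception):
        print(f"task {i} failed: {r}")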
7. Worker State and Pools¶
Limitation: Worker state is per-worker, not per-pool
Example:
class Counter(Worker):
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
pool = Counter.options(mode="thread", max_workers=4).init()
# Each worker has own count
results = [pool.increment().result() for _ in range(10)]
# Results: [1, 1, 1, 1, 2, 2, 2, 2, 3, 3] (depends on load balancing)
# NOT: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Solution: Use shared state mechanisms (Redis, database, etc.) or single worker.
8. Stop Timeout and Cleanup¶
Issue: The stop() timeout is per-operation, not total.
Example:
pool = MyWorker.options(mode="thread", max_workers=10).init()
pool.stop(timeout=5)
# May take up to 50 seconds! (5s × 10 workers)
Best Practice: Set appropriate timeout based on pool size.
9. Cloudpickle Serialization Quirks¶
Issue: Process and Ray workers serialize worker class
Limitations:

- Local variables from the outer scope are captured by closures
- Large dependencies increase serialization time
- Some objects can't be pickled (open files, database connections)
Example:
# BAD: Captures entire DataFrame in closure
df = pd.DataFrame(...) # 1GB
class Processor(Worker):
def process(self, row_id):
return df.iloc[row_id] # Serializes entire df!
worker = Processor.options(mode="process").init()
Solution: Pass data as arguments, not via closures:
class Processor(Worker):
def __init__(self, df):
self.df = df
worker = Processor.options(mode="process").init(df)
10. Load Balancer State and Restarts¶
Issue: Load balancer state lost on pool restart
Example:
pool = MyWorker.options(
mode="thread",
max_workers=4,
load_balancing="least_total"
).init()
# After 1000 calls, load balanced across workers
stats = pool.get_pool_stats()
# {"load_balancer": {"total_calls": {0: 250, 1: 250, 2: 250, 3: 250}}}
pool.stop()
pool = MyWorker.options(...).init() # New pool
# Load balancer reset, starts from zero
Solution: Don't rely on load balancer state persisting across restarts.
This architecture document provides a comprehensive technical overview of the worker and worker pool system in Concurry. For implementation details, see the source code in src/concurry/core/worker/.