Global Configuration
Concurry provides a global configuration system for customizing default behavior across all execution modes. It removes the need to repeat the same parameters at every call site and gives you a single place to tune performance for your use case.
Overview
The configuration system has two levels:
- Global defaults - Apply to all execution modes unless overridden
- Mode-specific overrides - Take precedence for a specific execution mode (sync, asyncio, thread, process, ray)
Quick Start
Accessing the Global Configuration
from concurry import global_config
# View current defaults
print(global_config.defaults.num_retries) # 0
print(global_config.defaults.retry_wait) # 1.0
# View mode-specific settings
print(global_config.thread.max_workers) # 30
print(global_config.ray.max_queued_tasks) # 3
Modifying Defaults Permanently
from concurry import global_config
# Change global defaults (affects all modes)
global_config.defaults.num_retries = 3
global_config.defaults.retry_wait = 2.0
# Change mode-specific settings
global_config.thread.max_workers = 50
global_config.ray.max_queued_tasks = 10
# Reset to library defaults
global_config.reset_to_defaults()
Temporary Configuration with temp_config()
The recommended approach for temporary changes is the temp_config() context manager, which restores the previous settings when the block exits:
from concurry import temp_config, Worker
class MyWorker(Worker):
    def process(self, data):
        return data * 2

# Temporary global override
with temp_config(global_num_retries=5, global_retry_wait=3.0):
    # All workers created here use these retry settings
    worker1 = MyWorker.options(mode="thread").init()
    worker2 = MyWorker.options(mode="ray").init()
    # Both have num_retries=5, retry_wait=3.0
    worker1.stop()
    worker2.stop()
# Original settings restored after context

# Mode-specific overrides
with temp_config(
    thread_max_workers=100,
    ray_max_queued_tasks=50
):
    # Only affects thread and ray modes respectively
    pass
Configuration Categories
Worker & Pool Configuration
Configure worker pool behavior:
with temp_config(
    thread_max_workers=50,         # Max workers in thread pool
    thread_max_queued_tasks=2000,  # Max queued tasks
    process_max_workers=8,         # Max workers in process pool
    ray_max_workers=0              # 0 = unlimited for Ray
):
    pass
Execution Configuration
Control blocking behavior and future unwrapping:
with temp_config(
    global_blocking=True,        # Return results directly, not futures
    global_unwrap_futures=False  # Don't auto-unwrap nested futures
):
    worker = MyWorker.options(mode="thread").init()
    result = worker.process(10)  # Returns result directly (blocking=True)
    worker.stop()
Retry Configuration
Configure retry behavior globally:
from concurry import temp_config
from concurry.core.retry import RetryAlgorithm
with temp_config(
    global_num_retries=3,                               # Retry up to 3 times
    global_retry_algorithm=RetryAlgorithm.Exponential,  # Exponential backoff
    global_retry_wait=2.0,                              # Min 2s wait
    global_retry_jitter=0.5                             # 50% jitter
):
    pass
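How these parameters combine depends on Concurry's retry implementation, which this page does not spell out. As a rough illustration of one common scheme (exponential backoff with proportional jitter), the sketch below computes the wait before each retry. `backoff_delay` is a hypothetical helper, and treating the jitter value as a fraction of the delay is an assumption, not Concurry's documented formula.

```python
import random


def backoff_delay(attempt, base_wait=2.0, jitter=0.5, rng=random):
    """Return the wait in seconds before retry number `attempt` (1-based).

    Assumed scheme: the wait doubles on every attempt, then is shrunk by a
    random fraction of up to `jitter` so that retries do not synchronize.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = base_wait * 2 ** (attempt - 1)
    # jitter=0.5 keeps the result between 50% and 100% of the computed delay.
    return delay * (1.0 - jitter * rng.random())


# A seeded generator makes the example reproducible.
delays = [backoff_delay(n, rng=random.Random(0)) for n in (1, 2, 3)]
print(delays)
```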
Load Balancing Configuration
Configure how tasks are distributed across workers:
from concurry import temp_config
from concurry.core.constants import LoadBalancingAlgorithm
with temp_config(
    thread_load_balancing=LoadBalancingAlgorithm.LeastActiveLoad,
    process_load_balancing=LoadBalancingAlgorithm.RoundRobin
):
    pass
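The two algorithm names suggest the usual strategies: round robin cycles through workers in a fixed order, while least-active-load picks the worker with the fewest in-flight tasks. The standalone sketch below contrasts the two. The function names are hypothetical and Concurry's own balancers may behave differently, for example in how ties are broken.

```python
import itertools


def round_robin(num_workers):
    """Return an endless iterator over worker indices 0, 1, ..., num_workers - 1."""
    return itertools.cycle(range(num_workers))


def least_active(active_counts):
    """Return the index of the worker with the fewest in-flight tasks."""
    return min(range(len(active_counts)), key=active_counts.__getitem__)


rr = round_robin(3)
rr_picks = [next(rr) for _ in range(5)]

active = [2, 0, 1]  # in-flight tasks per worker
la_picks = []
for _ in range(4):
    idx = least_active(active)
    active[idx] += 1  # the chosen worker now has one more active task
    la_picks.append(idx)

print(rr_picks, la_picks)
```

Round robin ignores how busy each worker is, which is cheap and fair for uniform tasks; a load-aware choice tends to help when task durations vary widely.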
Rate Limiting Configuration
Configure rate limiter behavior:
from concurry import temp_config
from concurry.core.constants import RateLimitAlgorithm
with temp_config(
    global_rate_limit_algorithm=RateLimitAlgorithm.TokenBucket,
    global_rate_limiter_min_wait_time=0.05  # Min 50ms between checks
):
    pass
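For readers unfamiliar with the token bucket algorithm named above, here is a minimal standalone sketch. It is a textbook version, not Concurry's limiter: the `TokenBucket` class below is hypothetical, and time is passed in explicitly so the example is deterministic.

```python
class TokenBucket:
    """Minimal token bucket: refills at `rate` tokens per second up to `capacity`."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start with a full bucket
        self.last = now

    def try_acquire(self, now, cost=1.0):
        """Take `cost` tokens if available at time `now`; return True on success."""
        elapsed = now - self.last
        # Refill for the elapsed time, never exceeding the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = TokenBucket(rate=2.0, capacity=2.0)
results = [bucket.try_acquire(now=t) for t in (0.0, 0.0, 0.0, 0.5)]
print(results)
```

A caller that is refused a token typically sleeps briefly and checks again; a minimum wait between checks, such as the 50ms value above, bounds how often that loop runs.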
Polling Strategy Configuration
Configure how wait() and gather() poll for results:
with temp_config(
    # Fixed polling
    global_polling_fixed_interval=0.02,                # Check every 20ms
    # Adaptive polling
    global_polling_adaptive_min_interval=0.0005,       # 0.5ms min
    global_polling_adaptive_max_interval=0.3,          # 300ms max
    global_polling_adaptive_initial_interval=0.02,     # Start at 20ms
    # Exponential polling
    global_polling_exponential_initial_interval=0.02,  # Start at 20ms
    global_polling_exponential_max_interval=3.0,       # Max 3s
    # Progressive polling
    global_polling_progressive_min_interval=0.0005,    # 0.5ms min
    global_polling_progressive_max_interval=1.0        # 1s max
):
    pass
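As an illustration of what an initial interval and a maximum interval mean for exponential polling, the sketch below generates sleep intervals that grow geometrically up to a cap and uses them in a simple polling loop. The helper names and the growth factor of 2 are assumptions made for this example, not Concurry's implementation.

```python
def exponential_intervals(initial=0.02, maximum=3.0, factor=2.0):
    """Yield sleep intervals that grow geometrically and are capped at `maximum`."""
    interval = initial
    while True:
        yield min(interval, maximum)
        interval = min(interval * factor, maximum)


def poll(is_done, intervals, sleep):
    """Call `is_done` until it returns True, sleeping between checks."""
    checks = 0
    for interval in intervals:
        checks += 1
        if is_done():
            return checks
        sleep(interval)


# Record the requested sleeps instead of actually sleeping.
slept = []
remaining = iter([False, False, False, True])
checks = poll(lambda: next(remaining), exponential_intervals(), slept.append)
print(checks, slept)
```

Short initial intervals keep latency low for fast tasks, while the cap limits how stale a result can get for slow ones.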
Internal Timeouts
Configure internal timeouts (advanced):
with temp_config(
    # Thread mode timeouts
    thread_worker_command_queue_timeout=0.2,
    # Process mode timeouts
    process_worker_result_queue_timeout=60.0,
    process_worker_result_queue_cleanup_timeout=2.0,
    # Asyncio mode timeouts
    asyncio_worker_loop_ready_timeout=60.0,
    asyncio_worker_thread_ready_timeout=60.0,
    asyncio_worker_sync_queue_timeout=0.2,
    # On-demand pool timeouts (set here for thread mode)
    thread_pool_on_demand_cleanup_timeout=10.0,
    thread_pool_on_demand_slot_max_wait=120.0
):
    pass
Progress Bar Configuration
Configure progress bar display:
with temp_config(
    global_progress_bar_ncols=120,      # Wider progress bars
    global_progress_bar_smoothing=0.2,  # More smoothing
    global_progress_bar_miniters=10     # Update every 10 iterations
):
    from concurry.utils.progress import ProgressBar

    for item in ProgressBar(range(1000)):
        # Uses custom settings
        pass
Hierarchical Resolution
When looking up a configuration value, Concurry follows this resolution order:
- Explicit parameter passed to the function/method (highest priority)
- Mode-specific override from global_config.<mode>.*
- Global default from global_config.defaults.* (lowest priority)
Example:
from concurry import temp_config, Worker
# Set global default
with temp_config(global_num_retries=3):
    # Set thread-specific override
    with temp_config(thread_num_retries=10):
        # Explicit parameter wins
        worker = Worker.options(
            mode="thread",
            num_retries=5  # This value is used
        ).init()
        # Without explicit parameter, mode-specific wins
        worker2 = Worker.options(mode="thread").init()
        # Uses num_retries=10 (thread override)
        # Different mode uses global default
        worker3 = Worker.options(mode="ray").init()
        # Uses num_retries=3 (global default)
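The resolution order can also be expressed as a small standalone function. This is a sketch of the lookup logic only, assuming that an unset mode-specific value is represented as None; `resolve` and the dictionaries are hypothetical and do not mirror Concurry's internal data structures.

```python
_UNSET = object()  # sentinel so that None and 0 remain valid explicit values


def resolve(name, explicit=_UNSET, mode_overrides=None, global_defaults=None):
    """Return the first value found: explicit, then mode override, then global."""
    if explicit is not _UNSET:
        return explicit
    mode_value = (mode_overrides or {}).get(name)
    if mode_value is not None:  # None means "no override for this mode"
        return mode_value
    return (global_defaults or {})[name]


global_defaults = {"num_retries": 3}
thread_overrides = {"num_retries": 10}
ray_overrides = {"num_retries": None}

explicit_wins = resolve("num_retries", 5, thread_overrides, global_defaults)
mode_wins = resolve("num_retries", mode_overrides=thread_overrides,
                    global_defaults=global_defaults)
global_wins = resolve("num_retries", mode_overrides=ray_overrides,
                      global_defaults=global_defaults)
print(explicit_wins, mode_wins, global_wins)
```

The three results correspond to worker, worker2, and worker3 in the example above.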
Best Practices
1. Use temp_config() for Tests
import pytest
from concurry import temp_config
def test_with_custom_config():
    with temp_config(
        global_num_retries=0,  # No retries in tests
        thread_max_workers=2   # Fewer workers for testing
    ):
        # Test code here
        pass
2. Set Global Defaults at Application Startup
from concurry import global_config
def configure_concurry():
    """Configure Concurry for production environment."""
    # Aggressive retries
    global_config.defaults.num_retries = 3
    global_config.defaults.retry_wait = 2.0
    # Larger thread pools
    global_config.thread.max_workers = 100
    global_config.thread.max_queued_tasks = 5000
    # Ray configuration
    global_config.ray.max_workers = 0  # Unlimited

if __name__ == "__main__":
    configure_concurry()
    # Start application
3. Prefer Mode-Specific Overrides Over Global Changes
When you need different behavior per mode:
# Good: Specific overrides
with temp_config(
    thread_max_workers=100,
    process_max_workers=8,
    ray_max_workers=0
):
    pass

# Less ideal: Global setting that might not fit all modes
with temp_config(global_max_workers=100):
    pass
4. Document Configuration Dependencies
If your code relies on specific configuration, say so in its docstring:
def heavy_processing():
    """
    Process heavy workloads.

    Configuration:
        Recommended to increase thread pool size:

        >>> with temp_config(thread_max_workers=200):
        ...     heavy_processing()
    """
    pass
Common Patterns
Performance Tuning
# High-throughput configuration
with temp_config(
    thread_max_workers=200,
    thread_max_queued_tasks=10000,
    global_polling_fixed_interval=0.001,  # Poll more frequently
    global_rate_limiter_min_wait_time=0.001
):
    # High-performance processing
    pass
Conservative/Reliable Configuration
# Conservative settings for reliability
with temp_config(
    global_num_retries=5,
    global_retry_wait=5.0,
    thread_max_workers=10,        # Fewer workers
    thread_max_queued_tasks=100,  # Smaller queues
    global_stop_timeout=60.0      # Longer stop timeout
):
    # Reliable processing
    pass
Development/Debug Configuration
# Debug-friendly settings
with temp_config(
    thread_max_workers=1,    # Single worker for easier debugging
    global_num_retries=0,    # No retries to see failures immediately
    global_blocking=True,    # Synchronous for simpler stack traces
    global_stop_timeout=5.0  # Quick shutdown
):
    # Development code
    pass
Configuration Reference
For a complete list of all configuration options, see:
- GlobalDefaults class in concurry/config.py for global settings
- ExecutionModeDefaults class in concurry/config.py for mode-specific settings
You can also inspect the configuration at runtime:
from concurry import global_config
import pprint
# See all global defaults
pprint.pprint(global_config.defaults.model_dump())
# See all thread mode settings
pprint.pprint(global_config.thread.model_dump())
# Get resolved defaults for a specific mode
from concurry.core.constants import ExecutionMode
resolved = global_config.get_defaults(ExecutionMode.Threads)
print(resolved.num_retries) # Falls back to global if thread-specific is None
Related Documentation
- Architecture: Configuration System - Detailed design and implementation
- Workers - Worker configuration options
- Retries - Retry configuration
- Limits - Rate limiting configuration