Limits¶
Limits in concurry provide flexible, composable resource protection and rate limiting. They enable you to control resource usage, enforce rate limits, and track consumption across different dimensions simultaneously.
Overview¶
The limit system has three layers:
Layer 1: Limit Definitions (Data Containers)¶
- RateLimit - Time-based rate limiting with multiple algorithms
- CallLimit - Call counting (special case of RateLimit)
- ResourceLimit - Semaphore-based resource limiting
Important: Limit objects are simple data containers that define constraints. They are NOT thread-safe and cannot be acquired directly.
Layer 2: LimitSet (Thread-Safe Executor)¶
LimitSet is a factory function that creates thread-safe limit executors. It handles:
- Thread-safe acquisition and release
- Atomic multi-limit acquisition
- Partial acquisition (nested patterns)
- Backend selection based on execution mode
- Optional config dict for metadata (e.g., region, account)
LimitSet returns:
- InMemorySharedLimitSet - for sync, thread, and asyncio modes (uses threading.Lock)
- MultiprocessSharedLimitSet - for process mode (uses multiprocessing.Manager)
- RaySharedLimitSet - for Ray mode (uses a Ray actor)
Layer 3: LimitPool (Multi-LimitSet Load Balancing)¶
LimitPool aggregates multiple independent LimitSets with load balancing. It provides:
- Reduced contention - Workers acquire from different LimitSets
- Higher throughput - More concurrent acquisitions without blocking
- Multi-region support - Each LimitSet can represent a different region/account
- Scalability - Add more LimitSets to increase capacity
Use cases:
- Multi-region API endpoints
- Multiple API accounts/keys
- Service tiers (premium vs. standard)
- High-contention workload distribution
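As a hedged sketch of how the three layers compose (make_limitset is a hypothetical helper; the individual APIs are documented in the sections below):

from concurry import LimitSet, LimitPool, RateLimit

def make_limitset(region: str):
    # Layer 1: Limit objects are plain data containers describing constraints
    rate = RateLimit(key="tokens", window_seconds=60, capacity=1000)
    # Layer 2: LimitSet wraps limits in a thread-safe executor
    return LimitSet(limits=[rate], shared=True, mode="thread",
                    config={"region": region})

# Layer 3: LimitPool load-balances across independent LimitSets
pool = LimitPool(
    limit_sets=[make_limitset("us-east-1"), make_limitset("us-west-2")],
    load_balancing="round_robin",
)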
Quick Reference¶
Basic Pattern¶
from concurry import LimitSet, RateLimit, CallLimit, ResourceLimit, RateLimitAlgorithm
# 1. Define limits (data containers)
limits = LimitSet(limits=[
    CallLimit(window_seconds=60, capacity=100),
    RateLimit(key="tokens", window_seconds=60, capacity=1000),
    ResourceLimit(key="connections", capacity=10)
])
# 2. Acquire limits (thread-safe)
with limits.acquire(requested={"tokens": 50, "connections": 2}) as acq:
    result = do_work()
    # 3. Update RateLimit usage
    acq.update(usage={"tokens": result.actual_tokens})
    # CallLimit and ResourceLimit auto-handled
Key Behaviors¶
Feature | Behavior |
---|---|
Limit objects | Data containers only, NOT thread-safe |
LimitSet | Factory function, creates thread-safe executor |
LimitPool | Aggregates multiple LimitSets with load balancing |
Config | Each LimitSet can have metadata accessible via acquisition |
Empty LimitSet | Workers always have self.limits, even without configuration |
No limits | Empty LimitSet always allows acquisition, zero overhead |
CallLimit | Always acquired with default of 1, no update needed |
ResourceLimit | Always acquired with default if not specified, no update needed |
RateLimit | Must be in requested dict, requires update() call |
Partial acquisition | Specify only what you need, CallLimit/ResourceLimit auto-included |
Nested acquisition | Supported, enables fine-grained resource management |
Shared limits | shared=True (default) shares limits across workers |
Mode matching | mode parameter must match worker execution mode |
Basic Usage¶
Creating a LimitSet¶
Always use LimitSet to create thread-safe limit executors:
from concurry import LimitSet, RateLimit, RateLimitAlgorithm
# Define limit constraints (data containers)
rate_limit = RateLimit(
    key="api_tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.TokenBucket,
    capacity=1000
)
# Create thread-safe LimitSet
limits = LimitSet(
    limits=[rate_limit],
    shared=True,  # Default: share across workers
    mode="sync"   # Default: for sync/thread/asyncio
)
# Acquire and use tokens (thread-safe)
with limits.acquire(requested={"api_tokens": 100}) as acq:
    result = call_api()
    # Update with actual usage
    acq.update(usage={"api_tokens": result.actual_tokens})
Key points:
- Limit objects are data containers - use LimitSet for thread-safe operations
- LimitSet is a factory that creates the appropriate backend implementation
- Always call acq.update() for RateLimits to report actual usage
- Unused tokens may be refunded (algorithm-specific)
- Usage must not exceed the requested amount
Empty LimitSet (No Limits)¶
Workers always have self.limits available, even when no limits are configured. If you create a worker without passing limits, it automatically gets an empty LimitSet that always allows acquisition without blocking.
from concurry import Worker
class APIWorker(Worker):
    def __init__(self):
        pass

    def process(self, data):
        # self.limits is always available
        with self.limits.acquire():
            # Always succeeds immediately, no blocking
            return do_work(data)

# Worker without limits - self.limits.acquire() always succeeds
worker = APIWorker.options(mode="thread").init()
result = worker.process(data).result()
worker.stop()
Key benefits:
- Write code once, conditionally enforce limits
- No need to check if self.limits is not None
- Zero overhead when no limits configured
- Enables gradual adoption of limits
Creating empty LimitSet directly:
from concurry import LimitSet
# Create empty LimitSet - always allows acquisition
empty_limits = LimitSet(limits=[], shared=False, mode="sync")
with empty_limits.acquire():
    # Always succeeds immediately
    do_work()
Use cases:
- Development/testing without limit enforcement
- Conditional limit enforcement based on environment
- Gradual rollout of rate limiting
- Code that optionally uses limits
RateLimit¶
RateLimits enforce time-based constraints on resource usage, such as API tokens, bandwidth, or request rates.
from concurry import LimitSet, RateLimit, RateLimitAlgorithm
# Define rate limit
rate_limit = RateLimit(
    key="api_tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.TokenBucket,
    capacity=1000
)
# Create LimitSet for thread-safe usage
limits = LimitSet(limits=[rate_limit])
# Use it
with limits.acquire(requested={"api_tokens": 100}) as acq:
    result = call_api()
    acq.update(usage={"api_tokens": result.actual_tokens})
CallLimit¶
CallLimit is a special RateLimit for counting calls, where usage is always 1.
from concurry import LimitSet, CallLimit, RateLimitAlgorithm
# Define call limit
call_limit = CallLimit(
    window_seconds=60,
    algorithm=RateLimitAlgorithm.SlidingWindow,
    capacity=100
)
# Create LimitSet
limits = LimitSet(limits=[call_limit])
# Each acquisition counts as 1 call (automatic)
with limits.acquire():
    make_api_call()
Key points:
- Fixed key: "call_count"
- Usage is always 1 (validated)
- No need to call update() - handled automatically
- Perfect for call rate limits independent of resource usage
ResourceLimit¶
ResourceLimits provide simple counting for finite resources like database connections or file handles.
from concurry import LimitSet, ResourceLimit
# Define resource limit
resource_limit = ResourceLimit(
    key="db_connections",
    capacity=10
)
# Create LimitSet
limits = LimitSet(limits=[resource_limit])
# Acquire 2 connections
with limits.acquire(requested={"db_connections": 2}):
    conn1 = get_connection()
    conn2 = get_connection()
    execute_queries(conn1, conn2)
# Connections automatically released
Key points:
- No time component (unlike RateLimit)
- Automatic release on context exit
- No need to call update() - handled automatically
- Thread-safe semaphore logic handled by LimitSet
Rate Limiting Algorithms¶
RateLimit supports five algorithms with different characteristics:
TokenBucket¶
Allows bursts up to capacity while maintaining average rate. Tokens refill continuously.
limit = RateLimit(
    key="tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.TokenBucket,
    capacity=1000
)
Best for: APIs that allow occasional bursts
Characteristics:
- Burst handling: Excellent
- Precision: Good
- Memory: Low
- Refunding: Yes
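To make the burst-then-refill behavior concrete, here is a minimal sketch (hedged: it assumes acquire() blocks until capacity is available, consistent with the timeout behavior shown later in this guide, and that tokens refill at roughly capacity/window per second):

import time
from concurry import LimitSet, RateLimit, RateLimitAlgorithm

limits = LimitSet(limits=[
    RateLimit(
        key="tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=600  # refills at roughly 10 tokens/second on average
    )
])

# Burst: a full bucket allows the entire capacity immediately
with limits.acquire(requested={"tokens": 600}) as acq:
    acq.update(usage={"tokens": 600})

# Steady state: the next acquisition waits for tokens to refill
start = time.time()
with limits.acquire(requested={"tokens": 100}) as acq:
    acq.update(usage={"tokens": 100})
print(f"Waited ~{time.time() - start:.0f}s for refill")  # roughly 10s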
LeakyBucket¶
Processes requests at fixed rate, smoothing traffic.
limit = RateLimit(
    key="tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.LeakyBucket,
    capacity=1000
)
Best for: Predictable, steady-state traffic
Characteristics:
- Burst handling: Poor (by design)
- Precision: Excellent
- Memory: Low
- Refunding: No
SlidingWindow¶
Precise rate limiting with rolling time window. More accurate than fixed window.
limit = RateLimit(
    key="tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.SlidingWindow,
    capacity=1000
)
Best for: Precise rate limiting without fixed window edge cases
Characteristics:
- Burst handling: Good
- Precision: Excellent
- Memory: Higher (stores timestamps)
- Refunding: No
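For intuition about why this algorithm stores timestamps, here is a hedged sketch of sliding-window accounting in plain Python (illustrative of the algorithm, not concurry's implementation):

import time
from collections import deque

class SlidingWindowSketch:
    def __init__(self, window_seconds: float, capacity: int):
        self.window = window_seconds
        self.capacity = capacity
        self.events = deque()  # (timestamp, amount) pairs

    def allow(self, amount: int = 1) -> bool:
        now = time.monotonic()
        # Evict usage that has rolled out of the window
        while self.events and self.events[0][0] <= now - self.window:
            self.events.popleft()
        used = sum(a for _, a in self.events)
        if used + amount > self.capacity:
            return False
        self.events.append((now, amount))
        return True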
FixedWindow¶
Simple rate limiting with fixed time buckets. Fast but can allow 2x burst at window boundaries.
limit = RateLimit(
    key="tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.FixedWindow,
    capacity=1000
)
Best for: Simple rate limiting where edge cases are acceptable
Characteristics:
- Burst handling: Poor (2x burst at boundaries)
- Precision: Moderate
- Memory: Lowest
- Refunding: No
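The boundary effect is easy to see in a self-contained sketch of the algorithm itself (plain Python, not concurry's implementation): the full capacity can be consumed at the end of one window and again at the start of the next.

import math

class FixedWindowSketch:
    def __init__(self, window_seconds: float, capacity: int):
        self.window = window_seconds
        self.capacity = capacity
        self.current_window = None
        self.used = 0

    def allow(self, now: float, amount: int = 1) -> bool:
        window_id = math.floor(now / self.window)
        if window_id != self.current_window:  # new bucket: counter resets
            self.current_window, self.used = window_id, 0
        if self.used + amount > self.capacity:
            return False
        self.used += amount
        return True

fw = FixedWindowSketch(window_seconds=60, capacity=1000)
print(fw.allow(now=59.9, amount=1000))  # True: fills window 0
print(fw.allow(now=60.1, amount=1000))  # True: window 1 resets the counter
# => 2,000 units accepted within 0.2 seconds around the boundary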
GCRA (Generic Cell Rate Algorithm)¶
Most precise rate limiting using theoretical arrival time tracking.
limit = RateLimit(
    key="tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.GCRA,
    capacity=1000
)
Best for: Strict rate control with precise timing
Characteristics:
- Burst handling: Excellent
- Precision: Best
- Memory: Low
- Refunding: Yes
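For intuition, here is a hedged sketch of the GCRA mechanism in plain Python (illustrative of the algorithm, not concurry's implementation): each request is compared against a theoretical arrival time (TAT) and rejected if it would exceed the burst tolerance; otherwise the TAT advances by the emission interval.

import time

class GCRASketch:
    def __init__(self, rate_per_sec: float, burst: float):
        self.interval = 1.0 / rate_per_sec   # emission interval per unit
        self.tolerance = burst * self.interval
        self.tat = time.monotonic()          # theoretical arrival time

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        tat = max(self.tat, now)
        if tat - now > self.tolerance:       # would exceed the burst budget
            return False
        self.tat = tat + cost * self.interval
        return True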
LimitSet: Multi-Dimensional Limiting¶
LimitSet enables atomic acquisition of multiple limits simultaneously with full thread-safety.
Basic Multi-Dimensional Limiting¶
from concurry import (
LimitSet, RateLimit, CallLimit, ResourceLimit,
RateLimitAlgorithm
)
# Create LimitSet with multiple limit types
limits = LimitSet(limits=[
    CallLimit(
        window_seconds=60,
        algorithm=RateLimitAlgorithm.SlidingWindow,
        capacity=100
    ),
    RateLimit(
        key="input_tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.GCRA,
        capacity=10_000
    ),
    RateLimit(
        key="output_tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=1_000
    ),
    ResourceLimit(
        key="db_connections",
        capacity=10
    )
])
# Acquire specific limits atomically
# CallLimit is automatically acquired with default of 1
with limits.acquire(requested={
    "input_tokens": 500,
    "output_tokens": 50,
    "db_connections": 2
}) as acq:
    result = process_data()
    # Update RateLimits with actual usage
    acq.update(usage={
        "input_tokens": result.input_used,
        "output_tokens": result.output_used
    })
    # CallLimit and ResourceLimit handled automatically
Key behavior:
- When requested is specified, CallLimit and ResourceLimit are automatically included with a default of 1
- RateLimits must be explicitly specified in requested
- All limits are acquired atomically (all-or-nothing)
Nested Acquisition Pattern¶
LimitSet supports partial acquisition, enabling powerful nested patterns:
# Level 1: Acquire long-lived resources
with limits.acquire(requested={"db_connections": 2}):
    # Do setup with connections

    # Level 2: Acquire rate limits for operations
    # Note: CallLimit still automatically acquired here
    with limits.acquire(requested={
        "input_tokens": 100,
        "output_tokens": 50
    }) as rate_acq:
        result = call_api()
        rate_acq.update(usage={
            "input_tokens": result.input_used,
            "output_tokens": result.output_used
        })
    # Connections still held here, but tokens released

    # Another rate-limited operation
    with limits.acquire(requested={
        "input_tokens": 200,
        "output_tokens": 20
    }) as rate_acq2:
        result2 = call_api()
        rate_acq2.update(usage={
            "input_tokens": result2.input_used,
            "output_tokens": result2.output_used
        })
# Connections released at end of outer context
Benefits of nested acquisition:
- Hold resources only as long as needed
- Reduces resource contention
- More efficient limit utilization
- Better granular control
Non-Blocking try_acquire¶
acq = limits.try_acquire(requested={
    "input_tokens": 1000,
    "db_connections": 1
})
if acq.successful:
    with acq:
        # All limits acquired
        result = expensive_operation()
        acq.update(usage={"input_tokens": result.tokens})
else:
    # Could not acquire all limits immediately
    print("Resources not available, will retry later")
LimitPool and Config for Multi-Region/Multi-Account Scenarios¶
For high-scale production scenarios with multiple API endpoints (e.g., different AWS regions, multiple accounts, or different service tiers), LimitPool provides a powerful way to distribute load across multiple independent LimitSets while reducing contention.
Config Parameter on LimitSet¶
Each LimitSet can have an associated config dictionary containing metadata that's accessible via the acquisition object. This is useful for passing contextual information (like region, account ID, or API endpoint) when making external API calls.
from concurry import LimitSet, RateLimit, RateLimitAlgorithm
# Create LimitSet with config metadata
us_east_limits = LimitSet(
    limits=[
        RateLimit(key="tokens", window_seconds=60, capacity=1000)
    ],
    shared=True,
    mode="thread",
    config={"region": "us-east-1", "account": "12345"}
)
# Access config during acquisition
with us_east_limits.acquire(requested={"tokens": 100}) as acq:
    # Get region from config
    region = acq.config["region"]
    account = acq.config["account"]
    # Make API call to specific region/account
    result = call_api(region=region, account=account)
    # Update usage
    acq.update(usage={"tokens": result.tokens_used})
Key Features:
- Config is immutable after acquisition (copies are made)
- Empty dict by default (no config needed for simple cases)
- Accessible via acquisition.config
- Perfect for multi-region/multi-account routing
LimitPool: High-Performance Load Balancing¶
LimitPool aggregates multiple independent LimitSets and uses load balancing to distribute acquisitions across them. This dramatically reduces contention when many workers are competing for the same limits.
Benefits:
- Reduced Contention: Workers acquire from different LimitSets in round-robin fashion
- Higher Throughput: More concurrent acquisitions without blocking
- Multi-Region Support: Each LimitSet can represent a different region/account
- Scalability: Add more LimitSets to increase capacity
import requests

from concurry import LimitSet, LimitPool, RateLimit, RateLimitAlgorithm

# Create multiple independent LimitSets with different configs
us_east = LimitSet(
    limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
    shared=True,
    mode="thread",
    config={"region": "us-east-1", "endpoint": "https://api-us-east-1.example.com"}
)
us_west = LimitSet(
    limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
    shared=True,
    mode="thread",
    config={"region": "us-west-2", "endpoint": "https://api-us-west-2.example.com"}
)
eu_west = LimitSet(
    limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
    shared=True,
    mode="thread",
    config={"region": "eu-west-1", "endpoint": "https://api-eu-west-1.example.com"}
)

# Create LimitPool with load balancing
pool = LimitPool(
    limit_sets=[us_east, us_west, eu_west],
    load_balancing="round_robin",
    worker_index=0  # Starting offset for round-robin
)

# Acquire from pool - automatically selects a LimitSet
with pool.acquire(requested={"tokens": 100}) as acq:
    # Get region and endpoint from selected LimitSet's config
    region = acq.config["region"]
    endpoint = acq.config["endpoint"]
    # Make API call to the selected region
    result = requests.post(endpoint, json={"prompt": "Hello"})
    # Update usage
    acq.update(usage={"tokens": result.json()["tokens_used"]})
Load Balancing Strategies¶
LimitPool supports two load-balancing algorithms:
1. Round-Robin (Default)
- Workers select LimitSets in sequential order
- Each worker starts at a different offset to minimize overlap
- Best for persistent worker pools
- Provides even distribution across all LimitSets
from concurry import LimitPool
pool = LimitPool(
    limit_sets=[limitset1, limitset2, limitset3],
    load_balancing="round_robin",  # Default
    worker_index=0  # Worker's starting offset
)
2. Random
- Randomly selects a LimitSet for each acquisition
- Best for on-demand workers or bursty workloads
- Provides good distribution with minimal coordination
pool = LimitPool(
    limit_sets=[limitset1, limitset2, limitset3],
    load_balancing="random",
    worker_index=0  # Not used for random
)
Worker Integration with LimitPool¶
Workers automatically handle LimitPool through self.limits. When you pass a LimitPool to a worker, it's used seamlessly for all limit acquisitions.
import requests

from concurry import Worker, LimitSet, LimitPool, RateLimit, RateLimitAlgorithm

# Create multiple LimitSets for different regions
limitsets = []
for region, endpoint in [
    ("us-east-1", "https://api-us-east-1.example.com"),
    ("us-west-2", "https://api-us-west-2.example.com"),
    ("eu-west-1", "https://api-eu-west-1.example.com"),
]:
    ls = LimitSet(
        limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
        shared=True,
        mode="thread",
        config={"region": region, "endpoint": endpoint}
    )
    limitsets.append(ls)

# Create LimitPool
limit_pool = LimitPool(
    limit_sets=limitsets,
    load_balancing="round_robin",
    worker_index=0
)

class APIWorker(Worker):
    def __init__(self):
        pass

    def call_api(self, prompt: str):
        # Acquire from pool - automatically selects a region
        with self.limits.acquire(requested={"tokens": 100}) as acq:
            # Get region and endpoint from selected LimitSet's config
            region = acq.config["region"]
            endpoint = acq.config["endpoint"]
            # Make API call
            result = requests.post(
                endpoint,
                json={"prompt": prompt, "max_tokens": 100}
            )
            # Update actual usage
            acq.update(usage={"tokens": result.json()["tokens_used"]})
            return {"region": region, "response": result.json()["text"]}

# Create worker with LimitPool
worker = APIWorker.options(mode="thread", limits=limit_pool).init()

# Each call automatically load-balances across regions
result1 = worker.call_api("Hello").result()  # Might use us-east-1
result2 = worker.call_api("World").result()  # Might use us-west-2
result3 = worker.call_api("AI").result()     # Might use eu-west-1
worker.stop()
Worker Pools with LimitPool¶
When using worker pools, each worker gets its own LimitPool instance with a unique worker_index for proper round-robin distribution:
from concurry import Worker, LimitSet, LimitPool, RateLimit
# Create multiple LimitSets (one per region)
limitsets = [
    LimitSet(
        limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
        shared=True,
        mode="thread",
        config={"region": f"region-{i}", "account": f"account-{i}"}
    )
    for i in range(5)  # 5 regions
]

# Pass list of LimitSets directly - workers get LimitPool automatically
pool = APIWorker.options(
    mode="thread",
    max_workers=10,
    limits=limitsets  # List of LimitSets creates LimitPool per worker
).init()

# Each worker has its own LimitPool with proper worker_index offset
# Worker 0: starts at index 0, 5, 10, ...
# Worker 1: starts at index 1, 6, 11, ...
# Worker 2: starts at index 2, 7, 12, ...
# ... minimizes contention through staggered starting points
futures = [pool.call_api(f"prompt-{i}") for i in range(100)]
results = [f.result() for f in futures]
pool.stop()
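To make the staggering concrete, here is a hedged sketch of the selection arithmetic the comments above describe (select_limitset is a hypothetical helper; concurry's internal bookkeeping may differ):

def select_limitset(worker_index: int, acquisition_count: int, num_limitsets: int) -> int:
    """Round-robin with a per-worker starting offset."""
    return (worker_index + acquisition_count) % num_limitsets

# Worker 0's first three acquisitions hit LimitSets 0, 1, 2;
# Worker 1's first three hit 1, 2, 3 - overlap is minimized.
print([select_limitset(0, k, 5) for k in range(3)])  # [0, 1, 2]
print([select_limitset(1, k, 5) for k in range(3)])  # [1, 2, 3]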
Use Cases for LimitPool¶
1. Multi-Region API Access
Distribute load across multiple regional API endpoints to maximize throughput and reduce latency:
regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]
limitsets = [
    LimitSet(
        limits=[RateLimit(key="tokens", window_seconds=60, capacity=10000)],
        shared=True,
        mode="thread",
        config={"region": region, "endpoint": f"https://api-{region}.example.com"}
    )
    for region in regions
]
pool = LimitPool(limit_sets=limitsets, load_balancing="round_robin")
2. Multiple API Accounts
Use separate accounts to increase overall rate limits:
accounts = ["account-1", "account-2", "account-3"]
limitsets = [
    LimitSet(
        limits=[RateLimit(key="requests", window_seconds=60, capacity=1000)],
        shared=True,
        mode="thread",
        config={"account": account, "api_key": get_api_key(account)}
    )
    for account in accounts
]
pool = LimitPool(limit_sets=limitsets, load_balancing="random")
3. Service Tiers
Differentiate between premium and standard tiers:
premium = LimitSet(
    limits=[RateLimit(key="tokens", window_seconds=60, capacity=10000)],
    shared=True,
    mode="thread",
    config={"tier": "premium", "priority": "high"}
)
standard = LimitSet(
    limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
    shared=True,
    mode="thread",
    config={"tier": "standard", "priority": "normal"}
)
pool = LimitPool(limit_sets=[premium, standard], load_balancing="round_robin")
4. High-Contention Workloads
Reduce lock contention by splitting a single limit pool into multiple independent pools:
# Instead of 1 LimitSet with 10,000 capacity (high contention)
# Use 10 LimitSets with 1,000 capacity each (low contention per LimitSet)
limitsets = [
    LimitSet(
        limits=[RateLimit(key="tokens", window_seconds=60, capacity=1000)],
        shared=True,
        mode="thread",
        config={"pool_id": i}
    )
    for i in range(10)
]
pool = LimitPool(limit_sets=limitsets, load_balancing="round_robin")
# Workers distribute acquisitions across 10 independent LimitSets
# Dramatically reduces lock contention for high-throughput scenarios
5. Real-World Example: AWS Bedrock Multi-Region, Multi-Account Claude 3.7 Sonnet
This comprehensive example demonstrates using LimitPool with AWS Bedrock's Claude 3.7 Sonnet across multiple regions and accounts, with realistic rate limits based on AWS quotas:
import base64
import json
import mimetypes

import boto3
from concurry import Worker, LimitSet, LimitPool, RateLimit, CallLimit, RateLimitAlgorithm

# AWS Bedrock default quotas for Claude 3.7 (as of 2025)
# - Input tokens per minute (TPM): 400,000
# - Output tokens per minute (TPM): 100,000
# - Requests per minute (RPM): 500

# Configuration for multiple AWS accounts and regions
# Each region has a different model ID for Claude 3.7 Sonnet
bedrock_configs = [
    # Region 1: US East (N. Virginia), 2 accounts
    {
        "region": "us-east-1",
        "account_id": "123456789012",
        "model_id": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account1",
    },
    {
        "region": "us-east-1",
        "account_id": "234567890123",
        "model_id": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account2",
    },
    # Region 2: Europe (Paris), 2 accounts
    {
        "region": "eu-west-3",
        "account_id": "123456789012",
        "model_id": "eu.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account1",
    },
    {
        "region": "eu-west-3",
        "account_id": "234567890123",
        "model_id": "eu.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account2",
    },
    # Region 3: Asia Pacific (Mumbai), 2 accounts
    {
        "region": "ap-south-1",
        "account_id": "123456789012",
        "model_id": "apac.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account1",
    },
    {
        "region": "ap-south-1",
        "account_id": "234567890123",
        "model_id": "apac.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "profile": "account2",
    },
]

# Create LimitSets for each account/region combination
# Each LimitSet enforces AWS Bedrock's default quotas
limitsets = []
for config in bedrock_configs:
    limitset = LimitSet(
        limits=[
            # AWS Bedrock: 500 requests per minute per account/region
            CallLimit(
                window_seconds=60,
                algorithm=RateLimitAlgorithm.SlidingWindow,
                capacity=500
            ),
            # AWS Bedrock: 400,000 input tokens per minute
            RateLimit(
                key="input_tokens",
                window_seconds=60,
                algorithm=RateLimitAlgorithm.TokenBucket,
                capacity=400_000
            ),
            # AWS Bedrock: 100,000 output tokens per minute
            RateLimit(
                key="output_tokens",
                window_seconds=60,
                algorithm=RateLimitAlgorithm.TokenBucket,
                capacity=100_000
            ),
        ],
        shared=True,
        mode="thread",
        config=config,  # Store region, account, and model info
    )
    limitsets.append(limitset)

# Create LimitPool to distribute load across all account/region combinations
limit_pool = LimitPool(
    limit_sets=limitsets,
    load_balancing="round_robin",
    worker_index=0
)

# Create a worker that invokes Claude 3.7 Sonnet with vision capabilities:
class ClaudeWorker(Worker):
    """Worker for calling Claude 3.7 Sonnet on AWS Bedrock with images."""

    def __init__(self):
        # Bedrock clients will be created per-request based on config
        self._clients = {}

    def _get_client(self, region: str, profile: str):
        """Get or create a Bedrock client for the given region and profile."""
        key = (region, profile)
        if key not in self._clients:
            session = boto3.Session(profile_name=profile)
            self._clients[key] = session.client(
                service_name="bedrock-runtime",
                region_name=region
            )
        return self._clients[key]

    def analyze_image(
        self,
        image_path: str,
        prompt: str,
        max_tokens: int = 1024
    ) -> dict:
        """Analyze an image using Claude 3.7 Sonnet with vision capabilities.

        Args:
            image_path: Path to the image file
            prompt: Text prompt for image analysis
            max_tokens: Maximum output tokens

        Returns:
            Dictionary with analysis results and metadata
        """
        # Acquire limits from the pool
        # LimitPool automatically selects an account/region using round-robin
        with self.limits.acquire(requested={
            "input_tokens": 10_000,  # Estimate for prompt + image
            "output_tokens": max_tokens
        }) as acq:
            # Get config from the selected LimitSet
            config = acq.config
            region = config["region"]
            account_id = config["account_id"]
            model_id = config["model_id"]
            profile = config["profile"]

            # Get Bedrock client for this region/account
            client = self._get_client(region, profile)

            # Read and encode image
            with open(image_path, "rb") as f:
                image_bytes = f.read()

            # Determine image format
            mime_type, _ = mimetypes.guess_type(image_path)
            if mime_type not in ["image/jpeg", "image/png", "image/gif", "image/webp"]:
                mime_type = "image/jpeg"  # Default

            # Construct request for Claude 3.7 with vision
            request_body = {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": mime_type,
                                    "data": base64.b64encode(image_bytes).decode("utf-8")
                                }
                            },
                            {
                                "type": "text",
                                "text": prompt
                            }
                        ]
                    }
                ]
            }

            # Invoke Claude 3.7 Sonnet
            try:
                response = client.invoke_model(
                    modelId=model_id,
                    body=json.dumps(request_body)
                )
                # Parse response
                response_body = json.loads(response["body"].read())

                # Extract token usage from response
                chars_per_token = 4  # Rough estimate
                usage = response_body.get("usage", {})
                input_tokens_used = usage.get("input_tokens", 10_000)  # Fallback estimate
                output_tokens_used = usage.get(
                    "output_tokens",
                    len(response_body.get("content", [{}])[0].get("text", "")) // chars_per_token
                )

                # Update actual token usage
                acq.update(usage={
                    "input_tokens": input_tokens_used,
                    "output_tokens": output_tokens_used
                })

                # Return results with metadata
                return {
                    "text": response_body["content"][0]["text"],
                    "region": region,
                    "account_id": account_id,
                    "model_id": model_id,
                    "input_tokens": input_tokens_used,
                    "output_tokens": output_tokens_used,
                    "stop_reason": response_body.get("stop_reason"),
                }
            except Exception as e:
                # If error, still update usage with estimates to avoid deadlock
                acq.update(usage={
                    "input_tokens": 10_000,
                    "output_tokens": 0
                })
                raise e

# Create worker pool with LimitPool
# 1,000 threads distribute load across 6 account/region combinations
with ClaudeWorker.options(
    mode="thread",
    max_workers=1000,
    limits=limit_pool,
    num_retries=3,
    retry_on=[Exception],  # Retry on any Bedrock errors
    retry_algorithm="exponential",
    retry_wait=1.0,
).init() as pool:
    # Process multiple images concurrently
    images = [
        ("image1.jpg", "Describe this image in detail"),
        ("image2.png", "What objects do you see?"),
        ("image3.jpg", "Analyze the composition"),
        # ... more images
    ]

    # Submit all tasks
    futures = [
        pool.analyze_image(image_path, prompt)
        for image_path, prompt in images
    ]

    # Collect results
    results = [f.result() for f in futures]

    # Results show which region/account was used for each request
    for i, result in enumerate(results):
        print(f"Image {i+1}:")
        print(f"  Region: {result['region']}")
        print(f"  Account: {result['account_id']}")
        print(f"  Tokens: {result['input_tokens']} in | {result['output_tokens']} out")
        print(f"  Response: {result['text'][:100]}...")
        print()

# Pool automatically stopped by context manager
# All limits properly tracked across all account/region combinations
Key Benefits of this Approach:
- Maximum Throughput: 6 account/region combinations × 500 RPM = 3,000 requests/minute capacity
- Automatic Failover: If one region/account hits limits, requests automatically go to others
- Cost Optimization: Distribute load across accounts and regions to maximize throughput
- Geographic Distribution: Use closest regions for lower latency
- Quota Management: Each LimitSet enforces AWS Bedrock's per-region quotas in each account
- Zero Lock Contention: Workers acquire from different LimitSets, minimizing blocking
- Automatic Retries: Built-in retry logic for transient Bedrock errors (using exponential backoff and minimum wait time of 1 second between retries)
- Token Tracking: Precise tracking of actual token usage vs requested
Monitoring and Scaling:
# Get comprehensive stats across all account/region combinations
stats = limit_pool.get_stats()
print(f"Total LimitSets: {stats['num_limit_sets']}")
print(f"Load Balancing: {stats['load_balancing']}")
print()
for i, ls_stats in enumerate(stats['limit_sets']):
    config = limitsets[i].config
    print(f"LimitSet {i}: {config['region']} ({config['account_id']})")
    print(f"  Input Tokens: {ls_stats['input_tokens']}")
    print(f"  Output Tokens: {ls_stats['output_tokens']}")
    print(f"  Calls: {ls_stats['call_count']}")
    print()
Cost Considerations:
AWS Bedrock pricing varies by region. With this setup, you can:
- Route traffic to cheaper regions during off-peak times
- Maximize utilization of committed throughput contracts
- Implement smart routing based on token costs per region
- Track costs per account/region for chargeback
LimitPool Statistics¶
Monitor usage across all LimitSets in the pool:
# Get comprehensive stats
stats = pool.get_stats()
print(f"Number of LimitSets: {stats['num_limit_sets']}")
print(f"Load balancing: {stats['load_balancing']}")
# Per-LimitSet stats
for i, ls_stats in enumerate(stats['limit_sets']):
    print(f"LimitSet {i}:")
    for key, limit_stats in ls_stats.items():
        print(f"  {key}: {limit_stats}")
Accessing Individual LimitSets¶
You can access individual LimitSets in the pool by index:
pool = LimitPool(limit_sets=[ls1, ls2, ls3])
# Access by index
first_limitset = pool[0]
second_limitset = pool[1]
# Get stats for specific LimitSet
stats = first_limitset.get_stats()
# Direct acquisition from specific LimitSet
with first_limitset.acquire(requested={"tokens": 100}) as acq:
    # Use acq as normal
    pass
Backward Compatibility:
For pools with a single LimitSet, you can also access Limit objects by key:
# Single LimitSet pool
pool = LimitPool(limit_sets=[limitset])
# Access Limit by key (only works for single-LimitSet pools)
token_limit = pool["tokens"]
stats = token_limit.get_stats()
Worker Integration¶
Limits integrate seamlessly with Workers via the limits parameter. You can pass a LimitSet, a LimitPool, or a list of Limit objects.
Option 1: Pass LimitSet (Recommended for Sharing)¶
from concurry import Worker, LimitSet, RateLimit, ResourceLimit, RateLimitAlgorithm
# Create shared LimitSet
shared_limits = LimitSet(
    limits=[
        RateLimit(
            key="api_tokens",
            window_seconds=60,
            algorithm=RateLimitAlgorithm.TokenBucket,
            capacity=1000
        ),
        ResourceLimit(
            key="db_connections",
            capacity=5
        )
    ],
    shared=True,   # Shared across workers (default)
    mode="thread"  # Match worker mode
)

class LLMWorker(Worker):
    def __init__(self, model: str):
        self.model = model

    def process(self, prompt: str) -> str:
        # Nested acquisition pattern
        with self.limits.acquire(requested={"db_connections": 1}):
            context = get_context_from_db()
            with self.limits.acquire(requested={"api_tokens": 500}) as acq:
                result = call_llm(self.model, prompt, context)
                acq.update(usage={"api_tokens": result.tokens_used})
                return result.text

# Multiple workers share the same limits
workers = [
    LLMWorker.options(mode="thread", limits=shared_limits).init("gpt-4")
    for _ in range(5)
]
Option 2: Pass List of Limits (Private Per Worker)¶
# Define limits as list
limit_definitions = [
    RateLimit(
        key="api_tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=1000
    ),
    ResourceLimit(key="db_connections", capacity=5)
]

# Each worker creates its own private LimitSet
worker = LLMWorker.options(
    mode="thread",
    limits=limit_definitions  # List, not LimitSet
).init("gpt-4")
Behavior:
- Passing a LimitSet: Workers share the same limits
- Passing a List[Limit]: Each worker gets its own private LimitSet
- Omitting the limits parameter: Workers get an empty LimitSet (always succeeds)
Option 3: No Limits (Default)¶
Workers always have self.limits available. If you don't pass the limits parameter, workers automatically get an empty LimitSet that always allows acquisition.
from concurry import Worker
class SimpleWorker(Worker):
    def process(self, data):
        # self.limits is always available, even without configuration
        with self.limits.acquire():
            # Always succeeds immediately, no blocking
            return do_work(data)

# Worker without limits - self.limits.acquire() always succeeds
worker = SimpleWorker.options(mode="thread").init()
result = worker.process(data).result()
worker.stop()
Key benefits:
- Write limit-aware code once
- Conditionally enable limits based on environment
- No runtime checks needed (if self.limits is not None)
- Zero overhead when limits not configured
- Enables gradual adoption and testing
Example: Conditional limits based on environment
import os
from concurry import Worker, LimitSet, RateLimit, RateLimitAlgorithm

# Define limits only in production
limits = None
if os.getenv("ENV") == "production":
    limits = LimitSet(
        limits=[
            RateLimit(
                key="api_tokens",
                window_seconds=60,
                algorithm=RateLimitAlgorithm.TokenBucket,
                capacity=1000
            )
        ],
        shared=True,
        mode="thread"
    )

class APIWorker(Worker):
    def call_api(self, prompt: str):
        # Code works the same regardless of whether limits are configured
        with self.limits.acquire(requested={"api_tokens": 100}) as acq:
            result = external_api(prompt)
            acq.update(usage={"api_tokens": result.tokens})
            return result.text

# Production: limits enforced
# Development: limits always succeed, no blocking
worker = APIWorker.options(mode="thread", limits=limits).init()
Execution Modes¶
Limits work with all execution modes, with appropriate backend selection:
Mode | LimitSet Backend | Shared Across |
---|---|---|
sync | InMemorySharedLimitSet | Same process |
thread | InMemorySharedLimitSet | Same process |
asyncio | InMemorySharedLimitSet | Same process |
process | MultiprocessSharedLimitSet | Multiple processes |
ray | RaySharedLimitSet | Ray cluster |
# For process workers
process_limits = LimitSet(
    limits=[...],
    shared=True,
    mode="process"  # Uses multiprocessing.Manager
)
worker = MyWorker.options(mode="process", limits=process_limits).init()

# For Ray workers
ray_limits = LimitSet(
    limits=[...],
    shared=True,
    mode="ray"  # Uses Ray actor
)
worker = MyWorker.options(mode="ray", limits=ray_limits).init()
Shared vs Non-Shared LimitSets¶
LimitSet supports both shared and non-shared modes via the shared parameter (defaults to True).
Shared LimitSets (shared=True, default)¶
Multiple workers share the same limit pool:
# Create shared LimitSet
shared_limits = LimitSet(
    limits=[
        RateLimit(
            key="api_tokens",
            window_seconds=60,
            algorithm=RateLimitAlgorithm.TokenBucket,
            capacity=1000
        )
    ],
    shared=True,  # Default
    mode="thread"
)

# All workers share the 1000 token/minute limit
workers = [
    APIWorker.options(mode="thread", limits=shared_limits).init()
    for _ in range(5)
]
# If worker 1 uses 600 tokens, only 400 remain for all workers
Non-Shared LimitSets (shared=False)¶
Each worker gets its own independent limit pool:
# Create non-shared LimitSet (less common)
non_shared_limits = LimitSet(
    limits=[...],
    shared=False,
    mode="sync"  # Must be "sync" for non-shared
)
# Each worker gets its own copy with separate limits
worker1 = APIWorker.options(mode="sync", limits=non_shared_limits).init()
worker2 = APIWorker.options(mode="sync", limits=non_shared_limits).init()
# worker1's usage doesn't affect worker2's limits
Note: Non-shared mode only works with mode="sync". For most use cases, you want shared=True (the default).
Backend Types and Performance¶
LimitSet automatically selects the appropriate backend based on mode:
Backend | Modes | Synchronization | Overhead |
---|---|---|---|
InMemorySharedLimitSet | sync, thread, asyncio | threading.Lock | 1-5 μs |
MultiprocessSharedLimitSet | process | multiprocessing.Manager | 50-100 μs |
RaySharedLimitSet | ray | Ray actor (0.01 CPU) | 500-1000 μs |
# InMemorySharedLimitSet: Fast, in-process
thread_limits = LimitSet(limits=[...], shared=True, mode="thread")
# MultiprocessSharedLimitSet: Cross-process
process_limits = LimitSet(limits=[...], shared=True, mode="process")
# RaySharedLimitSet: Distributed
ray_limits = LimitSet(limits=[...], shared=True, mode="ray")
Advanced Patterns¶
Conditional Limiting¶
def process_with_priority(priority: str, data):
    # High priority gets more tokens
    requested = {"api_tokens": 1000 if priority == "high" else 100}
    with limits.acquire(requested=requested) as acq:
        result = process(data)
        acq.update(usage={"api_tokens": result.actual_tokens})
        return result
Graceful Degradation¶
def process_with_fallback(data):
    # Try premium service first
    acq = premium_limits.try_acquire(requested={"premium_tokens": 100})
    if acq.successful:
        with acq:
            result = premium_service(data)
            acq.update(usage={"premium_tokens": result.tokens})
            return result
    else:
        # Fall back to basic service
        with basic_limits.acquire(requested={"basic_tokens": 10}) as acq:
            result = basic_service(data)
            acq.update(usage={"basic_tokens": result.tokens})
            return result
Monitoring and Observability¶
def monitor_limits(limits: LimitSet):
    """Print current limit statistics."""
    stats = limits.get_stats()
    for key, limit_stats in stats.items():
        print(f"\nLimit: {key}")
        for stat_name, value in limit_stats.items():
            print(f"  {stat_name}: {value}")

# Get individual limit stats
token_limit = limits["api_tokens"]
token_stats = token_limit.get_stats()
print(f"Available tokens: {token_stats['available_tokens']}")
print(f"Utilization: {token_stats['utilization']:.2%}")
Timeout Handling¶
try:
    with limits.acquire(
        requested={"api_tokens": 1000},
        timeout=5.0
    ) as acq:
        result = expensive_operation()
        acq.update(usage={"api_tokens": result.tokens})
except TimeoutError:
    print("Could not acquire tokens within 5 seconds")
    # Handle timeout - queue for later, use cached result, etc.
Best Practices¶
1. Choose the Right Algorithm¶
- TokenBucket: For APIs with burst tolerance (most common)
- GCRA: For strict rate control with precise timing
- SlidingWindow: When you need precision without burst issues
- LeakyBucket: For smooth, predictable traffic
- FixedWindow: When simplicity matters more than edge cases
2. Always Use LimitSet, Not Limit Directly¶
# ✅ Good: Use LimitSet for thread-safe operations
limits = LimitSet(limits=[
    RateLimit(key="tokens", window_seconds=60, capacity=1000)
])
with limits.acquire(requested={"tokens": 100}) as acq:
    result = operation()
    acq.update(usage={"tokens": result.actual_cost})

# ❌ Bad: Don't use Limit directly (not thread-safe!)
limit = RateLimit(key="tokens", window_seconds=60, capacity=1000)
# limit.acquire() doesn't exist!
3. Always Update RateLimit Usage¶
# ✅ Good: Report actual usage for RateLimits
with limits.acquire(requested={"tokens": 100}) as acq:
    result = operation()
    acq.update(usage={"tokens": result.actual_cost})

# ❌ Bad: Missing update for RateLimit (raises RuntimeError)
with limits.acquire(requested={"tokens": 100}) as acq:
    result = operation()
    # Missing acq.update()! Will raise error on context exit
Note: CallLimit and ResourceLimit are automatic and don't need update().
4. Use Nested Acquisition for Better Resource Management¶
# ✅ Good: Nest resources and rate limits
with limits.acquire(requested={"db_connections": 1}):
    # Setup
    with limits.acquire(requested={"tokens": 100}) as acq:
        result = do_work()
        acq.update(usage={"tokens": result.tokens})
    # Connection still held, tokens released

# ❌ Avoid: Acquiring everything at once for long operations
with limits.acquire(requested={"db_connections": 1, "tokens": 100}) as acq:
    # Connection AND tokens held for entire duration
    long_running_operation()
    acq.update(usage={"tokens": 100})
5. Handle Timeouts Gracefully¶
# ✅ Good: Handle timeout and provide feedback
try:
    with limits.acquire(requested={"tokens": 1000}, timeout=3.0) as acq:
        result = operation()
        acq.update(usage={"tokens": result.tokens})
except TimeoutError:
    logger.warning("Rate limit timeout, queueing for later")
    queue.put(task)
6. Monitor Limit Utilization¶
# ✅ Good: Regular monitoring
def check_limit_health():
    stats = limits.get_stats()
    for key, limit_stats in stats.items():
        if limit_stats.get('utilization', 0) > 0.9:
            alert(f"Limit {key} at {limit_stats['utilization']:.0%}")
7. Match LimitSet Mode to Worker Mode¶
# ✅ Good: Match modes for shared limits
thread_limits = LimitSet(limits=[...], shared=True, mode="thread")
workers = [
    Worker.options(mode="thread", limits=thread_limits).init()
    for _ in range(5)
]
# ❌ Don't: Mix execution modes
process_limits = LimitSet(limits=[...], shared=True, mode="process")
worker = Worker.options(mode="thread", limits=process_limits).init()
# ^ Won't work! Mode mismatch
8. Use Partial Acquisition (CallLimit/ResourceLimit Auto-Included)¶
# ✅ Good: Specify only what you need - CallLimit auto-acquired
limits = LimitSet(limits=[
    CallLimit(window_seconds=60, capacity=100),
    RateLimit(key="tokens", window_seconds=60, capacity=1000)
])
# CallLimit automatically acquired with default of 1
with limits.acquire(requested={"tokens": 50}) as acq:
    result = operation()
    acq.update(usage={"tokens": result.tokens})
# CallLimit was automatically acquired and released
Error Handling¶
Common Errors¶
ValueError: Usage exceeds requested

# Cause: Trying to use more than requested
with limits.acquire(requested={"tokens": 100}) as acq:
    acq.update(usage={"tokens": 150})  # Error!

# Solution: Request a sufficient amount upfront
with limits.acquire(requested={"tokens": 200}) as acq:
    acq.update(usage={"tokens": 150})  # OK
RuntimeError: Not all limits updated
# Cause: Missing update() call for RateLimit
with limits.acquire(requested={"tokens": 100}) as acq:
    pass  # Error on exit - no update!

# Solution: Always update RateLimits
with limits.acquire(requested={"tokens": 100}) as acq:
    result = operation()
    acq.update(usage={"tokens": result.tokens})
TimeoutError: Failed to acquire
# Cause: Could not acquire within timeout
limits.acquire(requested={"tokens": 1000}, timeout=1.0)
# Solution: Handle timeout or increase timeout
try:
    limits.acquire(requested={"tokens": 1000}, timeout=5.0)
except TimeoutError:
    # Queue for later, use cached result, etc.
    pass
Integration with Retry Mechanisms¶
Limits work seamlessly with Concurry's retry mechanism. When a method is retried, limits are automatically released between attempts to prevent deadlocks and ensure fair resource usage.
Automatic Limit Release on Retry¶
When using limits with retry configuration, the system automatically:
1. Acquires limits before method execution
2. If the method fails and should retry:
   - Releases all acquired limits
   - Waits for the retry delay
   - Reacquires limits for the next attempt
3. Releases limits after final success or failure
from concurry import Worker, ResourceLimit
class DatabaseWorker(Worker):
    def query(self, sql: str) -> list:
        # Acquire database connection
        # (ResourceLimit: released automatically on context exit, no update() needed)
        with self.limits.acquire(requested={"connections": 1}):
            return execute_query(sql)

worker = DatabaseWorker.options(
    mode="thread",
    num_retries=3,
    retry_on=[DatabaseError],
    limits=[ResourceLimit(key="connections", capacity=5)]
).init()
# If query fails:
# - Connection is automatically released
# - Wait for retry delay
# - Connection is reacquired for retry
# - No deadlocks!
Rate Limits with Retry¶
Rate limits are properly managed across retries:
import requests

from concurry import RateLimit, Worker

class APIWorker(Worker):
    def call_api(self, endpoint: str) -> dict:
        with self.limits.acquire(requested={"requests": 1}) as acq:
            response = requests.get(f"{self.base_url}/{endpoint}")
            acq.update(usage={"requests": 1})
            return response.json()

worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",
    limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()
# Each retry attempt counts as a separate request
# Limits are released between attempts
# Total budget is respected across all attempts
Shared Limits with Retry¶
When using shared limits across a pool, retries coordinate properly:
from concurry import LimitSet, ResourceLimit
# Create shared limit
shared_limits = LimitSet(
    limits=[ResourceLimit(key="db_connections", capacity=10)],
    shared=True,
    mode="thread"
)
# Pool with shared limits and retry
pool = DatabaseWorker.options(
    mode="thread",
    max_workers=20,  # 20 workers share 10 connections
    num_retries=3,
    retry_on=[DatabaseError],
    limits=shared_limits
).init()
# Benefits:
# - Each worker's retries properly release/acquire shared limits
# - No starvation - limits are freed between attempts
# - Fair resource distribution across all workers
Call Limits with Retry¶
CallLimit automatically tracks retry attempts:
from concurry import CallLimit
worker = MyWorker.options(
    mode="thread",
    num_retries=3,
    limits=[CallLimit(window_seconds=1, capacity=10)]
).init()
# CallLimit counts each attempt (initial + retries)
# Each retry attempt is a separate "call" for limit purposes
# Automatically managed - no manual update needed
Best Practices for Limits with Retry¶
1. Size Limits for Worst-Case Retry Scenarios
# If each request can retry 3 times, and you have 10 workers:
# Worst case: 10 * (1 + 3) = 40 total attempts
# Size rate limits accordingly
worker = MyWorker.options(
    mode="thread",
    max_workers=10,
    num_retries=3,
    limits=[RateLimit(
        key="requests",
        window_seconds=60,
        capacity=100  # Accounts for retries
    )]
).init()
2. Use Resource Limits to Prevent Resource Exhaustion
# Limit concurrent database connections
worker = DatabaseWorker.options(
    mode="thread",
    num_retries=3,
    limits=[ResourceLimit(key="connections", capacity=10)]
).init()
# Even with retries, never exceeds 10 concurrent connections
3. Combine with Exponential Backoff
# Retry with backoff reduces rate limit pressure
worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",  # Increases wait time
    retry_wait=1.0,
    limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()
# Later retries have longer delays, spreading out rate limit usage
4. Monitor Limit Utilization with Retries
def should_retry_with_limit_check(exception, attempt, **ctx):
    """Smart retry that backs off if limits are tight."""
    if attempt > 3:
        return False  # Don't retry too many times
    # Check if we should retry based on exception
    return isinstance(exception, (ConnectionError, TimeoutError))

worker = MyWorker.options(
    mode="thread",
    num_retries=5,
    retry_on=should_retry_with_limit_check,
    limits=[RateLimit(key="requests", window_seconds=60, capacity=100)]
).init()
5. Use Shared Limits for Pool-Wide Retry Coordination
# Shared limits ensure fair resource distribution even with retries
shared_limits = LimitSet(
    limits=[
        ResourceLimit(key="resources", capacity=20),
        RateLimit(key="requests", window_seconds=60, capacity=200)
    ],
    shared=True,
    mode="thread"
)
pool = MyWorker.options(
    mode="thread",
    max_workers=10,
    num_retries=3,
    limits=shared_limits
).init()
# All workers share the limits
# Retries don't cause resource starvation
For comprehensive retry documentation, see the Retry Mechanisms Guide.
Performance Considerations¶
Acquisition Overhead¶
Backend | Overhead | Use Case |
---|---|---|
InMemory | 1-5 μs | Single process |
Multiprocess | 50-100 μs | Multi-process |
Ray | 500-1000 μs | Distributed |
Algorithm Performance¶
Algorithm | Memory | CPU | Precision |
---|---|---|---|
FixedWindow | Lowest | Lowest | Moderate |
TokenBucket | Low | Low | Good |
GCRA | Low | Low | Best |
LeakyBucket | Low | Medium | Excellent |
SlidingWindow | Higher | Medium | Excellent |
Optimization Tips¶
- Batch operations when possible to reduce acquire/release cycles
- Use try_acquire for non-critical operations
- Monitor utilization to right-size limits
- Choose simpler algorithms (FixedWindow, TokenBucket) for high-throughput scenarios
- Use nested acquisition to minimize resource holding time
Submission Queue: Client-Side Task Queuing¶
In addition to rate limiting and resource limits, Concurry provides submission queuing to prevent overloading worker backends when submitting large batches of tasks. This feature limits the number of "in-flight" tasks per worker before they even reach the backend execution environment.
Overview¶
The max_queued_tasks parameter controls how many tasks can be submitted to a worker's backend before blocking on the client side. This prevents issues like:
- Memory exhaustion from thousands of pending futures
- Backend overload from too many queued tasks (especially Ray actors)
- Network saturation when submitting large batches to distributed workers
- Resource contention from excessive concurrent task submissions
Key Characteristics:
- Client-side queuing: Tasks block during submission, not execution
- Per-worker limit: Each worker (or worker in a pool) has its own independent queue
- Transparent to users: Your submission loops don't need modification
- Compatible with all features: Works seamlessly with limits, retries, polling, and load balancing
- Automatic release: Queue slots are released automatically when tasks complete (via future callbacks)
How It Works¶
from concurry import Worker
class DataProcessor(Worker):
    def process(self, data: str) -> str:
        return data.upper()

# Create worker with submission queue
worker = DataProcessor.options(
    mode="thread",
    max_queued_tasks=5  # Max 5 tasks "in-flight" at once
).init()

# Submit 100 tasks
# - First 5 submit immediately
# - 6th submission blocks until one of the first 5 completes
# - As tasks complete, new submissions proceed
futures = [worker.process(f"data-{i}") for i in range(100)]

# Gather results (submission queue prevents overload)
results = [f.result() for f in futures]
worker.stop()
What Happens:
1. The first 5 submissions succeed immediately (queue has capacity=5)
2. The 6th submission blocks on the worker.process() call
3. When the 1st task completes, its future callback releases a queue slot
4. The 6th submission proceeds
5. This continues until all 100 tasks are submitted and completed
Default Values by Mode¶
Mode | Default max_queued_tasks | Reasoning |
---|---|---|
sync | None (bypassed) | Immediate execution, no queuing needed |
asyncio | None (bypassed) | Event loop handles concurrency |
thread | 100 | High concurrency, large queue OK |
process | 5 | Limited by CPU cores, smaller queue |
ray | 2 | Distributed, minimize in-flight tasks |
Blocking Mode: Automatically bypassed (max_queued_tasks has no effect)
Basic Usage¶
Single Worker¶
import time
from concurry import Worker

class SlowWorker(Worker):
    def slow_task(self, x: int) -> int:
        time.sleep(0.1)
        return x * 2

# Limit in-flight tasks to prevent overload
worker = SlowWorker.options(
    mode="process",
    max_queued_tasks=3  # Max 3 tasks at once
).init()

# Submit 50 tasks - submission loop blocks as needed
futures = []
for i in range(50):
    # This blocks when queue is full (3 tasks pending)
    # Automatically unblocks when a task completes
    f = worker.slow_task(i)
    futures.append(f)

# All tasks complete successfully
results = [f.result() for f in futures]
worker.stop()
Worker Pool¶
import requests
from concurry import Worker

class APIWorker(Worker):
    def call_api(self, url: str) -> dict:
        return requests.get(url).json()

# Pool with per-worker queues
pool = APIWorker.options(
    mode="thread",
    max_workers=10,       # 10 workers
    max_queued_tasks=5,   # 5 in-flight per worker
    load_balancing="round_robin"
).init()

# Total capacity: 10 workers × 5 queue = 50 concurrent tasks
# Submissions beyond 50 block until slots free up
futures = [pool.call_api(f"https://api.example.com/{i}") for i in range(500)]
results = [f.result() for f in futures]
pool.stop()
Per-Worker Queues:
- Each worker in the pool has its own independent queue
- Worker 0: Can have 5 tasks in-flight
- Worker 1: Can have 5 tasks in-flight
- ...and so on
- Load balancing distributes tasks across all workers
- Total capacity = max_workers × max_queued_tasks
Integration with Synchronization Primitives¶
The submission queue is designed to work seamlessly with wait() and gather() - the primary use case for batch task submission:
import time
from concurry import Worker, gather, wait, ReturnWhen

class DataProcessor(Worker):
    def process(self, data: str) -> str:
        time.sleep(0.05)  # Simulate work
        return data.upper()

worker = DataProcessor.options(
    mode="thread",
    max_queued_tasks=10
).init()

# Submit large batch
# Submission queue prevents memory/backend overload
futures = [worker.process(f"item-{i}") for i in range(1000)]

# Gather all results (submission already completed)
results = gather(futures, timeout=60.0)
print(f"Processed {len(results)} items")
worker.stop()
With wait():
# Submit batch with submission queue
futures = [worker.process(data) for data in large_dataset]
# Wait for all to complete
done, not_done = wait(futures, return_when=ReturnWhen.ALL_COMPLETED, timeout=300.0)
print(f"Completed: {len(done)}, Pending: {len(not_done)}")
Integration with Limits¶
Submission queue and resource limits serve different purposes and work together:
Submission Queue:
- Limits tasks submitted to the backend (client-side)
- Prevents overloading worker queues/memory
- Applies before the task reaches the worker

Resource Limits:
- Limits concurrent execution within the worker (worker-side)
- Protects external resources (APIs, databases)
- Applies during task execution
from concurry import Worker, ResourceLimit, RateLimit, LimitSet
class DatabaseWorker(Worker):
    def query(self, sql: str) -> list:
        # Acquire resource limits during execution
        with self.limits.acquire(requested={
            "connections": 1,
            "queries": 1
        }) as acq:
            result = execute_query(sql)
            acq.update(usage={"queries": 1})
            return result

# Create shared limits
limits = LimitSet(
    limits=[
        ResourceLimit(key="connections", capacity=5),  # Max 5 concurrent queries
        RateLimit(key="queries", window_seconds=60, capacity=100)  # 100 queries/min
    ],
    shared=True,
    mode="thread"
)

# Worker with both submission queue AND limits
worker = DatabaseWorker.options(
    mode="thread",
    max_queued_tasks=10,  # Max 10 submitted at once (client-side)
    limits=limits         # Max 5 executing concurrently (worker-side)
).init()

# Submit 100 queries
# - Submission queue limits to 10 in-flight tasks
# - Resource limit ensures only 5 execute concurrently
# - Rate limit ensures no more than 100 queries/minute
futures = [worker.query(f"SELECT * FROM table_{i}") for i in range(100)]
results = [f.result() for f in futures]
worker.stop()
Flow:
1. Submission Queue (Client): Task waits here if 10+ tasks are already submitted
2. Worker Queue (Backend): Task enters the worker's execution queue
3. Resource Limits (Worker): Task waits here if 5+ queries are already executing
4. Execution: Task runs
5. Completion: Releases the resource limit and frees a submission queue slot
Integration with Retries¶
Submission queue counts original submissions, not retry attempts:
from concurry import Worker
class FlakeyWorker(Worker):
    def __init__(self):
        # Track attempts per task so each task fails its own fail_count times
        self.attempt_counts = {}

    def flakey_task(self, task_id: str, fail_count: int) -> str:
        attempts = self.attempt_counts.get(task_id, 0) + 1
        self.attempt_counts[task_id] = attempts
        if attempts <= fail_count:
            raise ValueError(f"Attempt {attempts} failed")
        return "Success"

worker = FlakeyWorker.options(
    mode="thread",
    num_retries=5,       # Up to 5 retries
    retry_wait=0.1,
    max_queued_tasks=3   # Only 3 tasks count toward queue
).init()

# Submit 3 tasks that will retry multiple times
# Retries don't count toward submission queue (only original submissions)
f1 = worker.flakey_task("t1", 2)  # Fails 2 times, succeeds on 3rd
f2 = worker.flakey_task("t2", 3)  # Fails 3 times, succeeds on 4th
f3 = worker.flakey_task("t3", 1)  # Fails 1 time, succeeds on 2nd

# Fourth submission can proceed as soon as ANY of the above completes
# (regardless of how many retries they needed)
f4 = worker.flakey_task("t4", 2)

results = [f.result() for f in [f1, f2, f3, f4]]
worker.stop()
Key Points:
- Each worker.method() call counts as 1 submission (regardless of retries)
- Retries happen inside the worker and are not counted toward the queue
- The submission slot is freed when the task fully completes (after all retries)
High-Volume Scenarios¶
For scenarios with thousands of tasks (e.g., LLM batch processing), submission queue prevents memory exhaustion:
from concurry import Worker, RateLimit, LimitSet, gather
class LLMWorker(Worker):
    def generate(self, prompt: str, max_tokens: int = 100) -> str:
        with self.limits.acquire(requested={
            "input_tokens": len(prompt) * 4,  # Rough estimate
            "output_tokens": max_tokens
        }) as acq:
            result = call_llm_api(prompt, max_tokens)
            acq.update(usage={
                "input_tokens": result.input_tokens,
                "output_tokens": result.output_tokens
            })
            return result.text

# Create shared limits for API quotas
limits = LimitSet(
    limits=[
        RateLimit(key="input_tokens", window_seconds=60, capacity=400_000),
        RateLimit(key="output_tokens", window_seconds=60, capacity=100_000),
    ],
    shared=True,
    mode="thread"
)

# Pool with submission queue for high volume
pool = LLMWorker.options(
    mode="thread",
    max_workers=50,
    max_queued_tasks=10,  # 10 per worker = 500 total capacity
    limits=limits
).init()

# Process 10,000 prompts without memory issues
# Submission queue throttles submission so tasks aren't all in-flight at once
prompts = [f"Prompt {i}" for i in range(10_000)]
futures = [pool.generate(prompt) for prompt in prompts]

# Gather results in batches to keep memory under control
batch_size = 1000
all_results = []
for i in range(0, len(futures), batch_size):
    batch = futures[i:i+batch_size]
    results = gather(batch, timeout=300.0)
    all_results.extend(results)
    print(f"Processed {len(all_results)}/{len(prompts)} prompts")

pool.stop()
On-Demand Workers¶
On-demand workers (ephemeral, created per request) automatically bypass submission queuing since the pool already limits concurrent workers:
from concurry import Worker
class OnDemandWorker(Worker):
    def task(self, x: int) -> int:
        return x * 2

# On-demand pool manages worker creation
pool = OnDemandWorker.options(
    mode="thread",
    max_workers=10,      # Max 10 concurrent on-demand workers
    on_demand=True,
    max_queued_tasks=5   # Ignored for on-demand workers
).init()

# Pool creates/destroys workers as needed
# Max 10 workers run concurrently (managed by pool)
futures = [pool.task(i) for i in range(100)]
results = [f.result() for f in futures]
pool.stop()
Why on-demand bypasses submission queue:
- On-demand workers are ephemeral (created per request, destroyed after completion)
- The pool's max_workers already limits concurrent workers
- Each on-demand worker only handles 1 task
- Adding a submission queue would be redundant and could cause deadlocks
Tuning Submission Queue Length¶
Guidelines:

1. Short, Fast Tasks (< 100ms): Use larger queues (max_queued_tasks=50-100). Amortizes submission overhead and keeps workers fed with tasks.
2. Long-Running Tasks (> 1s): Use smaller queues (max_queued_tasks=2-5). Reduces memory footprint and prevents excessive pending work.
3. I/O-Bound Tasks: Threads: max_queued_tasks=20-100; AsyncIO: bypass (set to None). High concurrency works well.
4. CPU-Bound Tasks: Process: max_queued_tasks=2-5. Limited by cores, a small queue is sufficient.
5. Distributed (Ray): max_queued_tasks=2-5. Minimizes data transfer overhead and prevents actor queue saturation.
Examples:
# Fast I/O tasks - large queue
worker = FastAPIWorker.options(
    mode="thread",
    max_queued_tasks=100
).init()

# Slow LLM tasks - small queue
worker = LLMWorker.options(
    mode="thread",
    max_queued_tasks=3
).init()

# CPU-intensive - small queue
worker = MLWorker.options(
    mode="process",
    max_queued_tasks=2
).init()

# Ray distributed - small queue
worker = RayWorker.options(
    mode="ray",
    max_queued_tasks=2
).init()
Monitoring Submission Queues¶
For worker pools, you can inspect submission queue status:
pool = Worker.options(
    mode="thread",
    max_workers=10,
    max_queued_tasks=5
).init()

# Get pool statistics
stats = pool.get_pool_stats()
print(f"Workers: {stats['total_workers']}")
print(f"Queue length per worker: {stats['max_queued_tasks']}")
print(f"Queue info: {stats['submission_queues']}")

# Per-worker queue info
for queue_info in stats['submission_queues']:
    print(f"Worker {queue_info['worker_idx']}: capacity={queue_info['capacity']}")
Best Practices¶
1. Match Queue Size to Task Characteristics
# ✅ Good: Small queue for expensive operations
expensive_worker = Worker.options(
    mode="thread",
    max_queued_tasks=3  # Don't overload
).init()

# ✅ Good: Large queue for cheap operations
cheap_worker = Worker.options(
    mode="thread",
    max_queued_tasks=100  # Keep workers fed
).init()
2. Use with Synchronization Primitives
# ✅ Good: Submission queue + gather
futures = [worker.process(item) for item in large_batch]
results = gather(futures, timeout=300.0)
# ✅ Good: Submission queue + wait
done, not_done = wait(futures, timeout=300.0)
3. Combine with Resource Limits
# ✅ Good: Layered protection
worker = Worker.options(
    mode="thread",
    max_queued_tasks=10,  # Client-side limit
    limits=[ResourceLimit(key="resources", capacity=5)]  # Worker-side limit
).init()
4. Don't Mix with Blocking Mode
# ❌ Avoid: Submission queue has no effect in blocking mode
worker = Worker.options(
    mode="thread",
    blocking=True,      # Returns results directly
    max_queued_tasks=5  # Ignored!
).init()

# ✅ Good: Use non-blocking mode for submission queue
worker = Worker.options(
    mode="thread",
    blocking=False,  # Returns futures
    max_queued_tasks=5
).init()
5. Let Defaults Work for Most Cases
# ✅ Good: Defaults are well-tuned for each mode
worker_thread = Worker.options(mode="thread").init() # max_queued_tasks=100
worker_process = Worker.options(mode="process").init() # max_queued_tasks=5
worker_ray = Worker.options(mode="ray").init() # max_queued_tasks=2
Advanced: Bypassing Submission Queue¶
To explicitly bypass submission queue (unlimited in-flight tasks):
worker = Worker.options(
    mode="thread",
    max_queued_tasks=None  # Bypass submission queue
).init()

# Be careful! Can create thousands of futures at once
futures = [worker.task(i) for i in range(10_000)]
When to bypass:
- Testing/debugging
- Very fast tasks with negligible memory footprint
- Custom batching logic in your code
- Sync or AsyncIO modes (already bypassed by default)
Troubleshooting¶
Issue: Submissions blocking longer than expected
# Problem: Queue too small for workload
worker = Worker.options(
    mode="thread",
    max_queued_tasks=2  # Too small!
).init()

# Solution: Increase queue length
worker = Worker.options(
    mode="thread",
    max_queued_tasks=20  # Better
).init()
Issue: Memory usage high despite submission queue
# Problem: Futures themselves consume memory
futures = [worker.task(i) for i in range(100_000)]  # 100k futures in memory

# Solution: Process in batches
batch_size = 1000
for i in range(0, len(data), batch_size):
    batch_futures = [worker.task(x) for x in data[i:i+batch_size]]
    results = gather(batch_futures)
    process_results(results)
Issue: Deadlock with on-demand workers
# Problem: On-demand workers stuck
pool = Worker.options(
    mode="thread",
    on_demand=True,
    max_workers=5,
    max_queued_tasks=2  # Can cause issues
).init()
# Solution: On-demand automatically bypasses submission queue
# (This is handled automatically by Concurry)
See Also¶
- Workers Guide - Integrating limits with Workers
- Retry Mechanisms Guide - Using retries with limits
- Worker Pools Guide - Shared limits across pools
- API Reference - Detailed API documentation
- Examples - More limit usage examples