Limits API Reference

concurry.core.limit.limit.Limit

Bases: Typed, ABC

Abstract base class for all limit types.

A Limit is a simple data container that defines constraints on resource usage. Limits are NOT thread-safe and cannot be acquired directly. They must be used within a LimitSet for thread-safe acquisition and management.

Attributes:

  • key (str): Unique identifier for this limit within a LimitSet. Used to reference the limit when acquiring or updating usage.

Thread-Safety

Limits are NOT thread-safe. All internal state (e.g., rate limiter implementations, current usage counters) is unprotected. LimitSet provides the necessary locking and synchronization for safe concurrent access.

Usage

Limits should only be instantiated and used within a LimitSet:

# Create limit definitions
limit = RateLimit(key="api", window_seconds=60, ...)

# Use within thread-safe LimitSet
limit_set = LimitSet(limits=[limit])
with limit_set.acquire(requested={"api": 10}) as acq:
    # Safe concurrent access
    ...
Subclass Requirements

Subclasses must implement the following (a minimal sketch follows the list):
  • can_acquire(requested): Check if the limit can accommodate the requested amount
  • validate_usage(requested, used): Validate actual usage against the request
  • get_stats(): Return current statistics
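
A minimal sketch of a custom subclass (the class name MaxConcurrencyLimit and its fields are hypothetical, not part of concurry), following the same field-declaration pattern ResourceLimit uses below:

from concurry.core.limit.limit import Limit

class MaxConcurrencyLimit(Limit):
    """Hypothetical illustration: caps a simple in-flight counter at `capacity`."""

    capacity: int
    _in_flight: int = 0  # managed via LimitSet; not thread-safe on its own

    def can_acquire(self, requested: int) -> bool:
        # Non-blocking check; LimitSet provides the synchronization
        return (self._in_flight + requested) <= self.capacity

    def validate_usage(self, requested: int, used: int) -> None:
        if used > requested:
            raise ValueError(f"Usage ({used}) cannot exceed requested ({requested})")

    def get_stats(self) -> dict:
        return {"key": self.key, "capacity": self.capacity, "in_flight": self._in_flight}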

See Also
  • RateLimit: Time-based rate limiting with configurable algorithms
  • CallLimit: Call counting (usage always 1)
  • ResourceLimit: Semaphore-based resource limiting
  • LimitSet: Thread-safe atomic multi-limit acquisition
Source code in src/concurry/core/limit/limit.py
class Limit(Typed, ABC):
    """Abstract base class for all limit types.

    A Limit is a simple data container that defines constraints on resource usage.
    Limits are NOT thread-safe and cannot be acquired directly. They must be used
    within a LimitSet for thread-safe acquisition and management.

    Attributes:
        key: Unique identifier for this limit within a LimitSet. Used to
            reference the limit when acquiring or updating usage.

    Thread-Safety:
        **Limits are NOT thread-safe.** All internal state (e.g., rate limiter
        implementations, current usage counters) is unprotected. LimitSet provides
        the necessary locking and synchronization for safe concurrent access.

    Usage:
        Limits should only be instantiated and used within a LimitSet:

        ```python
        # Create limit definitions
        limit = RateLimit(key="api", window_seconds=60, ...)

        # Use within thread-safe LimitSet
        limit_set = LimitSet(limits=[limit])
        with limit_set.acquire(requested={"api": 10}) as acq:
            # Safe concurrent access
            ...
        ```

    Subclass Requirements:
        Subclasses must implement:
        - can_acquire(requested): Check if limit can accommodate amount
        - validate_usage(requested, used): Validate actual usage
        - get_stats(): Return current statistics

    See Also:
        - RateLimit: Time-based rate limiting with configurable algorithms
        - CallLimit: Call counting (usage always 1)
        - ResourceLimit: Semaphore-based resource limiting
        - LimitSet: Thread-safe atomic multi-limit acquisition
    """

    key: str  # Unique identifier within a LimitSet

    def can_acquire(self, requested: int) -> bool:
        """Check if the limit can accommodate the requested amount.

        This is a non-blocking check that doesn't modify state. NOT thread-safe.

        Args:
            requested: Amount to check

        Returns:
            True if the requested amount can be acquired

        Warning:
            This method is NOT thread-safe. Only call from within LimitSet
            which provides proper synchronization.
        """
        raise NotImplementedError("Subclasses must implement can_acquire")

    def validate_usage(self, requested: int, used: int) -> None:
        """Validate that usage is valid for this limit type.

        Args:
            requested: Amount originally requested
            used: Actual amount used

        Raises:
            ValueError: If usage is invalid
        """
        raise NotImplementedError("Subclasses must implement validate_usage")

    def get_stats(self) -> dict:
        """Get current statistics for this limit.

        Returns:
            Dictionary of statistics

        Warning:
            This method is NOT thread-safe. For thread-safe stats,
            call via LimitSet.get_stats().
        """
        raise NotImplementedError("Subclasses must implement get_stats")

can_acquire(requested: int) -> bool

Check if the limit can accommodate the requested amount.

This is a non-blocking check that doesn't modify state. NOT thread-safe.

Parameters:

  • requested (int, required): Amount to check

Returns:

  • bool: True if the requested amount can be acquired

Warning

This method is NOT thread-safe. Only call from within LimitSet which provides proper synchronization.

Source code in src/concurry/core/limit/limit.py
def can_acquire(self, requested: int) -> bool:
    """Check if the limit can accommodate the requested amount.

    This is a non-blocking check that doesn't modify state. NOT thread-safe.

    Args:
        requested: Amount to check

    Returns:
        True if the requested amount can be acquired

    Warning:
        This method is NOT thread-safe. Only call from within LimitSet
        which provides proper synchronization.
    """
    raise NotImplementedError("Subclasses must implement can_acquire")

validate_usage(requested: int, used: int) -> None

Validate that usage is valid for this limit type.

Parameters:

  • requested (int, required): Amount originally requested
  • used (int, required): Actual amount used

Raises:

  • ValueError: If usage is invalid

Source code in src/concurry/core/limit/limit.py
def validate_usage(self, requested: int, used: int) -> None:
    """Validate that usage is valid for this limit type.

    Args:
        requested: Amount originally requested
        used: Actual amount used

    Raises:
        ValueError: If usage is invalid
    """
    raise NotImplementedError("Subclasses must implement validate_usage")

get_stats() -> dict

Get current statistics for this limit.

Returns:

  • dict: Dictionary of statistics

Warning

This method is NOT thread-safe. For thread-safe stats, call via LimitSet.get_stats().
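
A minimal sketch of the thread-safe path (the limit definition is illustrative):

from concurry import LimitSet, RateLimit, RateLimitAlgorithm

limit_set = LimitSet(limits=[
    RateLimit(key="api", window_seconds=60,
              algorithm=RateLimitAlgorithm.TokenBucket, capacity=100)
])

# Synchronized snapshot of all limits, instead of calling Limit.get_stats() directly
stats = limit_set.get_stats()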

Source code in src/concurry/core/limit/limit.py
def get_stats(self) -> dict:
    """Get current statistics for this limit.

    Returns:
        Dictionary of statistics

    Warning:
        This method is NOT thread-safe. For thread-safe stats,
        call via LimitSet.get_stats().
    """
    raise NotImplementedError("Subclasses must implement get_stats")

concurry.core.limit.limit.RateLimit

Bases: Limit

Rate-based limit using configurable rate limiting algorithms.

RateLimits enforce time-based constraints on resource usage, such as API token consumption, bandwidth limits, or request rates. They support multiple algorithms with different performance and precision characteristics.

Thread-Safety

RateLimit is NOT thread-safe. The internal rate limiter implementation (_impl) maintains unprotected state. Use within a LimitSet for thread-safe acquisition and management. LimitSet handles all token acquisition, refunding, and synchronization.

Attributes:

  • key (str): Unique identifier for this limit (e.g., "input_tokens", "api_calls")
  • window_seconds (confloat(gt=0)): Time window in seconds over which the limit applies
  • algorithm (Union[RateLimitAlgorithm, _NO_ARG_TYPE]): Rate limiting algorithm (TokenBucket, LeakyBucket, SlidingWindow, FixedWindow, or GCRA). If not specified, uses the value from global_config.defaults.rate_limit_algorithm
  • capacity (conint(gt=0)): Maximum capacity (burst size for bucket algorithms, max count for window algorithms)

Algorithms
  • TokenBucket: Allows bursts up to capacity while maintaining average rate. Tokens refill continuously. Best for APIs that allow occasional bursts.
  • LeakyBucket: Processes requests at fixed rate, smoothing traffic. Best for predictable, steady-state traffic.
  • SlidingWindow: Precise rate limiting with rolling time window. More accurate than fixed window, higher memory usage.
  • FixedWindow: Simple rate limiting with fixed time buckets. Fastest but can allow 2x burst at window boundaries.
  • GCRA (Generic Cell Rate Algorithm): Most precise rate limiting using theoretical arrival time tracking. Best for strict rate control (see the construction sketch below).
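
For illustration, a construction sketch using the fields documented above (the key and capacity values are arbitrary):

from concurry import RateLimit, RateLimitAlgorithm

# Bursty traffic: TokenBucket allows bursts up to `capacity`
bursty = RateLimit(key="requests", window_seconds=60,
                   algorithm=RateLimitAlgorithm.TokenBucket, capacity=600)

# Strict pacing: GCRA for precise, strict rate control
strict = RateLimit(key="requests", window_seconds=60,
                   algorithm=RateLimitAlgorithm.GCRA, capacity=600)

# Simple fixed-window counting (fastest, may allow 2x burst at window boundaries)
windowed = RateLimit(key="requests", window_seconds=60,
                     algorithm=RateLimitAlgorithm.FixedWindow, capacity=600)
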
Token Refunding

When actual usage is less than requested, unused tokens can be refunded back to the limit (up to capacity). This is algorithm-specific and handled by LimitSet:
  • TokenBucket and GCRA: Support refunding
  • Others: No refunding (reserved tokens count against the limit)

Example

Use within LimitSet::

from concurry import RateLimit, RateLimitAlgorithm, LimitSet

# Define rate limit
limit = RateLimit(
    key="api_tokens",
    window_seconds=60,
    algorithm=RateLimitAlgorithm.TokenBucket,
    capacity=1000
)

# Use within LimitSet (thread-safe)
limits = LimitSet(limits=[limit])
with limits.acquire(requested={"api_tokens": 100}) as acq:
    result = call_api()
    acq.update(usage={"api_tokens": 80})  # Refund 20 tokens
See Also
  • CallLimit: Special case for call counting (usage always 1)
  • ResourceLimit: Non-time-based resource limiting
  • LimitSet: Thread-safe atomic multi-limit acquisition
Source code in src/concurry/core/limit/limit.py
class RateLimit(Limit):
    """Rate-based limit using configurable rate limiting algorithms.

    RateLimits enforce time-based constraints on resource usage, such as API token
    consumption, bandwidth limits, or request rates. They support multiple algorithms
    with different performance and precision characteristics.

    Thread-Safety:
        **RateLimit is NOT thread-safe.** The internal rate limiter implementation
        (_impl) maintains unprotected state. Use within a LimitSet for thread-safe
        acquisition and management. LimitSet handles all token acquisition, refunding,
        and synchronization.

    Attributes:
        key: Unique identifier for this limit (e.g., "input_tokens", "api_calls")
        window_seconds: Time window in seconds over which the limit applies
        algorithm: Rate limiting algorithm (TokenBucket, LeakyBucket, SlidingWindow,
            FixedWindow, or GCRA). If None, uses value from
            global_config.defaults.rate_limit_algorithm
        capacity: Maximum capacity (burst size for bucket algorithms, max count for
            window algorithms)

    Algorithms:
        - **TokenBucket**: Allows bursts up to capacity while maintaining average rate.
          Tokens refill continuously. Best for APIs that allow occasional bursts.
        - **LeakyBucket**: Processes requests at fixed rate, smoothing traffic.
          Best for predictable, steady-state traffic.
        - **SlidingWindow**: Precise rate limiting with rolling time window.
          More accurate than fixed window, higher memory usage.
        - **FixedWindow**: Simple rate limiting with fixed time buckets.
          Fastest but can allow 2x burst at window boundaries.
        - **GCRA** (Generic Cell Rate Algorithm): Most precise rate limiting using
          theoretical arrival time tracking. Best for strict rate control.

    Token Refunding:
        When actual usage is less than requested, unused tokens can be refunded back
        to the limit (up to capacity). This is algorithm-specific and handled by
        LimitSet:
        - TokenBucket and GCRA: Support refunding
        - Others: No refunding (reserved tokens count against limit)

    Example:
        Use within LimitSet::

            from concurry import RateLimit, RateLimitAlgorithm, LimitSet

            # Define rate limit
            limit = RateLimit(
                key="api_tokens",
                window_seconds=60,
                algorithm=RateLimitAlgorithm.TokenBucket,
                capacity=1000
            )

            # Use within LimitSet (thread-safe)
            limits = LimitSet(limits=[limit])
            with limits.acquire(requested={"api_tokens": 100}) as acq:
                result = call_api()
                acq.update(usage={"api_tokens": 80})  # Refund 20 tokens

    See Also:
        - CallLimit: Special case for call counting (usage always 1)
        - ResourceLimit: Non-time-based resource limiting
        - LimitSet: Thread-safe atomic multi-limit acquisition
    """

    window_seconds: confloat(gt=0)
    algorithm: Union[RateLimitAlgorithm, _NO_ARG_TYPE] = _NO_ARG
    capacity: conint(gt=0)

    def post_initialize(self) -> NoReturn:
        """Initialize the rate limiter implementation."""
        # Apply default algorithm from global config if not specified
        if self.algorithm is _NO_ARG:
            from ...config import global_config

            local_config = global_config.clone()
            object.__setattr__(self, "algorithm", local_config.defaults.rate_limit_algorithm)

        # Convert max_rate from capacity per window to per second
        max_rate = self.capacity / self.window_seconds if self.window_seconds > 0 else 0

        # Use factory to create the appropriate limiter
        self._impl = RateLimiter(
            algorithm=self.algorithm,
            max_rate=max_rate,
            capacity=self.capacity,
            window_seconds=self.window_seconds,
        )

    def can_acquire(self, requested: int) -> bool:
        """Check if tokens can be acquired without consuming them.

        Warning:
            This method is NOT thread-safe. Only call from within LimitSet.
        """
        return self._impl.can_acquire(tokens=requested)

    def validate_usage(self, requested: int, used: int) -> None:
        """Validate that usage doesn't exceed requested."""
        if used > requested:
            raise ValueError(
                f"Usage ({used}) cannot exceed requested amount ({requested}) for limit '{self.key}'"
            )

    def get_stats(self) -> dict:
        """Get current rate limit statistics."""
        stats = self._impl.get_stats()
        stats["key"] = self.key
        stats["window_seconds"] = self.window_seconds
        stats["capacity"] = self.capacity
        return stats

post_initialize() -> NoReturn

Initialize the rate limiter implementation.

Source code in src/concurry/core/limit/limit.py
def post_initialize(self) -> NoReturn:
    """Initialize the rate limiter implementation."""
    # Apply default algorithm from global config if not specified
    if self.algorithm is _NO_ARG:
        from ...config import global_config

        local_config = global_config.clone()
        object.__setattr__(self, "algorithm", local_config.defaults.rate_limit_algorithm)

    # Convert max_rate from capacity per window to per second
    max_rate = self.capacity / self.window_seconds if self.window_seconds > 0 else 0

    # Use factory to create the appropriate limiter
    self._impl = RateLimiter(
        algorithm=self.algorithm,
        max_rate=max_rate,
        capacity=self.capacity,
        window_seconds=self.window_seconds,
    )

can_acquire(requested: int) -> bool

Check if tokens can be acquired without consuming them.

Warning

This method is NOT thread-safe. Only call from within LimitSet.

Source code in src/concurry/core/limit/limit.py
def can_acquire(self, requested: int) -> bool:
    """Check if tokens can be acquired without consuming them.

    Warning:
        This method is NOT thread-safe. Only call from within LimitSet.
    """
    return self._impl.can_acquire(tokens=requested)

validate_usage(requested: int, used: int) -> None

Validate that usage doesn't exceed requested.

Source code in src/concurry/core/limit/limit.py
def validate_usage(self, requested: int, used: int) -> None:
    """Validate that usage doesn't exceed requested."""
    if used > requested:
        raise ValueError(
            f"Usage ({used}) cannot exceed requested amount ({requested}) for limit '{self.key}'"
        )

get_stats() -> dict

Get current rate limit statistics.

Source code in src/concurry/core/limit/limit.py
def get_stats(self) -> dict:
    """Get current rate limit statistics."""
    stats = self._impl.get_stats()
    stats["key"] = self.key
    stats["window_seconds"] = self.window_seconds
    stats["capacity"] = self.capacity
    return stats

concurry.core.limit.limit.CallLimit

Bases: RateLimit

Special RateLimit for counting individual calls.

CallLimit is a specialized RateLimit that enforces a simpler semantic: counting the number of calls (acquisitions) rather than arbitrary token amounts. Usage must always be 1 per call, and the key is fixed to "call_count".

Note

CallLimit is not thread-safe and cannot be acquired directly. Use within a LimitSet for thread-safe acquisition and management.

Attributes:

  • key (str): Always "call_count" (fixed, cannot be changed)
  • window_seconds (confloat(gt=0)): Time window for call counting
  • algorithm (Union[RateLimitAlgorithm, _NO_ARG_TYPE]): Rate limiting algorithm to use
  • capacity (conint(gt=0)): Maximum calls allowed per window

Characteristics
  • Usage must always be 1 (validated on update)
  • Key is fixed to "call_count" for consistency
  • Inherits all RateLimit algorithm support
  • No need to call update() in LimitSet (handled automatically)
Example

Use within LimitSet::

from concurry import CallLimit, RateLimit, RateLimitAlgorithm, LimitSet

limits = LimitSet(limits=[
    CallLimit(
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=100
    ),
    RateLimit(
        key="tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=1000
    )
])

# CallLimit doesn't need explicit requested or update
with limits.acquire(requested={"tokens": 100}) as acq:
    result = do_work()
    acq.update(usage={"tokens": result.actual_tokens})
    # No need to update "call_count" - automatic!
Notes
  • Trying to set usage != 1 raises ValueError
  • Perfect for enforcing call rate limits independent of resource usage
  • Use RateLimit directly if you need custom keys or multi-token semantics
See Also
  • RateLimit: General-purpose rate limiting with custom keys
  • LimitSet: Combine CallLimit with other limits
Source code in src/concurry/core/limit/limit.py
class CallLimit(RateLimit):
    """Special RateLimit for counting individual calls.

    CallLimit is a specialized RateLimit that enforces a simpler semantic: counting
    the number of calls (acquisitions) rather than arbitrary token amounts. Usage
    must always be 1 per call, and the key is fixed to "call_count".

    Note:
        CallLimit is not thread-safe and cannot be acquired directly. Use within
        a LimitSet for thread-safe acquisition and management.

    Attributes:
        key: Always "call_count" (fixed, cannot be changed)
        window_seconds: Time window for call counting
        algorithm: Rate limiting algorithm to use
        capacity: Maximum calls allowed per window

    Characteristics:
        - Usage must always be 1 (validated on update)
        - Key is fixed to "call_count" for consistency
        - Inherits all RateLimit algorithm support
        - No need to call update() in LimitSet (handled automatically)

    Example:
        Use within LimitSet::

            from concurry import CallLimit, RateLimit, RateLimitAlgorithm, LimitSet

            limits = LimitSet(limits=[
                CallLimit(
                    window_seconds=60,
                    algorithm=RateLimitAlgorithm.TokenBucket,
                    capacity=100
                ),
                RateLimit(
                    key="tokens",
                    window_seconds=60,
                    algorithm=RateLimitAlgorithm.TokenBucket,
                    capacity=1000
                )
            ])

            # CallLimit doesn't need explicit requested or update
            with limits.acquire(requested={"tokens": 100}) as acq:
                result = do_work()
                acq.update(usage={"tokens": result.actual_tokens})
                # No need to update "call_count" - automatic!

    Notes:
        - Trying to set usage != 1 raises ValueError
        - Perfect for enforcing call rate limits independent of resource usage
        - Use RateLimit directly if you need custom keys or multi-token semantics

    See Also:
        - RateLimit: General-purpose rate limiting with custom keys
        - LimitSet: Combine CallLimit with other limits
    """

    CallLimit_key: ClassVar[str] = "call_count"

    @classmethod
    def pre_initialize(cls, data: Dict) -> NoReturn:
        data["key"] = cls.CallLimit_key  ## Force the key to be "call_count"

    def validate_usage(self, requested: int, used: int) -> None:
        """Validate that usage is always 1 for CallLimit."""
        if used != 1:
            raise ValueError(
                f"CallLimit usage must always be 1, got: {used}. "
                f"CallLimit '{self.key}' is for counting individual calls only."
            )
        super().validate_usage(requested, used)

validate_usage(requested: int, used: int) -> None

Validate that usage is always 1 for CallLimit.

Source code in src/concurry/core/limit/limit.py
def validate_usage(self, requested: int, used: int) -> None:
    """Validate that usage is always 1 for CallLimit."""
    if used != 1:
        raise ValueError(
            f"CallLimit usage must always be 1, got: {used}. "
            f"CallLimit '{self.key}' is for counting individual calls only."
        )
    super().validate_usage(requested, used)

concurry.core.limit.limit.ResourceLimit

Bases: Limit

Semaphore-based resource limiting for countable resources.

ResourceLimits provide simple counting semantics for resources that exist in finite quantities, such as database connections, file handles, thread pool slots, or hardware devices. Unlike RateLimits, they have no time component and are automatically released when the context manager exits.

Thread-Safety

ResourceLimit is NOT thread-safe. The internal _current_usage counter is unprotected. Use within a LimitSet for thread-safe acquisition and management. LimitSet handles all semaphore logic, acquisition tracking, and synchronization.

Attributes:

  • key (str): Unique identifier for this resource (e.g., "db_connections", "file_handles")
  • capacity (int): Maximum number of resources available (must be >= 1)

Characteristics
  • No time component (unlike RateLimit)
  • Semaphore logic handled by LimitSet
  • Automatic release on context exit
  • No update() needed in LimitSet (handled automatically)
Example

Use within LimitSet::

from concurry import LimitSet, ResourceLimit, RateLimit, RateLimitAlgorithm

limits = LimitSet(limits=[
    ResourceLimit(key="db_connections", capacity=5),
    ResourceLimit(key="file_handles", capacity=20),
    RateLimit(
        key="api_tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=1000
    )
])

# Acquire multiple resources atomically
with limits.acquire(requested={
    "db_connections": 2,
    "file_handles": 5,
    "api_tokens": 100
}) as acq:
    # Use resources
    acq.update(usage={"api_tokens": 80})
    # No need to update ResourceLimits - automatic!

With Worker::

from concurry import Worker

class DatabaseWorker(Worker):
    def query(self, sql: str):
        with self.limits.acquire(requested={"db_connections": 1}):
            return execute_query(sql)

worker = DatabaseWorker.options(
    mode="thread",
    limits=limits
).init()
Notes
  • Resources are released automatically on context exit
  • No need to call update() in LimitSet context
  • Capacity must be >= 1 (enforced at initialization)
  • Semaphore logic is managed by LimitSet for thread-safety
  • Perfect for connection pools, file handle limits, etc.
See Also
  • RateLimit: Time-based rate limiting
  • CallLimit: Call counting (time-based)
  • LimitSet: Combine multiple limits atomically with thread-safety
Source code in src/concurry/core/limit/limit.py
class ResourceLimit(Limit):
    """Semaphore-based resource limiting for countable resources.

    ResourceLimits provide simple counting semantics for resources that exist in
    finite quantities, such as database connections, file handles, thread pool slots,
    or hardware devices. Unlike RateLimits, they have no time component and are
    automatically released when the context manager exits.

    Thread-Safety:
        **ResourceLimit is NOT thread-safe.** The internal _current_usage counter
        is unprotected. Use within a LimitSet for thread-safe acquisition and
        management. LimitSet handles all semaphore logic, acquisition tracking,
        and synchronization.

    Attributes:
        key: Unique identifier for this resource (e.g., "db_connections", "file_handles")
        capacity: Maximum number of resources available (must be >= 1)

    Characteristics:
        - No time component (unlike RateLimit)
        - Semaphore logic handled by LimitSet
        - Automatic release on context exit
        - No update() needed in LimitSet (handled automatically)

    Example:
        Use within LimitSet::

            from concurry import LimitSet, ResourceLimit, RateLimit, RateLimitAlgorithm

            limits = LimitSet(limits=[
                ResourceLimit(key="db_connections", capacity=5),
                ResourceLimit(key="file_handles", capacity=20),
                RateLimit(
                    key="api_tokens",
                    window_seconds=60,
                    algorithm=RateLimitAlgorithm.TokenBucket,
                    capacity=1000
                )
            ])

            # Acquire multiple resources atomically
            with limits.acquire(requested={
                "db_connections": 2,
                "file_handles": 5,
                "api_tokens": 100
            }) as acq:
                # Use resources
                acq.update(usage={"api_tokens": 80})
                # No need to update ResourceLimits - automatic!

        With Worker::

            from concurry import Worker

            class DatabaseWorker(Worker):
                def query(self, sql: str):
                    with self.limits.acquire(requested={"db_connections": 1}):
                        return execute_query(sql)

            worker = DatabaseWorker.options(
                mode="thread",
                limits=limits
            ).init()

    Notes:
        - Resources are released automatically on context exit
        - No need to call update() in LimitSet context
        - Capacity must be >= 1 (enforced at initialization)
        - Semaphore logic is managed by LimitSet for thread-safety
        - Perfect for connection pools, file handle limits, etc.

    See Also:
        - RateLimit: Time-based rate limiting
        - CallLimit: Call counting (time-based)
        - LimitSet: Combine multiple limits atomically with thread-safety
    """

    capacity: int  # Must be >= 1
    _current_usage: int = 0  # Track current usage (not thread-safe on its own)

    def post_initialize(self) -> NoReturn:
        """Validate capacity."""
        if self.capacity < 1:
            raise ValueError(f"ResourceLimit capacity must be >= 1, got: {self.capacity}")

    def can_acquire(self, requested: int) -> bool:
        """Check if resources can be acquired.

        Warning:
            This method is NOT thread-safe. Only call from within LimitSet.
        """
        return (self._current_usage + requested) <= self.capacity

    def validate_usage(self, requested: int, used: int) -> None:
        """Validate usage (not applicable for ResourceLimit).

        ResourceLimits don't have variable usage - they're automatically released.
        """
        pass

    def get_stats(self) -> dict:
        """Get current resource limit statistics.

        Note: This is not thread-safe. For thread-safe stats, call via LimitSet.
        """
        return {
            "key": self.key,
            "capacity": self.capacity,
            "current_usage": self._current_usage,
            "available": self.capacity - self._current_usage,
            "utilization": self._current_usage / self.capacity if self.capacity > 0 else 0,
        }

post_initialize() -> NoReturn

Validate capacity.

Source code in src/concurry/core/limit/limit.py
def post_initialize(self) -> NoReturn:
    """Validate capacity."""
    if self.capacity < 1:
        raise ValueError(f"ResourceLimit capacity must be >= 1, got: {self.capacity}")

can_acquire(requested: int) -> bool

Check if resources can be acquired.

Warning

This method is NOT thread-safe. Only call from within LimitSet.

Source code in src/concurry/core/limit/limit.py
def can_acquire(self, requested: int) -> bool:
    """Check if resources can be acquired.

    Warning:
        This method is NOT thread-safe. Only call from within LimitSet.
    """
    return (self._current_usage + requested) <= self.capacity

validate_usage(requested: int, used: int) -> None

Validate usage (not applicable for ResourceLimit).

ResourceLimits don't have variable usage - they're automatically released.

Source code in src/concurry/core/limit/limit.py
def validate_usage(self, requested: int, used: int) -> None:
    """Validate usage (not applicable for ResourceLimit).

    ResourceLimits don't have variable usage - they're automatically released.
    """
    pass

get_stats() -> dict

Get current resource limit statistics.

Note: This is not thread-safe. For thread-safe stats, call via LimitSet.

Source code in src/concurry/core/limit/limit.py
def get_stats(self) -> dict:
    """Get current resource limit statistics.

    Note: This is not thread-safe. For thread-safe stats, call via LimitSet.
    """
    return {
        "key": self.key,
        "capacity": self.capacity,
        "current_usage": self._current_usage,
        "available": self.capacity - self._current_usage,
        "utilization": self._current_usage / self.capacity if self.capacity > 0 else 0,
    }

concurry.core.limit.limit_set.LimitSet(limits: List[Limit], shared: bool = False, mode: ExecutionMode = ExecutionMode.Sync, config: Optional[dict] = None) -> Union[InMemorySharedLimitSet, MultiprocessSharedLimitSet, RaySharedLimitSet]

Factory function to create appropriate LimitSet implementation.

Parameters:

  • limits (List[Limit], required): List of Limit instances. Can be an empty list to create a no-op LimitSet that always allows acquisition without blocking.
  • shared (bool, default False): If True, create a shared LimitSet for cross-worker use. If False, create a private LimitSet (with a warning).
  • mode (ExecutionMode, default Sync): Execution mode (ExecutionMode enum or string like "sync", "thread", "asyncio", "process", "ray")
  • config (Optional[dict], default None): Static configuration dict (metadata) accessible via acquisition.config. Empty dict by default. Useful for multi-account/multi-region scenarios (see the config example below).

Returns:

  • Union[InMemorySharedLimitSet, MultiprocessSharedLimitSet, RaySharedLimitSet]: Appropriate LimitSet implementation based on shared and mode

Raises:

  • ValueError: If shared=False with mode "process" or "ray", or if mode is not a recognized execution mode

Examples:

Private LimitSet (non-shared):

limits = LimitSet(
    limits=[RateLimit(...), ResourceLimit(...)],
    shared=False,
    mode="sync"
)

Empty LimitSet (always allows acquisition):

# Create empty LimitSet - never blocks, always succeeds
limits = LimitSet(limits=[], shared=False, mode="sync")

with limits.acquire():
    # Always succeeds immediately, no limits enforced
    do_work()

# Workers automatically get empty LimitSet when no limits provided
worker = MyWorker.options(mode="thread").init()
# worker.limits is available and always allows acquisition

Shared LimitSet for thread workers:

limits = LimitSet(
    limits=[RateLimit(...), ResourceLimit(...)],
    shared=True,
    mode="thread"
)
worker1 = MyWorker.options(mode="thread", limits=limits).init()
worker2 = MyWorker.options(mode="thread", limits=limits).init()
# worker1 and worker2 share the same limits

Shared LimitSet for process workers:

limits = LimitSet(
    limits=[RateLimit(...), ResourceLimit(...)],
    shared=True,
    mode="process"
)
worker1 = MyWorker.options(mode="process", limits=limits).init()
worker2 = MyWorker.options(mode="process", limits=limits).init()
# worker1 and worker2 share the same limits across processes
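
Config metadata for multi-account/multi-region setups (the region value and call_api are illustrative; config is exposed on each acquisition as acq.config):

from concurry import LimitSet, RateLimit, RateLimitAlgorithm

limits_us = LimitSet(
    limits=[RateLimit(
        key="tokens",
        window_seconds=60,
        algorithm=RateLimitAlgorithm.TokenBucket,
        capacity=1000
    )],
    shared=True,
    mode="thread",
    config={"region": "us-east-1"}
)

with limits_us.acquire(requested={"tokens": 50}) as acq:
    region = acq.config["region"]  # "us-east-1"
    result = call_api(region)
    acq.update(usage={"tokens": result.tokens})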

Notes
  • Empty LimitSet (limits=[]) is useful for conditional limit enforcement
  • Workers automatically get empty LimitSet when no limits parameter provided
  • Empty LimitSet has zero overhead - acquire() returns immediately
  • Code can safely call self.limits.acquire() without checking if limits exist
Source code in src/concurry/core/limit/limit_set.py
def LimitSet(
    limits: List[Limit],
    shared: bool = False,
    mode: ExecutionMode = ExecutionMode.Sync,
    config: Optional[dict] = None,
) -> Union[InMemorySharedLimitSet, MultiprocessSharedLimitSet, RaySharedLimitSet]:
    """Factory function to create appropriate LimitSet implementation.

    Args:
        limits: List of Limit instances. Can be empty list to create a no-op LimitSet
                that always allows acquisition without blocking.
        shared: If True, create a shared LimitSet for cross-worker use.
                If False, create a private LimitSet with warning.
        mode: Execution mode (ExecutionMode enum or string like "sync", "thread", "asyncio", "process", "ray")
        config: Static configuration dict (metadata) accessible via acquisition.config.
                Empty dict by default. Useful for multi-account/multi-region scenarios.

    Returns:
        Appropriate LimitSet implementation based on shared and mode

    Raises:
        ValueError: If shared=False and mode != "sync"

    Examples:
        Private LimitSet (non-shared):
            ```python
            limits = LimitSet(
                limits=[RateLimit(...), ResourceLimit(...)],
                shared=False,
                mode="sync"
            )
            ```

        Empty LimitSet (always allows acquisition):
            ```python
            # Create empty LimitSet - never blocks, always succeeds
            limits = LimitSet(limits=[], shared=False, mode="sync")

            with limits.acquire():
                # Always succeeds immediately, no limits enforced
                do_work()

            # Workers automatically get empty LimitSet when no limits provided
            worker = MyWorker.options(mode="thread").init()
            # worker.limits is available and always allows acquisition
            ```

        Shared LimitSet for thread workers:
            ```python
            limits = LimitSet(
                limits=[RateLimit(...), ResourceLimit(...)],
                shared=True,
                mode="thread"
            )
            worker1 = MyWorker.options(mode="thread", limits=limits).init()
            worker2 = MyWorker.options(mode="thread", limits=limits).init()
            # worker1 and worker2 share the same limits
            ```

        Shared LimitSet for process workers:
            ```python
            limits = LimitSet(
                limits=[RateLimit(...), ResourceLimit(...)],
                shared=True,
                mode="process"
            )
            worker1 = MyWorker.options(mode="process", limits=limits).init()
            worker2 = MyWorker.options(mode="process", limits=limits).init()
            # worker1 and worker2 share the same limits across processes
            ```

    Notes:
        - Empty LimitSet (limits=[]) is useful for conditional limit enforcement
        - Workers automatically get empty LimitSet when no limits parameter provided
        - Empty LimitSet has zero overhead - acquire() returns immediately
        - Code can safely call self.limits.acquire() without checking if limits exist
    """
    # Convert string to ExecutionMode if needed
    mode: ExecutionMode = ExecutionMode(mode)

    # Select appropriate implementation
    if mode in (ExecutionMode.Sync, ExecutionMode.Asyncio, ExecutionMode.Threads):
        return InMemorySharedLimitSet(limits=limits, shared=shared, config=config)
    elif mode == ExecutionMode.Processes:
        if shared is False:
            raise ValueError("Non-shared LimitSets cannot use mode='process'")
        return MultiprocessSharedLimitSet(limits=limits, shared=True, config=config)
    elif mode == ExecutionMode.Ray:
        if shared is False:
            raise ValueError("Non-shared LimitSets cannot use mode='ray'")
        return RaySharedLimitSet(limits=limits, shared=True, config=config)
    else:
        raise ValueError(
            f"Unknown execution mode: '{mode}'. Valid modes: sync, asyncio, thread, process, ray"
        )

concurry.core.limit.limit_pool.LimitPool

Bases: Typed

Private wrapper for load-balanced selection across multiple LimitSets.

LimitPool is designed for scenarios with multiple independent resource pools, such as multi-account/multi-region API access. Each LimitSet can have different configs (e.g., account ID, region) that are exposed via acquisition.config.

Architecture
  • Lives privately in each worker (NOT shared)
  • Wraps multiple shared LimitSets
  • Selects LimitSet using load balancing (no synchronization)
  • Delegates acquire() to selected LimitSet
Load Balancing
  • Random: Random selection (stateless, zero overhead)
  • RoundRobin: Circular selection with a per-worker starting offset (worker_index based); see the sketch below
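
A minimal sketch of staggered round-robin starts, assuming two shared LimitSets named limitset_us and limitset_eu (as in the example below) and one private pool per worker:

from concurry import LimitPool, LoadBalancingAlgorithm

# Distinct worker_index offsets spread each worker's first selection across the pool
pools = [
    LimitPool(
        limit_sets=[limitset_us, limitset_eu],
        load_balancing=LoadBalancingAlgorithm.RoundRobin,
        worker_index=i,
    )
    for i in range(2)
]
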
Thread-Safety

LimitPool itself does NOT need thread-safety since each worker has its own private instance. The LimitSets within the pool are shared and provide their own thread-safety.

Attributes:

  • limit_sets (List[BaseLimitSet]): List of LimitSet instances to select from
  • load_balancing (Union[LoadBalancingAlgorithm, _NO_ARG_TYPE]): Load balancing algorithm (LoadBalancingAlgorithm enum). If not specified, uses the value from global_config.defaults.limit_pool_load_balancing
  • worker_index (Union[int, _NO_ARG_TYPE]): Starting offset for round-robin. If not specified, uses the value from global_config.defaults.limit_pool_worker_index

Example

Basic usage with two regions::

from concurry import LimitSet, LimitPool, LoadBalancingAlgorithm

pool = LimitPool(
    limit_sets=[limitset_us, limitset_eu],
    load_balancing=LoadBalancingAlgorithm.RoundRobin,
    worker_index=0
)

# Acquire from selected LimitSet
with pool.acquire(requested={"tokens": 100}) as acq:
    region = acq.config["region"]
    result = call_api(region)
    acq.update(usage={"tokens": result.tokens})

Worker integration::

class APIWorker(Worker):
    def call_api(self, prompt: str):
        # self.limits is a LimitPool
        with self.limits.acquire(requested={"tokens": 100}) as acq:
            endpoint = acq.config["endpoint"]
            result = make_request(endpoint, prompt)
            acq.update(usage={"tokens": result.tokens})
            return result

# Create workers with LimitPool
pool = APIWorker.options(
    mode="thread",
    max_workers=10,
    limits=[limitset1, limitset2, limitset3]  # Creates LimitPool
).init()
See Also
  • LimitSet: Thread-safe limit set for atomic multi-limit acquisition
  • LoadBalancingAlgorithm: Enum of supported algorithms
  • User Guide: docs/user-guide/limits.md#limitpool
Source code in src/concurry/core/limit/limit_pool.py
class LimitPool(Typed):
    """Private wrapper for load-balanced selection across multiple LimitSets.

    LimitPool is designed for scenarios with multiple independent resource pools,
    such as multi-account/multi-region API access. Each LimitSet can have different
    configs (e.g., account ID, region) that are exposed via acquisition.config.

    Architecture:
        - Lives privately in each worker (NOT shared)
        - Wraps multiple shared LimitSets
        - Selects LimitSet using load balancing (no synchronization)
        - Delegates acquire() to selected LimitSet

    Load Balancing:
        - **Random**: Random selection (stateless, zero overhead)
        - **RoundRobin**: Circular with offset (worker_index based)

    Thread-Safety:
        LimitPool itself does NOT need thread-safety since each worker has its
        own private instance. The LimitSets within the pool are shared and
        provide their own thread-safety.

    Attributes:
        limit_sets: List of LimitSet instances to select from
        load_balancing: Algorithm (LoadBalancingAlgorithm enum). If None, uses value from
            global_config.defaults.limit_pool_load_balancing
        worker_index: Starting offset for round-robin. If None, uses value from
            global_config.defaults.limit_pool_worker_index

    Example:
        Basic usage with two regions::

            from concurry import LimitSet, LimitPool, LoadBalancingAlgorithm

            pool = LimitPool(
                limit_sets=[limitset_us, limitset_eu],
                load_balancing=LoadBalancingAlgorithm.RoundRobin,
                worker_index=0
            )

            # Acquire from selected LimitSet
            with pool.acquire(requested={"tokens": 100}) as acq:
                region = acq.config["region"]
                result = call_api(region)
                acq.update(usage={"tokens": result.tokens})

        Worker integration::

            class APIWorker(Worker):
                def call_api(self, prompt: str):
                    # self.limits is a LimitPool
                    with self.limits.acquire(requested={"tokens": 100}) as acq:
                        endpoint = acq.config["endpoint"]
                        result = make_request(endpoint, prompt)
                        acq.update(usage={"tokens": result.tokens})
                        return result

            # Create workers with LimitPool
            pool = APIWorker.options(
                mode="thread",
                max_workers=10,
                limits=[limitset1, limitset2, limitset3]  # Creates LimitPool
            ).init()

    See Also:
        - LimitSet: Thread-safe limit set for atomic multi-limit acquisition
        - LoadBalancingAlgorithm: Enum of supported algorithms
        - User Guide: docs/user-guide/limits.md#limitpool
    """

    # Public immutable attributes
    limit_sets: List[BaseLimitSet]
    load_balancing: Union[LoadBalancingAlgorithm, _NO_ARG_TYPE] = _NO_ARG
    worker_index: Union[int, _NO_ARG_TYPE] = _NO_ARG

    # Private mutable attributes
    _balancer: Any = PrivateAttr()

    def post_initialize(self) -> NoReturn:
        """Initialize private attributes after Typed validation.

        Creates the appropriate load balancer based on the load_balancing algorithm
        and worker_index offset.

        Raises:
            ValueError: If limit_sets is empty
        """
        from ...config import global_config

        local_config = global_config.clone()

        if len(self.limit_sets) == 0:
            raise ValueError("LimitPool requires at least one LimitSet")

        # Apply defaults from global config if not specified
        if self.load_balancing is _NO_ARG or self.worker_index is _NO_ARG:
            if self.load_balancing is _NO_ARG:
                object.__setattr__(self, "load_balancing", local_config.defaults.limit_pool_load_balancing)
            if self.worker_index is _NO_ARG:
                object.__setattr__(self, "worker_index", local_config.defaults.limit_pool_worker_index)

        # Create appropriate load balancer using factory
        if self.load_balancing == LoadBalancingAlgorithm.Random:
            balancer = LoadBalancer(LoadBalancingAlgorithm.Random)
        elif self.load_balancing == LoadBalancingAlgorithm.RoundRobin:
            # Use RoundRobin with offset support for distributed starting points
            balancer = LoadBalancer(LoadBalancingAlgorithm.RoundRobin, offset=self.worker_index)
        else:
            raise ValueError(
                f"Unsupported load balancing algorithm for LimitPool: {self.load_balancing}. "
                f"Supported: Random, RoundRobin"
            )

        # Store balancer in private attribute
        object.__setattr__(self, "_balancer", balancer)

    def acquire(
        self, requested: Optional[Dict[str, int]] = None, timeout: Optional[float] = None
    ) -> LimitSetAcquisition:
        """Acquire from a selected LimitSet.

        Selects a LimitSet using load balancing, then delegates acquisition.
        The returned acquisition will have .config from the selected LimitSet.

        Args:
            requested: Dict mapping limit keys to requested amounts.
                If None or empty, acquires all limits with defaults.
            timeout: Maximum time to wait for acquisition in seconds.
                If None, blocks indefinitely.

        Returns:
            LimitSetAcquisition with config from selected LimitSet

        Raises:
            TimeoutError: If acquisition times out
            ValueError: If requested amounts are invalid

        Example:
            Acquire with specific amounts::

                with pool.acquire(requested={"tokens": 100, "connections": 2}) as acq:
                    # acq.config contains selected LimitSet's config
                    region = acq.config.get("region", "unknown")
                    result = call_api(region)
                    acq.update(usage={"tokens": result.tokens})

            Acquire with defaults::

                with pool.acquire() as acq:
                    # CallLimit and ResourceLimit use defaults
                    result = operation()
        """
        # Select a LimitSet using load balancing
        selected_limitset = self._select_limit_set()
        # Delegate to selected LimitSet
        return selected_limitset.acquire(requested=requested, timeout=timeout)

    def try_acquire(self, requested: Optional[Dict[str, int]] = None) -> LimitSetAcquisition:
        """Try to acquire from a selected LimitSet without blocking.

        Selects a LimitSet using load balancing, then attempts non-blocking acquisition.
        Returns immediately with successful=False if limits cannot be acquired.

        Args:
            requested: Dict mapping limit keys to requested amounts.
                If None or empty, acquires all limits with defaults.

        Returns:
            LimitSetAcquisition with successful attribute indicating success

        Example:
            Try acquire with fallback::

                acq = pool.try_acquire(requested={"tokens": 100})
                if acq.successful:
                    with acq:
                        result = expensive_operation()
                        acq.update(usage={"tokens": result.tokens})
                else:
                    result = use_cached_result()
        """
        # Select a LimitSet using load balancing
        selected_limitset = self._select_limit_set()
        # Delegate to selected LimitSet
        return selected_limitset.try_acquire(requested=requested)

    def _select_limit_set(self) -> BaseLimitSet:
        """Select a LimitSet using configured load balancing algorithm.

        Returns:
            Selected BaseLimitSet instance
        """
        # Use balancer to select index
        index = self._balancer.select_worker(num_workers=len(self.limit_sets))
        return self.limit_sets[index]

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics for the LimitPool and its constituent LimitSets.

        Returns:
            Dictionary containing:
                - num_limit_sets: Number of LimitSets in the pool
                - load_balancing: Load balancing algorithm
                - worker_index: Worker's starting offset
                - balancer_stats: Statistics from the load balancer
                - limit_sets: List of stats dicts, one per LimitSet

        Example:
            Get and display stats::

                stats = pool.get_stats()
                print(f"LimitSets: {stats['num_limit_sets']}")
                print(f"Algorithm: {stats['load_balancing']}")
                print(f"Balancer: {stats['balancer_stats']}")

                for i, ls_stats in enumerate(stats['limit_sets']):
                    print(f"LimitSet {i}: {ls_stats}")
        """
        return {
            "num_limit_sets": len(self.limit_sets),
            "load_balancing": self.load_balancing.value,
            "worker_index": self.worker_index,
            "balancer_stats": self._balancer.get_stats(),
            "limit_sets": [ls.get_stats() for ls in self.limit_sets],
        }

    def __getitem__(self, index: int) -> BaseLimitSet:
        """Get LimitSet by integer index.

        Note: String key access is NOT supported because different LimitSets
        in the pool may have different limit keys. To access a Limit by key,
        first get the LimitSet by index, then access the Limit:

            limit = pool[0]["tokens"]  # Get "tokens" limit from first LimitSet
            limit = pool.limit_sets[0]["tokens"]  # Equivalent, more explicit

        Args:
            index: Integer index of the LimitSet (0-based)

        Returns:
            BaseLimitSet at the specified index

        Raises:
            IndexError: If index is out of range
            TypeError: If index is not an integer

        Example:
            Access LimitSet by index::

                pool = LimitPool(limit_sets=[ls1, ls2, ls3])

                # Access first LimitSet
                first_limitset = pool[0]
                stats = first_limitset.get_stats()

                # Access Limit within first LimitSet
                token_limit = pool[0]["tokens"]
                capacity = token_limit.capacity
        """
        if not isinstance(index, int):
            raise TypeError(
                f"LimitPool indices must be integers, not {type(index).__name__}. "
                f"String key access is not supported because LimitSets may have different keys. "
                f"Use pool[i]['key'] to access a Limit from a specific LimitSet."
            )

        return self.limit_sets[index]

    def __getstate__(self) -> Dict[str, Any]:
        """Custom pickle support - exclude non-serializable balancer.

        The balancer contains threading.Lock which cannot be pickled. We store
        only the configuration needed to recreate it on unpickling.

        Returns:
            Dict of serializable state
        """
        state = self.__dict__.copy()
        # Remove the _balancer which contains locks
        state.pop("_balancer", None)
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Custom unpickle support - recreate balancer from configuration.

        Recreates the balancer based on load_balancing algorithm and worker_index.

        Args:
            state: Pickled state dict
        """
        self.__dict__.update(state)

        # Recreate balancer using factory
        if self.load_balancing == LoadBalancingAlgorithm.Random:
            balancer = LoadBalancer(LoadBalancingAlgorithm.Random)
        elif self.load_balancing == LoadBalancingAlgorithm.RoundRobin:
            balancer = LoadBalancer(LoadBalancingAlgorithm.RoundRobin, offset=self.worker_index)
        else:
            raise ValueError(f"Unknown load balancing algorithm: {self.load_balancing}")

        object.__setattr__(self, "_balancer", balancer)

post_initialize() -> NoReturn

Initialize private attributes after Typed validation.

Creates the appropriate load balancer based on the load_balancing algorithm and worker_index offset.

Raises:

  • ValueError: If limit_sets is empty

Source code in src/concurry/core/limit/limit_pool.py
def post_initialize(self) -> NoReturn:
    """Initialize private attributes after Typed validation.

    Creates the appropriate load balancer based on the load_balancing algorithm
    and worker_index offset.

    Raises:
        ValueError: If limit_sets is empty
    """
    from ...config import global_config

    local_config = global_config.clone()

    if len(self.limit_sets) == 0:
        raise ValueError("LimitPool requires at least one LimitSet")

    # Apply defaults from global config if not specified
    if self.load_balancing is _NO_ARG or self.worker_index is _NO_ARG:
        if self.load_balancing is _NO_ARG:
            object.__setattr__(self, "load_balancing", local_config.defaults.limit_pool_load_balancing)
        if self.worker_index is _NO_ARG:
            object.__setattr__(self, "worker_index", local_config.defaults.limit_pool_worker_index)

    # Create appropriate load balancer using factory
    if self.load_balancing == LoadBalancingAlgorithm.Random:
        balancer = LoadBalancer(LoadBalancingAlgorithm.Random)
    elif self.load_balancing == LoadBalancingAlgorithm.RoundRobin:
        # Use RoundRobin with offset support for distributed starting points
        balancer = LoadBalancer(LoadBalancingAlgorithm.RoundRobin, offset=self.worker_index)
    else:
        raise ValueError(
            f"Unsupported load balancing algorithm for LimitPool: {self.load_balancing}. "
            f"Supported: Random, RoundRobin"
        )

    # Store balancer in private attribute
    object.__setattr__(self, "_balancer", balancer)

acquire(requested: Optional[Dict[str, int]] = None, timeout: Optional[float] = None) -> LimitSetAcquisition

Acquire from a selected LimitSet.

Selects a LimitSet using load balancing, then delegates acquisition. The returned acquisition will have .config from the selected LimitSet.

Parameters:

  • requested (Optional[Dict[str, int]], default None): Dict mapping limit keys to requested amounts. If None or empty, acquires all limits with defaults.
  • timeout (Optional[float], default None): Maximum time to wait for acquisition in seconds. If None, blocks indefinitely.

Returns:

  • LimitSetAcquisition: Acquisition with config from the selected LimitSet

Raises:

  • TimeoutError: If acquisition times out
  • ValueError: If requested amounts are invalid

Example

Acquire with specific amounts::

with pool.acquire(requested={"tokens": 100, "connections": 2}) as acq:
    # acq.config contains selected LimitSet's config
    region = acq.config.get("region", "unknown")
    result = call_api(region)
    acq.update(usage={"tokens": result.tokens})

Acquire with defaults::

with pool.acquire() as acq:
    # CallLimit and ResourceLimit use defaults
    result = operation()
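
Acquire with a timeout (a sketch; use_cached_result is an illustrative fallback)::

try:
    with pool.acquire(requested={"tokens": 100}, timeout=5.0) as acq:
        result = call_api(acq.config.get("region", "unknown"))
        acq.update(usage={"tokens": result.tokens})
except TimeoutError:
    result = use_cached_result()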
Source code in src/concurry/core/limit/limit_pool.py
def acquire(
    self, requested: Optional[Dict[str, int]] = None, timeout: Optional[float] = None
) -> LimitSetAcquisition:
    """Acquire from a selected LimitSet.

    Selects a LimitSet using load balancing, then delegates acquisition.
    The returned acquisition will have .config from the selected LimitSet.

    Args:
        requested: Dict mapping limit keys to requested amounts.
            If None or empty, acquires all limits with defaults.
        timeout: Maximum time to wait for acquisition in seconds.
            If None, blocks indefinitely.

    Returns:
        LimitSetAcquisition with config from selected LimitSet

    Raises:
        TimeoutError: If acquisition times out
        ValueError: If requested amounts are invalid

    Example:
        Acquire with specific amounts::

            with pool.acquire(requested={"tokens": 100, "connections": 2}) as acq:
                # acq.config contains selected LimitSet's config
                region = acq.config.get("region", "unknown")
                result = call_api(region)
                acq.update(usage={"tokens": result.tokens})

        Acquire with defaults::

            with pool.acquire() as acq:
                # CallLimit and ResourceLimit use defaults
                result = operation()
    """
    # Select a LimitSet using load balancing
    selected_limitset = self._select_limit_set()
    # Delegate to selected LimitSet
    return selected_limitset.acquire(requested=requested, timeout=timeout)

try_acquire(requested: Optional[Dict[str, int]] = None) -> LimitSetAcquisition

Try to acquire from a selected LimitSet without blocking.

Selects a LimitSet using load balancing, then attempts non-blocking acquisition. Returns immediately with successful=False if limits cannot be acquired.

Parameters:

  • requested (Optional[Dict[str, int]], default None): Dict mapping limit keys to requested amounts. If None or empty, acquires all limits with defaults.

Returns:

  • LimitSetAcquisition: Acquisition whose successful attribute indicates whether the limits were acquired

Example

Try acquire with fallback::

acq = pool.try_acquire(requested={"tokens": 100})
if acq.successful:
    with acq:
        result = expensive_operation()
        acq.update(usage={"tokens": result.tokens})
else:
    result = use_cached_result()
Source code in src/concurry/core/limit/limit_pool.py
def try_acquire(self, requested: Optional[Dict[str, int]] = None) -> LimitSetAcquisition:
    """Try to acquire from a selected LimitSet without blocking.

    Selects a LimitSet using load balancing, then attempts non-blocking acquisition.
    Returns immediately with successful=False if limits cannot be acquired.

    Args:
        requested: Dict mapping limit keys to requested amounts.
            If None or empty, acquires all limits with defaults.

    Returns:
        LimitSetAcquisition with successful attribute indicating success

    Example:
        Try acquire with fallback::

            acq = pool.try_acquire(requested={"tokens": 100})
            if acq.successful:
                with acq:
                    result = expensive_operation()
                    acq.update(usage={"tokens": result.tokens})
            else:
                result = use_cached_result()
    """
    # Select a LimitSet using load balancing
    selected_limitset = self._select_limit_set()
    # Delegate to selected LimitSet
    return selected_limitset.try_acquire(requested=requested)

get_stats() -> Dict[str, Any]

Get statistics for the LimitPool and its constituent LimitSets.

Returns:

  • Dict[str, Any]: Dictionary containing:
      • num_limit_sets: Number of LimitSets in the pool
      • load_balancing: Load balancing algorithm
      • worker_index: Worker's starting offset
      • balancer_stats: Statistics from the load balancer
      • limit_sets: List of stats dicts, one per LimitSet

Example

Get and display stats::

stats = pool.get_stats()
print(f"LimitSets: {stats['num_limit_sets']}")
print(f"Algorithm: {stats['load_balancing']}")
print(f"Balancer: {stats['balancer_stats']}")

for i, ls_stats in enumerate(stats['limit_sets']):
    print(f"LimitSet {i}: {ls_stats}")
Source code in src/concurry/core/limit/limit_pool.py
def get_stats(self) -> Dict[str, Any]:
    """Get statistics for the LimitPool and its constituent LimitSets.

    Returns:
        Dictionary containing:
            - num_limit_sets: Number of LimitSets in the pool
            - load_balancing: Load balancing algorithm
            - worker_index: Worker's starting offset
            - balancer_stats: Statistics from the load balancer
            - limit_sets: List of stats dicts, one per LimitSet

    Example:
        Get and display stats::

            stats = pool.get_stats()
            print(f"LimitSets: {stats['num_limit_sets']}")
            print(f"Algorithm: {stats['load_balancing']}")
            print(f"Balancer: {stats['balancer_stats']}")

            for i, ls_stats in enumerate(stats['limit_sets']):
                print(f"LimitSet {i}: {ls_stats}")
    """
    return {
        "num_limit_sets": len(self.limit_sets),
        "load_balancing": self.load_balancing.value,
        "worker_index": self.worker_index,
        "balancer_stats": self._balancer.get_stats(),
        "limit_sets": [ls.get_stats() for ls in self.limit_sets],
    }