
Synchronization Architecture

This document describes the design, implementation, and maintenance guidelines for Concurry's synchronization primitives (wait() and gather()).

Table of Contents

  1. Overview
  2. Architecture Layers
  3. BaseFuture Hierarchy
  4. Polling Strategies
  5. Wait Function
  6. Gather Function
  7. Key Design Decisions
  8. Performance Considerations
  9. Extension Points
  10. Common Pitfalls
  11. Testing Requirements

Overview

The synchronization system provides two main primitives that work across all execution modes (sync, asyncio, thread, process, ray):

  • wait(): Block until specified completion conditions are met
  • gather(): Collect results from multiple futures in order or as they complete

Design Goals

  1. Cross-Framework Compatibility: Single API works with all future types
  2. Performance: Efficient polling, minimal overhead, scalable to thousands of futures
  3. Flexibility: Support various input patterns (list, dict, individual futures)
  4. Observability: Progress tracking via progress bars or callbacks
  5. Configurability: Adaptive polling strategies for different workload patterns

Architecture Layers

┌─────────────────────────────────────────────────────────────────┐
│                      User-Facing API                             │
│                   wait() and gather()                            │
│    - Parameter validation                                        │
│    - Input structure handling (list/dict/tuple/set/single)      │
│    - Progress tracking setup                                     │
└──────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                    Backend Functions                             │
│   _gather_blocking_backend() / _gather_iter_backend()           │
│    - Wraps all futures with wrap_future()                       │
│    - Delegates to wait() for completion checking                │
│    - Collects results in order or as they complete              │
└──────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                    Core Wait Logic                               │
│                    wait() function                               │
│    - Batch completion checking via _check_futures_batch()       │
│    - Polling strategy integration                               │
│    - Return condition evaluation                                │
│    - Timeout management                                         │
└──────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                  Efficiency Layer                                │
│              _check_futures_batch()                              │
│    - Ray-specific: Single ray.wait() for batch checking         │
│    - Non-Ray: Individual .done() checks                         │
└──────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                  Unified Future Interface                        │
│                    BaseFuture + wrap_future()                    │
│    - SyncFuture (immediate results)                             │
│    - ConcurrentFuture (thread/process futures)                  │
│    - AsyncioFuture (asyncio futures)                            │
│    - RayFuture (Ray ObjectRefs)                                 │
└──────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                Underlying Framework Futures                      │
│  concurrent.futures.Future │ asyncio.Future │ ray.ObjectRef     │
└─────────────────────────────────────────────────────────────────┘

BaseFuture Hierarchy

Design Philosophy

BaseFuture provides a unified abstraction over different future implementations using the Adapter Pattern. All futures expose the same API regardless of the underlying framework.
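
For reference, a minimal sketch of the adapter interface is shown below. The exact abstract method set is an assumption, inferred from how futures are used later in this document (done(), result(timeout), exception(timeout), cancel(), add_done_callback()); it is not the library's authoritative definition.

# Sketch of the unified adapter interface (illustrative; method set inferred from usage in this document)
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional


class BaseFuture(ABC):
    @abstractmethod
    def done(self) -> bool: ...

    @abstractmethod
    def cancelled(self) -> bool: ...

    @abstractmethod
    def cancel(self) -> bool: ...

    @abstractmethod
    def result(self, timeout: Optional[float] = None) -> Any: ...

    @abstractmethod
    def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]: ...

    @abstractmethod
    def add_done_callback(self, fn: Callable) -> None: ...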

Class Structure

BaseFuture (ABC)
├── SyncFuture          # Immediate results (already computed)
├── ConcurrentFuture    # Thread/Process futures (delegates to concurrent.futures.Future)
├── AsyncioFuture       # Asyncio futures (adds timeout support)
└── RayFuture           # Ray ObjectRefs (adds state tracking + callbacks)

Critical Design Rules

Rule 1: State Management Strategy

Each future type has a different state management strategy based on its characteristics:

SyncFuture:
  • Always done at creation (_done=True)
  • Result/exception immutable after construction
  • No lock needed (single-threaded usage)
  • Caches result/exception in the _result and _exception slots

ConcurrentFuture:
  • Pure delegation wrapper (no state caching)
  • Does NOT cache _result, _exception, _done, or _cancelled
  • All state queries delegate directly to the underlying concurrent.futures.Future
  • Underlying future is inherently thread-safe
  • Only stores: uuid, _future, _callbacks, _lock

AsyncioFuture:
  • Pure delegation wrapper (no state caching)
  • Does NOT cache _result, _exception, or _done
  • All state queries delegate directly to the underlying asyncio.Future
  • Uses _lock for thread-safe access to the asyncio future
  • Implements timeout support via polling (asyncio futures don't natively support timeouts)
  • Only stores: uuid, _future, _callbacks, _lock, _poll_interval

RayFuture:
  • Caches state after fetching from Ray
  • Stores: uuid, _object_ref, _result, _exception, _done, _cancelled, _callbacks, _lock
  • Uses a global monitor thread for automatic callback invocation
  • Critical bug pattern: done() must NOT set _done=True without fetching the result
    ❌ WRONG: done() sets _done=True without calling ray.get()
    ✅ CORRECT: done() only calls ray.wait() to check; it does not set _done
    Reason: if _done=True but _result is still None, result(timeout=0) returns None instead of fetching

Rule 2: The Delegation vs. Caching Decision

When to cache state:
  • The underlying framework doesn't provide efficient state querying (Ray)
  • State queries are expensive (network calls)
  • Results need to be accessed multiple times

When to delegate:
  • The underlying future is already thread-safe and efficient (concurrent.futures, asyncio)
  • The framework manages state better than we can
  • Delegation avoids state synchronization bugs

ConcurrentFuture and AsyncioFuture are pure wrappers because:
  1. concurrent.futures.Future and asyncio.Future already manage state efficiently
  2. Caching would duplicate state and risk inconsistency
  3. Delegation is zero-overhead

RayFuture must cache because:
  1. ray.get() is an expensive network call
  2. ObjectRef doesn't cache results itself
  3. Callbacks require local state tracking

Rule 3: Thread Safety Requirements

All futures must be thread-safe because the synchronization primitives are multi-threaded:
  • Multiple threads may call wait() or gather() simultaneously
  • Worker proxy pools dispatch to multiple workers from multiple threads
  • Progress callbacks may be invoked from different threads

Implementation patterns (a lock-usage sketch follows this list):
  • SyncFuture: no lock (immutable after creation)
  • ConcurrentFuture: delegates to the inherently thread-safe concurrent.futures.Future
  • AsyncioFuture: uses threading.Lock() to protect asyncio future access
  • RayFuture: uses threading.Lock() to protect cached state
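
A minimal illustration of the lock-protected delegation pattern used by AsyncioFuture (a sketch only; it assumes the _lock and _future fields listed under Rule 1):

# ✅ Lock-protected delegation (illustrative)
def done(self) -> bool:
    # Acquire the wrapper's lock before touching the underlying asyncio.Future
    with self._lock:
        return self._future.done()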

Rule 4: Exception Consistency

All futures must raise the same exception types:
  • concurrent.futures.CancelledError (not asyncio.CancelledError)
  • TimeoutError (not ray.exceptions.GetTimeoutError)
  • Original exception from computation (unwrapped)

AsyncioFuture must convert:

# ✅ Correct pattern
try:
    return self._future.result()
except asyncio.CancelledError:
    raise CancelledError("Future was cancelled") from None

RayFuture must convert:

# ✅ Correct pattern (shown with an illustrative ray.get() fetch for context)
try:
    return ray.get(self._object_ref, timeout=timeout)
except Exception as e:
    if e.__class__.__name__ == "GetTimeoutError":
        raise TimeoutError("Future did not complete within timeout") from e
    raise

Rule 5: Callback Invocation Rules

All futures must pass the wrapper (BaseFuture) to callbacks, not the underlying future:

# ✅ Correct - ConcurrentFuture
def add_done_callback(self, fn: Callable) -> None:
    self._future.add_done_callback(lambda _: fn(self))  # Pass 'self', not '_'

# ✅ Correct - AsyncioFuture
def add_done_callback(self, fn: Callable) -> None:
    def wrapped_callback(fut):
        fn(self)  # Pass wrapper, not 'fut'
    self._future.add_done_callback(wrapped_callback)

Callbacks must be invoked exactly once when the future completes.

Rule 6: The wrap_future() Function

Purpose: Automatically convert any future-like object into a BaseFuture.

Detection order (critical - order matters):

def wrap_future(future: Any) -> BaseFuture:
    if isinstance(future, BaseFuture):
        return future  # Already wrapped, idempotent
    elif isinstance(future, concurrent.futures.Future):
        return ConcurrentFuture(future=future)
    elif asyncio.isfuture(future):
        return AsyncioFuture(future=future)
    elif _IS_RAY_INSTALLED and isinstance(future, ray.ObjectRef):
        return RayFuture(object_ref=future)
    else:
        # Fallback: wrap as immediate result
        return SyncFuture(result_value=future)

Why this order:
  1. Check BaseFuture first for idempotency
  2. Check concurrent.futures.Future before asyncio (some futures might satisfy both checks)
  3. Check Ray only if installed (avoid import errors)
  4. Fall back to SyncFuture for non-future values (e.g., gather([1, 2, future1]))

Memory Optimization

__slots__ usage is critical for performance when dealing with thousands of futures:

# Each future type defines minimal slots
class ConcurrentFuture(BaseFuture):
    __slots__ = ("uuid", "_future", "_callbacks", "_lock")
    # Saves 32 bytes per instance vs. caching _result, _exception, _done, _cancelled

class AsyncioFuture(BaseFuture):
    __slots__ = ("uuid", "_future", "_callbacks", "_lock", "_poll_interval")
    # Saves 24 bytes per instance

class RayFuture(BaseFuture):
    __slots__ = ("uuid", "_object_ref", "_result", "_exception", 
                 "_done", "_cancelled", "_callbacks", "_lock")
    # Must cache all state

Impact: With 10,000 futures, removing unused slots saves 320KB (ConcurrentFuture) or 240KB (AsyncioFuture).


Polling Strategies

Purpose

Polling strategies control how frequently we check future completion. The trade-off:
  • Fast polling: low latency, high CPU usage
  • Slow polling: high latency, low CPU usage
  • Adaptive: balance based on workload

Architecture

All strategies inherit from BasePollingStrategy (Registry + MutableTyped):

class BasePollingStrategy(Registry, MutableTyped, ABC):
    @abstractmethod
    def get_next_interval(self) -> float:
        """Return next sleep interval in seconds."""
        pass

    @abstractmethod
    def record_completion(self) -> None:
        """Called when futures complete."""
        pass

    @abstractmethod
    def record_no_completion(self) -> None:
        """Called when no futures complete."""
        pass

    @abstractmethod
    def reset(self) -> None:
        """Reset to initial state."""
        pass

Strategy Implementations

FixedPollingStrategy

Behavior: Constant interval, no adaptation.

Best for: Predictable workloads, testing, benchmarking.

Parameters (from global_config.defaults): - interval: polling_fixed_interval (default: 0.01 seconds = 10ms)

Algorithm:

Always return: interval
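
As a concrete illustration, a fixed strategy is the smallest possible implementation of the BasePollingStrategy interface above. This is a sketch, not the library's actual implementation; the interval field follows the polling_fixed_interval parameter listed above, and the registry aliases are omitted.

# Sketch of a fixed-interval strategy (illustrative)
class FixedPollingStrategy(BasePollingStrategy):
    interval: float = 0.01  # polling_fixed_interval default (10ms)

    def get_next_interval(self) -> float:
        return self.interval  # constant, regardless of completions

    def record_completion(self) -> None:
        pass  # no adaptation

    def record_no_completion(self) -> None:
        pass  # no adaptation

    def reset(self) -> None:
        pass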

AdaptivePollingStrategy

Behavior: Speeds up on completions, slows down when idle.

Best for: Variable workloads, unknown completion patterns, general use.

Parameters (from global_config.defaults):
  • min_interval: polling_adaptive_min_interval (default: 0.001s = 1ms)
  • max_interval: polling_adaptive_max_interval (default: 0.1s = 100ms)
  • current_interval: polling_adaptive_initial_interval (default: 0.01s = 10ms)
  • speedup_factor: 0.7 (speed up by 30% on completion)
  • slowdown_factor: 1.3 (slow down by 30% on idle)

Algorithm:

def record_completion():
    current_interval = max(min_interval, current_interval * 0.7)  # Speed up
    consecutive_empty = 0

def record_no_completion():
    consecutive_empty += 1
    if consecutive_empty >= 3:  # After 3 empty checks
        current_interval = min(max_interval, current_interval * 1.3)  # Slow down

Why this works:
  • Starts at moderate speed (10ms)
  • Speeds up when futures are completing (1ms minimum)
  • Slows down after consecutive empty checks (100ms maximum)
  • Prevents thrashing by requiring 3 empty checks before slowing down

ExponentialPollingStrategy

Behavior: Exponential backoff on idle, reset on completion.

Best for: Long-running operations, sporadic completions.

Parameters (from global_config.defaults):
  • initial_interval: polling_exponential_initial_interval (default: 0.001s = 1ms)
  • max_interval: polling_exponential_max_interval (default: 1.0s)
  • multiplier: 2.0 (double each empty check)

Algorithm:

def record_completion():
    current_interval = initial_interval  # Reset to fast

def record_no_completion():
    current_interval = min(max_interval, current_interval * 2.0)  # Double

Behavior over time (if no completions):

1ms → 2ms → 4ms → 8ms → 16ms → 32ms → 64ms → 128ms → 256ms → 512ms → 1000ms (capped)

ProgressivePollingStrategy

Behavior: Steps through fixed interval levels.

Best for: Predictable phases, explicit control.

Parameters (from global_config.defaults):
  • intervals: generated from polling_progressive_min_interval and polling_progressive_max_interval
    Default: (0.001, 0.005, 0.01, 0.05, 0.1) = (1ms, 5ms, 10ms, 50ms, 100ms)
  • checks_before_increase: 5 (stay at each level for 5 checks)

Algorithm:

def record_completion():
    current_index = 0  # Reset to fastest
    checks_at_level = 0

def record_no_completion():
    checks_at_level += 1
    if checks_at_level >= 5:  # After 5 checks at this level
        current_index = min(len(intervals) - 1, current_index + 1)  # Next level
        checks_at_level = 0

Strategy Selection in wait()/gather()

Default: PollingAlgorithm.Adaptive

How strategies are created:

  1. User passes polling parameter to wait() or gather()
  2. If it's a BasePollingStrategy instance, use as-is
  3. If it's a PollingAlgorithm enum or string, create strategy with config defaults:
from concurry.config import global_config

defaults = global_config.defaults

if polling == PollingAlgorithm.Adaptive:
    strategy = AdaptivePollingStrategy(
        min_interval=defaults.polling_adaptive_min_interval,
        max_interval=defaults.polling_adaptive_max_interval,
        current_interval=defaults.polling_adaptive_initial_interval,
    )

Configuration Integration:
  • All default intervals come from global_config.defaults
  • No hardcoded values in strategy creation
  • Users can customize via global_config or by passing a strategy instance (both shown below)
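
A sketch of both customization paths. It assumes global_config.defaults is mutable (it is declared as MutableTyped elsewhere in this documentation) and that AdaptivePollingStrategy is importable; the exact import path is not specified here.

# Option 1: adjust the global defaults picked up when an enum/string polling value is passed
from concurry.config import global_config

global_config.defaults.polling_adaptive_max_interval = 0.05  # assumes defaults are mutable

# Option 2: pass a fully-configured strategy instance, bypassing the defaults
strategy = AdaptivePollingStrategy(  # import path for the strategy class is assumed
    min_interval=0.0005,
    max_interval=0.05,
    current_interval=0.005,
)
done, not_done = wait(futures, polling=strategy)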


Wait Function

Signature

def wait(
    fs: Union[List, Tuple, Set, Dict, Any],
    *futs,
    timeout: Optional[float] = None,
    return_when: Union[ReturnWhen, str] = ReturnWhen.ALL_COMPLETED,
    polling: Union[PollingAlgorithm, str] = PollingAlgorithm.Adaptive,
    progress: Union[bool, Dict, Callable, None] = None,
    recurse: bool = False,
) -> Tuple[Set[BaseFuture], Set[BaseFuture]]:

Input Patterns

Pattern 1: List/Tuple/Set (most common)

futures = [worker.task(i) for i in range(10)]
done, not_done = wait(futures)

Pattern 2: Dictionary (preserves keys)

tasks = {"download": f1, "process": f2, "upload": f3}
done, not_done = wait(tasks)  # done/not_done contain wrapped futures

Pattern 3: Individual futures (variadic)

done, not_done = wait(future1, future2, future3)

Pattern 4: Single future

done, not_done = wait(single_future)

Critical Validation

Cannot mix structure and variadic args:

# ❌ ERROR
futures = [f1, f2, f3]
wait(futures, f4, f5)  # Raises ValueError

# ✅ VALID
wait([f1, f2, f3, f4, f5])  # Pass as list
wait(f1, f2, f3, f4, f5)     # Pass individually

Implementation:

if len(futs) > 0 and isinstance(fs, (list, tuple, set, dict)):
    raise ValueError(
        "Cannot provide both a structure (list/tuple/set/dict) as first argument "
        "and additional futures via *futs. Either pass a structure, or pass individual futures."
    )

Return Conditions

ReturnWhen.ALL_COMPLETED (default):
  • Wait until all futures are done
  • Most common use case

ReturnWhen.FIRST_COMPLETED:
  • Return as soon as any future completes
  • Useful for racing multiple operations

ReturnWhen.FIRST_EXCEPTION:
  • Return as soon as any future raises an exception
  • Useful for fail-fast scenarios

Implementation:

# Check after each batch of completions
if return_when == ReturnWhen.FIRST_COMPLETED and len(done) > 0:
    return done, not_done

if return_when == ReturnWhen.FIRST_EXCEPTION:
    for fut in newly_done:
        try:
            if fut.exception(timeout=0) is not None:
                return done, not_done
        except Exception:
            pass

if return_when == ReturnWhen.ALL_COMPLETED and len(not_done) == 0:
    return done, not_done

Core Algorithm

def wait(...):
    # 1. Wrap all futures
    futures_list = [wrap_future(f) for f in ...]

    # 2. Initialize sets
    done: Set[BaseFuture] = set()
    not_done: Set[BaseFuture] = set(futures_list)

    # 3. Create polling strategy
    strategy = create_strategy(polling, global_config.defaults)

    # 4. Initial check
    initial_done = _check_futures_batch(not_done)
    done.update(initial_done)
    not_done.difference_update(initial_done)

    # Check if can return early
    if condition_met(return_when, done, not_done):
        return done, not_done

    # 5. Main polling loop
    while True:
        # Check timeout
        if timeout and elapsed >= timeout:
            raise TimeoutError(...)

        # Batch check
        newly_done = _check_futures_batch(not_done)

        if len(newly_done) > 0:
            done.update(newly_done)
            not_done.difference_update(newly_done)
            strategy.record_completion()
            update_progress(...)

            # Check return conditions
            if condition_met(return_when, done, not_done):
                return done, not_done
        else:
            strategy.record_no_completion()

        # Sleep
        interval = strategy.get_next_interval()
        time.sleep(interval)

Batch Checking Optimization

Purpose: Efficiently check multiple futures without O(n) IPC calls.

Ray Optimization:

def _check_futures_batch(futures_to_check):
    if len(futures_to_check) == 0:
        return set()

    completed = set()

    # Separate Ray futures
    if _IS_RAY_INSTALLED:
        ray_futures = []
        ray_future_map = {}

        for fut in futures_to_check:
            if hasattr(fut, "_object_ref"):
                ray_futures.append(fut._object_ref)
                ray_future_map[id(fut._object_ref)] = fut

        # Batch check ALL Ray futures with single ray.wait() call
        if len(ray_futures) > 0:
            ready, not_ready = ray.wait(
                ray_futures, 
                num_returns=len(ray_futures),  # Check all
                timeout=0  # Non-blocking
            )
            for ref in ready:
                completed.add(ray_future_map[id(ref)])

    # Check non-Ray futures individually
    for fut in futures_to_check:
        if fut not in completed:
            if fut.done():
                completed.add(fut)

    return completed

Why this matters:
  • Without optimization: 5000 Ray futures = 5000 ray.wait() calls (expensive IPC)
  • With optimization: 5000 Ray futures = 1 ray.wait() call (single IPC)
  • Dramatic performance improvement for large batches


Gather Function

Signature

def gather(
    fs: Union[List, Tuple, Set, Dict, Any],
    *futs,
    return_exceptions: bool = False,
    iter: bool = False,
    timeout: Optional[float] = None,
    polling: Union[PollingAlgorithm, str] = PollingAlgorithm.Adaptive,
    progress: Union[bool, Dict, Callable, None] = None,
    recurse: bool = False,
) -> Union[List[Any], Dict[Any, Any], Iterator[Tuple[int, Any]]]:

Return Types

List input → List output:

futures = [w.task(i) for i in range(5)]
results = gather(futures)  # Returns: [r0, r1, r2, r3, r4]

Dict input → Dict output (keys preserved):

tasks = {"download": f1, "process": f2}
results = gather(tasks)  # Returns: {"download": r1, "process": r2}

Iterator mode → Generator (yields as completed):

for idx, result in gather(futures, iter=True):
    print(f"Future {idx} completed: {result}")

for key, result in gather(tasks, iter=True):
    print(f"Task {key} completed: {result}")

Backend Architecture

Two backend functions:
  1. _gather_blocking_backend(): waits for all futures, returns results in order
  2. _gather_iter_backend(): yields results as they complete

Dispatch logic:

def gather(...):
    # Validate and build args
    if len(futs) > 0:
        args = (fs,) + futs
        is_dict_input = False
    else:
        args = (fs,)
        is_dict_input = isinstance(fs, dict)

    # Delegate
    if iter:
        return _gather_iter_backend(args, ..., is_dict_input)
    else:
        return _gather_blocking_backend(args, ..., is_dict_input)

Blocking Backend Algorithm

def _gather_blocking_backend(fs, return_exceptions, timeout, polling, progress, recurse, is_dict_input):
    # Special case: Dict input
    if is_dict_input and len(fs) == 1:
        futures_dict = fs[0]
        keys = list(futures_dict.keys())
        futures_list = [wrap_future(v) for v in futures_dict.values()]

        # Wait for all
        done, not_done = wait(futures_list, timeout=timeout, ...)

        if len(not_done) > 0:
            raise TimeoutError(...)

        # Collect results preserving keys
        results_dict = {}
        for key, fut in zip(keys, futures_list):
            try:
                results_dict[key] = fut.result(timeout=0)
            except Exception as e:
                if return_exceptions:
                    results_dict[key] = e
                else:
                    raise

        return results_dict

    # Special case: List/Tuple/Set input (single structure)
    if len(fs) == 1 and isinstance(fs[0], (list, tuple, set)):
        futures_list = [wrap_future(f) for f in fs[0]]

        # Wait for all
        done, not_done = wait(futures_list, timeout=timeout, ...)

        if len(not_done) > 0:
            raise TimeoutError(...)

        # Collect results in order
        results = []
        for fut in futures_list:
            try:
                results.append(fut.result(timeout=0))
            except Exception as e:
                if return_exceptions:
                    results.append(e)
                else:
                    raise

        return results

    # General case: Multiple individual futures
    futures_list = [wrap_future(f) for f in fs]

    # Wait for all
    done, not_done = wait(futures_list, timeout=timeout, ...)

    if len(not_done) > 0:
        raise TimeoutError(...)

    # Collect results
    results = []
    for fut in futures_list:
        try:
            results.append(fut.result(timeout=0))
        except Exception as e:
            if return_exceptions:
                results.append(e)
            else:
                raise

    return results

Critical detail: Always call fut.result(timeout=0) after wait() completes. The future is already done, so timeout=0 is safe and avoids hanging.

Iterator Backend Algorithm

def _gather_iter_backend(fs, return_exceptions, timeout, polling, progress, recurse, is_dict_input):
    # Build future list and key mapping
    if is_dict_input and len(fs) == 1:
        keys_list = list(fs[0].keys())
        futures_list = [wrap_future(v) for v in fs[0].values()]
        future_to_key = {id(fut): key for fut, key in zip(futures_list, keys_list)}
    elif len(fs) == 1 and isinstance(fs[0], (list, tuple, set)):
        futures_list = [wrap_future(f) for f in fs[0]]
        future_to_key = {id(fut): i for i, fut in enumerate(futures_list)}
    else:
        futures_list = [wrap_future(f) for f in fs]
        future_to_key = {id(fut): i for i, fut in enumerate(futures_list)}

    # Create polling strategy
    strategy = create_strategy(polling, global_config.defaults)

    # Track pending
    pending = set(futures_list)

    # Main loop
    while len(pending) > 0:
        # Check timeout
        if timeout and elapsed >= timeout:
            raise TimeoutError(...)

        # Batch check
        newly_done = _check_futures_batch(pending)

        if len(newly_done) > 0:
            # Yield completed futures
            for fut in newly_done:
                key_or_index = future_to_key[id(fut)]
                try:
                    result = fut.result(timeout=0)
                    yield (key_or_index, result)
                except Exception as e:
                    if return_exceptions:
                        yield (key_or_index, e)
                    else:
                        raise

            pending.difference_update(newly_done)
            strategy.record_completion()
        else:
            strategy.record_no_completion()

        # Sleep
        if len(pending) > 0:
            interval = strategy.get_next_interval()
            time.sleep(interval)

Key differences from the blocking backend:
  • Yields immediately when futures complete (out of order)
  • Returns (key/index, result) tuples for tracking
  • Continues until all futures have been yielded


Key Design Decisions

1. Why Primary Signature is fs (not *fs)?

Rationale: Most common usage is passing a collection:

futures = [worker.task(i) for i in range(100)]
results = gather(futures)  # Most common - pass list directly

Alternative (worse):

results = gather(*futures)  # Unpacking required - less intuitive

Design choice: Optimize for the most common case. Support variadic for convenience, but validate against mixing patterns.

2. Why Dict Support with Key Preservation?

Use case: Named tasks with meaningful identifiers:

tasks = {
    "download_data": worker.download(),
    "process_data": worker.process(),
    "upload_results": worker.upload(),
}

results = gather(tasks)
print(results["download_data"])  # Access by name

for task_name, result in gather(tasks, iter=True):
    print(f"{task_name} completed: {result}")

Benefit: Self-documenting code, easier debugging, natural data structure.

3. Why iter=True Instead of Separate Function?

Alternatives considered:
  • gather() vs. gather_iter() (old design)
  • gather() vs. as_completed() (asyncio pattern)

Chosen: gather(iter=True)

Rationale:
  1. Single function for gathering (simpler API)
  2. Parameter makes behavior explicit
  3. Same validation and input patterns for both modes
  4. Less duplication in implementation

4. Why Batch Checking for Ray?

Problem: Individual ray.wait() calls are expensive (IPC overhead).

Solution: Single ray.wait() call with num_returns=len(futures):

ready, not_ready = ray.wait(ray_futures, num_returns=len(ray_futures), timeout=0)

Impact:
  • 5000 futures: ~50ms for a batch check vs. ~5000ms for individual checks
  • 100x speedup for large batches

5. Why Adaptive Polling as Default?

Alternatives:
  • Fixed: predictable but not adaptive
  • Exponential: too aggressive for short tasks
  • Progressive: less adaptive than Adaptive

Adaptive strategy:
  • Speeds up when futures complete frequently
  • Slows down when idle
  • Good balance for unknown workloads

Result: Best general-purpose default for Concurry users.

6. Why No Auto-Detection of Return Type?

Could do:

# Auto-detect if user wants list or iterator
def gather(futures, ...):
    if user_expects_iterator:  # How to detect?
        return generator
    else:
        return list

Why not:
  • Ambiguous: can't reliably detect user intent
  • Implicit behavior: hard to reason about
  • Type checking: difficult for static analysis

Chosen: Explicit iter=True parameter.

7. Why return_exceptions Parameter?

Use case: Don't want to stop gathering if one task fails:

results = gather(
    [worker.task(i) for i in range(100)],
    return_exceptions=True
)

for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {i} failed: {result}")
    else:
        print(f"Task {i} succeeded: {result}")

Alternative (worse):

# Have to wrap every result access in try/except
results = gather(...)  # Raises on first exception

Benefit: Collect all results/errors, analyze batch failures.


Performance Considerations

Memory Usage

With 10,000 futures:
  • SyncFuture: 10K * 96 bytes = 960 KB
  • ConcurrentFuture: 10K * 64 bytes = 640 KB (after slot optimization)
  • RayFuture: 10K * 128 bytes = 1.28 MB

Optimization impact:
  • Removing unused slots from ConcurrentFuture saved 320 KB (32 bytes * 10K)
  • Removing unused slots from AsyncioFuture saved 240 KB (24 bytes * 10K)

Polling Overhead

Adaptive polling CPU usage (5000 futures, 10-second completion):
  • Starts at 10ms interval: ~1000 checks
  • Speeds up to 1ms on completions: ~2000 checks
  • Slows to 100ms on idle: ~100 checks
  • Total checks: ~3100 (vs. 10,000 with a fixed 1ms interval)

CPU time: ~3100 * 0.1ms = 310ms over 10 seconds = 3.1% CPU.

Ray Batch Checking Scaling

5000 Ray futures:
  • Individual checks: 5000 * 1ms = 5000ms (5 seconds) per poll cycle
  • Batch check: 1 * 10ms = 10ms per poll cycle
  • Speedup: 500x

Progress Bar Overhead

With 10,000 futures and miniters=100:
  • Progress updates: 100 calls to ProgressBar.update()
  • Overhead: ~100 * 0.1ms = 10ms total
  • Negligible compared to polling
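
An illustrative usage of the progress parameter. This is a sketch based on the progress signature shown earlier (bool/dict/callable); the dict form is assumed to forward tqdm-style options such as desc and miniters.

# Default progress bar
results = gather(futures, progress=True)

# Assumed: dict options forwarded to the underlying tqdm-style bar
results = gather(futures, progress={"desc": "tasks", "miniters": 100})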


Extension Points

Adding a New Future Type

Example: Add support for Dask futures.

  1. Create DaskFuture class:

    class DaskFuture(BaseFuture):
        __slots__ = ("uuid", "_dask_future", "_result", "_exception", "_done", "_cancelled", "_callbacks", "_lock")
    
        FUTURE_UUID_PREFIX = "dask-future-"
    
        def __init__(self, dask_future):
            # Validate
            if not isinstance(dask_future, dask.distributed.Future):
                raise TypeError(...)
    
            self.uuid = f"{self.FUTURE_UUID_PREFIX}{id(self)}"
            self._dask_future = dask_future
            self._result = None
            self._exception = None
            self._done = False
            self._cancelled = False
            self._callbacks = []
            self._lock = threading.Lock()
    
        def done(self) -> bool:
            # Query Dask future
            return self._dask_future.done()
    
        def result(self, timeout: Optional[float] = None) -> Any:
            # Implement with Dask-specific logic
            ...
    
        # ... implement all abstract methods
    

  2. Update wrap_future():

    def wrap_future(future: Any) -> BaseFuture:
        if isinstance(future, BaseFuture):
            return future
        elif isinstance(future, concurrent.futures.Future):
            return ConcurrentFuture(future=future)
        elif asyncio.isfuture(future):
            return AsyncioFuture(future=future)
        elif _IS_DASK_INSTALLED and isinstance(future, dask.distributed.Future):  # Add here
            return DaskFuture(dask_future=future)
        elif _IS_RAY_INSTALLED and isinstance(future, ray.ObjectRef):
            return RayFuture(object_ref=future)
        else:
            return SyncFuture(result_value=future)
    

  3. Optimize batch checking (if applicable):

    def _check_futures_batch(futures_to_check):
        # ... existing Ray optimization ...
    
        # Add Dask batch checking
        if _IS_DASK_INSTALLED:
            dask_futures = []
            for fut in futures_to_check:
                if hasattr(fut, "_dask_future"):
                    dask_futures.append(fut._dask_future)
    
            if len(dask_futures) > 0:
                # Use Dask's batch API if available
                ready = dask.distributed.wait(dask_futures, timeout=0, return_when='FIRST_COMPLETED')
                # Map back to BaseFuture wrappers
                ...
    

Testing: Add Dask to worker_mode fixture in conftest.py.

Adding a New Polling Strategy

Example: Add a SinusoidalPollingStrategy that oscillates intervals.

  1. Create strategy class:

    class SinusoidalPollingStrategy(BasePollingStrategy):
        """Oscillating polling interval based on sine wave."""
    
        aliases = ["sinusoidal", PollingAlgorithm.Sinusoidal]  # Add to PollingAlgorithm enum
    
        min_interval: float
        max_interval: float
        current_step: int = 0
        period: int = 20  # Complete cycle in 20 checks
    
        def get_next_interval(self) -> float:
            # Compute sine wave position
            import math
            t = (self.current_step % self.period) / self.period  # 0 to 1
            sine_value = math.sin(2 * math.pi * t)  # -1 to 1
            normalized = (sine_value + 1) / 2  # 0 to 1
            interval = self.min_interval + normalized * (self.max_interval - self.min_interval)
            return interval
    
        def record_completion(self) -> None:
            self.current_step = 0  # Reset to start of cycle
    
        def record_no_completion(self) -> None:
            self.current_step += 1  # Progress through cycle
    
        def reset(self) -> None:
            self.current_step = 0
    

  2. Add to PollingAlgorithm enum:

    class PollingAlgorithm(AutoEnum):
        Fixed = auto()
        Adaptive = auto()
        Exponential = auto()
        Progressive = auto()
        Sinusoidal = auto()  # Add here
    

  3. Update wait() strategy creation:

    if polling == PollingAlgorithm.Sinusoidal:
        strategy = SinusoidalPollingStrategy(
            min_interval=defaults.polling_sinusoidal_min_interval,
            max_interval=defaults.polling_sinusoidal_max_interval,
        )
    

  4. Add configuration defaults:

    class GlobalDefaults(MutableTyped):
        # ... existing ...
        polling_sinusoidal_min_interval: float = 0.001
        polling_sinusoidal_max_interval: float = 0.1
    

Testing: Add tests to test_polling.py for new strategy.


Common Pitfalls

Pitfall 1: Setting _done=True Without Fetching Result (RayFuture Bug)

Problem:

# ❌ WRONG
def done(self) -> bool:
    if self._done:
        return True

    ready, _ = ray.wait([self._object_ref], timeout=0)
    if len(ready) > 0:
        self._done = True  # ❌ BUG: Set _done but didn't fetch result!
        return True
    return False

# Later...
def result(self, timeout: Optional[float] = None) -> Any:
    if self._done:
        return self._result  # ❌ Returns None!

Why this fails:
  • done() sets _done=True
  • result() sees _done=True and returns the cached _result
  • But _result is still None because ray.get() was never called

Fix:

# ✅ CORRECT
def done(self) -> bool:
    if self._done:
        return True

    ready, _ = ray.wait([self._object_ref], timeout=0)
    # Don't set _done here - only set it in result() when actually fetching
    return len(ready) > 0

Lesson: State caching requires careful synchronization. Only set _done=True when the result is actually fetched.
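
For completeness, a sketch of the matching result() side that pairs with the corrected done() above. This is illustrative only; it assumes the RayFuture fields listed under Rule 1 and the exception-conversion rule from Rule 4.

# ✅ Sketch: only mark _done once the result (or exception) is actually cached
def result(self, timeout: Optional[float] = None) -> Any:
    with self._lock:
        if self._done:
            if self._exception is not None:
                raise self._exception
            return self._result
        try:
            value = ray.get(self._object_ref, timeout=timeout)  # actually fetch
        except Exception as e:
            if e.__class__.__name__ == "GetTimeoutError":
                # Timed out: do NOT mark done; the computation may still finish later
                raise TimeoutError("Future did not complete within timeout") from e
            # Computation failed: cache the exception, then mark done
            self._exception = e
            self._done = True
            raise
        # Success: cache the result, then mark done
        self._result = value
        self._done = True
        return value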

Pitfall 2: Forgetting to Wrap Futures

Problem:

# ❌ WRONG
def custom_function(futures):
    # Assume futures are already BaseFuture
    for fut in futures:
        if fut.done():  # AttributeError if fut is a raw ObjectRef!
            ...

Fix:

# ✅ CORRECT
from concurry.core.future import wrap_future

def custom_function(futures):
    wrapped = [wrap_future(f) for f in futures]
    for fut in wrapped:
        if fut.done():  # Always works
            ...

Lesson: Always use wrap_future() when accepting future-like objects from external sources.

Pitfall 3: Caching State in Delegation Wrappers

Problem:

# ❌ WRONG - ConcurrentFuture with caching
class ConcurrentFuture(BaseFuture):
    __slots__ = ("uuid", "_future", "_result", "_done", ...)  # ❌ Don't cache!

    def done(self) -> bool:
        if self._done:
            return self._done
        self._done = self._future.done()  # ❌ Duplicates state
        return self._done

Why this fails:
  • Duplicate state risks desynchronization
  • Wastes memory
  • concurrent.futures.Future already manages state

Fix:

# ✅ CORRECT - Pure delegation
class ConcurrentFuture(BaseFuture):
    __slots__ = ("uuid", "_future", "_callbacks", "_lock")  # No caching

    def done(self) -> bool:
        return self._future.done()  # Direct delegation

Lesson: Only cache state when necessary (e.g., RayFuture). Otherwise, delegate.

Pitfall 4: Hardcoding Polling Intervals

Problem:

# ❌ WRONG
def wait(futures, ...):
    strategy = FixedPollingStrategy(interval=0.01)  # Hardcoded!

Fix:

# ✅ CORRECT
from concurry.config import global_config

def wait(futures, polling, ...):
    defaults = global_config.defaults

    if polling == PollingAlgorithm.Fixed:
        strategy = FixedPollingStrategy(
            interval=defaults.polling_fixed_interval  # From config
        )

Lesson: All defaults must go through global_config. See Configuration Architecture.

Pitfall 5: Not Handling Dict Input Correctly

Problem:

# ❌ WRONG - Loses keys
def gather(fs, ...):
    if isinstance(fs, dict):
        futures_list = list(fs.values())
        results = [fut.result() for fut in futures_list]
        return results  # ❌ Returns list, not dict!

Fix:

# ✅ CORRECT - Preserves keys
def gather(fs, ...):
    if isinstance(fs, dict):
        keys = list(fs.keys())
        futures_list = list(fs.values())
        results = [fut.result() for fut in futures_list]
        return dict(zip(keys, results))  # ✅ Returns dict

Lesson: When dict is provided, output must be dict with same keys.

Pitfall 6: Blocking Indefinitely Without Timeout

Problem:

# ❌ WRONG - No timeout, can hang forever
done, not_done = wait(futures)  # Hangs if futures never complete

Fix:

# ✅ CORRECT - Always use timeout in production
done, not_done = wait(futures, timeout=30.0)

Lesson: Always specify timeouts in production code to prevent deadlocks.
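
Because wait() raises TimeoutError when the deadline expires (per the core algorithm above), production code typically pairs the timeout with explicit handling. A minimal sketch:

try:
    done, not_done = wait(futures, timeout=30.0)
except TimeoutError:
    # Decide how to handle stragglers: cancel, retry, or log which futures are still pending
    ...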

Pitfall 7: Mixing Structure and Variadic Args

Problem:

# ❌ WRONG - Ambiguous intent
futures = [f1, f2, f3]
done, not_done = wait(futures, f4, f5)  # Raises ValueError

Why forbidden:
  • Ambiguous: is futures a single future or a list?
  • Hard to reason about what was intended

Fix:

# ✅ CORRECT - Clear intent
done, not_done = wait([f1, f2, f3, f4, f5])  # Pass as list
# OR
done, not_done = wait(f1, f2, f3, f4, f5)  # Pass individually

Lesson: Validate input patterns to prevent ambiguous usage.


Testing Requirements

Must Test Across All Execution Modes

All tests must use the worker_mode fixture:

def test_feature(self, worker_mode):
    """Test feature across all execution modes."""
    w = MyWorker.options(mode=worker_mode).init()
    # ...

Reason: Synchronization primitives must work identically across sync, asyncio, thread, process, and ray modes.

Must Test Edge Cases

Required edge case tests:
  1. Empty collections: wait([]), gather([])
  2. Single items: wait(single_future), gather(single_future)
  3. Mixed types: gather([future1, 42, future2])
  4. Large batches: hundreds of futures in a single wait()/gather() call
  5. All polling algorithms
  6. Dictionary inputs
  7. Iterator mode
  8. Exception handling with return_exceptions=True
  9. Timeout behavior (except sync mode)
  10. Progress tracking (bar and callback)

An example of the first case is sketched below.
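
A minimal sketch of the empty-collection case. It assumes wait([]) returns two empty sets and gather([]) returns an empty list, consistent with the return types described above.

def test_empty_collections(self, worker_mode):
    # wait() on nothing: both sets should be empty
    done, not_done = wait([])
    assert done == set() and not_done == set()

    # gather() on nothing: an empty list back
    assert gather([]) == []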

Must Test Return Type Consistency

Dict input must return dict:

def test_gather_dict_returns_dict(self, worker_mode):
    tasks = {"t1": f1, "t2": f2}
    results = gather(tasks)
    assert isinstance(results, dict)
    assert list(results.keys()) == ["t1", "t2"]

Iterator mode must yield correct tuples:

def test_gather_iter_yields_tuples(self, worker_mode):
    futures = [f1, f2, f3]
    items = list(gather(futures, iter=True))
    assert all(isinstance(item, tuple) and len(item) == 2 for item in items)

Must Test Validation

Test invalid input patterns:

def test_wait_rejects_mixed_structure_and_varargs(self):
    futures = [f1, f2]
    with pytest.raises(ValueError, match="Cannot provide both"):
        wait(futures, f3, f4)

Performance Testing

Test scalability (optional but recommended):

def test_wait_large_batch(self, worker_mode):
    """Test wait() with 1000 futures."""
    w = Worker.options(mode=worker_mode).init()
    futures = [w.task(i) for i in range(1000)]

    start = time.time()
    done, not_done = wait(futures, timeout=60.0)
    elapsed = time.time() - start

    assert len(done) == 1000
    assert elapsed < 30.0  # Should complete in reasonable time

Must NOT Skip Tests Due to Failures

❌ NEVER:

def test_feature(self, worker_mode):
    if worker_mode == "ray":
        pytest.skip("Fails on Ray")  # ❌ WRONG

✅ CORRECT:

def test_feature(self, worker_mode):
    if worker_mode == "sync":
        pytest.skip("Sync mode doesn't support timeouts")  # ✅ Valid reason

See: Cursor Rules: Testing Practices for complete testing guidelines.


Summary

The synchronization architecture provides:

  1. Unified Future Interface (BaseFuture + wrap_future())
     • Works across all execution frameworks
     • Efficient delegation or caching based on framework
     • Consistent exception handling

  2. Adaptive Polling Strategies
     • Fixed, Adaptive (default), Exponential, Progressive
     • Configurable via global_config
     • Extensible via Registry pattern

  3. Efficient Batch Checking
     • Single ray.wait() call for all Ray futures
     • 100x+ speedup for large batches

  4. Flexible Input Patterns
     • List/tuple/set, dict (preserves keys), individual futures
     • Validated to prevent ambiguous usage

  5. Multiple Output Modes
     • Blocking: returns all results in order
     • Iterator: yields results as they complete
     • Dict: preserves input keys

  6. Progress Tracking
     • Progress bars with tqdm integration
     • Custom callbacks for advanced use cases

  7. Extension Points
     • Add new future types by implementing BaseFuture
     • Add new polling strategies by implementing BasePollingStrategy
     • All integrations use the Registry pattern for factory creation

Key Invariants:
  • All futures must be thread-safe
  • All futures must raise consistent exception types
  • Delegation wrappers must NOT cache state
  • All defaults must go through global_config
  • Dict inputs must return dicts with the same keys
  • result(timeout=0) after wait() must always work

For more details:
  • Configuration: Configuration Architecture
  • User Guide: Synchronization Guide
  • Testing: Testing Practices