Skip to content

Synchronization

Orchestrate chaos. Coordinate thousands of concurrent futures with simple, powerful primitives.

The Problem: "Herding Cats"

Launching 1000 tasks is easy. Managing them is hard. * How do you know when they are all done? * What if you only care about the first one to finish? * What if one fails? Do you stop everything or keep going?

The Solution: Two Powerful Primitives

Concurry gives you two main tools: wait() and gather().

Decision Matrix: Which one do I use?

I want to... Use... Why?
Collect all results in order gather(futures) Simple list of results [r1, r2, r3].
Process results as they finish gather(futures, iter=True) Streaming iterator. Low memory usage.
Race multiple tasks wait(futures, return_when="FIRST_COMPLETED") Returns as soon as one wins.
Handle failures immediately wait(futures, return_when="FIRST_EXCEPTION") Fail-fast orchestration.

Quick Start

from concurry import worker, wait, ReturnWhen

@worker(mode="thread")
class DataWorker:
    def fetch_data(self, id: int) -> dict:
        return {"id": id, "data": f"result_{id}"}

# Create worker and submit tasks
worker = DataWorker.init()
futures = [worker.fetch_data(i) for i in range(10)]

# Wait for all to complete
done, not_done = wait(futures, timeout=30.0)

print(f"Completed: {len(done)}, Pending: {len(not_done)}")
worker.stop()

Pattern 1: The "Batch" (Gather)

You have a list of tasks and you want all the answers.

# 1. Launch tasks
futures = [worker.calculate(i) for i in range(100)]

# 2. Wait for all of them
results = gather(futures, progress=True)

# results is [0, 2, 4, ...]

Dictionary Support

If you lose track of which result belongs to which task, use a dictionary!

tasks = {
    "users": worker.get_users(),
    "posts": worker.get_posts(),
    "comments": worker.get_comments()
}

# Results preserve keys
results = gather(tasks)
print(results["users"])

Pattern 2: The "Stream" (Iterator)

Waiting for all 1000 tasks to finish before processing anything is slow. Use iter=True to process them as they complete.

futures = [worker.slow_task(i) for i in range(1000)]

# Yields results as soon as they arrive!
for i, result in gather(futures, iter=True):
    save_to_db(result)

Why this delights users: It makes your application feel faster and uses less memory.


Pattern 3: The "Race" (Wait)

You want to query 3 mirrors and use the fastest one.

from concurry import wait, ReturnWhen

sources = [mirror1.get(), mirror2.get(), mirror3.get()]

# Return as soon as ONE finishes
done, not_done = wait(
    sources, 
    return_when=ReturnWhen.FIRST_COMPLETED
)

fastest_result = done.pop().result()

# Optional: Cancel the losers
for f in not_done:
    f.cancel()

Async Support

If you are using async def functions, use the async-native versions:

from concurry import async_gather

async def main():
    # Works exactly like gather(), but awaitable
    results = await async_gather(futures)

Deep Dive: Polling Strategies

By default, Concurry uses Adaptive Polling. It checks frequently at first, then backs off if tasks are slow, then speeds up again when they start finishing.

You can tune this in global_config, but the default "Just Works" for 99% of cases.