The @task Decorator¶
Add concurrency to any function with a single line of code.
The Problem: Boilerplate¶
Sometimes you don't want to define a `class MyWorker(Worker)`. You just have a function, and you want it to run in the background, on a separate process, or on a Ray cluster.
Writing a full worker class for a single function feels like overkill.
The Solution: @task¶
The @task decorator transforms a regular function into a fully-managed TaskWorker.
```python
from concurry import task

# 1. Decorate your function
@task(mode="thread", max_workers=4)
def heavy_computation(x):
    return x ** 2

# 2. Call it! (Returns a Future)
future = heavy_computation(10)
print(future.result())  # 100

# 3. Map over it
results = list(heavy_computation.map(range(10)))
```
The "Magic": What actually happens?¶
When you decorate a function, Concurry replaces it with a TaskWorker instance.
* It's not a function anymore: It's an object with methods like `.submit()`, `.map()`, and `.stop()`.
* It manages a pool: The `max_workers=4` argument creates a thread pool behind the scenes.
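The mechanics can be sketched with nothing but the standard library. This is an illustration of the pattern, not Concurry's actual internals; `TaskWrapper` and this simplified `task` are made-up names:

```python
from concurrent.futures import ThreadPoolExecutor

class TaskWrapper:
    """Illustrative stand-in for a TaskWorker (not Concurry's real implementation)."""

    def __init__(self, fn, max_workers):
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def __call__(self, *args, **kwargs):
        # Calling the "function" actually submits to the pool and returns a Future.
        return self._pool.submit(self._fn, *args, **kwargs)

    def map(self, iterable):
        # Delegates to the pool; yields results in input order.
        return self._pool.map(self._fn, iterable)

    def stop(self):
        self._pool.shutdown()

def task(max_workers=4):
    # The decorator swaps the function for a pool-backed object.
    def decorate(fn):
        return TaskWrapper(fn, max_workers)
    return decorate

@task(max_workers=4)
def square(x):
    return x ** 2

future = square(10)                   # a Future, not 100
results = list(square.map(range(5)))
square.stop()
print(future.result(), results)       # 100 [0, 1, 4, 9, 16]
```

This is why the decorated name stops being a plain callable: `square` is now an object whose `__call__` submits work instead of running it inline.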
Decision Matrix: @task vs Worker Class¶
| Feature | @task Decorator | @worker Class Decorator |
|---|---|---|
| Use Case | Simple, stateless functions. | Complex, stateful actors. |
| State | No shared state (pure functions). | Can hold state (`self.db_conn`). |
| Setup | 1 line. | Class definition + initialization. |
| Best For | Scripts, data pipelines, one-offs. | Services, resource managers. |
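The state row is the key distinction, and it can be sketched in plain Python. This is an illustrative stand-in (`CounterActor` is not Concurry's `@worker` API): a class can keep state across calls, which a stateless `@task` function cannot.

```python
from concurrent.futures import ThreadPoolExecutor

class CounterActor:
    """Illustrative stateful actor: each call sees state left by previous calls."""

    def __init__(self):
        self.count = 0
        # A single worker thread serializes access to the shared state.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def increment(self):
        return self._pool.submit(self._increment)

    def _increment(self):
        self.count += 1
        return self.count

actor = CounterActor()
futures = [actor.increment() for _ in range(3)]
print([f.result() for f in futures])  # [1, 2, 3]
```

A `@task`-decorated function has nowhere to hang `self.count`, so a running total like this is exactly the case where the class form earns its extra setup.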
Advanced Features¶
1. Auto-Injection of Limits¶
If your function accepts a `limits` argument, Concurry automatically passes the worker's limit set to it.
```python
import requests
from concurry import task, RateLimit

@task(
    mode="thread",
    limits=[RateLimit(key="api", capacity=5)]
)
def fetch(url, limits):
    # The limits object is injected!
    with limits.acquire(requested={"api": 1}):
        return requests.get(url)
```
2. On-Demand vs Persistent¶
By default, @task creates a persistent pool. But you can make it ephemeral:
```python
# Creates a NEW thread for every call, then destroys it.
# Good for infrequent tasks.
@task(mode="thread", on_demand=True)
def occasional_job(x):
    ...
```
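The difference between the two modes can be sketched with the standard library. The helper below is illustrative only (not Concurry's internals): where a persistent pool reuses long-lived threads, on-demand execution spawns a fresh thread per call and lets it die when the call finishes.

```python
import threading
from concurrent.futures import Future

def run_on_demand(fn, *args):
    """Illustrative on-demand execution: one fresh thread per call."""
    future = Future()

    def runner():
        try:
            future.set_result(fn(*args))
        except Exception as exc:
            future.set_exception(exc)

    # The thread exists only for the duration of this one call.
    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return future

future = run_on_demand(lambda x: x * 3, 7)
print(future.result())  # 21
```

The trade-off is the usual one: no idle threads between calls, but per-call thread startup cost, which is why on-demand suits infrequent tasks.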
3. Context Manager Cleanup¶
Since the decorated function is now a worker that holds real resources (threads, processes, or a Ray actor), you should clean it up when you are done.
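The cleanup pattern can be sketched with the standard library; `PoolTask` is an illustrative stand-in, not Concurry's `TaskWorker`. Supporting the context-manager protocol means `with` releases the pool even if the body raises:

```python
from concurrent.futures import ThreadPoolExecutor

class PoolTask:
    """Illustrative worker with explicit cleanup (stand-in, not Concurry's API)."""

    def __init__(self, fn):
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=2)

    def __call__(self, x):
        return self._pool.submit(self._fn, x)

    def stop(self):
        self._pool.shutdown()

    # The context-manager protocol lets callers use `with` instead of
    # remembering to call .stop() manually.
    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.stop()

with PoolTask(lambda x: x + 1) as crunch:
    result = crunch(41).result()
print(result)  # 42
```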
Caveats¶
- Lifecycle: You must call `.stop()` (or use `with`) to clean up resources, especially for Process/Ray modes.
- Serialization: Arguments and return values must be pickleable (for Process/Ray).
- Type Hints: Static analysis tools (mypy) may get confused because the type changes from `Callable` to `TaskWorker`.
See Also¶
- TaskWorker for the underlying implementation.