Skip to content

Commit 512fbf6

Browse files
authored
chore: add extra dimension to celery concurrency limit hits (PostHog#28783)
1 parent d378f12 commit 512fbf6

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

posthog/clickhouse/client/limit.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
CONCURRENT_TASKS_LIMIT_EXCEEDED_COUNTER = Counter(
1212
"posthog_celery_task_concurrency_limit_exceeded",
1313
"Number of times a Celery task exceeded the concurrency limit",
14-
["task_name", "limit"],
14+
["task_name", "limit", "limit_name"],
1515
)
1616

1717
# Lua script for atomic check, remove expired if limit hit, and increment with TTL
@@ -44,7 +44,9 @@ class CeleryConcurrencyLimitExceeded(Exception):
4444
pass
4545

4646

47-
def limit_concurrency(max_concurrent_tasks: int, key: Optional[Callable] = None, ttl: int = 60 * 15) -> Callable:
47+
def limit_concurrency(
48+
max_concurrent_tasks: int, key: Optional[Callable] = None, ttl: int = 60 * 15, limit_name: str = ""
49+
) -> Callable:
4850
def decorator(task_func):
4951
@wraps(task_func)
5052
def wrapper(*args, **kwargs):
@@ -64,7 +66,9 @@ def wrapper(*args, **kwargs):
6466
redis_client.eval(lua_script, 1, running_tasks_key, current_time, task_id, max_concurrent_tasks, ttl)
6567
== 0
6668
):
67-
CONCURRENT_TASKS_LIMIT_EXCEEDED_COUNTER.labels(task_name=task_name, limit=max_concurrent_tasks).inc()
69+
CONCURRENT_TASKS_LIMIT_EXCEEDED_COUNTER.labels(
70+
task_name=task_name, limit=max_concurrent_tasks, limit_name=limit_name
71+
).inc()
6872

6973
raise CeleryConcurrencyLimitExceeded(
7074
f"Exceeded maximum concurrent tasks limit: {max_concurrent_tasks} for key: {dynamic_key}"

posthog/tasks/tasks.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,11 @@ def redis_heartbeat() -> None:
5252
expires=60 * 10, # Do not run queries that got stuck for more than this
5353
reject_on_worker_lost=True,
5454
)
55-
@limit_concurrency(150) # Do not go above what CH can handle (max_concurrent_queries)
55+
@limit_concurrency(150, limit_name="global") # Do not go above what CH can handle (max_concurrent_queries)
5656
@limit_concurrency(
57-
10, key=lambda *args, **kwargs: kwargs.get("team_id") or args[0]
57+
10,
58+
key=lambda *args, **kwargs: kwargs.get("team_id") or args[0],
59+
limit_name="per_team",
5860
) # Do not run too many queries at once for the same team
5961
def process_query_task(
6062
team_id: int,

0 commit comments

Comments
 (0)