
Commit 9125115

refactor: Improve logging and retry handling in task management
1 parent d10f764 commit 9125115

6 files changed, +127 -62 lines changed


apps/worker/services/lock_manager.py

Lines changed: 5 additions & 5 deletions
@@ -120,12 +120,12 @@ def locked(self, lock_type: LockType, retry_num=0, max_retries: int | None = None
             log.error(
                 "Not retrying since we already had too many retries",
                 extra={
-                    "repoid": self.repoid,
                     "commitid": self.commitid,
                     "lock_name": lock_name,
                     "max_retries": max_retries,
-                    "retry_num": retry_num,
+                    "repoid": self.repoid,
                     "report_type": self.report_type.value,
+                    "retry_num": retry_num,
                 },
                 exc_info=True,
             )
@@ -153,12 +153,12 @@ def locked(self, lock_type: LockType, retry_num=0, max_retries: int | None = None
             log.warning(
                 "Unable to acquire lock",
                 extra={
-                    "repoid": self.repoid,
                     "commitid": self.commitid,
-                    "lock_name": lock_name,
                     "countdown": countdown,
-                    "retry_num": retry_num,
+                    "lock_name": lock_name,
                     "max_retries": max_retries,
+                    "repoid": self.repoid,
+                    "retry_num": retry_num,
                 },
             )
             raise LockRetry(countdown)
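
Both hunks only reorder the keys in the extra dict alphabetically; the retry behaviour is unchanged: when the lock cannot be acquired, the lock manager logs the attempt and raises LockRetry(countdown), leaving it to the calling task to reschedule itself after that countdown. A minimal self-contained sketch of that contract, with a simplified LockRetry and a hypothetical caller rather than the repository's actual classes:

import logging

log = logging.getLogger(__name__)


class LockRetry(Exception):
    """Raised when a lock is busy; carries the suggested retry countdown in seconds."""

    def __init__(self, countdown: int):
        super().__init__(countdown)
        self.countdown = countdown


def run_with_lock(acquire_lock, do_work, schedule_retry):
    """Hypothetical caller: do the work under the lock, or reschedule if it is busy."""
    try:
        with acquire_lock():
            return do_work()
    except LockRetry as retry:
        # Mirror the pattern above: log, then hand the countdown to the retry mechanism.
        log.warning("Unable to acquire lock", extra={"countdown": retry.countdown})
        schedule_retry(countdown=retry.countdown)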

apps/worker/tasks/base.py

Lines changed: 89 additions & 45 deletions
@@ -238,32 +238,62 @@ def _max_total_attempts(self, max_retries: int | None) -> int | None:
         """Convert max retries to max total attempts (includes initial attempt)."""
         return max_retries + 1 if max_retries is not None else None

-    def _get_total_attempts(self) -> int:
-        """Get total attempts including re-deliveries from visibility timeout expiration."""
+    def _get_retry_count(self) -> int:
+        """Get retry count safely, handling missing request or retries attribute."""
         if not hasattr(self, "request"):
             return 0
+        return self.request.retries if hasattr(self.request, "retries") else 0
+
+    def _get_request_headers(self) -> dict:
+        """Get request headers safely, handling missing request or get method."""
+        if not hasattr(self, "request") or self.request is None:
+            return {}
+        request = self.request
+        try:
+            if hasattr(request, "get") and callable(getattr(request, "get", None)):
+                return request.get("headers", {})
+        except (AttributeError, TypeError):
+            pass
+        return getattr(request, "headers", {})
+
+    def _expected_total_attempts(self) -> int:
+        """Get expected total attempts based on retry count (retries + initial attempt)."""
+        return self._get_retry_count() + 1
+
+    def _get_total_attempts(self) -> int:
+        """Get total attempts including re-deliveries from visibility timeout expiration.
+
+        Returns:
+        - Header value if present and valid (most accurate)
+        - retry_count + 1 if header missing/invalid (best guess based on retry count)
+        - 0 if request unavailable (rare, safe default for comparisons)
+
+        Returns int (not None) to be safe for comparisons and logging without null checks.
+        """
+        if not hasattr(self, "request") or self.request is None:
+            return 0

-        retry_count = self.request.retries if hasattr(self.request, "retries") else 0
-        headers = self.request.get("headers", {}) or {}
+        headers = self._get_request_headers()
         total_attempts_header = headers.get("total_attempts", None)

         if total_attempts_header is not None:
             try:
                 return int(total_attempts_header)
             except (ValueError, TypeError):
+                retry_count = self._get_retry_count()
                 log.warning(
                     "Invalid total_attempts header value",
                     extra={"value": total_attempts_header, "retry_count": retry_count},
                 )
-                return retry_count + 1
-        return retry_count + 1
+                return self._expected_total_attempts()
+        return self._expected_total_attempts()

     def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool:
         """Check if task has exceeded max attempts (including re-deliveries)."""
         if max_retries is None:
             return False

-        current_retries = self.request.retries if hasattr(self, "request") else 0
+        current_retries = self._get_retry_count()
         total_attempts = self._get_total_attempts()
         max_total_attempts = self._max_total_attempts(max_retries)

@@ -279,31 +309,32 @@ def safe_retry(self, max_retries=None, countdown=None, exc=None, **kwargs):
         task_max_retries = max_retries if max_retries is not None else self.max_retries

         if self._has_exceeded_max_attempts(task_max_retries):
-            current_retries = self.request.retries if hasattr(self, "request") else 0
+            current_retries = self._get_retry_count()
             total_attempts = self._get_total_attempts()
             max_total_attempts = self._max_total_attempts(task_max_retries)
             log.error(
                 f"Task {self.name} exceeded max retries",
                 extra={
-                    "task_name": self.name,
                     "current_retries": current_retries,
-                    "total_attempts": total_attempts,
                     "max_retries": task_max_retries,
                     "max_total_attempts": max_total_attempts,
+                    "task_name": self.name,
+                    "total_attempts": total_attempts,
                 },
             )
             TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc()
             return False

-        current_retries = self.request.retries if hasattr(self, "request") else 0
+        current_retries = self._get_retry_count()
         if countdown is None:
             countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**current_retries)

         try:
             total_attempts = self._get_total_attempts()
             headers = {}
-            if hasattr(self, "request") and hasattr(self.request, "headers"):
-                headers.update(self.request.headers or {})
+            request_headers = self._get_request_headers()
+            if request_headers:
+                headers.update(request_headers)
             headers.update(kwargs.get("headers", {}) or {})
             headers["total_attempts"] = total_attempts + 1
             kwargs["headers"] = headers
@@ -351,13 +382,23 @@ def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs):
         )

     def _emit_queue_metrics(self):
-        created_timestamp = self.request.get("created_timestamp", None)
+        if not hasattr(self, "request") or self.request is None:
+            return
+        request = self.request
+        if not hasattr(request, "get"):
+            return
+        created_timestamp = request.get("created_timestamp", None)
         if created_timestamp:
             enqueued_time = datetime.fromisoformat(created_timestamp)
             now = datetime.now()
             delta = now - enqueued_time

-            queue_name = self.request.get("delivery_info", {}).get("routing_key", None)
+            delivery_info = request.get("delivery_info", {})
+            queue_name = (
+                delivery_info.get("routing_key", None)
+                if isinstance(delivery_info, dict)
+                else None
+            )
             time_in_queue_timer = TASK_TIME_IN_QUEUE.labels(
                 task=self.name, queue=queue_name
             )  # TODO is None a valid label value
@@ -377,36 +418,41 @@ def run(self, *args, **kwargs):
         task = get_current_task()
         if task and task.request:
             log_context.task_name = task.name
-            log_context.task_id = task.request.id
-
-            # Track total attempts including re-deliveries from visibility timeout
-            # If this is a re-delivery (visibility timeout expired), log it
-            # Note: We can't modify headers here (they're read-only), but we can detect
-            # re-deliveries by checking if total_attempts is ahead of retry_count + 1
-            # (which indicates Redis re-queued the task without incrementing retry_count)
-            headers = task.request.get("headers", {}) or {}
-            total_attempts = headers.get("total_attempts", None)
-            retry_count = (
-                task.request.retries if hasattr(task.request, "retries") else 0
-            )
-
-            if total_attempts is not None:
-                try:
-                    total_attempts_int = int(total_attempts)
-                    if total_attempts_int > retry_count + 1:
-                        log.warning(
-                            f"Task {task.name} re-delivered (visibility timeout expired)",
+            task_id = getattr(task.request, "id", None)
+            if task_id:
+                log_context.task_id = task_id
+
+            try:
+                headers = self._get_request_headers()
+                header_total_attempts = headers.get("total_attempts")
+                if header_total_attempts is not None:
+                    try:
+                        total_attempts = int(header_total_attempts)
+                        retry_count = (
+                            getattr(task.request, "retries", 0)
+                            if hasattr(task.request, "retries")
+                            else 0
+                        )
+                        if total_attempts > retry_count + 1:
+                            log.warning(
+                                f"Task {task.name} re-delivered (visibility timeout expired)",
+                                extra={
+                                    "retry_count": retry_count,
+                                    "task_id": task_id,
+                                    "total_attempts": total_attempts,
+                                },
+                            )
+                    except (ValueError, TypeError):
+                        log.debug(
+                            "Invalid total_attempts header, skipping re-delivery detection",
                             extra={
-                                "task_id": task.request.id,
-                                "retry_count": retry_count,
-                                "total_attempts": total_attempts_int,
+                                "task_id": task_id,
+                                "value": header_total_attempts,
                             },
                         )
-                except (ValueError, TypeError):
-                    log.debug(
-                        "Invalid total_attempts header, skipping re-delivery detection",
-                        extra={"task_id": task.request.id, "value": total_attempts},
-                    )
+            except Exception:
+                # Silently ignore errors accessing request headers to avoid breaking task execution
+                pass

         log_context.populate_from_sqlalchemy(db_session)
         set_log_context(log_context)
@@ -485,10 +531,8 @@ def wrap_up_dbsession(self, db_session):
     def on_retry(self, exc, task_id, args, kwargs, einfo):
         res = super().on_retry(exc, task_id, args, kwargs, einfo)
         self.task_retry_counter.inc()
-        # Track retry count for better observability
-        retry_count = self.request.retries if hasattr(self, "request") else 0
         TASK_RETRY_WITH_COUNT_COUNTER.labels(
-            task=self.name, retry_count=str(retry_count)
+            task=self.name, retry_count=str(self._get_retry_count())
         ).inc()
         return res

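
Taken together, the base.py changes route all retry bookkeeping through the new _get_retry_count / _get_request_headers helpers and carry a running total_attempts count in the message headers, so attempts caused by visibility-timeout re-deliveries are still counted even though the Celery retries field does not advance for them. A minimal sketch of that header arithmetic outside of Celery, using hypothetical free functions rather than the task methods above:

def expected_total_attempts(retry_count: int) -> int:
    """Best guess without a header: retries so far plus the initial attempt."""
    return retry_count + 1


def get_total_attempts(headers: dict, retry_count: int) -> int:
    """Prefer the header (it also counts re-deliveries); fall back to the retry count."""
    value = headers.get("total_attempts")
    if value is not None:
        try:
            return int(value)
        except (ValueError, TypeError):
            pass  # invalid header, fall back below
    return expected_total_attempts(retry_count)


def headers_for_next_attempt(headers: dict, retry_count: int) -> dict:
    """What safe_retry does before re-enqueueing: bump total_attempts by one."""
    return {**headers, "total_attempts": get_total_attempts(headers, retry_count) + 1}


# First attempt fails and is retried: the outgoing header now says 2 attempts.
headers = headers_for_next_attempt({}, retry_count=0)
print(headers)                                      # {'total_attempts': 2}

# Normal retry: retry_count advanced to 1, so 2 == 1 + 1 and nothing looks unusual.
print(get_total_attempts(headers, retry_count=1))   # 2

# Re-delivery heuristic from run(): the header running ahead of retry_count + 1
# suggests the broker re-queued the message without a Celery retry.
print(get_total_attempts(headers, retry_count=0) > expected_total_attempts(0))   # True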

apps/worker/tasks/bundle_analysis_processor.py

Lines changed: 1 addition & 1 deletion
@@ -223,8 +223,8 @@ def process_impl_within_lock(
             log.exception(
                 "Failed to commit upload error state",
                 extra={
-                    "repoid": repoid,
                     "commit": commitid,
+                    "repoid": repoid,
                     "upload_id": upload.id_,
                 },
             )

apps/worker/tasks/tests/unit/test_base.py

Lines changed: 18 additions & 3 deletions
@@ -564,6 +564,9 @@ def test_safe_retry_fails_at_max_retries(self, mocker):
         task = SampleTask()
         task.request.retries = 10
         task.max_retries = 10
+        # Set up request.get() and headers for _get_request_headers()
+        task.request.get = lambda key, default=None: {} if key == "headers" else default
+        task.request.headers = {}

         mock_retry = mocker.patch.object(task, "retry")

@@ -836,7 +839,6 @@ class SampleTask(BaseCodecovTask, name="test.SampleTask"):
             def run_impl(self, dbsession):
                 return "success"

-        task = SampleTask()
         mock_current_task = MagicMock()
         mock_current_task.name = "test.SampleTask"
         mock_request = MagicMock()
@@ -848,6 +850,11 @@ def run_impl(self, dbsession):
         )
         mock_request.headers = {"total_attempts": 5}
         mock_current_task.request = mock_request
+        # Set up self.request on task instance using PropertyMock (request is a property)
+        # Must patch BEFORE creating task instance
+        mock_property = PropertyMock(return_value=mock_request)
+        mocker.patch.object(SampleTask, "request", mock_property, create=True)
+        task = SampleTask()

         mocker.patch("tasks.base.get_current_task", return_value=mock_current_task)
         mock_log_warning = mocker.patch("tasks.base.log.warning")
@@ -903,7 +910,6 @@ class SampleTask(BaseCodecovTask, name="test.SampleTask"):
             def run_impl(self, dbsession):
                 return "success"

-        task = SampleTask()
         mock_current_task = MagicMock()
         mock_current_task.name = "test.SampleTask"
         mock_request = MagicMock()
@@ -915,6 +921,11 @@ def run_impl(self, dbsession):
         )  # Invalid value
         mock_request.headers = {"total_attempts": "invalid"}
         mock_current_task.request = mock_request
+        # Set up self.request on task instance using PropertyMock (request is a property)
+        # Must patch BEFORE creating task instance
+        mock_property = PropertyMock(return_value=mock_request)
+        mocker.patch.object(SampleTask, "request", mock_property, create=True)
+        task = SampleTask()

         mocker.patch("tasks.base.get_current_task", return_value=mock_current_task)
         mock_log_warning = mocker.patch("tasks.base.log.warning")
@@ -972,7 +983,6 @@ class SampleTask(BaseCodecovTask, name="test.SampleTask"):
             def run_impl(self, dbsession):
                 return "success"

-        task = SampleTask()
         mock_current_task = MagicMock()
         mock_current_task.name = "test.SampleTask"
         mock_request = MagicMock()
@@ -984,6 +994,11 @@ def run_impl(self, dbsession):
         )
         mock_request.headers = {"total_attempts": None}
         mock_current_task.request = mock_request
+        # Set up self.request on task instance using PropertyMock (request is a property)
+        # Must patch BEFORE creating task instance
+        mock_property = PropertyMock(return_value=mock_request)
+        mocker.patch.object(SampleTask, "request", mock_property, create=True)
+        task = SampleTask()

         mocker.patch("tasks.base.get_current_task", return_value=mock_current_task)
         mock_log_warning = mocker.patch("tasks.base.log.warning")
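
The recurring test fixup above exists because request is a property on Celery task classes, so it cannot simply be assigned on an existing instance; it has to be patched on the class with PropertyMock before the instance is created. A minimal self-contained illustration of that pattern using unittest.mock directly, with a hypothetical DemoTask instead of the test's SampleTask:

from unittest.mock import MagicMock, PropertyMock, patch


class DemoTask:
    @property
    def request(self):
        # Stand-in for Celery's per-invocation request context.
        raise RuntimeError("no request outside of a running worker")


mock_request = MagicMock()
mock_request.retries = 1
mock_request.headers = {"total_attempts": 5}

# Patch the property on the class first, then create the instance.
with patch.object(DemoTask, "request", new_callable=PropertyMock) as prop:
    prop.return_value = mock_request
    task = DemoTask()
    assert task.request.retries == 1
    assert task.request.headers["total_attempts"] == 5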

apps/worker/tasks/tests/unit/test_upload_finisher_task.py

Lines changed: 6 additions & 0 deletions
@@ -831,6 +831,9 @@ def test_retry_on_report_lock(self, dbsession, mocker, mock_redis, mock_self_app

         task = UploadFinisherTask()
         task.request.retries = 0
+        # Set up request.get() and headers for _get_request_headers()
+        task.request.get = lambda key, default=None: {} if key == "headers" else default
+        task.request.headers = {}

         with pytest.raises(Retry):
             task.run_impl(
@@ -903,6 +906,9 @@ def test_die_on_finisher_lock(

         task = UploadFinisherTask()
         task.request.retries = 0
+        # Set up request.get() and headers for _get_request_headers()
+        task.request.get = lambda key, default=None: {} if key == "headers" else default
+        task.request.headers = {}

         # Task should call self.retry() which raises Retry exception
         with pytest.raises(Retry):

apps/worker/tasks/upload_finisher.py

Lines changed: 8 additions & 8 deletions
@@ -471,12 +471,12 @@ def _process_reports_with_lock(
             log.error(
                 "Upload finisher exceeded max retries",
                 extra={
-                    "repoid": repoid,
                     "commitid": commitid,
-                    "retry_count": self.request.retries,
-                    "total_attempts": total_attempts,
                     "max_retries": MAX_RETRIES,
                     "max_total_attempts": max_total_attempts,
+                    "repoid": repoid,
+                    "retry_count": self.request.retries,
+                    "total_attempts": total_attempts,
                 },
             )
             self._call_upload_breadcrumb_task(
@@ -499,8 +499,8 @@
             log.error(
                 "Failed to schedule retry for upload finisher",
                 extra={
-                    "repoid": repoid,
                     "commitid": commitid,
+                    "repoid": repoid,
                     "retry_count": self.request.retries,
                     "total_attempts": total_attempts,
                 },
@@ -601,12 +601,12 @@ def _handle_finisher_lock(
             log.error(
                 "Upload finisher exceeded max retries (finisher lock)",
                 extra={
-                    "repoid": repoid,
                     "commitid": commitid,
-                    "retry_count": self.request.retries,
-                    "total_attempts": total_attempts,
                     "max_retries": MAX_RETRIES,
                     "max_total_attempts": max_total_attempts,
+                    "repoid": repoid,
+                    "retry_count": self.request.retries,
+                    "total_attempts": total_attempts,
                 },
             )
             self._call_upload_breadcrumb_task(
@@ -629,8 +629,8 @@
             log.error(
                 "Failed to schedule retry for upload finisher (finisher lock)",
                 extra={
-                    "repoid": repoid,
                     "commitid": commitid,
+                    "repoid": repoid,
                     "retry_count": self.request.retries,
                     "total_attempts": total_attempts,
                 },
