
Commit 440a2bc

feat: no longer do cache rollups in new pipeline
We no longer need to cache rollups in the new pipeline, since the API consumes from Timescale directly. This removes all code related to caching rollups in the Timescale implementation of the TA pipeline.
1 parent b3ef429 commit 440a2bc
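
For context, the new pipeline serves test-analytics results straight from the ta_timeseries Testrun data in Timescale rather than from Arrow rollup blobs cached in GCS, which is why the cache task, the blob reader, and the summary helpers below can all go. A minimal sketch of such a direct query (hypothetical helper, not code from this repository; field names follow the ta_timeseries_testrun columns visible in the diff):

from datetime import datetime, timedelta

from django.db.models import Count, Q

from shared.django_apps.ta_timeseries.models import Testrun


def branch_outcome_counts(repo_id: int, branch: str, days: int = 60):
    # Hypothetical sketch: aggregate per-test pass/fail counts directly from
    # Timescale, replacing the cached Arrow rollups this commit removes.
    since = datetime.now() - timedelta(days=days)
    return (
        Testrun.objects.filter(repo_id=repo_id, branch=branch, timestamp__gte=since)
        .values("computed_name")
        .annotate(
            pass_count=Count("test_id", filter=Q(outcome="pass")),
            fail_count=Count("test_id", filter=Q(outcome__in=["failure", "flaky_fail"])),
        )
    )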

8 files changed: 10 additions & 922 deletions

apps/codecov-api/utils/test_results.py

Lines changed: 1 addition & 120 deletions
@@ -1,5 +1,4 @@
-import tempfile
-from datetime import UTC, date, datetime, timedelta
+from datetime import UTC, datetime
 from functools import lru_cache

 import polars as pl
@@ -154,121 +153,3 @@ def old_get_results(
     table = dedup_table(table)

     return table
-
-
-def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
-    return (
-        f"test_analytics/branch_rollups/{repoid}/{branch}.arrow"
-        if branch
-        else f"test_analytics/repo_rollups/{repoid}.arrow"
-    )
-
-
-def no_version_agg_table(table: pl.LazyFrame) -> pl.LazyFrame:
-    total_tests = pl.col("fail_count") + pl.col("pass_count")
-    total_time = pl.col("avg_duration") * total_tests
-
-    failure_rate_expr = (pl.col("fail_count")).sum() / total_tests.sum()
-
-    flake_rate_expr = (pl.col("flaky_fail_count")).sum() / total_tests.sum()
-
-    avg_duration_expr = total_time.sum() / total_tests.sum()
-
-    total_duration_expr = total_time.sum()
-
-    table = table.group_by(pl.col("computed_name").alias("name")).agg(
-        pl.col("flags")
-        .explode()
-        .unique()
-        .alias("flags"),  # TODO: filter by this before we aggregate
-        pl.col("failing_commits").sum().alias("commits_where_fail"),
-        pl.col("last_duration").max().alias("last_duration"),
-        failure_rate_expr.alias("failure_rate"),
-        flake_rate_expr.alias("flake_rate"),
-        avg_duration_expr.alias("avg_duration"),
-        total_duration_expr.alias("total_duration"),
-        pl.col("pass_count").sum().alias("total_pass_count"),
-        pl.col("fail_count").sum().alias("total_fail_count"),
-        pl.col("flaky_fail_count").sum().alias("total_flaky_fail_count"),
-        pl.col("skip_count").sum().alias("total_skip_count"),
-        pl.col("updated_at").max().alias("updated_at"),
-    )
-
-    return table
-
-
-def v1_agg_table(table: pl.LazyFrame) -> pl.LazyFrame:
-    total_tests = pl.col("fail_count") + pl.col("pass_count")
-    total_time = pl.col("avg_duration") * total_tests
-
-    failure_rate_expr = (pl.col("fail_count")).sum() / total_tests.sum()
-
-    flake_rate_expr = (pl.col("flaky_fail_count")).sum() / total_tests.sum()
-
-    avg_duration_expr = total_time.sum() / total_tests.sum()
-
-    total_duration_expr = total_time.sum()
-
-    table = table.group_by(pl.col("computed_name").alias("name")).agg(
-        pl.col("testsuite").alias(
-            "testsuite"
-        ),  # TODO: filter by this before we aggregate
-        pl.col("flags")
-        .explode()
-        .unique()
-        .alias("flags"),  # TODO: filter by this before we aggregate
-        pl.col("failing_commits").sum().alias("commits_where_fail"),
-        pl.col("last_duration").max().alias("last_duration"),
-        failure_rate_expr.fill_nan(0).alias("failure_rate"),
-        flake_rate_expr.fill_nan(0).alias("flake_rate"),
-        avg_duration_expr.fill_nan(0).alias("avg_duration"),
-        total_duration_expr.alias("total_duration"),
-        pl.col("pass_count").sum().alias("total_pass_count"),
-        pl.col("fail_count").sum().alias("total_fail_count"),
-        pl.col("flaky_fail_count").sum().alias("total_flaky_fail_count"),
-        pl.col("skip_count").sum().alias("total_skip_count"),
-        pl.col("updated_at").max().alias("updated_at"),
-    )
-
-    return table
-
-
-def new_get_results(
-    repoid: int,
-    branch: str | None,
-    interval_start: int,
-    interval_end: int | None = None,
-) -> pl.DataFrame | None:
-    storage_service = get_appropriate_storage_service(repoid)
-    key = rollup_blob_path(repoid, branch)
-    try:
-        with tempfile.TemporaryFile() as tmp:
-            metadata = {}
-            storage_service.read_file(
-                bucket_name=settings.GCS_BUCKET_NAME,
-                path=key,
-                file_obj=tmp,
-                metadata_container=metadata,
-            )
-
-            table = pl.scan_ipc(tmp)
-
-            # filter start
-            start_date = date.today() - timedelta(days=interval_start)
-            table = table.filter(pl.col("timestamp_bin") >= start_date)
-
-            # filter end
-            if interval_end is not None:
-                end_date = date.today() - timedelta(days=interval_end)
-                table = table.filter(pl.col("timestamp_bin") <= end_date)
-
-            # aggregate
-            match metadata.get("version"):
-                case "1":
-                    table = v1_agg_table(table)
-                case _:  # version metadata is missing
-                    table = no_version_agg_table(table)
-
-            return table.collect()
-    except FileNotInStorageError:
-        return None

apps/codecov-api/utils/timescale/test_results.py

Lines changed: 1 addition & 4 deletions
@@ -153,10 +153,7 @@ def get_test_data_queryset_via_testrun(
             distinct=True,
         ),
         total_count=(
-            F("total_pass_count")
-            + F("total_fail_count")
-            + F("total_skip_count")
-            + F("total_flaky_fail_count")
+            F("total_pass_count") + F("total_fail_count") + F("total_flaky_fail_count")
         ),
         failure_rate=Case(
             When(
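
Aside from the deletions, the one behavioral tweak is in total_count above: skipped runs no longer count toward the denominator, so rates are computed over pass + fail + flaky_fail only. A minimal sketch of the resulting annotations (illustrative only; the queryset, the zero guard, and the exact failure_rate expression are assumptions, since the Case/When body is truncated in this hunk):

from django.db.models import Case, F, FloatField, Value, When


def annotate_totals(queryset):
    # Illustrative sketch: total_skip_count is no longer part of the denominator.
    queryset = queryset.annotate(
        total_count=F("total_pass_count")
        + F("total_fail_count")
        + F("total_flaky_fail_count")
    )
    return queryset.annotate(
        failure_rate=Case(
            When(total_count=0, then=Value(0.0)),
            default=F("total_fail_count") * 1.0 / F("total_count"),
            output_field=FloatField(),
        )
    )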

apps/worker/services/test_analytics/ta_cache_rollups.py

Lines changed: 0 additions & 114 deletions
This file was deleted.

apps/worker/services/test_analytics/ta_finish_upload.py

Lines changed: 1 addition & 11 deletions
@@ -34,7 +34,7 @@
     TestResultsNotifier,
     should_do_flaky_detection,
 )
-from shared.celery_config import cache_test_rollups_task_name, process_flakes_task_name
+from shared.celery_config import process_flakes_task_name
 from shared.django_apps.reports.models import ReportSession, UploadError
 from shared.helpers.redis import get_redis_connection
 from shared.reports.types import UploadType
@@ -130,16 +130,6 @@ def queue_followup_tasks(
         },
     )

-    if commit.branch is not None:
-        celery_app.send_task(
-            cache_test_rollups_task_name,
-            kwargs={
-                "repo_id": repo.repoid,
-                "branch": commit.branch,
-                "impl_type": impl_type,
-            },
-        )
-

 @sentry_sdk.trace
 def new_impl(

apps/worker/services/test_analytics/ta_timeseries.py

Lines changed: 1 addition & 95 deletions
@@ -1,7 +1,6 @@
 from __future__ import annotations

-from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import TypedDict

 import test_results_parser
@@ -11,14 +10,10 @@
 from services.test_results import FlakeInfo
 from shared.django_apps.ta_timeseries.models import (
     Testrun,
-    TestrunBranchSummary,
-    TestrunSummary,
     calc_test_id,
 )
 from shared.django_apps.test_analytics.models import Flake

-LOWER_BOUND_NUM_DAYS = 60
-

 def get_flaky_tests_set(repo_id: int) -> set[bytes]:
     return {
@@ -177,92 +172,3 @@ def update_testrun_to_flaky(timestamp: datetime, test_id: bytes):
             "UPDATE ta_timeseries_testrun SET outcome = %s WHERE timestamp = %s AND test_id = %s",
             ["flaky_fail", timestamp, test_id],
         )
-
-
-def timestamp_lower_bound():
-    return datetime.now() - timedelta(days=LOWER_BOUND_NUM_DAYS)
-
-
-def get_summary(repo_id: int) -> list[TestrunSummary]:
-    return list(
-        TestrunSummary.objects.filter(
-            repo_id=repo_id, timestamp_bin__gte=timestamp_lower_bound()
-        )
-    )
-
-
-def get_branch_summary(repo_id: int, branch: str) -> list[TestrunBranchSummary]:
-    return list(
-        TestrunBranchSummary.objects.filter(
-            repo_id=repo_id, branch=branch, timestamp_bin__gte=timestamp_lower_bound()
-        )
-    )
-
-
-@dataclass
-class BranchSummary:
-    testsuite: str
-    classname: str
-    name: str
-    timestamp_bin: datetime
-    computed_name: str
-    failing_commits: int
-    last_duration_seconds: float
-    avg_duration_seconds: float
-    pass_count: int
-    fail_count: int
-    skip_count: int
-    flaky_fail_count: int
-    updated_at: datetime
-    flags: list[str]
-
-
-def get_testrun_branch_summary_via_testrun(
-    repo_id: int, branch: str
-) -> list[BranchSummary]:
-    with connections["ta_timeseries"].cursor() as cursor:
-        cursor.execute(
-            """
-            select
-                testsuite,
-                classname,
-                name,
-                time_bucket(interval '1 days', timestamp) as timestamp_bin,
-
-                min(computed_name) as computed_name,
-                COUNT(DISTINCT CASE WHEN outcome = 'failure' OR outcome = 'flaky_fail' THEN commit_sha ELSE NULL END) AS failing_commits,
-                last(duration_seconds, timestamp) as last_duration_seconds,
-                avg(duration_seconds) as avg_duration_seconds,
-                COUNT(*) FILTER (WHERE outcome = 'pass') AS pass_count,
-                COUNT(*) FILTER (WHERE outcome = 'failure') AS fail_count,
-                COUNT(*) FILTER (WHERE outcome = 'skip') AS skip_count,
-                COUNT(*) FILTER (WHERE outcome = 'flaky_fail') AS flaky_fail_count,
-                MAX(timestamp) AS updated_at,
-                array_merge_dedup_agg(flags) as flags
-            from ta_timeseries_testrun
-            where repo_id = %s and branch = %s and timestamp > %s
-            group by
-                testsuite, classname, name, timestamp_bin;
-            """,
-            [repo_id, branch, timestamp_lower_bound()],
-        )

-        return [
-            BranchSummary(
-                testsuite=row[0],
-                classname=row[1],
-                name=row[2],
-                timestamp_bin=row[3],
-                computed_name=row[4],
-                failing_commits=row[5],
-                last_duration_seconds=row[6],
-                avg_duration_seconds=row[7],
-                pass_count=row[8],
-                fail_count=row[9],
-                skip_count=row[10],
-                flaky_fail_count=row[11],
-                updated_at=row[12],
-                flags=row[13] or [],
-            )
-            for row in cursor.fetchall()
-        ]
