Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 160 additions & 28 deletions treeherder/model/data_cycling/removal_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,44 @@ def max_timestamp(self):
return self._max_timestamp

def remove(self, using: CursorWrapper):
"""
Raw SQL is used here to avoid Django ORM cascade deletion, which can cause
database overload on large related tables (e.g. performance_datum_replicate).
"""
chunk_size = self._find_ideal_chunk_size()
deleted, _ = PerformanceDatum.objects.filter(
id__in=PerformanceDatum.objects.filter(
push_timestamp__lte=self._max_timestamp
).values_list("id")[:chunk_size]
).delete()
using.rowcount = deleted
using.execute(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments somewhere around the execute call to provide an explanation about what each part of the query does? We should do the same for all the other queries.

"""
WITH target_datum AS (
SELECT pd.id, pd.push_timestamp
FROM performance_datum pd
WHERE pd.push_timestamp <= %s
ORDER BY pd.push_timestamp
LIMIT %s
),
del_replicate AS (
DELETE FROM performance_datum_replicate r1
WHERE r1.performance_datum_id IN (
SELECT td.id
FROM target_datum td
WHERE td.push_timestamp <= %s
AND EXISTS (
SELECT 1
FROM performance_datum_replicate r2
WHERE r2.performance_datum_id = td.id
)
)
),
del_multi AS (
DELETE FROM perf_multicommitdatum pm
USING target_datum td
WHERE pm.perf_datum_id = td.id
)
DELETE FROM performance_datum pd
USING target_datum td
WHERE pd.id = td.id
""",
[self._max_timestamp, chunk_size, self._max_timestamp],
)

@property
def name(self) -> str:
Expand Down Expand Up @@ -175,14 +206,50 @@ def name(self) -> str:
return "try data removal strategy"

def __attempt_remove(self, using):
deleted, _ = PerformanceDatum.objects.filter(
id__in=PerformanceDatum.objects.filter(
repository_id=self.try_repo,
push_timestamp__lte=self._max_timestamp,
signature_id__in=self.target_signatures,
).values_list("id")[: self._chunk_size]
).delete()
using.rowcount = deleted
using.execute(
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use f-strings for these instead of %s? I think that would make the raw queries a bit more robust.

WITH target_datum AS (
SELECT pd.id, pd.repository_id, pd.push_timestamp, pd.signature_id
FROM performance_datum pd
WHERE pd.repository_id = %s
AND pd.push_timestamp <= %s
AND pd.signature_id = ANY(%s)
LIMIT %s
),
del_replicate AS (
DELETE FROM performance_datum_replicate r1
WHERE r1.performance_datum_id IN (
SELECT td.id
FROM target_datum td
WHERE td.repository_id = %s
AND td.push_timestamp <= %s
AND td.signature_id = ANY(%s)
AND EXISTS (
SELECT 1
FROM performance_datum_replicate r2
WHERE r2.performance_datum_id = td.id
)
)
),
del_multi AS (
DELETE FROM perf_multicommitdatum pm
USING target_datum td
WHERE pm.perf_datum_id = td.id
)
DELETE FROM performance_datum pd
USING target_datum td
WHERE pd.id = td.id
""",
[
self.try_repo,
self._max_timestamp,
list(self.target_signatures),
self._chunk_size,
self.try_repo,
self._max_timestamp,
list(self.target_signatures),
],
)

def __lookup_new_signature(self):
self.__target_signatures = self.__try_signatures[: self.SIGNATURE_BULK_SIZE]
Expand Down Expand Up @@ -246,12 +313,41 @@ def name(self) -> str:

def remove(self, using: CursorWrapper):
chunk_size = self._find_ideal_chunk_size()
deleted, _ = PerformanceDatum.objects.filter(
id__in=PerformanceDatum.objects.filter(
repository_id=self.irrelevant_repo, push_timestamp__lte=self._max_timestamp
).values_list("id")[:chunk_size]
).delete()
using.rowcount = deleted
repository_id = self.irrelevant_repo
using.execute(
"""
WITH target_datum AS (
SELECT pd.id, pd.repository_id, pd.push_timestamp
FROM performance_datum pd
WHERE pd.repository_id = %s
AND pd.push_timestamp <= %s
LIMIT %s
),
del_replicate AS (
DELETE FROM performance_datum_replicate r1
WHERE r1.performance_datum_id IN (
SELECT td.id
FROM target_datum td
WHERE td.repository_id = %s
AND td.push_timestamp <= %s
AND EXISTS (
SELECT 1
FROM performance_datum_replicate r2
WHERE r2.performance_datum_id = td.id
)
)
),
del_multi AS (
DELETE FROM perf_multicommitdatum pm
USING target_datum td
WHERE pm.perf_datum_id = td.id
)
DELETE FROM performance_datum pd
USING target_datum td
WHERE pd.id = td.id
""",
[repository_id, self._max_timestamp, chunk_size, repository_id, self._max_timestamp],
)

def _find_ideal_chunk_size(self) -> int:
max_id_of_non_expired_row = (
Expand Down Expand Up @@ -343,14 +439,50 @@ def name(self) -> str:
return "stalled data removal strategy"

def __attempt_remove(self, using: CursorWrapper):
deleted, _ = PerformanceDatum.objects.filter(
id__in=PerformanceDatum.objects.filter(
repository_id=self.target_signature.repository_id,
signature_id=self.target_signature.id,
push_timestamp__lte=self._max_timestamp,
).values_list("id")[: self._chunk_size]
).delete()
using.rowcount = deleted
using.execute(
"""
WITH target_datum AS (
SELECT pd.id, pd.repository_id, pd.signature_id, pd.push_timestamp
FROM performance_datum pd
WHERE pd.repository_id = %s
AND pd.signature_id = %s
AND pd.push_timestamp <= %s
LIMIT %s
),
del_replicate AS (
DELETE FROM performance_datum_replicate r1
WHERE r1.performance_datum_id IN (
SELECT td.id
FROM target_datum td
WHERE td.repository_id = %s
AND td.signature_id = %s
AND td.push_timestamp <= %s
AND EXISTS (
SELECT 1
FROM performance_datum_replicate r2
WHERE r2.performance_datum_id = td.id
)
)
),
del_multi AS (
DELETE FROM perf_multicommitdatum pm
USING target_datum td
WHERE pm.perf_datum_id = td.id
)
DELETE FROM performance_datum pd
USING target_datum td
WHERE pd.id = td.id
""",
[
self.target_signature.repository_id,
self.target_signature.id,
self._max_timestamp,
self._chunk_size,
self.target_signature.repository_id,
self.target_signature.id,
self._max_timestamp,
],
)

def __lookup_new_signature(self):
try:
Expand Down