fix(ingest): race-safe scene_tags insert (ON CONFLICT) — GOON-M

scene_resolver._sync_tags used check-then-insert (select existing -> add if None), which races under concurrent ingest of the same scene: two runs both see existing=None, both add, flush -> IntegrityError pk_scene_tags (Sentry GOON-M, 4 events). Switched to pg_insert(...).on_conflict_do_nothing(index_elements=[scene_id, tag_id]) + in-batch dedup, identical to movie_resolver._sync_tags.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
jtrzupek 2026-06-19 11:09:06 +02:00
parent 567a8fb3b5
commit 476cbb8d16

View file

@ -584,18 +584,24 @@ def _sync_tags(
norm: NormalizedScene, norm: NormalizedScene,
source_id: uuid.UUID, source_id: uuid.UUID,
) -> None: ) -> None:
# PostgreSQL INSERT ... ON CONFLICT DO NOTHING — race-safe. Wcześniejsze
# check-then-insert ścigało się przy równoległym ingescie tej samej sceny
# (dwa runy widziały existing=None → oba add → IntegrityError pk_scene_tags,
# GOON-M). Analogicznie do movie_resolver._sync_tags.
from sqlalchemy.dialects.postgresql import insert as pg_insert
seen_tag_ids: set[uuid.UUID] = set()
for t_norm in norm.tags: for t_norm in norm.tags:
tag = resolve_tag(session, norm=t_norm) tag = resolve_tag(session, norm=t_norm)
if tag is None: if tag is None or tag.id in seen_tag_ids:
continue continue
existing = session.execute( seen_tag_ids.add(tag.id)
select(SceneTag).where( stmt = (
SceneTag.scene_id == scene_id, pg_insert(SceneTag.__table__)
SceneTag.tag_id == tag.id, .values(scene_id=scene_id, tag_id=tag.id, source_id=source_id)
) .on_conflict_do_nothing(index_elements=["scene_id", "tag_id"])
).scalar_one_or_none() )
if existing is None: session.execute(stmt)
session.add(SceneTag(scene_id=scene_id, tag_id=tag.id, source_id=source_id))
def _sync_fingerprints( def _sync_fingerprints(