From 476cbb8d161adc1b60ab0b6c11a1c841e9d369db Mon Sep 17 00:00:00 2001 From: jtrzupek Date: Fri, 19 Jun 2026 11:09:06 +0200 Subject: [PATCH] =?UTF-8?q?fix(ingest):=20race-safe=20scene=5Ftags=20inser?= =?UTF-8?q?t=20(ON=20CONFLICT)=20=E2=80=94=20GOON-M?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scene_resolver._sync_tags used check-then-insert (select existing -> add if None), which races under concurrent ingest of the same scene: two runs both see existing=None, both add, flush -> IntegrityError pk_scene_tags (Sentry GOON-M, 4 events). Switched to pg_insert(...).on_conflict_do_nothing(index_elements=[scene_id, tag_id]) + in-batch dedup, identical to movie_resolver._sync_tags. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/resolve/scene_resolver.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/app/resolve/scene_resolver.py b/app/resolve/scene_resolver.py index a5a008b..282eef3 100644 --- a/app/resolve/scene_resolver.py +++ b/app/resolve/scene_resolver.py @@ -584,18 +584,24 @@ def _sync_tags( norm: NormalizedScene, source_id: uuid.UUID, ) -> None: + # PostgreSQL INSERT ... ON CONFLICT DO NOTHING — race-safe. Wcześniejsze + # check-then-insert ścigało się przy równoległym ingescie tej samej sceny + # (dwa runy widziały existing=None → oba add → IntegrityError pk_scene_tags, + # GOON-M). Analogicznie do movie_resolver._sync_tags. + from sqlalchemy.dialects.postgresql import insert as pg_insert + + seen_tag_ids: set[uuid.UUID] = set() for t_norm in norm.tags: tag = resolve_tag(session, norm=t_norm) - if tag is None: + if tag is None or tag.id in seen_tag_ids: continue - existing = session.execute( - select(SceneTag).where( - SceneTag.scene_id == scene_id, - SceneTag.tag_id == tag.id, - ) - ).scalar_one_or_none() - if existing is None: - session.add(SceneTag(scene_id=scene_id, tag_id=tag.id, source_id=source_id)) + seen_tag_ids.add(tag.id) + stmt = ( + pg_insert(SceneTag.__table__) + .values(scene_id=scene_id, tag_id=tag.id, source_id=source_id) + .on_conflict_do_nothing(index_elements=["scene_id", "tag_id"]) + ) + session.execute(stmt) def _sync_fingerprints(