Skip to content

Commit e22b34e

Browse files
committed
Add self-referential FK support and additional tests
- Fix transform_sql to update other_column for self-referential FKs when the referenced column is being renamed - Skip FK validation during transform when update_incoming_fks=True since self-referential FKs temporarily reference non-existent columns - Add tests for multiple tables referencing renamed column - Add test for self-referential FK handling
1 parent 23bc932 commit e22b34e

2 files changed

Lines changed: 128 additions & 13 deletions

File tree

sqlite_utils/db.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1944,19 +1944,29 @@ def transform(
19441944
finally:
19451945
self.db._skip_fk_validation = False
19461946

1947-
sqls = self.transform_sql(
1948-
types=types,
1949-
rename=rename,
1950-
drop=drop,
1951-
pk=pk,
1952-
not_null=not_null,
1953-
defaults=defaults,
1954-
drop_foreign_keys=drop_foreign_keys,
1955-
add_foreign_keys=add_foreign_keys,
1956-
foreign_keys=foreign_keys,
1957-
column_order=column_order,
1958-
keep_table=keep_table,
1959-
)
1947+
# Skip FK validation for main transform if update_incoming_fks is True
1948+
# because self-referential FKs will reference the new column name
1949+
# that only exists after the transform completes
1950+
if update_incoming_fks and rename:
1951+
self.db._skip_fk_validation = True
1952+
try:
1953+
sqls = self.transform_sql(
1954+
types=types,
1955+
rename=rename,
1956+
drop=drop,
1957+
pk=pk,
1958+
not_null=not_null,
1959+
defaults=defaults,
1960+
drop_foreign_keys=drop_foreign_keys,
1961+
add_foreign_keys=add_foreign_keys,
1962+
foreign_keys=foreign_keys,
1963+
column_order=column_order,
1964+
keep_table=keep_table,
1965+
)
1966+
finally:
1967+
if update_incoming_fks and rename:
1968+
self.db._skip_fk_validation = False
1969+
19601970
pragma_foreign_keys_was_on = self.db.execute("PRAGMA foreign_keys").fetchone()[
19611971
0
19621972
]
@@ -2034,6 +2044,9 @@ def transform_sql(
20342044
for table, column, other_table, other_column in self.foreign_keys:
20352045
# Copy over old foreign keys, unless we are dropping them
20362046
if (drop_foreign_keys is None) or (column not in drop_foreign_keys):
2047+
# For self-referential FKs, also update the referenced column if renamed
2048+
if other_table == self.name:
2049+
other_column = rename.get(other_column) or other_column
20372050
create_table_foreign_keys.append(
20382051
ForeignKey(
20392052
table,

tests/test_transform.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,3 +708,105 @@ def test_transform_update_incoming_fks_on_column_rename(fresh_db):
708708
assert fresh_db.execute("PRAGMA foreign_keys").fetchone()[0] == 1
709709
violations = list(fresh_db.execute("PRAGMA foreign_key_check").fetchall())
710710
assert violations == []
711+
712+
713+
def test_transform_update_incoming_fks_multiple_tables(fresh_db):
714+
"""
715+
Test that update_incoming_fks=True updates FK constraints in multiple tables
716+
when a referenced column is renamed.
717+
"""
718+
fresh_db.execute("PRAGMA foreign_keys=ON")
719+
720+
# Create authors table with id as PK
721+
fresh_db["authors"].insert({"id": 1, "name": "Alice"}, pk="id")
722+
723+
# Create multiple tables with FKs to authors.id
724+
fresh_db["books"].insert(
725+
{"id": 1, "title": "Book A", "author_id": 1},
726+
pk="id",
727+
foreign_keys=[("author_id", "authors", "id")],
728+
)
729+
fresh_db["articles"].insert(
730+
{"id": 1, "headline": "Article A", "writer_id": 1},
731+
pk="id",
732+
foreign_keys=[("writer_id", "authors", "id")],
733+
)
734+
fresh_db["quotes"].insert(
735+
{"id": 1, "text": "Quote A", "speaker_id": 1},
736+
pk="id",
737+
foreign_keys=[("speaker_id", "authors", "id")],
738+
)
739+
740+
# Rename authors.id to authors.author_pk with update_incoming_fks=True
741+
fresh_db["authors"].transform(
742+
rename={"id": "author_pk"},
743+
update_incoming_fks=True,
744+
)
745+
746+
# Verify authors column was renamed
747+
assert "author_pk" in fresh_db["authors"].columns_dict
748+
assert "id" not in fresh_db["authors"].columns_dict
749+
750+
# Verify all FKs were updated
751+
assert fresh_db["books"].foreign_keys == [
752+
ForeignKey(table="books", column="author_id", other_table="authors", other_column="author_pk")
753+
]
754+
assert fresh_db["articles"].foreign_keys == [
755+
ForeignKey(table="articles", column="writer_id", other_table="authors", other_column="author_pk")
756+
]
757+
assert fresh_db["quotes"].foreign_keys == [
758+
ForeignKey(table="quotes", column="speaker_id", other_table="authors", other_column="author_pk")
759+
]
760+
761+
# Verify FK enforcement still works
762+
violations = list(fresh_db.execute("PRAGMA foreign_key_check").fetchall())
763+
assert violations == []
764+
765+
766+
def test_transform_update_incoming_fks_self_referential(fresh_db):
767+
"""
768+
Test that update_incoming_fks=True handles self-referential FK constraints.
769+
"""
770+
fresh_db.execute("PRAGMA foreign_keys=ON")
771+
772+
# Create employees table with self-referential FK (manager_id -> id)
773+
fresh_db.execute("""
774+
CREATE TABLE employees (
775+
id INTEGER PRIMARY KEY,
776+
name TEXT,
777+
manager_id INTEGER REFERENCES employees(id)
778+
)
779+
""")
780+
fresh_db["employees"].insert_all([
781+
{"id": 1, "name": "CEO", "manager_id": None},
782+
{"id": 2, "name": "VP", "manager_id": 1},
783+
{"id": 3, "name": "Dev", "manager_id": 2},
784+
])
785+
786+
# Verify initial FK
787+
assert fresh_db["employees"].foreign_keys == [
788+
ForeignKey(table="employees", column="manager_id", other_table="employees", other_column="id")
789+
]
790+
791+
# Rename employees.id to employees.emp_id with update_incoming_fks=True
792+
fresh_db["employees"].transform(
793+
rename={"id": "emp_id"},
794+
update_incoming_fks=True,
795+
)
796+
797+
# Verify column was renamed
798+
assert "emp_id" in fresh_db["employees"].columns_dict
799+
assert "id" not in fresh_db["employees"].columns_dict
800+
801+
# Verify self-referential FK was updated
802+
assert fresh_db["employees"].foreign_keys == [
803+
ForeignKey(table="employees", column="manager_id", other_table="employees", other_column="emp_id")
804+
]
805+
806+
# Verify data integrity
807+
rows = list(fresh_db.execute("SELECT * FROM employees ORDER BY emp_id").fetchall())
808+
assert rows == [(1, "CEO", None), (2, "VP", 1), (3, "Dev", 2)]
809+
810+
# Verify FK enforcement still works
811+
violations = list(fresh_db.execute("PRAGMA foreign_key_check").fetchall())
812+
assert violations == []

0 commit comments

Comments
 (0)