Skip to content

Commit 23bc932

Browse files
committed
Add update_incoming_fks parameter to transform()
When renaming columns that are referenced by foreign keys in other tables, the transform() method now accepts update_incoming_fks=True to automatically update those FK constraints. Implementation: - Added _get_incoming_fks_needing_update() helper to find tables with FKs pointing to renamed columns - Modified transform() to collect SQL for updating incoming FKs - Execute main transform first (so new column exists), then update incoming FKs, all within the same transaction - Added _skip_fk_validation flag to allow generating SQL for FKs that reference columns that don't exist yet (will exist after main transform completes) Includes test for basic column rename case with FK enforcement ON.
1 parent 6944ec2 commit 23bc932

2 files changed

Lines changed: 118 additions & 9 deletions

File tree

sqlite_utils/db.py

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -974,15 +974,17 @@ def sort_key(p):
974974
column_items.insert(0, (hash_id, str))
975975
pk = hash_id
976976
# Soundness check foreign_keys point to existing tables
977-
for fk in foreign_keys:
978-
if fk.other_table == name and columns.get(fk.other_column):
979-
continue
980-
if fk.other_column != "rowid" and not any(
981-
c for c in self[fk.other_table].columns if c.name == fk.other_column
982-
):
983-
raise AlterError(
984-
"No such column: {}.{}".format(fk.other_table, fk.other_column)
985-
)
977+
# (can be skipped for internal operations like update_incoming_fks)
978+
if not getattr(self, "_skip_fk_validation", False):
979+
for fk in foreign_keys:
980+
if fk.other_table == name and columns.get(fk.other_column):
981+
continue
982+
if fk.other_column != "rowid" and not any(
983+
c for c in self[fk.other_table].columns if c.name == fk.other_column
984+
):
985+
raise AlterError(
986+
"No such column: {}.{}".format(fk.other_table, fk.other_column)
987+
)
986988

987989
column_defs = []
988990
# ensure pk is a tuple
@@ -1850,6 +1852,42 @@ def duplicate(self, new_name: str) -> "Table":
18501852
self.db.execute(sql)
18511853
return self.db.table(new_name)
18521854

1855+
def _get_incoming_fks_needing_update(self, rename: dict) -> list:
1856+
"""
1857+
Find all tables with FK constraints pointing to columns being renamed.
1858+
1859+
Returns a list of (table_name, new_fks) tuples where new_fks is the
1860+
updated list of foreign keys for that table.
1861+
1862+
:param rename: Dictionary mapping old column names to new column names
1863+
"""
1864+
tables_needing_update = []
1865+
1866+
for other_table_name in self.db.table_names():
1867+
if other_table_name == self.name:
1868+
continue
1869+
1870+
other_table = self.db[other_table_name]
1871+
other_fks = other_table.foreign_keys
1872+
1873+
# Check if any FK references a column being renamed
1874+
needs_update = False
1875+
new_fks = []
1876+
for fk in other_fks:
1877+
if fk.other_table == self.name and fk.other_column in rename:
1878+
# This FK needs updating
1879+
needs_update = True
1880+
new_fks.append(
1881+
(fk.column, fk.other_table, rename[fk.other_column])
1882+
)
1883+
else:
1884+
new_fks.append((fk.column, fk.other_table, fk.other_column))
1885+
1886+
if needs_update:
1887+
tables_needing_update.append((other_table_name, new_fks))
1888+
1889+
return tables_needing_update
1890+
18531891
def transform(
18541892
self,
18551893
*,
@@ -1864,6 +1902,7 @@ def transform(
18641902
foreign_keys: Optional[ForeignKeysType] = None,
18651903
column_order: Optional[List[str]] = None,
18661904
keep_table: Optional[str] = None,
1905+
update_incoming_fks: bool = False,
18671906
) -> "Table":
18681907
"""
18691908
Apply an advanced alter table, including operations that are not supported by
@@ -1884,8 +1923,27 @@ def transform(
18841923
to use when creating the table
18851924
:param keep_table: If specified, the existing table will be renamed to this and will not be
18861925
dropped
1926+
:param update_incoming_fks: If True, automatically update foreign key constraints in other
1927+
tables that reference columns being renamed in this table
18871928
"""
18881929
assert self.exists(), "Cannot transform a table that doesn't exist yet"
1930+
1931+
# Collect SQL for updating incoming FKs if needed
1932+
incoming_fk_sqls = []
1933+
if update_incoming_fks and rename:
1934+
tables_needing_update = self._get_incoming_fks_needing_update(rename)
1935+
for other_table_name, new_fks in tables_needing_update:
1936+
other_table = self.db[other_table_name]
1937+
# Generate transform SQL for the other table with updated FKs
1938+
# Skip FK validation since the new column doesn't exist yet
1939+
try:
1940+
self.db._skip_fk_validation = True
1941+
incoming_fk_sqls.extend(
1942+
other_table.transform_sql(foreign_keys=new_fks)
1943+
)
1944+
finally:
1945+
self.db._skip_fk_validation = False
1946+
18891947
sqls = self.transform_sql(
18901948
types=types,
18911949
rename=rename,
@@ -1906,8 +1964,12 @@ def transform(
19061964
if pragma_foreign_keys_was_on:
19071965
self.db.execute("PRAGMA foreign_keys=0;")
19081966
with self.db.conn:
1967+
# First: transform the main table (so renamed columns exist)
19091968
for sql in sqls:
19101969
self.db.execute(sql)
1970+
# Then: update incoming FKs in other tables
1971+
for sql in incoming_fk_sqls:
1972+
self.db.execute(sql)
19111973
# Run the foreign_key_check before we commit
19121974
if pragma_foreign_keys_was_on:
19131975
self.db.execute("PRAGMA foreign_key_check;")

tests/test_transform.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,50 @@ def test_transform_with_unique_constraint_implicit_index(fresh_db):
661661
"You must manually drop this index prior to running this transformation and manually recreate the new index after running this transformation."
662662
in str(excinfo.value)
663663
)
664+
665+
666+
def test_transform_update_incoming_fks_on_column_rename(fresh_db):
667+
"""
668+
Test that update_incoming_fks=True updates FK constraints in other tables
669+
when a referenced column is renamed.
670+
"""
671+
fresh_db.execute("PRAGMA foreign_keys=ON")
672+
673+
# Create authors table with id as PK
674+
fresh_db["authors"].insert({"id": 1, "name": "Alice"}, pk="id")
675+
676+
# Create books table with FK to authors.id
677+
fresh_db["books"].insert(
678+
{"id": 1, "title": "Book A", "author_id": 1},
679+
pk="id",
680+
foreign_keys=[("author_id", "authors", "id")],
681+
)
682+
683+
# Verify initial FK
684+
assert fresh_db["books"].foreign_keys == [
685+
ForeignKey(table="books", column="author_id", other_table="authors", other_column="id")
686+
]
687+
688+
# Rename authors.id to authors.author_pk with update_incoming_fks=True
689+
fresh_db["authors"].transform(
690+
rename={"id": "author_pk"},
691+
update_incoming_fks=True,
692+
)
693+
694+
# Verify authors column was renamed
695+
assert "author_pk" in fresh_db["authors"].columns_dict
696+
assert "id" not in fresh_db["authors"].columns_dict
697+
698+
# Verify books FK was updated to point to new column name
699+
assert fresh_db["books"].foreign_keys == [
700+
ForeignKey(table="books", column="author_id", other_table="authors", other_column="author_pk")
701+
]
702+
703+
# Verify data integrity
704+
assert list(fresh_db["authors"].rows) == [{"author_pk": 1, "name": "Alice"}]
705+
assert list(fresh_db["books"].rows) == [{"id": 1, "title": "Book A", "author_id": 1}]
706+
707+
# Verify FK enforcement still works
708+
assert fresh_db.execute("PRAGMA foreign_keys").fetchone()[0] == 1
709+
violations = list(fresh_db.execute("PRAGMA foreign_key_check").fetchall())
710+
assert violations == []

0 commit comments

Comments
 (0)