Skip to content

Commit dbb693e

Browse files
authored
Merge pull request #668 from splitgraph/feature/writeable-lq-splitfiles
Add support for writeable LQ to Splitfiles
2 parents 36fd830 + 3665bc9 commit dbb693e

6 files changed

Lines changed: 113 additions & 37 deletions

File tree

splitgraph/commandline/cloud.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -968,7 +968,9 @@ def upload_c(remote, file_format, repository, files):
968968

969969
wait_for_load(client, repository.namespace, repository.repository, task_id)
970970

971-
web_url = _construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/-/tables"
971+
web_url = (
972+
_construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/latest/-/tables"
973+
)
972974
click.echo()
973975
click.echo(
974976
"Success. See the repository at " + Color.BLUE + web_url + Color.END + " or query it with:"

splitgraph/commandline/splitfile.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
@click.option(
2222
"-o", "--output-repository", help="Repository to store the result in.", type=RepositoryType()
2323
)
24-
def build_c(splitfile, args, output_repository):
24+
@click.option(
25+
"-l",
26+
"--layered-querying",
27+
help="Use writeable layered querying when checking images out. Experimental.",
28+
is_flag=True,
29+
)
30+
def build_c(splitfile, args, output_repository, layered_querying):
2531
"""
2632
Build Splitgraph images.
2733
@@ -54,7 +60,9 @@ def build_c(splitfile, args, output_repository):
5460
file_name = os.path.splitext(os.path.basename(splitfile.name))[0]
5561
output_repository = Repository.from_schema(file_name)
5662

57-
execute_commands(splitfile.read(), args, output=output_repository)
63+
execute_commands(
64+
splitfile.read(), args, output=output_repository, use_writeable_lq=layered_querying
65+
)
5866

5967

6068
@click.command(name="provenance")
@@ -205,7 +213,13 @@ def dependents_c(image_spec, source_on, dependents_on):
205213
help="Images to substitute into the reconstructed Splitfile, of the form"
206214
" [NAMESPACE/]REPOSITORY[:HASH_OR_TAG]. Default tag is 'latest'.",
207215
)
208-
def rebuild_c(image_spec, update, against):
216+
@click.option(
217+
"-l",
218+
"--layered-querying",
219+
help="Use writeable layered querying when checking images out. Experimental.",
220+
is_flag=True,
221+
)
222+
def rebuild_c(image_spec, update, against, layered_querying):
209223
"""
210224
Rebuild images against different dependencies.
211225
@@ -244,4 +258,4 @@ def rebuild_c(image_spec, update, against):
244258

245259
from splitgraph.splitfile.execution import rebuild_image
246260

247-
rebuild_image(image, new_images)
261+
rebuild_image(image, new_images, use_writeable_lq=layered_querying)

splitgraph/splitfile/execution.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ def _combine_hashes(hashes: List[str]) -> str:
3838
return sha256("".join(hashes).encode("ascii")).hexdigest()
3939

4040

41-
def _checkout_or_calculate_layer(output: Repository, image_hash: str, calc_func: Callable) -> None:
41+
def _checkout_or_calculate_layer(
42+
output: Repository, image_hash: str, calc_func: Callable, use_writeable_lq: bool = False
43+
) -> None:
4244
# Future optimization here: don't actually check the layer out if it exists -- only do it at Splitfile execution
4345
# end or when a command needs it.
4446

4547
# Have we already calculated this hash?
4648
try:
47-
output.images.by_hash(image_hash).checkout()
49+
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
4850
logging.info(" ---> Using cache")
4951
except ImageNotFoundError:
5052
try:
@@ -64,6 +66,7 @@ def execute_commands(
6466
params: Optional[Dict[str, str]] = None,
6567
output: Optional[Repository] = None,
6668
output_base: str = "0" * 32,
69+
use_writeable_lq: bool = False,
6770
) -> None:
6871
"""
6972
Executes a series of Splitfile commands.
@@ -74,6 +77,7 @@ def execute_commands(
7477
:param output: Output repository to execute the Splitfile against.
7578
:param output_base: If not None, a revision that gets checked out for all Splitfile actions to be committed
7679
on top of it.
80+
:param use_writeable_lq: Use writeable LQ to execute commands
7781
"""
7882
if params is None:
7983
params = {}
@@ -152,16 +156,18 @@ def _initialize_output(output):
152156
raise
153157

154158

155-
def checkout_if_changed(repository: Repository, image_hash: str) -> None:
159+
def checkout_if_changed(
160+
repository: Repository, image_hash: str, use_writeable_lq: bool = False
161+
) -> None:
156162
if (
157163
repository.head is None
158164
or (repository.head.image_hash != image_hash)
159165
or repository.has_pending_changes()
160166
):
161-
repository.images.by_hash(image_hash).checkout()
167+
repository.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
162168
else:
163169
logging.info(
164-
"Skipping checkout of %s as %s has it checked out " "and there have been no changes",
170+
"Skipping checkout of %s as %s has it checked out and there have been no changes",
165171
image_hash,
166172
repository,
167173
)
@@ -379,7 +385,9 @@ def _calc():
379385
source_mountpoint.delete()
380386

381387

382-
def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
388+
def _execute_custom(
389+
node: Node, output: Repository, use_writeable_lq: bool = False
390+
) -> ProvenanceLine:
383391
assert output.head is not None
384392
command, args = parse_custom_command(node)
385393

@@ -410,7 +418,7 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
410418
if command_hash is not None:
411419
image_hash = _combine_hashes([output_head, command_hash])
412420
try:
413-
output.images.by_hash(image_hash).checkout()
421+
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
414422
logging.info(" ---> Using cache")
415423
return {"type": "CUSTOM"}
416424
except ImageNotFoundError:
@@ -425,14 +433,16 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
425433

426434
# Check just in case if the new hash produced by the command already exists.
427435
try:
428-
output.images.by_hash(image_hash).checkout()
436+
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
429437
except ImageNotFoundError:
430438
# Full command as a commit comment
431439
output.commit(image_hash, comment=node.text)
432440
return {"type": "CUSTOM"}
433441

434442

435-
def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> None:
443+
def rebuild_image(
444+
image: Image, source_replacement: Dict[Repository, str], use_writeable_lq: bool = False
445+
) -> None:
436446
"""
437447
Recreates the Splitfile used to create a given image and reruns it, replacing its dependencies with a different
438448
set of versions.
@@ -444,4 +454,6 @@ def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> No
444454
ignore_irreproducible=False, source_replacement=source_replacement
445455
)
446456
# Params are supposed to be stored in the commands already (baked in) -- what if there's sensitive data there?
447-
execute_commands("\n".join(splitfile_commands), output=image.repository)
457+
execute_commands(
458+
"\n".join(splitfile_commands), output=image.repository, use_writeable_lq=use_writeable_lq
459+
)

test/splitgraph/commandline/snapshots/test_cloud_jobs/test_csv_upload/True/sgr_cloud_upload_success.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ Uploading the files...
33
(SUCCESS) Waiting for task ID ingest_task
44

55

6-
Success. See the repository at http://www.example.com/someuser/somerepo_1/-/tables or query it with:
6+
Success. See the repository at http://www.example.com/someuser/somerepo_1/latest/-/tables or query it with:
77
sgr cloud sql 'SELECT * FROM "someuser/somerepo_1"."base_df"'

test/splitgraph/commandline/test_splitfile.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from unittest import mock
33
from unittest.mock import call
44

5+
import pytest
56
from click.testing import CliRunner
67

78
from splitgraph.commandline import build_c, dependents_c, provenance_c, rebuild_c
@@ -17,11 +18,17 @@ def test_splitfile_default():
1718
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "latest"],
1819
)
1920
assert ec.mock_calls == [
20-
call(mock.ANY, {"TAG": "latest"}, output=Repository("", "import_remote_multiple"))
21+
call(
22+
mock.ANY,
23+
{"TAG": "latest"},
24+
output=Repository("", "import_remote_multiple"),
25+
use_writeable_lq=False,
26+
)
2127
]
2228

2329

24-
def test_splitfile(local_engine_empty, pg_repo_remote):
30+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
31+
def test_splitfile(local_engine_empty, pg_repo_remote, use_writeable_lq):
2532
runner = CliRunner()
2633

2734
result = runner.invoke(
@@ -33,7 +40,8 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
3340
"latest",
3441
"-o",
3542
"output",
36-
],
43+
]
44+
+ (["-l"] if use_writeable_lq else []),
3745
)
3846
assert result.exit_code == 0
3947
assert OUTPUT.run_sql("SELECT id, fruit, vegetable FROM join_table") == [
@@ -78,17 +86,22 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
7886
assert "%s:%s" % (OUTPUT, OUTPUT.head.image_hash) in result.output
7987

8088

81-
def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
89+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
90+
def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag, use_writeable_lq):
8291
runner = CliRunner()
8392

8493
result = runner.invoke(
8594
build_c,
86-
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"],
95+
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"]
96+
+ (["-l"] if use_writeable_lq else []),
8797
)
8898
assert result.exit_code == 0
8999

90100
# Rerun the output:latest against v2 of the test/pg_mount
91-
result = runner.invoke(rebuild_c, ["output:latest", "--against", "test/pg_mount:v2"])
101+
result = runner.invoke(
102+
rebuild_c,
103+
["output:latest", "--against", "test/pg_mount:v2"] + (["-l"] if use_writeable_lq else []),
104+
)
92105
output_v2 = OUTPUT.head
93106
assert result.exit_code == 0
94107
v2 = pg_repo_remote_multitag.images["v2"]
@@ -98,7 +111,10 @@ def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
98111
# In this case, this should all resolve to the same version of test/pg_mount (v2) and not produce
99112
# any extra commits.
100113
curr_commits = OUTPUT.images()
101-
result = runner.invoke(rebuild_c, ["output:latest", "-u"])
114+
result = runner.invoke(
115+
rebuild_c,
116+
["output:latest", "-u"] + (["-l"] if use_writeable_lq else []),
117+
)
102118
assert result.exit_code == 0
103119
assert output_v2 == OUTPUT.head
104120
assert OUTPUT.images() == curr_commits

test/splitgraph/splitfile/test_execution.py

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,13 @@ def test_local_import_splitfile(pg_repo_local):
109109
assert not OUTPUT.engine.table_exists(OUTPUT.to_schema(), "fruits")
110110

111111

112-
def test_advanced_splitfile(pg_repo_local):
113-
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
112+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
113+
def test_advanced_splitfile(pg_repo_local, use_writeable_lq):
114+
execute_commands(
115+
load_splitfile("import_local_multiple_with_queries.splitfile"),
116+
output=OUTPUT,
117+
use_writeable_lq=use_writeable_lq,
118+
)
114119

115120
assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "my_fruits")
116121
assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "vegetables")
@@ -128,13 +133,22 @@ def test_advanced_splitfile(pg_repo_local):
128133
assert OUTPUT.run_sql("SELECT * FROM my_fruits") == [(2, "orange")]
129134

130135

131-
def test_splitfile_cached(pg_repo_local):
136+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
137+
def test_splitfile_cached(pg_repo_local, use_writeable_lq):
132138
# Check that no new commits/snaps are created if we rerun the same splitfile
133-
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
139+
execute_commands(
140+
load_splitfile("import_local_multiple_with_queries.splitfile"),
141+
output=OUTPUT,
142+
use_writeable_lq=use_writeable_lq,
143+
)
134144
images = OUTPUT.images()
135145
assert len(images) == 4
136146

137-
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
147+
execute_commands(
148+
load_splitfile("import_local_multiple_with_queries.splitfile"),
149+
output=OUTPUT,
150+
use_writeable_lq=use_writeable_lq,
151+
)
138152
new_images = OUTPUT.images()
139153
assert new_images == images
140154

@@ -205,8 +219,14 @@ def test_import_updating_splitfile_with_uploading(
205219

206220

207221
@pytest.mark.mounting
222+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
208223
def test_splitfile_end_to_end_with_uploading(
209-
local_engine_empty, remote_engine, pg_repo_remote_multitag, mg_repo_remote, clean_minio
224+
local_engine_empty,
225+
remote_engine,
226+
pg_repo_remote_multitag,
227+
mg_repo_remote,
228+
clean_minio,
229+
use_writeable_lq,
210230
):
211231
# An end-to-end test:
212232
# * Create a derived dataset from some tables imported from the remote engine
@@ -217,7 +237,10 @@ def test_splitfile_end_to_end_with_uploading(
217237

218238
# Do the same setting up first and run the splitfile against the remote data.
219239
execute_commands(
220-
load_splitfile("import_remote_multiple.splitfile"), params={"TAG": "v1"}, output=OUTPUT
240+
load_splitfile("import_remote_multiple.splitfile"),
241+
params={"TAG": "v1"},
242+
output=OUTPUT,
243+
use_writeable_lq=use_writeable_lq,
221244
)
222245

223246
remote_output = Repository(OUTPUT.namespace, OUTPUT.repository, remote_engine)
@@ -230,22 +253,31 @@ def test_splitfile_end_to_end_with_uploading(
230253
OUTPUT.objects.cleanup()
231254

232255
stage_2 = R("output_stage_2")
233-
execute_commands(load_splitfile("import_from_preuploaded_remote.splitfile"), output=stage_2)
256+
execute_commands(
257+
load_splitfile("import_from_preuploaded_remote.splitfile"),
258+
output=stage_2,
259+
use_writeable_lq=use_writeable_lq,
260+
)
234261

235262
assert stage_2.run_sql("SELECT id, name, fruit, vegetable FROM diet") == [
236263
(2, "James", "orange", "carrot")
237264
]
238265

239266

240267
@pytest.mark.mounting
241-
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local):
242-
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
268+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
269+
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local, use_writeable_lq):
270+
execute_commands(
271+
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
272+
)
243273
old_output_head = OUTPUT.head
244274

245275
# Then, alter the dataset and rerun the splitfile.
246276
pg_repo_local.run_sql("INSERT INTO fruits VALUES (12, 'mayonnaise')")
247277
pg_repo_local.commit()
248-
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
278+
execute_commands(
279+
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
280+
)
249281
new_output_head = OUTPUT.head
250282

251283
old_output_head.checkout()
@@ -438,16 +470,16 @@ def test_splitfile_with_external_sql(readonly_pg_repo):
438470

439471

440472
@pytest.mark.registry
441-
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local):
473+
@pytest.mark.parametrize("use_writeable_lq", [True, False])
474+
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local, use_writeable_lq):
442475
# Test SQL commands accessing repos directly -- join a remote repo with
443476
# some local data.
444477

445478
prepare_lq_repo(pg_repo_local, commit_after_every=False, include_pk=True)
446479
pg_repo_local.head.tag("v2")
447480

448481
execute_commands(
449-
load_splitfile("inline_sql.splitfile"),
450-
output=OUTPUT,
482+
load_splitfile("inline_sql.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
451483
)
452484

453485
new_head = OUTPUT.head

0 commit comments

Comments
 (0)