
Commit c372912

Merge pull request #634 from splitgraph/aws-athena-data-source-cu-1wged02
Amazon Athena data source
2 parents: db1cff7 + 778684d

14 files changed

Lines changed: 273 additions & 23 deletions


.ci/install.sh

Lines changed: 1 addition & 0 deletions
```diff
@@ -13,6 +13,7 @@ poetry install -E pandas
 python -m venv "$DBT_VENV"
 . "$DBT_VENV"/bin/activate
 pip install dbt-core==1.0.0 dbt-postgres==1.0.0
+pip install --force-reinstall --upgrade markupsafe==2.0.1
 
 # Singer tap integration test
 python -m venv "$TAP_MYSQL_VENV"
```

engine/Dockerfile

Lines changed: 5 additions & 0 deletions
```diff
@@ -190,6 +190,11 @@ COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/pg_es_fdw
 RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
     pip install "git+https://github.com/splitgraph/snowflake-sqlalchemy.git@14e64cc0ef7374df0cecc91923ff6901b0d721b7"
 
+# Install PyAthena for Amazon Athena SQLAlchemy-based FDW, as well as pandas
+RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
+    pip install "PyAthena>=2.4.1" && \
+    pip install "pandas>=1.0.0"
+
 ENV PATH "${PATH}:/splitgraph/bin"
 ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
 
```
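PyAthena registers the `awsathena+rest` dialect with SQLAlchemy, which is what the SQLAlchemy-based FDW added later in this commit relies on. As a rough sketch of what these two installed packages make possible inside the engine image (the credentials, region, bucket and table name below are placeholders, not values from this commit):

```python
# Rough sketch only: a direct PyAthena/SQLAlchemy connection, outside the FDW.
# All identifiers (credentials, region, schema, bucket, table) are placeholders.
from sqlalchemy import create_engine, text

engine = create_engine(
    "awsathena+rest://ABCD:abcd@athena.eu-west-3.amazonaws.com:443/"
    "mydatabase?s3_staging_dir=s3://my-bucket/output/"
)

with engine.connect() as conn:
    # Athena writes query results to the staging dir; PyAthena fetches them back.
    for row in conn.execute(text("SELECT * FROM some_table LIMIT 10")):
        print(row)
```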

engine/Dockerfile.debug

Lines changed: 6 additions & 1 deletion
```diff
@@ -105,14 +105,19 @@ COPY ./bin /splitgraph/bin
 # "Install" elasticsearch_fdw
 RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
     mkdir /pg_es_fdw && \
-    pip install "elasticsearch>=7.7.0"
+    pip install "elasticsearch>=7.7.0,<8.0"
 COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/pg_es_fdw
 
 # Install the Snowflake SQLAlchemy connector
 # Use our fork that supports server-side cursors
 RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
     pip install "git+https://github.com/splitgraph/snowflake-sqlalchemy.git@14e64cc0ef7374df0cecc91923ff6901b0d721b7"
 
+# Install PyAthena for Amazon Athena SQLAlchemy-based FDW, as well as pandas
+RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
+    pip install "PyAthena>=2.4.1" && \
+    pip install "pandas>=1.0.0"
+
 ENV PATH "${PATH}:/splitgraph/bin"
 ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
 
```

splitgraph/config/keys.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -66,6 +66,7 @@
         "csv": "splitgraph.ingestion.csv.CSVDataSource",
         "snowflake": "splitgraph.ingestion.snowflake.SnowflakeDataSource",
         "dbt": "splitgraph.ingestion.dbt.data_source.DBTDataSource",
+        "athena": "splitgraph.ingestion.athena.AmazonAthenaDataSource",
     },
 }
 
@@ -125,8 +126,8 @@
     "SG_ENGINE": """Current engine name in use by Splitgraph. By default, this is the local engine.
 
 This can be overridden to make `sgr` use a different engine in cases where the `--remote` flag is not supported.""",
-    "SG_LOGLEVEL": """Logging threshold (log messages not emitted below this).
-Accepted values are CRITICAL, ERROR, WARNING, INFO and DEBUG.
+    "SG_LOGLEVEL": """Logging threshold (log messages not emitted below this).
+Accepted values are CRITICAL, ERROR, WARNING, INFO and DEBUG.
 This can also be changed by passing `--verbosity` to `sgr`, e.g. `sgr --verbosity DEBUG init`.""",
     "SG_ENGINE_PREFIX": "Prefix for Docker containers that are treated as Splitgraph engines by `sgr engine`.",
     "SG_NAMESPACE": "Namespace used by default when pushing to this engine, if not explicitly specified. Normally this is set to the user's username on the registry.",
```

splitgraph/ingestion/athena/BUILD

Lines changed: 6 additions & 0 deletions
```python
python_sources(
    skip_black=True,
    dependencies=[
        "src/py/splitgraph/splitgraph/resources/icons",
    ],
)
```
Lines changed: 134 additions & 0 deletions
````python
from typing import TYPE_CHECKING, Any, Dict, Optional

from splitgraph.core.types import Credentials, Params, TableInfo
from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import build_commandline_help

if TYPE_CHECKING:
    from splitgraph.engine.postgres.engine import PostgresEngine


class AmazonAthenaDataSource(ForeignDataWrapperDataSource):
    credentials_schema: Dict[str, Any] = {
        "type": "object",
        "properties": {
            "aws_access_key_id": {"type": "string", "title": "AWS Access Key Id"},
            "aws_secret_access_key": {"type": "string", "title": "AWS Secret Access Key"},
        },
        "required": ["aws_access_key_id", "aws_secret_access_key"],
    }

    params_schema = {
        "type": "object",
        "properties": {
            "region_name": {
                "type": "string",
                "title": "S3 region",
                "description": "Region of the S3 bucket",
            },
            "schema_name": {
                "type": "string",
                "title": "Schema",
                "description": "Athena database name",
            },
            "s3_staging_dir": {
                "title": "S3 results folder",
                "type": "string",
                "description": "Folder for storing query output",
            },
        },
        "required": ["region_name", "schema_name", "s3_staging_dir"],
    }

    supports_mount = True
    supports_load = True
    supports_sync = False

    commandline_help = """Mount an Amazon Athena database.

This will mount an Athena schema or a table:

\b
```
$ sgr mount athena s3 -o@- <<EOF
{
    "aws_access_key_id": "ABCD",
    "aws_secret_access_key": "abcd",
    "region_name": "eu-west-3",
    "schema_name": "mydatabase",
    "s3_staging_dir": "s3://my-bucket/output/"
}
EOF
```
"""

    commandline_kwargs_help: str = (
        build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema)
    )

    _icon_file = "athena.svg"

    def __init__(
        self,
        engine: "PostgresEngine",
        credentials: Credentials,
        params: Params,
        tables: Optional[TableInfo] = None,
    ):
        super().__init__(engine, credentials, params, tables)

    def get_fdw_name(self):
        return "multicorn"

    @classmethod
    def get_name(cls) -> str:
        return "Amazon Athena"

    @classmethod
    def get_description(cls) -> str:
        return "Query data in Amazon S3 files and folders"

    def get_table_options(
        self, table_name: str, tables: Optional[TableInfo] = None
    ) -> Dict[str, str]:
        result = super().get_table_options(table_name, tables)
        result["tablename"] = result.get("tablename", table_name)
        return result

    def get_server_options(self):
        options: Dict[str, Optional[str]] = {
            "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
            "db_url": self._build_db_url(),
            "cast_quals": "true",
        }

        # For some reason, in SQLAlchemy, if this is not passed
        # to the FDW params (even if it is in the DB URL), it doesn't
        # schema-qualify tables and server-side cursors don't work for scanning
        # (loads the whole table instead of scrolling through it).
        if "schema" in self.params:
            options["schema"] = self.params["schema"]

        return options

    def _build_db_url(self) -> str:
        """Construct the SQLAlchemy Amazon Athena db_url"""

        aws_access_key_id = self.credentials["aws_access_key_id"]
        aws_secret_access_key = self.credentials["aws_secret_access_key"]
        region_name = self.params["region_name"]
        schema_name = self.params["schema_name"]
        s3_staging_dir = self.params["s3_staging_dir"]

        db_url = (
            f"awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@"
            f"athena.{region_name}.amazonaws.com:443/"
            f"{schema_name}?s3_staging_dir={s3_staging_dir}"
        )

        return db_url

    def get_remote_schema_name(self) -> str:
        if "schema_name" not in self.params:
            raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a schema_name!")
        return str(self.params["schema_name"])
````
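With the illustrative values from the help text above, `_build_db_url` produces a URL in PyAthena's `awsathena+rest` format. A small standalone sketch of the same string construction (example values only, not real credentials):

```python
# Standalone reconstruction of _build_db_url() using the example values
# from the commandline help above.
aws_access_key_id = "ABCD"
aws_secret_access_key = "abcd"
region_name = "eu-west-3"
schema_name = "mydatabase"
s3_staging_dir = "s3://my-bucket/output/"

db_url = (
    f"awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@"
    f"athena.{region_name}.amazonaws.com:443/"
    f"{schema_name}?s3_staging_dir={s3_staging_dir}"
)
print(db_url)
# awsathena+rest://ABCD:abcd@athena.eu-west-3.amazonaws.com:443/mydatabase?s3_staging_dir=s3://my-bucket/output/
```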

splitgraph/ingestion/csv/__init__.py

Lines changed: 15 additions & 9 deletions
````diff
@@ -241,23 +241,29 @@ class CSVDataSource(ForeignDataWrapperDataSource):
 
     If passed an URL, this will live query a CSV file on an HTTP server. If passed
     S3 access credentials, this will scan a bucket for CSV files, infer their schema
-    and make them available to query over SQL.
+    and make them available to query over SQL.
 
-    For example:
+    For example:
 
     \b
     ```
     sgr mount csv target_schema -o@- <<EOF
-    {
-        "s3_endpoint": "cdn.mycompany.com:9000",
-        "s3_access_key": "ABCDEF",
-        "s3_secret_key": "GHIJKL",
-        "s3_bucket": "data",
-        "s3_object_prefix": "csv_files/current/",
+    {
+        "s3_access_key": "ABCD",
+        "s3_secret_key": "abcd",
+        "connection":
+        {
+            "connection_type": "s3",
+            "s3_bucket": "my-bucket-name",
+            "s3_endpoint": "s3.amazonaws.com",
+            "s3_region": "eu-west-3",
+            "s3_object_prefix": "",
+            "s3_object": "iris/iris.csv"
+        },
         "autodetect_header": true,
         "autodetect_dialect": true,
         "autodetect_encoding": true
-    }
+    }
     EOF
     ```
     """
````

splitgraph/ingestion/snowflake/__init__.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -228,6 +228,8 @@ def get_server_options(self):
 
         if "batch_size" in self.params:
             options["batch_size"] = str(self.params["batch_size"])
+        else:
+            options["batch_size"] = "10000"
 
         if self.credentials["secret"]["secret_type"] == "private_key":
             options["connect_args"] = json.dumps(
```
