Skip to content

Commit 384a362

Browse files
committed
Add response compilation and background websocket connection thread
1 parent 8abf968 commit 384a362

11 files changed

Lines changed: 718 additions & 115 deletions

File tree

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ lint: format
1212
poetry run pylint py_boilingdata/*.py
1313

1414
build:
15-
poetry build
15+
poetry build
16+
17+
run:
18+
poetry run python main.py

README.md

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,37 @@
1-
# BoilingData WebSocket Client for Python
1+
# BoilingData Client for Python
22

3-
Simple client to send SQL queries and receive results from BoilingData WebSockets API.
3+
A Python client for sending SQL queries to, and receiving results from, the BoilingData WebSockets API.
44

5-
For example usage, please see [tests/test_connection.py](tests/test_connection.py).
5+
- This package uses `asyncio` and `threading` to run `websocket.WebSocketApp()` in the background.
6+
- For example usage, please see the tests and [main.py](main.py). You can run main.py with `make run`.
7+
- The [`DataQueue` class](py_boilingdata/data_queue.py) keeps track of all incoming `DATA` messages; once all pieces have arrived, it orders them and passes the result back via a callback function.
8+
9+
> **NOTE:** This package is considered experimental. Feel free to suggest improvements, especially on how to make this module easy to use.
10+
11+
```python
12+
import asyncio
13+
from pprint import pprint
14+
from py_boilingdata import BoilingData
15+
16+
17+
async def main():
18+
boiling = BoilingData()
19+
await boiling.connect()
20+
await boiling.populate()
21+
22+
def cb(resp):
23+
pprint(resp)
24+
25+
q = "SELECT first_name, email FROM parquet_scan('s3://boilingdata-demo/test.parquet') LIMIT 10"
26+
await boiling.execute(q, cb)
27+
await asyncio.sleep(10) # There is no way to await for the cb call..
28+
print("DONE.")
29+
30+
loop = asyncio.get_event_loop()
31+
loop.run_until_complete(main())
32+
```
33+
34+
## Build and Install
635

736
```shell
837
make install

main.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import asyncio
2+
from pprint import pprint
3+
from py_boilingdata import BoilingData
4+
5+
6+
async def main():
7+
boiling = BoilingData()
8+
await boiling.connect()
9+
await boiling.populate()
10+
11+
def cb(resp):
12+
pprint(resp)
13+
14+
q = "SELECT first_name, email FROM parquet_scan('s3://boilingdata-demo/test.parquet') LIMIT 10"
15+
await boiling.execute(q, cb)
16+
await asyncio.sleep(10)
17+
print("DONE.")
18+
19+
20+
loop = asyncio.get_event_loop()
21+
loop.run_until_complete(main())

poetry.lock

Lines changed: 160 additions & 78 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

py_boilingdata/__init__.py

Lines changed: 179 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,128 @@
1-
"""BoilingData WebSockets Client module"""
2-
import os
3-
import websockets
1+
"""BoilingData Client"""
2+
import os, json, uuid, time
3+
import rel
4+
import duckdb
5+
import threading
6+
import websocket
47
import boto3
8+
import asyncio
59
import botocore.auth
10+
from pprint import pprint
611
from botocore.exceptions import NoCredentialsError
712
from botocore.awsrequest import AWSRequest
813
from botocore.credentials import Credentials
14+
from py_boilingdata.data_queue import DataQueue
15+
916

1017
# Preview environment in eu-west-1
18+
# TODO: Put in dotenv for example
1119
AWS_REGION = "eu-west-1"
1220
USER_POOL_ID = "eu-west-1_0GLV9KO1p"
1321
CLIENT_ID = "37f44ql7bp5p8fpk5qrh2sgu8"
14-
BOILING_WSS_URL = "wss://4rpyi2ae3f.execute-api.eu-west-1.amazonaws.com/prodbd/"
22+
# BOILING_WSS_URL = "wss://4rpyi2ae3f.execute-api.eu-west-1.amazonaws.com/prodbd/"
23+
BOILING_WSS_URL = "wss://e4f3t7fs58.execute-api.eu-west-1.amazonaws.com/devbd/"
1524
IDP_URL = "cognito-idp.eu-west-1.amazonaws.com/eu-west-1_0GLV9KO1p"
1625
IDENTITY_POOL_ID = "eu-west-1:bce21571-e3a6-47a4-8032-fd015213405f"
26+
BOILING_SEARCH_TERMS = [
27+
"'s3://",
28+
"glue('",
29+
"glue ('",
30+
"list('",
31+
"list ('",
32+
"share('",
33+
"share ('",
34+
"boilingdata",
35+
"boilingshares",
36+
]
1737

1838

1939
class BoilingData:
40+
"""Run SQL with BoilingData and local DuckDB"""
41+
42+
def __init__(self):
43+
self.bd_conn = BoilingDataConnection()
44+
self.conn = duckdb.connect(":memory:")
45+
46+
async def populate(self):
47+
self.conn.execute("ATTACH ':memory:' AS boilingdata;")
48+
self.conn.execute("SET search_path='memory,boilingdata';")
49+
# Boiling specific "information_schema" table
50+
q = "SELECT * FROM information_schema.create_tables_statements"
51+
52+
def cb(bd_tables):
53+
if bd_tables:
54+
for table in bd_tables:
55+
self.conn.execute(table)
56+
57+
await self.bd_conn.bd_execute(q, cb)
58+
59+
def _is_boiling_execute(self, sql):
60+
## 1) Get all Boiling tables so we know what to intercept
61+
q = """
62+
SELECT table_schema, table_name
63+
FROM information_schema.tables
64+
WHERE table_catalog = 'boilingdata';
65+
"""
66+
boiling_tables = self.conn.execute(q).fetchall()
67+
for table in boiling_tables:
68+
if (
69+
sql
70+
and table[0] in sql
71+
and table[1] in sql
72+
and "SELECT column_name, data_type AS column_type, is_nullable AS null"
73+
not in sql
74+
):
75+
return True
76+
## 2) static words
77+
__sql = sql.lower().replace('"', "")
78+
if (
79+
not sql.lower().startswith("prepare ")
80+
and not "information_schema" in sql.lower()
81+
and not "SHOW catalogs like" in sql
82+
and any(term in __sql for term in BOILING_SEARCH_TERMS)
83+
):
84+
return True
85+
return False
86+
87+
##
88+
## public
89+
##
90+
91+
async def connect(self):
92+
"""Connect to BoilingData"""
93+
await self.bd_conn.connect()
94+
95+
async def close(self):
96+
"""Close WebSocket connection to Boiling"""
97+
await self.bd_conn.close()
98+
99+
async def execute(self, sql, cb):
100+
"""Send SQL Query to Boiling or run locally"""
101+
if not self._is_boiling_execute(sql):
102+
return self.conn.execute(sql).fetchall()
103+
return await self.bd_conn.bd_execute(sql, cb)
104+
105+
106+
class BoilingDataConnection:
20107
"""Create authenticated WebSocket connection to BoilingData"""
21108

22109
def __init__(self, region=AWS_REGION):
23-
self.websocket = None
24-
self.aws_creds = None
25110
self.region = region
26-
self.id_client = boto3.client("cognito-identity")
27-
self.idp_client = boto3.client("cognito-idp")
28111
self.username = os.getenv("BD_USERNAME", "")
29112
self.password = os.getenv("BD_PASSWORD", "")
30113
if self.username == "" or self.password == "":
31114
raise ValueError(
32115
"Missing username (BD_USERNAME) and/or "
33116
+ "password (BD_PASSWORD) environment variable(s)"
34117
)
118+
self.websocket = None
119+
self.aws_creds = None
120+
self.ws_app = None
121+
self.ws_trace = False
122+
self.bd_is_open = False
123+
self.id_client = boto3.client("cognito-identity")
124+
self.idp_client = boto3.client("cognito-idp")
125+
self.requests = dict()
35126

36127
def _get_auth_headers(self):
37128
"""
@@ -44,7 +135,10 @@ def _get_auth_headers(self):
44135
request = AWSRequest(method="GET", url=BOILING_WSS_URL)
45136
signer = botocore.auth.SigV4Auth(credentials, "execute-api", self.region)
46137
signer.add_auth(request)
47-
return request.headers
138+
headers = dict()
139+
for key, value in request.headers.items():
140+
headers[key] = value
141+
return headers
48142

49143
def _get_cognito_tokens(self, username, password):
50144
try:
@@ -80,21 +174,86 @@ def _get_credentials(self):
80174
)
81175
return self.aws_creds
82176

177+
async def _ws_send(self, msg):
178+
print(f"> {msg}")
179+
return self.ws_app.send(msg)
180+
181+
def _on_open(self, ws_app):
182+
print("WS OPEN")
183+
self.bd_is_open = True
184+
185+
def _on_msg(self, ws_app, data):
186+
print(f"< {data}")
187+
msg = json.loads(data)
188+
reqId = msg.get("requestId")
189+
if not reqId:
190+
return
191+
msg_type = msg.get("messageType")
192+
if msg_type != "DATA":
193+
return
194+
req = self.requests.get(reqId)
195+
if not req:
196+
raise Exception(f"Could not find request queue for '{reqId}'")
197+
q = req["q"]
198+
q.push(msg)
199+
if q.is_done():
200+
q.delete()
201+
del self.requests[reqId]
202+
203+
def _on_error(self, ws_app, error):
204+
print(f"WS ERROR: {error}")
205+
206+
def _on_close(self, ws_app, code, msg):
207+
print(f"WS CLOSE: {code} {msg}")
208+
209+
##
210+
## public
211+
##
212+
213+
def _all_messages_received(self, event):
214+
requestId = event["requestId"]
215+
data = event["data"]
216+
cb = self.requests.get(requestId)
217+
cb["callback"](data)
218+
219+
async def bd_execute(self, sql, cb):
220+
if self.bd_is_open is not True:
221+
raise Exception("No Boiling connection")
222+
reqId = uuid.uuid4().hex
223+
body = '{"sql":"' + sql + '","requestId":"' + reqId + '"}'
224+
self.requests[reqId] = {
225+
"q": DataQueue(reqId, self._all_messages_received),
226+
"reqId": reqId,
227+
"callback": cb,
228+
}
229+
await self._ws_send(body)
230+
83231
async def connect(self):
84232
"""Connect to BoilingData WebSocket API"""
233+
if self.websocket is not None:
234+
raise Exception("WebSocket already exists")
235+
self.websocket = websocket.WebSocket()
236+
websocket.enableTrace(self.ws_trace)
85237
auth_headers = self._get_auth_headers()
86-
self.websocket = await websockets.connect(
87-
BOILING_WSS_URL, extra_headers=auth_headers
238+
self.ws_app = websocket.WebSocketApp(
239+
BOILING_WSS_URL,
240+
header=auth_headers,
241+
on_message=self._on_msg,
242+
on_error=self._on_error,
243+
on_close=self._on_close,
244+
on_open=self._on_open,
88245
)
89-
90-
async def send(self, msg):
91-
"""Send message to BoilingData"""
92-
return await self.websocket.send(msg)
246+
wst = threading.Thread(target=self.ws_app.run_forever)
247+
wst.daemon = True
248+
wst.start()
249+
timeoutS = 1
250+
while self.bd_is_open is not True and timeoutS < 10:
251+
await asyncio.sleep(1)
252+
timeoutS = timeoutS + 1
93253

94254
async def close(self):
95255
"""Close WebSocket connection to Boiling"""
96-
return await self.websocket.close()
97-
98-
async def recv(self):
99-
"""Receive message from Boiling"""
100-
return await self.websocket.recv()
256+
if self.ws_app:
257+
self.ws_app.close()
258+
if self.websocket:
259+
self.websocket.close()

0 commit comments

Comments
 (0)