forked from tursodatabase/libsql-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdialect.py
More file actions
119 lines (94 loc) · 3.48 KB
/
dialect.py
File metadata and controls
119 lines (94 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import urllib.parse
from sqlalchemy import util
from sqlalchemy.dialects import registry as _registry
from sqlalchemy.dialects.sqlite.pysqlite import SQLiteDialect_pysqlite
# Version string of this dialect module.
__version__ = "0.1.0-pre"

# Register the dialect class with SQLAlchemy's dialect registry under the
# name "sqlite.libsql" (arguments: registry name, module path, class name).
_registry.register(
    "sqlite.libsql",
    "sqlalchemy_libsql",
    "SQLiteDialect_libsql",
)
def _build_connection_url(url, query, secure):
    """Build the connection string handed to the libsql driver.

    Args:
        url: URL-like object exposing ``host``, ``database``, ``username``,
            ``password`` and ``port`` attributes.
        query: Mapping of query parameters to append to the result.
        secure: When truthy the ``wss`` scheme is used, otherwise ``ws``.
            Ignored when ``url.host`` is empty.

    Returns:
        When ``url.host`` is empty, ``url.database`` itself (with
        ``?<query>`` appended if any parameters are present). Otherwise a
        ``ws://`` / ``wss://`` URL assembled from the host, optional
        credentials, optional port, database and query string.
    """
    # Sorting the keys keeps the output deterministic (unit test support).
    query_str = urllib.parse.urlencode(sorted(query.items()))

    # No host: the database is passed through, not turned into a URL.
    if not url.host:
        if query_str:
            return f"{url.database}?{query_str}"
        return url.database

    scheme = "wss" if secure else "ws"

    netloc = url.host
    if url.username:
        # Percent-encode the credentials so reserved characters such as
        # "@", ":" or "/" cannot corrupt the authority part of the URL.
        userinfo = urllib.parse.quote(str(url.username), safe="")
        if url.password:
            userinfo += ":" + urllib.parse.quote(str(url.password), safe="")
        netloc = f"{userinfo}@{netloc}"
    if url.port:
        netloc += f":{url.port}"

    return urllib.parse.urlunsplit(
        (
            scheme,
            netloc,
            url.database or "",
            query_str,
            "",  # fragment
        )
    )
class SQLiteDialect_libsql(SQLiteDialect_pysqlite):
    """pysqlite-derived SQLAlchemy dialect that uses ``libsql`` as its DBAPI."""

    driver = "libsql"
    # need to be set explicitly
    supports_statement_cache = SQLiteDialect_pysqlite.supports_statement_cache

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``libsql`` module used as the DBAPI."""
        import libsql

        return libsql

    def on_connect(self):
        """Return the callable invoked for each newly created connection.

        The returned callable does nothing for ``libsql.Connection``
        objects and delegates to the parent dialect's hook for anything
        else.
        """
        import libsql

        sqlite3_connect = super().on_connect()

        def connect(conn):
            # LibSQL: there is no support for create_function(), so the
            # parent hook is skipped for libsql connections.
            if isinstance(conn, libsql.Connection):
                return
            return sqlite3_connect(conn)

        return connect

    def create_connect_args(self, url):
        """Translate a SQLAlchemy URL into driver ``connect()`` arguments.

        Args:
            url: The SQLAlchemy URL; its ``query``, ``host`` and ``database``
                attributes are read here.

        Returns:
            A ``([connect_url], kwargs)`` tuple: one positional connection
            string plus the keyword options coerced from the URL query.
        """
        pysqlite_args = (
            ("uri", bool),
            ("timeout", float),
            ("isolation_level", str),
            ("detect_types", int),
            ("check_same_thread", bool),
            ("cached_statements", int),
            ("secure", bool),  # LibSQL extra, selects between ws and wss
        )
        opts = url.query
        libsql_opts = {}
        for key, type_ in pysqlite_args:
            util.coerce_kw_type(opts, key, type_, dest=libsql_opts)

        # "secure" only selects the URL scheme; it is not a driver keyword,
        # so remove it regardless of which branch is taken below.
        secure = libsql_opts.pop("secure", False)

        # A URL with a host is always treated as a URI connection.
        if url.host:
            libsql_opts["uri"] = True

        if libsql_opts.get("uri", False):
            # Separate the parameters consumed by the Python driver (listed
            # in pysqlite_args) from those forwarded in the connection URI.
            # The two sets do not currently share names; if a conflict ever
            # appears, the driver parameters can be supplied through
            # connect_args instead of the URL, and this filtering would need
            # to be adjusted.
            uri_opts = dict(opts)
            for key, _ in pysqlite_args:
                uri_opts.pop(key, None)
            connect_url = _build_connection_url(url, uri_opts, secure)
        else:
            connect_url = url.database or ":memory:"
            if connect_url != ":memory:":
                connect_url = os.path.abspath(connect_url)

        # Only set when the URL did not specify check_same_thread itself.
        libsql_opts.setdefault(
            "check_same_thread", not self._is_url_file_db(url)
        )

        return ([connect_url], libsql_opts)
# Module-level alias for the dialect class.
# NOTE(review): presumably the attribute name SQLAlchemy looks up when
# loading a dialect module — confirm against the plugin-loading docs.
dialect = SQLiteDialect_libsql