Skip to content

Commit 672a767

Browse files
committed
IGNITE-14211 Remove existing cache requirement from SQL API
This closes #18
1 parent e5ca3fc commit 672a767

6 files changed

Lines changed: 98 additions & 39 deletions

File tree

pyignite/api/sql.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -283,36 +283,31 @@ def sql_fields(
283283
Performs SQL fields query.
284284
285285
:param conn: connection to Ignite server,
286-
:param cache: name or ID of the cache,
286+
:param cache: name or ID of the cache. If zero, then schema is used.
287287
:param query_str: SQL query string,
288288
:param page_size: cursor page size,
289289
:param query_args: (optional) query arguments. List of values or
290290
(value, type hint) tuples,
291-
:param schema: (optional) schema for the query. Defaults to `PUBLIC`,
291+
:param schema: schema for the query.
292292
:param statement_type: (optional) statement type. Can be:
293293
294294
* StatementType.ALL − any type (default),
295295
* StatementType.SELECT − select,
296296
* StatementType.UPDATE − update.
297297
298-
:param distributed_joins: (optional) distributed joins. Defaults to False,
298+
:param distributed_joins: (optional) distributed joins.
299299
:param local: (optional) pass True if this query should be executed
300-
on local node only. Defaults to False,
300+
on local node only.
301301
:param replicated_only: (optional) whether query contains only
302-
replicated tables or not. Defaults to False,
303-
:param enforce_join_order: (optional) enforce join order. Defaults
304-
to False,
302+
replicated tables or not.
303+
:param enforce_join_order: (optional) enforce join order.
305304
:param collocated: (optional) whether your data is co-located or not.
306-
Defaults to False,
307-
:param lazy: (optional) lazy query execution. Defaults to False,
305+
:param lazy: (optional) lazy query execution.
308306
:param include_field_names: (optional) include field names in result.
309-
Defaults to False,
310-
:param max_rows: (optional) query-wide maximum of rows. Defaults to -1
311-
(all rows),
307+
:param max_rows: (optional) query-wide maximum of rows.
312308
:param timeout: (optional) non-negative timeout value in ms. Zero disables
313-
timeout (default),
309+
timeout.
314310
:param binary: (optional) pass True to keep the value in binary form.
315-
False by default,
316311
:param query_id: (optional) a value generated by client and returned as-is
317312
in response.query_id. When the parameter is omitted, a random value
318313
is generated,

pyignite/client.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
5959
)
6060
from .utils import (
61-
capitalize, entity_id, schema_id, process_delimiter,
61+
cache_id, capitalize, entity_id, schema_id, process_delimiter,
6262
status_to_exception, is_iterable,
6363
)
6464
from .binary import GenericObjectMeta
@@ -513,13 +513,14 @@ def get_cache_names(self) -> list:
513513
return cache_get_names(self.random_node)
514514

515515
def sql(
516-
self, query_str: str, page_size: int = 1024, query_args: Iterable = None,
517-
schema: Union[int, str] = 'PUBLIC',
516+
self, query_str: str, page_size: int = 1024,
517+
query_args: Iterable = None, schema: str = 'PUBLIC',
518518
statement_type: int = 0, distributed_joins: bool = False,
519519
local: bool = False, replicated_only: bool = False,
520520
enforce_join_order: bool = False, collocated: bool = False,
521521
lazy: bool = False, include_field_names: bool = False,
522522
max_rows: int = -1, timeout: int = 0,
523+
cache: Union[int, str, Cache] = None
523524
):
524525
"""
525526
Runs an SQL query and returns its result.
@@ -553,6 +554,8 @@ def sql(
553554
(all rows),
554555
:param timeout: (optional) non-negative timeout value in ms.
555556
Zero disables timeout (default),
557+
:param cache: (optional) name or ID of the cache to use to infer schema.
558+
If set, 'schema' argument is ignored,
556559
:return: generator with result rows as a lists. If
557560
`include_field_names` was set, the first row will hold field names.
558561
"""
@@ -580,10 +583,13 @@ def generate_result(value):
580583

581584
conn = self.random_node
582585

583-
schema = self.get_cache(schema)
586+
c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache)
587+
588+
if c_id != 0:
589+
schema = None
590+
584591
result = sql_fields(
585-
conn, schema.cache_id, query_str,
586-
page_size, query_args, schema.name,
592+
conn, c_id, query_str, page_size, query_args, schema,
587593
statement_type, distributed_joins, local, replicated_only,
588594
enforce_join_order, collocated, lazy, include_field_names,
589595
max_rows, timeout,

pyignite/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:
105105

106106

107107
def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int:
108+
if data is None:
109+
return 0
110+
108111
if isinstance(data, str):
109112
"""
110113
For strings we iterate over code point which are of the int type

tests/test_binary.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@
6363

6464

6565
def test_sql_read_as_binary(client):
66-
67-
client.get_or_create_cache(scheme_name)
6866
client.sql(drop_query)
6967

7068
# create table
@@ -92,9 +90,6 @@ def test_sql_read_as_binary(client):
9290

9391

9492
def test_sql_write_as_binary(client):
95-
96-
client.get_or_create_cache(scheme_name)
97-
9893
# configure cache as an SQL table
9994
type_name = table_cache_name
10095

tests/test_cache_class.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ def test_cache_remove(client):
6262

6363

6464
def test_cache_get(client):
65-
client.get_or_create_cache('my_cache')
66-
67-
my_cache = client.get_cache('my_cache')
65+
my_cache = client.get_or_create_cache('my_cache')
6866
assert my_cache.settings[PROP_NAME] == 'my_cache'
6967
my_cache.destroy()
7068

tests/test_sql.py

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
sql, sql_cursor_get_page,
2121
cache_get_configuration,
2222
)
23+
from pyignite.datatypes.cache_config import CacheMode
2324
from pyignite.datatypes.prop_codes import *
2425
from pyignite.exceptions import SQLError
2526
from pyignite.utils import entity_id
2627
from pyignite.binary import unwrap_binary
2728

28-
2929
initial_data = [
3030
('John', 'Doe', 5),
3131
('Jane', 'Roe', 4),
@@ -59,9 +59,10 @@ def test_sql(client):
5959

6060
result = sql_fields(
6161
conn,
62-
'PUBLIC',
62+
0,
6363
create_query,
6464
page_size,
65+
schema='PUBLIC',
6566
include_field_names=True
6667
)
6768
assert result.status == 0, result.message
@@ -70,9 +71,10 @@ def test_sql(client):
7071
fname, lname, grade = data_line
7172
result = sql_fields(
7273
conn,
73-
'PUBLIC',
74+
0,
7475
insert_query,
7576
page_size,
77+
schema='PUBLIC',
7678
query_args=[i, fname, lname, grade],
7779
include_field_names=True
7880
)
@@ -108,7 +110,7 @@ def test_sql(client):
108110
assert data.type_id == entity_id(binary_type_name)
109111

110112
# repeat cleanup
111-
result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
113+
result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
112114
assert result.status == 0
113115

114116

@@ -121,9 +123,10 @@ def test_sql_fields(client):
121123

122124
result = sql_fields(
123125
conn,
124-
'PUBLIC',
126+
0,
125127
create_query,
126128
page_size,
129+
schema='PUBLIC',
127130
include_field_names=True
128131
)
129132
assert result.status == 0, result.message
@@ -132,19 +135,21 @@ def test_sql_fields(client):
132135
fname, lname, grade = data_line
133136
result = sql_fields(
134137
conn,
135-
'PUBLIC',
138+
0,
136139
insert_query,
137140
page_size,
141+
schema='PUBLIC',
138142
query_args=[i, fname, lname, grade],
139143
include_field_names=True
140144
)
141145
assert result.status == 0, result.message
142146

143147
result = sql_fields(
144148
conn,
145-
'PUBLIC',
149+
0,
146150
select_query,
147151
page_size,
152+
schema='PUBLIC',
148153
include_field_names=True
149154
)
150155
assert result.status == 0
@@ -159,7 +164,7 @@ def test_sql_fields(client):
159164
assert result.value['more'] is False
160165

161166
# repeat cleanup
162-
result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
167+
result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
163168
assert result.status == 0
164169

165170

@@ -176,7 +181,7 @@ def test_long_multipage_query(client):
176181

177182
client.sql('DROP TABLE LongMultipageQuery IF EXISTS')
178183

179-
client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % \
184+
client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" %
180185
(fields[0] + " INT(11) PRIMARY KEY", ",".join(map(lambda f: f + " INT(11)", fields[1:]))))
181186

182187
for id in range(1, 21):
@@ -193,6 +198,63 @@ def test_long_multipage_query(client):
193198
client.sql(drop_query)
194199

195200

196-
def test_sql_not_create_cache(client):
201+
def test_sql_not_create_cache_with_cache(client):
197202
with pytest.raises(SQLError, match=r".*Cache does not exist.*"):
198-
client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting')
203+
client.sql(schema=None, cache='NOT_EXISTING', query_str='select * from NotExisting')
204+
205+
206+
def test_sql_not_create_cache_with_schema(client):
207+
with pytest.raises(SQLError, match=r".*Failed to set schema.*"):
208+
client.sql(schema='NOT_EXISTING', query_str='select * from NotExisting')
209+
210+
211+
def test_query_with_cache(client):
212+
test_key = 42
213+
test_value = 'Lorem ipsum'
214+
215+
cache_name = test_query_with_cache.__name__.upper()
216+
schema_name = f'{cache_name}_schema'.upper()
217+
table_name = f'{cache_name}_table'.upper()
218+
219+
cache = client.create_cache({
220+
PROP_NAME: cache_name,
221+
PROP_SQL_SCHEMA: schema_name,
222+
PROP_CACHE_MODE: CacheMode.PARTITIONED,
223+
PROP_QUERY_ENTITIES: [
224+
{
225+
'table_name': table_name,
226+
'key_field_name': 'KEY',
227+
'value_field_name': 'VALUE',
228+
'key_type_name': 'java.lang.Long',
229+
'value_type_name': 'java.lang.String',
230+
'query_indexes': [],
231+
'field_name_aliases': [],
232+
'query_fields': [
233+
{
234+
'name': 'KEY',
235+
'type_name': 'java.lang.Long',
236+
'is_key_field': True,
237+
'is_notnull_constraint_field': True,
238+
},
239+
{
240+
'name': 'VALUE',
241+
'type_name': 'java.lang.String',
242+
},
243+
],
244+
},
245+
],
246+
})
247+
248+
cache.put(test_key, test_value)
249+
250+
args_to_check = [
251+
('schema', schema_name),
252+
('cache', cache),
253+
('cache', cache.name),
254+
('cache', cache.cache_id)
255+
]
256+
257+
for param, value in args_to_check:
258+
page = client.sql(f'select value from {table_name}', **{param: value})
259+
received = next(page)[0]
260+
assert test_value == received

0 commit comments

Comments
 (0)