Skip to content

Commit 9945ecb

Browse files
committed
IGNITE-12467 Implement transactions, rewrite async connections using protocol and transport - Fixes #40.
1 parent fa364df commit 9945ecb

31 files changed

Lines changed: 1114 additions & 606 deletions
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one or more
2+
contributor license agreements. See the NOTICE file distributed with
3+
this work for additional information regarding copyright ownership.
4+
The ASF licenses this file to You under the Apache License, Version 2.0
5+
(the "License"); you may not use this file except in compliance with
6+
the License. You may obtain a copy of the License at
7+
8+
.. http://www.apache.org/licenses/LICENSE-2.0
9+
10+
.. Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
pyignite.datatypes.transactions module
17+
=======================================
18+
19+
.. automodule:: pyignite.datatypes.transactions
20+
:members:
21+
:show-inheritance:

docs/source/pyignite.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Submodules
4141
pyignite.aio_client
4242
pyignite.cluster
4343
pyignite.aio_cluster
44+
pyignite.transaction
4445
pyignite.cursors
4546
pyignite.exceptions
4647

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one or more
2+
contributor license agreements. See the NOTICE file distributed with
3+
this work for additional information regarding copyright ownership.
4+
The ASF licenses this file to You under the Apache License, Version 2.0
5+
(the "License"); you may not use this file except in compliance with
6+
the License. You may obtain a copy of the License at
7+
8+
.. http://www.apache.org/licenses/LICENSE-2.0
9+
10+
.. Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
pyignite.transaction module
17+
=========================
18+
19+
.. automodule:: pyignite.transaction
20+
:members:
21+
:undoc-members:
22+
:show-inheritance:

pyignite/aio_cache.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import asyncio
1616
from typing import Any, Iterable, Optional, Union
1717

18+
from .api.tx_api import get_tx_connection
1819
from .datatypes import ExpiryPolicy
1920
from .datatypes.internal import AnyDataObject
2021
from .exceptions import CacheCreationError, CacheError, ParameterError
@@ -91,6 +92,9 @@ def __init__(self, client: 'AioClient', name: str, expiry_policy: ExpiryPolicy =
9192
super().__init__(client, name, expiry_policy)
9293

9394
async def _get_best_node(self, key=None, key_hint=None):
95+
tx_conn = get_tx_connection()
96+
if tx_conn:
97+
return tx_conn
9498
return await self.client.get_best_node(self, key, key_hint)
9599

96100
async def settings(self) -> Optional[dict]:

pyignite/aio_client.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
# limitations under the License.
1515
import asyncio
1616
import random
17+
import sys
1718
from itertools import chain
18-
from typing import Iterable, Type, Union, Any, Dict
19+
from typing import Iterable, Type, Union, Any, Dict, Optional
1920

2021
from .aio_cluster import AioCluster
2122
from .api import cache_get_node_partitions_async
@@ -27,10 +28,11 @@
2728
from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
2829
from .connection import AioConnection
2930
from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
30-
from .datatypes import BinaryObject
31-
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
32-
from .queries.query import CacheInfo
31+
from .datatypes import BinaryObject, TransactionConcurrency, TransactionIsolation
32+
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors, NotSupportedError
33+
from .queries.cache_info import CacheInfo
3334
from .stream import AioBinaryStream, READ_BACKWARD
35+
from .transaction import AioTransaction
3436
from .utils import cache_id, entity_id, status_to_exception
3537

3638

@@ -471,9 +473,9 @@ def sql(
471473
elif isinstance(cache, AioCache):
472474
c_info = cache.cache_info
473475
else:
474-
c_info = None
476+
c_info = CacheInfo(protocol_context=self.protocol_context)
475477

476-
if c_info:
478+
if c_info.cache_id:
477479
schema = None
478480

479481
return AioSqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
@@ -487,3 +489,21 @@ def get_cluster(self) -> 'AioCluster':
487489
:return: :py:class:`~pyignite.aio_cluster.AioCluster` instance.
488490
"""
489491
return AioCluster(self)
492+
493+
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
494+
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
495+
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
496+
"""
497+
Start async thin client transaction.
498+
499+
:param concurrency: (optional) transaction concurrency, see
500+
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
501+
:param isolation: (optional) transaction isolation level, see
502+
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
503+
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
504+
:param label: (optional) transaction label.
505+
:return: :py:class:`~pyignite.transaction.AioTransaction` instance.
506+
"""
507+
if sys.version_info < (3, 7):
508+
raise NotSupportedError(f"Transactions are not supported in async client on current python {sys.version}")
509+
return AioTransaction(self, concurrency, isolation, timeout, label)

pyignite/api/affinity.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,23 @@
6868
])
6969

7070

71-
def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]], query_id: int = None) -> APIResult:
71+
def cache_get_node_partitions(conn: 'Connection', caches: Union[int, Iterable[int]]) -> APIResult:
7272
"""
7373
Gets partition mapping for an Ignite cache or a number of caches. See
7474
“IEP-23: Best Effort Affinity for thin clients”.
7575
7676
:param conn: connection to Ignite server,
77-
:param caches: cache ID(s) the mapping is provided for,
78-
:param query_id: (optional) a value generated by client and returned as-is
79-
in response.query_id. When the parameter is omitted, a random value
80-
is generated,
77+
:param caches: cache ID(s) the mapping is provided for
8178
:return: API result data object.
8279
"""
83-
return __cache_get_node_partitions(conn, caches, query_id)
80+
return __cache_get_node_partitions(conn, caches)
8481

8582

86-
async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]],
87-
query_id: int = None) -> APIResult:
83+
async def cache_get_node_partitions_async(conn: 'AioConnection', caches: Union[int, Iterable[int]]) -> APIResult:
8884
"""
8985
Async version of cache_get_node_partitions.
9086
"""
91-
return await __cache_get_node_partitions(conn, caches, query_id)
87+
return await __cache_get_node_partitions(conn, caches)
9288

9389

9490
def __post_process_partitions(result):
@@ -135,13 +131,12 @@ def __post_process_partitions(result):
135131
return result
136132

137133

138-
def __cache_get_node_partitions(conn, caches, query_id):
134+
def __cache_get_node_partitions(conn, caches):
139135
query_struct = Query(
140136
OP_CACHE_PARTITIONS,
141137
[
142138
('cache_ids', cache_ids),
143-
],
144-
query_id=query_id
139+
]
145140
)
146141
if not is_iterable(caches):
147142
caches = [caches]

pyignite/api/binary.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,30 @@
2626
from ..queries.response import BinaryTypeResponse
2727

2828

29-
def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
29+
def get_binary_type(conn: 'Connection', binary_type: Union[str, int]) -> APIResult:
3030
"""
3131
Gets the binary type information by type ID.
3232
3333
:param conn: connection to Ignite server,
3434
:param binary_type: binary type name or ID,
35-
:param query_id: (optional) a value generated by client and returned as-is
36-
in response.query_id. When the parameter is omitted, a random value
37-
is generated,
3835
:return: API result data object.
3936
"""
40-
return __get_binary_type(conn, binary_type, query_id)
37+
return __get_binary_type(conn, binary_type)
4138

4239

43-
async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int], query_id=None) -> APIResult:
40+
async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int]) -> APIResult:
4441
"""
4542
Async version of get_binary_type.
4643
"""
47-
return await __get_binary_type(conn, binary_type, query_id)
44+
return await __get_binary_type(conn, binary_type)
4845

4946

50-
def __get_binary_type(conn, binary_type, query_id):
47+
def __get_binary_type(conn, binary_type):
5148
query_struct = Query(
5249
OP_GET_BINARY_TYPE,
5350
[
5451
('type_id', Int),
5552
],
56-
query_id=query_id,
5753
response_type=BinaryTypeResponse
5854
)
5955

@@ -63,7 +59,7 @@ def __get_binary_type(conn, binary_type, query_id):
6359

6460

6561
def put_binary_type(connection: 'Connection', type_name: str, affinity_key_field: str = None,
66-
is_enum=False, schema: dict = None, query_id=None) -> APIResult:
62+
is_enum=False, schema: dict = None) -> APIResult:
6763
"""
6864
Registers binary type information in cluster.
6965
@@ -76,20 +72,17 @@ def put_binary_type(connection: 'Connection', type_name: str, affinity_key_field
7672
parameter names as keys and an integers as values. When register binary
7773
type, pass a dict of field names: field types. Binary type with no fields
7874
is OK,
79-
:param query_id: (optional) a value generated by client and returned as-is
80-
in response.query_id. When the parameter is omitted, a random value
81-
is generated,
8275
:return: API result data object.
8376
"""
84-
return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
77+
return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
8578

8679

8780
async def put_binary_type_async(connection: 'AioConnection', type_name: str, affinity_key_field: str = None,
8881
is_enum=False, schema: dict = None, query_id=None) -> APIResult:
8982
"""
9083
Async version of put_binary_type.
9184
"""
92-
return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id)
85+
return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema)
9386

9487

9588
def __post_process_put_binary(type_id):
@@ -103,7 +96,7 @@ def internal(result):
10396
return internal
10497

10598

106-
def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id):
99+
def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema):
107100
# prepare data
108101
if schema is None:
109102
schema = {}
@@ -158,8 +151,7 @@ def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema
158151
('is_enum', Bool),
159152
('enums', enum_struct),
160153
('schema', schema_struct),
161-
],
162-
query_id=query_id,
154+
]
163155
)
164156
else:
165157
query_struct = Query(
@@ -171,8 +163,7 @@ def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema
171163
('binary_fields', binary_fields_struct),
172164
('is_enum', Bool),
173165
('schema', schema_struct),
174-
],
175-
query_id=query_id,
166+
]
176167
)
177168
return query_perform(query_struct, connection, query_params=data,
178169
post_process_fun=__post_process_put_binary(type_id))

0 commit comments

Comments
 (0)