Skip to content

Commit de07126

Browse files
committed
IGNITE-15118 Implement handshake timeout - Fixes #47.
1 parent 82f29e2 commit de07126

15 files changed

Lines changed: 394 additions & 78 deletions

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ jobs:
5151
env: TOXENV=py39
5252

5353
install: pip install tox
54-
script: tox
54+
script: tox

examples/transactions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ def sync_example():
130130

131131

132132
if __name__ == '__main__':
133+
client = Client()
134+
with client.connect('127.0.0.1', 10800):
135+
if not client.protocol_context.is_transactions_supported():
136+
print("'Transactions' API is not supported by cluster. Finishing...")
137+
exit(0)
138+
133139
print("Starting sync example")
134140
sync_example()
135141

pyignite/aio_client.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
6565
"""
6666
Initialize client.
6767
68+
For the use of the SSL-related parameters see
69+
https://docs.python.org/3/library/ssl.html#ssl-certificates.
70+
6871
:param compact_footer: (optional) use compact (True, recommended) or
6972
full (False) schema approach when serializing Complex objects.
7073
Default is to use the same approach the server is using (None).
@@ -73,7 +76,37 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
7376
:param partition_aware: (optional) try to calculate the exact data
7477
placement from the key before to issue the key operation to the
7578
server node, `True` by default,
76-
:param event_listeners: (optional) event listeners.
79+
:param event_listeners: (optional) event listeners,
80+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
81+
with node. Default is 10.0 seconds,
82+
:param use_ssl: (optional) set to True if Ignite server uses SSL
83+
on its binary connector. Defaults to use SSL when username
84+
and password has been supplied, not to use SSL otherwise,
85+
:param ssl_version: (optional) SSL version constant from standard
86+
`ssl` module. Defaults to TLS v1.2,
87+
:param ssl_ciphers: (optional) ciphers to use. If not provided,
88+
`ssl` default ciphers are used,
89+
:param ssl_cert_reqs: (optional) determines how the remote side
90+
certificate is treated:
91+
92+
* `ssl.CERT_NONE` − remote certificate is ignored (default),
93+
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
94+
if provided,
95+
* `ssl.CERT_REQUIRED` − valid remote certificate is required,
96+
97+
:param ssl_keyfile: (optional) a path to SSL key file to identify
98+
local (client) party,
99+
:param ssl_keyfile_password: (optional) password for SSL key file,
100+
can be provided when key file is encrypted to prevent OpenSSL
101+
password prompt,
102+
:param ssl_certfile: (optional) a path to ssl certificate file
103+
to identify local (client) party,
104+
:param ssl_ca_certfile: (optional) a path to a trusted certificate
105+
or a certificate chain. Required to check the validity of the remote
106+
(server-side) certificate,
107+
:param username: (optional) user name to authenticate to Ignite
108+
cluster,
109+
:param password: (optional) password to authenticate to Ignite cluster.
77110
"""
78111
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
79112
self._registry_mux = asyncio.Lock()

pyignite/client.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
346346
"""
347347
Initialize client.
348348
349+
For the use of the SSL-related parameters see
350+
https://docs.python.org/3/library/ssl.html#ssl-certificates.
351+
349352
:param compact_footer: (optional) use compact (True, recommended) or
350353
full (False) schema approach when serializing Complex objects.
351354
Default is to use the same approach the server is using (None).
@@ -354,7 +357,41 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
354357
:param partition_aware: (optional) try to calculate the exact data
355358
placement from the key before to issue the key operation to the
356359
server node, `True` by default,
357-
:param event_listeners: (optional) event listeners.
360+
:param event_listeners: (optional) event listeners,
361+
:param timeout: (optional) sets timeout (in seconds) for each socket
362+
operation including `connect`. 0 means non-blocking mode, which is
363+
virtually guaranteed to fail. Can accept integer or float value.
364+
Default is None (blocking mode),
365+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
366+
with node. Default is 10.0 seconds,
367+
:param use_ssl: (optional) set to True if Ignite server uses SSL
368+
on its binary connector. Defaults to use SSL when username
369+
and password has been supplied, not to use SSL otherwise,
370+
:param ssl_version: (optional) SSL version constant from standard
371+
`ssl` module. Defaults to TLS v1.2,
372+
:param ssl_ciphers: (optional) ciphers to use. If not provided,
373+
`ssl` default ciphers are used,
374+
:param ssl_cert_reqs: (optional) determines how the remote side
375+
certificate is treated:
376+
377+
* `ssl.CERT_NONE` − remote certificate is ignored (default),
378+
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
379+
if provided,
380+
* `ssl.CERT_REQUIRED` − valid remote certificate is required,
381+
382+
:param ssl_keyfile: (optional) a path to SSL key file to identify
383+
local (client) party,
384+
:param ssl_keyfile_password: (optional) password for SSL key file,
385+
can be provided when key file is encrypted to prevent OpenSSL
386+
password prompt,
387+
:param ssl_certfile: (optional) a path to ssl certificate file
388+
to identify local (client) party,
389+
:param ssl_ca_certfile: (optional) a path to a trusted certificate
390+
or a certificate chain. Required to check the validity of the remote
391+
(server-side) certificate,
392+
:param username: (optional) user name to authenticate to Ignite
393+
cluster,
394+
:param password: (optional) password to authenticate to Ignite cluster.
358395
"""
359396
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
360397

pyignite/connection/aio_connection.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,13 @@ def __init__(self, client: 'AioClient', host: str, port: int, username: str = No
118118
:param client: Ignite client object,
119119
:param host: Ignite server node's host name or IP,
120120
:param port: Ignite server node's port number,
121+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
122+
with node. Default is 10.0 seconds,
121123
:param use_ssl: (optional) set to True if Ignite server uses SSL
122124
on its binary connector. Defaults to use SSL when username
123125
and password has been supplied, not to use SSL otherwise,
124126
:param ssl_version: (optional) SSL version constant from standard
125-
`ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
127+
`ssl` module. Defaults to TLS v1.2,
126128
:param ssl_ciphers: (optional) ciphers to use. If not provided,
127129
`ssl` default ciphers are used,
128130
:param ssl_cert_reqs: (optional) determines how the remote side
@@ -165,7 +167,6 @@ async def connect(self):
165167
"""
166168
if self.alive:
167169
return
168-
self._closed = False
169170
await self._connect()
170171

171172
async def _connect(self):
@@ -176,27 +177,28 @@ async def _connect(self):
176177
detecting_protocol = True
177178
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
178179

179-
try:
180-
self._on_handshake_start()
181-
result = await self._connect_version()
182-
except HandshakeError as e:
183-
if e.expected_version in PROTOCOLS:
184-
self.client.protocol_context.version = e.expected_version
180+
while True:
181+
try:
182+
self._on_handshake_start()
185183
result = await self._connect_version()
186-
else:
184+
self._on_handshake_success(result)
185+
return
186+
except HandshakeError as e:
187+
if e.expected_version in PROTOCOLS:
188+
self.client.protocol_context.version = e.expected_version
189+
continue
190+
else:
191+
self._on_handshake_fail(e)
192+
raise e
193+
except AuthenticationError as e:
187194
self._on_handshake_fail(e)
188195
raise e
189-
except AuthenticationError as e:
190-
self._on_handshake_fail(e)
191-
raise e
192-
except Exception as e:
193-
self._on_handshake_fail(e)
194-
# restore undefined protocol version
195-
if detecting_protocol:
196-
self.client.protocol_context = None
197-
raise e
198-
199-
self._on_handshake_success(result)
196+
except Exception as e:
197+
self._on_handshake_fail(e)
198+
# restore undefined protocol version
199+
if detecting_protocol:
200+
self.client.protocol_context = None
201+
raise e
200202

201203
def process_connection_lost(self, err, reconnect=False):
202204
self.failed = True
@@ -225,9 +227,13 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
225227

226228
ssl_context = create_ssl_context(self.ssl_params)
227229
handshake_fut = self._loop.create_future()
230+
self._closed = False
228231
self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut),
229232
host=self.host, port=self.port, ssl=ssl_context)
230-
hs_response = await handshake_fut
233+
try:
234+
hs_response = await asyncio.wait_for(handshake_fut, self.handshake_timeout)
235+
except asyncio.TimeoutError:
236+
raise ConnectionError('timed out')
231237

232238
if hs_response.op_code == 0:
233239
await self.close()

pyignite/connection/connection.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from typing import Union
2020

2121
from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
22-
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
22+
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError, ParameterError
2323
from .bitmask_feature import BitmaskFeature
2424

2525
from .handshake import HandshakeRequest, HandshakeResponse
@@ -34,14 +34,18 @@
3434

3535
class BaseConnection:
3636
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
37-
**ssl_params):
37+
handshake_timeout: float = 10.0, **ssl_params):
3838
self.client = client
39+
self.handshake_timeout = handshake_timeout
3940
self.host = host if host else IGNITE_DEFAULT_HOST
4041
self.port = port if port else IGNITE_DEFAULT_PORT
4142
self.username = username
4243
self.password = password
4344
self.uuid = None
4445

46+
if handshake_timeout <= 0.0:
47+
raise ParameterError("handshake_timeout should be positive")
48+
4549
check_ssl_params(ssl_params)
4650

4751
if self.username and self.password and 'use_ssl' not in ssl_params:
@@ -162,8 +166,9 @@ class Connection(BaseConnection):
162166
* binary protocol connector. Encapsulates handshake and failover reconnection.
163167
"""
164168

165-
def __init__(self, client: 'Client', host: str, port: int, timeout: float = None,
166-
username: str = None, password: str = None, **ssl_params):
169+
def __init__(self, client: 'Client', host: str, port: int, username: str = None, password: str = None,
170+
timeout: float = None, handshake_timeout: float = 10.0,
171+
**ssl_params):
167172
"""
168173
Initialize connection.
169174
@@ -177,11 +182,13 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
177182
operation including `connect`. 0 means non-blocking mode, which is
178183
virtually guaranteed to fail. Can accept integer or float value.
179184
Default is None (blocking mode),
185+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
186+
with node. Default is 10.0.
180187
:param use_ssl: (optional) set to True if Ignite server uses SSL
181188
on its binary connector. Defaults to use SSL when username
182189
and password has been supplied, not to use SSL otherwise,
183190
:param ssl_version: (optional) SSL version constant from standard
184-
`ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
191+
`ssl` module. Defaults to TLS v1.2,
185192
:param ssl_ciphers: (optional) ciphers to use. If not provided,
186193
`ssl` default ciphers are used,
187194
:param ssl_cert_reqs: (optional) determines how the remote side
@@ -206,7 +213,7 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
206213
cluster,
207214
:param password: (optional) password to authenticate to Ignite cluster.
208215
"""
209-
super().__init__(client, host, port, username, password, **ssl_params)
216+
super().__init__(client, host, port, username, password, handshake_timeout, **ssl_params)
210217
self.timeout = timeout
211218
self._socket = None
212219

@@ -225,27 +232,29 @@ def connect(self):
225232
detecting_protocol = True
226233
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
227234

228-
try:
229-
self._on_handshake_start()
230-
result = self._connect_version()
231-
except HandshakeError as e:
232-
if e.expected_version in PROTOCOLS:
233-
self.client.protocol_context.version = e.expected_version
235+
while True:
236+
try:
237+
self._on_handshake_start()
234238
result = self._connect_version()
235-
else:
239+
self._socket.settimeout(self.timeout)
240+
self._on_handshake_success(result)
241+
return
242+
except HandshakeError as e:
243+
if e.expected_version in PROTOCOLS:
244+
self.client.protocol_context.version = e.expected_version
245+
continue
246+
else:
247+
self._on_handshake_fail(e)
248+
raise e
249+
except AuthenticationError as e:
236250
self._on_handshake_fail(e)
237251
raise e
238-
except AuthenticationError as e:
239-
self._on_handshake_fail(e)
240-
raise e
241-
except Exception as e:
242-
self._on_handshake_fail(e)
243-
# restore undefined protocol version
244-
if detecting_protocol:
245-
self.client.protocol_context = None
246-
raise e
247-
248-
self._on_handshake_success(result)
252+
except Exception as e:
253+
self._on_handshake_fail(e)
254+
# restore undefined protocol version
255+
if detecting_protocol:
256+
self.client.protocol_context = None
257+
raise e
249258

250259
def _connect_version(self) -> Union[dict, OrderedDict]:
251260
"""
@@ -254,7 +263,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]:
254263
"""
255264

256265
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
257-
self._socket.settimeout(self.timeout)
266+
self._socket.settimeout(self.handshake_timeout)
258267
self._socket = wrap(self._socket, self.ssl_params)
259268
self._socket.connect((self.host, self.port))
260269

pyignite/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ def __eq__(self, other):
4040
def __str__(self):
4141
return f'ProtocolContext(version={self._version}, features={self._features})'
4242

43+
def __repr__(self):
44+
return self.__str__()
45+
4346
def _ensure_consistency(self):
4447
if not self.is_feature_flags_supported():
4548
self._features = None

pyignite/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424

2525
def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
26-
if value not in cls:
26+
if value not in set(v.value for v in cls): # Use this trick to disable warning on python 3.7
2727
raise ValueError(f'{value} not in {cls}')
2828
return value
2929

tests/common/test_query_listener.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from pyignite import Client, AioClient
1818
from pyignite.exceptions import CacheError
1919
from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent
20-
from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE
20+
from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CACHE_GET_NAMES
2121

2222
events = []
2323

@@ -93,17 +93,17 @@ def __assert_fail_events(client):
9393
assert ev.port == conn.port
9494
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
9595
assert 'Cache does not exist' in ev.err_msg
96-
assert ev.duration > 0
96+
assert ev.duration >= 0
9797

9898

9999
def test_query_success_events(client):
100-
client.get_cluster().get_state()
100+
client.get_cache_names()
101101
__assert_success_events(client)
102102

103103

104104
@pytest.mark.asyncio
105105
async def test_query_success_events_async(async_client):
106-
await async_client.get_cluster().get_state()
106+
await async_client.get_cache_names()
107107
__assert_success_events(async_client)
108108

109109

@@ -112,16 +112,16 @@ def __assert_success_events(client):
112112
conn = client._nodes[0]
113113
for ev in events:
114114
if isinstance(ev, QueryStartEvent):
115-
assert ev.op_code == OP_CLUSTER_GET_STATE
116-
assert ev.op_name == 'OP_CLUSTER_GET_STATE'
115+
assert ev.op_code == OP_CACHE_GET_NAMES
116+
assert ev.op_name == 'OP_CACHE_GET_NAMES'
117117
assert ev.host == conn.host
118118
assert ev.port == conn.port
119119
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
120120

121121
if isinstance(ev, QuerySuccessEvent):
122-
assert ev.op_code == OP_CLUSTER_GET_STATE
123-
assert ev.op_name == 'OP_CLUSTER_GET_STATE'
122+
assert ev.op_code == OP_CACHE_GET_NAMES
123+
assert ev.op_name == 'OP_CACHE_GET_NAMES'
124124
assert ev.host == conn.host
125125
assert ev.port == conn.port
126126
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
127-
assert ev.duration > 0
127+
assert ev.duration >= 0

0 commit comments

Comments
 (0)