Skip to content

Commit 8dead15

Browse files
authored
GG-32605 [IGNITE-13967] Optimizations and refactoring of parsing (#20)
(cherry picked from commit 2ead7b9)
1 parent 86fb5c5 commit 8dead15

32 files changed

Lines changed: 785 additions & 896 deletions

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
.vscode
33
.eggs
44
.pytest_cache
5+
.tox
6+
tests/config/*.xml
7+
junit*.xml
58
pygridgain.egg-info
69
ignite-log-*
7-
__pycache__
10+
__pycache__

pygridgain/api/affinity.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@
5555
partition_mapping = StructArray([
5656
('is_applicable', Bool),
5757

58-
('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
59-
lambda ctx: ctx['is_applicable'] is True,
58+
('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
59+
lambda ctx: ctx['is_applicable'],
6060
cache_mapping, empty_cache_mapping)),
6161

62-
('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
63-
lambda ctx: ctx['is_applicable'] is True,
62+
('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
63+
lambda ctx: ctx['is_applicable'],
6464
node_mapping, empty_node_mapping)),
6565
])
6666

pygridgain/api/binary.py

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,15 @@
2424
from pygridgain.queries.op_codes import *
2525
from pygridgain.utils import int_overflow, entity_id
2626
from .result import APIResult
27+
from ..stream import BinaryStream, READ_BACKWARD
2728
from ..queries.response import Response
2829

2930

30-
def get_binary_type(
31-
connection: 'Connection', binary_type: Union[str, int], query_id=None,
32-
) -> APIResult:
31+
def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
3332
"""
3433
Gets the binary type information by type ID.
3534
36-
:param connection: connection to GridGain server,
35+
:param conn: connection to GridGain server,
3736
:param binary_type: binary type name or ID,
3837
:param query_id: (optional) a value generated by client and returned as-is
3938
in response.query_id. When the parameter is omitted, a random value
@@ -49,39 +48,42 @@ def get_binary_type(
4948
query_id=query_id,
5049
)
5150

52-
_, send_buffer = query_struct.from_python({
53-
'type_id': entity_id(binary_type),
54-
})
55-
connection.send(send_buffer)
51+
with BinaryStream(conn) as stream:
52+
query_struct.from_python(stream, {
53+
'type_id': entity_id(binary_type),
54+
})
55+
conn.send(stream.getbuffer())
5656

57-
response_head_struct = Response(protocol_version=connection.get_protocol_version(),
57+
response_head_struct = Response(protocol_version=conn.get_protocol_version(),
5858
following=[('type_exists', Bool)])
5959

60-
response_head_type, recv_buffer = response_head_struct.parse(connection)
61-
response_head = response_head_type.from_buffer_copy(recv_buffer)
62-
response_parts = []
63-
if response_head.type_exists:
64-
resp_body_type, resp_body_buffer = body_struct.parse(connection)
65-
response_parts.append(('body', resp_body_type))
66-
resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
67-
recv_buffer += resp_body_buffer
68-
if resp_body.is_enum:
69-
resp_enum, resp_enum_buffer = enum_struct.parse(connection)
70-
response_parts.append(('enums', resp_enum))
71-
recv_buffer += resp_enum_buffer
72-
resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)
73-
response_parts.append(('schema', resp_schema_type))
74-
recv_buffer += resp_schema_buffer
75-
76-
response_class = type(
77-
'GetBinaryTypeResponse',
78-
(response_head_type,),
79-
{
80-
'_pack_': 1,
81-
'_fields_': response_parts,
82-
}
83-
)
84-
response = response_class.from_buffer_copy(recv_buffer)
60+
with BinaryStream(conn, conn.recv()) as stream:
61+
init_pos = stream.tell()
62+
response_head_type = response_head_struct.parse(stream)
63+
response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD)
64+
65+
response_parts = []
66+
if response_head.type_exists:
67+
resp_body_type = body_struct.parse(stream)
68+
response_parts.append(('body', resp_body_type))
69+
resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD)
70+
if resp_body.is_enum:
71+
resp_enum = enum_struct.parse(stream)
72+
response_parts.append(('enums', resp_enum))
73+
74+
resp_schema_type = schema_struct.parse(stream)
75+
response_parts.append(('schema', resp_schema_type))
76+
77+
response_class = type(
78+
'GetBinaryTypeResponse',
79+
(response_head_type,),
80+
{
81+
'_pack_': 1,
82+
'_fields_': response_parts,
83+
}
84+
)
85+
response = stream.read_ctype(response_class, position=init_pos)
86+
8587
result = APIResult(response)
8688
if result.status != 0:
8789
return result

pygridgain/binary.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,17 @@ def __new__(
102102
mcs, name, (GenericObjectProps, )+base_classes, namespace
103103
)
104104

105-
def _build(self, client: 'Client' = None) -> int:
105+
def _from_python(self, stream, save_to_buf=False):
106106
"""
107107
Method for building binary representation of the Generic object
108108
and calculating a hashcode from it.
109109
110110
:param self: Generic object instance,
111-
:param client: (optional) connection to GridGain cluster,
111+
:param stream: BinaryStream
112+
:param save_to_buf: Optional. If True, save serialized data to buffer.
112113
"""
113-
if client is None:
114-
compact_footer = True
115-
else:
116-
compact_footer = client.compact_footer
114+
115+
compact_footer = stream.compact_footer
117116

118117
# prepare header
119118
header_class = BinaryObject.build_header()
@@ -129,18 +128,19 @@ def _build(self, client: 'Client' = None) -> int:
129128
header.type_id = self.type_id
130129
header.schema_id = self.schema_id
131130

131+
header_len = ctypes.sizeof(header_class)
132+
initial_pos = stream.tell()
133+
132134
# create fields and calculate offsets
133135
offsets = [ctypes.sizeof(header_class)]
134-
field_buffer = bytearray()
135136
schema_items = list(self.schema.items())
137+
138+
stream.seek(initial_pos + header_len)
136139
for field_name, field_type in schema_items:
137-
partial_buffer = field_type.from_python(
138-
getattr(
139-
self, field_name, getattr(field_type, 'default', None)
140-
)
141-
)
142-
offsets.append(max(offsets) + len(partial_buffer))
143-
field_buffer += partial_buffer
140+
val = getattr(self, field_name, getattr(field_type, 'default', None))
141+
field_start_pos = stream.tell()
142+
field_type.from_python(stream, val)
143+
offsets.append(max(offsets) + stream.tell() - field_start_pos)
144144

145145
offsets = offsets[:-1]
146146

@@ -160,15 +160,18 @@ def _build(self, client: 'Client' = None) -> int:
160160
schema[i].offset = offset
161161

162162
# calculate size and hash code
163-
header.schema_offset = (
164-
ctypes.sizeof(header_class)
165-
+ len(field_buffer)
166-
)
163+
fields_data_len = stream.tell() - initial_pos - header_len
164+
header.schema_offset = fields_data_len + header_len
167165
header.length = header.schema_offset + ctypes.sizeof(schema_class)
168-
header.hash_code = hashcode(field_buffer)
166+
header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len)
167+
168+
stream.seek(initial_pos)
169+
stream.write(header)
170+
stream.seek(initial_pos + header.schema_offset)
171+
stream.write(schema)
169172

170-
# reuse the results
171-
self._buffer = bytes(header) + field_buffer + bytes(schema)
173+
if save_to_buf:
174+
self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
172175
self._hashcode = header.hash_code
173176

174177
def _setattr(self, attr_name: str, attr_value: Any):
@@ -180,7 +183,7 @@ def _setattr(self, attr_name: str, attr_value: Any):
180183
# `super()` really needs these parameters
181184
super(result, self).__setattr__(attr_name, attr_value)
182185

183-
setattr(result, _build.__name__, _build)
186+
setattr(result, _from_python.__name__, _from_python)
184187
setattr(result, '__setattr__', _setattr)
185188
setattr(result, '_buffer', None)
186189
setattr(result, '_hashcode', None)

pygridgain/cache.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from typing import Any, Dict, Iterable, Optional, Tuple, Union
1818

1919
from .constants import *
20-
from .binary import GenericObjectMeta
20+
from .binary import GenericObjectMeta, unwrap_binary
2121
from .datatypes import prop_codes
2222
from .datatypes.internal import AnyDataObject
2323
from .exceptions import (
@@ -26,7 +26,7 @@
2626
)
2727
from .utils import (
2828
cache_id, get_field_by_id, is_wrapped,
29-
status_to_exception, unsigned, unwrap_binary,
29+
status_to_exception, unsigned
3030
)
3131
from .api.cache_config import (
3232
cache_create, cache_create_with_config,

0 commit comments

Comments
 (0)