Skip to content

Commit d95809c

Browse files
authored
Merge branch 'main' into frosty/shutdown-hang-fix
2 parents 9ec8330 + e62d5ba commit d95809c

16 files changed

Lines changed: 620 additions & 44 deletions

aries_cloudagent/core/dispatcher.py

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import os
1111
import warnings
1212

13-
from typing import Callable, Coroutine, Union
13+
from typing import Callable, Coroutine, Optional, Union, Tuple
1414
import weakref
1515

1616
from aiohttp.web import HTTPException
@@ -36,6 +36,13 @@
3636

3737
from .error import ProtocolMinorVersionNotSupported
3838
from .protocol_registry import ProtocolRegistry
39+
from .util import (
40+
get_version_from_message_type,
41+
validate_get_response_version,
42+
# WARNING_DEGRADED_FEATURES,
43+
# WARNING_VERSION_MISMATCH,
44+
# WARNING_VERSION_NOT_SUPPORTED,
45+
)
3946

4047
LOGGER = logging.getLogger(__name__)
4148

@@ -133,16 +140,22 @@ async def handle_message(
133140
inbound_message: The inbound message instance
134141
send_outbound: Async function to send outbound messages
135142
143+
# Raises:
144+
# MessageParseError: If the message type version is not supported
145+
136146
Returns:
137147
The response from the handler
138148
139149
"""
140150
r_time = get_timer()
141151

142152
error_result = None
153+
version_warning = None
143154
message = None
144155
try:
145-
message = await self.make_message(inbound_message.payload)
156+
(message, warning) = await self.make_message(
157+
profile, inbound_message.payload
158+
)
146159
except ProblemReportParseError:
147160
pass # avoid problem report recursion
148161
except MessageParseError as e:
@@ -155,6 +168,47 @@ async def handle_message(
155168
)
156169
if inbound_message.receipt.thread_id:
157170
error_result.assign_thread_id(inbound_message.receipt.thread_id)
171+
# if warning:
172+
# warning_message_type = inbound_message.payload.get("@type")
173+
# if warning == WARNING_DEGRADED_FEATURES:
174+
# LOGGER.error(
175+
# f"Sending {WARNING_DEGRADED_FEATURES} problem report, "
176+
# "message type received with a minor version at or higher"
177+
# " than protocol minimum supported and current minor version "
178+
# f"for message_type {warning_message_type}"
179+
# )
180+
# version_warning = ProblemReport(
181+
# description={
182+
# "en": (
183+
# "message type received with a minor version at or "
184+
# "higher than protocol minimum supported and current"
185+
# f" minor version for message_type {warning_message_type}"
186+
# ),
187+
# "code": WARNING_DEGRADED_FEATURES,
188+
# }
189+
# )
190+
# elif warning == WARNING_VERSION_MISMATCH:
191+
# LOGGER.error(
192+
# f"Sending {WARNING_VERSION_MISMATCH} problem report, message "
193+
# "type received with a minor version higher than current minor "
194+
# f"version for message_type {warning_message_type}"
195+
# )
196+
# version_warning = ProblemReport(
197+
# description={
198+
# "en": (
199+
# "message type received with a minor version higher"
200+
# " than current minor version for message_type"
201+
# f" {warning_message_type}"
202+
# ),
203+
# "code": WARNING_VERSION_MISMATCH,
204+
# }
205+
# )
206+
# elif warning == WARNING_VERSION_NOT_SUPPORTED:
207+
# raise MessageParseError(
208+
# f"Message type version not supported for {warning_message_type}"
209+
# )
210+
# if version_warning and inbound_message.receipt.thread_id:
211+
# version_warning.assign_thread_id(inbound_message.receipt.thread_id)
158212

159213
trace_event(
160214
self.profile.settings,
@@ -199,6 +253,8 @@ async def handle_message(
199253

200254
if error_result:
201255
await responder.send_reply(error_result)
256+
elif version_warning:
257+
await responder.send_reply(version_warning)
202258
elif context.message:
203259
context.injector.bind_instance(BaseResponder, responder)
204260

@@ -215,7 +271,9 @@ async def handle_message(
215271
perf_counter=r_time,
216272
)
217273

218-
async def make_message(self, parsed_msg: dict) -> BaseMessage:
274+
async def make_message(
275+
self, profile: Profile, parsed_msg: dict
276+
) -> Tuple[BaseMessage, Optional[str]]:
219277
"""
220278
Deserialize a message dict into the appropriate message instance.
221279
@@ -224,6 +282,7 @@ async def make_message(self, parsed_msg: dict) -> BaseMessage:
224282
225283
Args:
226284
parsed_msg: The parsed message
285+
profile: Profile
227286
228287
Returns:
229288
An instance of the corresponding message class for this message
@@ -237,6 +296,7 @@ async def make_message(self, parsed_msg: dict) -> BaseMessage:
237296
if not isinstance(parsed_msg, dict):
238297
raise MessageParseError("Expected a JSON object")
239298
message_type = parsed_msg.get("@type")
299+
message_type_rec_version = get_version_from_message_type(message_type)
240300

241301
if not message_type:
242302
raise MessageParseError("Message does not contain '@type' parameter")
@@ -256,8 +316,10 @@ async def make_message(self, parsed_msg: dict) -> BaseMessage:
256316
if "/problem-report" in message_type:
257317
raise ProblemReportParseError("Error parsing problem report message")
258318
raise MessageParseError(f"Error deserializing message: {e}") from e
259-
260-
return instance
319+
_, warning = await validate_get_response_version(
320+
profile, message_type_rec_version, message_cls
321+
)
322+
return (instance, warning)
261323

262324
async def complete(self, timeout: float = 0.1):
263325
"""Wait for pending tasks to complete."""

aries_cloudagent/core/protocol_registry.py

Lines changed: 82 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44

5+
from string import Template
56
from typing import Mapping, Sequence
67

78
from ..config.injection_context import InjectionContext
@@ -74,6 +75,73 @@ def parse_type_string(self, message_type):
7475
"minor_version": int(version_string_tokens[1]),
7576
}
7677

78+
def create_msg_types_for_minor_version(self, typesets, version_definition):
79+
"""
80+
Return mapping of message type to module path for minor versions.
81+
82+
Args:
83+
typesets: Mappings of message types to register
84+
version_definition: Optional version definition dict
85+
86+
Returns:
87+
Typesets mapping
88+
89+
"""
90+
updated_typeset = {}
91+
curr_minor_version = version_definition["current_minor_version"]
92+
min_minor_version = version_definition["minimum_minor_version"]
93+
major_version = version_definition["major_version"]
94+
if curr_minor_version >= min_minor_version and curr_minor_version >= 1:
95+
for version_index in range(min_minor_version, curr_minor_version + 1):
96+
to_check = f"{str(major_version)}.{str(version_index)}"
97+
updated_typeset.update(
98+
self._get_updated_tyoeset_dict(typesets, to_check, updated_typeset)
99+
)
100+
return (updated_typeset,)
101+
102+
def _get_updated_tyoeset_dict(self, typesets, to_check, updated_typeset) -> dict:
103+
for typeset in typesets:
104+
for msg_type_string, module_path in typeset.items():
105+
updated_msg_type_string = Template(msg_type_string).substitute(
106+
version=to_check
107+
)
108+
updated_typeset[updated_msg_type_string] = module_path
109+
return updated_typeset
110+
111+
def _template_message_type_check(self, typeset) -> bool:
112+
for msg_type_string, _ in typeset.items():
113+
if "$version" in msg_type_string:
114+
return True
115+
return False
116+
117+
def _create_and_register_updated_typesets(self, typesets, version_definition):
118+
updated_typesets = self.create_msg_types_for_minor_version(
119+
typesets, version_definition
120+
)
121+
update_flag = False
122+
for typeset in updated_typesets:
123+
if typeset:
124+
self._typemap.update(typeset)
125+
update_flag = True
126+
if update_flag:
127+
return updated_typesets
128+
else:
129+
return None
130+
131+
def _update_version_map(self, message_type_string, module_path, version_definition):
132+
parsed_type_string = self.parse_type_string(message_type_string)
133+
134+
if version_definition["major_version"] not in self._versionmap:
135+
self._versionmap[version_definition["major_version"]] = []
136+
137+
self._versionmap[version_definition["major_version"]].append(
138+
{
139+
"parsed_type_string": parsed_type_string,
140+
"version_definition": version_definition,
141+
"message_module": module_path,
142+
}
143+
)
144+
77145
def register_message_types(self, *typesets, version_definition=None):
78146
"""
79147
Add new supported message types.
@@ -85,24 +153,26 @@ def register_message_types(self, *typesets, version_definition=None):
85153
"""
86154

87155
# Maintain support for versionless protocol modules
156+
template_msg_type_version = True
157+
updated_typesets = None
88158
for typeset in typesets:
89-
self._typemap.update(typeset)
159+
if not self._template_message_type_check(typeset):
160+
self._typemap.update(typeset)
161+
template_msg_type_version = False
90162

91163
# Track versioned modules for version routing
92164
if version_definition:
165+
# create updated typesets for minor versions and register them
166+
if template_msg_type_version:
167+
updated_typesets = self._create_and_register_updated_typesets(
168+
typesets, version_definition
169+
)
170+
if updated_typesets:
171+
typesets = updated_typesets
93172
for typeset in typesets:
94173
for message_type_string, module_path in typeset.items():
95-
parsed_type_string = self.parse_type_string(message_type_string)
96-
97-
if version_definition["major_version"] not in self._versionmap:
98-
self._versionmap[version_definition["major_version"]] = []
99-
100-
self._versionmap[version_definition["major_version"]].append(
101-
{
102-
"parsed_type_string": parsed_type_string,
103-
"version_definition": version_definition,
104-
"message_module": module_path,
105-
}
174+
self._update_version_map(
175+
message_type_string, module_path, version_definition
106176
)
107177

108178
def register_controllers(self, *controller_sets, version_definition=None):

0 commit comments

Comments
 (0)