Skip to content

Commit 75c057e

Browse files
authored
Merge pull request openwallet-foundation#2009 from ianco/mediator-testing
Fix for mediator load testing race condition when scaling horizontally
2 parents f857f8c + ab6c64e commit 75c057e

1 file changed

Lines changed: 33 additions & 14 deletions

File tree

aries_cloudagent/protocols/routing/v1_0/manager.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Routing manager classes for tracking and inspecting routing records."""
22

3+
import asyncio
4+
import logging
35
from typing import Coroutine, Sequence
46

57
from ....core.error import BaseError
@@ -16,6 +18,12 @@
1618
from .models.route_updated import RouteUpdated
1719

1820

21+
LOGGER = logging.getLogger(__name__)
22+
23+
RECIP_ROUTE_PAUSE = 0.1
24+
RECIP_ROUTE_RETRY = 10
25+
26+
1927
class RoutingManagerError(BaseError):
2028
"""Generic routing error."""
2129

@@ -54,21 +62,30 @@ async def get_recipient(self, recip_verkey: str) -> RouteRecord:
5462
if not recip_verkey:
5563
raise RoutingManagerError("Must pass non-empty recip_verkey")
5664

57-
try:
58-
async with self._profile.session() as session:
59-
record = await RouteRecord.retrieve_by_recipient_key(
60-
session, recip_verkey
65+
i = 0
66+
record = None
67+
while not record:
68+
try:
69+
LOGGER.info(">>> fetching routing record for verkey: " + recip_verkey)
70+
async with self._profile.session() as session:
71+
record = await RouteRecord.retrieve_by_recipient_key(
72+
session, recip_verkey
73+
)
74+
LOGGER.info(">>> FOUND routing record for verkey: " + recip_verkey)
75+
return record
76+
except StorageDuplicateError:
77+
LOGGER.info(">>> DUPLICATE routing record for verkey: " + recip_verkey)
78+
raise RouteNotFoundError(
79+
f"More than one route record found with recipient key: {recip_verkey}"
6180
)
62-
except StorageDuplicateError:
63-
raise RouteNotFoundError(
64-
f"More than one route record found with recipient key: {recip_verkey}"
65-
)
66-
except StorageNotFoundError:
67-
raise RouteNotFoundError(
68-
f"No route found with recipient key: {recip_verkey}"
69-
)
70-
71-
return record
81+
except StorageNotFoundError:
82+
LOGGER.info(">>> NOT FOUND routing record for verkey: " + recip_verkey)
83+
i += 1
84+
if i > RECIP_ROUTE_RETRY:
85+
raise RouteNotFoundError(
86+
f"No route found with recipient key: {recip_verkey}"
87+
)
88+
await asyncio.sleep(RECIP_ROUTE_PAUSE)
7289

7390
async def get_routes(
7491
self, client_connection_id: str = None, tag_filter: dict = None
@@ -136,13 +153,15 @@ async def create_route_record(
136153
)
137154
if not recipient_key:
138155
raise RoutingManagerError("Missing recipient_key")
156+
LOGGER.info(">>> creating routing record for verkey: " + recipient_key)
139157
route = RouteRecord(
140158
connection_id=client_connection_id,
141159
wallet_id=internal_wallet_id,
142160
recipient_key=recipient_key,
143161
)
144162
async with self._profile.session() as session:
145163
await route.save(session, reason="Created new route")
164+
LOGGER.info(">>> CREATED routing record for verkey: " + recipient_key)
146165
return route
147166

148167
async def update_routes(

0 commit comments

Comments
 (0)