Skip to content

Commit 83a3681

Browse files
committed
feat: event and webhook on keylist update stored
Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
1 parent db6b769 commit 83a3681

4 files changed

Lines changed: 95 additions & 64 deletions

File tree

aries_cloudagent/admin/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
"acapy::actionmenu::received": "actionmenu",
5353
"acapy::actionmenu::get-active-menu": "get-active-menu",
5454
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
55+
"acapy::keylist::updated": "keylist",
5556
}
5657

5758

aries_cloudagent/core/event_bus.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Optional,
1616
Pattern,
1717
TYPE_CHECKING,
18+
Tuple,
1819
)
1920
from functools import partial
2021

@@ -193,7 +194,7 @@ class MockEventBus(EventBus):
193194
def __init__(self):
194195
"""Initialize MockEventBus."""
195196
super().__init__()
196-
self.events = []
197+
self.events: List[Tuple[Profile, Event]] = []
197198

198199
async def notify(self, profile: "Profile", event: Event):
199200
"""Append the event to MockEventBus.events."""

aries_cloudagent/protocols/coordinate_mediation/v1_0/manager.py

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class MediationManager:
6060
SET_TO_DEFAULT_ON_GRANTED = "set_to_default_on_granted"
6161
METADATA_KEY = "mediation"
6262
METADATA_ID = "id"
63+
KEYLIST_UPDATED_EVENT = "acapy::keylist::updated"
6364

6465
def __init__(self, profile: Profile):
6566
"""Initialize Mediation Manager.
@@ -534,63 +535,74 @@ async def store_update_results(
534535
session: An active profile session
535536
536537
"""
537-
session = await self._profile.session()
538538
to_save: Sequence[RouteRecord] = []
539539
to_remove: Sequence[RouteRecord] = []
540-
for updated in results:
541-
if updated.result != KeylistUpdated.RESULT_SUCCESS:
542-
# TODO better handle different results?
543-
LOGGER.warning(
544-
"Keylist update failure: %s(%s): %s",
545-
updated.action,
546-
updated.recipient_key,
547-
updated.result,
548-
)
549-
continue
550-
if updated.action == KeylistUpdateRule.RULE_ADD:
551-
# Multi-tenancy uses route record for internal relaying of wallets
552-
# So the record could already exist. We update in that case
553-
try:
554-
record = await RouteRecord.retrieve_by_recipient_key(
555-
session, updated.recipient_key
556-
)
557-
record.connection_id = connection_id
558-
record.role = RouteRecord.ROLE_CLIENT
559-
except StorageNotFoundError:
560-
record = RouteRecord(
561-
role=RouteRecord.ROLE_CLIENT,
562-
recipient_key=updated.recipient_key,
563-
connection_id=connection_id,
564-
)
565-
to_save.append(record)
566-
elif updated.action == KeylistUpdateRule.RULE_REMOVE:
567-
try:
568-
records = await RouteRecord.query(
569-
session,
570-
{
571-
"role": RouteRecord.ROLE_CLIENT,
572-
"connection_id": connection_id,
573-
"recipient_key": updated.recipient_key,
574-
},
575-
)
576-
except StorageNotFoundError as err:
577-
LOGGER.error(
578-
"No route found while processing keylist update response: %s",
579-
err,
540+
541+
async with self._profile.session() as session:
542+
for updated in results:
543+
if updated.result != KeylistUpdated.RESULT_SUCCESS:
544+
# TODO better handle different results?
545+
LOGGER.warning(
546+
"Keylist update failure: %s(%s): %s",
547+
updated.action,
548+
updated.recipient_key,
549+
updated.result,
580550
)
581-
else:
582-
if len(records) > 1:
551+
continue
552+
if updated.action == KeylistUpdateRule.RULE_ADD:
553+
# Multi-tenancy uses route record for internal relaying of wallets
554+
# So the record could already exist. We update in that case
555+
try:
556+
record = await RouteRecord.retrieve_by_recipient_key(
557+
session, updated.recipient_key
558+
)
559+
record.connection_id = connection_id
560+
record.role = RouteRecord.ROLE_CLIENT
561+
except StorageNotFoundError:
562+
record = RouteRecord(
563+
role=RouteRecord.ROLE_CLIENT,
564+
recipient_key=updated.recipient_key,
565+
connection_id=connection_id,
566+
)
567+
to_save.append(record)
568+
elif updated.action == KeylistUpdateRule.RULE_REMOVE:
569+
try:
570+
records = await RouteRecord.query(
571+
session,
572+
{
573+
"role": RouteRecord.ROLE_CLIENT,
574+
"connection_id": connection_id,
575+
"recipient_key": updated.recipient_key,
576+
},
577+
)
578+
except StorageNotFoundError as err:
583579
LOGGER.error(
584-
f"Too many ({len(records)}) routes found "
585-
"while processing keylist update response"
580+
"No route found while processing keylist update response: %s",
581+
err,
586582
)
587-
record = records[0]
588-
to_remove.append(record)
589-
590-
for record_for_saving in to_save:
591-
await record_for_saving.save(session, reason="Route successfully added.")
592-
for record_for_removal in to_remove:
593-
await record_for_removal.delete_record(session)
583+
else:
584+
if len(records) > 1:
585+
LOGGER.error(
586+
f"Too many ({len(records)}) routes found "
587+
"while processing keylist update response"
588+
)
589+
record = records[0]
590+
to_remove.append(record)
591+
592+
for record_for_saving in to_save:
593+
await record_for_saving.save(
594+
session, reason="Route successfully added."
595+
)
596+
for record_for_removal in to_remove:
597+
await record_for_removal.delete_record(session)
598+
599+
await self._profile.notify(
600+
self.KEYLIST_UPDATED_EVENT,
601+
{
602+
"connection_id": connection_id,
603+
"updated": [update.serialize() for update in results],
604+
},
605+
)
594606

595607
async def get_my_keylist(
596608
self, connection_id: Optional[str] = None

aries_cloudagent/protocols/coordinate_mediation/v1_0/tests/test_mediation_manager.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""Test MediationManager."""
22
import logging
3+
from typing import AsyncIterable, Iterable
34

45
import pytest
56

67
from asynctest import mock as async_mock
78

89
from .....core.profile import Profile, ProfileSession
10+
from .....core.in_memory import InMemoryProfile
11+
from .....core.event_bus import EventBus, MockEventBus
912
from .....connections.models.conn_record import ConnRecord
1013
from .....messaging.request_context import RequestContext
1114
from .....storage.error import StorageNotFoundError
@@ -36,29 +39,32 @@
3639

3740

3841
@pytest.fixture
39-
async def profile() -> Profile:
42+
def profile() -> Iterable[Profile]:
4043
"""Fixture for profile used in tests."""
4144
# pylint: disable=W0621
42-
context = RequestContext.test_context()
43-
context.message_receipt = MessageReceipt(sender_verkey=TEST_VERKEY)
44-
context.connection_record = ConnRecord(connection_id=TEST_CONN_ID)
45-
yield context.profile
45+
yield InMemoryProfile.test_profile(bind={EventBus: MockEventBus()})
4646

4747

4848
@pytest.fixture
49-
async def session(profile) -> ProfileSession: # pylint: disable=W0621
49+
def mock_event_bus(profile: Profile):
50+
yield profile.inject(EventBus)
51+
52+
53+
@pytest.fixture
54+
async def session(profile) -> AsyncIterable[ProfileSession]: # pylint: disable=W0621
5055
"""Fixture for profile session used in tests."""
51-
yield await profile.session()
56+
async with profile.session() as session:
57+
yield session
5258

5359

5460
@pytest.fixture
55-
async def manager(profile) -> MediationManager: # pylint: disable=W0621
61+
def manager(profile) -> Iterable[MediationManager]: # pylint: disable=W0621
5662
"""Fixture for manager used in tests."""
5763
yield MediationManager(profile)
5864

5965

6066
@pytest.fixture
61-
def record() -> MediationRecord:
67+
def record() -> Iterable[MediationRecord]:
6268
"""Fixture for record used in tests."""
6369
yield MediationRecord(
6470
state=MediationRecord.STATE_GRANTED, connection_id=TEST_CONN_ID
@@ -71,7 +77,7 @@ class TestMediationManager: # pylint: disable=R0904,W0621
7177
async def test_create_manager_no_profile(self):
7278
"""test_create_manager_no_profile."""
7379
with pytest.raises(MediationManagerError):
74-
await MediationManager(None)
80+
MediationManager(None)
7581

7682
async def test_create_did(self, manager, session):
7783
"""test_create_did."""
@@ -363,7 +369,12 @@ async def test_add_remove_key_mix(self, manager):
363369
assert update.updates[0].recipient_key == TEST_VERKEY
364370
assert update.updates[1].recipient_key == TEST_ROUTE_VERKEY
365371

366-
async def test_store_update_results(self, session, manager):
372+
async def test_store_update_results(
373+
self,
374+
session: ProfileSession,
375+
manager: MediationManager,
376+
mock_event_bus: MockEventBus,
377+
):
367378
"""test_store_update_results."""
368379
await RouteRecord(
369380
role=RouteRecord.ROLE_CLIENT,
@@ -383,6 +394,12 @@ async def test_store_update_results(self, session, manager):
383394
),
384395
]
385396
await manager.store_update_results(TEST_CONN_ID, results)
397+
assert mock_event_bus.events
398+
assert mock_event_bus.events[0][1].topic == manager.KEYLIST_UPDATED_EVENT
399+
assert mock_event_bus.events[0][1].payload == {
400+
"connection_id": TEST_CONN_ID,
401+
"updated": [result.serialize() for result in results],
402+
}
386403
routes = await RouteRecord.query(session)
387404

388405
assert len(routes) == 1

0 commit comments

Comments
 (0)