Skip to content

Commit 0e80e6e

Browse files
authored
Merge pull request openwallet-foundation#1659 from shaangill025/cleanup_redis
Redis PQ Cleanup
2 parents db6b769 + 0a1cb4b commit 0e80e6e

22 files changed

Lines changed: 287 additions & 625 deletions

OutboundQueue.md

Lines changed: 0 additions & 53 deletions
This file was deleted.

aries_cloudagent/config/argparse.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,22 +1169,6 @@ def add_arguments(self, parser: ArgumentParser):
11691169
"Supported outbound transport types are 'http' and 'ws'."
11701170
),
11711171
)
1172-
parser.add_argument(
1173-
"-oq",
1174-
"--outbound-queue",
1175-
dest="outbound_queue",
1176-
type=str,
1177-
env_var="ACAPY_OUTBOUND_TRANSPORT_QUEUE",
1178-
help=(
1179-
"Defines the location of the Outbound Queue Engine. This must be "
1180-
"a 'dotpath' to a Python module on the PYTHONPATH, followed by a "
1181-
"colon, followed by the name of a Python class that implements "
1182-
"BaseOutboundQueue. This commandline option is the official entry "
1183-
"point of ACA-py's pluggable queue interface. The default value is: "
1184-
"'aries_cloudagent.transport.outbound.queue.redis:RedisOutboundQueue'."
1185-
""
1186-
),
1187-
)
11881172
parser.add_argument(
11891173
"-l",
11901174
"--label",
@@ -1264,20 +1248,10 @@ def get_settings(self, args: Namespace):
12641248
settings["transport.inbound_configs"] = args.inbound_transports
12651249
else:
12661250
raise ArgsParseError("-it/--inbound-transport is required")
1267-
if not args.outbound_transports and not args.outbound_queue:
1268-
raise ArgsParseError(
1269-
"-ot/--outbound-transport or -oq/--outbound-queue is required"
1270-
)
1271-
if args.outbound_transports and args.outbound_queue:
1272-
raise ArgsParseError(
1273-
"-ot/--outbound-transport and -oq/--outbound-queue are "
1274-
"not allowed together"
1275-
)
12761251
if args.outbound_transports:
12771252
settings["transport.outbound_configs"] = args.outbound_transports
1278-
if args.outbound_queue:
1279-
settings["transport.outbound_queue"] = args.outbound_queue
1280-
1253+
else:
1254+
raise ArgsParseError("-ot/--outbound-transport is required")
12811255
settings["transport.enable_undelivered_queue"] = args.enable_undelivered_queue
12821256

12831257
if args.label:

aries_cloudagent/config/logging.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ def print_banner(
8383
agent_label,
8484
inbound_transports,
8585
outbound_transports,
86-
outbound_queue,
8786
public_did,
8887
admin_server=None,
8988
banner_length=40,
@@ -96,7 +95,6 @@ def print_banner(
9695
agent_label: Agent Label
9796
inbound_transports: Configured inbound transports
9897
outbound_transports: Configured outbound transports
99-
outbound_queue: The outbound queue engine instance
10098
admin_server: Admin server info
10199
public_did: Public DID
102100
banner_length: (Default value = 40) Length of the banner
@@ -124,6 +122,18 @@ def print_banner(
124122
)
125123
banner.print_spacer()
126124

125+
external_in_transports = set().union(
126+
*(
127+
transport
128+
for transport in inbound_transports.values()
129+
if transport.is_external
130+
)
131+
)
132+
if external_in_transports:
133+
banner.print_spacer()
134+
banner.print_list([f"{external_in_transports}"])
135+
banner.print_spacer()
136+
127137
# Outbound transports
128138
schemes = set().union(
129139
*(transport.schemes for transport in outbound_transports.values())
@@ -134,15 +144,16 @@ def print_banner(
134144
banner.print_list([f"{scheme}" for scheme in sorted(schemes)])
135145
banner.print_spacer()
136146

137-
# Outbound queue
138-
if outbound_queue:
139-
banner.print_subtitle("Outbound Queue")
140-
banner.print_spacer()
141-
banner.print_list(
142-
[
143-
f"{outbound_queue}",
144-
]
147+
external_out_transports = set().union(
148+
*(
149+
transport
150+
for transport in outbound_transports.values()
151+
if transport.is_external
145152
)
153+
)
154+
if external_out_transports:
155+
banner.print_spacer()
156+
banner.print_list([f"{external_out_transports}"])
146157
banner.print_spacer()
147158

148159
# DID info

aries_cloudagent/config/tests/test_argparse.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -144,27 +144,6 @@ async def test_outbound_is_required(self):
144144
with self.assertRaises(argparse.ArgsParseError):
145145
settings = group.get_settings(result)
146146

147-
async def test_outbound_queue(self):
148-
"""Test outbound queue class path string."""
149-
parser = argparse.create_argument_parser()
150-
group = argparse.TransportGroup()
151-
group.add_arguments(parser)
152-
153-
result = parser.parse_args(
154-
[
155-
"--inbound-transport",
156-
"http",
157-
"0.0.0.0",
158-
"80",
159-
"--outbound-queue",
160-
"my_queue.mod.path",
161-
]
162-
)
163-
164-
settings = group.get_settings(result)
165-
166-
assert settings.get("transport.outbound_queue") == "my_queue.mod.path"
167-
168147
async def test_general_settings_file(self):
169148
"""Test file argument parsing."""
170149

aries_cloudagent/config/tests/test_logging.py

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,38 +66,15 @@ def test_banner_did(self):
6666
test_label,
6767
{"in": mock_http},
6868
{"out": mock_https},
69-
None,
7069
test_did,
7170
mock_admin_server,
7271
)
7372
test_module.LoggingConfigurator.print_banner(
74-
test_label, {"in": mock_http}, {"out": mock_https}, None, test_did
73+
test_label, {"in": mock_http}, {"out": mock_https}, test_did
7574
)
7675
output = stdout.getvalue()
7776
assert test_did in output
7877

79-
def test_banner_outbound_queue(self):
80-
stdout = StringIO()
81-
mock_http = async_mock.MagicMock(scheme="http", host="1.2.3.4", port=8081)
82-
mock_queue = "mocked queue text"
83-
mock_admin_server = async_mock.MagicMock(host="1.2.3.4", port=8091)
84-
with contextlib.redirect_stdout(stdout):
85-
test_label = "Aries Cloud Agent"
86-
test_did = "55GkHamhTU1ZbTbV2ab9DE"
87-
test_module.LoggingConfigurator.print_banner(
88-
test_label,
89-
{"in": mock_http},
90-
{},
91-
mock_queue,
92-
test_did,
93-
mock_admin_server,
94-
)
95-
test_module.LoggingConfigurator.print_banner(
96-
test_label, {"in": mock_http}, {}, mock_queue, test_did
97-
)
98-
output = stdout.getvalue()
99-
assert "mocked queue text" in output
100-
10178
def test_load_resource(self):
10279
with async_mock.patch("builtins.open", async_mock.MagicMock()) as mock_open:
10380
test_module.load_resource("abc", encoding="utf-8")

aries_cloudagent/core/conductor.py

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@
5656
from ..transport.outbound.base import OutboundDeliveryError
5757
from ..transport.outbound.manager import OutboundTransportManager, QueuedOutboundMessage
5858
from ..transport.outbound.message import OutboundMessage
59-
from ..transport.outbound.queue.base import BaseOutboundQueue
60-
from ..transport.outbound.queue.loader import get_outbound_queue
6159
from ..transport.outbound.status import OutboundSendStatus
6260
from ..transport.wire_format import BaseWireFormat
6361
from ..utils.stats import Collector
@@ -97,7 +95,6 @@ def __init__(self, context_builder: ContextBuilder) -> None:
9795
self.outbound_transport_manager: OutboundTransportManager = None
9896
self.root_profile: Profile = None
9997
self.setup_public_did: DIDInfo = None
100-
self.outbound_queue: BaseOutboundQueue = None
10198

10299
@property
103100
def context(self) -> InjectionContext:
@@ -211,8 +208,6 @@ async def setup(self):
211208
DocumentLoader, DocumentLoader(self.root_profile)
212209
)
213210

214-
self.outbound_queue = get_outbound_queue(self.root_profile)
215-
216211
# Admin API
217212
if context.settings.get("admin.enabled"):
218213
try:
@@ -273,13 +268,6 @@ async def start(self) -> None:
273268
LOGGER.exception("Unable to start outbound transports")
274269
raise
275270

276-
if self.outbound_queue:
277-
try:
278-
await self.outbound_queue.start()
279-
except Exception:
280-
LOGGER.exception("Unable to start outbound queue")
281-
raise
282-
283271
# Start up Admin server
284272
if self.admin_server:
285273
try:
@@ -303,7 +291,6 @@ async def start(self) -> None:
303291
default_label,
304292
self.inbound_transport_manager.registered_transports,
305293
self.outbound_transport_manager.registered_transports,
306-
self.outbound_queue,
307294
self.setup_public_did and self.setup_public_did.did,
308295
self.admin_server,
309296
)
@@ -489,8 +476,6 @@ async def stop(self, timeout=1.0):
489476
shutdown.run(self.inbound_transport_manager.stop())
490477
if self.outbound_transport_manager:
491478
shutdown.run(self.outbound_transport_manager.stop())
492-
if self.outbound_queue:
493-
shutdown.run(self.outbound_queue.stop())
494479

495480
# close multitenant profiles
496481
multitenant_mgr = self.context.inject_or(BaseMultitenantManager)
@@ -669,44 +654,14 @@ async def queue_outbound(
669654
self.admin_server.notify_fatal_error()
670655
raise
671656
del conn_mgr
672-
# If ``self.outbound_queue`` is specified (usually set via
673-
# outbound queue `-oq` commandline option), use that external
674-
# queue. Else save the message to an internal queue. This
675-
# internal queue usually results in the message to be sent over
676-
# ACA-py `-ot` transport.
677-
if self.outbound_queue:
678-
return await self._queue_external(profile, outbound)
679-
else:
680-
return self._queue_internal(profile, outbound)
681-
682-
async def _queue_external(
683-
self,
684-
profile: Profile,
685-
outbound: OutboundMessage,
686-
) -> OutboundSendStatus:
687-
"""Save the message to an external outbound queue."""
688-
async with self.outbound_queue:
689-
targets = (
690-
[outbound.target] if outbound.target else (outbound.target_list or [])
691-
)
692-
for target in targets:
693-
encoded_outbound_message = (
694-
await self.outbound_transport_manager.encode_outbound_message(
695-
profile, outbound, target
696-
)
697-
)
698-
await self.outbound_queue.enqueue_message(
699-
encoded_outbound_message.payload, target.endpoint
700-
)
701-
702-
return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE
657+
return await self._queue_message(profile, outbound)
703658

704-
def _queue_internal(
659+
async def _queue_message(
705660
self, profile: Profile, outbound: OutboundMessage
706661
) -> OutboundSendStatus:
707662
"""Save the message to an internal outbound queue."""
708663
try:
709-
self.outbound_transport_manager.enqueue_message(profile, outbound)
664+
await self.outbound_transport_manager.enqueue_message(profile, outbound)
710665
return OutboundSendStatus.QUEUED_FOR_DELIVERY
711666
except OutboundDeliveryError:
712667
LOGGER.warning("Cannot queue message for delivery, no supported transport")

0 commit comments

Comments
 (0)