11import asyncio
2- import typing
32from contextlib import asynccontextmanager
3+ from typing import Any , AsyncGenerator , AsyncIterator , Dict , Optional
44from urllib .parse import urlparse
55
66
77class Event :
8- def __init__ (self , channel , message ) :
8+ def __init__ (self , channel : str , message : str ) -> None :
99 self .channel = channel
1010 self .message = message
1111
12- def __eq__ (self , other ) :
12+ def __eq__ (self , other : object ) -> bool :
1313 return (
1414 isinstance (other , Event )
1515 and self .channel == other .channel
1616 and self .message == other .message
1717 )
1818
19- def __repr__ (self ):
20- return f' Event(channel={ self .channel !r} , message={ self .message !r} )'
19+ def __repr__ (self ) -> str :
20+ return f" Event(channel={ self .channel !r} , message={ self .message !r} )"
2121
2222
2323class Unsubscribed (Exception ):
@@ -26,29 +26,36 @@ class Unsubscribed(Exception):
2626
2727class Broadcast :
2828 def __init__ (self , url : str ):
29+ from broadcaster ._backends .base import BroadcastBackend
30+
2931 parsed_url = urlparse (url )
30- self ._subscribers = {}
31- if parsed_url .scheme == 'redis' :
32- from ._backends .redis import RedisBackend
32+ self ._backend : BroadcastBackend
33+ self ._subscribers : Dict [str , Any ] = {}
34+ if parsed_url .scheme == "redis" :
35+ from broadcaster ._backends .redis import RedisBackend
36+
3337 self ._backend = RedisBackend (url )
3438
35- elif parsed_url .scheme in ('postgres' , 'postgresql' ):
36- from ._backends .postgres import PostgresBackend
39+ elif parsed_url .scheme in ("postgres" , "postgresql" ):
40+ from broadcaster ._backends .postgres import PostgresBackend
41+
3742 self ._backend = PostgresBackend (url )
3843
39- if parsed_url .scheme == 'kafka' :
40- from ._backends .kafka import KafkaBackend
44+ if parsed_url .scheme == "kafka" :
45+ from broadcaster ._backends .kafka import KafkaBackend
46+
4147 self ._backend = KafkaBackend (url )
4248
43- elif parsed_url .scheme == 'memory' :
44- from ._backends .memory import MemoryBackend
49+ elif parsed_url .scheme == "memory" :
50+ from broadcaster ._backends .memory import MemoryBackend
51+
4552 self ._backend = MemoryBackend (url )
4653
47- async def __aenter__ (self ) -> ' Broadcast' :
54+ async def __aenter__ (self ) -> " Broadcast" :
4855 await self .connect ()
4956 return self
5057
51- async def __aexit__ (self , * args , ** kwargs ) -> None :
58+ async def __aexit__ (self , * args : Any , ** kwargs : Any ) -> None :
5259 await self .disconnect ()
5360
5461 async def connect (self ) -> None :
@@ -68,11 +75,11 @@ async def _listener(self) -> None:
6875 for queue in list (self ._subscribers .get (event .channel , [])):
6976 await queue .put (event )
7077
71- async def publish (self , channel : str , message : typing . Any ) -> None :
78+ async def publish (self , channel : str , message : Any ) -> None :
7279 await self ._backend .publish (channel , message )
7380
7481 @asynccontextmanager
75- async def subscribe (self , channel : str ) -> ' Subscriber' :
82+ async def subscribe (self , channel : str ) -> AsyncIterator [ " Subscriber" ] :
7683 queue : asyncio .Queue = asyncio .Queue ()
7784
7885 try :
@@ -93,10 +100,10 @@ async def subscribe(self, channel: str) -> 'Subscriber':
93100
94101
95102class Subscriber :
96- def __init__ (self , queue ) :
103+ def __init__ (self , queue : asyncio . Queue ) -> None :
97104 self ._queue = queue
98105
99- async def __aiter__ (self ):
106+ async def __aiter__ (self ) -> Optional [ AsyncGenerator ] :
100107 try :
101108 while True :
102109 yield await self .get ()
0 commit comments