Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 4643d47

Browse files
committed
test: add concurrent checks
* group together fixtures for backend testing * Add per-backend setup to allow slow initialization. This is specially important as kafka subscribe can return while the consumer is not ready. See related test setup upstream: - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/test_consumer.py#L433 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L376 - https://github.com/aio-libs/aiokafka/blob/2c54e10c57760f779961a8c2f5df8ad609ef6983/tests/_testutil.py#L364 fixes #42
1 parent 435c35e commit 4643d47

3 files changed

Lines changed: 107 additions & 40 deletions

File tree

tests/conftest.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Check for #22"""
2+
3+
import asyncio
4+
import functools
5+
6+
import pytest_asyncio
7+
8+
from broadcaster import Broadcast
9+
from broadcaster._backends.kafka import KafkaBackend
10+
11+
12+
async def __has_topic_now(client, topic):
13+
if await client.force_metadata_update():
14+
if topic in client.cluster.topics():
15+
print(f'Topic "{topic}" exists')
16+
return True
17+
return False
18+
19+
20+
async def __wait_has_topic(client, topic, *, timeout_sec=5):
21+
poll_time_sec = 1 / 10000
22+
from datetime import datetime
23+
24+
pre = datetime.now()
25+
while True:
26+
if (datetime.now() - pre).total_seconds() >= timeout_sec:
27+
raise ValueError(f'No topic "{topic}" exists')
28+
if await __has_topic_now(client, topic):
29+
return
30+
await asyncio.sleep(poll_time_sec)
31+
32+
33+
def kafka_backend_setup(kafka_backend):
34+
"""Block until consumer client contains the topic"""
35+
subscribe_impl = kafka_backend.subscribe
36+
37+
@functools.wraps(subscribe_impl)
38+
async def subscribe(channel: str) -> None:
39+
await subscribe_impl(channel)
40+
await __wait_has_topic(kafka_backend._consumer._client, channel)
41+
42+
kafka_backend.subscribe = subscribe
43+
44+
45+
BROADCASTS_SETUP = {
46+
KafkaBackend: kafka_backend_setup,
47+
}
48+
49+
50+
@pytest_asyncio.fixture(scope="function")
51+
async def setup_broadcast(request):
52+
url = request.param
53+
async with Broadcast(url) as broadcast:
54+
backend = broadcast._backend
55+
for klass, setup in BROADCASTS_SETUP.items():
56+
if isinstance(backend, klass):
57+
setup(backend)
58+
break
59+
yield broadcast

tests/test_broadcast.py

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,23 @@
1-
import pytest
2-
3-
from broadcaster import Broadcast
4-
5-
6-
@pytest.mark.asyncio
7-
async def test_memory():
8-
async with Broadcast("memory://") as broadcast:
9-
async with broadcast.subscribe("chatroom") as subscriber:
10-
await broadcast.publish("chatroom", "hello")
11-
event = await subscriber.get()
12-
assert event.channel == "chatroom"
13-
assert event.message == "hello"
14-
15-
16-
@pytest.mark.asyncio
17-
async def test_redis():
18-
async with Broadcast("redis://localhost:6379") as broadcast:
19-
async with broadcast.subscribe("chatroom") as subscriber:
20-
await broadcast.publish("chatroom", "hello")
21-
event = await subscriber.get()
22-
assert event.channel == "chatroom"
23-
assert event.message == "hello"
1+
from uuid import uuid4
242

3+
import pytest
254

26-
@pytest.mark.asyncio
27-
async def test_postgres():
28-
async with Broadcast(
29-
"postgres://postgres:postgres@localhost:5432/broadcaster"
30-
) as broadcast:
31-
async with broadcast.subscribe("chatroom") as subscriber:
32-
await broadcast.publish("chatroom", "hello")
33-
event = await subscriber.get()
34-
assert event.channel == "chatroom"
35-
assert event.message == "hello"
5+
URLS = [
6+
("memory://",),
7+
("redis://localhost:6379",),
8+
("postgres://postgres:postgres@localhost:5432/broadcaster",),
9+
("kafka://localhost:9092",),
10+
]
3611

3712

3813
@pytest.mark.asyncio
39-
async def test_kafka():
40-
async with Broadcast("kafka://localhost:9092") as broadcast:
41-
async with broadcast.subscribe("chatroom") as subscriber:
42-
await broadcast.publish("chatroom", "hello")
43-
event = await subscriber.get()
44-
assert event.channel == "chatroom"
45-
assert event.message == "hello"
14+
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
15+
async def test_broadcast(setup_broadcast):
16+
uid = uuid4()
17+
channel = f"chatroom-{uid}"
18+
msg = f"hello {uid}"
19+
async with setup_broadcast.subscribe(channel) as subscriber:
20+
await setup_broadcast.publish(channel, msg)
21+
event = await subscriber.get()
22+
assert event.channel == channel
23+
assert event.message == msg

tests/test_concurrent.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Check for #22"""
2+
import asyncio
3+
from uuid import uuid4
4+
5+
import pytest
6+
7+
MESSAGES = ["hello", "goodbye"]
8+
9+
URLS = [
10+
("memory://",),
11+
("redis://localhost:6379",),
12+
("postgres://postgres:postgres@localhost:5432/broadcaster",),
13+
("kafka://localhost:9092",),
14+
]
15+
16+
17+
@pytest.mark.asyncio
18+
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
19+
async def test_broadcast(setup_broadcast):
20+
uid = uuid4()
21+
channel = f"chatroom-{uid}"
22+
msgs = [f"{msg} {uid}" for msg in MESSAGES]
23+
async with setup_broadcast.subscribe(channel) as subscriber:
24+
to_publish = [setup_broadcast.publish(channel, msg) for msg in msgs]
25+
26+
await asyncio.gather(*to_publish)
27+
for msg in msgs:
28+
event = await subscriber.get()
29+
assert event.channel == channel
30+
assert event.message == msg

0 commit comments

Comments
 (0)