From 89a4b9924ae1b96237bf9d8e42033fc696877426 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 19 May 2026 17:00:03 +0100 Subject: [PATCH] [fix][test] Add timeout to initial receives in ResendRequestTest.testSharedSingleAckedPartitionedTopic The test creates a partitioned topic with 3 partitions, produces 10 messages via RoundRobinPartition (so ~3-4 per partition), and subscribes two consumers with a Shared subscription and receiverQueueSize=7. Because the receive queue is larger than the per-partition message count, the broker can dispatch every message to whichever consumer connected first, leaving the other consumer's queue empty. The test then calls consumer1.receive() followed by consumer2.receive() with no timeout. If consumer2 got nothing, that second call blocks forever and the test times out: org.testng.internal.thread.ThreadTimeoutException: Method ... didn't finish within the time-out 60000 ... at GrowableArrayBlockingQueue.take(GrowableArrayBlockingQueue.java:200) at MultiTopicsConsumerImpl.internalReceive(MultiTopicsConsumerImpl.java:388) at ConsumerBase.receive(ConsumerBase.java:282) at ResendRequestTest.testSharedSingleAckedPartitionedTopic(ResendRequestTest.java:517) Use timeouts on the initial receives like the loop below does. The totalMessages assertion at the end of the receive loop still verifies that both consumers together got all 10 messages, so we don't lose coverage by allowing a null on the first receive. Example failure: https://scans.gradle.com/s/by57j4rphce62/tests/task/:pulsar-broker:test/details/org.apache.pulsar.broker.service.ResendRequestTest/testSharedSingleAckedPartitionedTopic/2/output --- .../apache/pulsar/broker/service/ResendRequestTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index f290e707f772b..4292b9236c853 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -513,8 +513,11 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { } // 4. Receive messages - Message message1 = consumer1.receive(); - Message message2 = consumer2.receive(); + // Use timeouts on the initial receives — with a Shared subscription the broker may + // dispatch all messages to a single consumer (receiverQueueSize is larger than the + // number of messages per partition), and a blocking receive() would hang. + Message message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS); + Message message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS); int messageCount1 = 0; int messageCount2 = 0; int ackCount1 = 0;