Skip to content

Commit 0fe85b3

Browse files
authored
fix(test): batch of fixes for weekly only tests (#1759)
* fix(test): batch of fixes for weekly only tests * fix(test): compilation issues * fix(test): rework and fine tune fixes * fix(test): keep improving some tests and remove a parallel execution * fix(test): keep improving some tests and remove a parallel execution * fix(test): attempt to stabilize * fix(test): small tweaks for stability * fix(test): small tuning for flaky tests under load * fix(test): use the filter and not the name
1 parent d69ae1f commit 0fe85b3

41 files changed

Lines changed: 2471 additions & 2039 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class MemoryUsageConcurrencyTest {
4040
@Test
4141
public void testCycle() throws Exception {
4242
final Random r = new Random(0xb4a14);
43-
for (int i = 0; i < 30000; i++) {
43+
for (int i = 0; i < 3000; i++) {
4444
checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
4545
}
4646
}

activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1701,7 +1701,7 @@ public void testReceiveMessageSentWhileOffline() throws Exception {
17011701
// ConsumerInfo asynchronously, so messages may not be ready for dispatch yet.
17021702
assertTrue("Subscription should become active in run " + (j + 1),
17031703
Wait.waitFor(() -> isSubscriptionActive(topics[0], mqttSub.getClientId().toString()),
1704-
TimeUnit.SECONDS.toMillis(30), 100));
1704+
TimeUnit.SECONDS.toMillis(60), 100));
17051705

17061706
for (int i = 0; i < messagesPerRun; ++i) {
17071707
Message message = connectionSub.receive(5, TimeUnit.SECONDS);

activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java

Lines changed: 85 additions & 141 deletions
Large diffs are not rendered by default.

activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -338,17 +338,17 @@ public void testBrokerZeroPrefetchConfig() throws Exception {
338338
final MessageProducer producer = session.createProducer(brokerZeroQueue);
339339
producer.send(session.createTextMessage("Msg1"));
340340
// now lets receive it
341-
final MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
341+
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);
342342

343-
// Wait for broker subscription to be created and policy applied (same as testBrokerZeroPrefetchConfigWithConsumerControl)
343+
// Wait for broker subscription to be created, policy applied, and ConsumerControl
344+
// propagated back to the client (the broker sends a ConsumerControl to override
345+
// the prefetch to 0, but the client processes it asynchronously)
344346
final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
345-
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
346-
@Override
347-
public boolean isSatisified() throws Exception {
348-
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
349-
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
350-
}
351-
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
347+
org.apache.activemq.util.Wait.waitFor(() ->
348+
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
349+
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
350+
&& consumer.info.getCurrentPrefetchSize() == 0
351+
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
352352

353353
final TextMessage answer = (TextMessage)consumer.receive(TimeUnit.SECONDS.toMillis(5));
354354
assertNotNull("Consumer should have read a message", answer);
@@ -358,22 +358,19 @@ public boolean isSatisified() throws Exception {
358358
// https://issues.apache.org/jira/browse/AMQ-4234
359359
// https://issues.apache.org/jira/browse/AMQ-4235
360360
public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception {
361-
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
361+
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
362362

363363
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);
364364

365365
// Wait for broker subscription to be created, policy applied, and ConsumerControl
366366
// propagated back to the client (the broker sends a ConsumerControl to override
367367
// the prefetch to 0, but the client processes it asynchronously)
368368
final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
369-
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
370-
@Override
371-
public boolean isSatisified() throws Exception {
372-
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
373-
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
374-
&& consumer.info.getCurrentPrefetchSize() == 0;
375-
}
376-
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
369+
org.apache.activemq.util.Wait.waitFor(() ->
370+
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
371+
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
372+
&& consumer.info.getCurrentPrefetchSize() == 0
373+
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
377374

378375
assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
379376

@@ -383,32 +380,29 @@ public boolean isSatisified() throws Exception {
383380
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
384381

385382
// manipulate Prefetch (like failover and stomp)
386-
ConsumerControl consumerControl = new ConsumerControl();
383+
final ConsumerControl consumerControl = new ConsumerControl();
387384
consumerControl.setConsumerId(consumer.info.getConsumerId());
388385
consumerControl.setDestination(transformedDest);
389386
consumerControl.setPrefetch(1000); // default for a q
390387

391-
Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
388+
final Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
392389
assertTrue("good request", !(reply instanceof ExceptionResponse));
393390

394391
// Wait for the ConsumerControl to be processed - broker policy should override back to 0
395-
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
396-
@Override
397-
public boolean isSatisified() throws Exception {
398-
return consumer.info.getCurrentPrefetchSize() == 0
399-
&& sub.getConsumerInfo().getCurrentPrefetchSize() == 0;
400-
}
401-
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
392+
org.apache.activemq.util.Wait.waitFor(() ->
393+
consumer.info.getCurrentPrefetchSize() == 0
394+
&& sub.getConsumerInfo().getCurrentPrefetchSize() == 0
395+
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
402396

403397
assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
404398
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
405399
}
406400

407401
@Override
408402
protected BrokerService createBroker() throws Exception {
409-
BrokerService brokerService = super.createBroker();
410-
PolicyMap policyMap = new PolicyMap();
411-
PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
403+
final BrokerService brokerService = super.createBroker();
404+
final PolicyMap policyMap = new PolicyMap();
405+
final PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
412406
zeroPrefetchPolicy.setQueuePrefetch(0);
413407
policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
414408
brokerService.setDestinationPolicy(policyMap);

activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public void testMBeanPresenceOnRestart() throws Exception {
8888
private void restartBroker() throws Exception {
8989
broker.stop();
9090
broker.waitUntilStopped();
91-
Thread.sleep(5 * 1000);
9291
createBroker(false);
9392
broker.waitUntilStarted();
9493
}

activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616
*/
1717
package org.apache.activemq.broker.advisory;
1818

19-
import org.apache.activemq.broker.BrokerFactory;
2019
import org.apache.activemq.broker.BrokerService;
21-
22-
import java.net.URI;
20+
import org.apache.activemq.command.ActiveMQQueue;
21+
import org.apache.activemq.command.ActiveMQTopic;
22+
import org.apache.activemq.network.NetworkConnector;
2323

2424
public class AdvisoryDuplexNetworkBridgeTest extends AdvisoryNetworkBridgeTest {
2525

2626
@Override
2727
public void createBroker1() throws Exception {
2828
broker1 = new BrokerService();
2929
broker1.setBrokerName("broker1");
30-
broker1.addConnector("tcp://localhost:61617");
30+
broker1.addConnector("tcp://localhost:0");
3131
broker1.setUseJmx(false);
3232
broker1.setPersistent(false);
3333
broker1.start();
@@ -36,12 +36,28 @@ public void createBroker1() throws Exception {
3636

3737
@Override
3838
public void createBroker2() throws Exception {
39-
broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/duplexLocalBroker.xml"));
39+
// Programmatic equivalent of duplexLocalBroker.xml with ephemeral port
40+
broker2 = new BrokerService();
41+
broker2.setBrokerName("localBroker");
42+
broker2.setPersistent(true);
43+
broker2.setUseShutdownHook(false);
44+
broker2.setUseJmx(false);
45+
broker2.addConnector("tcp://localhost:0");
46+
47+
final String broker1Uri = broker1.getTransportConnectors().get(0).getConnectUri().toString();
48+
final NetworkConnector nc = broker2.addNetworkConnector("static:(" + broker1Uri + ")");
49+
nc.setDuplex(true);
50+
nc.setDynamicOnly(false);
51+
nc.setConduitSubscriptions(true);
52+
nc.setDecreaseNetworkConsumerPriority(false);
53+
nc.getExcludedDestinations().add(new ActiveMQQueue("exclude.test.foo"));
54+
nc.getExcludedDestinations().add(new ActiveMQTopic("exclude.test.bar"));
55+
4056
broker2.start();
4157
broker2.waitUntilStarted();
4258
}
4359

44-
public void assertCreatedByDuplex(boolean createdByDuplex) {
60+
public void assertCreatedByDuplex(final boolean createdByDuplex) {
4561
assertTrue(createdByDuplex);
4662
}
4763
}

activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.activemq.broker.BrokerService;
2727
import org.apache.activemq.command.ActiveMQMessage;
2828
import org.apache.activemq.command.BrokerInfo;
29+
import org.apache.activemq.util.Wait;
2930

3031
import java.net.URI;
3132

@@ -38,26 +39,24 @@ public class AdvisoryNetworkBridgeTest extends TestCase {
3839
public void testAdvisory() throws Exception {
3940
createBroker1();
4041

41-
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
42-
Connection conn = factory.createConnection();
43-
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
42+
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
43+
final Connection conn = factory.createConnection();
44+
final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
4445
conn.start();
45-
MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
46-
47-
Thread.sleep(1000);
46+
final MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
4847

4948
createBroker2();
50-
51-
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
49+
50+
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(5000);
5251
assertNotNull(advisory);
5352
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
5453
assertTrue(advisory.getBooleanProperty("started"));
5554
assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));
56-
55+
5756
broker2.stop();
5857
broker2.waitUntilStopped();
5958

60-
advisory = (ActiveMQMessage)consumer.receive(2000);
59+
advisory = (ActiveMQMessage)consumer.receive(5000);
6160
assertNotNull(advisory);
6261
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
6362
assertFalse(advisory.getBooleanProperty("started"));
@@ -70,15 +69,13 @@ public void testAddConsumerLater() throws Exception {
7069

7170
createBroker2();
7271

73-
Thread.sleep(1000);
74-
75-
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
76-
Connection conn = factory.createConnection();
77-
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
72+
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
73+
final Connection conn = factory.createConnection();
74+
final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
7875
conn.start();
7976
MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
8077

81-
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
78+
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(5000);
8279
assertNotNull(advisory);
8380
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
8481
assertTrue(advisory.getBooleanProperty("started"));
@@ -87,7 +84,7 @@ public void testAddConsumerLater() throws Exception {
8784
broker2.stop();
8885
broker2.waitUntilStopped();
8986

90-
advisory = (ActiveMQMessage)consumer.receive(2000);
87+
advisory = (ActiveMQMessage)consumer.receive(5000);
9188
assertNotNull(advisory);
9289
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
9390
assertFalse(advisory.getBooleanProperty("started"));

0 commit comments

Comments
 (0)