@@ -231,31 +231,34 @@ void shouldNotLogWarningWhenNoOverflow() {
231231 void shouldBeThreadSafeForDifferentKeys () throws InterruptedException {
232232 int threadCount = 10 ;
233233 int eventsPerThread = 100 ;
234+ KeyBuffer <String , String > largeBuffer = new KeyBuffer <>(10000 , String ::length );
234235 ExecutorService executor = Executors .newFixedThreadPool (threadCount );
235- CountDownLatch latch = new CountDownLatch (threadCount );
236-
237- // Each thread works with different key
238- for (int i = 0 ; i < threadCount ; i ++) {
239- final String key = "key" + i ;
240- executor .submit (() -> {
241- try {
242- for (int j = 0 ; j < eventsPerThread ; j ++) {
243- buffer .add (key , "event" + j );
236+ try {
237+ CountDownLatch latch = new CountDownLatch (threadCount );
238+
239+ // Each thread works with different key
240+ for (int i = 0 ; i < threadCount ; i ++) {
241+ final String key = "key" + i ;
242+ executor .submit (() -> {
243+ try {
244+ for (int j = 0 ; j < eventsPerThread ; j ++) {
245+ largeBuffer .add (key , "event" + j );
246+ }
247+ } finally {
248+ latch .countDown ();
244249 }
245- } finally {
246- latch .countDown ();
247- }
248- });
249- }
250+ });
251+ }
250252
251- try {
252253 assertThat (latch .await (5 , TimeUnit .SECONDS )).isTrue ();
253254
254255 // Verify each key has its events
255256 for (int i = 0 ; i < threadCount ; i ++) {
256257 String key = "key" + i ;
257- Deque <String > events = buffer .removeAll (key );
258- assertThat (events ).isNotNull ().isNotEmpty ();
258+ Deque <String > events = largeBuffer .removeAll (key );
259+ assertThat (events )
260+ .isNotNull ()
261+ .hasSize (eventsPerThread );
259262 }
260263 } finally {
261264 executor .shutdown ();
@@ -266,28 +269,31 @@ void shouldBeThreadSafeForDifferentKeys() throws InterruptedException {
266269 void shouldBeThreadSafeForSameKey () throws InterruptedException {
267270 int threadCount = 5 ;
268271 int eventsPerThread = 20 ;
272+ KeyBuffer <String , String > largeBuffer = new KeyBuffer <>(10000 , String ::length );
269273 ExecutorService executor = Executors .newFixedThreadPool (threadCount );
270- CountDownLatch latch = new CountDownLatch (threadCount );
271-
272- // All threads work with same key
273- for (int i = 0 ; i < threadCount ; i ++) {
274- final int threadId = i ;
275- executor .submit (() -> {
276- try {
277- for (int j = 0 ; j < eventsPerThread ; j ++) {
278- buffer .add ("sharedKey" , "t" + threadId + "e" + j );
274+ try {
275+ CountDownLatch latch = new CountDownLatch (threadCount );
276+
277+ // All threads work with same key
278+ for (int i = 0 ; i < threadCount ; i ++) {
279+ final int threadId = i ;
280+ executor .submit (() -> {
281+ try {
282+ for (int j = 0 ; j < eventsPerThread ; j ++) {
283+ largeBuffer .add ("sharedKey" , "t" + threadId + "e" + j );
284+ }
285+ } finally {
286+ latch .countDown ();
279287 }
280- } finally {
281- latch .countDown ();
282- }
283- });
284- }
288+ });
289+ }
285290
286- try {
287291 assertThat (latch .await (5 , TimeUnit .SECONDS )).isTrue ();
288292
289- Deque <String > events = buffer .removeAll ("sharedKey" );
290- assertThat (events ).isNotNull ().isNotEmpty ();
293+ Deque <String > events = largeBuffer .removeAll ("sharedKey" );
294+ assertThat (events )
295+ .isNotNull ()
296+ .hasSize (threadCount * eventsPerThread );
291297 } finally {
292298 executor .shutdown ();
293299 }
0 commit comments