3636import com .google .genai .types .FunctionCall ;
3737import com .google .genai .types .FunctionResponse ;
3838import com .google .genai .types .Part ;
39- import java .util .HashMap ;
39+ import java .util .ArrayList ;
40+ import java .util .ConcurrentModificationException ;
41+ import java .util .Iterator ;
4042import java .util .List ;
43+ import java .util .ListIterator ;
4144import java .util .Map ;
4245import java .util .Objects ;
4346import java .util .Optional ;
44- import java .util .concurrent .CountDownLatch ;
45- import java .util .concurrent .atomic .AtomicReference ;
47+ import java .util .stream .Stream ;
4648import org .junit .Test ;
4749import org .junit .runner .RunWith ;
4850import org .junit .runners .JUnit4 ;
@@ -787,15 +789,49 @@ public void processRequest_notEmptyContent() {
787789 public void processRequest_concurrentReadAndWrite_noException () throws Exception {
788790 LlmAgent agent =
789791 LlmAgent .builder ().name (AGENT ).includeContents (LlmAgent .IncludeContents .DEFAULT ).build ();
792+ List <Event > customEvents =
793+ new ArrayList <Event >() {
794+ private void checkLock () {
795+ if (!Thread .holdsLock (this )) {
796+ throw new ConcurrentModificationException ("Unsynchronized iteration detected!" );
797+ }
798+ }
799+
800+ @ Override
801+ public Iterator <Event > iterator () {
802+ checkLock ();
803+ return super .iterator ();
804+ }
805+
806+ @ Override
807+ public ListIterator <Event > listIterator () {
808+ checkLock ();
809+ return super .listIterator ();
810+ }
811+
812+ @ Override
813+ public ListIterator <Event > listIterator (int index ) {
814+ checkLock ();
815+ return super .listIterator (index );
816+ }
817+
818+ @ Override
819+ public Stream <Event > stream () {
820+ checkLock ();
821+ return super .stream ();
822+ }
823+ };
824+
790825 Session session =
791- sessionService
792- .createSession ("test-app" , "test-user" , new HashMap <>(), "test-session" )
793- .blockingGet ();
826+ Session .builder ("test-session" )
827+ .appName ("test-app" )
828+ .userId ("test-user" )
829+ .events (customEvents )
830+ .build ();
794831
795- // Seed with dummy events to widen the race capability
796- for (int i = 0 ; i < 5000 ; i ++) {
797- session .events ().add (createUserEvent ("dummy" + i , "dummy" ));
798- }
832+ // The list must have at least one element so that operations interacting with events trigger
833+ // iteration.
834+ customEvents .add (createUserEvent ("dummy" , "dummy" ));
799835
800836 InvocationContext context =
801837 InvocationContext .builder ()
@@ -807,37 +843,8 @@ public void processRequest_concurrentReadAndWrite_noException() throws Exception
807843
808844 LlmRequest initialRequest = LlmRequest .builder ().build ();
809845
810- AtomicReference <Throwable > writerError = new AtomicReference <>();
811- CountDownLatch startLatch = new CountDownLatch (1 );
812-
813- Thread writerThread =
814- new Thread (
815- () -> {
816- startLatch .countDown ();
817- try {
818- for (int i = 0 ; i < 2000 ; i ++) {
819- session .events ().add (createUserEvent ("writer" + i , "new data" ));
820- }
821- } catch (Throwable t ) {
822- writerError .set (t );
823- }
824- });
825-
826- writerThread .start ();
827- startLatch .await (); // wait for writer to be ready
828-
829- // Process (read) requests concurrently to trigger race conditions
830- for (int i = 0 ; i < 200 ; i ++) {
831- var unused = contentsProcessor .processRequest (context , initialRequest ).blockingGet ();
832- if (writerError .get () != null ) {
833- throw new RuntimeException ("Writer failed" , writerError .get ());
834- }
835- }
836-
837- writerThread .join ();
838- if (writerError .get () != null ) {
839- throw new RuntimeException ("Writer failed" , writerError .get ());
840- }
846+ // This single call will throw the exception if the list is accessed insecurely.
847+ var unused = contentsProcessor .processRequest (context , initialRequest ).blockingGet ();
841848 }
842849
843850 private static Event createUserEvent (String id , String text ) {
0 commit comments