Skip to content

Commit 8e9fb08

Browse files
google-genai-botcopybara-github
authored andcommitted
refactor: Use concatMap for sequential event persistence in Runner
Ensure sequential event processing and persistence in ADK Runner. This ensures that events are appended in order and returned from runAsync in order. This aligns better with the Python implementation. PiperOrigin-RevId: 886961696
1 parent 3633a7d commit 8e9fb08

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ private Flowable<Event> runAgentWithFreshSession(
529529
contextWithUpdatedSession
530530
.agent()
531531
.runAsync(contextWithUpdatedSession)
532-
.flatMap(
532+
.concatMap(
533533
agentEvent ->
534534
this.sessionService
535535
.appendEvent(updatedSession, agentEvent)

core/src/test/java/com/google/adk/runner/RunnerTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
import static com.google.common.truth.Truth.assertThat;
2727
import static java.nio.charset.StandardCharsets.UTF_8;
2828
import static java.util.Arrays.stream;
29+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2930
import static org.mockito.ArgumentMatchers.any;
3031
import static org.mockito.Mockito.CALLS_REAL_METHODS;
3132
import static org.mockito.Mockito.mock;
3233
import static org.mockito.Mockito.never;
3334
import static org.mockito.Mockito.verify;
3435
import static org.mockito.Mockito.when;
3536

37+
import com.google.adk.agents.BaseAgent;
3638
import com.google.adk.agents.InvocationContext;
3739
import com.google.adk.agents.LiveRequestQueue;
3840
import com.google.adk.agents.LlmAgent;
@@ -43,6 +45,7 @@
4345
import com.google.adk.flows.llmflows.Functions;
4446
import com.google.adk.models.LlmResponse;
4547
import com.google.adk.plugins.BasePlugin;
48+
import com.google.adk.sessions.BaseSessionService;
4649
import com.google.adk.sessions.Session;
4750
import com.google.adk.sessions.SessionKey;
4851
import com.google.adk.summarizer.EventsCompactionConfig;
@@ -851,6 +854,45 @@ public void beforeRunCallback_withStateDelta_seesMergedState() {
851854
assertThat(sessionInCallback.state()).containsEntry("number", 123);
852855
}
853856

857+
@Test
858+
public void runAsync_ensureEventsAreAppendedInOrder() throws Exception {
859+
Event event1 = TestUtils.createEvent("1");
860+
Event event2 = TestUtils.createEvent("2");
861+
BaseAgent mockAgent = TestUtils.createSubAgent("test agent", event1, event2);
862+
863+
BaseSessionService mockSessionService = mock(BaseSessionService.class);
864+
865+
when(mockSessionService.getSession(any(), any(), any(), any())).thenReturn(Maybe.just(session));
866+
when(mockSessionService.appendEvent(any(), any()))
867+
.thenAnswer(
868+
invocation -> {
869+
Event eventArg = invocation.getArgument(1);
870+
Single<Event> result = Single.just(eventArg);
871+
if (eventArg.id().equals("1")) {
872+
// Artificially delay the first event to ensure it is appended first.
873+
return result.delay(100, MILLISECONDS);
874+
}
875+
return result;
876+
});
877+
878+
Runner mockRunner =
879+
Runner.builder()
880+
.agent(mockAgent)
881+
.appName("test")
882+
.sessionService(mockSessionService)
883+
.build();
884+
885+
List<Event> results =
886+
mockRunner
887+
.runAsync("user", session.id(), createContent("user message"))
888+
.toList()
889+
.blockingGet();
890+
891+
assertThat(simplifyEvents(results))
892+
.containsExactly("author: content for event 1", "author: content for event 2")
893+
.inOrder();
894+
}
895+
854896
private Content createContent(String text) {
855897
return Content.builder().parts(Part.builder().text(text).build()).build();
856898
}

0 commit comments

Comments
 (0)