Skip to content

Commit 0feabce

Browse files
πŸ§‘β€πŸ’» chore(dx): add RestApi for read events and reset processors (#17)
1 parent c603afd commit 0feabce

9 files changed

Lines changed: 307 additions & 28 deletions

File tree

β€Žsrc/main/java/com/dddheroes/heroesofddd/creaturerecruitment/read/DwellingReadModelProjector.javaβ€Ž

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.dddheroes.heroesofddd.shared.GameMetaData;
77
import org.axonframework.config.ProcessingGroup;
88
import org.axonframework.eventhandling.EventHandler;
9+
import org.axonframework.eventhandling.ResetHandler;
910
import org.axonframework.messaging.annotation.MetaDataValue;
1011
import org.springframework.stereotype.Component;
1112

@@ -44,4 +45,9 @@ void on(CreatureRecruited event) {
4445
.map(state -> state.withAvailableCreaturesDecreasedBy(event.quantity()))
4546
.ifPresent(repository::save);
4647
}
48+
49+
@ResetHandler
50+
void onReset() {
51+
repository.deleteAll();
52+
}
4753
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.dddheroes.heroesofddd.maintenance.read.geteventstream;
2+
3+
import org.axonframework.eventhandling.DomainEventMessage;
4+
import org.axonframework.eventsourcing.eventstore.EventStore;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
6+
import org.springframework.web.bind.annotation.CrossOrigin;
7+
import org.springframework.web.bind.annotation.GetMapping;
8+
import org.springframework.web.bind.annotation.PathVariable;
9+
import org.springframework.web.bind.annotation.RestController;
10+
11+
import java.util.List;
12+
13+
@ConditionalOnProperty(name = "application.maintenance.enabled", havingValue = "true")
14+
@RestController
15+
class EventStreamsRestApi {
16+
17+
private final EventStore eventStore;
18+
19+
EventStreamsRestApi(EventStore eventStore) {
20+
this.eventStore = eventStore;
21+
}
22+
23+
@CrossOrigin
24+
@GetMapping("/maintenance/event-store/streams/{streamId}/events")
25+
List<? extends DomainEventMessage<?>> readEvents(@PathVariable String streamId) {
26+
return eventStore.readEvents(streamId).asStream().toList();
27+
}
28+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.dddheroes.heroesofddd.maintenance.write.resetprocessor;
2+
3+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
4+
import org.springframework.web.bind.annotation.CrossOrigin;
5+
import org.springframework.web.bind.annotation.PathVariable;
6+
import org.springframework.web.bind.annotation.PostMapping;
7+
import org.springframework.web.bind.annotation.RestController;
8+
9+
@ConditionalOnProperty(name = "application.maintenance.enabled", havingValue = "true")
10+
@RestController
11+
class ResetStreamProcessorRestApi {
12+
13+
private final StreamProcessorsOperations streamProcessorsOperations;
14+
15+
ResetStreamProcessorRestApi(StreamProcessorsOperations streamProcessorsOperations) {
16+
this.streamProcessorsOperations = streamProcessorsOperations;
17+
}
18+
19+
@CrossOrigin
20+
@PostMapping("/maintenance/processors/{name}/resets")
21+
void resetProcessor(@PathVariable String name) {
22+
streamProcessorsOperations.reset(name);
23+
}
24+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.dddheroes.heroesofddd.maintenance.write.resetprocessor;
2+
3+
import org.axonframework.config.EventProcessingConfiguration;
4+
import org.axonframework.eventhandling.ReplayToken;
5+
import org.axonframework.eventhandling.TrackingEventProcessor;
6+
import org.axonframework.eventhandling.tokenstore.TokenStore;
7+
import org.springframework.stereotype.Component;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
import java.math.BigDecimal;
11+
import java.math.RoundingMode;
12+
import java.util.Optional;
13+
import java.util.OptionalLong;
14+
import java.util.stream.IntStream;
15+
16+
@Component
17+
public class StreamProcessorsOperations {
18+
19+
private final EventProcessingConfiguration eventProcessingConfiguration;
20+
private final TokenStore tokenStore;
21+
22+
StreamProcessorsOperations(EventProcessingConfiguration eventProcessingConfiguration, TokenStore tokenStore) {
23+
this.eventProcessingConfiguration = eventProcessingConfiguration;
24+
this.tokenStore = tokenStore;
25+
}
26+
27+
public void reset(String processor) {
28+
eventProcessingConfiguration
29+
.eventProcessorByProcessingGroup(processor, TrackingEventProcessor.class)
30+
.ifPresent(eventProcessor -> {
31+
if (eventProcessor.supportsReset()) {
32+
eventProcessor.shutDown();
33+
eventProcessor.resetTokens();
34+
eventProcessor.start();
35+
}
36+
});
37+
}
38+
39+
@Transactional
40+
public Optional<Progress> progressOf(String processor) {
41+
var segments = tokenStore.fetchSegments(processor);
42+
43+
if (segments.length == 0) {
44+
return Optional.empty();
45+
} else {
46+
var accumulatedProgress = IntStream.of(segments).mapToObj(segment -> {
47+
var token = tokenStore.fetchToken(processor, segment);
48+
49+
var maybeCurrent = token.position();
50+
var maybePositionAtReset = token instanceof ReplayToken replayToken
51+
? replayToken.getTokenAtReset().position()
52+
: OptionalLong.empty();
53+
54+
return new Progress(maybeCurrent.orElse(0L), maybePositionAtReset.orElse(0L));
55+
}).reduce(new Progress(0, 0), (acc, progress) ->
56+
new Progress(acc.current + progress.current, acc.tail + progress.tail));
57+
58+
return (accumulatedProgress.tail == 0L) ? Optional.empty() : Optional.of(accumulatedProgress);
59+
}
60+
}
61+
62+
public record Progress(long current, long tail) {
63+
64+
public BigDecimal progress() {
65+
return BigDecimal.valueOf(current, 2)
66+
.divide(BigDecimal.valueOf(tail, 2), RoundingMode.HALF_UP);
67+
}
68+
}
69+
}

β€Žsrc/main/resources/application.yamlβ€Ž

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,7 @@ axon:
4343
sequencing-policy: gameIdSequencingPolicy
4444
Read_GetAllDwellings_QueryCache:
4545
mode: subscribing
46-
sequencing-policy: gameIdSequencingPolicy
46+
sequencing-policy: gameIdSequencingPolicy
47+
application:
48+
maintenance:
49+
enabled: true

β€Žsrc/test/java/com/dddheroes/heroesofddd/astrologers/automation/whenweekstartedthenproclaimweeksymbol/WhenWeekStartedThenProclaimWeekSymbolTest.javaβ€Ž

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.dddheroes.heroesofddd.calendar.write.CalendarEvent;
66
import com.dddheroes.heroesofddd.calendar.write.CalendarId;
77
import com.dddheroes.heroesofddd.calendar.write.startday.DayStarted;
8+
import com.dddheroes.heroesofddd.maintenance.write.resetprocessor.StreamProcessorsOperations;
89
import com.dddheroes.heroesofddd.shared.GameId;
910
import com.dddheroes.heroesofddd.shared.GameMetaData;
1011
import com.dddheroes.heroesofddd.shared.PlayerId;
@@ -34,6 +35,9 @@ class WhenWeekStartedThenProclaimWeekSymbolTest {
3435
@Autowired
3536
private EventGateway eventGateway;
3637

38+
@Autowired
39+
private StreamProcessorsOperations streamProcessorsOperations;
40+
3741
@MockitoSpyBean
3842
private CommandGateway commandGateway;
3943

@@ -56,6 +60,32 @@ void whenDayStartedForFirstDayOfTheWeek_ThenProclaimWeekSymbol() {
5660
);
5761
}
5862

63+
@Test
64+
void givenDisallowedReplay_WhenReplayed_ThenShouldNotResendTheCommand() {
65+
// given
66+
var gameId = UUID.randomUUID().toString();
67+
var calendarId = CalendarId.of(gameId);
68+
givenCalendarEvents(
69+
gameId,
70+
new DayStarted(calendarId.raw(), 1, 1, 1)
71+
);
72+
73+
// when
74+
// processed by the automation
75+
76+
// then
77+
awaitUntilAsserted(() -> verify(commandGateway, times(1))
78+
.sendAndWait(ProclaimWeekSymbol.command(gameId, 1, 1, "angel", any()), eq(gameMetaData()))
79+
);
80+
81+
// when
82+
streamProcessorsOperations.reset("Automation_WhenWeekStartedThenProclaimWeekSymbol_Processor");
83+
84+
// then
85+
verify(commandGateway, times(1))
86+
.sendAndWait(ProclaimWeekSymbol.command(gameId, 1, 1, "angel", any()), eq(gameMetaData()));
87+
}
88+
5989
private void givenCalendarEvents(String gameId, CalendarEvent... events) {
6090
for (int i = 0; i < events.length; i++) {
6191
eventGateway.publish(calendarDomainEvent(gameId, i, events[i]));
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.dddheroes.heroesofddd.creaturerecruitment.read;
2+
3+
import com.dddheroes.heroesofddd.creaturerecruitment.write.DwellingEvent;
4+
import com.dddheroes.heroesofddd.shared.GameId;
5+
import com.dddheroes.heroesofddd.shared.GameMetaData;
6+
import com.dddheroes.heroesofddd.shared.PlayerId;
7+
import com.dddheroes.heroesofddd.shared.ResourceType;
8+
import org.axonframework.eventhandling.DomainEventMessage;
9+
import org.axonframework.eventhandling.GenericDomainEventMessage;
10+
import org.axonframework.eventhandling.gateway.EventGateway;
11+
12+
import java.util.Map;
13+
14+
public abstract class DwellingReadModelTest {
15+
16+
protected final String GAME_ID = GameId.random().raw();
17+
protected static final String PLAYER_ID = PlayerId.random().raw();
18+
protected static final Map<String, Integer> PHOENIX_COST = Map.of(
19+
ResourceType.GOLD.name(), 2000,
20+
ResourceType.MERCURY.name(), 1
21+
);
22+
23+
protected EventGateway eventGateway;
24+
25+
protected DwellingReadModelTest(EventGateway eventGateway) {
26+
this.eventGateway = eventGateway;
27+
}
28+
29+
protected void givenDwellingEvents(String dwellingId, DwellingEvent... events) {
30+
for (int i = 0; i < events.length; i++) {
31+
eventGateway.publish(dwellingDomainEvent(dwellingId, i, events[i]));
32+
}
33+
}
34+
35+
protected DomainEventMessage<?> dwellingDomainEvent(String dwellingId, int sequenceNumber,
36+
DwellingEvent payload) {
37+
return new GenericDomainEventMessage<>(
38+
"Dwelling",
39+
dwellingId,
40+
sequenceNumber,
41+
payload
42+
).andMetaData(GameMetaData.with(GAME_ID, PLAYER_ID));
43+
}
44+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.dddheroes.heroesofddd.creaturerecruitment.read.getalldwellings;
2+
3+
import com.dddheroes.heroesofddd.TestcontainersConfiguration;
4+
import com.dddheroes.heroesofddd.creaturerecruitment.read.DwellingReadModelTest;
5+
import com.dddheroes.heroesofddd.creaturerecruitment.write.DwellingId;
6+
import com.dddheroes.heroesofddd.creaturerecruitment.write.builddwelling.DwellingBuilt;
7+
import com.dddheroes.heroesofddd.shared.CreatureIds;
8+
import org.axonframework.eventhandling.gateway.EventGateway;
9+
import org.axonframework.queryhandling.QueryGateway;
10+
import org.junit.jupiter.api.*;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.boot.test.context.SpringBootTest;
13+
import org.springframework.context.annotation.Import;
14+
15+
import static com.dddheroes.heroesofddd.utils.AwaitilityUtils.awaitUntilAsserted;
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
@Import(TestcontainersConfiguration.class)
19+
@SpringBootTest
20+
class GetAllDwellingsTest extends DwellingReadModelTest {
21+
22+
private final QueryGateway queryGateway;
23+
24+
@Autowired
25+
GetAllDwellingsTest(
26+
EventGateway eventGateway,
27+
QueryGateway queryGateway
28+
) {
29+
super(eventGateway);
30+
this.queryGateway = queryGateway;
31+
}
32+
33+
@Test
34+
void projectingDwellingReadModel_TestCase1() {
35+
// when
36+
var query = getAllDwellings();
37+
38+
// then
39+
awaitUntilAsserted(() -> {
40+
var result = getDwellingReadModel(query).dwellings();
41+
assertThat(result).isEmpty();
42+
});
43+
}
44+
45+
@Test
46+
void projectingDwellingReadModel_TestCase2() {
47+
// given
48+
var creatureId = CreatureIds.phoenix().raw();
49+
50+
var dwellingId1 = DwellingId.random().raw();
51+
givenDwellingEvents(
52+
dwellingId1,
53+
new DwellingBuilt(dwellingId1, creatureId, PHOENIX_COST)
54+
);
55+
var dwellingId2 = DwellingId.random().raw();
56+
givenDwellingEvents(
57+
dwellingId2,
58+
new DwellingBuilt(dwellingId2, creatureId, PHOENIX_COST)
59+
);
60+
61+
// when
62+
var query = getAllDwellings();
63+
64+
// then
65+
awaitUntilAsserted(() -> {
66+
var result = getDwellingReadModel(query).dwellings();
67+
assertThat(result).hasSize(2);
68+
assertThat(result).anySatisfy(dwelling -> {
69+
assertThat(dwelling.getDwellingId()).isEqualTo(dwellingId1);
70+
assertThat(dwelling.getCreatureId()).isEqualTo(creatureId);
71+
assertThat(dwelling.getCostPerTroop()).isEqualTo(PHOENIX_COST);
72+
assertThat(dwelling.getAvailableCreatures()).isEqualTo(0);
73+
});
74+
assertThat(result).anySatisfy(dwelling -> {
75+
assertThat(dwelling.getDwellingId()).isEqualTo(dwellingId2);
76+
assertThat(dwelling.getCreatureId()).isEqualTo(creatureId);
77+
assertThat(dwelling.getCostPerTroop()).isEqualTo(PHOENIX_COST);
78+
assertThat(dwelling.getAvailableCreatures()).isEqualTo(0);
79+
});
80+
});
81+
}
82+
83+
private GetAllDwellings getAllDwellings() {
84+
System.out.println("GET ALL DWELLINGS: " + GAME_ID);
85+
return GetAllDwellings.query(GAME_ID);
86+
}
87+
88+
private GetAllDwellings.Result getDwellingReadModel(GetAllDwellings query) {
89+
return queryGateway.query(query, GetAllDwellings.Result.class).join();
90+
}
91+
}

0 commit comments

Comments
Β (0)