Skip to content

Commit cce3dca

Browse files
authored
IGNITE-27926 Implement Raft group destruction in Segstore (#7958)
1 parent b90e6b4 commit cce3dca

5 files changed

Lines changed: 184 additions & 5 deletions

File tree

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static java.lang.Math.toIntExact;
2121
import static java.nio.file.StandardOpenOption.CREATE_NEW;
2222
import static java.nio.file.StandardOpenOption.WRITE;
23+
import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
2324
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
2425
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
2526

@@ -137,7 +138,6 @@ class IndexFileManager {
137138
/**
138139
* Index file metadata grouped by Raft Group ID.
139140
*/
140-
// FIXME: This map is never cleaned up, see https://issues.apache.org/jira/browse/IGNITE-27926.
141141
private final Map<Long, GroupIndexMeta> groupIndexMetas = new ConcurrentHashMap<>();
142142

143143
IndexFileManager(Path baseDir) throws IOException {
@@ -430,6 +430,10 @@ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
430430
return null;
431431
}
432432

433+
if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
434+
return null;
435+
}
436+
433437
if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
434438
// No prefix truncation required, simply create a new meta.
435439
return new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileProperties);
@@ -444,13 +448,19 @@ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
444448
}
445449

446450
private void putIndexFileMeta(IndexMetaSpec metaSpec) {
447-
IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
448-
449451
// Using boxed value to avoid unnecessary autoboxing later.
450452
Long groupId = metaSpec.groupId();
451453

452454
long firstIndexKept = metaSpec.firstIndexKept();
453455

456+
if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
457+
groupIndexMetas.remove(groupId);
458+
459+
return;
460+
}
461+
462+
IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
463+
454464
GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
455465

456466
if (existingGroupIndexMeta == null) {

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ class SegmentFileManager implements ManuallyCloseable {
114114
*/
115115
static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.
116116

117+
/**
118+
* Special "destroy group" sentinel value for the log reset index.
119+
*
120+
* <p>Must not overlap with a valid stored log index.
121+
*/
122+
static final long GROUP_DESTROY_LOG_INDEX = Long.MIN_VALUE;
123+
117124
private final String storageName;
118125

119126
private final Path segmentFilesDir;
@@ -422,6 +429,14 @@ void reset(long groupId, long nextLogIndex) throws IOException {
422429
}
423430
}
424431

432+
/**
433+
* Destroys all log data for the given group. Writes a tombstone using {@link #GROUP_DESTROY_LOG_INDEX} so that the GC can discard
434+
* the group's entries on the next compaction pass.
435+
*/
436+
void destroyGroup(long groupId) throws IOException {
437+
reset(groupId, GROUP_DESTROY_LOG_INDEX);
438+
}
439+
425440
private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws IOException {
426441
while (true) {
427442
SegmentFileWithMemtable segmentFileWithMemtable = currentSegmentFile();

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ public LogStorage createLogStorage(String raftNodeStorageId, RaftOptions raftOpt
7878

7979
@Override
8080
public void destroyLogStorage(String raftNodeStorageId) {
81-
// TODO IGNITE-28527 Implement.
81+
try {
82+
fileManager.destroyGroup(convertNodeId(raftNodeStorageId));
83+
} catch (IOException e) {
84+
throw new LogStorageException(String.format("Failed to destroy log storage [storageId=%s]", raftNodeStorageId), e);
85+
}
8286
}
8387

8488
@Override

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.raft.storage.segstore;
1919

20+
import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
2021
import static org.hamcrest.MatcherAssert.assertThat;
2122
import static org.hamcrest.Matchers.is;
2223
import static org.hamcrest.Matchers.notNullValue;
@@ -644,4 +645,54 @@ void testGetSegmentFilePointerReturnsNullForEmptyMetaRange() throws IOException
644645

645646
assertThat(indexFileManager.getSegmentFilePointer(1, 2), is(nullValue()));
646647
}
648+
649+
@Test
650+
void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws IOException {
651+
var memtable = new SingleThreadMemTable();
652+
memtable.appendSegmentFileOffset(0, 1, 10);
653+
memtable.appendSegmentFileOffset(1, 1, 20);
654+
indexFileManager.saveNewIndexMemtable(memtable);
655+
656+
assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
657+
assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
658+
659+
// Sole tombstone: the segment file had no entries for group 0, only the destroy tombstone.
660+
var destroyMemtable = new SingleThreadMemTable();
661+
destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
662+
indexFileManager.saveNewIndexMemtable(destroyMemtable);
663+
664+
assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
665+
assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
666+
assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
667+
668+
// Group 1 must be unaffected.
669+
assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
670+
assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
671+
}
672+
673+
@Test
674+
void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws IOException {
675+
var memtable = new SingleThreadMemTable();
676+
memtable.appendSegmentFileOffset(0, 1, 10);
677+
memtable.appendSegmentFileOffset(1, 1, 20);
678+
indexFileManager.saveNewIndexMemtable(memtable);
679+
680+
assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
681+
682+
// Group 0 writes more entries in the next segment, then gets destroyed in the same segment.
683+
var destroyMemtable = new SingleThreadMemTable();
684+
destroyMemtable.appendSegmentFileOffset(0, 2, 100);
685+
destroyMemtable.appendSegmentFileOffset(0, 3, 200);
686+
destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
687+
indexFileManager.saveNewIndexMemtable(destroyMemtable);
688+
689+
assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
690+
assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(nullValue()));
691+
assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
692+
assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
693+
694+
// Group 1 must be unaffected.
695+
assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
696+
assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
697+
}
647698
}

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.hamcrest.Matchers.is;
3030
import static org.hamcrest.Matchers.lessThan;
3131
import static org.hamcrest.Matchers.not;
32+
import static org.junit.jupiter.api.Assertions.fail;
3233

3334
import java.io.IOException;
3435
import java.io.UncheckedIOException;
@@ -646,6 +647,100 @@ void testCompactionOfFileAdjacentToStaleEntryInDequeCausesCorruption() throws Ex
646647
});
647648
}
648649

650+
/**
651+
* Verifies that after {@link SegmentFileManager#destroyGroup} is called, the GC can remove segment files belonging
652+
* to the destroyed group.
653+
*/
654+
@Test
655+
void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception {
656+
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
657+
658+
for (int i = 0; i < batches.size(); i++) {
659+
appendBytes(GROUP_ID_1, batches.get(i), i);
660+
}
661+
662+
await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));
663+
664+
List<Path> oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);
665+
List<Path> oldIndexFiles = indexFiles();
666+
667+
fileManager.destroyGroup(GROUP_ID_1);
668+
669+
// The destroy tombstone is in the current segment file's memtable. Trigger a rollover using a different group so the
670+
// checkpoint thread processes the tombstone and removes GROUP_ID_1 from the in-memory index.
671+
triggerAndAwaitCheckpoint(GROUP_ID_2, 0);
672+
673+
for (Path segmentFile : oldSegmentFiles) {
674+
runCompaction(segmentFile);
675+
}
676+
677+
for (Path segmentFile : oldSegmentFiles) {
678+
assertThat(segmentFile, not(exists()));
679+
}
680+
681+
for (Path indexFile : oldIndexFiles) {
682+
assertThat(indexFile, not(exists()));
683+
}
684+
685+
for (int i = 0; i < batches.size(); i++) {
686+
int index = i;
687+
688+
fileManager.getEntry(GROUP_ID_1, index, bs -> {
689+
fail("Entry for index " + index + " must be missing");
690+
691+
return null;
692+
});
693+
}
694+
}
695+
696+
/**
697+
* Verifies that when a group is destroyed while another group shares the same segment files, GC compacts each shared file by dropping
698+
* only the destroyed group's entries while preserving the surviving group's entries.
699+
*/
700+
@Test
701+
void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup() throws Exception {
702+
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
703+
704+
for (int i = 0; i < batches.size(); i++) {
705+
appendBytes(GROUP_ID_1, batches.get(i), i);
706+
appendBytes(GROUP_ID_2, batches.get(i), i);
707+
}
708+
709+
await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));
710+
711+
List<Path> oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);
712+
713+
// Destroy one group while the other remains active.
714+
fileManager.destroyGroup(GROUP_ID_1);
715+
716+
// Trigger a rollover using GROUP_ID_2 so the checkpoint thread processes the tombstone and removes GROUP_ID_1 from
717+
// the in-memory index, enabling GC to compact the shared segment files.
718+
triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1);
719+
720+
for (Path segmentFile : oldSegmentFiles) {
721+
FileProperties originalProperties = SegmentFile.fileProperties(segmentFile);
722+
723+
runCompaction(segmentFile);
724+
725+
// Original file must have been replaced by a compacted generation (GROUP_ID_2 entries survive).
726+
assertThat(segmentFile, not(exists()));
727+
728+
var newFileProperties = new FileProperties(originalProperties.ordinal(), originalProperties.generation() + 1);
729+
730+
assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties)), exists());
731+
}
732+
733+
// GROUP_ID_2 entries must all still be readable.
734+
for (int i = 0; i < batches.size(); i++) {
735+
int index = i;
736+
737+
fileManager.getEntry(GROUP_ID_2, i, bs -> {
738+
assertThat(bs, is(batches.get(index)));
739+
return null;
740+
});
741+
}
742+
}
743+
649744
@Test
650745
void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
651746
// Fill multiple segment files to create both segment and index files.
@@ -717,13 +812,17 @@ public int size(LogEntry logEntry) {
717812
}
718813

719814
private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws IOException {
815+
triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex);
816+
}
817+
818+
private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex) throws IOException {
720819
List<Path> segmentFilesBeforeCheckpoint = segmentFiles();
721820

722821
// Insert some entries to trigger a rollover (and a checkpoint).
723822
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
724823

725824
for (int i = 0; i < batches.size(); i++) {
726-
appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
825+
appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1);
727826
}
728827

729828
List<Path> segmentFilesAfterCheckpoint = segmentFiles();

0 commit comments

Comments (0)