Skip to content

Commit e4b025b

Browse files
committed
Added logic to perform maintenance functions, expireSnapshots and rewriteManifests
1 parent 9162359 commit e4b025b

1 file changed

Lines changed: 40 additions & 0 deletions

File tree

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package com.altinity.ice.rest.catalog.internal.maintenance;
22

33
import java.util.HashMap;
4+
import java.util.List;
45
import java.util.Map;
56
import java.util.Properties;
67
import java.util.concurrent.TimeUnit;
78
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import org.apache.iceberg.Table;
11+
import org.apache.iceberg.actions.Action;
812
import org.apache.iceberg.catalog.Catalog;
13+
import org.apache.iceberg.catalog.Namespace;
14+
import org.apache.iceberg.catalog.SupportsNamespaces;
15+
import org.apache.iceberg.catalog.TableIdentifier;
916
import org.quartz.CronScheduleBuilder;
1017
import org.quartz.JobBuilder;
1118
import org.quartz.JobDetail;
@@ -172,6 +179,39 @@ public void performMaintenance() {
172179
// - Compact small files
173180

174181
// For now, just log that we're performing maintenance
182+
// get the list of namespaces.
183+
List<Namespace> namespaces;
184+
if (catalog instanceof SupportsNamespaces) {
185+
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
186+
namespaces = nsCatalog.listNamespaces();
187+
for (Namespace ns : namespaces) {
188+
logger.debug("Namespace: " + ns);
189+
}
190+
} else {
191+
logger.error("Catalog does not support namespace operations.");
192+
return;
193+
} // Iterate through namespace
194+
195+
for (Namespace namespace : namespaces) {
196+
// Get the table
197+
List<TableIdentifier> tables = catalog.listTables(namespace);
198+
for (TableIdentifier tableIdent : tables) {
199+
long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30);
200+
// Get the table
201+
Table table = catalog.loadTable(tableIdent);
202+
// Expire snapshots older than 30 days
203+
table.expireSnapshots()
204+
.expireOlderThan(olderThanMillis)
205+
.commit();
206+
// Remove Orphan Files
207+
208+
// Rewrite Manifests
209+
table.rewriteManifests()
210+
.rewriteIf(manifest -> true)
211+
.commit();
212+
}
213+
214+
}
175215
logger.info("Maintenance operations completed for catalog: {}", catalog.name());
176216
} else {
177217
logger.warn("No catalog available for maintenance operations");

0 commit comments

Comments
 (0)