diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 36bf129f16043..fa7e919880f3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -165,6 +165,7 @@
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -787,6 +788,14 @@ protected void handleCommandScalableTopicLookup(
return;
}
+ // Scalable topics do not support non-persistent storage. Reject early with a
+ // clear error rather than failing later in segment infrastructure.
+ if (topicName.getDomain() == TopicDomain.non_persistent) {
+ ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, ServerError.NotAllowedError,
+ "Scalable topics do not support non-persistent:// topics"));
+ return;
+ }
+
if (!this.service.getPulsar().isRunning()) {
log.warn("ScalableTopicLookup rejected: broker not ready");
ctx.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 2cc50301486dd..350125438e7cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -32,8 +32,10 @@
import org.apache.pulsar.common.api.proto.SegmentBrokerAddress;
import org.apache.pulsar.common.api.proto.SegmentInfoProto;
import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -61,6 +63,9 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe
private final BrokerService brokerService;
private final String metadataPath;
+ /** Canonical {@code topic://...} identity returned to the client regardless of the
+ * input form ({@code topic://}, {@code persistent://}, or short-form). */
+ private final String resolvedTopicName;
private volatile boolean closed = false;
public DagWatchSession(long sessionId,
@@ -74,6 +79,7 @@ public DagWatchSession(long sessionId,
this.resources = resources;
this.brokerService = brokerService;
this.metadataPath = resources.topicPath(topicName);
+ this.resolvedTopicName = topicName.toScalableTopic().toString();
this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build();
}
@@ -85,23 +91,121 @@ public String getMetadataPath() {
/**
* Start the session: load current metadata, set up watch, and return
* the initial layout response.
+ *
+ *
start() {
+ // A specific partition (persistent://t/n/x-partition-K) is not a valid
+ // scalable-topic lookup target — the synthetic layout models the whole
+ // partitioned topic. Reject it up front rather than producing a layout that
+ // wraps nonsensical -partition-K-partition-J names.
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.failedFuture(new IllegalArgumentException(
+ "Cannot open a scalable-topic lookup for an individual partition: " + topicName
+ + "; use the base topic name " + topicName.getPartitionedTopicName()));
+ }
+
// Register through the resources-level fan-out so close() can deregister us
// and we don't accumulate stale store-level listeners over time.
resources.registerPathListener(this);
return resources.getScalableTopicMetadataAsync(topicName, true)
.thenCompose(optMd -> {
- if (optMd.isEmpty()) {
- return CompletableFuture.failedFuture(
- new IllegalStateException("Scalable topic not found: " + topicName));
+ if (optMd.isPresent()) {
+ return buildResponse(optMd.get());
}
- ScalableTopicMetadata metadata = optMd.get();
- return buildResponse(metadata);
+ if (topicName.getDomain() == TopicDomain.persistent) {
+ return buildSyntheticResponse();
+ }
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("Scalable topic not found: " + topicName));
});
}
+ /**
+ * Build a synthetic layout for a not-yet-migrated regular topic. Each partition
+ * (or the whole topic, for non-partitioned) becomes an active legacy segment
+ * that wraps the existing {@code persistent://...} topic. The synthetic layout
+ * uses mod-N routing (signalled by all active leaves being legacy segments)
+ * so V5 producers route the same way v4 producers do.
+ *
+ * Note: {@code fetchPartitionedTopicMetadataAsync} returns {@code partitions == 0}
+ * for both an existing non-partitioned topic and a topic that does not exist at all.
+ * In the latter case a single-segment synthetic layout is still returned; whether the
+ * underlying {@code persistent://...} topic gets auto-created is then decided by the
+ * namespace's auto-creation policy when the V5 producer/consumer first attaches —
+ * exactly the behaviour a v4 client would see.
+ */
+ private CompletableFuture buildSyntheticResponse() {
+ return brokerService.fetchPartitionedTopicMetadataAsync(topicName)
+ .thenApply(partitionedMd -> {
+ int partitions = partitionedMd.partitions;
+ long createdAtMs = System.currentTimeMillis();
+ Map segments = new LinkedHashMap<>();
+ if (partitions <= 0) {
+ // Non-partitioned: one legacy segment covering the full hash range.
+ // We're only called when topicName.getDomain() == persistent, so
+ // toString() is the canonical persistent://t/n/x form.
+ segments.put(0L, SegmentInfo.activeLegacy(
+ 0L,
+ HashRange.of(0x0000, 0xFFFF),
+ topicName.toString(),
+ /*createdAtEpoch*/ 0L,
+ createdAtMs));
+ } else {
+ // Partitioned: N legacy segments. The hash ranges here are cosmetic —
+ // routing for synthetic layouts is mod-N over segment_id (see
+ // SegmentRouter on the SDK side), so the ranges are never consulted
+ // for routing. They're assigned for stable, human-readable layouts.
+ for (int k = 0; k < partitions; k++) {
+ HashRange range = syntheticRange(k, partitions);
+ segments.put((long) k, SegmentInfo.activeLegacy(
+ k,
+ range,
+ topicName.getPartition(k).toString(),
+ /*createdAtEpoch*/ 0L,
+ createdAtMs));
+ }
+ }
+ return new ScalableTopicLayoutResponse(
+ /*epoch*/ 0L,
+ segments,
+ /*segmentBrokerAddresses*/ null,
+ /*segmentBrokerAddressesTls*/ null,
+ /*controllerBrokerUrl*/ null,
+ /*controllerBrokerUrlTls*/ null);
+ });
+ }
+
+ /**
+ * Cosmetic hash range for partition {@code k} of {@code n} in a synthetic layout.
+ * When {@code n <= 0x10000} the ranges tile {@code [0x0000, 0xFFFF]} contiguously;
+ * beyond that there are more partitions than hash slots, so a degenerate single-slot
+ * range (clamped to the hash space) is used. Either way the value is never consulted
+ * for routing — synthetic layouts route mod-N over segment_id.
+ */
+ private static HashRange syntheticRange(int k, int n) {
+ if (n > 0x10000) {
+ int slot = Math.min(k, 0xFFFF);
+ return HashRange.of(slot, slot);
+ }
+ int width = 0x10000 / n;
+ int start = k * width;
+ int end = (k == n - 1) ? 0xFFFF : (start + width - 1);
+ return HashRange.of(start, end);
+ }
+
/**
* Invoked by the {@link ScalableTopicResources} fan-out for every metadata event
* matching this session's topic path. The registry already path-filtered for us;
@@ -142,7 +246,10 @@ public void pushUpdate(ScalableTopicLayoutResponse response) {
}
ScalableTopicDAG dag = buildDagProto(response);
log.info().attr("epoch", response.epoch()).log("Pushing DAG update");
- cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, dag));
+ // Always report the canonical topic://... identity so clients that looked up
+ // via persistent://... or short-form know the resolved name.
+ cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(
+ sessionId, resolvedTopicName, dag));
}
private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
@@ -170,6 +277,10 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
if (seg.sealedAtMs() >= 0) {
segProto.setSealedAtMs(seg.sealedAtMs());
}
+ // Legacy segments wrap an existing, externally managed persistent://... topic.
+ if (seg.legacyTopicName() != null) {
+ segProto.setLegacyTopicName(seg.legacyTopicName());
+ }
}
// Add broker addresses for active segments
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index a14ef6d5780ff..849061e0520ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -42,6 +42,7 @@
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentState;
@@ -104,6 +105,8 @@ public void testCloseIsIdempotent() {
@Test
public void testStartFailsWhenTopicMetadataMissing() {
+ // topic://... input + no scalable metadata = TopicNotFound. Synthetic layouts
+ // are only produced for persistent://... input (regular topics).
when(resources.getScalableTopicMetadataAsync(TOPIC, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
@@ -124,6 +127,121 @@ public void testStartFailsWhenTopicMetadataMissing() {
}
}
+ // --- synthetic layout for not-yet-migrated regular topics ---
+
+ @Test
+ public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws Exception {
+ // persistent:// input + no scalable metadata + non-partitioned regular topic
+ // (partitions=0) → synthetic layout with a single active legacy segment
+ // covering [0x0000, 0xFFFF] that wraps the existing persistent:// topic.
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
+ String regularPath = "/admin/scalable-topics/tenant/ns/my-regular";
+ when(resources.topicPath(regular)).thenReturn(regularPath);
+ when(resources.getScalableTopicMetadataAsync(regular, true))
+ .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+ when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
+ .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
+
+ DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
+ ScalableTopicLayoutResponse response = s.start().get();
+
+ assertEquals(response.epoch(), 0L);
+ assertEquals(response.segments().size(), 1);
+ SegmentInfo seg = response.segments().get(0L);
+ assertNotNull(seg);
+ assertEquals(seg.segmentId(), 0L);
+ assertEquals(seg.hashRange().start(), 0x0000);
+ assertEquals(seg.hashRange().end(), 0xFFFF);
+ assertTrue(seg.isActive());
+ assertTrue(seg.isLegacy(), "non-partitioned regular topic must wrap as a legacy segment");
+ assertEquals(seg.legacyTopicName(), "persistent://tenant/ns/my-regular");
+ }
+
+ @Test
+ public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws Exception {
+ // persistent:// input + no scalable metadata + 4-partition topic →
+ // synthetic layout with 4 active legacy segments wrapping each
+ // persistent://...-partition-K and equal-width contiguous hash ranges.
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-partitioned");
+ String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned";
+ when(resources.topicPath(regular)).thenReturn(regularPath);
+ when(resources.getScalableTopicMetadataAsync(regular, true))
+ .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+ when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
+ .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4)));
+
+ DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
+ ScalableTopicLayoutResponse response = s.start().get();
+
+ assertEquals(response.epoch(), 0L);
+ assertEquals(response.segments().size(), 4);
+ for (int k = 0; k < 4; k++) {
+ SegmentInfo seg = response.segments().get((long) k);
+ assertNotNull(seg, "missing segment for partition " + k);
+ assertEquals(seg.segmentId(), k);
+ assertTrue(seg.isActive());
+ assertTrue(seg.isLegacy(), "partition " + k + " must wrap as a legacy segment");
+ assertEquals(seg.legacyTopicName(),
+ "persistent://tenant/ns/my-partitioned-partition-" + k);
+ }
+ // Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF inclusive
+ // on the last segment.
+ assertEquals(response.segments().get(0L).hashRange().start(), 0x0000);
+ assertEquals(response.segments().get(3L).hashRange().end(), 0xFFFF);
+ // No gaps between consecutive segments.
+ for (int k = 0; k < 3; k++) {
+ int endK = response.segments().get((long) k).hashRange().end();
+ int startK1 = response.segments().get((long) (k + 1)).hashRange().start();
+ assertEquals(startK1, endK + 1, "gap between partition " + k + " and " + (k + 1));
+ }
+ }
+
+ @Test
+ public void testStartRejectsIndividualPartitionInput() throws Exception {
+ // A specific partition name must be rejected — the synthetic layout models the
+ // whole partitioned topic, and wrapping a single partition would otherwise
+ // produce nonsensical -partition-K-partition-J underlying names.
+ TopicName partition = TopicName.get("persistent://tenant/ns/my-partitioned-partition-3");
+
+ DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService);
+ CompletableFuture future = s.start();
+
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.get();
+ fail("expected failure");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException, "got: " + e.getCause());
+ assertTrue(e.getCause().getMessage().contains("individual partition"),
+ e.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() {
+ // The synthetic-layout response goes through pushUpdate, which always emits the
+ // canonical topic://... identity in resolved_topic_name regardless of input form.
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
+ ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse(
+ 0L,
+ Map.of(0L, SegmentInfo.activeLegacy(0L, HashRange.of(0x0000, 0xFFFF),
+ "persistent://tenant/ns/my-regular", 0L, 12345L)),
+ null, null, null, null);
+
+ DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
+ s.pushUpdate(response);
+
+ ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class);
+ verify(ctx).writeAndFlush(captor.capture());
+ BaseCommand cmd = parseFrame(captor.getValue());
+ assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(),
+ "topic://tenant/ns/my-regular");
+ // The legacy-segment marker round-trips through the wire format.
+ var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0);
+ assertTrue(seg.hasLegacyTopicName());
+ assertEquals(seg.getLegacyTopicName(), "persistent://tenant/ns/my-regular");
+ }
+
@Test
public void testStartRegistersWithResources() {
// start() routes through the resources-level fan-out instead of registering
@@ -314,7 +432,8 @@ private static SegmentInfo seg(long id, int start, int end, SegmentState state,
createdAt,
sealedAt,
createdAtMs,
- sealedAtMs);
+ sealedAtMs,
+ null);
}
private static java.util.List toList(long[] arr) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 183e66b68d95e..db274bc8712c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -539,19 +539,19 @@ public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws Excepti
org.apache.pulsar.common.scalable.SegmentState.SEALED,
java.util.List.of(), java.util.List.of(3L),
/*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1,
- /*createdAtMs*/ t0, /*sealedAtMs*/ t1);
+ /*createdAtMs*/ t0, /*sealedAtMs*/ t1, null);
org.apache.pulsar.common.scalable.SegmentInfo seg1 = new org.apache.pulsar.common.scalable.SegmentInfo(
1L,
org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF),
org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
java.util.List.of(), java.util.List.of(),
- 0, -1, t1, -1);
+ 0, -1, t1, -1, null);
org.apache.pulsar.common.scalable.SegmentInfo seg2 = new org.apache.pulsar.common.scalable.SegmentInfo(
2L,
org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF),
org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
java.util.List.of(), java.util.List.of(),
- 0, -1, t3, -1);
+ 0, -1, t3, -1, null);
TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic");
ScalableTopicMetadata md = ScalableTopicMetadata.builder()
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 52e9a60ad9bf8..ea70d11a0ac7d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -300,6 +300,25 @@ public TopicName getPartition(int index) {
return get(partitionName);
}
+ /**
+ * Returns this topic re-expressed in the scalable {@code topic://...} domain,
+ * regardless of the original input domain. Used to derive the canonical
+ * scalable-topic identity for a name the user may have spelled as
+ * {@code persistent://...} or short-form.
+ *
+ * Any {@code -partition-K} suffix is stripped, so a partition name like
+ * {@code persistent://t/n/x-partition-3} resolves to the base topic's scalable
+ * identity {@code topic://t/n/x}, not {@code topic://t/n/x-partition-3}.
+ */
+ public TopicName toScalableTopic() {
+ if (domain == TopicDomain.topic) {
+ return this;
+ }
+ TopicName base = isPartitioned() ? get(getPartitionedTopicName()) : this;
+ return get(TopicDomain.topic.value() + "://" + base.getNamespace()
+ + "/" + base.getEncodedLocalName());
+ }
+
/**
* @return partition index of the completeTopicName.
* It returns -1 if the completeTopicName (topic) is not partitioned.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2c0c74fb378b8..e6cb2605315ad 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1700,10 +1700,16 @@ public static ByteBuf newScalableTopicClose(long sessionId) {
return serializeWithSize(cmd);
}
- public static ByteBuf newScalableTopicUpdate(long sessionId, ScalableTopicDAG dag) {
+ public static ByteBuf newScalableTopicUpdate(long sessionId, String resolvedTopicName,
+ ScalableTopicDAG dag) {
BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
.setSessionId(sessionId);
+ // resolved_topic_name is optional on the wire; guard against null because the
+ // lightproto setter would NPE rather than leave the field unset.
+ if (resolvedTopicName != null) {
+ update.setResolvedTopicName(resolvedTopicName);
+ }
update.setDag().copyFrom(dag);
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
index 848326dd026f4..0b727b23e1805 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
@@ -35,15 +35,25 @@
* Used for retention-based segment GC and for timestamp-based seek.
*
*
- * @param segmentId monotonically increasing, unique within the topic
- * @param hashRange inclusive hash range [start, end]
- * @param state ACTIVE or SEALED
- * @param parentIds parent segment IDs in the DAG (empty for initial/root segments)
- * @param childIds child segment IDs in the DAG (empty for active leaf segments)
- * @param createdAtEpoch DAG epoch when this segment was created
- * @param sealedAtEpoch DAG epoch when sealed (-1 if still active)
- * @param createdAtMs wall-clock millis at creation time
- * @param sealedAtMs wall-clock millis at seal time (-1 if still active)
+ *
A segment may be a legacy segment — one that is not managed by the
+ * scalable-topic controller and has no {@code segment://...} URI of its own; instead it
+ * wraps an existing, externally managed {@code persistent://...} topic. Legacy segments
+ * appear in the synthetic-layout response returned for a regular (partitioned or
+ * non-partitioned) topic that has not yet been migrated to a scalable topic.
+ * {@code legacyTopicName} is non-null exactly for legacy segments.
+ *
+ * @param segmentId monotonically increasing, unique within the topic
+ * @param hashRange inclusive hash range [start, end]
+ * @param state ACTIVE or SEALED
+ * @param parentIds parent segment IDs in the DAG (empty for initial/root segments)
+ * @param childIds child segment IDs in the DAG (empty for active leaf segments)
+ * @param createdAtEpoch DAG epoch when this segment was created
+ * @param sealedAtEpoch DAG epoch when sealed (-1 if still active)
+ * @param createdAtMs wall-clock millis at creation time
+ * @param sealedAtMs wall-clock millis at seal time (-1 if still active)
+ * @param legacyTopicName for legacy segments: the externally managed
+ * {@code persistent://...} topic this segment wraps.
+ * {@code null} for regular controller-managed segments.
*/
public record SegmentInfo(
long segmentId,
@@ -54,7 +64,8 @@ public record SegmentInfo(
long createdAtEpoch,
long sealedAtEpoch,
long createdAtMs,
- long sealedAtMs
+ long sealedAtMs,
+ String legacyTopicName
) {
public SegmentInfo {
parentIds = parentIds != null ? List.copyOf(parentIds) : List.of();
@@ -65,32 +76,56 @@ public record SegmentInfo(
public static SegmentInfo active(long segmentId, HashRange hashRange,
long createdAtEpoch, long createdAtMs) {
return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
- List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1);
+ List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, null);
}
/** Create a new active segment with the given parent IDs. */
public static SegmentInfo active(long segmentId, HashRange hashRange,
List parentIds, long createdAtEpoch, long createdAtMs) {
return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
- parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1);
+ parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1, null);
+ }
+
+ /**
+ * Create a new active legacy segment that wraps the given externally managed
+ * {@code persistent://...} topic instead of having its own {@code segment://...} URI.
+ * Used by the synthetic-layout response for not-yet-migrated regular topics.
+ */
+ public static SegmentInfo activeLegacy(long segmentId, HashRange hashRange,
+ String legacyTopicName,
+ long createdAtEpoch, long createdAtMs) {
+ return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
+ List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, legacyTopicName);
}
/** Return a sealed copy of this segment with the given child IDs. */
public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List childIds) {
return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs,
+ legacyTopicName);
}
/** Return a copy with different parent IDs. */
public SegmentInfo withParentIds(List parentIds) {
return new SegmentInfo(segmentId, hashRange, state,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs,
+ legacyTopicName);
}
/** Return a copy with different child IDs. */
public SegmentInfo withChildIds(List childIds) {
return new SegmentInfo(segmentId, hashRange, state,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs,
+ legacyTopicName);
+ }
+
+ /**
+ * True if this is a legacy segment — one that wraps an existing, externally managed
+ * {@code persistent://...} topic rather than owning a controller-managed
+ * {@code segment://...} URI. An empty {@code legacyTopicName} does not count as legacy.
+ */
+ public boolean isLegacy() {
+ return legacyTopicName != null && !legacyTopicName.isEmpty();
}
public boolean isActive() {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b2a698d33da0f..28501c8bc6637 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -855,6 +855,14 @@ message SegmentInfoProto {
// and timestamp-based seek; epoch above is a DAG generation number, not a clock.
required uint64 created_at_ms = 9;
optional uint64 sealed_at_ms = 10;
+
+ // Legacy-segment marker. When set, this segment is not managed by the scalable-topic
+ // controller and has no segment://... topic of its own — it wraps the named, externally
+ // managed persistent://... topic instead. Used by the synthetic-layout response the
+ // broker returns for a regular (partitioned or non-partitioned) topic that has not yet
+ // been migrated to a scalable topic. When absent, the segment URI is computed normally
+ // from the scalable topic name and segment_id (segment:///).
+ optional string legacy_topic_name = 11;
}
message SegmentBrokerAddress {
@@ -874,7 +882,11 @@ message ScalableTopicDAG {
// Client -> Broker: Request scalable topic metadata and initiate watch session
message CommandScalableTopicLookup {
required uint64 session_id = 1; // Client-assigned session ID
- required string topic = 2; // e.g. "topic://tenant/ns/my-topic"
+ // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like
+ // "my-topic" (normalized by the broker to persistent://public/default/my-topic).
+ // The broker resolves the input to the canonical topic://... identity and
+ // returns it in CommandScalableTopicUpdate.resolved_topic_name.
+ required string topic = 2;
}
// Broker -> Client: Used for BOTH initial response and subsequent pushed updates
@@ -884,6 +896,12 @@ message CommandScalableTopicUpdate {
optional ServerError error = 3;
optional string message = 4;
+
+ // Canonical scalable-topic identity (always "topic://t/n/x") that the client
+ // should use for downstream operations. Set on every success response,
+ // including for inputs that were given as persistent://... or short-form.
+ // Absent on error responses.
+ optional string resolved_topic_name = 5;
}
// Client -> Broker: Close the DAG watch session
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 412d3696b1eed..eb9254ecf4a14 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -517,4 +517,26 @@ public void testToFullTopicName() {
assertThrows(IllegalArgumentException.class,
() -> TopicName.toFullTopicName("non-persistent://tenant/cluster/ns/topic"));
}
+
+ @Test
+ public void testToScalableTopic() {
+ // topic://... is returned as-is.
+ TopicName already = TopicName.get("topic://tenant/ns/x");
+ assertEquals(already.toScalableTopic(), already);
+
+ // persistent://... is re-expressed in the topic:// domain.
+ assertEquals(TopicName.get("persistent://tenant/ns/x").toScalableTopic().toString(),
+ "topic://tenant/ns/x");
+
+ // Short forms normalise to persistent://public/default/... first, then to topic://.
+ assertEquals(TopicName.get("my-topic").toScalableTopic().toString(),
+ "topic://public/default/my-topic");
+ assertEquals(TopicName.get("tenant/ns/my-topic").toScalableTopic().toString(),
+ "topic://tenant/ns/my-topic");
+
+ // A -partition-K suffix is stripped: the partition resolves to the base topic's
+ // scalable identity, not topic://.../x-partition-K.
+ assertEquals(TopicName.get("persistent://tenant/ns/x-partition-3").toScalableTopic().toString(),
+ "topic://tenant/ns/x");
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
index c2c9267636284..a5d028889eea4 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -101,12 +101,13 @@ public void testNewScalableTopicUpdate() {
.addParentId(0L);
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");
- ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag);
+ ByteBuf frame = Commands.newScalableTopicUpdate(77L, "topic://t/n/x", dag);
BaseCommand cmd = parseFrame(frame);
assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
assertTrue(cmd.hasScalableTopicUpdate());
assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L);
+ assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), "topic://t/n/x");
assertFalse(cmd.getScalableTopicUpdate().hasError(),
"successful update must not carry an error field");
@@ -132,6 +133,27 @@ public void testNewScalableTopicUpdate() {
assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(), "pulsar://broker-a:6650");
}
+ @Test
+ public void testNewScalableTopicUpdateWithNullResolvedTopicNameLeavesFieldUnset() {
+ // resolved_topic_name is optional on the wire; a null must serialise cleanly with
+ // the field unset rather than NPE in the lightproto setter.
+ ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(1L);
+ dag.addSegment()
+ .setSegmentId(0L)
+ .setHashStart(0x0000)
+ .setHashEnd(0xFFFF)
+ .setState(SegmentState.ACTIVE)
+ .setCreatedAtEpoch(0L)
+ .setCreatedAtMs(System.currentTimeMillis());
+
+ ByteBuf frame = Commands.newScalableTopicUpdate(5L, null, dag);
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 5L);
+ assertFalse(cmd.getScalableTopicUpdate().hasResolvedTopicName(),
+ "null resolvedTopicName must leave the optional field unset");
+ }
+
@Test
public void testNewScalableTopicError() {
ByteBuf frame = Commands.newScalableTopicError(15L, ServerError.TopicNotFound,