From c10d8c21b37841852e4bc0caa29245be026a5af1 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 19 May 2026 14:28:52 +0100 Subject: [PATCH 1/2] [feat][broker] PIP-475: synthetic-layout lookup for regular topics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the broker-side foundation for PIP-475 (regular-to-scalable topic migration). The scalable-topic lookup session now accepts persistent:// and short-form topic names in addition to topic://, and synthesises a layout that wraps an existing regular (partitioned or non-partitioned) topic as one or more special segments. V5 clients can then operate against any topic through the scalable surface, even before it has been migrated. Schema additions: * SegmentInfoProto.underlying_topic_name — when set, the segment wraps the named persistent://... topic instead of having its own segment://... URI. Used by the synthetic-layout response. * CommandScalableTopicUpdate.resolved_topic_name — the canonical topic://t/n/x identity, set on every successful response so a client that looked up via persistent://... or short-form learns the resolved name. Java surface mirrors the wire additions on SegmentInfo (with a new activeSpecial(...) factory and isSpecial() check) and a new TopicName.toScalableTopic() helper used by the broker to derive the canonical identity regardless of input form. Broker behaviour: * ServerCnx rejects non-persistent:// scalable-topic lookups early. * DagWatchSession.start() falls back to a synthetic layout for persistent:// input when no scalable metadata exists at the canonical path. Non-partitioned → 1 special segment covering the full hash range; partitioned with N partitions → N special segments wrapping each persistent://...-partition-K with equal-width contiguous hash ranges. The metadata-store watch remains active so a later migration that writes scalable metadata to the same path is observed and transparently replaces the synthetic layout. 
* pushUpdate emits the canonical topic://... resolvedTopicName, plus the new special-segment marker when set. Tests: * DagWatchSessionTest covers synthetic layouts for both non-partitioned and partitioned regular topics, and verifies the wire response carries resolved_topic_name and underlying_topic_name. * CommandsScalableTopicTest verifies resolved_topic_name round-trips through the new Commands.newScalableTopicUpdate signature. --- .../pulsar/broker/service/ServerCnx.java | 9 ++ .../service/scalable/DagWatchSession.java | 90 ++++++++++++++-- .../service/scalable/DagWatchSessionTest.java | 100 +++++++++++++++++- .../scalable/ScalableTopicControllerTest.java | 6 +- .../pulsar/common/naming/TopicName.java | 13 +++ .../pulsar/common/protocol/Commands.java | 6 +- .../pulsar/common/scalable/SegmentInfo.java | 62 ++++++++--- pulsar-common/src/main/proto/PulsarApi.proto | 20 +++- .../protocol/CommandsScalableTopicTest.java | 3 +- 9 files changed, 280 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 36bf129f16043..fa7e919880f3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -165,6 +165,7 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -787,6 +788,14 @@ protected void handleCommandScalableTopicLookup( return; } + // Scalable topics do not support non-persistent storage. 
Reject early with a + // clear error rather than failing later in segment infrastructure. + if (topicName.getDomain() == TopicDomain.non_persistent) { + ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, ServerError.NotAllowedError, + "Scalable topics do not support non-persistent:// topics")); + return; + } + if (!this.service.getPulsar().isRunning()) { log.warn("ScalableTopicLookup rejected: broker not ready"); ctx.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java index 2cc50301486dd..7e02808972169 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java @@ -32,8 +32,10 @@ import org.apache.pulsar.common.api.proto.SegmentBrokerAddress; import org.apache.pulsar.common.api.proto.SegmentInfoProto; import org.apache.pulsar.common.api.proto.SegmentState; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.scalable.HashRange; import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -61,6 +63,9 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe private final BrokerService brokerService; private final String metadataPath; + /** Canonical {@code topic://...} identity returned to the client regardless of the + * input form ({@code topic://}, {@code persistent://}, or short-form). 
*/ + private final String resolvedTopicName; private volatile boolean closed = false; public DagWatchSession(long sessionId, @@ -74,6 +79,7 @@ public DagWatchSession(long sessionId, this.resources = resources; this.brokerService = brokerService; this.metadataPath = resources.topicPath(topicName); + this.resolvedTopicName = topicName.toScalableTopic().toString(); this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build(); } @@ -85,6 +91,19 @@ public String getMetadataPath() { /** * Start the session: load current metadata, set up watch, and return * the initial layout response. + * + *

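<p>If no scalable metadata exists at the canonical path:
+ * <ul>
+ * <li>{@code topic://...} input → fail with {@code IllegalStateException} (the
+ * topic doesn't exist).</li>
+ * <li>{@code persistent://...} input → build a synthetic layout that wraps the
+ * existing regular (partitioned or non-partitioned) topic as one or more
+ * special segments, so V5 clients can operate against the regular topic
+ * through the scalable surface until the operator migrates it.</li>
+ * </ul>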
+ * The metadata-store watch is registered regardless, so a subsequent migration + * that writes scalable metadata to the same path will be observed and the + * synthetic layout will be transparently replaced with the real DAG. */ public CompletableFuture start() { // Register through the resources-level fan-out so close() can deregister us @@ -93,12 +112,64 @@ public CompletableFuture start() { return resources.getScalableTopicMetadataAsync(topicName, true) .thenCompose(optMd -> { - if (optMd.isEmpty()) { - return CompletableFuture.failedFuture( - new IllegalStateException("Scalable topic not found: " + topicName)); + if (optMd.isPresent()) { + return buildResponse(optMd.get()); } - ScalableTopicMetadata metadata = optMd.get(); - return buildResponse(metadata); + if (topicName.getDomain() == TopicDomain.persistent) { + return buildSyntheticResponse(); + } + return CompletableFuture.failedFuture( + new IllegalStateException("Scalable topic not found: " + topicName)); + }); + } + + /** + * Build a synthetic layout for a not-yet-migrated regular topic. Each partition + * (or the whole topic, for non-partitioned) becomes an active special segment + * that wraps the existing {@code persistent://...} topic. The synthetic layout + * uses mod-N routing (signalled by all active leaves being special segments) + * so V5 producers route the same way v4 producers do. + */ + private CompletableFuture buildSyntheticResponse() { + return brokerService.fetchPartitionedTopicMetadataAsync(topicName) + .thenApply(partitionedMd -> { + int partitions = partitionedMd.partitions; + long createdAtMs = System.currentTimeMillis(); + Map segments = new LinkedHashMap<>(); + if (partitions <= 0) { + // Non-partitioned: one special segment covering the full hash range. + // We're only called when topicName.getDomain() == persistent, so + // toString() is the canonical persistent://t/n/x form. 
+ segments.put(0L, SegmentInfo.activeSpecial( + 0L, + HashRange.of(0x0000, 0xFFFF), + topicName.toString(), + /*createdAtEpoch*/ 0L, + createdAtMs)); + } else { + // Partitioned: N special segments with equal-width contiguous ranges. + // The ranges are cosmetic — routing for synthetic layouts is mod-N over + // segment_id, derived implicitly by the SDK from "all active leaves are + // special segments". + int width = 0x10000 / partitions; + for (int k = 0; k < partitions; k++) { + int start = k * width; + int end = (k == partitions - 1) ? 0xFFFF : (start + width - 1); + segments.put((long) k, SegmentInfo.activeSpecial( + k, + HashRange.of(start, end), + topicName.getPartition(k).toString(), + /*createdAtEpoch*/ 0L, + createdAtMs)); + } + } + return new ScalableTopicLayoutResponse( + /*epoch*/ 0L, + segments, + /*segmentBrokerAddresses*/ null, + /*segmentBrokerAddressesTls*/ null, + /*controllerBrokerUrl*/ null, + /*controllerBrokerUrlTls*/ null); }); } @@ -142,7 +213,10 @@ public void pushUpdate(ScalableTopicLayoutResponse response) { } ScalableTopicDAG dag = buildDagProto(response); log.info().attr("epoch", response.epoch()).log("Pushing DAG update"); - cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, dag)); + // Always report the canonical topic://... identity so clients that looked up + // via persistent://... or short-form know the resolved name. + cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate( + sessionId, resolvedTopicName, dag)); } private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { @@ -170,6 +244,10 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { if (seg.sealedAtMs() >= 0) { segProto.setSealedAtMs(seg.sealedAtMs()); } + // Special segments wrap an existing persistent://... topic. 
+ if (seg.underlyingTopicName() != null) { + segProto.setUnderlyingTopicName(seg.underlyingTopicName()); + } } // Add broker addresses for active segments diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java index a14ef6d5780ff..5652ff2094956 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.scalable.HashRange; import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentState; @@ -104,6 +105,8 @@ public void testCloseIsIdempotent() { @Test public void testStartFailsWhenTopicMetadataMissing() { + // topic://... input + no scalable metadata = TopicNotFound. Synthetic layouts + // are only produced for persistent://... input (regular topics). when(resources.getScalableTopicMetadataAsync(TOPIC, true)) .thenReturn(CompletableFuture.completedFuture(Optional.empty())); @@ -124,6 +127,100 @@ public void testStartFailsWhenTopicMetadataMissing() { } } + // --- synthetic layout for not-yet-migrated regular topics --- + + @Test + public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws Exception { + // persistent:// input + no scalable metadata + non-partitioned regular topic + // (partitions=0) → synthetic layout with a single active special segment + // covering [0x0000, 0xFFFF] that wraps the existing persistent:// topic. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); + String regularPath = "/admin/scalable-topics/tenant/ns/my-regular"; + when(resources.topicPath(regular)).thenReturn(regularPath); + when(resources.getScalableTopicMetadataAsync(regular, true)) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + ScalableTopicLayoutResponse response = s.start().get(); + + assertEquals(response.epoch(), 0L); + assertEquals(response.segments().size(), 1); + SegmentInfo seg = response.segments().get(0L); + assertNotNull(seg); + assertEquals(seg.segmentId(), 0L); + assertEquals(seg.hashRange().start(), 0x0000); + assertEquals(seg.hashRange().end(), 0xFFFF); + assertTrue(seg.isActive()); + assertTrue(seg.isSpecial(), "non-partitioned regular topic must wrap as a special segment"); + assertEquals(seg.underlyingTopicName(), "persistent://tenant/ns/my-regular"); + } + + @Test + public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws Exception { + // persistent:// input + no scalable metadata + 4-partition topic → + // synthetic layout with 4 active special segments wrapping each + // persistent://...-partition-K and equal-width contiguous hash ranges. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-partitioned"); + String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned"; + when(resources.topicPath(regular)).thenReturn(regularPath); + when(resources.getScalableTopicMetadataAsync(regular, true)) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4))); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + ScalableTopicLayoutResponse response = s.start().get(); + + assertEquals(response.epoch(), 0L); + assertEquals(response.segments().size(), 4); + for (int k = 0; k < 4; k++) { + SegmentInfo seg = response.segments().get((long) k); + assertNotNull(seg, "missing segment for partition " + k); + assertEquals(seg.segmentId(), k); + assertTrue(seg.isActive()); + assertTrue(seg.isSpecial(), "partition " + k + " must wrap as a special segment"); + assertEquals(seg.underlyingTopicName(), + "persistent://tenant/ns/my-partitioned-partition-" + k); + } + // Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF inclusive + // on the last segment. + assertEquals(response.segments().get(0L).hashRange().start(), 0x0000); + assertEquals(response.segments().get(3L).hashRange().end(), 0xFFFF); + // No gaps between consecutive segments. + for (int k = 0; k < 3; k++) { + int endK = response.segments().get((long) k).hashRange().end(); + int startK1 = response.segments().get((long) (k + 1)).hashRange().start(); + assertEquals(startK1, endK + 1, "gap between partition " + k + " and " + (k + 1)); + } + } + + @Test + public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { + // The synthetic-layout response goes through pushUpdate, which always emits the + // canonical topic://... identity in resolved_topic_name regardless of input form. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); + ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse( + 0L, + Map.of(0L, SegmentInfo.activeSpecial(0L, HashRange.of(0x0000, 0xFFFF), + "persistent://tenant/ns/my-regular", 0L, 12345L)), + null, null, null, null); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + s.pushUpdate(response); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); + verify(ctx).writeAndFlush(captor.capture()); + BaseCommand cmd = parseFrame(captor.getValue()); + assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), + "topic://tenant/ns/my-regular"); + // The special-segment marker round-trips through the wire format. + var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0); + assertTrue(seg.hasUnderlyingTopicName()); + assertEquals(seg.getUnderlyingTopicName(), "persistent://tenant/ns/my-regular"); + } + @Test public void testStartRegistersWithResources() { // start() routes through the resources-level fan-out instead of registering @@ -314,7 +411,8 @@ private static SegmentInfo seg(long id, int start, int end, SegmentState state, createdAt, sealedAt, createdAtMs, - sealedAtMs); + sealedAtMs, + null); } private static java.util.List toList(long[] arr) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 183e66b68d95e..db274bc8712c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -539,19 +539,19 @@ public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws Excepti org.apache.pulsar.common.scalable.SegmentState.SEALED, java.util.List.of(), 
java.util.List.of(3L), /*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1, - /*createdAtMs*/ t0, /*sealedAtMs*/ t1); + /*createdAtMs*/ t0, /*sealedAtMs*/ t1, null); org.apache.pulsar.common.scalable.SegmentInfo seg1 = new org.apache.pulsar.common.scalable.SegmentInfo( 1L, org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF), org.apache.pulsar.common.scalable.SegmentState.ACTIVE, java.util.List.of(), java.util.List.of(), - 0, -1, t1, -1); + 0, -1, t1, -1, null); org.apache.pulsar.common.scalable.SegmentInfo seg2 = new org.apache.pulsar.common.scalable.SegmentInfo( 2L, org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF), org.apache.pulsar.common.scalable.SegmentState.ACTIVE, java.util.List.of(), java.util.List.of(), - 0, -1, t3, -1); + 0, -1, t3, -1, null); TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic"); ScalableTopicMetadata md = ScalableTopicMetadata.builder() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 52e9a60ad9bf8..a37c805fefe04 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -300,6 +300,19 @@ public TopicName getPartition(int index) { return get(partitionName); } + /** + * Returns this topic re-expressed in the scalable {@code topic://...} domain, + * regardless of the original input domain. Used to derive the canonical + * scalable-topic identity for a name the user may have spelled as + * {@code persistent://...} or short-form. + */ + public TopicName toScalableTopic() { + if (domain == TopicDomain.topic) { + return this; + } + return get(TopicDomain.topic.value() + "://" + getNamespace() + "/" + getEncodedLocalName()); + } + /** * @return partition index of the completeTopicName. * It returns -1 if the completeTopicName (topic) is not partitioned. 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 2c0c74fb378b8..c1b367e5feb69 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1700,10 +1700,12 @@ public static ByteBuf newScalableTopicClose(long sessionId) { return serializeWithSize(cmd); } - public static ByteBuf newScalableTopicUpdate(long sessionId, ScalableTopicDAG dag) { + public static ByteBuf newScalableTopicUpdate(long sessionId, String resolvedTopicName, + ScalableTopicDAG dag) { BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE); CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate() - .setSessionId(sessionId); + .setSessionId(sessionId) + .setResolvedTopicName(resolvedTopicName); update.setDag().copyFrom(dag); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java index 848326dd026f4..5bda99e1385b4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java @@ -35,15 +35,23 @@ * Used for retention-based segment GC and for timestamp-based seek. 
* * - * @param segmentId monotonically increasing, unique within the topic - * @param hashRange inclusive hash range [start, end] - * @param state ACTIVE or SEALED - * @param parentIds parent segment IDs in the DAG (empty for initial/root segments) - * @param childIds child segment IDs in the DAG (empty for active leaf segments) - * @param createdAtEpoch DAG epoch when this segment was created - * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) - * @param createdAtMs wall-clock millis at creation time - * @param sealedAtMs wall-clock millis at seal time (-1 if still active) + *

A segment may be a special segment — one that wraps an existing + * {@code persistent://...} topic rather than having its own {@code segment://...} URI. + * Special segments appear in the synthetic-layout response returned for a regular + * (partitioned or non-partitioned) topic that has not yet been migrated to a scalable + * topic. {@code underlyingTopicName} is non-null exactly for special segments. + * + * @param segmentId monotonically increasing, unique within the topic + * @param hashRange inclusive hash range [start, end] + * @param state ACTIVE or SEALED + * @param parentIds parent segment IDs in the DAG (empty for initial/root segments) + * @param childIds child segment IDs in the DAG (empty for active leaf segments) + * @param createdAtEpoch DAG epoch when this segment was created + * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) + * @param createdAtMs wall-clock millis at creation time + * @param sealedAtMs wall-clock millis at seal time (-1 if still active) + * @param underlyingTopicName for special segments: the underlying persistent://... topic + * name. {@code null} for regular segments. */ public record SegmentInfo( long segmentId, @@ -54,7 +62,8 @@ public record SegmentInfo( long createdAtEpoch, long sealedAtEpoch, long createdAtMs, - long sealedAtMs + long sealedAtMs, + String underlyingTopicName ) { public SegmentInfo { parentIds = parentIds != null ? List.copyOf(parentIds) : List.of(); @@ -65,32 +74,55 @@ public record SegmentInfo( public static SegmentInfo active(long segmentId, HashRange hashRange, long createdAtEpoch, long createdAtMs) { return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, - List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1); + List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, null); } /** Create a new active segment with the given parent IDs. 
*/ public static SegmentInfo active(long segmentId, HashRange hashRange, List parentIds, long createdAtEpoch, long createdAtMs) { return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, - parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1); + parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1, null); + } + + /** + * Create a new active special segment that wraps the given {@code persistent://...} + * topic instead of having its own {@code segment://...} URI. Used by the + * synthetic-layout response for not-yet-migrated regular topics. + */ + public static SegmentInfo activeSpecial(long segmentId, HashRange hashRange, + String underlyingTopicName, + long createdAtEpoch, long createdAtMs) { + return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, + List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, underlyingTopicName); } /** Return a sealed copy of this segment with the given child IDs. */ public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List childIds) { return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + underlyingTopicName); } /** Return a copy with different parent IDs. */ public SegmentInfo withParentIds(List parentIds) { return new SegmentInfo(segmentId, hashRange, state, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + underlyingTopicName); } /** Return a copy with different child IDs. 
*/ public SegmentInfo withChildIds(List childIds) { return new SegmentInfo(segmentId, hashRange, state, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + underlyingTopicName); + } + + /** + * True if this is a special segment — one that wraps an existing + * {@code persistent://...} topic rather than owning a {@code segment://...} URI. + */ + public boolean isSpecial() { + return underlyingTopicName != null; } public boolean isActive() { diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b2a698d33da0f..50873ef244eff 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -855,6 +855,14 @@ message SegmentInfoProto { // and timestamp-based seek; epoch above is a DAG generation number, not a clock. required uint64 created_at_ms = 9; optional uint64 sealed_at_ms = 10; + + // Special segment marker. When set, this segment does not have its own + // segment://... topic — it wraps the named persistent://... topic instead. + // Used by the synthetic-layout response the broker returns for a regular + // (partitioned or non-partitioned) topic that has not yet been migrated to + // a scalable topic. When absent, the segment URI is computed normally from + // the scalable topic name and segment_id (segment:///). + optional string underlying_topic_name = 11; } message SegmentBrokerAddress { @@ -874,7 +882,11 @@ message ScalableTopicDAG { // Client -> Broker: Request scalable topic metadata and initiate watch session message CommandScalableTopicLookup { required uint64 session_id = 1; // Client-assigned session ID - required string topic = 2; // e.g. "topic://tenant/ns/my-topic" + // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like + // "my-topic" (normalized by the broker to persistent://public/default/my-topic). 
+ // The broker resolves the input to the canonical topic://... identity and + // returns it in CommandScalableTopicUpdate.resolved_topic_name. + required string topic = 2; } // Broker -> Client: Used for BOTH initial response and subsequent pushed updates @@ -884,6 +896,12 @@ message CommandScalableTopicUpdate { optional ServerError error = 3; optional string message = 4; + + // Canonical scalable-topic identity (always "topic://t/n/x") that the client + // should use for downstream operations. Set on every success response, + // including for inputs that were given as persistent://... or short-form. + // Absent on error responses. + optional string resolved_topic_name = 5; } // Client -> Broker: Close the DAG watch session diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java index c2c9267636284..2c7c5abc96e97 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java @@ -101,12 +101,13 @@ public void testNewScalableTopicUpdate() { .addParentId(0L); dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650"); - ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag); + ByteBuf frame = Commands.newScalableTopicUpdate(77L, "topic://t/n/x", dag); BaseCommand cmd = parseFrame(frame); assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE); assertTrue(cmd.hasScalableTopicUpdate()); assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L); + assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), "topic://t/n/x"); assertFalse(cmd.getScalableTopicUpdate().hasError(), "successful update must not carry an error field"); From 74fab132937b1f01f67966491245023b9211b52f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 20 May 2026 
21:36:19 +0100 Subject: [PATCH 2/2] [improve][broker] PIP-475: address review feedback on synthetic-layout lookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename + bug fixes from the PR #25822 review. Rename "special segment" → "legacy segment": * The concept was vague. A legacy segment is one not managed by the scalable-topic controller — it wraps an existing, externally managed persistent:// topic. Renamed the proto field (underlying_topic_name → legacy_topic_name), SegmentInfo (underlyingTopicName → legacyTopicName, activeSpecial → activeLegacy, isSpecial → isLegacy), and all comments. Bug fixes: * TopicName.toScalableTopic() now strips any -partition-K suffix, so persistent://t/n/x-partition-3 resolves to topic://t/n/x rather than topic://t/n/x-partition-3. * DagWatchSession.start() rejects an individual-partition lookup target up front (the synthetic layout models the whole partitioned topic; wrapping one partition would otherwise produce nonsensical -partition-K-partition-J names). * Synthetic partitioned layout no longer throws when partitions exceeds the 16-bit hash space (width would be 0 → HashRange invariant violation). Ranges are cosmetic for synthetic layouts (routing is mod-N over segment_id), so a degenerate clamped slot is used past 65536 partitions. * SegmentInfo.isLegacy() rejects an empty legacyTopicName, not just null. * Commands.newScalableTopicUpdate guards a null resolvedTopicName (the field is optional; the lightproto setter would otherwise NPE). Docs/tests: * Documented that a synthetic layout is built even for a non-existent topic — existence/auto-create is decided downstream by the namespace policy when the producer/consumer attaches, matching v4. * Documented that synthetic hash ranges are cosmetic (mod-N routing). * Added TopicNameTest.testToScalableTopic (topic://, persistent://, short-form, and -partition-K stripping — also exercises short-form input per the review). 
* Added DagWatchSessionTest.testStartRejectsIndividualPartitionInput. * Added CommandsScalableTopicTest coverage for the null-resolved-name guard. Confirmed non-issue: V1 names (with a cluster component) are rejected by TopicName.get() before toScalableTopic() can see them. --- .../service/scalable/DagWatchSession.java | 67 ++++++++++++++----- .../service/scalable/DagWatchSessionTest.java | 41 +++++++++--- .../pulsar/common/naming/TopicName.java | 8 ++- .../pulsar/common/protocol/Commands.java | 8 ++- .../pulsar/common/scalable/SegmentInfo.java | 47 +++++++------ pulsar-common/src/main/proto/PulsarApi.proto | 14 ++-- .../pulsar/common/naming/TopicNameTest.java | 22 ++++++ .../protocol/CommandsScalableTopicTest.java | 21 ++++++ 8 files changed, 169 insertions(+), 59 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java index 7e02808972169..350125438e7cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java @@ -98,7 +98,7 @@ public String getMetadataPath() { * topic doesn't exist). *

  • {@code persistent://...} input → build a synthetic layout that wraps the * existing regular (partitioned or non-partitioned) topic as one or more - * special segments, so V5 clients can operate against the regular topic + * legacy segments, so V5 clients can operate against the regular topic * through the scalable surface until the operator migrates it.
  • * * The metadata-store watch is registered regardless, so a subsequent migration @@ -106,6 +106,16 @@ public String getMetadataPath() { * synthetic layout will be transparently replaced with the real DAG. */ public CompletableFuture start() { + // A specific partition (persistent://t/n/x-partition-K) is not a valid + // scalable-topic lookup target — the synthetic layout models the whole + // partitioned topic. Reject it up front rather than producing a layout that + // wraps nonsensical -partition-K-partition-J names. + if (topicName.isPartitioned()) { + return CompletableFuture.failedFuture(new IllegalArgumentException( + "Cannot open a scalable-topic lookup for an individual partition: " + topicName + + "; use the base topic name " + topicName.getPartitionedTopicName())); + } + // Register through the resources-level fan-out so close() can deregister us // and we don't accumulate stale store-level listeners over time. resources.registerPathListener(this); @@ -125,10 +135,17 @@ public CompletableFuture start() { /** * Build a synthetic layout for a not-yet-migrated regular topic. Each partition - * (or the whole topic, for non-partitioned) becomes an active special segment + * (or the whole topic, for non-partitioned) becomes an active legacy segment * that wraps the existing {@code persistent://...} topic. The synthetic layout - * uses mod-N routing (signalled by all active leaves being special segments) + * uses mod-N routing (signalled by all active leaves being legacy segments) * so V5 producers route the same way v4 producers do. + * + *

    Note: {@code fetchPartitionedTopicMetadataAsync} returns {@code partitions == 0} + * for both an existing non-partitioned topic and a topic that does not exist at all. + * In the latter case a single-segment synthetic layout is still returned; whether the + * underlying {@code persistent://...} topic gets auto-created is then decided by the + * namespace's auto-creation policy when the V5 producer/consumer first attaches — + * exactly the behaviour a v4 client would see. */ private CompletableFuture buildSyntheticResponse() { return brokerService.fetchPartitionedTopicMetadataAsync(topicName) @@ -137,27 +154,25 @@ private CompletableFuture buildSyntheticResponse() long createdAtMs = System.currentTimeMillis(); Map segments = new LinkedHashMap<>(); if (partitions <= 0) { - // Non-partitioned: one special segment covering the full hash range. + // Non-partitioned: one legacy segment covering the full hash range. // We're only called when topicName.getDomain() == persistent, so // toString() is the canonical persistent://t/n/x form. - segments.put(0L, SegmentInfo.activeSpecial( + segments.put(0L, SegmentInfo.activeLegacy( 0L, HashRange.of(0x0000, 0xFFFF), topicName.toString(), /*createdAtEpoch*/ 0L, createdAtMs)); } else { - // Partitioned: N special segments with equal-width contiguous ranges. - // The ranges are cosmetic — routing for synthetic layouts is mod-N over - // segment_id, derived implicitly by the SDK from "all active leaves are - // special segments". - int width = 0x10000 / partitions; + // Partitioned: N legacy segments. The hash ranges here are cosmetic — + // routing for synthetic layouts is mod-N over segment_id (see + // SegmentRouter on the SDK side), so the ranges are never consulted + // for routing. They're assigned for stable, human-readable layouts. for (int k = 0; k < partitions; k++) { - int start = k * width; - int end = (k == partitions - 1) ? 
0xFFFF : (start + width - 1); - segments.put((long) k, SegmentInfo.activeSpecial( + HashRange range = syntheticRange(k, partitions); + segments.put((long) k, SegmentInfo.activeLegacy( k, - HashRange.of(start, end), + range, topicName.getPartition(k).toString(), /*createdAtEpoch*/ 0L, createdAtMs)); @@ -173,6 +188,24 @@ private CompletableFuture buildSyntheticResponse() }); } + /** + * Cosmetic hash range for partition {@code k} of {@code n} in a synthetic layout. + * When {@code n <= 0x10000} the ranges tile {@code [0x0000, 0xFFFF]} contiguously; + * beyond that there are more partitions than hash slots, so a degenerate single-slot + * range (clamped to the hash space) is used. Either way the value is never consulted + * for routing — synthetic layouts route mod-N over segment_id. + */ + private static HashRange syntheticRange(int k, int n) { + if (n > 0x10000) { + int slot = Math.min(k, 0xFFFF); + return HashRange.of(slot, slot); + } + int width = 0x10000 / n; + int start = k * width; + int end = (k == n - 1) ? 0xFFFF : (start + width - 1); + return HashRange.of(start, end); + } + /** * Invoked by the {@link ScalableTopicResources} fan-out for every metadata event * matching this session's topic path. The registry already path-filtered for us; @@ -244,9 +277,9 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { if (seg.sealedAtMs() >= 0) { segProto.setSealedAtMs(seg.sealedAtMs()); } - // Special segments wrap an existing persistent://... topic. - if (seg.underlyingTopicName() != null) { - segProto.setUnderlyingTopicName(seg.underlyingTopicName()); + // Legacy segments wrap an existing, externally managed persistent://... topic. 
+ if (seg.legacyTopicName() != null) { + segProto.setLegacyTopicName(seg.legacyTopicName()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java index 5652ff2094956..849061e0520ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java @@ -132,7 +132,7 @@ public void testStartFailsWhenTopicMetadataMissing() { @Test public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws Exception { // persistent:// input + no scalable metadata + non-partitioned regular topic - // (partitions=0) → synthetic layout with a single active special segment + // (partitions=0) → synthetic layout with a single active legacy segment // covering [0x0000, 0xFFFF] that wraps the existing persistent:// topic. 
TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); String regularPath = "/admin/scalable-topics/tenant/ns/my-regular"; @@ -153,14 +153,14 @@ public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() thr assertEquals(seg.hashRange().start(), 0x0000); assertEquals(seg.hashRange().end(), 0xFFFF); assertTrue(seg.isActive()); - assertTrue(seg.isSpecial(), "non-partitioned regular topic must wrap as a special segment"); - assertEquals(seg.underlyingTopicName(), "persistent://tenant/ns/my-regular"); + assertTrue(seg.isLegacy(), "non-partitioned regular topic must wrap as a legacy segment"); + assertEquals(seg.legacyTopicName(), "persistent://tenant/ns/my-regular"); } @Test public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws Exception { // persistent:// input + no scalable metadata + 4-partition topic → - // synthetic layout with 4 active special segments wrapping each + // synthetic layout with 4 active legacy segments wrapping each // persistent://...-partition-K and equal-width contiguous hash ranges. 
TopicName regular = TopicName.get("persistent://tenant/ns/my-partitioned"); String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned"; @@ -180,8 +180,8 @@ public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws assertNotNull(seg, "missing segment for partition " + k); assertEquals(seg.segmentId(), k); assertTrue(seg.isActive()); - assertTrue(seg.isSpecial(), "partition " + k + " must wrap as a special segment"); - assertEquals(seg.underlyingTopicName(), + assertTrue(seg.isLegacy(), "partition " + k + " must wrap as a legacy segment"); + assertEquals(seg.legacyTopicName(), "persistent://tenant/ns/my-partitioned-partition-" + k); } // Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF inclusive @@ -196,6 +196,27 @@ public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws } } + @Test + public void testStartRejectsIndividualPartitionInput() throws Exception { + // A specific partition name must be rejected — the synthetic layout models the + // whole partitioned topic, and wrapping a single partition would otherwise + // produce nonsensical -partition-K-partition-J underlying names. 
+ TopicName partition = TopicName.get("persistent://tenant/ns/my-partitioned-partition-3"); + + DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService); + CompletableFuture future = s.start(); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("expected failure"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException, "got: " + e.getCause()); + assertTrue(e.getCause().getMessage().contains("individual partition"), + e.getCause().getMessage()); + } + } + @Test public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { // The synthetic-layout response goes through pushUpdate, which always emits the @@ -203,7 +224,7 @@ public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse( 0L, - Map.of(0L, SegmentInfo.activeSpecial(0L, HashRange.of(0x0000, 0xFFFF), + Map.of(0L, SegmentInfo.activeLegacy(0L, HashRange.of(0x0000, 0xFFFF), "persistent://tenant/ns/my-regular", 0L, 12345L)), null, null, null, null); @@ -215,10 +236,10 @@ public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { BaseCommand cmd = parseFrame(captor.getValue()); assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), "topic://tenant/ns/my-regular"); - // The special-segment marker round-trips through the wire format. + // The legacy-segment marker round-trips through the wire format. 
var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0); - assertTrue(seg.hasUnderlyingTopicName()); - assertEquals(seg.getUnderlyingTopicName(), "persistent://tenant/ns/my-regular"); + assertTrue(seg.hasLegacyTopicName()); + assertEquals(seg.getLegacyTopicName(), "persistent://tenant/ns/my-regular"); } @Test diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index a37c805fefe04..ea70d11a0ac7d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -305,12 +305,18 @@ public TopicName getPartition(int index) { * regardless of the original input domain. Used to derive the canonical * scalable-topic identity for a name the user may have spelled as * {@code persistent://...} or short-form. + * + *

    Any {@code -partition-K} suffix is stripped, so a partition name like + * {@code persistent://t/n/x-partition-3} resolves to the base topic's scalable + * identity {@code topic://t/n/x}, not {@code topic://t/n/x-partition-3}. */ public TopicName toScalableTopic() { if (domain == TopicDomain.topic) { return this; } - return get(TopicDomain.topic.value() + "://" + getNamespace() + "/" + getEncodedLocalName()); + TopicName base = isPartitioned() ? get(getPartitionedTopicName()) : this; + return get(TopicDomain.topic.value() + "://" + base.getNamespace() + + "/" + base.getEncodedLocalName()); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index c1b367e5feb69..e6cb2605315ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1704,8 +1704,12 @@ public static ByteBuf newScalableTopicUpdate(long sessionId, String resolvedTopi ScalableTopicDAG dag) { BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE); CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate() - .setSessionId(sessionId) - .setResolvedTopicName(resolvedTopicName); + .setSessionId(sessionId); + // resolved_topic_name is optional on the wire; guard against null because the + // lightproto setter would NPE rather than leave the field unset. 
+ if (resolvedTopicName != null) { + update.setResolvedTopicName(resolvedTopicName); + } update.setDag().copyFrom(dag); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java index 5bda99e1385b4..0b727b23e1805 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java @@ -35,11 +35,12 @@ * Used for retention-based segment GC and for timestamp-based seek. * * - *

    A segment may be a special segment — one that wraps an existing - * {@code persistent://...} topic rather than having its own {@code segment://...} URI. - * Special segments appear in the synthetic-layout response returned for a regular - * (partitioned or non-partitioned) topic that has not yet been migrated to a scalable - * topic. {@code underlyingTopicName} is non-null exactly for special segments. + *

    A segment may be a legacy segment — one that is not managed by the + * scalable-topic controller and has no {@code segment://...} URI of its own; instead it + * wraps an existing, externally managed {@code persistent://...} topic. Legacy segments + * appear in the synthetic-layout response returned for a regular (partitioned or + * non-partitioned) topic that has not yet been migrated to a scalable topic. + * {@code legacyTopicName} is non-null and non-empty exactly for legacy segments. * * @param segmentId monotonically increasing, unique within the topic * @param hashRange inclusive hash range [start, end] @@ -50,8 +51,9 @@ * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) * @param createdAtMs wall-clock millis at creation time * @param sealedAtMs wall-clock millis at seal time (-1 if still active) - * @param underlyingTopicName for special segments: the underlying persistent://... topic - * name. {@code null} for regular segments. + * @param legacyTopicName for legacy segments: the externally managed + * {@code persistent://...} topic this segment wraps. + * {@code null} for regular controller-managed segments. */ public record SegmentInfo( long segmentId, @@ -63,7 +65,7 @@ public record SegmentInfo( long sealedAtEpoch, long createdAtMs, long sealedAtMs, - String underlyingTopicName + String legacyTopicName ) { public SegmentInfo { parentIds = parentIds != null ? List.copyOf(parentIds) : List.of(); @@ -85,44 +87,45 @@ public static SegmentInfo active(long segmentId, HashRange hashRange, } /** - * Create a new active special segment that wraps the given {@code persistent://...} - * topic instead of having its own {@code segment://...} URI. Used by the - * synthetic-layout response for not-yet-migrated regular topics. + * Create a new active legacy segment that wraps the given externally managed + * {@code persistent://...} topic instead of having its own {@code segment://...} URI. 
+ * Used by the synthetic-layout response for not-yet-migrated regular topics. */ - public static SegmentInfo activeSpecial(long segmentId, HashRange hashRange, - String underlyingTopicName, - long createdAtEpoch, long createdAtMs) { + public static SegmentInfo activeLegacy(long segmentId, HashRange hashRange, + String legacyTopicName, + long createdAtEpoch, long createdAtMs) { return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, - List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, underlyingTopicName); + List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, legacyTopicName); } /** Return a sealed copy of this segment with the given child IDs. */ public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List childIds) { return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED, parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, - underlyingTopicName); + legacyTopicName); } /** Return a copy with different parent IDs. */ public SegmentInfo withParentIds(List parentIds) { return new SegmentInfo(segmentId, hashRange, state, parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, - underlyingTopicName); + legacyTopicName); } /** Return a copy with different child IDs. */ public SegmentInfo withChildIds(List childIds) { return new SegmentInfo(segmentId, hashRange, state, parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, - underlyingTopicName); + legacyTopicName); } /** - * True if this is a special segment — one that wraps an existing - * {@code persistent://...} topic rather than owning a {@code segment://...} URI. + * True if this is a legacy segment — one that wraps an existing, externally managed + * {@code persistent://...} topic rather than owning a controller-managed + * {@code segment://...} URI. An empty {@code legacyTopicName} does not count as legacy. 
*/ - public boolean isSpecial() { - return underlyingTopicName != null; + public boolean isLegacy() { + return legacyTopicName != null && !legacyTopicName.isEmpty(); } public boolean isActive() { diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 50873ef244eff..28501c8bc6637 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -856,13 +856,13 @@ message SegmentInfoProto { required uint64 created_at_ms = 9; optional uint64 sealed_at_ms = 10; - // Special segment marker. When set, this segment does not have its own - // segment://... topic — it wraps the named persistent://... topic instead. - // Used by the synthetic-layout response the broker returns for a regular - // (partitioned or non-partitioned) topic that has not yet been migrated to - // a scalable topic. When absent, the segment URI is computed normally from - // the scalable topic name and segment_id (segment:///). - optional string underlying_topic_name = 11; + // Legacy-segment marker. When set, this segment is not managed by the scalable-topic + // controller and has no segment://... topic of its own — it wraps the named, externally + // managed persistent://... topic instead. Used by the synthetic-layout response the + // broker returns for a regular (partitioned or non-partitioned) topic that has not yet + // been migrated to a scalable topic. When absent, the segment URI is computed normally + // from the scalable topic name and segment_id (segment:///). 
+ optional string legacy_topic_name = 11; } message SegmentBrokerAddress { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 412d3696b1eed..eb9254ecf4a14 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -517,4 +517,26 @@ public void testToFullTopicName() { assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("non-persistent://tenant/cluster/ns/topic")); } + + @Test + public void testToScalableTopic() { + // topic://... is returned as-is. + TopicName already = TopicName.get("topic://tenant/ns/x"); + assertEquals(already.toScalableTopic(), already); + + // persistent://... is re-expressed in the topic:// domain. + assertEquals(TopicName.get("persistent://tenant/ns/x").toScalableTopic().toString(), + "topic://tenant/ns/x"); + + // Short forms normalise to persistent://public/default/... first, then to topic://. + assertEquals(TopicName.get("my-topic").toScalableTopic().toString(), + "topic://public/default/my-topic"); + assertEquals(TopicName.get("tenant/ns/my-topic").toScalableTopic().toString(), + "topic://tenant/ns/my-topic"); + + // A -partition-K suffix is stripped: the partition resolves to the base topic's + // scalable identity, not topic://.../x-partition-K. 
+ assertEquals(TopicName.get("persistent://tenant/ns/x-partition-3").toScalableTopic().toString(), + "topic://tenant/ns/x"); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java index 2c7c5abc96e97..a5d028889eea4 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java @@ -133,6 +133,27 @@ public void testNewScalableTopicUpdate() { assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(), "pulsar://broker-a:6650"); } + @Test + public void testNewScalableTopicUpdateWithNullResolvedTopicNameLeavesFieldUnset() { + // resolved_topic_name is optional on the wire; a null must serialise cleanly with + // the field unset rather than NPE in the lightproto setter. + ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(1L); + dag.addSegment() + .setSegmentId(0L) + .setHashStart(0x0000) + .setHashEnd(0xFFFF) + .setState(SegmentState.ACTIVE) + .setCreatedAtEpoch(0L) + .setCreatedAtMs(System.currentTimeMillis()); + + ByteBuf frame = Commands.newScalableTopicUpdate(5L, null, dag); + BaseCommand cmd = parseFrame(frame); + + assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 5L); + assertFalse(cmd.getScalableTopicUpdate().hasResolvedTopicName(), + "null resolvedTopicName must leave the optional field unset"); + } + @Test public void testNewScalableTopicError() { ByteBuf frame = Commands.newScalableTopicError(15L, ServerError.TopicNotFound,