diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 36bf129f16043..fa7e919880f3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -165,6 +165,7 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -787,6 +788,14 @@ protected void handleCommandScalableTopicLookup( return; } + // Scalable topics do not support non-persistent storage. Reject early with a + // clear error rather than failing later in segment infrastructure. 
+ if (topicName.getDomain() == TopicDomain.non_persistent) { + ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, ServerError.NotAllowedError, + "Scalable topics do not support non-persistent:// topics")); + return; + } + if (!this.service.getPulsar().isRunning()) { log.warn("ScalableTopicLookup rejected: broker not ready"); ctx.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java index 2cc50301486dd..350125438e7cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java @@ -32,8 +32,10 @@ import org.apache.pulsar.common.api.proto.SegmentBrokerAddress; import org.apache.pulsar.common.api.proto.SegmentInfoProto; import org.apache.pulsar.common.api.proto.SegmentState; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.scalable.HashRange; import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -61,6 +63,9 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe private final BrokerService brokerService; private final String metadataPath; + /** Canonical {@code topic://...} identity returned to the client regardless of the + * input form ({@code topic://}, {@code persistent://}, or short-form). 
*/ + private final String resolvedTopicName; private volatile boolean closed = false; public DagWatchSession(long sessionId, @@ -74,6 +79,7 @@ public DagWatchSession(long sessionId, this.resources = resources; this.brokerService = brokerService; this.metadataPath = resources.topicPath(topicName); + this.resolvedTopicName = topicName.toScalableTopic().toString(); this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build(); } @@ -85,23 +91,121 @@ public String getMetadataPath() { /** * Start the session: load current metadata, set up watch, and return * the initial layout response. + * + *

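<p>If no scalable metadata exists at the canonical path: + * <ul> + * <li>{@code persistent://...} input: a synthetic layout is returned in which each partition (or the whole topic, when non-partitioned) is an active legacy segment;</li> + * <li>any other input, e.g. {@code topic://...}: the returned future fails with an {@link IllegalStateException} ("Scalable topic not found").</li>
 + * </ul>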
+ * The metadata-store watch is registered regardless, so a subsequent migration + * that writes scalable metadata to the same path will be observed and the + * synthetic layout will be transparently replaced with the real DAG. */ public CompletableFuture start() { + // A specific partition (persistent://t/n/x-partition-K) is not a valid + // scalable-topic lookup target — the synthetic layout models the whole + // partitioned topic. Reject it up front rather than producing a layout that + // wraps nonsensical -partition-K-partition-J names. + if (topicName.isPartitioned()) { + return CompletableFuture.failedFuture(new IllegalArgumentException( + "Cannot open a scalable-topic lookup for an individual partition: " + topicName + + "; use the base topic name " + topicName.getPartitionedTopicName())); + } + // Register through the resources-level fan-out so close() can deregister us // and we don't accumulate stale store-level listeners over time. resources.registerPathListener(this); return resources.getScalableTopicMetadataAsync(topicName, true) .thenCompose(optMd -> { - if (optMd.isEmpty()) { - return CompletableFuture.failedFuture( - new IllegalStateException("Scalable topic not found: " + topicName)); + if (optMd.isPresent()) { + return buildResponse(optMd.get()); } - ScalableTopicMetadata metadata = optMd.get(); - return buildResponse(metadata); + if (topicName.getDomain() == TopicDomain.persistent) { + return buildSyntheticResponse(); + } + return CompletableFuture.failedFuture( + new IllegalStateException("Scalable topic not found: " + topicName)); }); } + /** + * Build a synthetic layout for a not-yet-migrated regular topic. Each partition + * (or the whole topic, for non-partitioned) becomes an active legacy segment + * that wraps the existing {@code persistent://...} topic. The synthetic layout + * uses mod-N routing (signalled by all active leaves being legacy segments) + * so V5 producers route the same way v4 producers do. + * + *
<p>
Note: {@code fetchPartitionedTopicMetadataAsync} returns {@code partitions == 0} + * for both an existing non-partitioned topic and a topic that does not exist at all. + * In the latter case a single-segment synthetic layout is still returned; whether the + * underlying {@code persistent://...} topic gets auto-created is then decided by the + * namespace's auto-creation policy when the V5 producer/consumer first attaches — + * exactly the behaviour a v4 client would see. + */ + private CompletableFuture buildSyntheticResponse() { + return brokerService.fetchPartitionedTopicMetadataAsync(topicName) + .thenApply(partitionedMd -> { + int partitions = partitionedMd.partitions; + long createdAtMs = System.currentTimeMillis(); + Map segments = new LinkedHashMap<>(); + if (partitions <= 0) { + // Non-partitioned: one legacy segment covering the full hash range. + // We're only called when topicName.getDomain() == persistent, so + // toString() is the canonical persistent://t/n/x form. + segments.put(0L, SegmentInfo.activeLegacy( + 0L, + HashRange.of(0x0000, 0xFFFF), + topicName.toString(), + /*createdAtEpoch*/ 0L, + createdAtMs)); + } else { + // Partitioned: N legacy segments. The hash ranges here are cosmetic — + // routing for synthetic layouts is mod-N over segment_id (see + // SegmentRouter on the SDK side), so the ranges are never consulted + // for routing. They're assigned for stable, human-readable layouts. + for (int k = 0; k < partitions; k++) { + HashRange range = syntheticRange(k, partitions); + segments.put((long) k, SegmentInfo.activeLegacy( + k, + range, + topicName.getPartition(k).toString(), + /*createdAtEpoch*/ 0L, + createdAtMs)); + } + } + return new ScalableTopicLayoutResponse( + /*epoch*/ 0L, + segments, + /*segmentBrokerAddresses*/ null, + /*segmentBrokerAddressesTls*/ null, + /*controllerBrokerUrl*/ null, + /*controllerBrokerUrlTls*/ null); + }); + } + + /** + * Cosmetic hash range for partition {@code k} of {@code n} in a synthetic layout. 
+ * When {@code n <= 0x10000} the ranges tile {@code [0x0000, 0xFFFF]} contiguously; + * beyond that there are more partitions than hash slots, so a degenerate single-slot + * range (clamped to the hash space) is used. Either way the value is never consulted + * for routing — synthetic layouts route mod-N over segment_id. + */ + private static HashRange syntheticRange(int k, int n) { + if (n > 0x10000) { + int slot = Math.min(k, 0xFFFF); + return HashRange.of(slot, slot); + } + int width = 0x10000 / n; + int start = k * width; + int end = (k == n - 1) ? 0xFFFF : (start + width - 1); + return HashRange.of(start, end); + } + /** * Invoked by the {@link ScalableTopicResources} fan-out for every metadata event * matching this session's topic path. The registry already path-filtered for us; @@ -142,7 +246,10 @@ public void pushUpdate(ScalableTopicLayoutResponse response) { } ScalableTopicDAG dag = buildDagProto(response); log.info().attr("epoch", response.epoch()).log("Pushing DAG update"); - cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, dag)); + // Always report the canonical topic://... identity so clients that looked up + // via persistent://... or short-form know the resolved name. + cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate( + sessionId, resolvedTopicName, dag)); } private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { @@ -170,6 +277,10 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) { if (seg.sealedAtMs() >= 0) { segProto.setSealedAtMs(seg.sealedAtMs()); } + // Legacy segments wrap an existing, externally managed persistent://... topic. 
+ if (seg.legacyTopicName() != null) { + segProto.setLegacyTopicName(seg.legacyTopicName()); + } } // Add broker addresses for active segments diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java index a14ef6d5780ff..849061e0520ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.scalable.HashRange; import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentState; @@ -104,6 +105,8 @@ public void testCloseIsIdempotent() { @Test public void testStartFailsWhenTopicMetadataMissing() { + // topic://... input + no scalable metadata = TopicNotFound. Synthetic layouts + // are only produced for persistent://... input (regular topics). when(resources.getScalableTopicMetadataAsync(TOPIC, true)) .thenReturn(CompletableFuture.completedFuture(Optional.empty())); @@ -124,6 +127,121 @@ public void testStartFailsWhenTopicMetadataMissing() { } } + // --- synthetic layout for not-yet-migrated regular topics --- + + @Test + public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws Exception { + // persistent:// input + no scalable metadata + non-partitioned regular topic + // (partitions=0) → synthetic layout with a single active legacy segment + // covering [0x0000, 0xFFFF] that wraps the existing persistent:// topic. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); + String regularPath = "/admin/scalable-topics/tenant/ns/my-regular"; + when(resources.topicPath(regular)).thenReturn(regularPath); + when(resources.getScalableTopicMetadataAsync(regular, true)) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + ScalableTopicLayoutResponse response = s.start().get(); + + assertEquals(response.epoch(), 0L); + assertEquals(response.segments().size(), 1); + SegmentInfo seg = response.segments().get(0L); + assertNotNull(seg); + assertEquals(seg.segmentId(), 0L); + assertEquals(seg.hashRange().start(), 0x0000); + assertEquals(seg.hashRange().end(), 0xFFFF); + assertTrue(seg.isActive()); + assertTrue(seg.isLegacy(), "non-partitioned regular topic must wrap as a legacy segment"); + assertEquals(seg.legacyTopicName(), "persistent://tenant/ns/my-regular"); + } + + @Test + public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws Exception { + // persistent:// input + no scalable metadata + 4-partition topic → + // synthetic layout with 4 active legacy segments wrapping each + // persistent://...-partition-K and equal-width contiguous hash ranges. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-partitioned"); + String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned"; + when(resources.topicPath(regular)).thenReturn(regularPath); + when(resources.getScalableTopicMetadataAsync(regular, true)) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4))); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + ScalableTopicLayoutResponse response = s.start().get(); + + assertEquals(response.epoch(), 0L); + assertEquals(response.segments().size(), 4); + for (int k = 0; k < 4; k++) { + SegmentInfo seg = response.segments().get((long) k); + assertNotNull(seg, "missing segment for partition " + k); + assertEquals(seg.segmentId(), k); + assertTrue(seg.isActive()); + assertTrue(seg.isLegacy(), "partition " + k + " must wrap as a legacy segment"); + assertEquals(seg.legacyTopicName(), + "persistent://tenant/ns/my-partitioned-partition-" + k); + } + // Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF inclusive + // on the last segment. + assertEquals(response.segments().get(0L).hashRange().start(), 0x0000); + assertEquals(response.segments().get(3L).hashRange().end(), 0xFFFF); + // No gaps between consecutive segments. 
+ for (int k = 0; k < 3; k++) { + int endK = response.segments().get((long) k).hashRange().end(); + int startK1 = response.segments().get((long) (k + 1)).hashRange().start(); + assertEquals(startK1, endK + 1, "gap between partition " + k + " and " + (k + 1)); + } + } + + @Test + public void testStartRejectsIndividualPartitionInput() throws Exception { + // A specific partition name must be rejected — the synthetic layout models the + // whole partitioned topic, and wrapping a single partition would otherwise + // produce nonsensical -partition-K-partition-J underlying names. + TopicName partition = TopicName.get("persistent://tenant/ns/my-partitioned-partition-3"); + + DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService); + CompletableFuture future = s.start(); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("expected failure"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException, "got: " + e.getCause()); + assertTrue(e.getCause().getMessage().contains("individual partition"), + e.getCause().getMessage()); + } + } + + @Test + public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { + // The synthetic-layout response goes through pushUpdate, which always emits the + // canonical topic://... identity in resolved_topic_name regardless of input form. 
+ TopicName regular = TopicName.get("persistent://tenant/ns/my-regular"); + ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse( + 0L, + Map.of(0L, SegmentInfo.activeLegacy(0L, HashRange.of(0x0000, 0xFFFF), + "persistent://tenant/ns/my-regular", 0L, 12345L)), + null, null, null, null); + + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + s.pushUpdate(response); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); + verify(ctx).writeAndFlush(captor.capture()); + BaseCommand cmd = parseFrame(captor.getValue()); + assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), + "topic://tenant/ns/my-regular"); + // The legacy-segment marker round-trips through the wire format. + var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0); + assertTrue(seg.hasLegacyTopicName()); + assertEquals(seg.getLegacyTopicName(), "persistent://tenant/ns/my-regular"); + } + @Test public void testStartRegistersWithResources() { // start() routes through the resources-level fan-out instead of registering @@ -314,7 +432,8 @@ private static SegmentInfo seg(long id, int start, int end, SegmentState state, createdAt, sealedAt, createdAtMs, - sealedAtMs); + sealedAtMs, + null); } private static java.util.List toList(long[] arr) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 183e66b68d95e..db274bc8712c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -539,19 +539,19 @@ public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws Excepti org.apache.pulsar.common.scalable.SegmentState.SEALED, java.util.List.of(), java.util.List.of(3L), 
/*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1, - /*createdAtMs*/ t0, /*sealedAtMs*/ t1); + /*createdAtMs*/ t0, /*sealedAtMs*/ t1, null); org.apache.pulsar.common.scalable.SegmentInfo seg1 = new org.apache.pulsar.common.scalable.SegmentInfo( 1L, org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF), org.apache.pulsar.common.scalable.SegmentState.ACTIVE, java.util.List.of(), java.util.List.of(), - 0, -1, t1, -1); + 0, -1, t1, -1, null); org.apache.pulsar.common.scalable.SegmentInfo seg2 = new org.apache.pulsar.common.scalable.SegmentInfo( 2L, org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF), org.apache.pulsar.common.scalable.SegmentState.ACTIVE, java.util.List.of(), java.util.List.of(), - 0, -1, t3, -1); + 0, -1, t3, -1, null); TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic"); ScalableTopicMetadata md = ScalableTopicMetadata.builder() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 52e9a60ad9bf8..ea70d11a0ac7d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -300,6 +300,25 @@ public TopicName getPartition(int index) { return get(partitionName); } + /** + * Returns this topic re-expressed in the scalable {@code topic://...} domain, + * regardless of the original input domain. Used to derive the canonical + * scalable-topic identity for a name the user may have spelled as + * {@code persistent://...} or short-form. + * + *
<p>
Any {@code -partition-K} suffix is stripped, so a partition name like + * {@code persistent://t/n/x-partition-3} resolves to the base topic's scalable + * identity {@code topic://t/n/x}, not {@code topic://t/n/x-partition-3}. + */ + public TopicName toScalableTopic() { + if (domain == TopicDomain.topic) { + return this; + } + TopicName base = isPartitioned() ? get(getPartitionedTopicName()) : this; + return get(TopicDomain.topic.value() + "://" + base.getNamespace() + + "/" + base.getEncodedLocalName()); + } + /** * @return partition index of the completeTopicName. * It returns -1 if the completeTopicName (topic) is not partitioned. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 2c0c74fb378b8..e6cb2605315ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1700,10 +1700,16 @@ public static ByteBuf newScalableTopicClose(long sessionId) { return serializeWithSize(cmd); } - public static ByteBuf newScalableTopicUpdate(long sessionId, ScalableTopicDAG dag) { + public static ByteBuf newScalableTopicUpdate(long sessionId, String resolvedTopicName, + ScalableTopicDAG dag) { BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE); CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate() .setSessionId(sessionId); + // resolved_topic_name is optional on the wire; guard against null because the + // lightproto setter would NPE rather than leave the field unset. 
+ if (resolvedTopicName != null) { + update.setResolvedTopicName(resolvedTopicName); + } update.setDag().copyFrom(dag); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java index 848326dd026f4..0b727b23e1805 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java @@ -35,15 +35,25 @@ * Used for retention-based segment GC and for timestamp-based seek. * * - * @param segmentId monotonically increasing, unique within the topic - * @param hashRange inclusive hash range [start, end] - * @param state ACTIVE or SEALED - * @param parentIds parent segment IDs in the DAG (empty for initial/root segments) - * @param childIds child segment IDs in the DAG (empty for active leaf segments) - * @param createdAtEpoch DAG epoch when this segment was created - * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) - * @param createdAtMs wall-clock millis at creation time - * @param sealedAtMs wall-clock millis at seal time (-1 if still active) + *
<p>
A segment may be a legacy segment — one that is not managed by the + * scalable-topic controller and has no {@code segment://...} URI of its own; instead it + * wraps an existing, externally managed {@code persistent://...} topic. Legacy segments + * appear in the synthetic-layout response returned for a regular (partitioned or + * non-partitioned) topic that has not yet been migrated to a scalable topic. + * {@code legacyTopicName} is non-null exactly for legacy segments. + * + * @param segmentId monotonically increasing, unique within the topic + * @param hashRange inclusive hash range [start, end] + * @param state ACTIVE or SEALED + * @param parentIds parent segment IDs in the DAG (empty for initial/root segments) + * @param childIds child segment IDs in the DAG (empty for active leaf segments) + * @param createdAtEpoch DAG epoch when this segment was created + * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) + * @param createdAtMs wall-clock millis at creation time + * @param sealedAtMs wall-clock millis at seal time (-1 if still active) + * @param legacyTopicName for legacy segments: the externally managed + * {@code persistent://...} topic this segment wraps. + * {@code null} for regular controller-managed segments. */ public record SegmentInfo( long segmentId, @@ -54,7 +64,8 @@ public record SegmentInfo( long createdAtEpoch, long sealedAtEpoch, long createdAtMs, - long sealedAtMs + long sealedAtMs, + String legacyTopicName ) { public SegmentInfo { parentIds = parentIds != null ? List.copyOf(parentIds) : List.of(); @@ -65,32 +76,56 @@ public record SegmentInfo( public static SegmentInfo active(long segmentId, HashRange hashRange, long createdAtEpoch, long createdAtMs) { return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, - List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1); + List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, null); } /** Create a new active segment with the given parent IDs. 
*/ public static SegmentInfo active(long segmentId, HashRange hashRange, List parentIds, long createdAtEpoch, long createdAtMs) { return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, - parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1); + parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1, null); + } + + /** + * Create a new active legacy segment that wraps the given externally managed + * {@code persistent://...} topic instead of having its own {@code segment://...} URI. + * Used by the synthetic-layout response for not-yet-migrated regular topics. + */ + public static SegmentInfo activeLegacy(long segmentId, HashRange hashRange, + String legacyTopicName, + long createdAtEpoch, long createdAtMs) { + return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE, + List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1, legacyTopicName); } /** Return a sealed copy of this segment with the given child IDs. */ public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List childIds) { return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + legacyTopicName); } /** Return a copy with different parent IDs. */ public SegmentInfo withParentIds(List parentIds) { return new SegmentInfo(segmentId, hashRange, state, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + legacyTopicName); } /** Return a copy with different child IDs. 
*/ public SegmentInfo withChildIds(List childIds) { return new SegmentInfo(segmentId, hashRange, state, - parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs); + parentIds, childIds, createdAtEpoch, sealedAtEpoch, createdAtMs, sealedAtMs, + legacyTopicName); + } + + /** + * True if this is a legacy segment — one that wraps an existing, externally managed + * {@code persistent://...} topic rather than owning a controller-managed + * {@code segment://...} URI. An empty {@code legacyTopicName} does not count as legacy. + */ + public boolean isLegacy() { + return legacyTopicName != null && !legacyTopicName.isEmpty(); } public boolean isActive() { diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b2a698d33da0f..28501c8bc6637 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -855,6 +855,14 @@ message SegmentInfoProto { // and timestamp-based seek; epoch above is a DAG generation number, not a clock. required uint64 created_at_ms = 9; optional uint64 sealed_at_ms = 10; + + // Legacy-segment marker. When set, this segment is not managed by the scalable-topic + // controller and has no segment://... topic of its own — it wraps the named, externally + // managed persistent://... topic instead. Used by the synthetic-layout response the + // broker returns for a regular (partitioned or non-partitioned) topic that has not yet + // been migrated to a scalable topic. When absent, the segment URI is computed normally + // from the scalable topic name and segment_id (segment:///). + optional string legacy_topic_name = 11; } message SegmentBrokerAddress { @@ -874,7 +882,11 @@ message ScalableTopicDAG { // Client -> Broker: Request scalable topic metadata and initiate watch session message CommandScalableTopicLookup { required uint64 session_id = 1; // Client-assigned session ID - required string topic = 2; // e.g. 
"topic://tenant/ns/my-topic" + // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like + // "my-topic" (normalized by the broker to persistent://public/default/my-topic). + // The broker resolves the input to the canonical topic://... identity and + // returns it in CommandScalableTopicUpdate.resolved_topic_name. + required string topic = 2; } // Broker -> Client: Used for BOTH initial response and subsequent pushed updates @@ -884,6 +896,12 @@ message CommandScalableTopicUpdate { optional ServerError error = 3; optional string message = 4; + + // Canonical scalable-topic identity (always "topic://t/n/x") that the client + // should use for downstream operations. Set on every success response, + // including for inputs that were given as persistent://... or short-form. + // Absent on error responses. + optional string resolved_topic_name = 5; } // Client -> Broker: Close the DAG watch session diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 412d3696b1eed..eb9254ecf4a14 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -517,4 +517,26 @@ public void testToFullTopicName() { assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("non-persistent://tenant/cluster/ns/topic")); } + + @Test + public void testToScalableTopic() { + // topic://... is returned as-is. + TopicName already = TopicName.get("topic://tenant/ns/x"); + assertEquals(already.toScalableTopic(), already); + + // persistent://... is re-expressed in the topic:// domain. + assertEquals(TopicName.get("persistent://tenant/ns/x").toScalableTopic().toString(), + "topic://tenant/ns/x"); + + // Short forms normalise to persistent://public/default/... first, then to topic://. 
+ assertEquals(TopicName.get("my-topic").toScalableTopic().toString(), + "topic://public/default/my-topic"); + assertEquals(TopicName.get("tenant/ns/my-topic").toScalableTopic().toString(), + "topic://tenant/ns/my-topic"); + + // A -partition-K suffix is stripped: the partition resolves to the base topic's + // scalable identity, not topic://.../x-partition-K. + assertEquals(TopicName.get("persistent://tenant/ns/x-partition-3").toScalableTopic().toString(), + "topic://tenant/ns/x"); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java index c2c9267636284..a5d028889eea4 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java @@ -101,12 +101,13 @@ public void testNewScalableTopicUpdate() { .addParentId(0L); dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650"); - ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag); + ByteBuf frame = Commands.newScalableTopicUpdate(77L, "topic://t/n/x", dag); BaseCommand cmd = parseFrame(frame); assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE); assertTrue(cmd.hasScalableTopicUpdate()); assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L); + assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(), "topic://t/n/x"); assertFalse(cmd.getScalableTopicUpdate().hasError(), "successful update must not carry an error field"); @@ -132,6 +133,27 @@ public void testNewScalableTopicUpdate() { assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(), "pulsar://broker-a:6650"); } + @Test + public void testNewScalableTopicUpdateWithNullResolvedTopicNameLeavesFieldUnset() { + // resolved_topic_name is optional on the wire; a null must serialise cleanly with + // the field unset rather than 
NPE in the lightproto setter. + ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(1L); + dag.addSegment() + .setSegmentId(0L) + .setHashStart(0x0000) + .setHashEnd(0xFFFF) + .setState(SegmentState.ACTIVE) + .setCreatedAtEpoch(0L) + .setCreatedAtMs(System.currentTimeMillis()); + + ByteBuf frame = Commands.newScalableTopicUpdate(5L, null, dag); + BaseCommand cmd = parseFrame(frame); + + assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 5L); + assertFalse(cmd.getScalableTopicUpdate().hasResolvedTopicName(), + "null resolvedTopicName must leave the optional field unset"); + } + @Test public void testNewScalableTopicError() { ByteBuf frame = Commands.newScalableTopicError(15L, ServerError.TopicNotFound,