Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -787,6 +788,14 @@ protected void handleCommandScalableTopicLookup(
return;
}

// Scalable topics do not support non-persistent storage. Reject early with a
// clear error rather than failing later in segment infrastructure.
if (topicName.getDomain() == TopicDomain.non_persistent) {
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, ServerError.NotAllowedError,
"Scalable topics do not support non-persistent:// topics"));
return;
}

if (!this.service.getPulsar().isRunning()) {
log.warn("ScalableTopicLookup rejected: broker not ready");
ctx.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.pulsar.common.api.proto.SegmentBrokerAddress;
import org.apache.pulsar.common.api.proto.SegmentInfoProto;
import org.apache.pulsar.common.api.proto.SegmentState;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
Expand Down Expand Up @@ -61,6 +63,9 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe
private final BrokerService brokerService;

private final String metadataPath;
/** Canonical {@code topic://...} identity returned to the client regardless of the
* input form ({@code topic://}, {@code persistent://}, or short-form). */
private final String resolvedTopicName;
private volatile boolean closed = false;

public DagWatchSession(long sessionId,
Expand All @@ -74,6 +79,7 @@ public DagWatchSession(long sessionId,
this.resources = resources;
this.brokerService = brokerService;
this.metadataPath = resources.topicPath(topicName);
this.resolvedTopicName = topicName.toScalableTopic().toString();
this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build();
}

Expand All @@ -85,6 +91,19 @@ public String getMetadataPath() {
/**
* Start the session: load current metadata, set up watch, and return
* the initial layout response.
*
* <p>If no scalable metadata exists at the canonical path:
* <ul>
* <li>{@code topic://...} input → fail with {@code TopicNotFound} (the scalable
* topic doesn't exist).</li>
* <li>{@code persistent://...} input → build a synthetic layout that wraps the
* existing regular (partitioned or non-partitioned) topic as one or more
* special segments, so V5 clients can operate against the regular topic
* through the scalable surface until the operator migrates it.</li>
* </ul>
* The metadata-store watch is registered regardless, so a subsequent migration
* that writes scalable metadata to the same path will be observed and the
* synthetic layout will be transparently replaced with the real DAG.
*/
public CompletableFuture<ScalableTopicLayoutResponse> start() {
// Register through the resources-level fan-out so close() can deregister us
Expand All @@ -93,12 +112,64 @@ public CompletableFuture<ScalableTopicLayoutResponse> start() {

return resources.getScalableTopicMetadataAsync(topicName, true)
.thenCompose(optMd -> {
if (optMd.isEmpty()) {
return CompletableFuture.failedFuture(
new IllegalStateException("Scalable topic not found: " + topicName));
if (optMd.isPresent()) {
return buildResponse(optMd.get());
}
ScalableTopicMetadata metadata = optMd.get();
return buildResponse(metadata);
if (topicName.getDomain() == TopicDomain.persistent) {
return buildSyntheticResponse();
}
return CompletableFuture.failedFuture(
new IllegalStateException("Scalable topic not found: " + topicName));
});
}

/**
* Build a synthetic layout for a not-yet-migrated regular topic. Each partition
* (or the whole topic, for non-partitioned) becomes an active special segment
* that wraps the existing {@code persistent://...} topic. The synthetic layout
* uses mod-N routing (signalled by all active leaves being special segments)
* so V5 producers route the same way v4 producers do.
*/
private CompletableFuture<ScalableTopicLayoutResponse> buildSyntheticResponse() {
return brokerService.fetchPartitionedTopicMetadataAsync(topicName)
.thenApply(partitionedMd -> {
int partitions = partitionedMd.partitions;
long createdAtMs = System.currentTimeMillis();
Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
if (partitions <= 0) {
// Non-partitioned: one special segment covering the full hash range.
// We're only called when topicName.getDomain() == persistent, so
// toString() is the canonical persistent://t/n/x form.
segments.put(0L, SegmentInfo.activeSpecial(
0L,
HashRange.of(0x0000, 0xFFFF),
topicName.toString(),
/*createdAtEpoch*/ 0L,
createdAtMs));
} else {
// Partitioned: N special segments with equal-width contiguous ranges.
// The ranges are cosmetic — routing for synthetic layouts is mod-N over
// segment_id, derived implicitly by the SDK from "all active leaves are
// special segments".
int width = 0x10000 / partitions;
for (int k = 0; k < partitions; k++) {
int start = k * width;
int end = (k == partitions - 1) ? 0xFFFF : (start + width - 1);
segments.put((long) k, SegmentInfo.activeSpecial(
k,
HashRange.of(start, end),
topicName.getPartition(k).toString(),
/*createdAtEpoch*/ 0L,
createdAtMs));
}
}
return new ScalableTopicLayoutResponse(
/*epoch*/ 0L,
segments,
/*segmentBrokerAddresses*/ null,
/*segmentBrokerAddressesTls*/ null,
/*controllerBrokerUrl*/ null,
/*controllerBrokerUrlTls*/ null);
});
}

Expand Down Expand Up @@ -142,7 +213,10 @@ public void pushUpdate(ScalableTopicLayoutResponse response) {
}
ScalableTopicDAG dag = buildDagProto(response);
log.info().attr("epoch", response.epoch()).log("Pushing DAG update");
cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, dag));
// Always report the canonical topic://... identity so clients that looked up
// via persistent://... or short-form know the resolved name.
cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(
sessionId, resolvedTopicName, dag));
}

private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
Expand Down Expand Up @@ -170,6 +244,10 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
if (seg.sealedAtMs() >= 0) {
segProto.setSealedAtMs(seg.sealedAtMs());
}
// Special segments wrap an existing persistent://... topic.
if (seg.underlyingTopicName() != null) {
segProto.setUnderlyingTopicName(seg.underlyingTopicName());
}
}

// Add broker addresses for active segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentState;
Expand Down Expand Up @@ -104,6 +105,8 @@ public void testCloseIsIdempotent() {

@Test
public void testStartFailsWhenTopicMetadataMissing() {
// topic://... input + no scalable metadata = TopicNotFound. Synthetic layouts
// are only produced for persistent://... input (regular topics).
when(resources.getScalableTopicMetadataAsync(TOPIC, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));

Expand All @@ -124,6 +127,100 @@ public void testStartFailsWhenTopicMetadataMissing() {
}
}

// --- synthetic layout for not-yet-migrated regular topics ---

@Test
public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() throws Exception {
// persistent:// input + no scalable metadata + non-partitioned regular topic
// (partitions=0) → synthetic layout with a single active special segment
// covering [0x0000, 0xFFFF] that wraps the existing persistent:// topic.
TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
String regularPath = "/admin/scalable-topics/tenant/ns/my-regular";
when(resources.topicPath(regular)).thenReturn(regularPath);
when(resources.getScalableTopicMetadataAsync(regular, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
ScalableTopicLayoutResponse response = s.start().get();

assertEquals(response.epoch(), 0L);
assertEquals(response.segments().size(), 1);
SegmentInfo seg = response.segments().get(0L);
assertNotNull(seg);
assertEquals(seg.segmentId(), 0L);
assertEquals(seg.hashRange().start(), 0x0000);
assertEquals(seg.hashRange().end(), 0xFFFF);
assertTrue(seg.isActive());
assertTrue(seg.isSpecial(), "non-partitioned regular topic must wrap as a special segment");
assertEquals(seg.underlyingTopicName(), "persistent://tenant/ns/my-regular");
}

@Test
public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws Exception {
// persistent:// input + no scalable metadata + 4-partition topic →
// synthetic layout with 4 active special segments wrapping each
// persistent://...-partition-K and equal-width contiguous hash ranges.
TopicName regular = TopicName.get("persistent://tenant/ns/my-partitioned");
String regularPath = "/admin/scalable-topics/tenant/ns/my-partitioned";
when(resources.topicPath(regular)).thenReturn(regularPath);
when(resources.getScalableTopicMetadataAsync(regular, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4)));

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
ScalableTopicLayoutResponse response = s.start().get();

assertEquals(response.epoch(), 0L);
assertEquals(response.segments().size(), 4);
for (int k = 0; k < 4; k++) {
SegmentInfo seg = response.segments().get((long) k);
assertNotNull(seg, "missing segment for partition " + k);
assertEquals(seg.segmentId(), k);
assertTrue(seg.isActive());
assertTrue(seg.isSpecial(), "partition " + k + " must wrap as a special segment");
assertEquals(seg.underlyingTopicName(),
"persistent://tenant/ns/my-partitioned-partition-" + k);
}
// Hash ranges cover [0x0000, 0xFFFF] contiguously and end at 0xFFFF inclusive
// on the last segment.
assertEquals(response.segments().get(0L).hashRange().start(), 0x0000);
assertEquals(response.segments().get(3L).hashRange().end(), 0xFFFF);
// No gaps between consecutive segments.
for (int k = 0; k < 3; k++) {
int endK = response.segments().get((long) k).hashRange().end();
int startK1 = response.segments().get((long) (k + 1)).hashRange().start();
assertEquals(startK1, endK + 1, "gap between partition " + k + " and " + (k + 1));
}
}

@Test
public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() {
// The synthetic-layout response goes through pushUpdate, which always emits the
// canonical topic://... identity in resolved_topic_name regardless of input form.
TopicName regular = TopicName.get("persistent://tenant/ns/my-regular");
ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse(
0L,
Map.of(0L, SegmentInfo.activeSpecial(0L, HashRange.of(0x0000, 0xFFFF),
"persistent://tenant/ns/my-regular", 0L, 12345L)),
null, null, null, null);

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
s.pushUpdate(response);

ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verify(ctx).writeAndFlush(captor.capture());
BaseCommand cmd = parseFrame(captor.getValue());
assertEquals(cmd.getScalableTopicUpdate().getResolvedTopicName(),
"topic://tenant/ns/my-regular");
// The special-segment marker round-trips through the wire format.
var seg = cmd.getScalableTopicUpdate().getDag().getSegmentAt(0);
assertTrue(seg.hasUnderlyingTopicName());
assertEquals(seg.getUnderlyingTopicName(), "persistent://tenant/ns/my-regular");
}

@Test
public void testStartRegistersWithResources() {
// start() routes through the resources-level fan-out instead of registering
Expand Down Expand Up @@ -314,7 +411,8 @@ private static SegmentInfo seg(long id, int start, int end, SegmentState state,
createdAt,
sealedAt,
createdAtMs,
sealedAtMs);
sealedAtMs,
null);
}

private static java.util.List<Long> toList(long[] arr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,19 +539,19 @@ public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws Excepti
org.apache.pulsar.common.scalable.SegmentState.SEALED,
java.util.List.of(), java.util.List.of(3L),
/*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1,
/*createdAtMs*/ t0, /*sealedAtMs*/ t1);
/*createdAtMs*/ t0, /*sealedAtMs*/ t1, null);
org.apache.pulsar.common.scalable.SegmentInfo seg1 = new org.apache.pulsar.common.scalable.SegmentInfo(
1L,
org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF),
org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
java.util.List.of(), java.util.List.of(),
0, -1, t1, -1);
0, -1, t1, -1, null);
org.apache.pulsar.common.scalable.SegmentInfo seg2 = new org.apache.pulsar.common.scalable.SegmentInfo(
2L,
org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF),
org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
java.util.List.of(), java.util.List.of(),
0, -1, t3, -1);
0, -1, t3, -1, null);

TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic");
ScalableTopicMetadata md = ScalableTopicMetadata.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,19 @@ public TopicName getPartition(int index) {
return get(partitionName);
}

/**
* Returns this topic re-expressed in the scalable {@code topic://...} domain,
* regardless of the original input domain. Used to derive the canonical
* scalable-topic identity for a name the user may have spelled as
* {@code persistent://...} or short-form.
*/
public TopicName toScalableTopic() {
if (domain == TopicDomain.topic) {
return this;
}
return get(TopicDomain.topic.value() + "://" + getNamespace() + "/" + getEncodedLocalName());
}

/**
* @return partition index of the completeTopicName.
* It returns -1 if the completeTopicName (topic) is not partitioned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,10 +1700,12 @@ public static ByteBuf newScalableTopicClose(long sessionId) {
return serializeWithSize(cmd);
}

public static ByteBuf newScalableTopicUpdate(long sessionId, ScalableTopicDAG dag) {
public static ByteBuf newScalableTopicUpdate(long sessionId, String resolvedTopicName,
ScalableTopicDAG dag) {
BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
.setSessionId(sessionId);
.setSessionId(sessionId)
.setResolvedTopicName(resolvedTopicName);
update.setDag().copyFrom(dag);
return serializeWithSize(cmd);
}
Expand Down
Loading
Loading