Skip to content

fix: add transaction id to message metadata#1499

Open
jiangmocc wants to merge 1 commit into
apache:masterfrom
jiangmocc:codex/fix-transaction-metadata
Open

fix: add transaction id to message metadata#1499
jiangmocc wants to merge 1 commit into
apache:masterfrom
jiangmocc:codex/fix-transaction-metadata

Conversation

@jiangmocc
Copy link
Copy Markdown

Motivation

Fixes #1498.

Transactional sends already set the transaction id on CommandSend, but the serialized MessageMetadata did not carry txnid_most_bits and txnid_least_bits. Brokers rely on the message metadata transaction id when handling transaction buffer entries, so aborted transactional publishes can leak as visible messages.

Modifications

  • Set transaction id fields on MessageMetadata for single-message transactional sends.
  • Set transaction id fields on batch-level MessageMetadata for transactional batches.
  • Clear transaction id fields from reusable batch metadata and send command state after flushing.
  • Add focused unit tests for single-message metadata, batch metadata, and batch reset behavior.

Verifying this change

  • GOWORK=off go test -v ./pulsar/internal -run "TestBatchBuilder(AddsTxnIDToMessageMetadata|ClearsTxnIDAfterFlush)" -count=1
  • GOWORK=off go test -v ./pulsar -run TestUpdateMetaDataAddsTxnID -count=1

Does this pull request potentially affect one of the following parts:

  • Dependencies: no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no

Documentation

  • Does this pull request introduce a new feature? no

Copilot AI review requested due to automatic review settings May 19, 2026 10:05
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

This PR adds transaction ID (TxnID) propagation into Pulsar message metadata for both non-batched and batched producer sends, and introduces tests to validate the behavior.

Changes:

  • Set TxnID fields on producer message metadata when a send request is associated with a transaction.
  • Ensure batched message metadata also records TxnID and that TxnID fields are cleared after flush/reset.
  • Add unit tests covering TxnID metadata population and reset behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
pulsar/producer_partition.go Populates TxnID fields on generated message metadata when sr.transaction is present.
pulsar/producer_test.go Adds a unit test to validate updateMetaData sets TxnID fields.
pulsar/internal/batch_builder.go Copies TxnID into batch message metadata and clears TxnID fields on reset.
pulsar/internal/batch_builder_test.go Adds tests verifying TxnID is set in batch metadata and cleared after flush.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

pb.CompressionType_NONE,
compression.Level(0),
&bufferPoolImpl{},
NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer),
Comment thread pulsar/producer_test.go
assert.ErrorContains(t, err, "connection error")
}

func TestUpdateMetaDataAddsTxnID(t *testing.T) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Transactional publishes do not set txn id in MessageMetadata

2 participants