fix: add transaction id to message metadata#1499
Open
jiangmocc wants to merge 1 commit into
Open
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR adds transaction ID (TxnID) propagation into Pulsar message metadata for both non-batched and batched producer sends, and introduces tests to validate the behavior.
Changes:
- Set TxnID fields on producer message metadata when a send request is associated with a transaction.
- Ensure batched message metadata also records TxnID and that TxnID fields are cleared after flush/reset.
- Add unit tests covering TxnID metadata population and reset behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pulsar/producer_partition.go | Populates TxnID fields on generated message metadata when sr.transaction is present. |
| pulsar/producer_test.go | Adds a unit test to validate updateMetaData sets TxnID fields. |
| pulsar/internal/batch_builder.go | Copies TxnID into batch message metadata and clears TxnID fields on reset. |
| pulsar/internal/batch_builder_test.go | Adds tests verifying TxnID is set in batch metadata and cleared after flush. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pb.CompressionType_NONE, | ||
| compression.Level(0), | ||
| &bufferPoolImpl{}, | ||
| NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer), |
| assert.ErrorContains(t, err, "connection error") | ||
| } | ||
|
|
||
| func TestUpdateMetaDataAddsTxnID(t *testing.T) { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Fixes #1498.
Transactional sends already set the transaction id on
CommandSend, but the serializedMessageMetadatadid not carrytxnid_most_bitsandtxnid_least_bits. Brokers rely on the message metadata transaction id when handling transaction buffer entries, so aborted transactional publishes can leak as visible messages.Modifications
MessageMetadatafor single-message transactional sends.MessageMetadatafor transactional batches.Verifying this change
GOWORK=off go test -v ./pulsar/internal -run "TestBatchBuilder(AddsTxnIDToMessageMetadata|ClearsTxnIDAfterFlush)" -count=1GOWORK=off go test -v ./pulsar -run TestUpdateMetaDataAddsTxnID -count=1Does this pull request potentially affect one of the following parts:
Documentation