feat(spanner): add multiplexed session support #437
Open
JulienEllie wants to merge 1 commit into yoshidan:main from
Adds support for multiplexed Spanner sessions, required by Spanner Omni
deployments and supported by Cloud Spanner for concurrent workloads. A
single multiplexed session is created via CreateSession (with
Session.multiplexed = true) and shared across callers; no per-session
pooling or expiry is maintained. Auto-enabled when SPANNER_OMNI_ENDPOINT
is set.
Session management
- SessionManager holds a long-lived multiplexed handle behind an RwLock
so concurrent get() callers can clone in parallel; only recreation
takes the write lock.
- NotFound on any clone flips a shared invalid flag; the next get()
recreates the master under an async lock that double-checks the flag.
- get() and recreate_multiplexed_session() honour the cancellation
token, returning FailedToCreateSession after close() so a closed
manager cannot be silently resurrected.
- close() drops the multiplexed master without issuing DeleteSession;
per the v1 proto, multiplexed sessions cannot be deleted via the API
and are reclaimed by server-side TTL.
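
The recreate-on-invalid flow above can be sketched roughly as follows. This is a minimal, synchronous illustration using only the standard library, not the crate's implementation: the field names, the create_session helper, and the session naming are hypothetical, std::sync::Mutex stands in for the async lock, and an AtomicBool stands in for the cancellation token.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical clone handed to callers; shares the invalid flag with the manager.
#[derive(Clone, Debug)]
struct SessionHandle {
    name: String,
    invalid: Arc<AtomicBool>,
}

impl SessionHandle {
    /// Called when an RPC on this clone returns NotFound.
    fn on_not_found(&self) {
        self.invalid.store(true, Ordering::SeqCst);
    }
}

#[derive(Debug, PartialEq)]
enum SessionError {
    FailedToCreateSession,
}

struct SessionManager {
    master: RwLock<Option<SessionHandle>>,
    invalid: Arc<AtomicBool>,
    recreate_lock: Mutex<()>, // stand-in for the async lock
    closed: AtomicBool,       // stand-in for the cancellation token
    created: AtomicU64,       // fakes server-assigned session names
}

impl SessionManager {
    fn new() -> Self {
        let manager = SessionManager {
            master: RwLock::new(None),
            invalid: Arc::new(AtomicBool::new(false)),
            recreate_lock: Mutex::new(()),
            closed: AtomicBool::new(false),
            created: AtomicU64::new(0),
        };
        let first = manager.create_session();
        *manager.master.write().unwrap() = Some(first);
        manager
    }

    /// Placeholder for the CreateSession RPC with multiplexed = true.
    fn create_session(&self) -> SessionHandle {
        let n = self.created.fetch_add(1, Ordering::SeqCst) + 1;
        SessionHandle {
            name: format!("sessions/multiplexed-{}", n),
            invalid: Arc::clone(&self.invalid),
        }
    }

    fn get(&self) -> Result<SessionHandle, SessionError> {
        if self.closed.load(Ordering::SeqCst) {
            return Err(SessionError::FailedToCreateSession);
        }
        if self.invalid.load(Ordering::SeqCst) {
            self.recreate()?;
        }
        // Fast path: concurrent callers only take the read lock to clone.
        let guard = self.master.read().unwrap();
        (*guard).clone().ok_or(SessionError::FailedToCreateSession)
    }

    fn recreate(&self) -> Result<(), SessionError> {
        let _serialize = self.recreate_lock.lock().unwrap();
        if self.closed.load(Ordering::SeqCst) {
            return Err(SessionError::FailedToCreateSession);
        }
        // Double-check: another caller may already have recreated the master.
        if !self.invalid.load(Ordering::SeqCst) {
            return Ok(());
        }
        let fresh = self.create_session();
        *self.master.write().unwrap() = Some(fresh);
        self.invalid.store(false, Ordering::SeqCst);
        Ok(())
    }

    /// Drops the master without any DeleteSession call.
    fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
        *self.master.write().unwrap() = None;
    }
}

fn main() {
    let manager = SessionManager::new();
    let a = manager.get().unwrap();
    a.on_not_found(); // simulate NotFound on a clone
    let b = manager.get().unwrap();
    assert_ne!(a.name, b.name); // master was recreated
    manager.close();
    assert_eq!(manager.get().unwrap_err(), SessionError::FailedToCreateSession);
}
```

In this sketch the closed check is repeated inside recreate() so that a caller racing with close() cannot install a new master after the manager has been shut down.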
Read-write transactions (inline begin)
- Multiplexed RW transactions use inline begin: the transaction is
started implicitly on the first query/DML, with the server-assigned
tx_id captured from response metadata into a slot shared with the
parent transaction. PartitionedDml is excluded and continues to use
explicit BeginTransaction.
- Streaming retries upgrade the cached request's selector to Id(tx_id)
once captured, so a resumed stream cannot accidentally start a second
server-side transaction.
- RowIterator only pre-fetches the first frame on the very first op
(when the inline-begin slot is still unpopulated); subsequent ops on
the same transaction behave like non-multiplexed iterators.
- Mutations-only commits go through an explicit BeginTransaction with
mutation_key set, capturing both tx_id and the response precommit
token; the lifetime transaction_tag is used for the begin RPC.
- commit() returns a synthetic empty CommitResponse if the transaction
reaches commit with no operations and no buffered mutations, mirroring
rollback()'s no-op guard.
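
To illustrate the streaming-retry selector upgrade described above, here is a minimal sketch under stated assumptions. Selector and CachedRequest are simplified, hypothetical stand-ins for the v1 TransactionSelector and the cached read/query request, and the free function only mirrors the intent of Reader::update_transaction_selector; the real trait method's signature is not shown in this PR description.

```rust
/// Hypothetical, simplified stand-in for the v1 TransactionSelector.
#[derive(Clone, Debug, PartialEq)]
enum Selector {
    /// Inline begin: the server starts the transaction on this request.
    Begin,
    /// Reuse an already-started transaction.
    Id(Vec<u8>),
}

/// Hypothetical cached streaming request that is re-sent on retry.
#[derive(Clone, Debug, PartialEq)]
struct CachedRequest {
    selector: Selector,
    resume_token: Vec<u8>,
}

/// Once a tx_id has been captured, rewrite Begin to Id(tx_id) so a resumed
/// stream cannot start a second server-side transaction. The resume_token is
/// left untouched, and a request that already carries an Id is not changed.
fn update_transaction_selector(req: &mut CachedRequest, captured_tx_id: Option<&[u8]>) {
    if matches!(req.selector, Selector::Begin) {
        if let Some(id) = captured_tx_id {
            req.selector = Selector::Id(id.to_vec());
        }
    }
}

fn main() {
    let mut req = CachedRequest {
        selector: Selector::Begin,
        resume_token: vec![0xAB],
    };

    // Before any response arrives there is no tx_id, so the retry still begins inline.
    update_transaction_selector(&mut req, None);
    assert_eq!(req.selector, Selector::Begin);

    // After the first response the captured id replaces Begin.
    let tx_id: &[u8] = b"tx-1";
    update_transaction_selector(&mut req, Some(tx_id));
    assert_eq!(req.selector, Selector::Id(b"tx-1".to_vec()));
    assert_eq!(req.resume_token, vec![0xAB]);
}
```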
Precommit token handling
- Multiplexed-session precommit tokens are kept by highest seq_num per
proto contract (MultiplexedSessionPrecommitToken.seq_num); a shared
helper enforces this at every capture site (DML, batch DML, streaming
reads, mutations-only BeginTransaction).
- The captured token is forwarded on CommitRequest.precommit_token.
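
A minimal sketch of the highest-seq_num rule, assuming a Mutex-guarded slot: PrecommitToken is a hypothetical stand-in for MultiplexedSessionPrecommitToken, and the helper's signature is illustrative rather than the one in this PR. Keeping the existing token on an equal seq_num is also an assumption of this sketch.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for MultiplexedSessionPrecommitToken.
#[derive(Clone, Debug, PartialEq)]
struct PrecommitToken {
    precommit_token: Vec<u8>,
    seq_num: i32,
}

/// Stores `candidate` only if the slot is empty or the candidate's seq_num is
/// strictly greater than the stored one. Returns true when the slot changed.
fn update_precommit_token(slot: &Mutex<Option<PrecommitToken>>, candidate: PrecommitToken) -> bool {
    let mut guard = slot.lock().unwrap();
    let replace = match guard.as_ref() {
        None => true,
        Some(current) => candidate.seq_num > current.seq_num,
    };
    if replace {
        *guard = Some(candidate);
    }
    replace
}

/// Small constructor used only by this example.
fn token(bytes: &[u8], seq_num: i32) -> PrecommitToken {
    PrecommitToken { precommit_token: bytes.to_vec(), seq_num }
}

fn main() {
    let slot = Mutex::new(None);
    assert!(update_precommit_token(&slot, token(b"a", 1)));  // empty slot
    assert!(update_precommit_token(&slot, token(b"b", 3)));  // strictly greater
    assert!(!update_precommit_token(&slot, token(b"c", 2))); // lower: ignored
    assert!(!update_precommit_token(&slot, token(b"d", 3))); // equal: ignored
    assert_eq!(*slot.lock().unwrap(), Some(token(b"b", 3)));
}
```

Routing every capture site through one function like this means responses that arrive out of order cannot overwrite a newer token with an older one.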
Apply path
- Client::apply_at_least_once routes through the regular RW transaction
path on multiplexed sessions, since SingleUseTransaction commits with
mutations are not supported on multiplexed sessions (no place to
attach mutation_key / precommit_token).
Tests
- Unit tests:
* update_precommit_token: empty slot, strictly-greater seq_num,
lower seq_num, equal seq_num.
* Reader::update_transaction_selector for both StatementReader and
TableReader: Begin(...) replaced with Id(tx_id), resume_token
untouched.
* SessionManager::get after close returns FailedToCreateSession.
* Multiplexed master is recreated when the invalid flag is flipped
(server-assigned session name changes after invalidation).
- Integration tests against the Cloud Spanner emulator (recent images
participate in the multiplexed-session protocol: they accept
multiplexed:true sessions, return precommitToken on
BeginTransaction with mutation_key, and reject buggy request shapes
with NOT_FOUND/UNIMPLEMENTED). Coverage:
* inline begin: query then commit captures tx_id.
* mutations-only commit drives begin_for_mutations_only.
* empty transaction commits cleanly without shipping
TransactionId(empty) (which the server rejects).
* multi-op transaction reuses the captured tx_id across ops.
* apply_at_least_once on a multiplexed session (server rejects the
SingleUseTransaction-with-mutations shape used by the non-
multiplexed path).
* partitioned_update on a multiplexed session (server rejects the
inline-begin PartitionedDml shape).
* DML followed by buffered mutations in the same RW transaction.
- Not exercised end-to-end (would need either a gRPC mock or an
emulator that strictly enforces these): streaming-retry selector
upgrade, strict precommit_token / seq_num enforcement, and the
performance contract of the RwLock-protected multiplexed clone.
Problem
The Spanner v1 protocol has a multiplexed session shape (Session.multiplexed = true): one long-lived session shared by many concurrent ops, with no per-session pooling and no expiry. Spanner Omni deployments only accept multiplexed sessions for read-write transactions; vanilla Cloud Spanner accepts them for read workloads. The Rust client previously had no support for this, so any user pointed at Omni couldn't use the crate at all.
This PR adds end-to-end support, with the multiplexed code path auto-enabled when SPANNER_OMNI_ENDPOINT is set in the environment.
Session management
- SessionManager holds the long-lived multiplexed handle in a parking_lot::RwLock so concurrent get() callers can clone it in parallel; only recreation needs the write lock.
- NotFound on any clone flips a shared invalid flag; the next get() recreates the master under an async lock that double-checks the flag. A monotonic multiplexed_generation counter, snapshotted onto each clone, prevents stale clones (whose master has already been rotated) from re-flagging a freshly-recreated master and triggering redundant CreateSession RPCs.
- get() and recreate_multiplexed_session() honour the manager's CancellationToken, returning FailedToCreateSession after close() so a closed manager can't be silently resurrected.
- close() drops the multiplexed master without DeleteSession; per the v1 proto, multiplexed sessions can't be deleted via the API and are reclaimed by server-side TTL.
Read-write transactions (inline begin)
- Multiplexed RW transactions use inline begin: the transaction is started implicitly on the first query/DML, with the server-assigned tx_id captured from response metadata into a slot shared with the parent transaction. PartitionedDml is excluded: it continues to use explicit BeginTransaction, since inline begin with PartitionedDml is rejected by the server.
- Streaming retries upgrade the cached request's selector to Id(tx_id) once captured (new Reader::update_transaction_selector trait method, with a no-op default body so external impls don't break; SemVer-compatible). Without this, a resumed stream with Begin(...) + resume_token could start a second server-side transaction the client never tracks.
- RowIterator only pre-fetches the first frame on the very first op of a transaction (when the inline-begin slot is still unpopulated); subsequent ops on the same transaction behave like non-multiplexed iterators.
- Mutations-only commits go through an explicit BeginTransaction with mutation_key set to a randomly-selected buffered mutation (per proto guidance for partition-lookup load distribution), capturing both tx_id and the response precommit_token. The lifetime transaction_tag is used for the begin RPC.
- commit() returns a synthetic empty CommitResponse if the transaction reaches commit with no operations and no buffered mutations. This mirrors rollback()'s no-op guard, so an empty multiplexed tx doesn't ship TransactionId("") to the server (which is rejected).
Precommit token handling
- Multiplexed-session precommit tokens are kept by highest seq_num per proto contract, not last-write-wins. A shared helper enforces this at every capture site (DML, batch DML, streaming reads, mutations-only BeginTransaction).
- The captured token is forwarded on CommitRequest.precommit_token.
Apply path
- Client::apply_at_least_once routes through the regular RW transaction path on multiplexed sessions, since SingleUseTransaction commits with mutations are not supported on multiplexed sessions (no place to attach mutation_key / precommit_token; server returns UNIMPLEMENTED).
Tests
- Unit tests:
* update_precommit_token: empty slot, strictly-greater seq_num, lower seq_num, equal seq_num.
* Reader::update_transaction_selector for both StatementReader and TableReader: Begin(...) replaced with Id(tx_id), resume_token untouched.
* A Reader impl that omits update_transaction_selector still compiles (default body present).
* SessionManager::get after close returns FailedToCreateSession.
- Integration tests against the Cloud Spanner emulator (recent images participate in the multiplexed protocol: they accept multiplexed:true, return precommitToken on BeginTransaction with mutation_key, and reject buggy request shapes with NOT_FOUND/UNIMPLEMENTED):
* inline begin: query then commit captures tx_id.
* mutations-only commit drives begin_for_mutations_only.
* empty transaction commits cleanly without shipping TransactionId("").
* multi-op transaction reuses the captured tx_id across ops.
* apply_at_least_once on a multiplexed session (server rejects the SingleUseTransaction-with-mutations shape used by the non-multiplexed path).
* partitioned_update on a multiplexed session (server rejects the inline-begin PartitionedDml shape).
- Not exercised end-to-end (would need either a gRPC mock harness or a stricter Spanner endpoint): streaming-retry selector upgrade in production, strict precommit_token / seq_num enforcement (the emulator participates but doesn't reject mismatches), and the lock-contention contract of the RwLock-protected multiplexed clone.
Test plan
- cargo test -p gcloud-spanner --lib --test client_test --test transaction_rw_test --test transaction_ro_test --test multiplexed_test
- With SPANNER_OMNI_ENDPOINT set: verify RW transactions, mutations-only commits, and concurrent ops succeed.