diff --git a/spanner/src/client.rs b/spanner/src/client.rs index 280a83a5..09e62c14 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -95,14 +95,42 @@ pub struct ClientConfig { impl Default for ClientConfig { fn default() -> Self { + // SPANNER_OMNI_ENDPOINT and SPANNER_EMULATOR_HOST both reuse the + // `Environment::Emulator` variant because they share the same + // connection profile we need for our Omni deployment: plain-text gRPC + // and no auth interceptor. This is a deliberate overload — the variant + // name describes the connection shape we want, not the kind of server + // on the other end. If your Omni deployment requires TLS or auth, do + // not rely on `Default`; build a `ClientConfig` with a custom + // `environment` instead. + // + // SPANNER_OMNI_ENDPOINT takes precedence over SPANNER_EMULATOR_HOST, + // and additionally flips `use_multiplexed_session` on (Omni requires + // multiplexed sessions for read-write transactions). + let (environment, use_multiplexed, env_source) = if let Ok(v) = var("SPANNER_OMNI_ENDPOINT") { + (Environment::Emulator(v), true, Some("SPANNER_OMNI_ENDPOINT")) + } else if let Ok(v) = var("SPANNER_EMULATOR_HOST") { + (Environment::Emulator(v), false, Some("SPANNER_EMULATOR_HOST")) + } else { + (Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})), false, None) + }; + if let Some(name) = env_source { + tracing::info!( + env_var = name, + use_multiplexed_session = use_multiplexed, + "ClientConfig::default(): using plaintext+no-auth profile because {} is set", + name, + ); + } + + let mut session_config = SessionConfig::default(); + session_config.use_multiplexed_session = use_multiplexed; + let mut config = ClientConfig { channel_config: Default::default(), - session_config: Default::default(), + session_config, endpoint: SPANNER.to_string(), - environment: match var("SPANNER_EMULATOR_HOST").ok() { - Some(v) => Environment::Emulator(v), - None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})), - }, + environment, disable_route_to_leader: false, metrics: MetricsConfig::default(), }; @@ -415,6 +443,22 @@ impl Client { let ro = TransactionRetrySetting::default(); let mut session = self.get_session().await?; + // Multiplexed sessions reject mutation commits sent through a + // SingleUseTransaction (there is no place to attach the required + // `mutation_key`/`precommit_token`). Fall back to the regular RW + // transaction path, which routes through `begin_for_mutations_only` + // and includes the precommit token on Commit. The single-RPC + // optimisation does not apply on multiplexed sessions. + if session.session.multiplexed { + drop(session); + let opts = ReadWriteTransactionOption { + commit_options: options, + ..Default::default() + }; + let result = self.apply_with_option(ms, opts).await?; + return Ok(Some(result)); + } + invoke_fn( Some(ro), |session| async { @@ -423,7 +467,7 @@ impl Client { mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())), isolation_level: IsolationLevel::Unspecified as i32, }); - match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await { + match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader, None).await { Ok(s) => Ok(Some(s.into())), Err(e) => Err((Error::GRPC(e), session)), } diff --git a/spanner/src/reader.rs b/spanner/src/reader.rs index 9694c475..12c15afb 100644 --- a/spanner/src/reader.rs +++ b/spanner/src/reader.rs @@ -1,11 +1,16 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; +use google_cloud_googleapis::spanner::v1::MultiplexedSessionPrecommitToken; +use parking_lot::Mutex; + use prost::Message; use prost_types::{value::Kind, Value}; use google_cloud_gax::grpc::{Code, Response, Status, Streaming}; use google_cloud_googleapis::spanner::v1::struct_type::Field; +use google_cloud_googleapis::spanner::v1::transaction_selector::Selector as TransactionSelectorEnum; +use google_cloud_googleapis::spanner::v1::TransactionSelector; use google_cloud_googleapis::spanner::v1::{ ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats, }; @@ -15,6 +20,33 @@ use crate::row::Row; use crate::session::SessionHandle; use crate::transaction::CallOptions; +/// Capture the inline-begin transaction id (from the first response's metadata) +/// and the latest multiplexed-session precommit token from a PartialResultSet +/// into the slots shared with the parent transaction. +fn capture_inline_metadata( + inline_tx_id: &Option>>>>, + inline_precommit_token: &Option>>>, + prs: &PartialResultSet, +) { + if let Some(slot) = inline_tx_id { + if let Some(meta) = prs.metadata.as_ref() { + if let Some(txn) = meta.transaction.as_ref() { + if !txn.id.is_empty() { + let mut guard = slot.lock(); + if guard.is_none() { + *guard = Some(txn.id.clone()); + } + } + } + } + } + if let Some(slot) = inline_precommit_token { + if let Some(token) = prs.precommit_token.as_ref() { + crate::transaction_rw::update_precommit_token(slot, token); + } + } +} + pub trait Reader: Send + Sync { fn read( &self, @@ -25,6 +57,16 @@ pub trait Reader: Send + Sync { fn update_token(&mut self, resume_token: Vec); + /// Replace the request's transaction selector with `Id(tx_id)`. Used + /// after inline-begin captures the server-assigned tx_id so that any + /// subsequent retry of the same streaming RPC does not re-issue a + /// `Begin(...)` selector and accidentally start a second server-side + /// transaction. Default impl is a no-op so external `Reader` + /// implementations don't need to be updated; only callers that + /// participate in inline-begin (the in-crate `StatementReader` / + /// `TableReader`) need to override it. + fn update_transaction_selector(&mut self, _tx_id: Vec) {} + fn can_resume(&self) -> bool; } @@ -52,6 +94,12 @@ impl Reader for StatementReader { self.request.resume_token = resume_token; } + fn update_transaction_selector(&mut self, tx_id: Vec) { + self.request.transaction = Some(TransactionSelector { + selector: Some(TransactionSelectorEnum::Id(tx_id)), + }); + } + fn can_resume(&self) -> bool { self.enable_resume && !self.request.resume_token.is_empty() } @@ -80,6 +128,12 @@ impl Reader for TableReader { self.request.resume_token = resume_token; } + fn update_transaction_selector(&mut self, tx_id: Vec) { + self.request.transaction = Some(TransactionSelector { + selector: Some(TransactionSelectorEnum::Id(tx_id)), + }); + } + fn can_resume(&self) -> bool { !self.request.resume_token.is_empty() } @@ -287,6 +341,13 @@ where resumable: bool, end_of_stream: bool, stream_retry: StreamingRetry, + /// Populated by the first PartialResultSet when using inline begin. + /// The RowIterator writes the transaction ID here; the owning + /// ReadWriteTransaction reads it to upgrade its selector to Id(tx_id). + inline_tx_id: Option>>>>, + /// Updated with the latest precommit token from each PartialResultSet. + /// Required in the Commit request for multiplexed session transactions. + inline_precommit_token: Option>>>, } impl<'a, T> RowIterator<'a, T> @@ -298,8 +359,10 @@ where reader: T, option: Option, disable_route_to_leader: bool, + inline_tx_id: Option>>>>, + inline_precommit_token: Option>>>, ) -> Result, Status> { - let streaming = reader + let mut streaming = reader .read(session, option, disable_route_to_leader) .await? .into_inner(); @@ -309,6 +372,36 @@ where rows: VecDeque::new(), chunked_value: false, }; + let mut prs_buffer = ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS); + let mut end_of_stream = false; + let mut resumable = true; + // For inline-begin (multiplexed sessions) the parent transaction + // depends on the tx_id carried in the first PartialResultSet's + // metadata. Pre-fetch that frame here so the id is captured before + // the caller has any chance to drop the iterator unconsumed. Once + // the slot has been populated by the first op of this transaction, + // subsequent ops should behave like non-multiplexed iterators + // (no constructor pre-fetch). + let needs_inline_prefetch = inline_tx_id + .as_ref() + .is_some_and(|slot| slot.lock().is_none()); + if needs_inline_prefetch { + match streaming.message().await? { + Some(prs) => { + if prs.last { + end_of_stream = true; + } + capture_inline_metadata(&inline_tx_id, &inline_precommit_token, &prs); + prs_buffer.push(prs); + if prs_buffer.unretryable { + resumable = false; + } + } + None => { + end_of_stream = true; + } + } + } Ok(Self { streaming, session, @@ -317,10 +410,12 @@ where reader_option: None, disable_route_to_leader, stats: None, - prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS), - resumable: true, - end_of_stream: false, + prs_buffer, + resumable, + end_of_stream, stream_retry: StreamingRetry::new(), + inline_tx_id, + inline_precommit_token, }) } @@ -367,6 +462,16 @@ where } tracing::debug!("streaming error: {}. resume reading by resume_token", e); self.stream_retry.next(e).await?; + // If inline begin already captured the server-assigned + // tx_id, upgrade the cached request's selector to + // `Id(tx_id)` before retrying so the server does not + // start a second transaction in response to a `Begin` + // + resume_token combination. + if let Some(slot) = self.inline_tx_id.as_ref() { + if let Some(tx_id) = slot.lock().clone() { + self.reader.update_transaction_selector(tx_id); + } + } let call_option = option.clone(); let result = self .reader @@ -383,6 +488,7 @@ where if result_set.last { self.end_of_stream = true; } + capture_inline_metadata(&self.inline_tx_id, &self.inline_precommit_token, &result_set); self.prs_buffer.push(result_set); if self.prs_buffer.unretryable { self.resumable = false; @@ -441,9 +547,13 @@ mod tests { use prost_types::Value; use google_cloud_googleapis::spanner::v1::struct_type::Field; - use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType}; + use google_cloud_googleapis::spanner::v1::transaction_selector::Selector as TxSelector; + use google_cloud_googleapis::spanner::v1::{ + transaction_options, ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, StructType, + TransactionOptions, TransactionSelector, + }; - use crate::reader::ResultSet; + use crate::reader::{Reader, ResultSet, StatementReader, TableReader}; use crate::row::{Row, TryFromValue}; use crate::statement::ToKind; @@ -947,4 +1057,83 @@ mod tests { }; assert!(!rs_partial.is_row_boundary()); } + + fn begin_selector() -> Option { + Some(TransactionSelector { + selector: Some(TxSelector::Begin(TransactionOptions { + exclude_txn_from_change_streams: false, + mode: Some(transaction_options::Mode::ReadWrite( + transaction_options::ReadWrite::default(), + )), + isolation_level: 0, + })), + }) + } + + #[test] + fn statement_reader_update_transaction_selector_replaces_begin_with_id() { + let mut reader = StatementReader { + enable_resume: true, + request: ExecuteSqlRequest { + transaction: begin_selector(), + resume_token: b"some-token".to_vec(), + ..Default::default() + }, + }; + reader.update_transaction_selector(b"tx-1".to_vec()); + match reader.request.transaction.as_ref().unwrap().selector.as_ref().unwrap() { + TxSelector::Id(id) => assert_eq!(id, b"tx-1"), + other => panic!("expected Id selector, got {other:?}"), + } + // Resume token must not be touched — that's update_token's job. + assert_eq!(reader.request.resume_token, b"some-token"); + } + + #[test] + fn table_reader_update_transaction_selector_replaces_begin_with_id() { + let mut reader = TableReader { + request: ReadRequest { + transaction: begin_selector(), + resume_token: b"some-token".to_vec(), + ..Default::default() + }, + }; + reader.update_transaction_selector(b"tx-1".to_vec()); + match reader.request.transaction.as_ref().unwrap().selector.as_ref().unwrap() { + TxSelector::Id(id) => assert_eq!(id, b"tx-1"), + other => panic!("expected Id selector, got {other:?}"), + } + assert_eq!(reader.request.resume_token, b"some-token"); + } + + /// Compile-time canary: an external `Reader` impl must be able to omit + /// `update_transaction_selector` and rely on the trait's default body. + /// If someone removes the default body to make the method required + /// again, this stops compiling — which is a SemVer-major break for + /// downstream crates with their own `Reader` impls. + #[test] + fn reader_trait_has_default_update_transaction_selector() { + struct ExternalStyleReader; + impl Reader for ExternalStyleReader { + async fn read( + &self, + _session: &mut crate::session::SessionHandle, + _option: Option, + _disable_route_to_leader: bool, + ) -> Result< + google_cloud_gax::grpc::Response>, + google_cloud_gax::grpc::Status, + > { + unreachable!("not invoked") + } + fn update_token(&mut self, _resume_token: Vec) {} + fn can_resume(&self) -> bool { + false + } + // Intentionally NO `update_transaction_selector` — relies on default. + } + // Default impl is a no-op; calling it must not panic. + let mut r = ExternalStyleReader; + r.update_transaction_selector(b"tx".to_vec()); + } } diff --git a/spanner/src/session.rs b/spanner/src/session.rs index 2e34380b..bc6e7525 100644 --- a/spanner/src/session.rs +++ b/spanner/src/session.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::mem; use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -18,7 +19,7 @@ use tracing::{Instrument, Span}; use google_cloud_gax::grpc::metadata::MetadataMap; use google_cloud_gax::grpc::{Code, Status}; use google_cloud_gax::retry::TryAs; -use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSessionRequest, Session}; +use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, CreateSessionRequest, DeleteSessionRequest, Session}; use crate::apiv1::conn_pool::ConnectionManager; use crate::apiv1::spanner_client::{ping_query_request, Client}; @@ -26,6 +27,18 @@ use crate::metrics::{MetricsRecorder, SessionPoolSnapshot, SessionPoolStatsFn}; const MAX_IN_USE_WINDOW: Duration = Duration::from_secs(600); +/// Carried by clones of the multiplexed master so that `invalidate_if_needed` +/// can flip the manager's invalid flag if and only if the failing clone +/// belongs to the master generation that is currently installed. Without the +/// generation check, a delayed `NotFound` from a stale clone (whose master +/// has already been recreated) would falsely re-flag the new master as +/// invalid and trigger another redundant `CreateSession` RPC. +pub(crate) struct MultiplexedInvalidator { + flag: Arc, + clone_generation: u64, + current_generation: Arc, +} + /// Session pub struct SessionHandle { pub session: Session, @@ -36,6 +49,13 @@ pub struct SessionHandle { last_checked_at: Instant, last_pong_at: Instant, created_at: Instant, + /// Set on clones of a multiplexed session. When `invalidate_if_needed` + /// observes a NotFound on a multiplexed handle, this flag is raised so + /// `SessionManager::get` recreates the underlying session on the next + /// call (subject to a generation check, see `MultiplexedInvalidator`). + /// Multiplexed sessions cannot be deleted (per the v1 proto), so the + /// delete RPC is skipped in that case. + multiplexed_invalidator: Option, } impl SessionHandle { @@ -49,6 +69,26 @@ impl SessionHandle { last_checked_at: now, last_pong_at: now, created_at: now, + multiplexed_invalidator: None, + } + } + + pub(crate) fn new_multiplexed_clone( + session: Session, + spanner_client: Client, + now: Instant, + invalidator: MultiplexedInvalidator, + ) -> SessionHandle { + SessionHandle { + session, + spanner_client, + valid: true, + deleted: false, + last_used_at: now, + last_checked_at: now, + last_pong_at: now, + created_at: now, + multiplexed_invalidator: Some(invalidator), } } @@ -57,8 +97,32 @@ impl SessionHandle { Ok(s) => Ok(s), Err(e) => { if e.code() == Code::NotFound && e.message().contains("Session not found:") { - tracing::debug!("session invalidate {}", self.session.name); - self.delete().await; + if let Some(inv) = &self.multiplexed_invalidator { + // Multiplexed sessions cannot be deleted via the API; + // signal SessionManager to recreate on the next get(), + // but only if this clone still belongs to the master + // generation currently installed. A `NotFound` from a + // stale clone (master already rotated) is silently + // swallowed so it does not trigger a redundant + // CreateSession. + let current = inv.current_generation.load(Ordering::Acquire); + if current == inv.clone_generation { + tracing::debug!("multiplexed session invalidated: {}", self.session.name); + self.valid = false; + inv.flag.store(true, Ordering::Release); + } else { + tracing::debug!( + "stale multiplexed clone NotFound ignored (clone_gen={}, current_gen={}, name={})", + inv.clone_generation, + current, + self.session.name, + ); + self.valid = false; + } + } else { + tracing::debug!("session invalidate {}", self.session.name); + self.delete().await; + } } Err(e) } @@ -82,6 +146,9 @@ impl SessionHandle { pub struct ManagedSession { session_pool: SessionPool, session: Option, + /// When false (multiplexed session mode), the session is NOT returned to + /// the pool on drop — it is a short-lived clone of a shared handle. + recycle_on_drop: bool, } impl ManagedSession { @@ -89,6 +156,15 @@ impl ManagedSession { ManagedSession { session_pool, session: Some(session), + recycle_on_drop: true, + } + } + + fn new_multiplexed(session_pool: SessionPool, session: SessionHandle) -> Self { + ManagedSession { + session_pool, + session: Some(session), + recycle_on_drop: false, } } } @@ -96,7 +172,15 @@ impl ManagedSession { impl Drop for ManagedSession { fn drop(&mut self) { let session = self.session.take().unwrap(); - self.session_pool.recycle(session); + if self.recycle_on_drop { + self.session_pool.recycle(session); + } else { + // Multiplexed sessions are shared; the handle is simply dropped. + // recycle() emits record_session_released for pool sessions, so + // emit it directly here to keep acquire/release counts balanced + // for observability. + self.session_pool.metrics.record_session_released(); + } } } @@ -255,7 +339,6 @@ impl SessionPool { config, metrics, }; - pool.metrics.register_session_pool(pool.snapshot_fn()); Ok(pool) } @@ -414,7 +497,7 @@ impl SessionPool { self.remove_orphans().await; } - fn snapshot_fn(&self) -> SessionPoolStatsFn { + fn snapshot_fn(&self, has_multiplexed_session: bool) -> SessionPoolStatsFn { let inner = self.inner.clone(); let max_allowed = self.config.max_opened; Arc::new(move || { @@ -425,7 +508,7 @@ impl SessionPool { idle_sessions: sessions.available_sessions.len(), max_allowed_sessions: max_allowed, max_in_use_last_window: sessions.max_inuse_window, - has_multiplexed_session: false, + has_multiplexed_session, } }) } @@ -475,6 +558,16 @@ pub struct SessionConfig { /// incStep is the number of sessions to create in one batch when at least /// one more session is needed. inc_step: usize, + + /// use_multiplexed_session enables multiplexed session mode. + /// + /// When true, a single multiplexed session is created via `CreateSession` + /// (with `Session.multiplexed = true`) and shared across all callers. + /// No session pool is maintained; the session does not expire. + /// + /// Required for Spanner Omni deployments, which only accept multiplexed + /// sessions. Set automatically when `SPANNER_OMNI_ENDPOINT` is detected. + pub use_multiplexed_session: bool, } impl Default for SessionConfig { @@ -488,6 +581,7 @@ impl Default for SessionConfig { session_alive_trust_duration: Duration::from_secs(55 * 60), session_get_timeout: Duration::from_secs(1), refresh_interval: Duration::from_secs(5 * 60), + use_multiplexed_session: false, } } } @@ -511,8 +605,35 @@ impl TryAs for SessionError { } } +/// Context required to (re)create a multiplexed session after the original +/// has been observed as `NotFound` server-side. +struct MultiplexedRecreateCtx { + conn_pool: Arc, + database: String, + disable_route_to_leader: bool, + metrics: Arc, +} + pub(crate) struct SessionManager { session_pool: SessionPool, + /// Multiplexed session mode: holds the single long-lived session used as a + /// template. Each call to `get()` clones the session proto + gRPC client to + /// produce a fresh `SessionHandle` without pooling or expiry semantics. + multiplexed: RwLock>, + /// Shared with every multiplexed clone. A NotFound on any clone flips + /// this to `true`; the next `get()` triggers a recreation. Subject to + /// the generation check carried by `MultiplexedInvalidator` so stale + /// clones cannot re-flag a freshly-recreated master. + multiplexed_invalid: Arc, + /// Bumped each time a new master multiplexed session is installed + /// (initial create + every recreation). Each clone snapshots the + /// current value at clone time; `invalidate_if_needed` only flips the + /// invalid flag when the snapshot matches the current value. + multiplexed_generation: Arc, + /// Serializes concurrent recreation attempts (an async mutex because + /// `create_multiplexed_session` is awaitable). + multiplexed_create_lock: tokio::sync::Mutex<()>, + multiplexed_recreate_ctx: Option, cancel: CancellationToken, tasks: Mutex>>, } @@ -526,32 +647,80 @@ impl SessionManager { metrics: Arc, ) -> Result, Status> { let database = database.into(); + let use_multiplexed = config.use_multiplexed_session; + + // For multiplexed mode we still create a minimal (empty) pool as a + // placeholder — it satisfies the SessionPool type without pre-creating + // any regular sessions (min_opened = 0). + let pool_config = if use_multiplexed { + let mut c = config.clone(); + c.min_opened = 0; + c + } else { + config.clone() + }; + + let conn_pool = Arc::new(conn_pool); let (sender, receiver) = mpsc::unbounded_channel(); let session_pool = SessionPool::new( database.clone(), - &conn_pool, + conn_pool.as_ref(), sender, - Arc::new(config.clone()), + Arc::new(pool_config), disable_route_to_leader, metrics.clone(), ) .await?; let cancel = CancellationToken::new(); - let task_session_cleaner = Self::spawn_health_check_task(config, session_pool.clone(), cancel.clone()); - let task_session_creator = Self::spawn_session_creation_task( - session_pool.clone(), - database, - conn_pool, - receiver, - cancel.clone(), - disable_route_to_leader, - ); + let multiplexed_invalid = Arc::new(AtomicBool::new(false)); + let multiplexed_generation = Arc::new(AtomicU64::new(0)); + + let (multiplexed, tasks, multiplexed_recreate_ctx) = if use_multiplexed { + // Create the single multiplexed session and store it. + let handle = + create_multiplexed_session(conn_pool.as_ref(), &database, disable_route_to_leader, metrics.clone()) + .await?; + tracing::debug!("multiplexed session created: {}", handle.session.name); + // First master generation = 1 (clones default to 0; bumping here + // ensures even the first clone snapshots a non-zero value that + // matches a real installed master). + multiplexed_generation.store(1, Ordering::Release); + let ctx = MultiplexedRecreateCtx { + conn_pool: conn_pool.clone(), + database: database.clone(), + disable_route_to_leader, + metrics: metrics.clone(), + }; + (RwLock::new(Some(handle)), Mutex::new(vec![]), Some(ctx)) + } else { + // Standard pool mode: spin up the background maintenance tasks. + let task_cleaner = Self::spawn_health_check_task(config, session_pool.clone(), cancel.clone()); + let task_creator = Self::spawn_session_creation_task( + session_pool.clone(), + database, + conn_pool, + receiver, + cancel.clone(), + disable_route_to_leader, + ); + (RwLock::new(None), Mutex::new(vec![task_cleaner, task_creator]), None) + }; + + // Register the session-pool stats snapshot with the metrics recorder + // now that we know whether a multiplexed session was created. + let has_multiplexed = use_multiplexed && multiplexed.read().is_some(); + metrics.register_session_pool(session_pool.snapshot_fn(has_multiplexed)); let sm = SessionManager { session_pool, + multiplexed, + multiplexed_invalid, + multiplexed_generation, + multiplexed_create_lock: tokio::sync::Mutex::new(()), + multiplexed_recreate_ctx, cancel, - tasks: Mutex::new(vec![task_session_cleaner, task_session_creator]), + tasks, }; Ok(Arc::new(sm)) } @@ -561,9 +730,92 @@ impl SessionManager { } pub async fn get(&self) -> Result { + if self.cancel.is_cancelled() { + return Err(SessionError::FailedToCreateSession); + } + if let Some(ctx) = &self.multiplexed_recreate_ctx { + // Multiplexed mode: try the fast path; fall back to recreating + // the master if a prior NotFound flagged it as invalid. Record + // acquire/latency here so observability matches the pool path + // (record_session_acquired/_acquire_latency are otherwise only + // emitted from SessionPool::acquire). + let started_at = Instant::now(); + let metrics = self.session_pool.metrics.clone(); + if !self.multiplexed_invalid.load(Ordering::Acquire) { + if let Some(handle) = self.clone_multiplexed_handle() { + metrics.record_session_acquired(); + metrics.record_session_acquire_latency(started_at.elapsed()); + return Ok(handle); + } + } + self.recreate_multiplexed_session(ctx).await?; + let handle = self + .clone_multiplexed_handle() + .ok_or(SessionError::FailedToCreateSession)?; + metrics.record_session_acquired(); + metrics.record_session_acquire_latency(started_at.elapsed()); + return Ok(handle); + } self.session_pool.acquire().await } + /// Clone session proto + gRPC client of the master multiplexed handle. + /// The read guard is dropped at the end of the inner block, so the + /// SessionHandle is constructed without the lock held. parking_lot's + /// RwLock allows multiple concurrent readers, so this also lets parallel + /// `get()` callers clone in parallel; only recreate_multiplexed_session + /// needs the write lock. + fn clone_multiplexed_handle(&self) -> Option { + let snapshot = { + let guard = self.multiplexed.read(); + guard.as_ref().map(|h| (h.session.clone(), h.spanner_client.clone())) + }; + snapshot.map(|(session, client)| { + let handle = SessionHandle::new_multiplexed_clone( + session, + client, + Instant::now(), + self.multiplexed_invalidator(), + ); + ManagedSession::new_multiplexed(self.session_pool.clone(), handle) + }) + } + + fn multiplexed_invalidator(&self) -> MultiplexedInvalidator { + MultiplexedInvalidator { + flag: self.multiplexed_invalid.clone(), + clone_generation: self.multiplexed_generation.load(Ordering::Acquire), + current_generation: self.multiplexed_generation.clone(), + } + } + + async fn recreate_multiplexed_session(&self, ctx: &MultiplexedRecreateCtx) -> Result<(), Status> { + // Serialize concurrent recreation attempts; double-check the flag + // after acquiring the lock so we only pay for one CreateSession RPC. + let _guard = self.multiplexed_create_lock.lock().await; + if self.cancel.is_cancelled() { + return Err(Status::new(Code::Cancelled, "session manager is closed")); + } + if !self.multiplexed_invalid.load(Ordering::Acquire) && self.multiplexed.read().is_some() { + return Ok(()); + } + let new_handle = create_multiplexed_session( + ctx.conn_pool.as_ref(), + &ctx.database, + ctx.disable_route_to_leader, + ctx.metrics.clone(), + ) + .await?; + tracing::debug!("multiplexed session recreated: {}", new_handle.session.name); + *self.multiplexed.write() = Some(new_handle); + // Bump the generation BEFORE clearing the invalid flag: any + // late-arriving NotFound from a stale clone will then observe + // a mismatched generation and skip the flag flip. + self.multiplexed_generation.fetch_add(1, Ordering::AcqRel); + self.multiplexed_invalid.store(false, Ordering::Release); + Ok(()) + } + pub async fn close(&self) { if self.cancel.is_cancelled() { return; @@ -573,13 +825,18 @@ impl SessionManager { for task in tasks { let _ = task.await; } + // Drop the multiplexed master handle. Multiplexed sessions cannot + // be deleted via the API per the v1 proto ("Multiplexed sessions + // may not be deleted nor listed"); the server reclaims them via + // its own TTL once the channel is gone. + let _ = self.multiplexed.write().take(); self.session_pool.close().await; } fn spawn_session_creation_task( session_pool: SessionPool, database: String, - conn_pool: ConnectionManager, + conn_pool: Arc, mut rx: UnboundedReceiver, cancel: CancellationToken, disable_route_to_leader: bool, @@ -694,6 +951,36 @@ async fn health_check( tracing::trace!("end health check elapsed={}msec", start.elapsed().as_millis()); } +/// Create a single multiplexed session via the `CreateSession` RPC. +/// +/// Multiplexed sessions are long-lived, shared across concurrent operations, +/// and do not expire. They are required by Spanner Omni (and supported by +/// Cloud Spanner for read-only workloads). See: +/// +async fn create_multiplexed_session( + conn_pool: &ConnectionManager, + database: &str, + disable_route_to_leader: bool, + metrics: Arc, +) -> Result { + let mut client = conn_pool + .conn() + .with_metrics(metrics) + .with_metadata(client_metadata(database)); + let req = CreateSessionRequest { + database: database.to_string(), + session: Some(Session { + multiplexed: true, + ..Default::default() + }), + }; + let session = client + .create_session(req, disable_route_to_leader, None) + .await? + .into_inner(); + Ok(SessionHandle::new(session, client, Instant::now())) +} + async fn batch_create_sessions( spanner_client: Client, database: &str, @@ -762,6 +1049,7 @@ mod tests { use tokio_util::sync::CancellationToken; use google_cloud_gax::conn::{ConnectionOptions, Environment}; + use google_cloud_gax::grpc::{Code, Status}; use google_cloud_googleapis::spanner::v1::ExecuteSqlRequest; use crate::apiv1::conn_pool::ConnectionManager; @@ -1207,6 +1495,122 @@ mod tests { assert_eq!(sm.session_pool.inner.read().orphans.len(), 0); } + #[tokio::test(flavor = "multi_thread")] + #[serial] + async fn test_multiplexed_session_recreated_after_invalidation() { + let cm = ConnectionManager::new( + 1, + &Environment::Emulator("localhost:9010".to_string()), + "", + &ConnectionOptions::default(), + ) + .await + .unwrap(); + let config = SessionConfig { + use_multiplexed_session: true, + min_opened: 0, + max_opened: 1, + ..Default::default() + }; + let sm = SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); + + // First get(): returns a clone of the freshly-created master. + let s1 = sm.get().await.unwrap(); + let name1 = (*s1).session.name.clone(); + drop(s1); + + // Simulate what invalidate_if_needed would do on a NotFound from + // any operation that uses the multiplexed session. + sm.multiplexed_invalid.store(true, Ordering::Release); + + // Next get(): must recreate the master and hand back a clone of + // the new session, which has a different server-assigned name. + let s2 = sm.get().await.unwrap(); + let name2 = (*s2).session.name.clone(); + + assert_ne!(name1, name2, "expected the master multiplexed session to be recreated"); + assert!(!sm.multiplexed_invalid.load(Ordering::Acquire), "invalid flag should be cleared after recreate"); + sm.close().await; + } + + #[tokio::test(flavor = "multi_thread")] + #[serial] + async fn test_stale_multiplexed_clone_does_not_reflag_invalid() { + let cm = ConnectionManager::new( + 1, + &Environment::Emulator("localhost:9010".to_string()), + "", + &ConnectionOptions::default(), + ) + .await + .unwrap(); + let config = SessionConfig { + use_multiplexed_session: true, + min_opened: 0, + max_opened: 1, + ..Default::default() + }; + let sm = SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); + + // Take a clone at generation N. + let mut stale = sm.get().await.unwrap(); + + // Simulate a recreation having already happened by bumping the + // generation under the stale clone. Clear the invalid flag so we + // can see whether `invalidate_if_needed` flips it back. + sm.multiplexed_generation.fetch_add(1, Ordering::AcqRel); + sm.multiplexed_invalid.store(false, Ordering::Release); + + // Mimic a server-side `NotFound` arriving on the stale clone. The + // generation check must skip the flag flip. + let err = Status::new(Code::NotFound, "Session not found: projects/.../sessions/stale"); + let _ = stale.invalidate_if_needed::<()>(Err(err)).await; + assert!( + !sm.multiplexed_invalid.load(Ordering::Acquire), + "stale clone (older generation) must not re-flag the master invalid" + ); + drop(stale); + + // A fresh clone snapshots the current generation; a NotFound on + // it must flip the flag. + let mut fresh = sm.get().await.unwrap(); + let err = Status::new(Code::NotFound, "Session not found: projects/.../sessions/fresh"); + let _ = fresh.invalidate_if_needed::<()>(Err(err)).await; + assert!( + sm.multiplexed_invalid.load(Ordering::Acquire), + "current-generation clone must flip the master invalid flag" + ); + drop(fresh); + sm.close().await; + } + + #[tokio::test(flavor = "multi_thread")] + #[serial] + async fn test_get_after_close_returns_error() { + let cm = ConnectionManager::new( + 4, + &Environment::Emulator("localhost:9010".to_string()), + "", + &ConnectionOptions::default(), + ) + .await + .unwrap(); + let config = SessionConfig::default(); + let sm = SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); + sm.close().await; + match sm.get().await { + Err(SessionError::FailedToCreateSession) => {} + Err(e) => panic!("expected FailedToCreateSession after close, got {e:?}"), + Ok(_) => panic!("expected FailedToCreateSession after close, got Ok"), + } + } + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_batch_create_sessions() { diff --git a/spanner/src/transaction.rs b/spanner/src/transaction.rs index 45ebec60..75a22b05 100644 --- a/spanner/src/transaction.rs +++ b/spanner/src/transaction.rs @@ -1,5 +1,9 @@ use std::ops::DerefMut; use std::sync::atomic::AtomicI64; +use std::sync::Arc; + +use google_cloud_googleapis::spanner::v1::MultiplexedSessionPrecommitToken; +use parking_lot::Mutex; use prost_types::Struct; @@ -7,8 +11,8 @@ use google_cloud_gax::grpc::Status; use google_cloud_gax::retry::RetrySetting; use google_cloud_googleapis::spanner::v1::request_options::Priority; use google_cloud_googleapis::spanner::v1::{ - execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest, - ReadRequest, RequestOptions, TransactionSelector, + execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, transaction_selector, + ExecuteSqlRequest, ReadRequest, RequestOptions, TransactionSelector, }; use crate::key::{Key, KeySet}; @@ -108,6 +112,15 @@ pub struct Transaction { /// disableRouteToLeader specifies if all the requests of type read-write and PDML /// need to be routed to the leader region. pub(crate) disable_route_to_leader: bool, + /// When using inline begin (multiplexed sessions), the transaction ID is + /// not known until the first RPC response. This shared slot is populated + /// by the first response that carries `metadata.transaction.id`. Once + /// populated, callers must upgrade `transaction_selector` to `Id(tx_id)`. + pub(crate) pending_inline_tx_id: Option>>>>, + /// The most recent precommit token received from Spanner for multiplexed + /// session transactions. Spanner Omni requires this to be included in the + /// Commit request when using inline begin. + pub(crate) pending_precommit_token: Option>>>, } impl Transaction { @@ -140,6 +153,7 @@ impl Transaction { statement: Statement, options: QueryOptions, ) -> Result, Status> { + self.resolve_inline_begin_selector(); let request = ExecuteSqlRequest { session: self.session.as_ref().unwrap().session.name.to_string(), transaction: Some(self.transaction_selector.clone()), @@ -166,7 +180,15 @@ impl Transaction { enable_resume: options.enable_resume, request, }; - RowIterator::new(session, reader, Some(options.call_options), self.disable_route_to_leader).await + RowIterator::new( + session, + reader, + Some(options.call_options), + self.disable_route_to_leader, + self.pending_inline_tx_id.clone(), + self.pending_precommit_token.clone(), + ) + .await } /// read returns a RowIterator for reading multiple rows from the database. @@ -207,6 +229,7 @@ impl Transaction { key_set: impl Into, options: ReadOptions, ) -> Result, Status> { + self.resolve_inline_begin_selector(); let request = ReadRequest { session: self.get_session_name(), transaction: Some(self.transaction_selector.clone()), @@ -228,9 +251,19 @@ impl Transaction { }; let disable_route_to_leader = self.disable_route_to_leader; + let inline_tx_id = self.pending_inline_tx_id.clone(); + let inline_precommit_token = self.pending_precommit_token.clone(); let session = self.as_mut_session(); let reader = TableReader { request }; - RowIterator::new(session, reader, Some(options.call_options), disable_route_to_leader).await + RowIterator::new( + session, + reader, + Some(options.call_options), + disable_route_to_leader, + inline_tx_id, + inline_precommit_token, + ) + .await } /// read returns a RowIterator for reading multiple rows from the database. @@ -270,6 +303,26 @@ impl Transaction { self.session.as_ref().unwrap().session.name.to_string() } + /// If inline begin captured a transaction ID in `pending_inline_tx_id` and + /// the selector is still `Begin(...)`, upgrade it to `Id(tx_id)` so the + /// next request joins the existing server-side transaction instead of + /// starting a new one. Idempotent; the slot is left in place so that + /// `ReadWriteTransaction::commit` and `rollback` can read the same value. + pub(crate) fn resolve_inline_begin_selector(&mut self) { + let Some(ref slot) = self.pending_inline_tx_id else { + return; + }; + if matches!(self.transaction_selector.selector, Some(transaction_selector::Selector::Id(_))) { + return; + } + let Some(tx_id) = slot.lock().clone() else { + return; + }; + self.transaction_selector = TransactionSelector { + selector: Some(transaction_selector::Selector::Id(tx_id)), + }; + } + pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession { self.session.as_mut().unwrap() } diff --git a/spanner/src/transaction_ro.rs b/spanner/src/transaction_ro.rs index df237a31..375906fb 100644 --- a/spanner/src/transaction_ro.rs +++ b/spanner/src/transaction_ro.rs @@ -66,6 +66,8 @@ impl ReadOnlyTransaction { }, transaction_tag: None, disable_route_to_leader: true, + pending_inline_tx_id: None, + pending_precommit_token: None, }, rts: None, }) @@ -106,6 +108,8 @@ impl ReadOnlyTransaction { }, transaction_tag: None, disable_route_to_leader: true, + pending_inline_tx_id: None, + pending_precommit_token: None, }, rts: Some(OffsetDateTime::from(st)), }) @@ -308,6 +312,6 @@ impl BatchReadOnlyTransaction { ) -> Result, Status> { let disable_route_to_leader = self.disable_route_to_leader; let session = self.as_mut_session(); - RowIterator::new(session, partition.reader, option, disable_route_to_leader).await + RowIterator::new(session, partition.reader, option, disable_route_to_leader, None, None).await } } diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index fd6757d9..de230a6e 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -1,6 +1,11 @@ use std::ops::Deref; use std::ops::DerefMut; use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use google_cloud_googleapis::spanner::v1::MultiplexedSessionPrecommitToken; +use parking_lot::Mutex; use std::time::Duration; use prost_types::Struct; @@ -34,6 +39,24 @@ pub struct CommitResult { pub mutation_count: Option, } +/// Update the precommit-token slot with `new_token` only if its `seq_num` is +/// strictly greater than the currently-stored token's `seq_num`. The proto +/// requires the client to send the precommit token with the highest `seq_num` +/// of the transaction attempt. +pub(crate) fn update_precommit_token( + slot: &Mutex>, + new_token: &MultiplexedSessionPrecommitToken, +) { + let mut guard = slot.lock(); + let should_replace = match guard.as_ref() { + None => true, + Some(cur) => new_token.seq_num > cur.seq_num, + }; + if should_replace { + *guard = Some(new_token.clone()); + } +} + impl From for CommitResult { fn from(value: CommitResponse) -> Self { Self { @@ -160,13 +183,43 @@ impl ReadWriteTransaction { transaction_tag: Option, disable_route_to_leader: bool, ) -> Result { + let is_read_write = matches!(mode, transaction_options::Mode::ReadWrite(_)); + let tx_options = TransactionOptions { + exclude_txn_from_change_streams: false, + mode: Some(mode), + isolation_level: IsolationLevel::Unspecified as i32, + }; + + // Multiplexed sessions use inline begin for read-write transactions: + // the transaction is started implicitly with the first RPC (query or + // DML), not via a separate BeginTransaction call. This matches the + // Java client's behaviour. (Read-only transactions on multiplexed + // sessions still use explicit BeginTransaction in this crate; see + // ReadOnlyTransaction::begin.) PartitionedDml is not supported by + // inline begin and must always go through BeginTransaction. + if session.session.multiplexed && is_read_write { + let pending_tx_id = Arc::new(Mutex::new(None)); + let pending_token = Arc::new(Mutex::new(None)); + return Ok(ReadWriteTransaction { + base_tx: Transaction { + session: Some(session), + sequence_number: AtomicI64::new(0), + transaction_selector: TransactionSelector { + selector: Some(transaction_selector::Selector::Begin(tx_options)), + }, + transaction_tag, + disable_route_to_leader, + pending_inline_tx_id: Some(pending_tx_id), + pending_precommit_token: Some(pending_token), + }, + tx_id: vec![], + wb: vec![], + }); + } + let request = BeginTransactionRequest { session: session.session.name.to_string(), - options: Some(TransactionOptions { - exclude_txn_from_change_streams: false, - mode: Some(mode), - isolation_level: IsolationLevel::Unspecified as i32, - }), + options: Some(tx_options), request_options: Transaction::create_request_options(options.priority, transaction_tag.clone()), mutation_key: None, }; @@ -190,12 +243,29 @@ impl ReadWriteTransaction { }, transaction_tag, disable_route_to_leader, + pending_inline_tx_id: None, + pending_precommit_token: None, }, tx_id: tx.id, wb: vec![], }) } + /// If inline begin captured a transaction ID, upgrade the selector to + /// `Id(tx_id)` and mirror the id into `self.tx_id` (used by commit and + /// rollback). Idempotent. + fn resolve_inline_begin(&mut self) { + self.base_tx.resolve_inline_begin_selector(); + if !self.tx_id.is_empty() { + return; + } + if let Some(ref slot) = self.base_tx.pending_inline_tx_id { + if let Some(tx_id) = slot.lock().clone() { + self.tx_id = tx_id; + } + } + } + pub fn buffer_write(&mut self, ms: Vec) { self.wb.extend_from_slice(&ms) } @@ -205,6 +275,7 @@ impl ReadWriteTransaction { } pub async fn update_with_option(&mut self, stmt: Statement, options: QueryOptions) -> Result { + self.resolve_inline_begin(); let request = ExecuteSqlRequest { session: self.get_session_name(), transaction: Some(self.transaction_selector.clone()), @@ -231,7 +302,29 @@ impl ReadWriteTransaction { .execute_sql(request, disable_route_to_leader, options.call_options.retry) .await; let response = session.invalidate_if_needed(result).await?; - Ok(extract_row_count(response.into_inner().stats)) + let result_set = response.into_inner(); + // When this is the first operation in an inline-begin transaction, + // the server returns the new transaction ID in the response metadata. + // Capture it so commit() can use Id(tx_id) instead of Begin. + if let Some(ref slot) = self.base_tx.pending_inline_tx_id { + if let Some(ref meta) = result_set.metadata { + if let Some(ref txn) = meta.transaction { + if !txn.id.is_empty() { + let mut guard = slot.lock(); + if guard.is_none() { + *guard = Some(txn.id.clone()); + } + } + } + } + } + // Capture the precommit token from DML responses (multiplexed sessions). + if let Some(ref slot) = self.base_tx.pending_precommit_token { + if let Some(token) = result_set.precommit_token.as_ref() { + update_precommit_token(slot, token); + } + } + Ok(extract_row_count(result_set.stats)) } pub async fn batch_update(&mut self, stmt: Vec) -> Result, Status> { @@ -243,6 +336,7 @@ impl ReadWriteTransaction { stmt: Vec, options: QueryOptions, ) -> Result, Status> { + self.resolve_inline_begin(); let request = ExecuteBatchDmlRequest { session: self.get_session_name(), transaction: Some(self.transaction_selector.clone()), @@ -268,13 +362,31 @@ impl ReadWriteTransaction { .spanner_client .execute_batch_dml(request, disable_route_to_leader, options.call_options.retry) .await; - let response = session.invalidate_if_needed(result).await?; - Ok(response - .into_inner() - .result_sets - .into_iter() - .map(|x| extract_row_count(x.stats)) - .collect()) + let response = session.invalidate_if_needed(result).await?.into_inner(); + // When this is the first operation in an inline-begin transaction, + // the server returns the new transaction ID in the metadata of the + // first ResultSet (only result_sets[0] carries valid metadata). + if let Some(ref slot) = self.base_tx.pending_inline_tx_id { + if let Some(first) = response.result_sets.first() { + if let Some(ref meta) = first.metadata { + if let Some(ref txn) = meta.transaction { + if !txn.id.is_empty() { + let mut guard = slot.lock(); + if guard.is_none() { + *guard = Some(txn.id.clone()); + } + } + } + } + } + } + // Capture the response-level precommit token (multiplexed sessions). + if let Some(ref slot) = self.base_tx.pending_precommit_token { + if let Some(token) = response.precommit_token.as_ref() { + update_precommit_token(slot, token); + } + } + Ok(response.result_sets.into_iter().map(|x| extract_row_count(x.stats)).collect()) } pub async fn end( @@ -349,14 +461,111 @@ impl ReadWriteTransaction { } pub(crate) async fn commit(&mut self, options: CommitOptions) -> Result { + self.resolve_inline_begin(); + // Mutations-only commit on a multiplexed session: no read/DML fired, + // so there is no server-side transaction yet. The Spanner protocol + // requires an explicit BeginTransaction with `mutation_key` set to one + // of the buffered mutations in this case. + if self.tx_id.is_empty() && self.base_tx.pending_inline_tx_id.is_some() && !self.wb.is_empty() { + self.begin_for_mutations_only(&options).await?; + } + // No server-side transaction was ever started (no read/DML and no + // mutations to commit). There is nothing to commit; return a synthetic + // empty response rather than shipping `TransactionId(empty)`, which + // the server would reject. + if self.tx_id.is_empty() { + return Ok(CommitResponse::default()); + } let tx_id = self.tx_id.clone(); let mutations = self.wb.to_vec(); let disable_route_to_leader = self.disable_route_to_leader; + // Collect the precommit token required by Spanner Omni for inline-begin + // transactions on multiplexed sessions. + let precommit_token = self + .base_tx + .pending_precommit_token + .as_ref() + .and_then(|slot| slot.lock().clone()); + let session = self.as_mut_session(); + commit(session, mutations, TransactionId(tx_id), options, disable_route_to_leader, precommit_token).await + } + + /// Issue an explicit BeginTransaction with `mutation_key` for the + /// multiplexed-session mutations-only case. The returned tx_id is mirrored + /// into the inline-begin slot, the selector, and `self.tx_id`. + async fn begin_for_mutations_only(&mut self, options: &CommitOptions) -> Result<(), Status> { + let tx_options = match self.base_tx.transaction_selector.selector { + Some(transaction_selector::Selector::Begin(ref opts)) => opts.clone(), + _ => TransactionOptions { + exclude_txn_from_change_streams: false, + mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())), + isolation_level: IsolationLevel::Unspecified as i32, + }, + }; + // Spanner v1 proto for BeginTransactionRequest.mutation_key advises: + // "Clients should randomly select one of the mutations from the + // mutation set and send it as a part of this request." A stable + // first-element pick would defeat the server's partition lookup + // load-spreading. Use clock nanos for a cheap pseudo-random index + // (no new crate dependency; spec only requires load distribution, + // not cryptographic randomness). + let mutation_key = if self.wb.is_empty() { + None + } else { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.subsec_nanos() as usize) + .unwrap_or(0); + self.wb.get(nanos % self.wb.len()).cloned() + }; + let request = BeginTransactionRequest { + session: self.get_session_name(), + options: Some(tx_options), + request_options: Transaction::create_request_options( + options.call_options.priority, + self.base_tx.transaction_tag.clone(), + ), + mutation_key, + }; + let disable_route_to_leader = self.disable_route_to_leader; + let retry = options.call_options.retry.clone(); let session = self.as_mut_session(); - commit(session, mutations, TransactionId(tx_id), options, disable_route_to_leader).await + let result = session.spanner_client.begin_transaction(request, disable_route_to_leader, retry).await; + let response = session.invalidate_if_needed(result).await?; + let tx = response.into_inner(); + if tx.id.is_empty() { + return Err(Status::new( + Code::Internal, + "BeginTransaction returned empty transaction id for mutations-only commit", + )); + } + // The server returns a precommit token on the BeginTransaction response + // when `mutation_key` is set on a multiplexed session; it must be sent + // back on the Commit request or the server rejects the commit. + if let Some(ref slot) = self.base_tx.pending_precommit_token { + if let Some(token) = tx.precommit_token.as_ref() { + update_precommit_token(slot, token); + } + } + if let Some(ref slot) = self.base_tx.pending_inline_tx_id { + *slot.lock() = Some(tx.id.clone()); + } + self.base_tx.transaction_selector = TransactionSelector { + selector: Some(transaction_selector::Selector::Id(tx.id.clone())), + }; + self.tx_id = tx.id; + Ok(()) } pub(crate) async fn rollback(&mut self, retry: Option) -> Result<(), Status> { + // Mirror any inline-begin tx_id captured by a prior read/DML so that + // rollback targets the right server-side transaction. + self.resolve_inline_begin(); + // If no server-side transaction was ever started (no read/DML and no + // mutations-only begin), there is nothing to roll back. + if self.tx_id.is_empty() { + return Ok(()); + } let request = RollbackRequest { transaction_id: self.tx_id.clone(), session: self.get_session_name(), @@ -378,6 +587,7 @@ pub(crate) async fn commit( tx: commit_request::Transaction, commit_options: CommitOptions, disable_route_to_leader: bool, + precommit_token: Option, ) -> Result { let request = CommitRequest { session: session.session.name.to_string(), @@ -389,7 +599,7 @@ pub(crate) async fn commit( ), return_commit_stats: commit_options.return_commit_stats, max_commit_delay: commit_options.max_commit_delay.map(|d| d.try_into().unwrap()), - precommit_token: None, + precommit_token, }; let result = session .spanner_client @@ -414,3 +624,46 @@ fn extract_row_count(rs: Option) -> i64 { None => 0, } } + +#[cfg(test)] +mod tests { + use super::*; + + fn token(bytes: &[u8], seq_num: i32) -> MultiplexedSessionPrecommitToken { + MultiplexedSessionPrecommitToken { + precommit_token: bytes.to_vec(), + seq_num, + } + } + + #[test] + fn update_precommit_token_stores_into_empty_slot() { + let slot = Mutex::new(None); + update_precommit_token(&slot, &token(b"a", 5)); + assert_eq!(slot.lock().as_ref().unwrap().seq_num, 5); + assert_eq!(slot.lock().as_ref().unwrap().precommit_token, b"a"); + } + + #[test] + fn update_precommit_token_replaces_when_seq_num_strictly_greater() { + let slot = Mutex::new(Some(token(b"a", 3))); + update_precommit_token(&slot, &token(b"b", 5)); + assert_eq!(slot.lock().as_ref().unwrap().seq_num, 5); + assert_eq!(slot.lock().as_ref().unwrap().precommit_token, b"b"); + } + + #[test] + fn update_precommit_token_keeps_existing_when_seq_num_lower() { + let slot = Mutex::new(Some(token(b"a", 5))); + update_precommit_token(&slot, &token(b"b", 3)); + assert_eq!(slot.lock().as_ref().unwrap().seq_num, 5); + assert_eq!(slot.lock().as_ref().unwrap().precommit_token, b"a"); + } + + #[test] + fn update_precommit_token_keeps_existing_when_seq_num_equal() { + let slot = Mutex::new(Some(token(b"a", 5))); + update_precommit_token(&slot, &token(b"b", 5)); + assert_eq!(slot.lock().as_ref().unwrap().precommit_token, b"a"); + } +} diff --git a/spanner/tests/multiplexed_test.rs b/spanner/tests/multiplexed_test.rs new file mode 100644 index 00000000..27e063e6 --- /dev/null +++ b/spanner/tests/multiplexed_test.rs @@ -0,0 +1,223 @@ +//! Integration tests that exercise the multiplexed-session paths against a +//! running Spanner emulator (or Spanner Omni). The standard Cloud Spanner +//! emulator did not historically support multiplexed sessions; if the +//! emulator at SPANNER_EMULATOR_HOST rejects `multiplexed: true` these tests +//! will fail loudly rather than silently passing on a non-multiplexed path. +//! +//! All tests build a Client with `session_config.use_multiplexed_session = +//! true` so the multiplexed code path is the one under test. + +use serial_test::serial; +use time::OffsetDateTime; + +use common::*; +use gcloud_spanner::client::{ChannelConfig, Client, ClientConfig}; +use gcloud_spanner::session::SessionConfig; +use gcloud_spanner::statement::Statement; +use google_cloud_gax::conn::Environment; + +mod common; + +const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database"; +const ENDPOINT: &str = "localhost:9010"; + +#[ctor::ctor] +fn init() { + let filter = tracing_subscriber::filter::EnvFilter::from_default_env() + .add_directive("google_cloud_spanner=trace".parse().unwrap()); + let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init(); + std::env::set_var("SPANNER_EMULATOR_HOST", ENDPOINT); +} + +async fn multiplexed_client() -> Client { + let mut session_config = SessionConfig::default(); + session_config.use_multiplexed_session = true; + session_config.min_opened = 0; + session_config.max_opened = 1; + Client::new( + DATABASE, + ClientConfig { + session_config, + environment: Environment::Emulator(ENDPOINT.to_string()), + channel_config: ChannelConfig { + num_channels: 1, + ..Default::default() + }, + ..Default::default() + }, + ) + .await + .unwrap() +} + +/// Inline-begin path: the very first op of a RW transaction is a query. +/// Verifies that tx_id is captured from the first PartialResultSet's metadata +/// and that commit succeeds. +#[tokio::test] +#[serial] +async fn test_multiplexed_inline_begin_query_then_commit() { + let now = OffsetDateTime::now_utc(); + let user_id = format!("mux_q_{}", now.unix_timestamp_nanos()); + let bootstrap = create_data_client().await; + bootstrap + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + let client = multiplexed_client().await; + let user_id_clone = user_id.clone(); + let result = client + .read_write_transaction(|tx| { + let user_id = user_id_clone.clone(); + Box::pin(async move { + let mut stmt = Statement::new("SELECT NotNullINT64 FROM User WHERE UserId = @uid"); + stmt.add_param("uid", &user_id); + let mut iter = tx.query(stmt).await?; + let row = iter.next().await?.expect("row"); + let v: i64 = row.column::(0)?; + Ok::<_, gcloud_spanner::client::Error>(v) + }) + }) + .await; + let (_cr, v) = result.unwrap(); + assert_eq!(v, 1); +} + +/// Mutations-only commit: no read/DML fires, so `commit()` must call +/// `begin_for_mutations_only` (explicit BeginTransaction with mutation_key) +/// and forward the precommit_token from that response on Commit. +#[tokio::test] +#[serial] +async fn test_multiplexed_mutations_only_commit() { + let now = OffsetDateTime::now_utc(); + let user_id = format!("mux_m_{}", now.unix_timestamp_nanos()); + let client = multiplexed_client().await; + // Goes through read_write_transaction_sync_with_option -> begin (inline) + // -> buffer_write -> commit -> begin_for_mutations_only. + let cr = client + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + assert!(cr.timestamp.is_some()); +} + +/// Empty transaction: begin + immediate commit with nothing buffered. +/// Verifies the empty-tx_id guard returns a synthetic empty CommitResponse +/// instead of shipping `TransactionId(empty)` to the server. +#[tokio::test] +#[serial] +async fn test_multiplexed_empty_transaction_commits_cleanly() { + let client = multiplexed_client().await; + let result = client + .read_write_transaction(|_tx| { + Box::pin(async move { Ok::<(), gcloud_spanner::client::Error>(()) }) + }) + .await; + let (_cr, _) = result.unwrap(); +} + +/// Multiple ops on the same multiplexed RW transaction: the first op +/// captures tx_id; subsequent ops should reuse it (no second BeginTransaction +/// or constructor-side pre-fetch divergence). +#[tokio::test] +#[serial] +async fn test_multiplexed_multi_op_transaction() { + let now = OffsetDateTime::now_utc(); + let user_id = format!("mux_x_{}", now.unix_timestamp_nanos()); + let bootstrap = create_data_client().await; + bootstrap + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + let client = multiplexed_client().await; + let user_id_clone = user_id.clone(); + let result = client + .read_write_transaction(|tx| { + let user_id = user_id_clone.clone(); + Box::pin(async move { + let mut stmt1 = Statement::new("SELECT NotNullINT64 FROM User WHERE UserId = @uid"); + stmt1.add_param("uid", &user_id); + let mut iter1 = tx.query(stmt1).await?; + let _ = iter1.next().await?; + + let mut stmt2 = Statement::new("SELECT NotNullFloat64 FROM User WHERE UserId = @uid"); + stmt2.add_param("uid", &user_id); + let mut iter2 = tx.query(stmt2).await?; + let row = iter2.next().await?.expect("row"); + let v: f64 = row.column::(0)?; + Ok::<_, gcloud_spanner::client::Error>(v) + }) + }) + .await; + let (_cr, v) = result.unwrap(); + assert_eq!(v, 1.0); +} + +/// `apply_at_least_once` on a multiplexed session must route through the RW +/// transaction path (single-use commits with mutations are not supported on +/// multiplexed sessions: server rejects with UNIMPLEMENTED). +#[tokio::test] +#[serial] +async fn test_multiplexed_apply_at_least_once_routes_through_rw() { + let now = OffsetDateTime::now_utc(); + let user_id = format!("mux_aalo_{}", now.unix_timestamp_nanos()); + let client = multiplexed_client().await; + let result = client + .apply_at_least_once(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + assert!(result.is_some()); +} + +/// `partitioned_update` on a multiplexed session must take the explicit +/// BeginTransaction path. ExecuteSql with `Begin(PartitionedDml)` on a +/// multiplexed session is rejected (NOT_FOUND); the fix gates inline begin +/// on `mode == ReadWrite`. +#[tokio::test] +#[serial] +async fn test_multiplexed_partitioned_update() { + let client = multiplexed_client().await; + let stmt = Statement::new("DELETE FROM Guild WHERE GuildId = \"never_exists_mux\""); + let n = client.partitioned_update(stmt).await.unwrap(); + assert_eq!(n, 0); +} + +/// DML followed by buffered mutations in the same RW transaction. Exercises +/// the inline-begin tx_id capture from an ExecuteSql response (different +/// site than the streaming-read or mutations-only paths) followed by a +/// commit that ships both the buffered mutations and the captured +/// precommit_token. +#[tokio::test] +#[serial] +async fn test_multiplexed_dml_and_mutations() { + let now = OffsetDateTime::now_utc(); + let user_id = format!("mux_dml_{}", now.unix_timestamp_nanos()); + let bootstrap = create_data_client().await; + bootstrap + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + let client = multiplexed_client().await; + let user_id_clone = user_id.clone(); + let result = client + .read_write_transaction(|tx| { + let user_id = user_id_clone.clone(); + Box::pin(async move { + // First op: an UPDATE — captures tx_id via ExecuteSql metadata + // and populates the precommit_token slot from the DML response. + let mut stmt = Statement::new("UPDATE User SET NullableINT64 = 7 WHERE UserId = @uid"); + stmt.add_param("uid", &user_id); + let updated = tx.update(stmt).await?; + // Second op: buffer a mutation under the same tx. Commit must + // ship the mutation against the captured tx_id (Id selector, + // not Begin) along with the precommit_token from the DML. + tx.buffer_write(vec![create_user_item_mutation(&user_id, 99)]); + Ok::<_, gcloud_spanner::client::Error>(updated) + }) + }) + .await; + let (_cr, updated) = result.unwrap(); + assert_eq!(updated, 1); +}