Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions spanner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,42 @@ pub struct ClientConfig {

impl Default for ClientConfig {
fn default() -> Self {
// SPANNER_OMNI_ENDPOINT and SPANNER_EMULATOR_HOST both reuse the
// `Environment::Emulator` variant because they share the same
// connection profile we need for our Omni deployment: plain-text gRPC
// and no auth interceptor. This is a deliberate overload — the variant
// name describes the connection shape we want, not the kind of server
// on the other end. If your Omni deployment requires TLS or auth, do
// not rely on `Default`; build a `ClientConfig` with a custom
// `environment` instead.
//
// SPANNER_OMNI_ENDPOINT takes precedence over SPANNER_EMULATOR_HOST,
// and additionally flips `use_multiplexed_session` on (Omni requires
// multiplexed sessions for read-write transactions).
let (environment, use_multiplexed, env_source) = if let Ok(v) = var("SPANNER_OMNI_ENDPOINT") {
(Environment::Emulator(v), true, Some("SPANNER_OMNI_ENDPOINT"))
} else if let Ok(v) = var("SPANNER_EMULATOR_HOST") {
(Environment::Emulator(v), false, Some("SPANNER_EMULATOR_HOST"))
} else {
(Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})), false, None)
};
if let Some(name) = env_source {
tracing::info!(
env_var = name,
use_multiplexed_session = use_multiplexed,
"ClientConfig::default(): using plaintext+no-auth profile because {} is set",
name,
);
}

let mut session_config = SessionConfig::default();
session_config.use_multiplexed_session = use_multiplexed;

let mut config = ClientConfig {
channel_config: Default::default(),
session_config: Default::default(),
session_config,
endpoint: SPANNER.to_string(),
environment: match var("SPANNER_EMULATOR_HOST").ok() {
Some(v) => Environment::Emulator(v),
None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
},
environment,
disable_route_to_leader: false,
metrics: MetricsConfig::default(),
};
Expand Down Expand Up @@ -415,6 +443,22 @@ impl Client {
let ro = TransactionRetrySetting::default();
let mut session = self.get_session().await?;

// Multiplexed sessions reject mutation commits sent through a
// SingleUseTransaction (there is no place to attach the required
// `mutation_key`/`precommit_token`). Fall back to the regular RW
// transaction path, which routes through `begin_for_mutations_only`
// and includes the precommit token on Commit. The single-RPC
// optimisation does not apply on multiplexed sessions.
if session.session.multiplexed {
drop(session);
let opts = ReadWriteTransactionOption {
commit_options: options,
..Default::default()
};
let result = self.apply_with_option(ms, opts).await?;
return Ok(Some(result));
}

invoke_fn(
Some(ro),
|session| async {
Expand All @@ -423,7 +467,7 @@ impl Client {
mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
isolation_level: IsolationLevel::Unspecified as i32,
});
match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await {
match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader, None).await {
Ok(s) => Ok(Some(s.into())),
Err(e) => Err((Error::GRPC(e), session)),
}
Expand Down
201 changes: 195 additions & 6 deletions spanner/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use google_cloud_googleapis::spanner::v1::MultiplexedSessionPrecommitToken;
use parking_lot::Mutex;

use prost::Message;
use prost_types::{value::Kind, Value};

use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
use google_cloud_googleapis::spanner::v1::struct_type::Field;
use google_cloud_googleapis::spanner::v1::transaction_selector::Selector as TransactionSelectorEnum;
use google_cloud_googleapis::spanner::v1::TransactionSelector;
use google_cloud_googleapis::spanner::v1::{
ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
};
Expand All @@ -15,6 +20,33 @@ use crate::row::Row;
use crate::session::SessionHandle;
use crate::transaction::CallOptions;

/// Capture the inline-begin transaction id (from the first response's metadata)
/// and the latest multiplexed-session precommit token from a PartialResultSet
/// into the slots shared with the parent transaction.
fn capture_inline_metadata(
inline_tx_id: &Option<Arc<Mutex<Option<Vec<u8>>>>>,
inline_precommit_token: &Option<Arc<Mutex<Option<MultiplexedSessionPrecommitToken>>>>,
prs: &PartialResultSet,
) {
if let Some(slot) = inline_tx_id {
if let Some(meta) = prs.metadata.as_ref() {
if let Some(txn) = meta.transaction.as_ref() {
if !txn.id.is_empty() {
let mut guard = slot.lock();
if guard.is_none() {
*guard = Some(txn.id.clone());
}
}
}
}
}
if let Some(slot) = inline_precommit_token {
if let Some(token) = prs.precommit_token.as_ref() {
crate::transaction_rw::update_precommit_token(slot, token);
}
}
}

pub trait Reader: Send + Sync {
fn read(
&self,
Expand All @@ -25,6 +57,16 @@ pub trait Reader: Send + Sync {

fn update_token(&mut self, resume_token: Vec<u8>);

/// Replace the request's transaction selector with `Id(tx_id)`. Used
/// after inline-begin captures the server-assigned tx_id so that any
/// subsequent retry of the same streaming RPC does not re-issue a
/// `Begin(...)` selector and accidentally start a second server-side
/// transaction. Default impl is a no-op so external `Reader`
/// implementations don't need to be updated; only callers that
/// participate in inline-begin (the in-crate `StatementReader` /
/// `TableReader`) need to override it.
fn update_transaction_selector(&mut self, _tx_id: Vec<u8>) {}

fn can_resume(&self) -> bool;
}

Expand Down Expand Up @@ -52,6 +94,12 @@ impl Reader for StatementReader {
self.request.resume_token = resume_token;
}

fn update_transaction_selector(&mut self, tx_id: Vec<u8>) {
self.request.transaction = Some(TransactionSelector {
selector: Some(TransactionSelectorEnum::Id(tx_id)),
});
}

fn can_resume(&self) -> bool {
self.enable_resume && !self.request.resume_token.is_empty()
}
Expand Down Expand Up @@ -80,6 +128,12 @@ impl Reader for TableReader {
self.request.resume_token = resume_token;
}

fn update_transaction_selector(&mut self, tx_id: Vec<u8>) {
self.request.transaction = Some(TransactionSelector {
selector: Some(TransactionSelectorEnum::Id(tx_id)),
});
}

fn can_resume(&self) -> bool {
!self.request.resume_token.is_empty()
}
Expand Down Expand Up @@ -287,6 +341,13 @@ where
resumable: bool,
end_of_stream: bool,
stream_retry: StreamingRetry,
/// Populated by the first PartialResultSet when using inline begin.
/// The RowIterator writes the transaction ID here; the owning
/// ReadWriteTransaction reads it to upgrade its selector to Id(tx_id).
inline_tx_id: Option<Arc<Mutex<Option<Vec<u8>>>>>,
/// Updated with the latest precommit token from each PartialResultSet.
/// Required in the Commit request for multiplexed session transactions.
inline_precommit_token: Option<Arc<Mutex<Option<MultiplexedSessionPrecommitToken>>>>,
}

impl<'a, T> RowIterator<'a, T>
Expand All @@ -298,8 +359,10 @@ where
reader: T,
option: Option<CallOptions>,
disable_route_to_leader: bool,
inline_tx_id: Option<Arc<Mutex<Option<Vec<u8>>>>>,
inline_precommit_token: Option<Arc<Mutex<Option<MultiplexedSessionPrecommitToken>>>>,
) -> Result<RowIterator<'a, T>, Status> {
let streaming = reader
let mut streaming = reader
.read(session, option, disable_route_to_leader)
.await?
.into_inner();
Expand All @@ -309,6 +372,36 @@ where
rows: VecDeque::new(),
chunked_value: false,
};
let mut prs_buffer = ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS);
let mut end_of_stream = false;
let mut resumable = true;
// For inline-begin (multiplexed sessions) the parent transaction
// depends on the tx_id carried in the first PartialResultSet's
// metadata. Pre-fetch that frame here so the id is captured before
// the caller has any chance to drop the iterator unconsumed. Once
// the slot has been populated by the first op of this transaction,
// subsequent ops should behave like non-multiplexed iterators
// (no constructor pre-fetch).
let needs_inline_prefetch = inline_tx_id
.as_ref()
.is_some_and(|slot| slot.lock().is_none());
if needs_inline_prefetch {
match streaming.message().await? {
Some(prs) => {
if prs.last {
end_of_stream = true;
}
capture_inline_metadata(&inline_tx_id, &inline_precommit_token, &prs);
prs_buffer.push(prs);
if prs_buffer.unretryable {
resumable = false;
}
}
None => {
end_of_stream = true;
}
}
}
Ok(Self {
streaming,
session,
Expand All @@ -317,10 +410,12 @@ where
reader_option: None,
disable_route_to_leader,
stats: None,
prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
resumable: true,
end_of_stream: false,
prs_buffer,
resumable,
end_of_stream,
stream_retry: StreamingRetry::new(),
inline_tx_id,
inline_precommit_token,
})
}

Expand Down Expand Up @@ -367,6 +462,16 @@ where
}
tracing::debug!("streaming error: {}. resume reading by resume_token", e);
self.stream_retry.next(e).await?;
// If inline begin already captured the server-assigned
// tx_id, upgrade the cached request's selector to
// `Id(tx_id)` before retrying so the server does not
// start a second transaction in response to a `Begin`
// + resume_token combination.
if let Some(slot) = self.inline_tx_id.as_ref() {
if let Some(tx_id) = slot.lock().clone() {
self.reader.update_transaction_selector(tx_id);
}
}
let call_option = option.clone();
let result = self
.reader
Expand All @@ -383,6 +488,7 @@ where
if result_set.last {
self.end_of_stream = true;
}
capture_inline_metadata(&self.inline_tx_id, &self.inline_precommit_token, &result_set);
self.prs_buffer.push(result_set);
if self.prs_buffer.unretryable {
self.resumable = false;
Expand Down Expand Up @@ -441,9 +547,13 @@ mod tests {
use prost_types::Value;

use google_cloud_googleapis::spanner::v1::struct_type::Field;
use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
use google_cloud_googleapis::spanner::v1::transaction_selector::Selector as TxSelector;
use google_cloud_googleapis::spanner::v1::{
transaction_options, ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, StructType,
TransactionOptions, TransactionSelector,
};

use crate::reader::ResultSet;
use crate::reader::{Reader, ResultSet, StatementReader, TableReader};
use crate::row::{Row, TryFromValue};
use crate::statement::ToKind;

Expand Down Expand Up @@ -947,4 +1057,83 @@ mod tests {
};
assert!(!rs_partial.is_row_boundary());
}

fn begin_selector() -> Option<TransactionSelector> {
Some(TransactionSelector {
selector: Some(TxSelector::Begin(TransactionOptions {
exclude_txn_from_change_streams: false,
mode: Some(transaction_options::Mode::ReadWrite(
transaction_options::ReadWrite::default(),
)),
isolation_level: 0,
})),
})
}

#[test]
fn statement_reader_update_transaction_selector_replaces_begin_with_id() {
let mut reader = StatementReader {
enable_resume: true,
request: ExecuteSqlRequest {
transaction: begin_selector(),
resume_token: b"some-token".to_vec(),
..Default::default()
},
};
reader.update_transaction_selector(b"tx-1".to_vec());
match reader.request.transaction.as_ref().unwrap().selector.as_ref().unwrap() {
TxSelector::Id(id) => assert_eq!(id, b"tx-1"),
other => panic!("expected Id selector, got {other:?}"),
}
// Resume token must not be touched — that's update_token's job.
assert_eq!(reader.request.resume_token, b"some-token");
}

#[test]
fn table_reader_update_transaction_selector_replaces_begin_with_id() {
let mut reader = TableReader {
request: ReadRequest {
transaction: begin_selector(),
resume_token: b"some-token".to_vec(),
..Default::default()
},
};
reader.update_transaction_selector(b"tx-1".to_vec());
match reader.request.transaction.as_ref().unwrap().selector.as_ref().unwrap() {
TxSelector::Id(id) => assert_eq!(id, b"tx-1"),
other => panic!("expected Id selector, got {other:?}"),
}
assert_eq!(reader.request.resume_token, b"some-token");
}

/// Compile-time canary: an external `Reader` impl must be able to omit
/// `update_transaction_selector` and rely on the trait's default body.
/// If someone removes the default body to make the method required
/// again, this stops compiling — which is a SemVer-major break for
/// downstream crates with their own `Reader` impls.
#[test]
fn reader_trait_has_default_update_transaction_selector() {
struct ExternalStyleReader;
impl Reader for ExternalStyleReader {
async fn read(
&self,
_session: &mut crate::session::SessionHandle,
_option: Option<crate::transaction::CallOptions>,
_disable_route_to_leader: bool,
) -> Result<
google_cloud_gax::grpc::Response<google_cloud_gax::grpc::Streaming<PartialResultSet>>,
google_cloud_gax::grpc::Status,
> {
unreachable!("not invoked")
}
fn update_token(&mut self, _resume_token: Vec<u8>) {}
fn can_resume(&self) -> bool {
false
}
// Intentionally NO `update_transaction_selector` — relies on default.
}
// Default impl is a no-op; calling it must not panic.
let mut r = ExternalStyleReader;
r.update_transaction_selector(b"tx".to_vec());
}
}
Loading