Skip to content

Commit 1976070

Browse files
committed
lint
1 parent ffcb5ed commit 1976070

12 files changed

Lines changed: 57 additions & 76 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,9 @@ pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
291291
)
292292
.into_response();
293293
if !prepare_id.is_empty() {
294-
response.headers_mut().insert(
295-
"X-Prepare-Id",
296-
http::HeaderValue::from_str(&prepare_id).unwrap(),
297-
);
294+
response
295+
.headers_mut()
296+
.insert("X-Prepare-Id", http::HeaderValue::from_str(&prepare_id).unwrap());
298297
}
299298
Ok(response)
300299
}

crates/core/src/db/relational_db.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ type RowCountFn = Arc<dyn Fn(TableId, &str) -> i64 + Send + Sync>;
7676
/// The type of transactions committed by [RelationalDB].
7777
pub type Txdata = commitlog::payload::Txdata<ProductValue>;
7878

79-
8079
/// We've added a module version field to the system tables, but we don't yet
8180
/// have the infrastructure to support multiple versions.
8281
/// All modules are currently locked to this version, but this will be
@@ -460,9 +459,7 @@ impl RelationalDB {
460459
/// If non-empty, the caller must resume these transactions: retransmit PREPARED to
461460
/// the coordinator and await a COMMIT or ABORT decision before allowing normal operation.
462461
pub fn pending_2pc_prepares(&self) -> Result<Vec<String>, DBError> {
463-
self.with_auto_commit(Workload::Internal, |tx| {
464-
tx.scan_st_2pc_state().map_err(DBError::from)
465-
})
462+
self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from))
466463
}
467464

468465
/// Read the set of clients currently connected to the database.

crates/core/src/host/instance_env.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,10 @@ impl InstanceEnv {
10341034
let result = async {
10351035
let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?;
10361036
let status = response.status().as_u16();
1037-
let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?;
1037+
let body = response
1038+
.bytes()
1039+
.await
1040+
.map_err(|e| NodesError::HttpError(e.to_string()))?;
10381041
Ok((status, body))
10391042
}
10401043
.await;
@@ -1102,7 +1105,10 @@ impl InstanceEnv {
11021105
.get("X-Prepare-Id")
11031106
.and_then(|v| v.to_str().ok())
11041107
.map(|s| s.to_owned());
1105-
let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?;
1108+
let body = response
1109+
.bytes()
1110+
.await
1111+
.map_err(|e| NodesError::HttpError(e.to_string()))?;
11061112
Ok((status, body, prepare_id))
11071113
}
11081114
.await;
@@ -1121,9 +1127,7 @@ impl InstanceEnv {
11211127
}
11221128

11231129
/// Commit all prepared participants (called after coordinator's reducer succeeds).
1124-
pub fn commit_all_prepared(
1125-
&mut self,
1126-
) -> impl Future<Output = ()> + use<> {
1130+
pub fn commit_all_prepared(&mut self) -> impl Future<Output = ()> + use<> {
11271131
let participants = mem::take(&mut self.prepared_participants);
11281132
let client = self.replica_ctx.call_reducer_client.clone();
11291133
let router = self.replica_ctx.call_reducer_router.clone();
@@ -1167,9 +1171,7 @@ impl InstanceEnv {
11671171
}
11681172

11691173
/// Abort all prepared participants (called when coordinator's reducer fails).
1170-
pub fn abort_all_prepared(
1171-
&mut self,
1172-
) -> impl Future<Output = ()> + use<> {
1174+
pub fn abort_all_prepared(&mut self) -> impl Future<Output = ()> + use<> {
11731175
let participants = mem::take(&mut self.prepared_participants);
11741176
let client = self.replica_ctx.call_reducer_client.clone();
11751177
let router = self.replica_ctx.call_reducer_router.clone();

crates/core/src/host/module_host.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,8 +1791,7 @@ impl ModuleHost {
17911791
let prepare_id = format!("prepare-{}", PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed));
17921792

17931793
// Channel for signalling PREPARED result back to this task.
1794-
let (prepared_tx, prepared_rx) =
1795-
tokio::sync::oneshot::channel::<(ReducerCallResult, Option<Bytes>)>();
1794+
let (prepared_tx, prepared_rx) = tokio::sync::oneshot::channel::<(ReducerCallResult, Option<Bytes>)>();
17961795
// Channel for sending the COMMIT/ABORT decision to the executor thread.
17971796
let (decision_tx, decision_rx) = std::sync::mpsc::channel::<bool>();
17981797

@@ -1855,7 +1854,8 @@ impl ModuleHost {
18551854

18561855
/// Finalize a prepared transaction as COMMIT.
18571856
pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> {
1858-
let info = self.prepared_txs
1857+
let info = self
1858+
.prepared_txs
18591859
.remove(prepare_id)
18601860
.ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
18611861
// Unblock the executor thread to commit.
@@ -1865,7 +1865,8 @@ impl ModuleHost {
18651865

18661866
/// Abort a prepared transaction.
18671867
pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> {
1868-
let info = self.prepared_txs
1868+
let info = self
1869+
.prepared_txs
18691870
.remove(prepare_id)
18701871
.ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
18711872
// Unblock the executor thread to abort.

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,9 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
654654
if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
655655
Ok(del_tx.delete_st_2pc_state(&prepare_id)?)
656656
}) {
657-
log::error!("call_reducer_prepare_and_hold: abort: failed to delete st_2pc_state for {prepare_id}: {e}");
657+
log::error!(
658+
"call_reducer_prepare_and_hold: abort: failed to delete st_2pc_state for {prepare_id}: {e}"
659+
);
658660
}
659661
}
660662
}
@@ -953,11 +955,9 @@ impl InstanceCommon {
953955
std::thread::scope(|s| {
954956
s.spawn(|| {
955957
handle.block_on(async {
956-
if committed {
957-
if let Some(mut durable_offset) = stdb.durable_tx_offset() {
958-
let current: u64 = durable_offset.last_seen().unwrap_or(0);
959-
let _ = durable_offset.wait_for(current + 1).await;
960-
}
958+
if committed && let Some(mut durable_offset) = stdb.durable_tx_offset() {
959+
let current: u64 = durable_offset.last_seen().unwrap_or(0);
960+
let _ = durable_offset.wait_for(current + 1).await;
961961
}
962962

963963
let client = replica_ctx.call_reducer_client.clone();

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2076,12 +2076,10 @@ impl WasmInstanceEnv {
20762076
match result {
20772077
Ok((status, body, prepare_id)) => {
20782078
// If we got a prepare_id, register this participant.
2079-
if let Some(pid) = prepare_id {
2080-
if status < 300 {
2081-
env.instance_env
2082-
.prepared_participants
2083-
.push((database_identity, pid));
2084-
}
2079+
if let Some(pid) = prepare_id
2080+
&& status < 300
2081+
{
2082+
env.instance_env.prepared_participants.push((database_identity, pid));
20852083
}
20862084
let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?;
20872085
bytes_source.0.write_to(mem, out)?;

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,13 +1430,11 @@ impl CommittedState {
14301430
row: &ProductValue,
14311431
) -> Result<TxData> {
14321432
let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema);
1433-
table
1434-
.insert(pool, blob_store, row)
1435-
.map_err(|e| match e {
1436-
InsertError::Duplicate(e) => DatastoreError::from(TableError::Duplicate(e)),
1437-
InsertError::Bflatn(e) => DatastoreError::from(TableError::Bflatn(e)),
1438-
InsertError::IndexError(e) => DatastoreError::from(IndexError::UniqueConstraintViolation(e)),
1439-
})?;
1433+
table.insert(pool, blob_store, row).map_err(|e| match e {
1434+
InsertError::Duplicate(e) => DatastoreError::from(TableError::Duplicate(e)),
1435+
InsertError::Bflatn(e) => DatastoreError::from(TableError::Bflatn(e)),
1436+
InsertError::IndexError(e) => DatastoreError::from(IndexError::UniqueConstraintViolation(e)),
1437+
})?;
14401438

14411439
let row_arc: Arc<[ProductValue]> = Arc::from([row.clone()]);
14421440
let mut tx_data = TxData::default();

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ use crate::{
2020
use crate::{
2121
error::{IndexError, SequenceError, TableError},
2222
system_tables::{
23-
with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields,
24-
StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields,
25-
StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
26-
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow,
27-
StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID,
28-
ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID,
29-
ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, St2pcStateFields, St2pcStateRow, ST_2PC_STATE_ID,
23+
with_sys_table_buf, St2pcStateFields, St2pcStateRow, StClientFields, StClientRow, StColumnAccessorFields,
24+
StColumnAccessorRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow,
25+
StFields as _, StIndexAccessorFields, StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields,
26+
StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow,
27+
StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow, SystemTable, ST_2PC_STATE_ID,
28+
ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID,
29+
ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID,
3030
},
3131
};
3232
use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow};
@@ -2774,9 +2774,7 @@ impl MutTxId {
27742774
// Collect deletes: row pointers live in the committed state; read them
27752775
// without deleting.
27762776
for (table_id, delete_table) in &self.tx_state.delete_tables {
2777-
if let Ok((table, blob_store, _)) =
2778-
self.committed_state_write_lock.get_table_and_blob_store(*table_id)
2779-
{
2777+
if let Ok((table, blob_store, _)) = self.committed_state_write_lock.get_table_and_blob_store(*table_id) {
27802778
let rows: std::sync::Arc<[ProductValue]> = delete_table
27812779
.iter()
27822780
.map(|row_ptr| {

crates/smoketests/tests/smoketests/cross_db_2pc.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,12 @@ fn test_cross_db_2pc_happy_path() {
9191
// Publish bank B (the participant that will be debited).
9292
test.publish_module_named(&db_b_name, false)
9393
.expect("failed to publish bank B");
94-
let db_b_identity = test
95-
.database_identity
96-
.clone()
97-
.expect("bank B identity not set");
94+
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
9895

9996
// Publish bank A (the coordinator that will be credited).
10097
test.publish_module_named(&db_a_name, false)
10198
.expect("failed to publish bank A");
102-
let _db_a_identity = test
103-
.database_identity
104-
.clone()
105-
.expect("bank A identity not set");
99+
let _db_a_identity = test.database_identity.clone().expect("bank A identity not set");
106100

107101
// Transfer 50 from B's alice to A's alice.
108102
// The coordinator is bank A. It credits locally, then calls debit on B via 2PC.
@@ -155,10 +149,7 @@ fn test_cross_db_2pc_abort_insufficient_funds() {
155149
// Publish bank B.
156150
test.publish_module_named(&db_b_name, false)
157151
.expect("failed to publish bank B");
158-
let db_b_identity = test
159-
.database_identity
160-
.clone()
161-
.expect("bank B identity not set");
152+
let db_b_identity = test.database_identity.clone().expect("bank B identity not set");
162153

163154
// Publish bank A.
164155
test.publish_module_named(&db_a_name, false)
@@ -167,7 +158,10 @@ fn test_cross_db_2pc_abort_insufficient_funds() {
167158
// Try to transfer 200 -- B only has 100, so the remote debit will fail.
168159
let result = test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "200"]);
169160
// The call should fail because the remote debit panicked.
170-
assert!(result.is_err(), "Expected transfer_funds to fail due to insufficient funds");
161+
assert!(
162+
result.is_err(),
163+
"Expected transfer_funds to fail due to insufficient funds"
164+
);
171165

172166
// Verify bank A: alice should still have 100 (the local credit was rolled back).
173167
let result_a = test

tools/tpcc-runner/src/client.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,10 @@ impl ModuleClient {
8686
increment_pending(pending);
8787
let pending_for_callback = Arc::clone(pending);
8888
let errors = Arc::clone(errors);
89-
if let Err(err) = self
90-
.conn
91-
.reducers
92-
.load_remote_warehouses_then(rows, move |_, res| {
93-
handle_reducer_result("load_remote_warehouses", res, &errors);
94-
decrement_pending(&pending_for_callback);
95-
})
96-
{
89+
if let Err(err) = self.conn.reducers.load_remote_warehouses_then(rows, move |_, res| {
90+
handle_reducer_result("load_remote_warehouses", res, &errors);
91+
decrement_pending(&pending_for_callback);
92+
}) {
9793
decrement_pending(pending);
9894
return Err(anyhow!("load_remote_warehouses send error: {err}"));
9995
}

0 commit comments

Comments
 (0)