Skip to content

Commit fcee374

Browse files
committed
new_order: at most one remote request per database
1 parent f9c9824 commit fcee374

1 file changed

Lines changed: 119 additions & 55 deletions

File tree

modules/tpcc/src/new_order.rs

Lines changed: 119 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::collections::HashMap;
2+
13
use crate::{
24
district, find_customer_by_id, find_district, find_stock, find_warehouse, item, order_line, pack_order_key,
35
remote::{call_remote_reducer, remote_warehouse_home},
4-
stock, District, Item, OrderLine, Stock, DISTRICTS_PER_WAREHOUSE, TAX_SCALE,
6+
stock, District, Item, OrderLine, Stock, WarehouseId, DISTRICTS_PER_WAREHOUSE, TAX_SCALE,
57
};
68
use spacetimedb::{log_stopwatch::LogStopwatch, reducer, Identity, ReducerContext, SpacetimeType, Table, Timestamp};
79

@@ -71,51 +73,61 @@ pub fn new_order(
7173

7274
let all_local_warehouse = order_lines.iter().all(|order_line| order_line.supply_w_id == w_id);
7375

74-
let line_results = order_lines
76+
// TECHNICALLY NON-CONFORMANT: If we encounter a non-existent item in the order,
77+
// we'll short-circuit and exit here.
78+
// TPC-C technically requires, in 2.4.2.3, that we still retrieve and process all the valid item numbers.
79+
// This would be a horrendous pain to implement, so we won't.
80+
// We don't do the things the spec tells us it doesn't want us to do, namely:
81+
// - changing the execution of other steps
82+
// - using a different type of transaction
83+
// But we do skip inspecting some number of valid items and stocks.
84+
let items = order_lines
85+
.iter()
86+
.map(|line| find_item(ctx, line.item_id))
87+
.collect::<Result<Vec<Item>, String>>()?;
88+
89+
let (remote_order_lines, local_order_lines) = partition_lines_by_database(ctx, &order_lines);
90+
91+
let remote_order_outputs = remote_order_lines
7592
.into_iter()
76-
.enumerate()
77-
.map(|(idx, line)| {
78-
ensure!(line.quantity > 0, "order line quantity must be positive");
79-
80-
// TECHNICALLY NON-CONFORMANT: If we encounter a non-existent item in the order,
81-
// we'll short-circuit and exit here.
82-
// TPC-C technically requires, in 2.4.2.3, that we still retrieve and process all the valid item numbers.
83-
// This would be a horrendous pain to implement, so we won't.
84-
// We don't do the things the spec tells us it doesn't want us to do, namely:
85-
// - changing the execution of other steps
86-
// - using a different type of transaction
87-
// But we do skip inspecting some number of valid items and stocks.
88-
let item = find_item(ctx, line.item_id)?;
89-
90-
let is_remote_warehouse = w_id == line.supply_w_id;
91-
let supply_warehouse_id = line.supply_w_id;
92-
93-
let input = OrderItemInput {
94-
line: line.clone(),
93+
.map(|(remote_database_identity, lines)| {
94+
let input = OrderMultipleItemsInput {
95+
lines,
9596
district: d_id,
96-
is_remote_warehouse,
97+
terminal_warehouse: w_id,
9798
};
99+
call_remote_order_multiple_items_and_decrement_stock(ctx, remote_database_identity, input)
100+
})
101+
.collect::<Result<Vec<_>, String>>()?
102+
.into_iter()
103+
.flat_map(|v| v.into_iter())
104+
.collect::<Vec<OrderItemOutput>>();
98105

99-
let order_item_output = match remote_warehouse_home(ctx, supply_warehouse_id) {
100-
None => order_item_and_decrement_stock(ctx, input)?,
101-
Some(remote_database_identity) => {
102-
call_remote_order_item_and_decrement_stock(ctx, remote_database_identity, input)?
103-
}
104-
};
106+
let local_order_outputs = local_order_lines
107+
.into_iter()
108+
.map(|line| order_item_and_decrement_stock(ctx, line.line, line.index, d_id, w_id))
109+
.collect::<Result<Vec<_>, String>>()?;
105110

106-
Ok(ProcessedNewOrderItem {
107-
idx,
108-
line,
109-
item,
110-
district_stock_info: order_item_output.s_dist,
111-
stock_data: order_item_output.s_data,
112-
updated_quantity: order_item_output.updated_quantity,
113-
})
111+
let line_results = remote_order_outputs
112+
.into_iter()
113+
.chain(local_order_outputs)
114+
.map(|output| {
115+
(
116+
items[output.index as usize].clone(),
117+
order_lines[output.index as usize].clone(),
118+
output,
119+
)
114120
})
115-
.map(|processed_item| {
116-
processed_item.map(|processed_item| insert_order_line(ctx, w_id, d_id, order_id, processed_item))
121+
.map(|(item, line, order_item_output)| ProcessedNewOrderItem {
122+
index: order_item_output.index,
123+
line,
124+
item,
125+
district_stock_info: order_item_output.s_dist,
126+
stock_data: order_item_output.s_data,
127+
updated_quantity: order_item_output.updated_quantity,
117128
})
118-
.collect::<Result<Vec<_>, String>>()?;
129+
.map(|processed_item| insert_order_line(ctx, w_id, d_id, order_id, processed_item))
130+
.collect::<Vec<_>>();
119131

120132
let subtotal_cents = line_results.iter().map(|line_result| line_result.amount_cents).sum();
121133

@@ -139,16 +151,48 @@ pub fn new_order(
139151
})
140152
}
141153

142-
fn call_remote_order_item_and_decrement_stock(
154+
#[derive(SpacetimeType)]
155+
struct NewOrderLineAndIndex {
156+
line: NewOrderLineInput,
157+
index: u8,
158+
}
159+
160+
fn partition_lines_by_database(
161+
ctx: &ReducerContext,
162+
order_lines: &[NewOrderLineInput],
163+
) -> (HashMap<Identity, Vec<NewOrderLineAndIndex>>, Vec<NewOrderLineAndIndex>) {
164+
let mut remote_lines: HashMap<Identity, Vec<_>> = HashMap::new();
165+
let mut local_lines = Vec::with_capacity(order_lines.len());
166+
for (index, line) in order_lines.iter().cloned().enumerate() {
167+
let index = index as u8;
168+
if let Some(remote_database_identity) = remote_warehouse_home(ctx, line.supply_w_id) {
169+
remote_lines
170+
.entry(remote_database_identity)
171+
.or_default()
172+
.push(NewOrderLineAndIndex { line, index });
173+
} else {
174+
local_lines.push(NewOrderLineAndIndex { line, index });
175+
}
176+
}
177+
178+
(remote_lines, local_lines)
179+
}
180+
181+
fn call_remote_order_multiple_items_and_decrement_stock(
143182
ctx: &ReducerContext,
144183
remote_database_identity: Identity,
145-
input: OrderItemInput,
146-
) -> Result<OrderItemOutput, String> {
147-
call_remote_reducer(ctx, remote_database_identity, "order_item_and_decrement_stock", &input)
184+
input: OrderMultipleItemsInput,
185+
) -> Result<Vec<OrderItemOutput>, String> {
186+
call_remote_reducer(
187+
ctx,
188+
remote_database_identity,
189+
"order_multiple_items_and_decrement_stock",
190+
&input,
191+
)
148192
}
149193

150194
struct ProcessedNewOrderItem {
151-
idx: usize,
195+
index: u8,
152196
line: NewOrderLineInput,
153197
item: Item,
154198
district_stock_info: String,
@@ -164,7 +208,7 @@ fn insert_order_line(
164208
processed_item: ProcessedNewOrderItem,
165209
) -> NewOrderLineResult {
166210
let ProcessedNewOrderItem {
167-
idx,
211+
index,
168212
line,
169213
item,
170214
district_stock_info,
@@ -178,11 +222,11 @@ fn insert_order_line(
178222
"G"
179223
};
180224
tx.db.order_line().insert(OrderLine {
181-
order_line_key: pack_order_line_key(warehouse_id, district_id, order_id, (idx + 1) as u8),
225+
order_line_key: pack_order_line_key(warehouse_id, district_id, order_id, index + 1),
182226
ol_w_id: warehouse_id,
183227
ol_d_id: district_id,
184228
ol_o_id: order_id,
185-
ol_number: (idx + 1) as u8,
229+
ol_number: index + 1,
186230
ol_i_id: line.item_id,
187231
ol_supply_w_id: line.supply_w_id,
188232
ol_delivery_d: None,
@@ -208,23 +252,42 @@ pub struct OrderItemOutput {
208252
s_dist: String,
209253
s_data: String,
210254
updated_quantity: i32,
255+
index: u8,
211256
}
212257

213258
#[derive(SpacetimeType)]
214-
pub struct OrderItemInput {
215-
line: NewOrderLineInput,
259+
pub struct OrderMultipleItemsInput {
260+
lines: Vec<NewOrderLineAndIndex>,
216261
district: u8,
217-
is_remote_warehouse: bool,
262+
terminal_warehouse: WarehouseId,
218263
}
219264

220265
#[reducer]
221-
pub fn order_item_and_decrement_stock(ctx: &ReducerContext, input: OrderItemInput) -> Result<OrderItemOutput, String> {
222-
let _timer = LogStopwatch::new("order_item_and_decrement_stock");
223-
let OrderItemInput {
224-
line,
266+
fn order_multiple_items_and_decrement_stocks(
267+
ctx: &ReducerContext,
268+
input: OrderMultipleItemsInput,
269+
) -> Result<Vec<OrderItemOutput>, String> {
270+
let _timer = LogStopwatch::new("order_multiple_items_and_decrement_stock");
271+
let OrderMultipleItemsInput {
272+
lines,
225273
district,
226-
is_remote_warehouse,
274+
terminal_warehouse,
227275
} = input;
276+
lines
277+
.into_iter()
278+
.map(|line| order_item_and_decrement_stock(ctx, line.line, line.index, district, terminal_warehouse))
279+
.collect()
280+
}
281+
282+
fn order_item_and_decrement_stock(
283+
ctx: &ReducerContext,
284+
line: NewOrderLineInput,
285+
index: u8,
286+
district: u8,
287+
terminal_warehouse: WarehouseId,
288+
) -> Result<OrderItemOutput, String> {
289+
let is_remote_warehouse = terminal_warehouse == line.supply_w_id;
290+
228291
let stock = find_stock(ctx, line.supply_w_id, line.item_id)?;
229292

230293
let ordered_quantity = line.quantity;
@@ -234,6 +297,7 @@ pub fn order_item_and_decrement_stock(ctx: &ReducerContext, input: OrderItemInpu
234297
s_dist: district_stock_info(&stock, district),
235298
s_data: stock.s_data.clone(),
236299
updated_quantity,
300+
index,
237301
};
238302

239303
ctx.db.stock().stock_key().update(Stock {

0 commit comments

Comments
 (0)