@@ -76,6 +76,84 @@ type RowCountFn = Arc<dyn Fn(TableId, &str) -> i64 + Send + Sync>;
7676/// The type of transactions committed by [RelationalDB].
7777pub type Txdata = commitlog:: payload:: Txdata < ProductValue > ;
7878
79+ /// A buffered durability request, held behind the persistence barrier.
80+ pub struct BufferedDurabilityRequest {
81+ pub reducer_context : Option < ReducerContext > ,
82+ pub tx_data : Arc < TxData > ,
83+ }
84+
85+ /// The persistence barrier prevents durability requests from being sent to the
86+ /// durability worker while a 2PC PREPARE is pending.
87+ ///
88+ /// When active:
89+ /// - The PREPARE's own durability request has already been sent to the worker.
90+ /// - All subsequent durability requests are buffered here.
91+ /// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made:
92+ /// - COMMIT: buffered requests are flushed to the worker.
93+ /// - ABORT: buffered requests are discarded.
94+ #[ derive( Default ) ]
95+ pub struct PersistenceBarrier {
96+ inner : std:: sync:: Mutex < PersistenceBarrierInner > ,
97+ }
98+
99+ #[ derive( Default ) ]
100+ struct PersistenceBarrierInner {
101+ /// If Some, a PREPARE is pending at this offset. All durability requests
102+ /// are buffered until the barrier is lifted.
103+ active_prepare : Option < TxOffset > ,
104+ /// Buffered durability requests that arrived while the barrier was active.
105+ buffered : Vec < BufferedDurabilityRequest > ,
106+ }
107+
108+ impl PersistenceBarrier {
109+ pub fn new ( ) -> Self {
110+ Self :: default ( )
111+ }
112+
113+ /// Activate the barrier for a PREPARE at the given offset.
114+ pub fn activate ( & self , prepare_offset : TxOffset ) {
115+ let mut inner = self . inner . lock ( ) . unwrap ( ) ;
116+ assert ! (
117+ inner. active_prepare. is_none( ) ,
118+ "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}" ,
119+ inner. active_prepare,
120+ ) ;
121+ inner. active_prepare = Some ( prepare_offset) ;
122+ inner. buffered . clear ( ) ;
123+ }
124+
125+ /// If the barrier is active, buffer the durability request and return None.
126+ /// If the barrier is not active, return the arguments back unchanged.
127+ pub fn try_buffer (
128+ & self ,
129+ reducer_context : Option < ReducerContext > ,
130+ tx_data : & Arc < TxData > ,
131+ ) -> Option < Option < ReducerContext > > {
132+ let mut inner = self . inner . lock ( ) . unwrap ( ) ;
133+ if inner. active_prepare . is_some ( ) {
134+ inner. buffered . push ( BufferedDurabilityRequest {
135+ reducer_context,
136+ tx_data : tx_data. clone ( ) ,
137+ } ) ;
138+ None // buffered
139+ } else {
140+ Some ( reducer_context) // not buffered, return back
141+ }
142+ }
143+
144+ /// Deactivate the barrier and return the buffered requests.
145+ pub fn deactivate ( & self ) -> Vec < BufferedDurabilityRequest > {
146+ let mut inner = self . inner . lock ( ) . unwrap ( ) ;
147+ inner. active_prepare = None ;
148+ std:: mem:: take ( & mut inner. buffered )
149+ }
150+
151+ /// Check if the barrier is currently active.
152+ pub fn is_active ( & self ) -> bool {
153+ self . inner . lock ( ) . unwrap ( ) . active_prepare . is_some ( )
154+ }
155+ }
156+
79157/// We've added a module version field to the system tables, but we don't yet
80158/// have the infrastructure to support multiple versions.
81159/// All modules are currently locked to this version, but this will be
@@ -114,7 +192,7 @@ pub struct RelationalDB {
114192
115193 /// 2PC persistence barrier. When active, durability requests are buffered
116194 /// instead of being sent to the durability worker.
117- persistence_barrier : crate :: host :: prepared_tx :: PersistenceBarrier ,
195+ persistence_barrier : PersistenceBarrier ,
118196}
119197
120198/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
@@ -179,7 +257,7 @@ impl RelationalDB {
179257
180258 workload_type_to_exec_counters,
181259 metrics_recorder_queue,
182- persistence_barrier : crate :: host :: prepared_tx :: PersistenceBarrier :: new ( ) ,
260+ persistence_barrier : PersistenceBarrier :: new ( ) ,
183261 }
184262 }
185263
@@ -934,7 +1012,7 @@ impl RelationalDB {
9341012 }
9351013
9361014 /// Get a reference to the persistence barrier (for 2PC).
937- pub fn persistence_barrier ( & self ) -> & crate :: host :: prepared_tx :: PersistenceBarrier {
1015+ pub fn persistence_barrier ( & self ) -> & PersistenceBarrier {
9381016 & self . persistence_barrier
9391017 }
9401018
0 commit comments