@@ -16,8 +16,7 @@ use crate::protocol::{
1616 RegisterDriverRequest , RegisterDriverResponse , RunSchedule , ScheduleResponse , SubmitSummaryRequest ,
1717} ;
1818use crate :: summary:: {
19- log_driver_summary, write_json, DriverSummary , DriverSummaryMeta , SharedMetrics , TransactionKind ,
20- TransactionRecord ,
19+ log_driver_summary, write_json, DriverSummary , DriverSummaryMeta , SharedMetrics , TransactionKind , TransactionRecord ,
2120} ;
2221use crate :: topology:: DatabaseTopology ;
2322use crate :: tpcc:: * ;
@@ -27,6 +26,7 @@ struct TerminalRuntime {
2726 metrics : SharedMetrics ,
2827 abort : Arc < AtomicBool > ,
2928 start_logged : Arc < AtomicBool > ,
29+ startup_progress : Arc < StartupProgress > ,
3030 request_ids : Arc < AtomicU64 > ,
3131 schedule : RunSchedule ,
3232 run_constants : RunConstants ,
@@ -35,6 +35,24 @@ struct TerminalRuntime {
3535 seed : u64 ,
3636}
3737
/// Shared counters describing how many terminal tasks have reached each
/// connection milestone during startup.
///
/// Each terminal task bumps the `*_started` counter immediately before the
/// corresponding connect call and the `*_ready` counter right after that call
/// returns successfully, so `started - ready` is the number of connection
/// attempts that have not (yet) succeeded. The counters are only ever
/// incremented and are read with relaxed ordering for progress logging, so
/// values observed together are approximate snapshots rather than a
/// consistent view.
#[derive(Debug, Default)]
struct StartupProgress {
    /// Terminal tasks that have begun connecting to their primary database.
    primary_connect_started: AtomicU64,
    /// Terminal tasks whose primary database connection has been established.
    primary_connect_ready: AtomicU64,
    /// Terminal tasks that have begun connecting to the metrics module.
    metrics_connect_started: AtomicU64,
    /// Terminal tasks whose metrics module connection has been established.
    metrics_connect_ready: AtomicU64,
}

impl StartupProgress {
    /// Creates a tracker with every counter at zero.
    ///
    /// Equivalent to [`StartupProgress::default`]; kept as the named
    /// constructor so existing call sites read unchanged.
    fn new() -> Self {
        // `AtomicU64::default()` is zero, so the derived `Default` yields the
        // same state the counters were previously initialised to by hand.
        Self::default()
    }
}
55+
3856struct TransactionContext < ' a > {
3957 client : & ' a ModuleClient ,
4058 config : & ' a DriverConfig ,
@@ -65,8 +83,10 @@ pub async fn run(config: DriverConfig) -> Result<()> {
6583
6684 let abort = Arc :: new ( AtomicBool :: new ( false ) ) ;
6785 let start_logged = Arc :: new ( AtomicBool :: new ( false ) ) ;
86+ let startup_progress = Arc :: new ( StartupProgress :: new ( ) ) ;
6887 let request_ids = Arc :: new ( AtomicU64 :: new ( 1 ) ) ;
6988 let mut tasks = JoinSet :: new ( ) ;
89+ let terminal_count = u64:: from ( config. terminals ( ) ) ;
7090
7191 log:: info!(
7292 "driver {} ready for run {}: warehouses {}..={} terminals={} warmup_start_ms={} measure_start_ms={} measure_end_ms={}" ,
@@ -79,6 +99,55 @@ pub async fn run(config: DriverConfig) -> Result<()> {
7999 schedule. measure_start_ms,
80100 schedule. measure_end_ms
81101 ) ;
102+ log:: info!(
103+ "driver {} launching {} terminal task(s); this run will attempt {} primary database connections and {} metrics connections" ,
104+ config. driver_id,
105+ terminal_count,
106+ terminal_count,
107+ terminal_count
108+ ) ;
109+
110+ {
111+ let reporter_driver_id = config. driver_id . clone ( ) ;
112+ let reporter_run_id = run_id. clone ( ) ;
113+ let reporter_start_logged = start_logged. clone ( ) ;
114+ let reporter_abort = abort. clone ( ) ;
115+ let reporter_progress = startup_progress. clone ( ) ;
116+ tokio:: spawn ( async move {
117+ loop {
118+ if reporter_abort. load ( Ordering :: Relaxed ) || reporter_start_logged. load ( Ordering :: Relaxed ) {
119+ break ;
120+ }
121+
122+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
123+ if reporter_abort. load ( Ordering :: Relaxed ) || reporter_start_logged. load ( Ordering :: Relaxed ) {
124+ break ;
125+ }
126+
127+ let primary_started = reporter_progress. primary_connect_started . load ( Ordering :: Relaxed ) ;
128+ let primary_ready = reporter_progress. primary_connect_ready . load ( Ordering :: Relaxed ) ;
129+ let metrics_started = reporter_progress. metrics_connect_started . load ( Ordering :: Relaxed ) ;
130+ let metrics_ready = reporter_progress. metrics_connect_ready . load ( Ordering :: Relaxed ) ;
131+ log:: info!(
132+ "driver {} startup progress for run {}: primary_connect_started={}/{} primary_connected={}/{} metrics_connect_started={}/{} metrics_connected={}/{}" ,
133+ reporter_driver_id,
134+ reporter_run_id,
135+ primary_started,
136+ terminal_count,
137+ primary_ready,
138+ terminal_count,
139+ metrics_started,
140+ terminal_count,
141+ metrics_ready,
142+ terminal_count
143+ ) ;
144+
145+ if metrics_ready >= terminal_count {
146+ break ;
147+ }
148+ }
149+ } ) ;
150+ }
82151
83152 for warehouse_id in config. warehouse_start ..=config. warehouse_end ( ) {
84153 let database_identity = topology. identity_for_warehouse ( warehouse_id) ?;
@@ -101,6 +170,7 @@ pub async fn run(config: DriverConfig) -> Result<()> {
101170 metrics : terminal_metrics,
102171 abort : terminal_abort,
103172 start_logged : terminal_start_logged,
173+ startup_progress : startup_progress. clone ( ) ,
104174 request_ids : terminal_request_ids,
105175 schedule : terminal_schedule,
106176 run_constants : terminal_constants,
@@ -119,13 +189,26 @@ pub async fn run(config: DriverConfig) -> Result<()> {
119189 Ok ( Err ( err) ) => {
120190 abort. store ( true , Ordering :: Relaxed ) ;
121191 if first_error. is_none ( ) {
192+ log:: error!(
193+ "driver {} aborting run {} after terminal task error: {err:#}" ,
194+ config. driver_id,
195+ run_id
196+ ) ;
122197 first_error = Some ( err) ;
198+ tasks. abort_all ( ) ;
123199 }
124200 }
125201 Err ( err) => {
126202 abort. store ( true , Ordering :: Relaxed ) ;
127203 if first_error. is_none ( ) {
128- first_error = Some ( anyhow ! ( "terminal task failed: {}" , err) ) ;
204+ let err = anyhow ! ( "terminal task failed: {}" , err) ;
205+ log:: error!(
206+ "driver {} aborting run {} after terminal task failure: {err:#}" ,
207+ config. driver_id,
208+ run_id
209+ ) ;
210+ first_error = Some ( err) ;
211+ tasks. abort_all ( ) ;
129212 }
130213 }
131214 }
@@ -165,15 +248,20 @@ async fn run_terminal(runtime: TerminalRuntime) -> Result<()> {
165248 metrics,
166249 abort,
167250 start_logged,
251+ startup_progress,
168252 request_ids,
169253 schedule,
170254 run_constants,
171255 assignment,
172256 database_identity,
173257 seed,
174258 } = runtime;
259+ startup_progress. primary_connect_started . fetch_add ( 1 , Ordering :: Relaxed ) ;
175260 let client = ModuleClient :: connect_async ( & config. connection , database_identity) . await ?;
261+ startup_progress. primary_connect_ready . fetch_add ( 1 , Ordering :: Relaxed ) ;
262+ startup_progress. metrics_connect_started . fetch_add ( 1 , Ordering :: Relaxed ) ;
176263 let metrics_client = connect_metrics_module_async ( & config. connection ) . await ?;
264+ startup_progress. metrics_connect_ready . fetch_add ( 1 , Ordering :: Relaxed ) ;
177265 log:: info!(
178266 "driver {} terminal {} connected to {} for warehouse {} district {}" ,
179267 config. driver_id,
@@ -582,8 +670,7 @@ async fn resolve_driver_setup(config: DriverConfig) -> Result<(DriverConfig, Run
582670 let response = match response {
583671 Some ( response) => response,
584672 None => {
585- return Err ( last_error
586- . unwrap_or_else ( || anyhow ! ( "driver registration failed without an error" ) ) ) ;
673+ return Err ( last_error. unwrap_or_else ( || anyhow ! ( "driver registration failed without an error" ) ) ) ;
587674 }
588675 } ;
589676 if !response. accepted {
0 commit comments