@@ -7,6 +7,7 @@ use std::collections::BTreeMap;
77use std:: fs;
88use std:: path:: Path ;
99use std:: sync:: Arc ;
10+ use std:: time:: Duration ;
1011
1112use crate :: config:: CoordinatorConfig ;
1213use crate :: metrics_module_bindings:: reset_reducer:: reset;
@@ -15,7 +16,9 @@ use crate::protocol::{
1516 DriverAssignment , RegisterDriverRequest , RegisterDriverResponse , RunSchedule , ScheduleResponse ,
1617 SubmitSummaryRequest ,
1718} ;
18- use crate :: summary:: { aggregate_summaries, now_millis, write_json, AggregateSummary , DriverSummary } ;
19+ use crate :: summary:: {
20+ aggregate_summaries, log_aggregate_summary, now_millis, write_json, AggregateSummary , DriverSummary ,
21+ } ;
1922
2023#[ derive( Clone ) ]
2124struct AppState {
@@ -66,8 +69,8 @@ async fn register_driver(
6669 Json ( request) : Json < RegisterDriverRequest > ,
6770) -> Json < RegisterDriverResponse > {
6871 let mut inner = state. inner . lock ( ) ;
69- let assignment = match inner. registrations . get ( & request. driver_id ) {
70- Some ( existing) => existing. assignment . clone ( ) ,
72+ let ( assignment, is_new_registration ) = match inner. registrations . get ( & request. driver_id ) {
73+ Some ( existing) => ( existing. assignment . clone ( ) , false ) ,
7174 None => {
7275 if inner. registration_order . len ( ) >= inner. config . expected_drivers {
7376 return Json ( RegisterDriverResponse {
@@ -84,10 +87,30 @@ async fn register_driver(
8487 assignment : assignment. clone ( ) ,
8588 } ,
8689 ) ;
87- assignment
90+ ( assignment, true )
8891 }
8992 } ;
9093 maybe_create_schedule ( & mut inner) ;
94+ let registered = inner. registrations . len ( ) ;
95+ let warehouse_end = assignment_end ( & assignment) ;
96+ if is_new_registration {
97+ log:: info!(
98+ "driver {} registered and ready ({}/{}): warehouses {}..={} ({} warehouse(s))" ,
99+ request. driver_id,
100+ registered,
101+ inner. config. expected_drivers,
102+ assignment. warehouse_start,
103+ warehouse_end,
104+ assignment. driver_warehouse_count
105+ ) ;
106+ } else {
107+ log:: info!(
108+ "driver {} re-registered and remains ready: warehouses {}..={}" ,
109+ request. driver_id,
110+ assignment. warehouse_start,
111+ warehouse_end
112+ ) ;
113+ }
91114 Json ( RegisterDriverResponse {
92115 accepted : true ,
93116 assignment : Some ( assignment) ,
@@ -108,16 +131,32 @@ async fn submit_summary(
108131) -> Result < Json < AggregateSummary > , axum:: http:: StatusCode > {
109132 let aggregate = {
110133 let mut inner = state. inner . lock ( ) ;
111- inner
134+ let replaced = inner
112135 . summaries
113- . insert ( request. summary . driver_id . clone ( ) , request. summary . clone ( ) ) ;
114- if inner. summaries . len ( ) == inner. config . expected_drivers {
136+ . insert ( request. summary . driver_id . clone ( ) , request. summary . clone ( ) )
137+ . is_some ( ) ;
138+ let received = inner. summaries . len ( ) ;
139+ log:: info!(
140+ "received summary from driver {} ({}/{}{})" ,
141+ request. summary. driver_id,
142+ received,
143+ inner. config. expected_drivers,
144+ if replaced { ", replaced existing summary" } else { "" }
145+ ) ;
146+ if received == inner. config . expected_drivers {
115147 let summaries: Vec < _ > = inner. summaries . values ( ) . cloned ( ) . collect ( ) ;
116148 let aggregate = aggregate_summaries ( inner. config . run_id . clone ( ) , & summaries) ;
149+ let summary_path = aggregate_summary_path ( & inner. config . output_dir , & aggregate) ;
117150 if let Err ( err) = write_aggregate ( & inner. config . output_dir , & aggregate) {
118151 log:: error!( "failed to write aggregate summary: {err:#}" ) ;
119152 return Err ( axum:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ;
120153 }
154+ log:: info!(
155+ "received all {} summary(s) for run {}" ,
156+ inner. config. expected_drivers,
157+ inner. config. run_id
158+ ) ;
159+ log_aggregate_summary ( & aggregate, & summary_path) ;
121160 aggregate
122161 } else {
123162 aggregate_summaries (
@@ -142,18 +181,23 @@ fn maybe_create_schedule(inner: &mut CoordinatorState) {
142181 . reducers
143182 . reset ( inner. config . warmup_secs * 1000 , measure_start_ms, measure_end_ms) ;
144183
145- inner . schedule = Some ( RunSchedule {
184+ let schedule = RunSchedule {
146185 run_id : inner. config . run_id . clone ( ) ,
147186 warmup_start_ms,
148187 measure_start_ms,
149188 measure_end_ms,
150189 stop_ms : measure_end_ms,
151- } ) ;
190+ } ;
191+ inner. schedule = Some ( schedule. clone ( ) ) ;
152192 log:: info!(
153- "all {} driver(s) registered; schedule ready for run {}" ,
193+ "all {} driver(s) registered; schedule ready for run {} (warmup_start_ms={} measure_start_ms={} measure_end_ms={}) " ,
154194 inner. config. expected_drivers,
155- inner. config. run_id
195+ inner. config. run_id,
196+ warmup_start_ms,
197+ measure_start_ms,
198+ measure_end_ms
156199 ) ;
200+ tokio:: spawn ( log_schedule_events ( schedule) ) ;
157201}
158202
159203fn assignment_for_index ( config : & CoordinatorConfig , index : usize ) -> DriverAssignment {
@@ -173,7 +217,35 @@ fn assignment_for_index(config: &CoordinatorConfig, index: usize) -> DriverAssig
173217}
174218
175219fn write_aggregate ( output_dir : & Path , aggregate : & AggregateSummary ) -> Result < ( ) > {
176- let run_dir = output_dir. join ( & aggregate. run_id ) ;
177- fs:: create_dir_all ( & run_dir) . with_context ( || format ! ( "failed to create {}" , run_dir. display( ) ) ) ?;
178- write_json ( & run_dir. join ( "summary.json" ) , aggregate)
220+ let summary_path = aggregate_summary_path ( output_dir, aggregate) ;
221+ if let Some ( run_dir) = summary_path. parent ( ) {
222+ fs:: create_dir_all ( run_dir) . with_context ( || format ! ( "failed to create {}" , run_dir. display( ) ) ) ?;
223+ }
224+ write_json ( & summary_path, aggregate)
225+ }
226+
227+ fn aggregate_summary_path ( output_dir : & Path , aggregate : & AggregateSummary ) -> std:: path:: PathBuf {
228+ output_dir. join ( & aggregate. run_id ) . join ( "summary.json" )
229+ }
230+
231+ fn assignment_end ( assignment : & DriverAssignment ) -> u32 {
232+ assignment
233+ . warehouse_start
234+ . saturating_add ( assignment. driver_warehouse_count . saturating_sub ( 1 ) )
235+ }
236+
237+ async fn log_schedule_events ( schedule : RunSchedule ) {
238+ sleep_until_ms ( schedule. warmup_start_ms ) . await ;
239+ log:: info!( "run {} warmup started" , schedule. run_id) ;
240+ sleep_until_ms ( schedule. measure_start_ms ) . await ;
241+ log:: info!( "run {} measurement started" , schedule. run_id) ;
242+ sleep_until_ms ( schedule. measure_end_ms ) . await ;
243+ log:: info!( "run {} measurement ended" , schedule. run_id) ;
244+ }
245+
246+ async fn sleep_until_ms ( target_ms : u64 ) {
247+ let now_ms = now_millis ( ) ;
248+ if target_ms > now_ms {
249+ tokio:: time:: sleep ( Duration :: from_millis ( target_ms - now_ms) ) . await ;
250+ }
179251}
0 commit comments