@@ -6,6 +6,10 @@ class Synchronizer
66 include SplitIoClient ::Cache ::Fetchers
77 include SplitIoClient ::Cache ::Senders
88
9+ ON_DEMAND_FETCH_BACKOFF_BASE_MS = 10_000
10+ ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS = 60_000
11+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
12+
913 def initialize (
1014 repositories ,
1115 api_key ,
@@ -25,6 +29,7 @@ def initialize(
2529 @impressions_api = SplitIoClient ::Api ::Impressions . new ( @api_key , @config , params [ :telemetry_runtime_producer ] )
2630 @impression_counter = params [ :imp_counter ]
2731 @telemetry_synchronizer = params [ :telemetry_synchronizer ]
32+ @backoff = new SSE ::EventSource ::BackOff . new ( )
2833 end
2934
3035 def sync_all
@@ -52,10 +57,40 @@ def stop_periodic_fetch
5257 @segment_fetcher . stop_segments_thread
5358 end
5459
55- def fetch_splits
60+ def fetch_splits ( target_change_number )
61+ return if target_change_number <= @splits_repository . get_change_number
62+
5663 fetch_options = { cache_control_headers : true , till : nil }
57- segment_names = @split_fetcher . fetch_splits ( fetch_options )
58- @segment_fetcher . fetch_segments_if_not_exists ( segment_names , true ) unless segment_names . empty?
64+
65+ result = attempt_splits_sync ( target_change_number ,
66+ fetch_options ,
67+ @config . on_demand_fetch_max_retries ,
68+ @config . on_demand_fetch_retry_delay_ms )
69+
70+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
71+ if result [ :success ]
72+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
73+ @config . logger . debug ( "Refresh completed in #{ attempts } attempts." ) if @config . debug_enabled
74+
75+ return
76+ end
77+
78+ fetch_options [ :till ] = target_change_number
79+ result = attempt_splits_sync ( target_change_number ,
80+ fetch_options ,
81+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
82+ @config . on_demand_fetch_retry_delay_ms )
83+
84+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
85+
86+ if result [ :success ]
87+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
88+ @config . logger . debug ( "Refresh completed bypassing the CDN in #{ attempts } attempts." )
89+ else
90+ @config . logger . debug ( "No changes fetched after #{ attempts } attempts with CDN bypassed." )
91+ end
92+ rescue StandardError => error
93+ @config . log_found_exception ( __method__ . to_s , error )
5994 end
6095
6196 def fetch_segment ( name )
@@ -65,6 +100,21 @@ def fetch_segment(name)
65100
66101 private
67102
103+ def attempt_splits_sync ( target_cn , fetch_options , max_retries , retry_delay_ms )
104+ remaining_attempts = max_retries
105+
106+ loop do
107+ remaining_attempts -= 1
108+
109+ segment_names = @split_fetcher . fetch_splits ( fetch_options )
110+
111+ return split_sync_result ( true , remaining_attempts , segment_names ) if target_cn <= @splits_repository . get_change_number
112+ return split_sync_result ( false , remaining_attempts , segment_names ) if remaining_attempts <= 0
113+
114+ sleep ( retry_delay_ms )
115+ end
116+ end
117+
68118 def fetch_segments
69119 @segment_fetcher . fetch_segments
70120 end
@@ -87,6 +137,10 @@ def impressions_count_sender
87137 def start_telemetry_sync_task
88138 Telemetry ::SyncTask . new ( @config , @telemetry_synchronizer ) . call
89139 end
140+
141+ def split_sync_result ( success , remaining_attempts , segment_names )
142+ { success : success , remaining_attempts : remaining_attempts , segment_names : segment_names }
143+ end
90144 end
91145 end
92146end
0 commit comments