@@ -6,6 +6,10 @@ class Synchronizer
66 include SplitIoClient ::Cache ::Fetchers
77 include SplitIoClient ::Cache ::Senders
88
9+ ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS = 10
10+ ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS = 60
11+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
12+
913 def initialize (
1014 repositories ,
1115 api_key ,
@@ -52,10 +56,42 @@ def stop_periodic_fetch
5256 @segment_fetcher . stop_segments_thread
5357 end
5458
55- def fetch_splits
59+ def fetch_splits ( target_change_number )
60+ return if target_change_number <= @splits_repository . get_change_number . to_i
61+
5662 fetch_options = { cache_control_headers : true , till : nil }
57- segment_names = @split_fetcher . fetch_splits ( fetch_options )
58- @segment_fetcher . fetch_segments_if_not_exists ( segment_names , true ) unless segment_names . empty?
63+
64+ result = attempt_splits_sync ( target_change_number ,
65+ fetch_options ,
66+ @config . on_demand_fetch_max_retries ,
67+ @config . on_demand_fetch_retry_delay_seconds ,
68+ false )
69+
70+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
71+ if result [ :success ]
72+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
73+ @config . logger . debug ( "Refresh completed in #{ attempts } attempts." ) if @config . debug_enabled
74+
75+ return
76+ end
77+
78+ fetch_options [ :till ] = target_change_number
79+ result = attempt_splits_sync ( target_change_number ,
80+ fetch_options ,
81+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
82+ nil ,
83+ true )
84+
85+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
86+
87+ if result [ :success ]
88+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
89+ @config . logger . debug ( "Refresh completed bypassing the CDN in #{ attempts } attempts." )
90+ else
91+ @config . logger . debug ( "No changes fetched after #{ attempts } attempts with CDN bypassed." )
92+ end
93+ rescue StandardError => error
94+ @config . log_found_exception ( __method__ . to_s , error )
5995 end
6096
6197 def fetch_segment ( name )
@@ -65,6 +101,23 @@ def fetch_segment(name)
65101
66102 private
67103
104+ def attempt_splits_sync ( target_cn , fetch_options , max_retries , retry_delay_seconds , with_backoff )
105+ remaining_attempts = max_retries
106+ backoff = SSE ::EventSource ::BackOff . new ( ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS , 0 , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS ) if with_backoff
107+
108+ loop do
109+ remaining_attempts -= 1
110+
111+ segment_names = @split_fetcher . fetch_splits ( fetch_options )
112+
113+ return split_sync_result ( true , remaining_attempts , segment_names ) if target_cn <= @splits_repository . get_change_number
114+ return split_sync_result ( false , remaining_attempts , segment_names ) if remaining_attempts <= 0
115+
116+ delay = with_backoff ? backoff . interval : retry_delay_seconds
117+ sleep ( delay )
118+ end
119+ end
120+
68121 def fetch_segments
69122 @segment_fetcher . fetch_segments
70123 end
@@ -87,6 +140,10 @@ def impressions_count_sender
87140 def start_telemetry_sync_task
88141 Telemetry ::SyncTask . new ( @config , @telemetry_synchronizer ) . call
89142 end
143+
144+ def split_sync_result ( success , remaining_attempts , segment_names )
145+ { success : success , remaining_attempts : remaining_attempts , segment_names : segment_names }
146+ end
90147 end
91148 end
92149end
0 commit comments