@@ -6,7 +6,9 @@ class Synchronizer
66 include SplitIoClient ::Cache ::Fetchers
77 include SplitIoClient ::Cache ::Senders
88
9- FORCE_CACHE_CONTROL_HEADERS = true
9+ ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS = 10
10+ ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS = 60
11+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
1012
1113 def initialize (
1214 repositories ,
@@ -54,17 +56,116 @@ def stop_periodic_fetch
5456 @segment_fetcher . stop_segments_thread
5557 end
5658
57- def fetch_splits
58- segment_names = @split_fetcher . fetch_splits ( FORCE_CACHE_CONTROL_HEADERS )
59- @segment_fetcher . fetch_segments_if_not_exists ( segment_names , FORCE_CACHE_CONTROL_HEADERS ) unless segment_names . empty?
59+ def fetch_splits ( target_change_number )
60+ return if target_change_number <= @splits_repository . get_change_number . to_i
61+
62+ fetch_options = { cache_control_headers : true , till : nil }
63+
64+ result = attempt_splits_sync ( target_change_number ,
65+ fetch_options ,
66+ @config . on_demand_fetch_max_retries ,
67+ @config . on_demand_fetch_retry_delay_seconds ,
68+ false )
69+
70+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
71+ if result [ :success ]
72+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
73+ @config . logger . debug ( "Refresh completed in #{ attempts } attempts." ) if @config . debug_enabled
74+
75+ return
76+ end
77+
78+ fetch_options [ :till ] = target_change_number
79+ result = attempt_splits_sync ( target_change_number ,
80+ fetch_options ,
81+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
82+ nil ,
83+ true )
84+
85+ attempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - result [ :remaining_attempts ]
86+
87+ if result [ :success ]
88+ @segment_fetcher . fetch_segments_if_not_exists ( result [ :segment_names ] , true ) unless result [ :segment_names ] . empty?
89+ @config . logger . debug ( "Refresh completed bypassing the CDN in #{ attempts } attempts." ) if @config . debug_enabled
90+ else
91+ @config . logger . debug ( "No changes fetched after #{ attempts } attempts with CDN bypassed." ) if @config . debug_enabled
92+ end
93+ rescue StandardError => error
94+ @config . log_found_exception ( __method__ . to_s , error )
6095 end
6196
62- def fetch_segment ( name )
63- @segment_fetcher . fetch_segment ( name , FORCE_CACHE_CONTROL_HEADERS )
97+ def fetch_segment ( name , target_change_number )
98+ return if target_change_number <= @segments_repository . get_change_number ( name ) . to_i
99+
100+ fetch_options = { cache_control_headers : true , till : nil }
101+ result = attempt_segment_sync ( name ,
102+ target_change_number ,
103+ fetch_options ,
104+ @config . on_demand_fetch_max_retries ,
105+ @config . on_demand_fetch_retry_delay_seconds ,
106+ false )
107+
108+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
109+ if result [ :success ]
110+ @config . logger . debug ( "Segment #{ name } refresh completed in #{ attempts } attempts." ) if @config . debug_enabled
111+
112+ return
113+ end
114+
115+ fetch_options = { cache_control_headers : true , till : target_change_number }
116+ result = attempt_segment_sync ( name ,
117+ target_change_number ,
118+ fetch_options ,
119+ ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
120+ nil ,
121+ true )
122+
123+ attempts = @config . on_demand_fetch_max_retries - result [ :remaining_attempts ]
124+ if result [ :success ]
125+ @config . logger . debug ( "Segment #{ name } refresh completed bypassing the CDN in #{ attempts } attempts." ) if @config . debug_enabled
126+ else
127+ @config . logger . debug ( "No changes fetched for segment #{ name } after #{ attempts } attempts with CDN bypassed." ) if @config . debug_enabled
128+ end
129+ rescue StandardError => error
130+ @config . log_found_exception ( __method__ . to_s , error )
64131 end
65132
66133 private
67134
135+ def attempt_segment_sync ( name , target_cn , fetch_options , max_retries , retry_delay_seconds , with_backoff )
136+ remaining_attempts = max_retries
137+ backoff = Engine ::BackOff . new ( ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS , 0 , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS ) if with_backoff
138+
139+ loop do
140+ remaining_attempts -= 1
141+
142+ @segment_fetcher . fetch_segment ( name , fetch_options )
143+
144+ return sync_result ( true , remaining_attempts ) if target_cn <= @segments_repository . get_change_number ( name ) . to_i
145+ return sync_result ( false , remaining_attempts ) if remaining_attempts <= 0
146+
147+ delay = with_backoff ? backoff . interval : retry_delay_seconds
148+ sleep ( delay )
149+ end
150+ end
151+
152+ def attempt_splits_sync ( target_cn , fetch_options , max_retries , retry_delay_seconds , with_backoff )
153+ remaining_attempts = max_retries
154+ backoff = Engine ::BackOff . new ( ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS , 0 , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS ) if with_backoff
155+
156+ loop do
157+ remaining_attempts -= 1
158+
159+ segment_names = @split_fetcher . fetch_splits ( fetch_options )
160+
161+ return sync_result ( true , remaining_attempts , segment_names ) if target_cn <= @splits_repository . get_change_number
162+ return sync_result ( false , remaining_attempts , segment_names ) if remaining_attempts <= 0
163+
164+ delay = with_backoff ? backoff . interval : retry_delay_seconds
165+ sleep ( delay )
166+ end
167+ end
168+
68169 def fetch_segments
69170 @segment_fetcher . fetch_segments
70171 end
@@ -87,6 +188,10 @@ def impressions_count_sender
87188 def start_telemetry_sync_task
88189 Telemetry ::SyncTask . new ( @config , @telemetry_synchronizer ) . call
89190 end
191+
192+ def sync_result ( success , remaining_attempts , segment_names = nil )
193+ { success : success , remaining_attempts : remaining_attempts , segment_names : segment_names }
194+ end
90195 end
91196 end
92197end
0 commit comments