Skip to content

Commit 2bb3fc2

Browse files
authored
Merge pull request #382 from splitio/streaming-retries-bypass
Streaming fetch limited retries and Fastly bypass implementation
2 parents daaadf4 + 5d559dd commit 2bb3fc2

23 files changed

Lines changed: 376 additions & 77 deletions

.rubocop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Metrics/LineLength:
3131
- spec/engine/sync_manager_spec.rb
3232
- spec/engine/auth_api_client_spec.rb
3333
- spec/telemetry/synchronizer_spec.rb
34+
- spec/splitclient/split_config_spec.rb
3435

3536
Style/BracesAroundHashParameters:
3637
Exclude:
@@ -62,3 +63,4 @@ AllCops:
6263
- lib/splitclient-rb/engine/models/**/*
6364
- lib/splitclient-rb/engine/parser/**/*
6465
- spec/telemetry/synchronizer_spec.rb
66+
- lib/splitclient-rb/engine/synchronizer.rb

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CHANGES
22

3+
7.3.1 (Jul 26, 2021)
4+
- Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced.
5+
36
7.3.0 (Jul 12, 2021)
47
- Updated SDK telemetry storage, metrics and updater to be more effective and send less often.
58
- Fixed high cpu usage when api key is wrong.

lib/splitclient-rb.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@
8585
require 'splitclient-rb/engine/models/label'
8686
require 'splitclient-rb/engine/models/treatment'
8787
require 'splitclient-rb/engine/auth_api_client'
88+
require 'splitclient-rb/engine/back_off'
8889
require 'splitclient-rb/engine/push_manager'
8990
require 'splitclient-rb/engine/sync_manager'
9091
require 'splitclient-rb/engine/synchronizer'
9192
require 'splitclient-rb/utilitites'
9293

93-
# SSE
94-
require 'splitclient-rb/sse/event_source/back_off'
94+
# SSE
9595
require 'splitclient-rb/sse/event_source/client'
9696
require 'splitclient-rb/sse/event_source/event_parser'
9797
require 'splitclient-rb/sse/event_source/event_types'

lib/splitclient-rb/cache/fetchers/segment_fetcher.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,19 @@ def call
3030
def fetch_segments_if_not_exists(names, cache_control_headers = false)
3131
names.each do |name|
3232
change_number = @segments_repository.get_change_number(name)
33-
34-
fetch_segment(name, cache_control_headers) if change_number == -1
33+
34+
if change_number == -1
35+
fetch_options = { cache_control_headers: cache_control_headers, till: nil }
36+
fetch_segment(name, fetch_options) if change_number == -1
37+
end
3538
end
3639
rescue StandardError => error
3740
@config.log_found_exception(__method__.to_s, error)
3841
end
3942

40-
def fetch_segment(name, cache_control_headers = false)
43+
def fetch_segment(name, fetch_options = { cache_control_headers: false, till: nil })
4144
@semaphore.synchronize do
42-
segments_api.fetch_segments_by_names([name], cache_control_headers)
45+
segments_api.fetch_segments_by_names([name], fetch_options)
4346
end
4447
rescue StandardError => error
4548
@config.log_found_exception(__method__.to_s, error)

lib/splitclient-rb/cache/fetchers/split_fetcher.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ def call
2727
end
2828
end
2929

30-
def fetch_splits(cache_control_headers = false)
30+
def fetch_splits(fetch_options = { cache_control_headers: false, till: nil })
3131
@semaphore.synchronize do
32-
data = splits_since(@splits_repository.get_change_number, cache_control_headers)
32+
data = splits_since(@splits_repository.get_change_number, fetch_options)
3333

3434
data[:splits] && data[:splits].each do |split|
3535
add_split_unless_archived(split)
@@ -68,8 +68,8 @@ def splits_thread
6868
end
6969
end
7070

71-
def splits_since(since, cache_control_headers = false)
72-
splits_api.since(since, cache_control_headers)
71+
def splits_since(since, fetch_options = { cache_control_headers: false, till: nil })
72+
splits_api.since(since, fetch_options)
7373
end
7474

7575
def add_split_unless_archived(split)

lib/splitclient-rb/engine/api/segments.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ def initialize(api_key, segments_repository, config, telemetry_runtime_producer)
1111
@telemetry_runtime_producer = telemetry_runtime_producer
1212
end
1313

14-
def fetch_segments_by_names(names, cache_control_headers = false)
14+
def fetch_segments_by_names(names, fetch_options = { cache_control_headers: false, till: nil })
1515
return if names.nil? || names.empty?
1616

1717
names.each do |name|
1818
since = @segments_repository.get_change_number(name)
19+
1920
loop do
20-
segment = fetch_segment_changes(name, since, cache_control_headers)
21+
segment = fetch_segment_changes(name, since, fetch_options)
2122
@segments_repository.add_to_segment(segment)
2223

2324
@config.split_logger.log_if_debug("Segment #{name} fetched before: #{since}, \
@@ -32,9 +33,12 @@ def fetch_segments_by_names(names, cache_control_headers = false)
3233

3334
private
3435

35-
def fetch_segment_changes(name, since, cache_control_headers = false)
36+
def fetch_segment_changes(name, since, fetch_options = { cache_control_headers: false, till: nil })
3637
start = Time.now
37-
response = get_api("#{@config.base_uri}/segmentChanges/#{name}", @api_key, { since: since }, cache_control_headers)
38+
39+
params = { since: since }
40+
params[:till] = fetch_options[:till] unless fetch_options[:till].nil?
41+
response = get_api("#{@config.base_uri}/segmentChanges/#{name}", @api_key, params, fetch_options[:cache_control_headers])
3842

3943
if response.success?
4044
segment = JSON.parse(response.body, symbolize_names: true)

lib/splitclient-rb/engine/api/splits.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ def initialize(api_key, config, telemetry_runtime_producer)
1010
@telemetry_runtime_producer = telemetry_runtime_producer
1111
end
1212

13-
def since(since, cache_control_headers = false)
13+
def since(since, fetch_options = { cache_control_headers: false, till: nil })
1414
start = Time.now
15-
16-
response = get_api("#{@config.base_uri}/splitChanges", @api_key, { since: since }, cache_control_headers)
15+
16+
params = { since: since }
17+
params[:till] = fetch_options[:till] unless fetch_options[:till].nil?
18+
response = get_api("#{@config.base_uri}/splitChanges", @api_key, params, fetch_options[:cache_control_headers])
1719
if response.success?
1820
result = splits_with_segment_names(response.body)
1921

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: false
2+
3+
module SplitIoClient
4+
module Engine
5+
BACKOFF_MAX_ALLOWED = 1.8
6+
class BackOff
7+
def initialize(back_off_base, attempt = 0, max_allowed = BACKOFF_MAX_ALLOWED)
8+
@attempt = attempt
9+
@back_off_base = back_off_base
10+
@max_allowed = max_allowed
11+
end
12+
13+
def interval
14+
interval = 0
15+
interval = (@back_off_base * (2**@attempt)) if @attempt.positive?
16+
@attempt += 1
17+
18+
interval >= @max_allowed ? @max_allowed : interval
19+
end
20+
21+
def reset
22+
@attempt = 0
23+
end
24+
end
25+
end
26+
end

lib/splitclient-rb/engine/push_manager.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def initialize(config, sse_handler, api_key, telemetry_runtime_producer)
88
@sse_handler = sse_handler
99
@auth_api_client = AuthApiClient.new(@config, telemetry_runtime_producer)
1010
@api_key = api_key
11-
@back_off = SplitIoClient::SSE::EventSource::BackOff.new(@config.auth_retry_back_off_base, 1)
11+
@back_off = Engine::BackOff.new(@config.auth_retry_back_off_base, 1)
1212
@telemetry_runtime_producer = telemetry_runtime_producer
1313
end
1414

lib/splitclient-rb/engine/synchronizer.rb

Lines changed: 111 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ class Synchronizer
66
include SplitIoClient::Cache::Fetchers
77
include SplitIoClient::Cache::Senders
88

9-
FORCE_CACHE_CONTROL_HEADERS = true
9+
ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS = 10
10+
ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS = 60
11+
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
1012

1113
def initialize(
1214
repositories,
@@ -54,17 +56,116 @@ def stop_periodic_fetch
5456
@segment_fetcher.stop_segments_thread
5557
end
5658

57-
def fetch_splits
58-
segment_names = @split_fetcher.fetch_splits(FORCE_CACHE_CONTROL_HEADERS)
59-
@segment_fetcher.fetch_segments_if_not_exists(segment_names, FORCE_CACHE_CONTROL_HEADERS) unless segment_names.empty?
59+
def fetch_splits(target_change_number)
60+
return if target_change_number <= @splits_repository.get_change_number.to_i
61+
62+
fetch_options = { cache_control_headers: true, till: nil }
63+
64+
result = attempt_splits_sync(target_change_number,
65+
fetch_options,
66+
@config.on_demand_fetch_max_retries,
67+
@config.on_demand_fetch_retry_delay_seconds,
68+
false)
69+
70+
attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
71+
if result[:success]
72+
@segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty?
73+
@config.logger.debug("Refresh completed in #{attempts} attempts.") if @config.debug_enabled
74+
75+
return
76+
end
77+
78+
fetch_options[:till] = target_change_number
79+
result = attempt_splits_sync(target_change_number,
80+
fetch_options,
81+
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
82+
nil,
83+
true)
84+
85+
attempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - result[:remaining_attempts]
86+
87+
if result[:success]
88+
@segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty?
89+
@config.logger.debug("Refresh completed bypassing the CDN in #{attempts} attempts.") if @config.debug_enabled
90+
else
91+
@config.logger.debug("No changes fetched after #{attempts} attempts with CDN bypassed.") if @config.debug_enabled
92+
end
93+
rescue StandardError => error
94+
@config.log_found_exception(__method__.to_s, error)
6095
end
6196

62-
def fetch_segment(name)
63-
@segment_fetcher.fetch_segment(name, FORCE_CACHE_CONTROL_HEADERS)
97+
def fetch_segment(name, target_change_number)
98+
return if target_change_number <= @segments_repository.get_change_number(name).to_i
99+
100+
fetch_options = { cache_control_headers: true, till: nil }
101+
result = attempt_segment_sync(name,
102+
target_change_number,
103+
fetch_options,
104+
@config.on_demand_fetch_max_retries,
105+
@config.on_demand_fetch_retry_delay_seconds,
106+
false)
107+
108+
attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
109+
if result[:success]
110+
@config.logger.debug("Segment #{name} refresh completed in #{attempts} attempts.") if @config.debug_enabled
111+
112+
return
113+
end
114+
115+
fetch_options = { cache_control_headers: true, till: target_change_number }
116+
result = attempt_segment_sync(name,
117+
target_change_number,
118+
fetch_options,
119+
ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
120+
nil,
121+
true)
122+
123+
attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
124+
if result[:success]
125+
@config.logger.debug("Segment #{name} refresh completed bypassing the CDN in #{attempts} attempts.") if @config.debug_enabled
126+
else
127+
@config.logger.debug("No changes fetched for segment #{name} after #{attempts} attempts with CDN bypassed.") if @config.debug_enabled
128+
end
129+
rescue StandardError => error
130+
@config.log_found_exception(__method__.to_s, error)
64131
end
65132

66133
private
67134

135+
def attempt_segment_sync(name, target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
136+
remaining_attempts = max_retries
137+
backoff = Engine::BackOff.new(ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS, 0, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS) if with_backoff
138+
139+
loop do
140+
remaining_attempts -= 1
141+
142+
@segment_fetcher.fetch_segment(name, fetch_options)
143+
144+
return sync_result(true, remaining_attempts) if target_cn <= @segments_repository.get_change_number(name).to_i
145+
return sync_result(false, remaining_attempts) if remaining_attempts <= 0
146+
147+
delay = with_backoff ? backoff.interval : retry_delay_seconds
148+
sleep(delay)
149+
end
150+
end
151+
152+
def attempt_splits_sync(target_cn, fetch_options, max_retries, retry_delay_seconds, with_backoff)
153+
remaining_attempts = max_retries
154+
backoff = Engine::BackOff.new(ON_DEMAND_FETCH_BACKOFF_BASE_SECONDS, 0, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_SECONDS) if with_backoff
155+
156+
loop do
157+
remaining_attempts -= 1
158+
159+
segment_names = @split_fetcher.fetch_splits(fetch_options)
160+
161+
return sync_result(true, remaining_attempts, segment_names) if target_cn <= @splits_repository.get_change_number
162+
return sync_result(false, remaining_attempts, segment_names) if remaining_attempts <= 0
163+
164+
delay = with_backoff ? backoff.interval : retry_delay_seconds
165+
sleep(delay)
166+
end
167+
end
168+
68169
def fetch_segments
69170
@segment_fetcher.fetch_segments
70171
end
@@ -87,6 +188,10 @@ def impressions_count_sender
87188
def start_telemetry_sync_task
88189
Telemetry::SyncTask.new(@config, @telemetry_synchronizer).call
89190
end
191+
192+
def sync_result(success, remaining_attempts, segment_names = nil)
193+
{ success: success, remaining_attempts: remaining_attempts, segment_names: segment_names }
194+
end
90195
end
91196
end
92197
end

0 commit comments

Comments
 (0)