Skip to content

Commit 3b0c955

Browse files
authored
Merge pull request #396 from splitio/readiness-refactor
Readiness refactor implementation
2 parents 7717632 + 5d5075f commit 3b0c955

42 files changed

Lines changed: 474 additions & 376 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
CHANGES
22

3+
7.3.2 (Dec 10, 2021)
4+
- Updated the readiness flow to be more consistent with the other sdks and improve the readiness time.
5+
- Updated the name of telemety key latencies in Redis.
6+
37
7.3.1 (Jul 26, 2021)
48
- Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced.
59

lib/splitclient-rb.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,9 @@
2828
require 'splitclient-rb/cache/senders/events_sender'
2929
require 'splitclient-rb/cache/senders/impressions_count_sender'
3030
require 'splitclient-rb/cache/senders/localhost_repo_cleaner'
31-
require 'splitclient-rb/cache/stores/store_utils'
3231
require 'splitclient-rb/cache/stores/localhost_split_builder'
33-
require 'splitclient-rb/cache/stores/sdk_blocker'
3432
require 'splitclient-rb/cache/stores/localhost_split_store'
33+
require 'splitclient-rb/cache/stores/store_utils'
3534

3635
require 'splitclient-rb/clients/split_client'
3736
require 'splitclient-rb/managers/split_manager'
@@ -87,6 +86,7 @@
8786
require 'splitclient-rb/engine/auth_api_client'
8887
require 'splitclient-rb/engine/back_off'
8988
require 'splitclient-rb/engine/push_manager'
89+
require 'splitclient-rb/engine/status_manager'
9090
require 'splitclient-rb/engine/sync_manager'
9191
require 'splitclient-rb/engine/synchronizer'
9292
require 'splitclient-rb/utilitites'

lib/splitclient-rb/cache/fetchers/segment_fetcher.rb

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ module Fetchers
44
class SegmentFetcher
55
attr_reader :segments_repository
66

7-
def initialize(segments_repository, api_key, config, sdk_blocker, telemetry_runtime_producer)
7+
def initialize(segments_repository, api_key, config, telemetry_runtime_producer)
88
@segments_repository = segments_repository
99
@api_key = api_key
1010
@config = config
11-
@sdk_blocker = sdk_blocker
1211
@semaphore = Mutex.new
1312
@telemetry_runtime_producer = telemetry_runtime_producer
1413
end
@@ -52,11 +51,11 @@ def fetch_segments
5251
@semaphore.synchronize do
5352
segments_api.fetch_segments_by_names(@segments_repository.used_segment_names)
5453

55-
@sdk_blocker.segments_ready!
56-
@sdk_blocker.sdk_internal_ready
54+
true
5755
end
5856
rescue StandardError => error
5957
@config.log_found_exception(__method__.to_s, error)
58+
false
6059
end
6160

6261
def stop_segments_thread
@@ -70,11 +69,6 @@ def segments_thread
7069
@config.logger.info('Starting segments fetcher service') if @config.debug_enabled
7170

7271
loop do
73-
unless @sdk_blocker.splits_repository.ready?
74-
sleep 0.2
75-
next
76-
end
77-
7872
fetch_segments
7973
@config.logger.debug("Segment names: #{@segments_repository.used_segment_names.to_a}") if @config.debug_enabled
8074

lib/splitclient-rb/cache/fetchers/split_fetcher.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ module Fetchers
44
class SplitFetcher
55
attr_reader :splits_repository
66

7-
def initialize(splits_repository, api_key, config, sdk_blocker, telemetry_runtime_producer)
7+
def initialize(splits_repository, api_key, config, telemetry_runtime_producer)
88
@splits_repository = splits_repository
99
@api_key = api_key
1010
@config = config
11-
@sdk_blocker = sdk_blocker
1211
@semaphore = Mutex.new
1312
@telemetry_runtime_producer = telemetry_runtime_producer
1413
end
@@ -40,13 +39,11 @@ def fetch_splits(fetch_options = { cache_control_headers: false, till: nil })
4039

4140
@config.logger.debug("segments seen(#{data[:segment_names].length}): #{data[:segment_names].to_a}") if @config.debug_enabled
4241

43-
@sdk_blocker.splits_ready!
44-
45-
data[:segment_names]
42+
{ segment_names: data[:segment_names], success: true }
4643
end
4744
rescue StandardError => error
4845
@config.log_found_exception(__method__.to_s, error)
49-
[]
46+
{ segment_names: [], success: false }
5047
end
5148

5249
def stop_splits_thread

lib/splitclient-rb/cache/stores/localhost_split_store.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ class LocalhostSplitStore
77
require 'yaml'
88
attr_reader :splits_repository
99

10-
def initialize(splits_repository, config, sdk_blocker = nil)
10+
def initialize(splits_repository, config, status_manager = nil)
1111
@splits_repository = splits_repository
1212
@config = config
13-
@sdk_blocker = sdk_blocker
13+
@status_manager = status_manager
1414
end
1515

1616
def call
@@ -45,10 +45,7 @@ def store_splits
4545
store_split(split)
4646
end
4747

48-
if @sdk_blocker
49-
@sdk_blocker.splits_ready!
50-
@sdk_blocker.segments_ready!
51-
end
48+
@status_manager.ready! if @status_manager
5249
rescue StandardError => error
5350
@config.logger.error('Error while parsing the split file. ' \
5451
'Check that the input file matches the expected format')

lib/splitclient-rb/cache/stores/sdk_blocker.rb

Lines changed: 0 additions & 64 deletions
This file was deleted.

lib/splitclient-rb/clients/split_client.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ class SplitClient
1414
# @param api_key [String] the API key for your split account
1515
#
1616
# @return [SplitIoClient] split.io client instance
17-
def initialize(api_key, repositories, sdk_blocker, config, impressions_manager, telemetry_evaluation_producer)
17+
def initialize(api_key, repositories, status_manager, config, impressions_manager, telemetry_evaluation_producer)
1818
@api_key = api_key
1919
@splits_repository = repositories[:splits]
2020
@segments_repository = repositories[:segments]
2121
@impressions_repository = repositories[:impressions]
2222
@events_repository = repositories[:events]
23-
@sdk_blocker = sdk_blocker
23+
@status_manager = status_manager
2424
@destroyed = false
2525
@config = config
2626
@impressions_manager = impressions_manager
@@ -137,7 +137,7 @@ def parsed_treatment(multiple, treatment_data)
137137
else
138138
{
139139
treatment: treatment_data[:treatment],
140-
config: treatment_data[:config]
140+
config: treatment_data[:config],
141141
}
142142
end
143143
end
@@ -157,7 +157,7 @@ def sanitize_split_names(calling_method, split_names)
157157
end
158158

159159
def block_until_ready(time = nil)
160-
@sdk_blocker.block(time) if @sdk_blocker && !@sdk_blocker.ready?
160+
@status_manager.wait_until_ready(time) if @status_manager
161161
end
162162

163163
private
@@ -310,7 +310,7 @@ def variable_size(value)
310310
end
311311

312312
def ready?
313-
return @sdk_blocker.ready? if @sdk_blocker
313+
return @status_manager.ready? if @status_manager
314314
true
315315
end
316316

lib/splitclient-rb/engine/common/impressions_manager.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ def build_impression(matching_key, bucketing_key, split_name, treatment, params
2121
@impression_counter.inc(split_name, impression_data[:m]) if optimized? && !redis?
2222

2323
impression(impression_data, params[:attributes])
24-
rescue StandardError => error
25-
@config.log_found_exception(__method__.to_s, error)
24+
rescue StandardError => e
25+
@config.log_found_exception(__method__.to_s, e)
2626
end
2727

2828
def track(impressions)
@@ -48,8 +48,8 @@ def track(impressions)
4848
end
4949

5050
record_stats(queued, dropped, dedupe)
51-
rescue StandardError => error
52-
@config.log_found_exception(__method__.to_s, error)
51+
rescue StandardError => e
52+
@config.log_found_exception(__method__.to_s, e)
5353
end
5454

5555
private
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
module SplitIoClient
4+
module Engine
5+
class StatusManager
6+
def initialize(config)
7+
@config = config
8+
@sdk_ready = Concurrent::CountDownLatch.new(1)
9+
end
10+
11+
def ready?
12+
return true if @config.consumer?
13+
14+
@sdk_ready.wait(0)
15+
end
16+
17+
def ready!
18+
return if ready?
19+
20+
@sdk_ready.count_down
21+
@config.logger.info('SplitIO SDK is ready')
22+
end
23+
24+
def wait_until_ready(seconds = nil)
25+
return if @config.consumer?
26+
27+
timeout = seconds || @config.block_until_ready
28+
29+
raise SDKBlockerTimeoutExpiredException, 'SDK start up timeout expired' unless @sdk_ready.wait(timeout)
30+
end
31+
end
32+
end
33+
end

lib/splitclient-rb/engine/sync_manager.rb

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ def initialize(
1212
config,
1313
synchronizer,
1414
telemetry_runtime_producer,
15-
sdk_blocker,
16-
telemetry_synchronizer
15+
telemetry_synchronizer,
16+
status_manager
1717
)
1818
@synchronizer = synchronizer
1919
notification_manager_keeper = SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) do |manager|
@@ -33,55 +33,41 @@ def initialize(
3333
@sse_connected = Concurrent::AtomicBoolean.new(false)
3434
@config = config
3535
@telemetry_runtime_producer = telemetry_runtime_producer
36-
@sdk_blocker = sdk_blocker
3736
@telemetry_synchronizer = telemetry_synchronizer
37+
@status_manager = status_manager
3838
end
3939

4040
def start
41-
if @config.streaming_enabled
42-
start_stream
43-
start_stream_forked if defined?(PhusionPassenger)
44-
elsif @config.standalone?
45-
start_poll
46-
end
47-
48-
synchronize_telemetry_config
41+
start_thread
42+
PhusionPassenger.on_event(:starting_worker_process) { |forked| start_thread if forked } if defined?(PhusionPassenger)
4943
end
5044

5145
private
5246

53-
# Starts tasks if stream is enabled.
54-
def start_stream
55-
@config.logger.debug('Starting push mode ...')
56-
@synchronizer.sync_all
57-
@synchronizer.start_periodic_data_recording
58-
59-
start_sse_connection_thread
60-
end
47+
def start_thread
48+
@config.threads[:start_sdk] = Thread.new do
49+
sleep(0.5) until @synchronizer.sync_all(false)
6150

62-
def start_poll
63-
@config.logger.debug('Starting polling mode ...')
64-
@synchronizer.start_periodic_fetch
65-
@synchronizer.start_periodic_data_recording
66-
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
67-
rescue StandardError => e
68-
@config.logger.error("start_poll error : #{e.inspect}")
69-
end
51+
@status_manager.ready!
52+
@telemetry_synchronizer.synchronize_config
53+
@synchronizer.start_periodic_data_recording
54+
connected = false
7055

71-
# Starts thread which connect to sse and after that fetch splits and segments once.
72-
def start_sse_connection_thread
73-
@config.threads[:sync_manager_start_sse] = Thread.new do
74-
begin
56+
if @config.streaming_enabled
57+
@config.logger.debug('Starting Straming mode ...')
7558
connected = @push_manager.start_sse
76-
@synchronizer.start_periodic_fetch unless connected
77-
rescue StandardError => e
78-
@config.logger.error("start_sse_connection_thread error : #{e.inspect}")
59+
60+
if defined?(PhusionPassenger)
61+
PhusionPassenger.on_event(:starting_worker_process) { |forked| sse_thread_forked if forked }
62+
end
7963
end
80-
end
81-
end
8264

83-
def start_stream_forked
84-
PhusionPassenger.on_event(:starting_worker_process) { |forked| start_stream if forked }
65+
unless connected
66+
@config.logger.debug('Starting polling mode ...')
67+
@synchronizer.start_periodic_fetch
68+
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
69+
end
70+
end
8571
end
8672

8773
def process_action(action)
@@ -165,16 +151,9 @@ def record_telemetry(type, data)
165151
@telemetry_runtime_producer.record_streaming_event(type, data)
166152
end
167153

168-
def synchronize_telemetry_config
169-
@config.threads[:telemetry_config_sender] = Thread.new do
170-
begin
171-
@sdk_blocker.wait_unitil_internal_ready unless @config.consumer?
172-
@telemetry_synchronizer.synchronize_config
173-
rescue SplitIoClient::SDKShutdownException
174-
@telemetry_synchronizer.synchronize_config
175-
@config.logger.info('Posting Telemetry config due to shutdown')
176-
end
177-
end
154+
def sse_thread_forked
155+
connected = @push_manager.start_sse
156+
@synchronizer.start_periodic_fetch unless connected
178157
end
179158
end
180159
end

0 commit comments

Comments
 (0)