Skip to content

Commit dd2ec2d

Browse files
authored
Merge pull request #412 from splitio/development
[7.3.4] Development into master
2 parents 0be3a73 + 3ecef45 commit dd2ec2d

21 files changed

Lines changed: 573 additions & 848 deletions

.rubocop.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,18 @@ Metrics/MethodLength:
1010
Max: 20
1111
Exclude:
1212
- lib/splitclient-rb/telemetry/memory/memory_synchronizer.rb
13+
- lib/splitclient-rb/engine/sync_manager.rb
1314

1415
Metrics/ClassLength:
1516
Max: 150
1617
Exclude:
1718
- lib/splitclient-rb/telemetry/memory/memory_synchronizer.rb
1819

1920
Metrics/CyclomaticComplexity:
20-
Max: 8
21+
Max: 11
2122

2223
Metrics/ParameterLists:
24+
Max: 8
2325
Exclude:
2426
- lib/splitclient-rb/engine/sync_manager.rb
2527

@@ -32,6 +34,7 @@ Metrics/LineLength:
3234
- spec/engine/auth_api_client_spec.rb
3335
- spec/telemetry/synchronizer_spec.rb
3436
- spec/splitclient/split_config_spec.rb
37+
- spec/engine/push_manager_spec.rb
3538

3639
Style/BracesAroundHashParameters:
3740
Exclude:

CHANGES.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
CHANGES
22

3-
7.3.3 (Jan 28, 2021)
3+
7.3.4 (Feb 21, 2022)
4+
- Updated streaming events architecture with a new queue logic.
5+
- Fixed redis integration Pipelining command deprecation warning.
6+
7+
7.3.3 (Jan 28, 2022)
48
- Fixed edge cases where the sdk lost streaming connection.
59
- Updated default auth service url to https://auth.split.io/api/v2/auth
610
- Updated dependencies:

lib/splitclient-rb/cache/adapters/redis_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def inc(key, inc = 1)
160160
end
161161

162162
def pipelined
163-
@redis.pipelined do
163+
@redis.pipelined do |pipeline|
164164
yield
165165
end
166166
end

lib/splitclient-rb/constants.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ class SplitIoClient::Constants
1212
PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN'
1313
PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY'
1414
PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF'
15+
PUSH_FORCED_STOP = 'PUSH_FORCED_STOP'
1516
end

lib/splitclient-rb/engine/sync_manager.rb

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,23 @@ class SyncManager
66
SYNC_MODE_STREAMING = 0
77
SYNC_MODE_POLLING = 1
88

9-
def initialize(
10-
repositories,
11-
api_key,
12-
config,
13-
synchronizer,
14-
telemetry_runtime_producer,
15-
telemetry_synchronizer,
16-
status_manager
17-
)
18-
@synchronizer = synchronizer
19-
notification_manager_keeper = SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) do |manager|
20-
manager.on_action { |action| process_action(action) }
21-
end
22-
@sse_handler = SSE::SSEHandler.new(
23-
{ config: config, api_key: api_key },
24-
@synchronizer,
25-
repositories,
26-
notification_manager_keeper,
27-
telemetry_runtime_producer
28-
) do |handler|
29-
handler.on_action { |action| process_action(action) }
30-
end
31-
32-
@push_manager = PushManager.new(config, @sse_handler, api_key, telemetry_runtime_producer)
33-
@sse_connected = Concurrent::AtomicBoolean.new(false)
9+
def initialize(config,
10+
synchronizer,
11+
telemetry_runtime_producer,
12+
telemetry_synchronizer,
13+
status_manager,
14+
sse_handler,
15+
push_manager,
16+
status_queue)
3417
@config = config
18+
@synchronizer = synchronizer
3519
@telemetry_runtime_producer = telemetry_runtime_producer
3620
@telemetry_synchronizer = telemetry_synchronizer
3721
@status_manager = status_manager
22+
@sse_handler = sse_handler
23+
@push_manager = push_manager
24+
@status_queue = status_queue
25+
@sse_connected = Concurrent::AtomicBoolean.new(false)
3826
end
3927

4028
def start
@@ -55,6 +43,7 @@ def start_thread
5543

5644
if @config.streaming_enabled
5745
@config.logger.debug('Starting Straming mode ...')
46+
start_push_status_monitor
5847
connected = @push_manager.start_sse
5948
end
6049

@@ -66,27 +55,6 @@ def start_thread
6655
end
6756
end
6857

69-
def process_action(action)
70-
case action
71-
when Constants::PUSH_CONNECTED
72-
process_connected
73-
when Constants::PUSH_RETRYABLE_ERROR
74-
process_disconnect(true)
75-
when Constants::PUSH_NONRETRYABLE_ERROR
76-
process_disconnect(false)
77-
when Constants::PUSH_SUBSYSTEM_DOWN
78-
process_subsystem_down
79-
when Constants::PUSH_SUBSYSTEM_READY
80-
process_subsystem_ready
81-
when Constants::PUSH_SUBSYSTEM_OFF
82-
process_push_shutdown
83-
else
84-
@config.logger.debug('Incorrect action type.')
85-
end
86-
rescue StandardError => e
87-
@config.logger.error("process_action error: #{e.inspect}")
88-
end
89-
9058
def process_subsystem_ready
9159
@synchronizer.stop_periodic_fetch
9260
@synchronizer.sync_all
@@ -124,6 +92,19 @@ def process_connected
12492
@config.logger.error("process_connected error: #{e.inspect}")
12593
end
12694

95+
def process_forced_stop
96+
unless @sse_connected.value
97+
@config.logger.debug('Streaming already disconnected.')
98+
return
99+
end
100+
101+
@sse_connected.make_false
102+
@synchronizer.start_periodic_fetch
103+
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
104+
rescue StandardError => e
105+
@config.logger.error("process_connected error: #{e.inspect}")
106+
end
107+
127108
def process_disconnect(reconnect)
128109
unless @sse_connected.value
129110
@config.logger.debug('Streaming already disconnected.')
@@ -147,6 +128,40 @@ def process_disconnect(reconnect)
147128
def record_telemetry(type, data)
148129
@telemetry_runtime_producer.record_streaming_event(type, data)
149130
end
131+
132+
def start_push_status_monitor
133+
@config.threads[:push_status_handler] = Thread.new do
134+
@config.logger.debug('Starting push status handler ...') if @config.debug_enabled
135+
incoming_push_status_handler
136+
end
137+
end
138+
139+
def incoming_push_status_handler
140+
while (status = @status_queue.pop)
141+
@config.logger.debug("Push status handler dequeue #{status}") if @config.debug_enabled
142+
143+
case status
144+
when Constants::PUSH_CONNECTED
145+
process_connected
146+
when Constants::PUSH_RETRYABLE_ERROR
147+
process_disconnect(true)
148+
when Constants::PUSH_FORCED_STOP
149+
process_forced_stop
150+
when Constants::PUSH_NONRETRYABLE_ERROR
151+
process_disconnect(false)
152+
when Constants::PUSH_SUBSYSTEM_DOWN
153+
process_subsystem_down
154+
when Constants::PUSH_SUBSYSTEM_READY
155+
process_subsystem_ready
156+
when Constants::PUSH_SUBSYSTEM_OFF
157+
process_push_shutdown
158+
else
159+
@config.logger.debug('Incorrect push status type.')
160+
end
161+
end
162+
rescue StandardError => e
163+
@config.logger.error("Push status handler error: #{e.inspect}")
164+
end
150165
end
151166
end
152167
end

lib/splitclient-rb/split_factory.rb

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,10 @@ def initialize(api_key, config_hash = {})
3333
register_factory
3434

3535
build_telemetry_components
36+
build_repositories
37+
build_impressions_components
38+
build_telemetry_synchronizer
3639

37-
@splits_repository = SplitsRepository.new(@config)
38-
@segments_repository = SegmentsRepository.new(@config)
39-
@impressions_repository = ImpressionsRepository.new(@config)
40-
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
41-
@impression_counter = SplitIoClient::Engine::Common::ImpressionCounter.new
42-
@impressions_manager = SplitIoClient::Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
43-
@telemetry_api = SplitIoClient::Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
44-
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, @telemetry_api)
4540
@status_manager = Engine::StatusManager.new(@config)
4641

4742
start!
@@ -59,18 +54,12 @@ def start!
5954
return
6055
end
6156

62-
split_fetcher = SplitFetcher.new(@splits_repository, @api_key, config, @runtime_producer)
63-
segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, config, @runtime_producer)
64-
params = {
65-
split_fetcher: split_fetcher,
66-
segment_fetcher: segment_fetcher,
67-
imp_counter: @impression_counter,
68-
telemetry_runtime_producer: @runtime_producer,
69-
telemetry_synchronizer: @telemetry_synchronizer
70-
}
57+
build_fetchers
58+
build_synchronizer
59+
build_streaming_components
60+
build_sync_manager
7161

72-
synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, params)
73-
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager).start
62+
@sync_manager.start
7463
end
7564

7665
def stop!
@@ -166,6 +155,56 @@ def build_telemetry_components
166155
@runtime_producer = Telemetry::RuntimeProducer.new(@config)
167156

168157
@telemetry_consumers = { init: @init_consumer, evaluation: @evaluation_consumer, runtime: @runtime_consumer }
169-
end
158+
end
159+
160+
def build_fetchers
161+
@split_fetcher = SplitFetcher.new(@splits_repository, @api_key, @config, @runtime_producer)
162+
@segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, @config, @runtime_producer)
163+
end
164+
165+
def build_synchronizer
166+
params = {
167+
split_fetcher: @split_fetcher,
168+
segment_fetcher: @segment_fetcher,
169+
imp_counter: @impression_counter,
170+
telemetry_runtime_producer: @runtime_producer,
171+
telemetry_synchronizer: @telemetry_synchronizer
172+
}
173+
174+
@synchronizer = Engine::Synchronizer.new(repositories, @api_key, @config, params)
175+
end
176+
177+
def build_streaming_components
178+
@push_status_queue = Queue.new
179+
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository)
180+
segments_worker = SSE::Workers::SegmentsWorker.new(@synchronizer, @config, @segments_repository)
181+
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer, @push_status_queue)
182+
notification_processor = SSE::NotificationProcessor.new(@config, splits_worker, segments_worker)
183+
event_parser = SSE::EventSource::EventParser.new(config)
184+
sse_client = SSE::EventSource::Client.new(@config, @api_key, @runtime_producer, event_parser, notification_manager_keeper, notification_processor, @push_status_queue)
185+
@sse_handler = SSE::SSEHandler.new(@config, splits_worker, segments_worker, sse_client)
186+
@push_manager = Engine::PushManager.new(@config, @sse_handler, @api_key, @runtime_producer)
187+
end
188+
189+
def build_sync_manager
190+
@sync_manager = Engine::SyncManager.new(@config, @synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager, @sse_handler, @push_manager, @push_status_queue)
191+
end
192+
193+
def build_repositories
194+
@splits_repository = SplitsRepository.new(@config)
195+
@segments_repository = SegmentsRepository.new(@config)
196+
@impressions_repository = ImpressionsRepository.new(@config)
197+
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
198+
end
199+
200+
def build_telemetry_synchronizer
201+
telemetry_api = Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
202+
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, telemetry_api)
203+
end
204+
205+
def build_impressions_components
206+
@impression_counter = Engine::Common::ImpressionCounter.new
207+
@impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
208+
end
170209
end
171210
end

0 commit comments

Comments
 (0)