Skip to content

Commit 023fe62

Browse files
committed
Added StatusQueue and listener
1 parent f1afd50 commit 023fe62

10 files changed

Lines changed: 100 additions & 58 deletions

File tree

.rubocop.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ Metrics/MethodLength:
1010
Max: 20
1111
Exclude:
1212
- lib/splitclient-rb/telemetry/memory/memory_synchronizer.rb
13+
- lib/splitclient-rb/engine/sync_manager.rb
1314

1415
Metrics/ClassLength:
1516
Max: 150
1617
Exclude:
1718
- lib/splitclient-rb/telemetry/memory/memory_synchronizer.rb
1819

1920
Metrics/CyclomaticComplexity:
20-
Max: 8
21+
Max: 11
2122

2223
Metrics/ParameterLists:
2324
Max: 8

lib/splitclient-rb/constants.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ class SplitIoClient::Constants
1212
PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN'
1313
PUSH_SUBSYSTEM_READY = 'PUSH_SUBSYSTEM_READY'
1414
PUSH_SUBSYSTEM_OFF = 'PUSH_SUBSYSTEM_OFF'
15+
PUSH_FORCED_STOP = 'PUSH_FORCED_STOP'
1516
end

lib/splitclient-rb/engine/sync_manager.rb

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ def initialize(config,
1212
telemetry_synchronizer,
1313
status_manager,
1414
sse_handler,
15-
push_manager)
15+
push_manager,
16+
status_queue)
1617
@config = config
1718
@synchronizer = synchronizer
1819
@telemetry_runtime_producer = telemetry_runtime_producer
1920
@telemetry_synchronizer = telemetry_synchronizer
2021
@status_manager = status_manager
2122
@sse_handler = sse_handler
2223
@push_manager = push_manager
24+
@status_queue = status_queue
2325
@sse_connected = Concurrent::AtomicBoolean.new(false)
2426
end
2527

@@ -41,6 +43,7 @@ def start_thread
4143

4244
if @config.streaming_enabled
4345
@config.logger.debug('Starting Straming mode ...')
46+
start_push_status_monitor
4447
connected = @push_manager.start_sse
4548
end
4649

@@ -52,27 +55,6 @@ def start_thread
5255
end
5356
end
5457

55-
def process_action(action)
56-
case action
57-
when Constants::PUSH_CONNECTED
58-
process_connected
59-
when Constants::PUSH_RETRYABLE_ERROR
60-
process_disconnect(true)
61-
when Constants::PUSH_NONRETRYABLE_ERROR
62-
process_disconnect(false)
63-
when Constants::PUSH_SUBSYSTEM_DOWN
64-
process_subsystem_down
65-
when Constants::PUSH_SUBSYSTEM_READY
66-
process_subsystem_ready
67-
when Constants::PUSH_SUBSYSTEM_OFF
68-
process_push_shutdown
69-
else
70-
@config.logger.debug('Incorrect action type.')
71-
end
72-
rescue StandardError => e
73-
@config.logger.error("process_action error: #{e.inspect}")
74-
end
75-
7658
def process_subsystem_ready
7759
@synchronizer.stop_periodic_fetch
7860
@synchronizer.sync_all
@@ -110,6 +92,19 @@ def process_connected
11092
@config.logger.error("process_connected error: #{e.inspect}")
11193
end
11294

95+
def process_forced_stop
96+
unless @sse_connected.value
97+
@config.logger.debug('Streaming already disconnected.')
98+
return
99+
end
100+
101+
@sse_connected.make_false
102+
@synchronizer.start_periodic_fetch
103+
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
104+
rescue StandardError => e
105+
@config.logger.error("process_connected error: #{e.inspect}")
106+
end
107+
113108
def process_disconnect(reconnect)
114109
unless @sse_connected.value
115110
@config.logger.debug('Streaming already disconnected.')
@@ -133,6 +128,40 @@ def process_disconnect(reconnect)
133128
def record_telemetry(type, data)
134129
@telemetry_runtime_producer.record_streaming_event(type, data)
135130
end
131+
132+
def start_push_status_monitor
133+
@config.threads[:push_status_handler] = Thread.new do
134+
@config.logger.debug('Starting push status handler ...') if @config.debug_enabled
135+
incoming_push_status_handler
136+
end
137+
end
138+
139+
def incoming_push_status_handler
140+
while (status = @status_queue.pop)
141+
@config.logger.debug("Push status handler dequeue #{status}") if @config.debug_enabled
142+
143+
case status
144+
when Constants::PUSH_CONNECTED
145+
process_connected
146+
when Constants::PUSH_RETRYABLE_ERROR
147+
process_disconnect(true)
148+
when Constants::PUSH_FORCED_STOP
149+
process_forced_stop
150+
when Constants::PUSH_NONRETRYABLE_ERROR
151+
process_disconnect(false)
152+
when Constants::PUSH_SUBSYSTEM_DOWN
153+
process_subsystem_down
154+
when Constants::PUSH_SUBSYSTEM_READY
155+
process_subsystem_ready
156+
when Constants::PUSH_SUBSYSTEM_OFF
157+
process_push_shutdown
158+
else
159+
@config.logger.debug('Incorrect push status type.')
160+
end
161+
end
162+
rescue StandardError => e
163+
@config.logger.error("Push status handler error: #{e.inspect}")
164+
end
136165
end
137166
end
138167
end

lib/splitclient-rb/split_factory.rb

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,22 +171,23 @@ def build_synchronizer
171171
telemetry_synchronizer: @telemetry_synchronizer
172172
}
173173

174-
@synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, params)
175-
end
176-
177-
def build_sync_manager
178-
@sync_manager = SplitIoClient::Engine::SyncManager.new(@config, @synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager, @sse_handler, @push_manager)
174+
@synchronizer = Engine::Synchronizer.new(repositories, @api_key, @config, params)
179175
end
180176

181177
def build_streaming_components
182-
splits_worker = SplitIoClient::SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository)
183-
segments_worker = SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, @config, @segments_repository)
178+
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository)
179+
segments_worker = SSE::Workers::SegmentsWorker.new(@synchronizer, @config, @segments_repository)
184180
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer)
185-
notification_processor = SplitIoClient::SSE::NotificationProcessor.new(@config, @splits_worker, @segments_worker)
181+
notification_processor = SSE::NotificationProcessor.new(@config, splits_worker, segments_worker)
186182
event_parser = SSE::EventSource::EventParser.new(config)
187-
sse_client = SSE::EventSource::Client.new(@config, @api_key, @runtime_producer, event_parser, notification_manager_keeper, notification_processor)
183+
@push_status_queue = Queue.new
184+
sse_client = SSE::EventSource::Client.new(@config, @api_key, @runtime_producer, event_parser, notification_manager_keeper, notification_processor, @push_status_queue)
188185
@sse_handler = SSE::SSEHandler.new(@config, splits_worker, segments_worker, sse_client)
189-
@push_manager = PushManager.new(@config, @sse_handler, @api_key, @runtime_producer)
186+
@push_manager = Engine::PushManager.new(@config, @sse_handler, @api_key, @runtime_producer)
187+
end
188+
189+
def build_sync_manager
190+
@sync_manager = Engine::SyncManager.new(@config, @synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager, @sse_handler, @push_manager, @push_status_queue)
190191
end
191192

192193
def build_repositories
@@ -197,13 +198,13 @@ def build_repositories
197198
end
198199

199200
def build_telemetry_synchronizer
200-
telemetry_api = SplitIoClient::Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
201+
telemetry_api = Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
201202
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, telemetry_api)
202203
end
203204

204205
def build_impressions_components
205-
@impression_counter = SplitIoClient::Engine::Common::ImpressionCounter.new
206-
@impressions_manager = SplitIoClient::Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
206+
@impression_counter = Engine::Common::ImpressionCounter.new
207+
@impressions_manager = Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
207208
end
208209
end
209210
end

lib/splitclient-rb/sse/event_source/client.rb

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,30 @@ def initialize(config,
1919
event_parser,
2020
notification_manager_keeper,
2121
notification_processor,
22+
status_queue,
2223
read_timeout: DEFAULT_READ_TIMEOUT)
2324
@config = config
2425
@api_key = api_key
2526
@telemetry_runtime_producer = telemetry_runtime_producer
2627
@event_parser = event_parser
2728
@notification_manager_keeper = notification_manager_keeper
2829
@notification_processor = notification_processor
30+
@status_queue = status_queue
2931
@read_timeout = read_timeout
3032
@connected = Concurrent::AtomicBoolean.new(false)
3133
@first_event = Concurrent::AtomicBoolean.new(true)
3234
@socket = nil
3335
end
3436

35-
def close(action = nil)
37+
def close(status = nil)
3638
unless connected?
3739
@config.logger.error('SSEClient already disconected.') if @config.debug_enabled
3840
return
3941
end
4042

4143
@connected.make_false
4244
@socket&.close
43-
dispatch_action(action) unless action.nil?
45+
push_status(status)
4446
rescue StandardError => e
4547
@config.logger.error("SSEClient close Error: #{e.inspect}")
4648
end
@@ -73,8 +75,8 @@ def connected?
7375
def connect_thread(latch)
7476
@config.threads[:connect_stream] = Thread.new do
7577
@config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled
76-
action = connect_stream(latch)
77-
dispatch_action(action) unless action.nil?
78+
new_status = connect_stream(latch)
79+
push_status(new_status)
7880
@config.logger.info('connect_stream thread finished.') if @config.debug_enabled
7981
end
8082
end
@@ -129,7 +131,7 @@ def read_first_event(data, latch)
129131
if response_code == OK_CODE && !error_event
130132
@connected.make_true
131133
@telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil)
132-
dispatch_action(Constants::PUSH_CONNECTED)
134+
push_status(Constants::PUSH_CONNECTED)
133135
end
134136

135137
latch.count_down
@@ -192,8 +194,11 @@ def dispatch_event(event)
192194
end
193195
end
194196

195-
def dispatch_action(action)
196-
# TODO: will use status queue here.
197+
def push_status(status)
198+
return if status.nil?
199+
200+
@config.logger.debug("Pushing new sse status: #{status}")
201+
@status_queue.push(status)
197202
end
198203
end
199204
end

lib/splitclient-rb/sse/sse_handler.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def start(token_jwt, channels)
2121
end
2222

2323
def stop
24-
@sse_client.close(Constants::PUSH_NONRETRYABLE_ERROR)
24+
@sse_client.close(Constants::PUSH_FORCED_STOP)
2525
stop_workers
2626
rescue StandardError => e
2727
@config.logger.debug("SSEHandler stop error: #{e.inspect}") if @config.debug_enabled

spec/engine/push_manager_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
end
3131
let(:synchronizer) { SplitIoClient::Engine::Synchronizer.new(repositories, api_key, config, params) }
3232
let(:event_parser) { SplitIoClient::SSE::EventSource::EventParser.new(config) }
33-
let(:sse_client) { SplitIoClient::SSE::EventSource::Client.new(config, api_key, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor) }
33+
let(:push_status_queue) { Queue.new }
34+
let(:sse_client) { SplitIoClient::SSE::EventSource::Client.new(config, api_key, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) }
3435

3536
context 'start_sse' do
3637
it 'must connect to server' do

spec/engine/sync_manager_spec.rb

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@
5151
let(:segments_worker) { SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, config, segments_repository) }
5252
let(:notification_processor) { SplitIoClient::SSE::NotificationProcessor.new(config, splits_worker, segments_worker) }
5353
let(:event_parser) { SplitIoClient::SSE::EventSource::EventParser.new(config) }
54-
let(:sse_client) { SplitIoClient::SSE::EventSource::Client.new(config, api_key, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor) }
54+
let(:push_status_queue) { Queue.new }
55+
let(:sse_client) { SplitIoClient::SSE::EventSource::Client.new(config, api_key, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) }
5556
let(:sse_handler) { SplitIoClient::SSE::SSEHandler.new(config, splits_worker, segments_worker, sse_client) }
5657
let(:push_manager) { SplitIoClient::Engine::PushManager.new(config, sse_handler, api_key, telemetry_runtime_producer) }
58+
let(:push_status_queue) { Queue.new }
5759

5860
before do
5961
mock_split_changes_with_since(splits, '-1')
@@ -75,7 +77,7 @@
7577

7678
config.streaming_service_url = server.base_uri
7779

78-
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager)
80+
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager, push_status_queue)
7981
sync_manager.start
8082

8183
sleep(2)
@@ -94,7 +96,7 @@
9496
config.streaming_service_url = 'https://fake-sse.io'
9597
config.connection_timeout = 1
9698

97-
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager)
99+
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager, push_status_queue)
98100
sync_manager.start
99101

100102
sleep(2)
@@ -112,7 +114,7 @@
112114

113115
config.streaming_service_url = server.base_uri
114116

115-
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager)
117+
sync_manager = subject.new(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, status_manager, sse_handler, push_manager, push_status_queue)
116118
sync_manager.start
117119

118120
sleep(2)

spec/sse/event_source/client_spec.rb

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
let(:segments_worker) { SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, config, repositories[:segments]) }
3434
let(:notification_manager_keeper) { SplitIoClient::SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) }
3535
let(:notification_processor) { SplitIoClient::SSE::NotificationProcessor.new(config, splits_worker, segments_worker) }
36+
let(:push_status_queue) { Queue.new }
3637

3738
let(:event_split_update) { "fb\r\nid: 123\nevent: message\ndata: {\"id\":\"1\",\"clientId\":\"emptyClientId\",\"connectionId\":\"1\",\"timestamp\":1582045421733,\"channel\":\"channel-test\",\"data\":\"{\\\"type\\\" : \\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\": 5564531221}\",\"name\":\"asdasd\"}\n\n\r\n" }
3839
let(:event_split_kill) { "fb\r\nid: 123\nevent: message\ndata: {\"id\":\"1\",\"clientId\":\"emptyClientId\",\"connectionId\":\"1\",\"timestamp\":1582045421733,\"channel\":\"channel-test\",\"data\":\"{\\\"type\\\" : \\\"SPLIT_KILL\\\",\\\"changeNumber\\\": 5564531221, \\\"defaultTreatment\\\" : \\\"off\\\", \\\"splitName\\\" : \\\"split-test\\\"}\",\"name\":\"asdasd\"}\n\n\r\n" }
@@ -49,7 +50,7 @@
4950
end
5051
event_queue = Queue.new
5152
action_event = ''
52-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
53+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
5354

5455
connected = sse_client.start(server.base_uri)
5556
expect(connected).to eq(true)
@@ -78,7 +79,7 @@
7879

7980
event_queue = Queue.new
8081
action_event = ''
81-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
82+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
8283

8384
connected = sse_client.start(server.base_uri)
8485
expect(connected).to eq(true)
@@ -109,7 +110,7 @@
109110

110111
event_queue = Queue.new
111112
action_event = ''
112-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
113+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
113114

114115
connected = sse_client.start(server.base_uri)
115116
expect(connected).to eq(true)
@@ -139,7 +140,7 @@
139140

140141
event_queue = Queue.new
141142
action_event = ''
142-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
143+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
143144

144145
connected = sse_client.start(server.base_uri)
145146
expect(connected).to eq(true)
@@ -168,7 +169,7 @@
168169

169170
event_queue = Queue.new
170171
action_event = ''
171-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
172+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
172173

173174
connected = sse_client.start(server.base_uri)
174175
expect(connected).to eq(true)
@@ -192,7 +193,7 @@
192193

193194
event_queue = Queue.new
194195
action_event = ''
195-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
196+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
196197

197198
connected = sse_client.start(server.base_uri)
198199
expect(connected).to eq(true)
@@ -219,7 +220,7 @@
219220
end
220221

221222
event_queue = Queue.new
222-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
223+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
223224

224225
connected = sse_client.start(server.base_uri)
225226

@@ -240,7 +241,7 @@
240241
send_stream_content(res, event_error, 400)
241242
end
242243

243-
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor)
244+
sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)
244245

245246
connected = sse_client.start(server.base_uri)
246247
expect(connected).to eq(false)

0 commit comments

Comments
 (0)