Skip to content

Commit 8a8622c

Browse files
committed
removed streaming events logic.
1 parent 1a654df commit 8a8622c

10 files changed

Lines changed: 163 additions & 225 deletions

File tree

.rubocop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Metrics/CyclomaticComplexity:
2020
Max: 8
2121

2222
Metrics/ParameterLists:
23+
Max: 8
2324
Exclude:
2425
- lib/splitclient-rb/engine/sync_manager.rb
2526

@@ -32,6 +33,7 @@ Metrics/LineLength:
3233
- spec/engine/auth_api_client_spec.rb
3334
- spec/telemetry/synchronizer_spec.rb
3435
- spec/splitclient/split_config_spec.rb
36+
- spec/engine/push_manager_spec.rb
3537

3638
Style/BracesAroundHashParameters:
3739
Exclude:

lib/splitclient-rb/engine/sync_manager.rb

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,21 @@ class SyncManager
66
SYNC_MODE_STREAMING = 0
77
SYNC_MODE_POLLING = 1
88

9-
def initialize(
10-
repositories,
11-
api_key,
12-
config,
13-
synchronizer,
14-
telemetry_runtime_producer,
15-
telemetry_synchronizer,
16-
status_manager
17-
)
18-
@synchronizer = synchronizer
19-
notification_manager_keeper = SSE::NotificationManagerKeeper.new(config, telemetry_runtime_producer) do |manager|
20-
manager.on_action { |action| process_action(action) }
21-
end
22-
@sse_handler = SSE::SSEHandler.new(
23-
{ config: config, api_key: api_key },
24-
@synchronizer,
25-
repositories,
26-
notification_manager_keeper,
27-
telemetry_runtime_producer
28-
) do |handler|
29-
handler.on_action { |action| process_action(action) }
30-
end
31-
32-
@push_manager = PushManager.new(config, @sse_handler, api_key, telemetry_runtime_producer)
33-
@sse_connected = Concurrent::AtomicBoolean.new(false)
9+
def initialize(config,
10+
synchronizer,
11+
telemetry_runtime_producer,
12+
telemetry_synchronizer,
13+
status_manager,
14+
sse_handler,
15+
push_manager)
3416
@config = config
17+
@synchronizer = synchronizer
3518
@telemetry_runtime_producer = telemetry_runtime_producer
3619
@telemetry_synchronizer = telemetry_synchronizer
3720
@status_manager = status_manager
21+
@sse_handler = sse_handler
22+
@push_manager = push_manager
23+
@sse_connected = Concurrent::AtomicBoolean.new(false)
3824
end
3925

4026
def start

lib/splitclient-rb/split_factory.rb

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,10 @@ def initialize(api_key, config_hash = {})
3333
register_factory
3434

3535
build_telemetry_components
36+
build_repositories
37+
build_impressions_components
38+
build_telemetry_synchronizer
3639

37-
@splits_repository = SplitsRepository.new(@config)
38-
@segments_repository = SegmentsRepository.new(@config)
39-
@impressions_repository = ImpressionsRepository.new(@config)
40-
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
41-
@impression_counter = SplitIoClient::Engine::Common::ImpressionCounter.new
42-
@impressions_manager = SplitIoClient::Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
43-
@telemetry_api = SplitIoClient::Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
44-
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, @telemetry_api)
4540
@status_manager = Engine::StatusManager.new(@config)
4641

4742
start!
@@ -59,18 +54,12 @@ def start!
5954
return
6055
end
6156

62-
split_fetcher = SplitFetcher.new(@splits_repository, @api_key, config, @runtime_producer)
63-
segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, config, @runtime_producer)
64-
params = {
65-
split_fetcher: split_fetcher,
66-
segment_fetcher: segment_fetcher,
67-
imp_counter: @impression_counter,
68-
telemetry_runtime_producer: @runtime_producer,
69-
telemetry_synchronizer: @telemetry_synchronizer
70-
}
57+
build_fetchers
58+
build_synchronizer
59+
build_streaming_components
60+
build_sync_manager
7161

72-
synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, params)
73-
SplitIoClient::Engine::SyncManager.new(repositories, @api_key, @config, synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager).start
62+
@sync_manager.start
7463
end
7564

7665
def stop!
@@ -166,6 +155,55 @@ def build_telemetry_components
166155
@runtime_producer = Telemetry::RuntimeProducer.new(@config)
167156

168157
@telemetry_consumers = { init: @init_consumer, evaluation: @evaluation_consumer, runtime: @runtime_consumer }
169-
end
158+
end
159+
160+
def build_fetchers
161+
@split_fetcher = SplitFetcher.new(@splits_repository, @api_key, @config, @runtime_producer)
162+
@segment_fetcher = SegmentFetcher.new(@segments_repository, @api_key, @config, @runtime_producer)
163+
end
164+
165+
def build_synchronizer
166+
params = {
167+
split_fetcher: @split_fetcher,
168+
segment_fetcher: @segment_fetcher,
169+
imp_counter: @impression_counter,
170+
telemetry_runtime_producer: @runtime_producer,
171+
telemetry_synchronizer: @telemetry_synchronizer
172+
}
173+
174+
@synchronizer = SplitIoClient::Engine::Synchronizer.new(repositories, @api_key, @config, params)
175+
end
176+
177+
def build_sync_manager
178+
@sync_manager = SplitIoClient::Engine::SyncManager.new(@config, @synchronizer, @runtime_producer, @telemetry_synchronizer, @status_manager, @sse_handler, @push_manager)
179+
end
180+
181+
def build_streaming_components
182+
splits_worker = SplitIoClient::SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository)
183+
segments_worker = SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, @config, @segments_repository)
184+
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer)
185+
notification_processor = SplitIoClient::SSE::NotificationProcessor.new(@config, @splits_worker, @segments_worker)
186+
event_parser = SSE::EventSource::EventParser.new(config)
187+
sse_client = SSE::EventSource::Client.new(@config, @api_key, @runtime_producer, event_parser, notification_manager_keeper, notification_processor)
188+
@sse_handler = SSE::SSEHandler.new(@config, splits_worker, segments_worker, sse_client)
189+
@push_manager = PushManager.new(@config, @sse_handler, @api_key, @runtime_producer)
190+
end
191+
192+
def build_repositories
193+
@splits_repository = SplitsRepository.new(@config)
194+
@segments_repository = SegmentsRepository.new(@config)
195+
@impressions_repository = ImpressionsRepository.new(@config)
196+
@events_repository = EventsRepository.new(@config, @api_key, @runtime_producer)
197+
end
198+
199+
def build_telemetry_synchronizer
200+
telemetry_api = SplitIoClient::Api::TelemetryApi.new(@config, @api_key, @runtime_producer)
201+
@telemetry_synchronizer = Telemetry::Synchronizer.new(@config, @telemetry_consumers, @init_producer, repositories, telemetry_api)
202+
end
203+
204+
def build_impressions_components
205+
@impression_counter = SplitIoClient::Engine::Common::ImpressionCounter.new
206+
@impressions_manager = SplitIoClient::Engine::Common::ImpressionManager.new(@config, @impressions_repository, @impression_counter, @runtime_producer)
207+
end
170208
end
171209
end

lib/splitclient-rb/sse/event_source/client.rb

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,23 @@ class Client
1313
KEEP_ALIVE_RESPONSE = "c\r\n:keepalive\n\n\r\n".freeze
1414
ERROR_EVENT_TYPE = 'error'.freeze
1515

16-
def initialize(config, api_key, telemetry_runtime_producer, read_timeout: DEFAULT_READ_TIMEOUT)
16+
def initialize(config,
17+
api_key,
18+
telemetry_runtime_producer,
19+
event_parser,
20+
notification_manager_keeper,
21+
notification_processor,
22+
read_timeout: DEFAULT_READ_TIMEOUT)
1723
@config = config
24+
@api_key = api_key
25+
@telemetry_runtime_producer = telemetry_runtime_producer
26+
@event_parser = event_parser
27+
@notification_manager_keeper = notification_manager_keeper
28+
@notification_processor = notification_processor
1829
@read_timeout = read_timeout
1930
@connected = Concurrent::AtomicBoolean.new(false)
2031
@first_event = Concurrent::AtomicBoolean.new(true)
2132
@socket = nil
22-
@event_parser = SSE::EventSource::EventParser.new(config)
23-
@on = { event: ->(_) {}, action: ->(_) {} }
24-
@api_key = api_key
25-
@telemetry_runtime_producer = telemetry_runtime_producer
26-
27-
yield self if block_given?
28-
end
29-
30-
def on_event(&action)
31-
@on[:event] = action
32-
end
33-
34-
def on_action(&action)
35-
@on[:action] = action
3633
end
3734

3835
def close(action = nil)
@@ -188,15 +185,15 @@ def dispatch_error(event)
188185
end
189186

190187
def dispatch_event(event)
191-
@config.logger.debug("Dispatching event: #{event.event_type}, #{event.channel}") if @config.debug_enabled
192-
@on[:event].call(event)
188+
if event.occupancy?
189+
@notification_manager_keeper.handle_incoming_occupancy_event(event)
190+
else
191+
@notification_processor.process(event)
192+
end
193193
end
194194

195195
def dispatch_action(action)
196-
@config.threads[:dispatch_action] = Thread.new do
197-
@config.logger.debug("Dispatching action: #{action}") if @config.debug_enabled
198-
@on[:action].call(action)
199-
end
196+
# TODO: will use status queue here.
200197
end
201198
end
202199
end

lib/splitclient-rb/sse/notification_manager_keeper.rb

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ def initialize(config, telemetry_runtime_producer)
1414
@publisher_available = Concurrent::AtomicBoolean.new(true)
1515
@publishers_pri = Concurrent::AtomicFixnum.new
1616
@publishers_sec = Concurrent::AtomicFixnum.new
17-
@on = { action: ->(_) {} }
1817
@telemetry_runtime_producer = telemetry_runtime_producer
19-
20-
yield self if block_given?
2118
end
2219

2320
def handle_incoming_occupancy_event(event)
@@ -27,14 +24,9 @@ def handle_incoming_occupancy_event(event)
2724
process_event_occupancy(event.channel, event.data['metrics']['publishers'])
2825
end
2926
rescue StandardError => e
30-
p e
3127
@config.logger.error(e)
3228
end
3329

34-
def on_action(&action)
35-
@on[:action] = action
36-
end
37-
3830
private
3931

4032
def process_event_control(type)
@@ -83,7 +75,7 @@ def are_publishers_available?
8375

8476
def dispatch_action(action)
8577
@config.logger.debug("Dispatching action: #{action}")
86-
@on[:action].call(action)
78+
# TODO: will use status queue here.
8779
end
8880
end
8981
end

lib/splitclient-rb/sse/sse_handler.rb

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,14 @@ module SSE
55
class SSEHandler
66
attr_reader :sse_client
77

8-
def initialize(metadata,
9-
synchronizer,
10-
repositories,
11-
notification_manager_keeper,
12-
telemetry_runtime_producer)
13-
@config = metadata[:config]
14-
@notification_manager_keeper = notification_manager_keeper
15-
@splits_worker = SplitIoClient::SSE::Workers::SplitsWorker.new(synchronizer, @config, repositories[:splits])
16-
@segments_worker = SplitIoClient::SSE::Workers::SegmentsWorker.new(synchronizer, @config, repositories[:segments])
17-
@notification_processor = SplitIoClient::SSE::NotificationProcessor.new(@config, @splits_worker, @segments_worker)
18-
@sse_client = SSE::EventSource::Client.new(@config, metadata[:api_key], telemetry_runtime_producer) do |client|
19-
client.on_event { |event| handle_incoming_message(event) }
20-
client.on_action { |action| process_action(action) }
21-
end
22-
23-
@on = { action: ->(_) {} }
24-
25-
yield self if block_given?
8+
def initialize(config,
9+
splits_worker,
10+
segments_worker,
11+
sse_client)
12+
@config = config
13+
@splits_worker = splits_worker
14+
@segments_worker = segments_worker
15+
@sse_client = sse_client
2616
end
2717

2818
def start(token_jwt, channels)
@@ -50,24 +40,6 @@ def stop_workers
5040
@splits_worker.stop
5141
@segments_worker.stop
5242
end
53-
54-
def on_action(&action)
55-
@on[:action] = action
56-
end
57-
58-
private
59-
60-
def process_action(action)
61-
@on[:action].call(action)
62-
end
63-
64-
def handle_incoming_message(notification)
65-
if notification.occupancy?
66-
@notification_manager_keeper.handle_incoming_occupancy_event(notification)
67-
else
68-
@notification_processor.process(notification)
69-
end
70-
end
7143
end
7244
end
7345
end

0 commit comments

Comments
 (0)