Skip to content

Commit a47a558

Browse files
committed
fixed sse connection and polishing streaming flow
1 parent 5328dc1 commit a47a558

15 files changed

Lines changed: 66 additions & 67 deletions

File tree

lib/splitclient-rb/engine/push_manager.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def start_sse
3434
end
3535

3636
def stop_sse
37-
@sse_handler.process_disconnect if @sse_handler.sse_client.nil?
3837
@sse_handler.stop
3938
SplitIoClient::Helpers::ThreadHelper.stop(:schedule_next_token_refresh, @config)
4039
end

lib/splitclient-rb/engine/sync_manager.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def process_disconnect(reconnect)
136136
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)
137137

138138
if reconnect
139+
@push_manager.stop_sse
139140
@synchronizer.sync_all
140141
@push_manager.start_sse
141142
end

lib/splitclient-rb/helpers/thread_helper.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ def self.stop(thread_sym, config)
88

99
unless thread.nil?
1010
config.logger.debug("Stopping #{thread_sym} thread...") if config.debug_enabled
11-
sleep(0.1) while thread.status == 'run'
1211
Thread.kill(thread)
1312
end
1413
rescue StandardError => e

lib/splitclient-rb/sse/event_source/client.rb

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ def on_action(&action)
3535
@on[:action] = action
3636
end
3737

38-
def close(action = Constants::PUSH_NONRETRYABLE_ERROR)
39-
dispatch_action(action)
38+
def close(action = nil)
39+
unless connected?
40+
@config.logger.error('SSEClient already disconected.') if @config.debug_enabled
41+
return
42+
end
43+
4044
@connected.make_false
41-
SplitIoClient::Helpers::ThreadHelper.stop(:connect_stream, @config)
4245
@socket&.close
46+
dispatch_action(action) unless action.nil?
4347
rescue StandardError => e
4448
@config.logger.error("SSEClient close Error: #{e.inspect}")
4549
end
@@ -72,12 +76,14 @@ def connected?
7276
def connect_thread(latch)
7377
@config.threads[:connect_stream] = Thread.new do
7478
@config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled
75-
connect_stream(latch)
79+
action = connect_stream(latch)
80+
dispatch_action(action) unless action.nil?
81+
@config.logger.info('connect_stream thread finished.') if @config.debug_enabled
7682
end
7783
end
7884

7985
def connect_stream(latch)
80-
socket_write(latch)
86+
return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch)
8187

8288
while connected? || @first_event.value
8389
begin
@@ -86,24 +92,30 @@ def connect_stream(latch)
8692
read_first_event(partial_data, latch)
8793

8894
raise 'eof exception' if partial_data == :eof
95+
rescue Errno::EBADF, IOError => e
96+
@config.logger.error(e.inspect)
97+
return nil
8998
rescue StandardError => e
90-
@config.logger.error('Error reading partial data: ' + e.inspect) if @config.debug_enabled
91-
close(Constants::PUSH_RETRYABLE_ERROR)
92-
return
99+
return nil if ENV['SPLITCLIENT_ENV'] == 'test'
100+
101+
@config.logger.error("Error reading partial data: #{e.inspect}") if @config.debug_enabled
102+
return Constants::PUSH_RETRYABLE_ERROR
93103
end
94104

95105
process_data(partial_data)
96106
end
107+
nil
97108
end
98109

99110
def socket_write(latch)
100111
@first_event.make_true
101112
@socket = socket_connect
102113
@socket.write(build_request(@uri))
114+
true
103115
rescue StandardError => e
104116
@config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}")
105-
close(Constants::PUSH_NONRETRYABLE_ERROR)
106117
latch.count_down
118+
false
107119
end
108120

109121
def read_first_event(data, latch)
@@ -181,8 +193,10 @@ def dispatch_event(event)
181193
end
182194

183195
def dispatch_action(action)
184-
@config.logger.debug("Dispatching action: #{action}") if @config.debug_enabled
185-
@on[:action].call(action)
196+
@config.threads[:dispatch_action] = Thread.new do
197+
@config.logger.debug("Dispatching action: #{action}") if @config.debug_enabled
198+
@on[:action].call(action)
199+
end
186200
end
187201
end
188202
end

lib/splitclient-rb/sse/sse_handler.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def start(token_jwt, channels)
3131
end
3232

3333
def stop
34-
@sse_client.close
34+
@sse_client.close(Constants::PUSH_NONRETRYABLE_ERROR)
3535
stop_workers
3636
rescue StandardError => e
3737
@config.logger.debug("SSEHandler stop error: #{e.inspect}") if @config.debug_enabled

lib/splitclient-rb/sse/workers/segments_worker.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ def initialize(synchronizer, config, segments_repository)
1313
end
1414

1515
def add_to_queue(change_number, segment_name)
16-
unless @running.value
17-
@config.logger.debug('segments worker not running.')
18-
return
19-
end
20-
2116
item = { change_number: change_number, segment_name: segment_name }
2217
@config.logger.debug("SegmentsWorker add to queue #{item}")
2318
@queue.push(item)

lib/splitclient-rb/sse/workers/splits_worker.rb

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,11 @@ def stop
3333
end
3434

3535
def add_to_queue(change_number)
36-
unless @running.value
37-
@config.logger.debug('splits worker not running.')
38-
return
39-
end
40-
4136
@config.logger.debug("SplitsWorker add to queue #{change_number}")
4237
@queue.push(change_number)
4338
end
4439

4540
def kill_split(change_number, split_name, default_treatment)
46-
unless @running.value
47-
@config.logger.debug('splits worker not running.')
48-
return
49-
end
50-
5141
return if @splits_repository.get_change_number.to_i > change_number
5242

5343
@config.logger.debug("SplitsWorker kill #{split_name}, #{change_number}")

lib/splitclient-rb/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module SplitIoClient
2-
VERSION = '7.3.3.pre.rc8'
2+
VERSION = '7.3.3.pre.rc18'
33
end

spec/engine/push_manager_spec.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@
9393

9494
expect(connected).to eq(false)
9595
expect(sse_handler.connected?).to eq(false)
96-
expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR)
9796
end
9897

9998
it 'must not connect to server. Auth server return 401' do
@@ -119,7 +118,6 @@
119118

120119
expect(connected).to eq(false)
121120
expect(sse_handler.connected?).to eq(false)
122-
expect(action_event).to eq(SplitIoClient::Constants::PUSH_NONRETRYABLE_ERROR)
123121
end
124122
end
125123

spec/engine/sync_manager_spec.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
sleep(2)
7575
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
7676

77-
expect(config.threads.size).to eq(10)
77+
expect(config.threads.size).to eq(11)
7878
end
7979
end
8080

@@ -93,7 +93,7 @@
9393
sleep(2)
9494
expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=-1')).to have_been_made.once
9595

96-
expect(config.threads.size).to eq(6)
96+
expect(config.threads.size).to eq(7)
9797
end
9898
end
9999

0 commit comments

Comments
 (0)