@@ -11,8 +11,7 @@ def initialize(
1111 )
1212 @synchronizer = synchronizer
1313 notification_manager_keeper = SplitIoClient ::SSE ::NotificationManagerKeeper . new ( config ) do |manager |
14- manager . on_occupancy { |publisher_available | process_occupancy ( publisher_available ) }
15- manager . on_push_shutdown { process_push_shutdown }
14+ manager . on_action { |action | process_action ( action ) }
1615 end
1716 @sse_handler = SplitIoClient ::SSE ::SSEHandler . new (
1817 config ,
@@ -21,8 +20,7 @@ def initialize(
2120 repositories [ :segments ] ,
2221 notification_manager_keeper
2322 ) do |handler |
24- handler . on_connected { process_connected }
25- handler . on_disconnect { |reconnect | process_disconnect ( reconnect ) }
23+ handler . on_action { |action | process_action ( action ) }
2624 end
2725
2826 @push_manager = PushManager . new ( config , @sse_handler , api_key )
@@ -44,10 +42,10 @@ def start
4442 # Starts tasks if stream is enabled.
4543 def start_stream
4644 @config . logger . debug ( 'Starting push mode ...' )
47- stream_start_thread
45+ sync_all_thread
4846 @synchronizer . start_periodic_data_recording
4947
50- stream_start_sse_thread
48+ start_sse_connection_thread
5149 end
5250
5351 def start_poll
@@ -59,23 +57,24 @@ def start_poll
5957 end
6058
6159 # Starts thread which fetch splits and segments once and trigger task to periodic data recording.
62- def stream_start_thread
60+ def sync_all_thread
6361 @config . threads [ :sync_manager_start_stream ] = Thread . new do
6462 begin
6563 @synchronizer . sync_all
6664 rescue StandardError => e
67- @config . logger . error ( "stream_start_thread error : #{ e . inspect } " )
65+ @config . logger . error ( "sync_all_thread error : #{ e . inspect } " )
6866 end
6967 end
7068 end
7169
7270 # Starts thread which connect to sse and after that fetch splits and segments once.
73- def stream_start_sse_thread
71+ def start_sse_connection_thread
7472 @config . threads [ :sync_manager_start_sse ] = Thread . new do
7573 begin
76- @push_manager . start_sse
74+ connected = @push_manager . start_sse
75+ @synchronizer . start_periodic_fetch unless connected
7776 rescue StandardError => e
78- @config . logger . error ( "stream_start_sse_thread error : #{ e . inspect } " )
77+ @config . logger . error ( "start_sse_connection_thread error : #{ e . inspect } " )
7978 end
8079 end
8180 end
@@ -84,6 +83,46 @@ def start_stream_forked
8483 PhusionPassenger . on_event ( :starting_worker_process ) { |forked | start_stream if forked }
8584 end
8685
86+ def process_action ( action )
87+ case action
88+ when Constants ::PUSH_CONNECTED
89+ process_connected
90+ when Constants ::PUSH_RETRYABLE_ERROR
91+ process_disconnect ( true )
92+ when Constants ::PUSH_NONRETRYABLE_ERROR
93+ process_disconnect ( false )
94+ when Constants ::PUSH_SUBSYSTEM_DOWN
95+ process_subsystem_down
96+ when Constants ::PUSH_SUBSYSTEM_READY
97+ process_subsystem_ready
98+ when Constants ::PUSH_SUBSYSTEM_OFF
99+ process_push_shutdown
100+ else
101+ @config . logger . debug ( 'Incorrect action type.' )
102+ end
103+ rescue StandardError => e
104+ @config . logger . error ( "process_action error: #{ e . inspect } " )
105+ end
106+
107+ def process_subsystem_ready
108+ @synchronizer . stop_periodic_fetch
109+ @synchronizer . sync_all
110+ @sse_handler . start_workers
111+ end
112+
113+ def process_subsystem_down
114+ @sse_handler . stop_workers
115+ @synchronizer . start_periodic_fetch
116+ end
117+
118+ def process_push_shutdown
119+ @push_manager . stop_sse
120+ @sse_handler . stop_workers
121+ @synchronizer . start_periodic_fetch
122+ rescue StandardError => e
123+ @config . logger . error ( "process_push_shutdown error: #{ e . inspect } " )
124+ end
125+
87126 def process_connected
88127 if @sse_connected . value
89128 @config . logger . debug ( 'Streaming already connected.' )
@@ -115,28 +154,6 @@ def process_disconnect(reconnect)
115154 rescue StandardError => e
116155 @config . logger . error ( "process_disconnect error: #{ e . inspect } " )
117156 end
118-
119- def process_occupancy ( push_enable )
120- if push_enable
121- @synchronizer . stop_periodic_fetch
122- @synchronizer . sync_all
123- @sse_handler . start_workers
124- return
125- end
126-
127- @sse_handler . stop_workers
128- @synchronizer . start_periodic_fetch
129- rescue StandardError => e
130- @config . logger . error ( "process_occupancy error: #{ e . inspect } " )
131- end
132-
133- def process_push_shutdown
134- @push_manager . stop_sse
135- @sse_handler . stop_workers
136- @synchronizer . start_periodic_fetch
137- rescue StandardError => e
138- @config . logger . error ( "process_push_shutdown error: #{ e . inspect } " )
139- end
140157 end
141158 end
142159end
0 commit comments