@@ -52,7 +52,6 @@ public class PushManagerImp implements PushManager {
5252 private final ScheduledExecutorService _scheduledExecutorService ;
5353 private AtomicLong _expirationTime ;
5454 private final TelemetryRuntimeProducer _telemetryRuntimeProducer ;
55- private final int _streamingTokenRefreshRate ;
5655
5756 @ VisibleForTesting
5857 /* package private */ PushManagerImp (AuthApiClient authApiClient ,
@@ -61,8 +60,7 @@ public class PushManagerImp implements PushManager {
6160 Worker <SegmentQueueDto > segmentWorker ,
6261 PushStatusTracker pushStatusTracker ,
6362 TelemetryRuntimeProducer telemetryRuntimeProducer ,
64- ThreadFactory threadFactory ,
65- int streamingTokenRefreshRate ) {
63+ ThreadFactory threadFactory ) {
6664
6765 _authApiClient = checkNotNull (authApiClient );
6866 _eventSourceClient = checkNotNull (eventSourceClient );
@@ -72,7 +70,6 @@ public class PushManagerImp implements PushManager {
7270 _expirationTime = new AtomicLong ();
7371 _scheduledExecutorService = buildSingleThreadScheduledExecutor (threadFactory , "Split-SSERefreshToken-%d" );
7472 _telemetryRuntimeProducer = checkNotNull (telemetryRuntimeProducer );
75- _streamingTokenRefreshRate = streamingTokenRefreshRate ;
7673 }
7774
7875 public static PushManagerImp build (Synchronizer synchronizer ,
@@ -86,8 +83,7 @@ public static PushManagerImp build(Synchronizer synchronizer,
8683 SplitCacheProducer splitCacheProducer ,
8784 FlagSetsFilter flagSetsFilter ,
8885 RuleBasedSegmentCache ruleBasedSegmentCache ,
89- RuleBasedSegmentParser ruleBasedSegmentParser ,
90- int streamingTokenRefreshRate ) {
86+ RuleBasedSegmentParser ruleBasedSegmentParser ) {
9187 FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp (synchronizer , splitParser , ruleBasedSegmentParser , splitCacheProducer ,
9288 ruleBasedSegmentCache , telemetryRuntimeProducer , flagSetsFilter );
9389 Worker <SegmentQueueDto > segmentWorker = new SegmentsWorkerImp (synchronizer );
@@ -100,26 +96,23 @@ public static PushManagerImp build(Synchronizer synchronizer,
10096 segmentWorker ,
10197 pushStatusTracker ,
10298 telemetryRuntimeProducer ,
103- threadFactory ,
104- streamingTokenRefreshRate );
99+ threadFactory );
105100 }
106101
107102 @ Override
108103 public void start () {
109- _log .debug ("#1 - Start PushManagerImp" );
110104 try {
111105 lock .lock ();
112106 AuthenticationResponse response = _authApiClient .Authenticate ();
113107 _log .debug (String .format ("Auth service response pushEnabled: %s" , response .isPushEnabled ()));
114108 if (response .isPushEnabled () && startSse (response .getToken (), response .getChannels ())) {
115109 _log .debug ("#2 - PushManagerImp connected" );
116- _expirationTime .set (_streamingTokenRefreshRate );
110+ _expirationTime .set (response . getExpiration () );
117111 _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .TOKEN_REFRESH .getType (),
118112 response .getExpiration (), System .currentTimeMillis ()));
119113 return ;
120114 }
121115
122- _log .debug ("#3 - PushManagerImp error" );
123116 cleanUpResources ();
124117 if (response .isRetry ()) {
125118 _log .debug (String .format ("Handling retry error response" ));
@@ -128,7 +121,6 @@ public void start() {
128121 _log .debug (String .format ("Auth service response is disabled: %s" , response .getToken ()));
129122 _pushStatusTracker .forcePushDisable ();
130123 }
131- _log .debug ("#4 - PushManagerImp error" );
132124 } catch (Exception e ) {
133125 _log .debug ("Exception in PushManager start: " + e .getMessage ());
134126 } finally {
@@ -138,11 +130,9 @@ public void start() {
138130
139131 @ Override
140132 public void stop () {
141- _log .debug ("#1 - Stopping PushManagerImp" );
142133 try {
143134 lock .lock ();
144135 cleanUpResources ();
145- _log .debug ("#2 - Stopped PushManagerImp" );
146136 } catch (Exception e ) {
147137 _log .debug ("Exception in stopping push manager: " + e .getMessage ());
148138 } finally {
@@ -154,11 +144,8 @@ public void stop() {
154144 public void scheduleConnectionReset () {
155145 _log .debug (String .format ("scheduleNextTokenRefresh in %s SECONDS" , _expirationTime ));
156146 _nextTokenRefreshTask = _scheduledExecutorService .schedule (() -> {
157- _log .debug ("#1 - Starting scheduleNextTokenRefresh ..." );
158147 stop ();
159- _log .debug ("#2 - Finished to stop all streaming engine" );
160148 start ();
161- _log .debug ("#3 - Finished to start streaming connection" );
162149 }, _expirationTime .get (), TimeUnit .SECONDS );
163150 }
164151
@@ -175,9 +162,7 @@ private boolean startSse(String token, String channels) {
175162 @ Override
176163 public void startWorkers () {
177164 try {
178- _log .debug ("Starting featureflag worker" );
179165 _featureFlagsWorker .start ();
180- _log .debug ("Starting segment worker" );
181166 _segmentWorker .start ();
182167 } catch (Exception e ) {
183168 _log .debug ("Exception in starting workers: " + e .getMessage ());
@@ -187,25 +172,18 @@ public void startWorkers() {
187172 @ Override
188173 public void stopWorkers () {
189174 try {
190- _log .debug ("Stopping featureflag worker" );
191175 _featureFlagsWorker .stop ();
192- _log .debug ("Stopping segment worker" );
193176 _segmentWorker .stop ();
194177 } catch (Exception e ) {
195178 _log .debug ("Exception in stopping workers: " + e .getMessage ());
196179 }
197180 }
198181
199182 private void cleanUpResources () {
200- _log .debug ("Starting cleanUpResources - #1" );
201183 _eventSourceClient .stop ();
202- _log .debug ("cleanUpResources - #2" );
203184 stopWorkers ();
204185 if (_nextTokenRefreshTask != null ) {
205- _log .debug ("Cancel nextTokenRefreshTask" );
206186 _nextTokenRefreshTask .cancel (false );
207- _log .debug ("Finished cleanUpResources - #3 - Finished cancel nextTokenRefreshTask" );
208187 }
209- _log .debug ("Finished cleanUpResources - #4" );
210188 }
211189}
0 commit comments