@@ -13,11 +13,19 @@ public class RedisPubSubServer : IRedisPubSubServer
1313 private DateTime serverTimeAtStart ;
1414 private Stopwatch startedAt ;
1515
16+ public TimeSpan ? HeartbeatInterval = TimeSpan . FromSeconds ( 10 ) ;
17+ public TimeSpan HeartbeatTimeout = TimeSpan . FromSeconds ( 30 ) ;
18+ private long lastHeartbeatTicks ;
19+ private Timer heartbeatTimer ;
20+
1621 public Action OnInit { get ; set ; }
1722 public Action OnStart { get ; set ; }
23+ public Action OnHeartbeatSent { get ; set ; }
24+ public Action OnHeartbeatReceived { get ; set ; }
1825 public Action OnStop { get ; set ; }
1926 public Action OnDispose { get ; set ; }
2027 public Action < string , string > OnMessage { get ; set ; }
28+ public Action < string > OnControlCommand { get ; set ; }
2129 public Action < string > OnUnSubscribe { get ; set ; }
2230 public Action < Exception > OnError { get ; set ; }
2331 public Action < IRedisPubSubServer > OnFailover { get ; set ; }
@@ -34,6 +42,10 @@ public class RedisPubSubServer : IRedisPubSubServer
3442 private int status ;
3543 private Thread bgThread ; //Subscription controller thread
3644 private long bgThreadCount = 0 ;
45+ private int autoRestart = YES ;
46+
47+ private const int NO = 0 ;
48+ private const int YES = 1 ;
3749
3850 public DateTime CurrentServerTime
3951 {
@@ -62,6 +74,8 @@ public RedisPubSubServer(IRedisClientsManager clientsManager, params string[] ch
6274
6375 public IRedisPubSubServer Start ( )
6476 {
77+ Interlocked . CompareExchange ( ref autoRestart , 0 , autoRestart ) ;
78+
6579 if ( Interlocked . CompareExchange ( ref status , 0 , 0 ) == Status . Started )
6680 {
6781 //Start any stopped worker threads
@@ -122,10 +136,63 @@ private void Init()
122136 startedAt = Stopwatch . StartNew ( ) ;
123137 }
124138
139+ DisposeHeartbeatTimer ( ) ;
140+
141+ if ( HeartbeatInterval != null )
142+ {
143+ heartbeatTimer = new Timer ( SendHeartbeat , null ,
144+ TimeSpan . FromMilliseconds ( 0 ) , HeartbeatInterval . Value ) ;
145+ }
146+
147+ Interlocked . CompareExchange ( ref lastHeartbeatTicks , DateTime . UtcNow . Ticks , lastHeartbeatTicks ) ;
148+
125149 if ( OnInit != null )
126150 OnInit ( ) ;
127151 }
128152
153+ void SendHeartbeat ( object state )
154+ {
155+ if ( OnHeartbeatSent != null )
156+ OnHeartbeatSent ( ) ;
157+
158+ if ( Interlocked . CompareExchange ( ref status , 0 , 0 ) != Status . Started )
159+ return ;
160+
161+ NotifyAllSubscribers ( ControlCommand . Pulse ) ;
162+
163+ if ( DateTime . UtcNow - new DateTime ( lastHeartbeatTicks ) > HeartbeatTimeout )
164+ {
165+ if ( Interlocked . CompareExchange ( ref status , 0 , 0 ) == Status . Started )
166+ {
167+ Restart ( ) ;
168+ }
169+ }
170+ }
171+
172+ void Pulse ( )
173+ {
174+ Interlocked . CompareExchange ( ref lastHeartbeatTicks , DateTime . UtcNow . Ticks , lastHeartbeatTicks ) ;
175+
176+ if ( OnHeartbeatReceived != null )
177+ OnHeartbeatReceived ( ) ;
178+ }
179+
180+ private void DisposeHeartbeatTimer ( )
181+ {
182+ if ( heartbeatTimer == null )
183+ return ;
184+
185+ try
186+ {
187+ heartbeatTimer . Dispose ( ) ;
188+ }
189+ catch ( Exception ex )
190+ {
191+ if ( this . OnError != null ) this . OnError ( ex ) ;
192+ }
193+ heartbeatTimer = null ;
194+ }
195+
129196 private IRedisClient masterClient ;
130197 private void RunLoop ( )
131198 {
@@ -150,9 +217,21 @@ private void RunLoop()
150217
151218 subscription . OnMessage = ( channel , msg ) =>
152219 {
153- if ( msg == Operation . ControlCommand )
220+ if ( string . IsNullOrEmpty ( msg ) )
221+ return ;
222+
223+ var ctrlMsg = msg . SplitOnFirst ( ':' ) ;
224+ if ( ctrlMsg [ 0 ] == ControlCommand . Control )
154225 {
155226 var op = Interlocked . CompareExchange ( ref doOperation , Operation . NoOp , doOperation ) ;
227+
228+ var msgType = ctrlMsg . Length > 1
229+ ? ctrlMsg [ 1 ]
230+ : null ;
231+
232+ if ( OnControlCommand != null )
233+ OnControlCommand ( msgType ?? Operation . GetName ( op ) ) ;
234+
156235 switch ( op )
157236 {
158237 case Operation . Stop :
@@ -169,9 +248,15 @@ private void RunLoop()
169248 subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
170249 return ;
171250 }
172- }
173251
174- if ( ! string . IsNullOrEmpty ( msg ) )
252+ switch ( msgType )
253+ {
254+ case ControlCommand . Pulse :
255+ Pulse ( ) ;
256+ break ;
257+ }
258+ }
259+ else
175260 {
176261 OnMessage ( channel , msg ) ;
177262 }
@@ -200,19 +285,22 @@ private void RunLoop()
200285
201286 if ( this . OnError != null )
202287 this . OnError ( ex ) ;
288+ }
203289
290+ if ( Interlocked . CompareExchange ( ref autoRestart , 0 , 0 ) == YES
291+ && Interlocked . CompareExchange ( ref status , 0 , 0 ) != Status . Disposed )
292+ {
204293 if ( KeepAliveRetryAfterMs != null )
205- {
206294 Thread . Sleep ( KeepAliveRetryAfterMs . Value ) ;
207295
208- if ( Interlocked . CompareExchange ( ref status , 0 , 0 ) != Status . Disposed )
209- Start ( ) ;
210- }
296+ Start ( ) ;
211297 }
212298 }
213299
214300 public void Stop ( )
215301 {
302+ Interlocked . CompareExchange ( ref autoRestart , NO , autoRestart ) ;
303+
216304 if ( Interlocked . CompareExchange ( ref status , 0 , 0 ) == Status . Disposed )
217305 throw new ObjectDisposedException ( "RedisPubSubServer has been disposed" ) ;
218306
@@ -221,20 +309,36 @@ public void Stop()
221309 Log . Debug ( "Stopping RedisPubSubServer..." ) ;
222310
223311 //Unblock current bgthread by issuing StopCommand
224- try
312+ SendControlCommand ( Operation . Stop ) ;
313+ }
314+ }
315+
316+ private void SendControlCommand ( int operation )
317+ {
318+ Interlocked . CompareExchange ( ref doOperation , operation , doOperation ) ;
319+ NotifyAllSubscribers ( ) ;
320+ }
321+
322+ private void NotifyAllSubscribers ( string commandType = null )
323+ {
324+ var msg = ControlCommand . Control ;
325+ if ( commandType != null )
326+ msg += ":" + commandType ;
327+
328+ try
329+ {
330+ using ( var redis = ClientsManager . GetClient ( ) )
225331 {
226- using ( var redis = ClientsManager . GetClient ( ) )
332+ foreach ( var channel in Channels )
227333 {
228- Interlocked . CompareExchange ( ref doOperation , Operation . Stop , doOperation ) ;
229- Channels . Each ( x =>
230- redis . PublishMessage ( x , Operation . ControlCommand ) ) ;
334+ redis . PublishMessage ( channel , msg ) ;
231335 }
232336 }
233- catch ( Exception ex )
234- {
235- if ( this . OnError != null ) this . OnError ( ex ) ;
236- Log . Warn ( "Could not send STOP message to bg thread: " + ex . Message ) ;
237- }
337+ }
338+ catch ( Exception ex )
339+ {
340+ if ( this . OnError != null ) this . OnError ( ex ) ;
341+ Log . Warn ( "Could not send '{0}' message to bg thread: {1}" . Fmt ( msg , ex . Message ) ) ;
238342 }
239343 }
240344
@@ -251,8 +355,10 @@ private void HandleFailover(IRedisClientsManager clientsManager)
251355 using ( var currentlySubscribedClient = ( ( RedisClient ) masterClient ) . CloneClient ( ) )
252356 {
253357 Interlocked . CompareExchange ( ref doOperation , Operation . Reset , doOperation ) ;
254- Channels . Each ( x =>
255- currentlySubscribedClient . PublishMessage ( x , Operation . ControlCommand ) ) ;
358+ foreach ( var channel in Channels )
359+ {
360+ currentlySubscribedClient . PublishMessage ( channel , ControlCommand . Control ) ;
361+ }
256362 }
257363 }
258364 else
@@ -279,7 +385,7 @@ void HandleUnSubscribe(string channel)
279385 public void Restart ( )
280386 {
281387 Stop ( ) ;
282- Start ( ) ;
388+ Interlocked . CompareExchange ( ref autoRestart , YES , autoRestart ) ;
283389 }
284390
285391 private void KillBgThreadIfExists ( )
@@ -319,12 +425,33 @@ private void SleepBackOffMultiplier(int continuousErrorsCount)
319425
320426 public static class Operation //dep-free copy of WorkerOperation
321427 {
322- public const string ControlCommand = "CTRL" ;
323-
324428 public const int NoOp = 0 ;
325429 public const int Stop = 1 ;
326430 public const int Reset = 2 ;
327431 public const int Restart = 3 ;
432+
433+ public static string GetName ( int op )
434+ {
435+ switch ( op )
436+ {
437+ case NoOp :
438+ return "NoOp" ;
439+ case Stop :
440+ return "Stop" ;
441+ case Reset :
442+ return "Reset" ;
443+ case Restart :
444+ return "Restart" ;
445+ default :
446+ return null ;
447+ }
448+ }
449+ }
450+
451+ public static class ControlCommand
452+ {
453+ public const string Control = "CTRL" ;
454+ public const string Pulse = "PULSE" ;
328455 }
329456
330457 class Status //dep-free copy of WorkerStatus
@@ -396,6 +523,8 @@ public virtual void Dispose()
396523 {
397524 if ( this . OnError != null ) this . OnError ( ex ) ;
398525 }
526+
527+ DisposeHeartbeatTimer ( ) ;
399528 }
400529 }
401530}
0 commit comments