77
88namespace Shuttle . Core . Threading
99{
10- public class ProcessorThread : IThreadState
10+ public class ProcessorThread
1111 {
1212 private static readonly int ThreadJoinTimeoutInterval =
1313 ConfigurationItem < int > . ReadSetting ( "ThreadJoinTimeoutInterval" , 1000 ) . GetValue ( ) ;
1414
1515 private readonly ILog _log ;
1616 private readonly string _name ;
1717 private readonly IProcessor _processor ;
18+ private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource ( ) ;
1819
19- private volatile bool _active ;
20+ private bool _started ;
2021
2122 private Thread _thread ;
2223
@@ -27,14 +28,16 @@ public ProcessorThread(string name, IProcessor processor)
2728 _name = name ;
2829 _processor = processor ;
2930
31+ CancellationToken = _cancellationTokenSource . Token ;
32+
3033 _log = Log . For ( this ) ;
3134 }
3235
33- public bool Active => _active ;
36+ public CancellationToken CancellationToken { get ; }
3437
3538 public void Start ( )
3639 {
37- if ( _active )
40+ if ( _started )
3841 {
3942 return ;
4043 }
@@ -57,8 +60,6 @@ public void Start()
5760 _thread . IsBackground = true ;
5861 _thread . Priority = ThreadPriority . Normal ;
5962
60- _active = true ;
61-
6263 _thread . Start ( ) ;
6364
6465 if ( Log . IsTraceEnabled )
@@ -67,15 +68,17 @@ public void Start()
6768 _processor . GetType ( ) . FullName ) ) ;
6869 }
6970
70- while ( ! _thread . IsAlive && _active )
71+ while ( ! _thread . IsAlive && ! CancellationToken . IsCancellationRequested )
7172 {
7273 }
7374
74- if ( _active && Log . IsTraceEnabled )
75+ if ( ! CancellationToken . IsCancellationRequested && Log . IsTraceEnabled )
7576 {
7677 _log . Trace ( string . Format ( Resources . ProcessorThreadActive , _thread . ManagedThreadId ,
7778 _processor . GetType ( ) . FullName ) ) ;
7879 }
80+
81+ _started = true ;
7982 }
8083
8184 public void Stop ( )
@@ -86,7 +89,7 @@ public void Stop()
8689 _processor . GetType ( ) . FullName ) ) ;
8790 }
8891
89- _active = false ;
92+ _cancellationTokenSource . Cancel ( ) ;
9093
9194 _processor . AttemptDispose ( ) ;
9295
@@ -98,15 +101,15 @@ public void Stop()
98101
99102 private void Work ( )
100103 {
101- while ( _active )
104+ while ( ! CancellationToken . IsCancellationRequested )
102105 {
103106 if ( Log . IsVerboseEnabled )
104107 {
105108 _log . Verbose ( string . Format ( Resources . ProcessorExecuting , _thread . ManagedThreadId ,
106109 _processor . GetType ( ) . FullName ) ) ;
107110 }
108111
109- _processor . Execute ( this ) ;
112+ _processor . Execute ( CancellationToken ) ;
110113 }
111114
112115 if ( Log . IsTraceEnabled )
@@ -118,7 +121,7 @@ private void Work()
118121
119122 internal void Deactivate ( )
120123 {
121- _active = false ;
124+ _cancellationTokenSource . Cancel ( ) ;
122125 }
123126 }
124127}
0 commit comments