@@ -17,17 +17,16 @@ namespace Vertex.Stream.Kafka.Consumer
1717 public class ConsumerManager : IHostedService , IDisposable
1818 {
1919 private const int HoldTime = 20 * 1000 ;
20- private const int MonitTime = 60 * 2 * 1000 ;
20+ private const int MonitorTime = 60 * 2 * 1000 ;
2121 private const int CheckTime = 10 * 1000 ;
2222 private const int LockHoldingSeconds = 60 ;
2323
2424 private readonly List < QueueInfo > queues ;
2525 private readonly ILogger < ConsumerManager > logger ;
26- private readonly IKafkaClient client ;
2726 private readonly IServiceProvider provider ;
2827 private readonly IGrainFactory grainFactory ;
29- private readonly ConcurrentDictionary < string , ConsumerRunner > consumerRunners = new ConcurrentDictionary < string , ConsumerRunner > ( ) ;
30- private readonly ConcurrentDictionary < string , long > runners = new ConcurrentDictionary < string , long > ( ) ;
28+ private readonly ConcurrentDictionary < string , ConsumerRunner > consumerRunners = new ( ) ;
29+ private readonly ConcurrentDictionary < string , long > runners = new ( ) ;
3130
3231 private Timer distributedHoldTimer ;
3332 private Timer distributedMonitorTime ;
@@ -39,16 +38,14 @@ public class ConsumerManager : IHostedService, IDisposable
3938
4039 public ConsumerManager (
4140 ILogger < ConsumerManager > logger ,
42- IKafkaClient client ,
4341 IGrainFactory grainFactory ,
4442 IServiceProvider provider )
4543 {
4644 this . provider = provider ;
47- this . client = client ;
4845 this . logger = logger ;
4946 this . grainFactory = grainFactory ;
5047
51- this . queues = new List < QueueInfo > ( ) ;
48+ this . queues = [ ] ;
5249 foreach ( var assembly in AssemblyHelper . GetAssemblies ( logger ) )
5350 {
5451 foreach ( var type in assembly . GetTypes ( ) )
@@ -87,7 +84,7 @@ public Task StartAsync(CancellationToken cancellationToken)
8784 this . logger . LogInformation ( "EventBus Background Service is starting." ) ;
8885 }
8986
90- this . distributedMonitorTime = new Timer ( state => this . DistributedStart ( ) . Wait ( ) , null , 1000 , MonitTime ) ;
87+ this . distributedMonitorTime = new Timer ( state => this . DistributedStart ( ) . Wait ( ) , null , 1000 , MonitorTime ) ;
9188 this . distributedHoldTimer = new Timer ( state => this . DistributedHold ( ) . Wait ( ) , null , HoldTime , HoldTime ) ;
9289 this . heathCheckTimer = new Timer ( state => { this . HeathCheck ( ) . Wait ( ) ; } , null , CheckTime , CheckTime ) ;
9390 return Task . CompletedTask ;
0 commit comments