1+ using System ;
2+ using System . Threading ;
3+ using System . Threading . Tasks ;
4+ using ServiceStack . Logging ;
5+ using ServiceStack . Redis ;
6+ using ServiceStack . Text ;
7+
8+ namespace ConsoleTests
9+ {
10+ public class MultiBlockingRemoveAfterReconnection
11+ {
12+ protected internal static RedisManagerPool RedisManager ;
13+
14+ public void Execute ( )
15+ {
16+ // LogManager.LogFactory = new ConsoleLogFactory();
17+ // RedisConfig.EnableVerboseLogging = true;
18+
19+ RedisConfig . DefaultConnectTimeout = 20 * 1000 ;
20+ RedisConfig . DefaultRetryTimeout = 20 * 1000 ;
21+
22+ RedisManager = new RedisManagerPool ( $ "localhost:6379?db=9") ;
23+
24+ MultipleBlocking ( 3 ) ;
25+
26+ Console . ReadLine ( ) ;
27+ }
28+
29+ private static void MultipleBlocking ( int count )
30+ {
31+ for ( int i = 0 ; i < count ; i ++ )
32+ {
33+ var queue = $ "Q{ i + 1 } ";
34+ RunTask ( ( ) => BlockingRemoveStartFromList ( queue ) , $ "Receive from { queue } ") ;
35+ }
36+ }
37+ public static void BlockingRemoveStartFromList ( string queue )
38+ {
39+ using ( var client = RedisManager . GetClient ( ) as RedisClient )
40+ {
41+ client . Ping ( ) ;
42+ Console . WriteLine ( $ "#{ client . ClientId } Listening to { queue } ") ;
43+
44+ var fromList = client . BlockingRemoveStartFromList ( queue , TimeSpan . FromHours ( 10 ) ) ;
45+ Console . WriteLine ( $ "#{ client . ClientId } Received: '{ fromList . Dump ( ) } ' from '{ queue } '") ;
46+ }
47+ }
48+
49+ private static void RunTask ( Action action , string name )
50+ {
51+ Task . Run ( ( ) =>
52+ {
53+
54+ while ( true )
55+ {
56+ try
57+ {
58+ Console . WriteLine ( $ "Invoking { name } ") ;
59+ action . Invoke ( ) ;
60+ }
61+ catch ( Exception exception )
62+ {
63+ Console . WriteLine ( $ "Exception in { name } : { exception } ") ;
64+ //Thread.Sleep(5000);// Give redis some time to wake up!
65+ }
66+
67+ Thread . Sleep ( 100 ) ;
68+ }
69+ } ) ;
70+ }
71+ }
72+ }
0 commit comments