Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 1035774

Browse files
committed
Add new slimmer and simpler RedisManagerPool
1 parent 4e1178d commit 1035774

1 file changed

Lines changed: 339 additions & 0 deletions

File tree

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
//Copyright (c) Service Stack LLC. All Rights Reserved.
2+
//License: https://raw.github.com/ServiceStack/ServiceStack/master/license.txt
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using ServiceStack.Caching;
8+
using ServiceStack.Logging;
9+
10+
namespace ServiceStack.Redis
11+
{
12+
public class RedisPoolConfig
13+
{
14+
public const int DefaultPoolTimeoutMs = 2000;
15+
public const int DefaultMaxPoolSize = 50;
16+
17+
public RedisPoolConfig()
18+
{
19+
MaxPoolSize = DefaultMaxPoolSize;
20+
PoolTimeout = TimeSpan.FromMilliseconds(DefaultPoolTimeoutMs);
21+
}
22+
23+
public TimeSpan PoolTimeout { get; set; }
24+
public int MaxPoolSize { get; set; }
25+
}
26+
27+
/// <summary>
28+
/// Provides thread-safe pooling of redis client connections.
29+
/// </summary>
30+
public partial class RedisManagerPool
31+
: IRedisClientsManager, IRedisFailover, IHandleClientDispose
32+
{
33+
private static readonly ILog Log = LogManager.GetLogger(typeof(RedisManagerPool));
34+
35+
private const string PoolTimeoutError =
36+
"Redis Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use.";
37+
38+
public int RecheckPoolAfterMs = 100;
39+
40+
private List<RedisEndpoint> Hosts { get; set; }
41+
42+
public List<Action<IRedisClientsManager>> OnFailover { get; private set; }
43+
44+
private RedisClient[] clients = new RedisClient[0];
45+
protected int poolIndex;
46+
47+
protected int RedisClientCounter = 0;
48+
49+
public IRedisClientFactory RedisClientFactory { get; set; }
50+
51+
public Action<IRedisNativeClient> ConnectionFilter { get; set; }
52+
53+
public int PoolTimeoutMs { get; set; }
54+
public int MaxPoolSize { get; set; }
55+
56+
public RedisManagerPool() : this(RedisNativeClient.DefaultHost) {}
57+
public RedisManagerPool(string host) : this(new[]{ host }) {}
58+
public RedisManagerPool(IEnumerable<string> hosts) : this(hosts, null) {}
59+
60+
public RedisManagerPool(IEnumerable<string> hosts, RedisPoolConfig config)
61+
{
62+
if (hosts == null)
63+
throw new ArgumentNullException("hosts");
64+
65+
Hosts = hosts.ToRedisEndPoints();
66+
67+
if (Hosts.Count == 0)
68+
throw new Exception("Must provide at least ");
69+
70+
this.RedisClientFactory = Redis.RedisClientFactory.Instance;
71+
72+
if (config == null)
73+
config = new RedisPoolConfig();
74+
75+
this.OnFailover = new List<Action<IRedisClientsManager>>();
76+
77+
this.PoolTimeoutMs = (int) config.PoolTimeout.TotalMilliseconds;
78+
this.MaxPoolSize = config.MaxPoolSize;
79+
80+
clients = new RedisClient[MaxPoolSize];
81+
poolIndex = 0;
82+
}
83+
84+
public void FailoverTo(params string[] readWriteHosts)
85+
{
86+
lock (clients)
87+
{
88+
for (var i = 0; i < clients.Length; i++)
89+
{
90+
clients[i] = null;
91+
}
92+
Hosts = readWriteHosts.ToRedisEndPoints();
93+
}
94+
95+
if (this.OnFailover != null)
96+
{
97+
foreach (var callback in OnFailover)
98+
{
99+
try
100+
{
101+
callback(this);
102+
}
103+
catch (Exception ex)
104+
{
105+
Log.Error("Error firing OnFailover callback(): ", ex);
106+
}
107+
}
108+
}
109+
}
110+
111+
/// <summary>
112+
/// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
113+
/// </summary>
114+
/// <returns></returns>
115+
public IRedisClient GetClient()
116+
{
117+
lock (clients)
118+
{
119+
AssertValidPool();
120+
121+
RedisClient inActiveClient;
122+
while ((inActiveClient = GetInActiveClient()) == null)
123+
{
124+
// wait for a connection, cry out if made to wait too long
125+
if (!Monitor.Wait(clients, PoolTimeoutMs))
126+
throw new TimeoutException(PoolTimeoutError);
127+
}
128+
129+
poolIndex++;
130+
inActiveClient.Active = true;
131+
132+
InitClient(inActiveClient);
133+
134+
return inActiveClient;
135+
}
136+
}
137+
138+
public IRedisClient GetReadOnlyClient()
139+
{
140+
return GetClient();
141+
}
142+
143+
/// <summary>
144+
/// Called within a lock
145+
/// </summary>
146+
/// <returns></returns>
147+
private RedisClient GetInActiveClient()
148+
{
149+
var desiredIndex = poolIndex % clients.Length;
150+
//this will loop through all hosts in readClients once even though there are 2 for loops
151+
//both loops are used to try to get the prefered host according to the round robin algorithm
152+
for (int x = 0; x < Hosts.Count; x++)
153+
{
154+
var nextHostIndex = (desiredIndex + x) % Hosts.Count;
155+
var nextHost = Hosts[nextHostIndex];
156+
for (var i = nextHostIndex; i < clients.Length; i += Hosts.Count)
157+
{
158+
if (clients[i] != null && !clients[i].Active && !clients[i].HadExceptions)
159+
return clients[i];
160+
else if (clients[i] == null || clients[i].HadExceptions)
161+
{
162+
if (clients[i] != null)
163+
clients[i].DisposeConnection();
164+
165+
var client = InitNewClient(nextHost);
166+
clients[i] = client;
167+
168+
return client;
169+
}
170+
}
171+
}
172+
return null;
173+
}
174+
175+
private RedisClient InitNewClient(RedisEndpoint nextHost)
176+
{
177+
var client = RedisClientFactory.CreateRedisClient(nextHost);
178+
client.Id = Interlocked.Increment(ref RedisClientCounter);
179+
client.ClientManager = this;
180+
client.ConnectionFilter = ConnectionFilter;
181+
182+
return client;
183+
}
184+
185+
private void InitClient(RedisClient client)
186+
{
187+
}
188+
189+
public void DisposeClient(RedisNativeClient client)
190+
{
191+
lock (clients)
192+
{
193+
for (var i = 0; i < clients.Length; i++)
194+
{
195+
var writeClient = clients[i];
196+
if (client != writeClient) continue;
197+
client.Active = false;
198+
Monitor.PulseAll(clients);
199+
return;
200+
}
201+
}
202+
203+
//Client not found in any pool, pulse both pools.
204+
lock (clients)
205+
Monitor.PulseAll(clients);
206+
}
207+
208+
/// <summary>
209+
/// Disposes the write client.
210+
/// </summary>
211+
/// <param name="client">The client.</param>
212+
public void DisposeWriteClient(RedisNativeClient client)
213+
{
214+
lock (clients)
215+
{
216+
client.Active = false;
217+
Monitor.PulseAll(clients);
218+
}
219+
}
220+
221+
public Dictionary<string, string> GetStats()
222+
{
223+
var clientsPoolSize = clients.Length;
224+
var clientsCreated = 0;
225+
var clientsWithExceptions = 0;
226+
var clientsInUse = 0;
227+
var clientsConnected = 0;
228+
229+
foreach (var client in clients)
230+
{
231+
if (client == null)
232+
{
233+
clientsCreated++;
234+
continue;
235+
}
236+
237+
if (client.HadExceptions)
238+
clientsWithExceptions++;
239+
if (client.Active)
240+
clientsInUse++;
241+
if (client.IsSocketConnected())
242+
clientsConnected++;
243+
}
244+
245+
var ret = new Dictionary<string, string>
246+
{
247+
{"clientsPoolSize", "" + clientsPoolSize},
248+
{"clientsCreated", "" + clientsCreated},
249+
{"clientsWithExceptions", "" + clientsWithExceptions},
250+
{"clientsInUse", "" + clientsInUse},
251+
{"clientsConnected", "" + clientsConnected},
252+
};
253+
254+
return ret;
255+
}
256+
257+
private void AssertValidPool()
258+
{
259+
if (clients.Length < 1)
260+
throw new InvalidOperationException("Need a minimum pool size of 1");
261+
}
262+
263+
public int[] GetClientPoolActiveStates()
264+
{
265+
var activeStates = new int[clients.Length];
266+
lock (clients)
267+
{
268+
for (int i = 0; i < clients.Length; i++)
269+
{
270+
activeStates[i] = clients[i] == null
271+
? -1
272+
: clients[i].Active ? 1 : 0;
273+
}
274+
}
275+
return activeStates;
276+
}
277+
278+
~RedisManagerPool()
279+
{
280+
Dispose(false);
281+
}
282+
283+
public void Dispose()
284+
{
285+
Dispose(true);
286+
GC.SuppressFinalize(this);
287+
}
288+
289+
protected virtual void Dispose(bool disposing)
290+
{
291+
if (Interlocked.Increment(ref disposeAttempts) > 1) return;
292+
293+
if (disposing)
294+
{
295+
// get rid of managed resources
296+
}
297+
298+
try
299+
{
300+
// get rid of unmanaged resources
301+
for (var i = 0; i < clients.Length; i++)
302+
{
303+
Dispose(clients[i]);
304+
}
305+
}
306+
catch (Exception ex)
307+
{
308+
Log.Error("Error when trying to dispose of PooledRedisClientManager", ex);
309+
}
310+
}
311+
312+
private int disposeAttempts = 0;
313+
314+
protected void Dispose(RedisClient redisClient)
315+
{
316+
if (redisClient == null) return;
317+
try
318+
{
319+
redisClient.DisposeConnection();
320+
}
321+
catch (Exception ex)
322+
{
323+
Log.Error(string.Format(
324+
"Error when trying to dispose of RedisClient to host {0}:{1}",
325+
redisClient.Host, redisClient.Port), ex);
326+
}
327+
}
328+
329+
public ICacheClient GetCacheClient()
330+
{
331+
return new RedisClientManagerCacheClient(this);
332+
}
333+
334+
public ICacheClient GetReadOnlyCacheClient()
335+
{
336+
return new RedisClientManagerCacheClient(this) { ReadOnly = true };
337+
}
338+
}
339+
}

0 commit comments

Comments
 (0)