@@ -43,7 +43,7 @@ public class SimpleCache<T> implements SnapshotCache<T> {
4343
4444 @ GuardedBy ("lock" )
4545 private final Map <T , Snapshot > snapshots = new HashMap <>();
46- private final ConcurrentMap <T , CacheStatusInfo <T >> statuses = new ConcurrentHashMap <>();
46+ private final ConcurrentMap <T , ConcurrentMap < String , CacheStatusInfo <T > >> statuses = new ConcurrentHashMap <>();
4747
4848 private AtomicLong watchCount = new AtomicLong ();
4949
@@ -64,10 +64,10 @@ public boolean clearSnapshot(T group) {
6464 // we take a writeLock to prevent watches from being created
6565 writeLock .lock ();
6666 try {
67- CacheStatusInfo <T > status = statuses .get (group );
67+ Map < String , CacheStatusInfo <T > > status = statuses .get (group );
6868
6969 // If we don't know about this group, do nothing.
70- if (status != null && status .numWatches () > 0 ) {
70+ if (status != null && status .values (). stream (). mapToLong ( CacheStatusInfo :: numWatches ). sum () > 0 ) {
7171 LOGGER .warn ("tried to clear snapshot for group with existing watches, group={}" , group );
7272
7373 return false ;
@@ -106,7 +106,8 @@ public Watch createWatch(
106106 // doesn't conflict
107107 readLock .lock ();
108108 try {
109- CacheStatusInfo <T > status = statuses .computeIfAbsent (group , g -> new CacheStatusInfo <>(group ));
109+ CacheStatusInfo <T > status = statuses .computeIfAbsent (group , g -> new ConcurrentHashMap <>())
110+ .computeIfAbsent (request .getTypeUrl (), s -> new CacheStatusInfo <>(group ));
110111 status .setLastWatchRequestTime (System .currentTimeMillis ());
111112
112113 Snapshot snapshot = snapshots .get (group );
@@ -212,7 +213,7 @@ public Collection<T> groups() {
212213 @ Override
213214 public synchronized void setSnapshot (T group , Snapshot snapshot ) {
214215 // we take a writeLock to prevent watches from being created while we update the snapshot
215- CacheStatusInfo <T > status ;
216+ ConcurrentMap < String , CacheStatusInfo <T > > status ;
216217 writeLock .lock ();
217218 try {
218219 // Update the existing snapshot entry.
@@ -238,15 +239,26 @@ public StatusInfo statusInfo(T group) {
238239 readLock .lock ();
239240
240241 try {
241- return statuses .get (group );
242+ ConcurrentMap <String , CacheStatusInfo <T >> statusMap = statuses .get (group );
243+ if (statusMap == null || statusMap .isEmpty ()) {
244+ return null ;
245+ }
246+
247+ return new GroupCacheStatusInfo <>(statusMap .values ());
242248 } finally {
243249 readLock .unlock ();
244250 }
245251 }
246252
247253 @ VisibleForTesting
248- protected void respondWithSpecificOrder (T group , Snapshot snapshot , CacheStatusInfo <T > status ) {
254+ protected void respondWithSpecificOrder (T group , Snapshot snapshot ,
255+ ConcurrentMap <String , CacheStatusInfo <T >> statusMap ) {
249256 for (String typeUrl : Resources .TYPE_URLS ) {
257+ CacheStatusInfo <T > status = statusMap .get (typeUrl );
258+ if (status == null ) {
259+ continue ;
260+ }
261+
250262 status .watchesRemoveIf ((id , watch ) -> {
251263 if (!watch .request ().getTypeUrl ().equals (typeUrl )) {
252264 return false ;
0 commit comments