99import com .google .common .collect .Sets ;
1010import com .google .protobuf .Message ;
1111import io .envoyproxy .controlplane .cache .Resources .ResourceType ;
12-
12+ import io . envoyproxy . envoy . config . endpoint . v3 . ClusterLoadAssignment ;
1313import java .util .Collection ;
1414import java .util .Collections ;
1515import java .util .HashMap ;
1616import java .util .List ;
1717import java .util .Map ;
1818import java .util .Objects ;
1919import java .util .Set ;
20+ import java .util .UUID ;
2021import java .util .concurrent .atomic .AtomicLong ;
2122import java .util .concurrent .locks .Lock ;
2223import java .util .concurrent .locks .ReadWriteLock ;
2627import java .util .stream .Collectors ;
2728import java .util .stream .Stream ;
2829import javax .annotation .concurrent .GuardedBy ;
29-
3030import org .slf4j .Logger ;
3131import org .slf4j .LoggerFactory ;
3232
3333/**
3434 * {@code SimpleCache} provides a default implementation of {@link SnapshotCache}. It maintains a single versioned
3535 * {@link Snapshot} per node group. For the protocol to work correctly in ADS mode, EDS/RDS requests are responded to
36- * only when all resources in the snapshot xDS response are named as part of the request. It is expected that the CDS
37- * response names all EDS clusters, and the LDS response names all RDS routes in a snapshot, to ensure that Envoy makes
38- * the request for all EDS clusters or RDS routes eventually.
36+ * only when all resources in the snapshot xDS response are named as part of the request by default. It is expected
37+ * that the CDS response names all EDS clusters, and the LDS response names all RDS routes in a snapshot, to ensure
38+ * that Envoy makes the request for all EDS clusters or RDS routes eventually.
39+ *<p/>
40+ * when allowIncompleteEdsUpdate is true, we will send EDS response even if some clusters names are missing in the
41+ * snapshot in ADS.
3942 *
4043 * <p>The snapshot can be partial, e.g. only include RDS or EDS resources.
4144 */
@@ -93,7 +96,7 @@ public Watch createWatch(
9396 XdsRequest request ,
9497 Set <String > knownResourceNames ,
9598 Consumer <Response > responseConsumer ) {
96- return createWatch (ads , request , knownResourceNames , responseConsumer , false );
99+ return createWatch (ads , request , knownResourceNames , responseConsumer , false , false );
97100 }
98101
99102 /**
@@ -105,7 +108,8 @@ public Watch createWatch(
105108 XdsRequest request ,
106109 Set <String > knownResourceNames ,
107110 Consumer <Response > responseConsumer ,
108- boolean hasClusterChanged ) {
111+ boolean hasClusterChanged ,
112+ boolean allowDefaultEmptyEdsUpdate ) {
109113 ResourceType requestResourceType = request .getResourceType ();
110114 Preconditions .checkNotNull (requestResourceType , "unsupported type URL %s" ,
111115 request .getTypeUrl ());
@@ -124,7 +128,7 @@ public Watch createWatch(
124128 String version = snapshot == null ? "" : snapshot .version (requestResourceType ,
125129 request .getResourceNamesList ());
126130
127- Watch watch = new Watch (ads , request , responseConsumer );
131+ Watch watch = new Watch (ads , allowDefaultEmptyEdsUpdate , request , responseConsumer );
128132
129133 if (snapshot != null ) {
130134 Set <String > requestedResources = ImmutableSet .copyOf (request .getResourceNamesList ());
@@ -440,13 +444,14 @@ protected void respondWithSpecificOrder(T group,
440444 }
441445
442446 private Response createResponse (XdsRequest request , Map <String , VersionedResource <?>> resources ,
443- String version ) {
447+ String version , boolean allowDefaultResource ) {
444448 Collection <? extends Message > filtered = request .getResourceNamesList ().isEmpty ()
445449 ? resources .values ().stream ()
446450 .map (VersionedResource ::resource )
447451 .collect (Collectors .toList ())
448452 : request .getResourceNamesList ().stream ()
449- .map (resources ::get )
453+ .map (name -> resources .getOrDefault (name ,
454+ allowDefaultResource ? defaultResource (name , request .getResourceType ()) : null ))
450455 .filter (Objects ::nonNull )
451456 .map (VersionedResource ::resource )
452457 .collect (Collectors .toList ());
@@ -458,21 +463,35 @@ private boolean respond(Watch watch, U snapshot, T group) {
458463 Map <String , VersionedResource <?>> snapshotResources =
459464 snapshot .versionedResources (watch .request ().getResourceType ());
460465
466+ boolean allowDefaultResource = false ;
461467 if (!watch .request ().getResourceNamesList ().isEmpty () && watch .ads ()) {
462468 Collection <String > missingNames = watch .request ().getResourceNamesList ().stream ()
463469 .filter (name -> !snapshotResources .containsKey (name ))
464470 .collect (Collectors .toList ());
465471
466472 if (!missingNames .isEmpty ()) {
467- LOGGER .info (
468- "not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in snapshot" ,
469- watch .request ().getTypeUrl (),
470- group ,
471- snapshot .version (watch .request ().getResourceType (), watch .request ().getResourceNamesList ()),
472- String .join (", " , watch .request ().getResourceNamesList ()),
473- String .join (", " , missingNames ));
473+ if (watch .allowDefaultEmptyEdsUpdate () && watch .request ().getResourceType ().equals (ResourceType .ENDPOINT )) {
474+ allowDefaultResource = true ;
475+ LOGGER .info (
476+ "responding with empty ClusterLoadAssignments in ADS mode for {} from node {} at version {} "
477+ + "for request [{}] and [{}] not in snapshot" ,
478+ watch .request ().getTypeUrl (),
479+ group ,
480+ snapshot .version (watch .request ().getResourceType (), watch .request ().getResourceNamesList ()),
481+ String .join (", " , watch .request ().getResourceNamesList ()),
482+ String .join (", " , missingNames ));
483+ } else {
484+ LOGGER .info (
485+ "not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in"
486+ + " snapshot" ,
487+ watch .request ().getTypeUrl (),
488+ group ,
489+ snapshot .version (watch .request ().getResourceType (), watch .request ().getResourceNamesList ()),
490+ String .join (", " , watch .request ().getResourceNamesList ()),
491+ String .join (", " , missingNames ));
474492
475- return false ;
493+ return false ;
494+ }
476495 }
477496 }
478497
@@ -488,7 +507,8 @@ private boolean respond(Watch watch, U snapshot, T group) {
488507 Response response = createResponse (
489508 watch .request (),
490509 snapshotResources ,
491- version );
510+ version ,
511+ allowDefaultResource );
492512
493513 try {
494514 watch .respond (response );
@@ -573,6 +593,14 @@ private ResponseState respondDelta(DeltaWatch watch,
573593 return ResponseState .CANCELLED ;
574594 }
575595
596+ private VersionedResource <?> defaultResource (String resourceName , ResourceType resourceType ) {
597+ if (resourceType .equals (ResourceType .ENDPOINT )) {
598+ return VersionedResource .create (ClusterLoadAssignment .newBuilder ().setClusterName (resourceName ).build (),
599+ UUID .randomUUID ().toString ());
600+ }
601+ throw new IllegalArgumentException (String .format ("no default resource for resourceType: [%s]" , resourceType ));
602+ }
603+
576604 private enum ResponseState {
577605 RESPONDED ,
578606 UNRESPONDED ,
0 commit comments