Skip to content

Commit 71e47f0

Browse files
committed
Refactored ResultProcessor to support specialized handling for the LOAD request, and protected ResultProcessorResult.get() against spurious wake up
1 parent 7584537 commit 71e47f0

1 file changed

Lines changed: 67 additions & 15 deletions

File tree

src/main/java/org/digitalmediaserver/cast/Channel.java

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@
6666
import org.digitalmediaserver.cast.message.StandardMessage.Ping;
6767
import org.digitalmediaserver.cast.message.StandardMessage.Pong;
6868
import org.digitalmediaserver.cast.message.entity.Application;
69+
import org.digitalmediaserver.cast.message.entity.ExtendedMediaStatus;
6970
import org.digitalmediaserver.cast.message.entity.LoadOptions;
7071
import org.digitalmediaserver.cast.message.entity.Media;
7172
import org.digitalmediaserver.cast.message.entity.MediaStatus;
7273
import org.digitalmediaserver.cast.message.entity.MediaVolume;
7374
import org.digitalmediaserver.cast.message.entity.QueueData;
7475
import org.digitalmediaserver.cast.message.entity.ReceiverStatus;
7576
import org.digitalmediaserver.cast.message.entity.Volume;
77+
import org.digitalmediaserver.cast.message.enumeration.ExtendedPlayerState;
7678
import org.digitalmediaserver.cast.message.enumeration.ResumeState;
7779
import org.digitalmediaserver.cast.message.enumeration.VirtualConnectionType;
7880
import org.digitalmediaserver.cast.message.enumeration.VolumeControlType;
@@ -98,6 +100,7 @@
98100
import org.digitalmediaserver.cast.protobuf.CastChannel;
99101
import org.digitalmediaserver.cast.protobuf.CastChannel.CastMessage;
100102
import org.digitalmediaserver.cast.util.JacksonHelper;
103+
import org.digitalmediaserver.cast.util.Util;
101104
import org.digitalmediaserver.cast.util.X509TrustAllManager;
102105
import org.slf4j.Logger;
103106
import org.slf4j.LoggerFactory;
@@ -556,10 +559,10 @@ public <T extends Response> T send(
556559
return null;
557560
}
558561

559-
ResultProcessor<T> rp = new ResultProcessor<>(session, responseClass, responseTimeout);
560-
synchronized (requests) {
561-
requests.put(Long.valueOf(requestId), rp);
562-
}
562+
ResultProcessor<T> rp = new ResultProcessor<>(
563+
session, responseClass, responseTimeout, message instanceof Load
564+
);
565+
insertResultProcessor(requestId, rp);
563566

564567
write(namespace, message, sourceId, destinationId);
565568
try {
@@ -2078,12 +2081,11 @@ protected void cacheVolume(@Nullable Volume volume) {
20782081

20792082
/**
20802083
* Claims (retrieves and removes from {@code requests}) the
2081-
* {@link ResultProcessorResult} for the specified request ID if one exists.
2084+
* {@link ResultProcessor} for the specified request ID if one exists.
20822085
*
20832086
* @param requestId the request ID to look up.
2084-
* @return The one and only {@link ResultProcessor} instance for the
2085-
* specified request ID if it exists and hasn't already been
2086-
* claimed, or {@code null}.
2087+
* @return The {@link ResultProcessor} instance for the specified request ID
2088+
* if it exists and hasn't already been claimed, or {@code null}.
20872089
*/
20882090
@Nullable
20892091
protected ResultProcessor<? extends Response> acquireResultProcessor(long requestId) {
@@ -2095,6 +2097,22 @@ protected ResultProcessor<? extends Response> acquireResultProcessor(long reques
20952097
}
20962098
}
20972099

2100+
/**
2101+
* Stores the specified {@link ResultProcessor} with the specified request ID.
2102+
*
2103+
* @param requestId the request ID to use.
2104+
* @param rp the {@link ResultProcessor} to store with the specified request ID.
2105+
*/
2106+
protected void insertResultProcessor(long requestId, @Nonnull ResultProcessor<? extends Response> rp) {
2107+
Util.requireNotNull(rp, "rp");
2108+
if (requestId < 1L) {
2109+
return;
2110+
}
2111+
synchronized (requests) {
2112+
requests.put(Long.valueOf(requestId), rp);
2113+
}
2114+
}
2115+
20982116
/**
20992117
* Writes the specified {@link Message} to the socket using the specified
21002118
* parameters.
@@ -2631,8 +2649,13 @@ protected void processStringMessage(@Nonnull ImmutableStringCastMessage message,
26312649
}
26322650
ResultProcessor<? extends Response> resultProcessor;
26332651
if (requestId > 0L && (resultProcessor = acquireResultProcessor(requestId)) != null) {
2634-
resultProcessor.process(jsonMessage);
2635-
} else if (parsedMessage == null || isCustomMessage(parsedMessage)) {
2652+
if (resultProcessor.process(jsonMessage)) {
2653+
return;
2654+
}
2655+
insertResultProcessor(requestId, resultProcessor);
2656+
}
2657+
2658+
if (parsedMessage == null || isCustomMessage(parsedMessage)) {
26362659
listeners.fire(new DefaultCastEvent<>(
26372660
CastEventType.CUSTOM_MESSAGE,
26382661
new CustomMessageEvent(
@@ -2766,6 +2789,9 @@ protected class ResultProcessor<T extends Response> {
27662789
/** The timeout in milliseconds */
27672790
protected final long requestTimeout;
27682791

2792+
/** {@link Load} requests needs special treatment, this flag triggers the special treatment */
2793+
protected final boolean isLoadRequest;
2794+
27692795
/** Whether the associated {@link Session} has been closed */
27702796
@GuardedBy("this")
27712797
protected boolean closed;
@@ -2780,11 +2806,13 @@ protected class ResultProcessor<T extends Response> {
27802806
* @param session the {@link Session} if one applies to the request.
27812807
* @param responseClass the expected response class.
27822808
* @param requestTimeout the timeout value in milliseconds.
2809+
* @param isLoadRequest whether the result is from a {@link Load} request.
27832810
*/
2784-
public ResultProcessor(@Nullable Session session, @Nonnull Class<T> responseClass, long requestTimeout) {
2811+
public ResultProcessor(@Nullable Session session, @Nonnull Class<T> responseClass, long requestTimeout, boolean isLoadRequest) {
27852812
requireNotNull(responseClass, "responseClass");
27862813
this.session = session;
27872814
this.responseClass = responseClass;
2815+
this.isLoadRequest = isLoadRequest;
27882816
this.requestTimeout = requestTimeout < 1 ? DEFAULT_RESPONSE_TIMEOUT : requestTimeout;
27892817
}
27902818

@@ -2804,11 +2832,12 @@ public void sessionClosed() {
28042832
* {@link ResultProcessor} by its request ID.
28052833
*
28062834
* @param jsonMSG the message content formatted as JSON.
2835+
* @return {@code true} if the message was processed, {@code false} if it wasn't.
28072836
* @throws JsonMappingException If the JSON mapping fails.
28082837
* @throws JsonProcessingException If the JSON can't be processed.
28092838
*/
28102839
@SuppressWarnings("unchecked")
2811-
public void process(String jsonMSG) throws JsonMappingException, JsonProcessingException {
2840+
public boolean process(String jsonMSG) throws JsonMappingException, JsonProcessingException {
28122841
Class<?> deserializeTo;
28132842
if (StandardResponse.class.isAssignableFrom(responseClass)) {
28142843
deserializeTo = StandardResponse.class;
@@ -2821,8 +2850,22 @@ public void process(String jsonMSG) throws JsonMappingException, JsonProcessingE
28212850
} catch (IllegalArgumentException e) {
28222851
synchronized (this) {
28232852
this.result = new ResultProcessorResult<>(null, null, jsonMSG);
2824-
this.notify();
2825-
return;
2853+
this.notifyAll();
2854+
return true;
2855+
}
2856+
}
2857+
if (isLoadRequest && object instanceof MediaStatusResponse) {
2858+
// The cast device will often respond twice to a load request, where the first response
2859+
// is merely a "LOADING" state, which is then followed up with either success or error.
2860+
// We don't want to react to the "LOADING" state, as this makes all requests seem a success.
2861+
List<MediaStatus> statuses = ((MediaStatusResponse) object).getStatuses();
2862+
ExtendedMediaStatus extendedStatus;
2863+
if (
2864+
!statuses.isEmpty() &&
2865+
(extendedStatus = statuses.get(0).getExtendedStatus()) != null &&
2866+
extendedStatus.getPlayerState() == ExtendedPlayerState.LOADING
2867+
) {
2868+
return false;
28262869
}
28272870
}
28282871
synchronized (this) {
@@ -2833,6 +2876,7 @@ public void process(String jsonMSG) throws JsonMappingException, JsonProcessingE
28332876
}
28342877
this.notifyAll();
28352878
}
2879+
return true;
28362880
}
28372881

28382882
/**
@@ -2853,10 +2897,18 @@ public ResultProcessorResult<T> get() throws InterruptedException, TimeoutExcept
28532897
if (result != null) {
28542898
return result;
28552899
}
2856-
this.wait(requestTimeout);
28572900
if (closed) {
28582901
throw new CastException("The session was closed by the cast device");
28592902
}
2903+
long start = System.currentTimeMillis();
2904+
long remaining = requestTimeout;
2905+
while (result == null && remaining > 0L) {
2906+
this.wait(remaining);
2907+
if (result == null && closed) {
2908+
throw new CastException("The session was closed by the cast device");
2909+
}
2910+
remaining = start + requestTimeout - System.currentTimeMillis();
2911+
}
28602912
if (result == null) {
28612913
throw new TimeoutException();
28622914
}

0 commit comments

Comments
 (0)