Skip to content

Commit 0a6c03f

Browse files
committed
Refactored ResultProcessor to support specialized handling for the LOAD request, and protected ResultProcessorResult.get() against spurious wake up
1 parent 070f149 commit 0a6c03f

1 file changed

Lines changed: 67 additions & 15 deletions

File tree

src/main/java/org/digitalmediaserver/cast/Channel.java

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@
6666
import org.digitalmediaserver.cast.message.StandardMessage.Ping;
6767
import org.digitalmediaserver.cast.message.StandardMessage.Pong;
6868
import org.digitalmediaserver.cast.message.entity.Application;
69+
import org.digitalmediaserver.cast.message.entity.ExtendedMediaStatus;
6970
import org.digitalmediaserver.cast.message.entity.LoadOptions;
7071
import org.digitalmediaserver.cast.message.entity.Media;
7172
import org.digitalmediaserver.cast.message.entity.MediaStatus;
7273
import org.digitalmediaserver.cast.message.entity.MediaVolume;
7374
import org.digitalmediaserver.cast.message.entity.QueueData;
7475
import org.digitalmediaserver.cast.message.entity.ReceiverStatus;
7576
import org.digitalmediaserver.cast.message.entity.Volume;
77+
import org.digitalmediaserver.cast.message.enumeration.ExtendedPlayerState;
7678
import org.digitalmediaserver.cast.message.enumeration.ResumeState;
7779
import org.digitalmediaserver.cast.message.enumeration.VirtualConnectionType;
7880
import org.digitalmediaserver.cast.message.enumeration.VolumeControlType;
@@ -98,6 +100,7 @@
98100
import org.digitalmediaserver.cast.protobuf.CastChannel;
99101
import org.digitalmediaserver.cast.protobuf.CastChannel.CastMessage;
100102
import org.digitalmediaserver.cast.util.JacksonHelper;
103+
import org.digitalmediaserver.cast.util.Util;
101104
import org.digitalmediaserver.cast.util.X509TrustAllManager;
102105
import org.slf4j.Logger;
103106
import org.slf4j.LoggerFactory;
@@ -556,10 +559,10 @@ public <T extends Response> T send(
556559
return null;
557560
}
558561

559-
ResultProcessor<T> rp = new ResultProcessor<>(session, responseClass, responseTimeout);
560-
synchronized (requests) {
561-
requests.put(Long.valueOf(requestId), rp);
562-
}
562+
ResultProcessor<T> rp = new ResultProcessor<>(
563+
session, responseClass, responseTimeout, message instanceof Load
564+
);
565+
insertResultProcessor(requestId, rp);
563566

564567
write(namespace, message, sourceId, destinationId);
565568
try {
@@ -2126,12 +2129,11 @@ protected void cacheVolume(@Nullable Volume volume) {
21262129

21272130
/**
21282131
* Claims (retrieves and removes from {@code requests}) the
2129-
* {@link ResultProcessorResult} for the specified request ID if one exists.
2132+
* {@link ResultProcessor} for the specified request ID if one exists.
21302133
*
21312134
* @param requestId the request ID to look up.
2132-
* @return The one and only {@link ResultProcessor} instance for the
2133-
* specified request ID if it exists and hasn't already been
2134-
* claimed, or {@code null}.
2135+
* @return The {@link ResultProcessor} instance for the specified request ID
2136+
* if it exists and hasn't already been claimed, or {@code null}.
21352137
*/
21362138
@Nullable
21372139
protected ResultProcessor<? extends Response> acquireResultProcessor(long requestId) {
@@ -2143,6 +2145,22 @@ protected ResultProcessor<? extends Response> acquireResultProcessor(long reques
21432145
}
21442146
}
21452147

2148+
/**
2149+
* Stores the specified {@link ResultProcessor} with the specified request ID.
2150+
*
2151+
* @param requestId the request ID to use.
2152+
* @param rp the {@link ResultProcessor} to store with the specified request ID.
2153+
*/
2154+
protected void insertResultProcessor(long requestId, @Nonnull ResultProcessor<? extends Response> rp) {
2155+
Util.requireNotNull(rp, "rp");
2156+
if (requestId < 1L) {
2157+
return;
2158+
}
2159+
synchronized (requests) {
2160+
requests.put(Long.valueOf(requestId), rp);
2161+
}
2162+
}
2163+
21462164
/**
21472165
* Writes the specified {@link Message} to the socket using the specified
21482166
* parameters.
@@ -2679,8 +2697,13 @@ protected void processStringMessage(@Nonnull ImmutableStringCastMessage message,
26792697
}
26802698
ResultProcessor<? extends Response> resultProcessor;
26812699
if (requestId > 0L && (resultProcessor = acquireResultProcessor(requestId)) != null) {
2682-
resultProcessor.process(jsonMessage);
2683-
} else if (parsedMessage == null || isCustomMessage(parsedMessage)) {
2700+
if (resultProcessor.process(jsonMessage)) {
2701+
return;
2702+
}
2703+
insertResultProcessor(requestId, resultProcessor);
2704+
}
2705+
2706+
if (parsedMessage == null || isCustomMessage(parsedMessage)) {
26842707
listeners.fire(new DefaultCastEvent<>(
26852708
CastEventType.CUSTOM_MESSAGE,
26862709
new CustomMessageEvent(
@@ -2814,6 +2837,9 @@ protected class ResultProcessor<T extends Response> {
28142837
/** The timeout in milliseconds */
28152838
protected final long requestTimeout;
28162839

2840+
/** {@link Load} requests needs special treatment, this flag triggers the special treatment */
2841+
protected final boolean isLoadRequest;
2842+
28172843
/** Whether the associated {@link Session} has been closed */
28182844
@GuardedBy("this")
28192845
protected boolean closed;
@@ -2828,11 +2854,13 @@ protected class ResultProcessor<T extends Response> {
28282854
* @param session the {@link Session} if one applies to the request.
28292855
* @param responseClass the expected response class.
28302856
* @param requestTimeout the timeout value in milliseconds.
2857+
* @param isLoadRequest whether the result is from a {@link Load} request.
28312858
*/
2832-
public ResultProcessor(@Nullable Session session, @Nonnull Class<T> responseClass, long requestTimeout) {
2859+
public ResultProcessor(@Nullable Session session, @Nonnull Class<T> responseClass, long requestTimeout, boolean isLoadRequest) {
28332860
requireNotNull(responseClass, "responseClass");
28342861
this.session = session;
28352862
this.responseClass = responseClass;
2863+
this.isLoadRequest = isLoadRequest;
28362864
this.requestTimeout = requestTimeout < 1 ? DEFAULT_RESPONSE_TIMEOUT : requestTimeout;
28372865
}
28382866

@@ -2852,11 +2880,12 @@ public void sessionClosed() {
28522880
* {@link ResultProcessor} by its request ID.
28532881
*
28542882
* @param jsonMSG the message content formatted as JSON.
2883+
* @return {@code true} if the message was processed, {@code false} if it wasn't.
28552884
* @throws JsonMappingException If the JSON mapping fails.
28562885
* @throws JsonProcessingException If the JSON can't be processed.
28572886
*/
28582887
@SuppressWarnings("unchecked")
2859-
public void process(String jsonMSG) throws JsonMappingException, JsonProcessingException {
2888+
public boolean process(String jsonMSG) throws JsonMappingException, JsonProcessingException {
28602889
Class<?> deserializeTo;
28612890
if (StandardResponse.class.isAssignableFrom(responseClass)) {
28622891
deserializeTo = StandardResponse.class;
@@ -2869,8 +2898,22 @@ public void process(String jsonMSG) throws JsonMappingException, JsonProcessingE
28692898
} catch (IllegalArgumentException e) {
28702899
synchronized (this) {
28712900
this.result = new ResultProcessorResult<>(null, null, jsonMSG);
2872-
this.notify();
2873-
return;
2901+
this.notifyAll();
2902+
return true;
2903+
}
2904+
}
2905+
if (isLoadRequest && object instanceof MediaStatusResponse) {
2906+
// The cast device will often respond twice to a load request, where the first response
2907+
// is merely a "LOADING" state, which is then followed up with either success or error.
2908+
// We don't want to react to the "LOADING" state, as this makes all requests seem a success.
2909+
List<MediaStatus> statuses = ((MediaStatusResponse) object).getStatuses();
2910+
ExtendedMediaStatus extendedStatus;
2911+
if (
2912+
!statuses.isEmpty() &&
2913+
(extendedStatus = statuses.get(0).getExtendedStatus()) != null &&
2914+
extendedStatus.getPlayerState() == ExtendedPlayerState.LOADING
2915+
) {
2916+
return false;
28742917
}
28752918
}
28762919
synchronized (this) {
@@ -2881,6 +2924,7 @@ public void process(String jsonMSG) throws JsonMappingException, JsonProcessingE
28812924
}
28822925
this.notifyAll();
28832926
}
2927+
return true;
28842928
}
28852929

28862930
/**
@@ -2901,10 +2945,18 @@ public ResultProcessorResult<T> get() throws InterruptedException, TimeoutExcept
29012945
if (result != null) {
29022946
return result;
29032947
}
2904-
this.wait(requestTimeout);
29052948
if (closed) {
29062949
throw new CastException("The session was closed by the cast device");
29072950
}
2951+
long start = System.currentTimeMillis();
2952+
long remaining = requestTimeout;
2953+
while (result == null && remaining > 0L) {
2954+
this.wait(remaining);
2955+
if (result == null && closed) {
2956+
throw new CastException("The session was closed by the cast device");
2957+
}
2958+
remaining = start + requestTimeout - System.currentTimeMillis();
2959+
}
29082960
if (result == null) {
29092961
throw new TimeoutException();
29102962
}

0 commit comments

Comments
 (0)