Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.uber.cadence.internal.sync;

import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.common.WorkflowExecutionHistory;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.internal.common.InternalUtils;
Expand All @@ -37,15 +36,12 @@
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.WorkflowInterceptor;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

/** Workflow worker that supports POJO workflow implementations. */
public class SyncWorkflowWorker
implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
public class SyncWorkflowWorker implements SuspendableWorker {

private final WorkflowWorker workflowWorker;
private final LocalActivityWorker laWorker;
Expand All @@ -68,7 +64,6 @@ public SyncWorkflowWorker(
SingleWorkerOptions locallyDispatchedActivityOptions,
DeciderCache cache,
String stickyTaskListName,
Duration stickyDecisionScheduleToStartTimeout,
ThreadPoolExecutor workflowThreadPool) {
Objects.requireNonNull(workflowThreadPool);
this.dataConverter = workflowOptions.getDataConverter();
Expand Down Expand Up @@ -100,7 +95,7 @@ public SyncWorkflowWorker(
cache,
workflowOptions,
stickyTaskListName,
stickyDecisionScheduleToStartTimeout,
workflowOptions.getStickyTaskListScheduleToStartTimeout(),
service,
laWorker.getLocalActivityTaskPoller());

Expand Down Expand Up @@ -241,11 +236,6 @@ public <R> R queryWorkflowExecution(
return dataConverter.fromData(result, resultClass, resultType);
}

@Override
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
workflowWorker.accept(pollForDecisionTaskResponse);
}

public CompletableFuture<Boolean> isHealthy() {
return service.isHealthy();
}
Expand Down
149 changes: 149 additions & 0 deletions src/main/java/com/uber/cadence/internal/testservice/TaskQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.testservice;

import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;

/**
* A specialized unbounded queue that requires blocking poll operations to happen through a Future
* so that they can be cancelled (i.e. cancelling the future breaks out of the poll via a
* j.u.c.CancellationException).
*
* @param <E>
*/
class TaskQueue<E> {
private final LinkedList<E> backlog = new LinkedList<>();
private final LinkedList<PollFuture> waiters = new LinkedList<>();

/**
* Adds the provided element to the tail of this queue.
*
* @param element the value to add
*/
synchronized void add(E element) {
for (PollFuture future = waiters.poll(); future != null; future = waiters.poll()) {
if (future.set(element)) {
return;
}
}
backlog.push(element);
Comment thread
Shaddoll marked this conversation as resolved.
}

/**
* Creates a new j.u.c.Future whose get() method will eventually return a value from the head of
* this queue. Note that failing to call get() on the returned Future can result in missed queue
* updates.
*
* @return a Future providing one-shot access to the head of this queue.
*/
Future<E> poll() {
final PollFuture future = new PollFuture();
E element;
synchronized (this) {
if (backlog.isEmpty()) {
waiters.push(future);
return future;
}
element = backlog.pop();
}
future.set(element);
return future;
}

/**
* A Future implementation specifically for consuming from the enclosing TaskQueue type. The get
* method on this class blocks until a value is available from the queue but unlike
* BlockingQueue#take, a blocked consumer can be "interrupted" without the use of thread
* interruption by calling #cancel() on this Future.
*/
private class PollFuture implements Future<E> {
boolean cancelled = false;
E value;

private synchronized boolean set(E element) {
Preconditions.checkState(value == null);
if (cancelled) {
return false;
}
value = element;
notifyAll();
return true;
}

@Override
public boolean cancel(boolean ignored) {
synchronized (TaskQueue.this) {
TaskQueue.this.waiters.remove(this);
}
synchronized (this) {
if (value != null) {
return false;
}
cancelled = true;
notifyAll();
return true;
}
}

@Override
public synchronized boolean isCancelled() {
return cancelled;
}

@Override
public synchronized boolean isDone() {
return value != null;
}

@Override
public synchronized E get() throws InterruptedException, ExecutionException {
while (value == null && !cancelled) {
this.wait();
}
if (cancelled) {
throw new CancellationException();
}
return value;
}

@Override
public synchronized E get(long timeout, @Nonnull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long waitTimeNanos = unit.toNanos(timeout);
long deadline = System.nanoTime() + waitTimeNanos;
while (value == null && !cancelled) {
long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
throw new TimeoutException();
}
TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
}
if (cancelled) {
throw new CancellationException();
}
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ void apply(RequestContext ctx)
private final Map<String, WorkflowQuery> pendingQueries = new ConcurrentHashMap<>();
private final Optional<String> continuedExecutionRunId;
public StickyExecutionAttributes stickyExecutionAttributes;
private StickyExecutionAttributes previousStickyExecutionAttributes;

/**
* @param retryState present if workflow is a retry
Expand Down Expand Up @@ -151,18 +152,31 @@ private void completeDecisionUpdate(UpdateProcedure updater, StickyExecutionAttr
throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
BadRequestError {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
stickyExecutionAttributes = attributes;
update(true, updater, stackTraceElements[2].getMethodName());
update(true, updater, stackTraceElements[2].getMethodName(), attributes);
}

private void update(boolean completeDecisionUpdate, UpdateProcedure updater, String caller)
throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
BadRequestError {
update(completeDecisionUpdate, updater, caller, null);
}

private void update(
boolean completeDecisionUpdate,
UpdateProcedure updater,
String caller,
StickyExecutionAttributes attributes)
throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
BadRequestError {
String callerInfo = "Decision Update from " + caller;
lock.lock();
LockHandle lockHandle = selfAdvancingTimer.lockTimeSkipping(callerInfo);

try {
if (completeDecisionUpdate) {
previousStickyExecutionAttributes = stickyExecutionAttributes;
stickyExecutionAttributes = attributes;
}
checkCompleted();
boolean concurrentDecision =
!completeDecisionUpdate
Expand Down Expand Up @@ -618,6 +632,7 @@ private void timeoutDecisionTask(long scheduledEventId) {
|| decision.getData().scheduledEventId != scheduledEventId
|| decision.getState() == State.COMPLETED) {
// timeout for a previous decision
this.stickyExecutionAttributes = previousStickyExecutionAttributes;
return;
}
decision.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,29 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeou
public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskRequest pollRequest)
throws BadRequestError, InternalServiceError, ServiceBusyError, CadenceError {
PollForDecisionTaskResponse task;
java.util.concurrent.Future<PollForDecisionTaskResponse> future =
store.pollForDecisionTask(pollRequest);
try {
task = store.pollForDecisionTask(pollRequest);
// Poll with 60 second timeout to match production long poll behavior
task = future.get(60, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.cancel(true); // Cancel the Future to remove it from waiters queue
return new PollForDecisionTaskResponse();
} catch (java.util.concurrent.TimeoutException e) {
// Poll timed out - cancel to remove from waiters queue
future.cancel(true);
return new PollForDecisionTaskResponse();
} catch (java.util.concurrent.CancellationException e) {
// Poll was cancelled - return empty response
return new PollForDecisionTaskResponse();
} catch (java.util.concurrent.ExecutionException e) {
future.cancel(true);
throw new InternalServiceError("Error polling for decision task: " + e.getMessage());
}
// Return empty response if poll timed out
if (task.getWorkflowExecution() == null) {
return task;
}
ExecutionId executionId = new ExecutionId(pollRequest.getDomain(), task.getWorkflowExecution());
TestWorkflowMutableState mutableState = getMutableState(executionId);
Expand Down Expand Up @@ -440,10 +459,29 @@ public PollForActivityTaskResponse PollForActivityTask(PollForActivityTaskReques
throws BadRequestError, InternalServiceError, ServiceBusyError, CadenceError {
PollForActivityTaskResponse task;
while (true) {
java.util.concurrent.Future<PollForActivityTaskResponse> future =
store.pollForActivityTask(pollRequest);
try {
task = store.pollForActivityTask(pollRequest);
// Poll with 60 second timeout to match production long poll behavior
task = future.get(60, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.cancel(true); // Cancel the Future to remove it from waiters queue
return new PollForActivityTaskResponse();
} catch (java.util.concurrent.TimeoutException e) {
// Poll timed out - cancel to remove from waiters queue
future.cancel(true);
return new PollForActivityTaskResponse();
} catch (java.util.concurrent.CancellationException e) {
// Poll was cancelled - return empty response
return new PollForActivityTaskResponse();
} catch (java.util.concurrent.ExecutionException e) {
future.cancel(true);
throw new InternalServiceError("Error polling for activity task: " + e.getMessage());
}
// Return empty response if poll timed out
if (task.getWorkflowExecution() == null) {
return task;
}
ExecutionId executionId =
new ExecutionId(pollRequest.getDomain(), task.getWorkflowExecution());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;

interface TestWorkflowStore {

Expand Down Expand Up @@ -144,11 +145,9 @@ long save(RequestContext requestContext)

void registerDelayedCallback(Duration delay, Runnable r);

PollForDecisionTaskResponse pollForDecisionTask(PollForDecisionTaskRequest pollRequest)
throws InterruptedException;
Future<PollForDecisionTaskResponse> pollForDecisionTask(PollForDecisionTaskRequest pollRequest);

PollForActivityTaskResponse pollForActivityTask(PollForActivityTaskRequest pollRequest)
throws InterruptedException;
Future<PollForActivityTaskResponse> pollForActivityTask(PollForActivityTaskRequest pollRequest);

/** @return queryId */
void sendQueryTask(ExecutionId executionId, TaskListId taskList, PollForDecisionTaskResponse task)
Expand Down
Loading
Loading