Skip to content

Commit 151ffce

Browse files
committed
Upgrade App Engine dev task queue emulation to Quartz 2.4.1 which is the supported version going forward.
PiperOrigin-RevId: 899225510 Change-Id: I7be55cf6b2b38e0913b9e1470b24545e86909502
1 parent 7113028 commit 151ffce

10 files changed

Lines changed: 205 additions & 343 deletions

File tree

api_dev/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
<artifactId>jsoup</artifactId>
113113
</dependency>
114114
<dependency>
115-
<groupId>org.quartz-scheduler</groupId>
115+
<groupId>quartz</groupId>
116116
<artifactId>quartz</artifactId>
117117
</dependency>
118118
<dependency>
@@ -244,4 +244,3 @@
244244
</plugins>
245245
</build>
246246
</project>
247-

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/DevPushQueue.java

Lines changed: 36 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@
1616

1717
package com.google.appengine.api.taskqueue.dev;
1818

19-
import static org.quartz.JobBuilder.newJob;
20-
import static org.quartz.JobKey.jobKey;
21-
import static org.quartz.TriggerBuilder.newTrigger;
22-
import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals;
23-
import static org.quartz.impl.matchers.GroupMatcher.triggerGroupEquals;
24-
2519
import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo;
2620
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest;
2721
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest.Header;
@@ -34,24 +28,19 @@
3428
import com.google.apphosting.utils.config.QueueXml;
3529
import com.google.common.flogger.GoogleLogger;
3630
import com.google.protobuf.ByteString;
37-
import java.time.Instant;
3831
import java.util.ArrayList;
32+
import java.util.Arrays;
3933
import java.util.Collections;
4034
import java.util.Date;
4135
import java.util.List;
42-
import java.util.Set;
4336
import org.quartz.Job;
44-
import org.quartz.JobDataMap;
4537
import org.quartz.JobDetail;
4638
import org.quartz.JobExecutionContext;
4739
import org.quartz.JobExecutionException;
48-
import org.quartz.JobKey;
4940
import org.quartz.Scheduler;
5041
import org.quartz.SchedulerException;
5142
import org.quartz.SimpleTrigger;
5243
import org.quartz.Trigger;
53-
import org.quartz.impl.JobExecutionContextImpl;
54-
import org.quartz.spi.OperableTrigger;
5544
import org.quartz.spi.TriggerFiredBundle;
5645

5746
/**
@@ -100,7 +89,7 @@ Mode getMode() {
10089
// to the contrary, pausing a job group only pauses jobs that already
10190
// exist. We need to make sure all future jobs are paused, and that
10291
// works if we pause the trigger group.
103-
scheduler.pauseTriggers(triggerGroupEquals(getQueueName()));
92+
scheduler.pauseTriggerGroup(getQueueName());
10493
} catch (SchedulerException e) {
10594
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE, e.getMessage());
10695
}
@@ -123,7 +112,7 @@ private synchronized String scheduleTask(TaskQueueAddRequest.Builder addRequest)
123112
taskName = genTaskName();
124113
}
125114
try {
126-
if (scheduler.checkExists(jobKey(taskName, getQueueName()))) {
115+
if (scheduler.getJobDetail(taskName, getQueueName()) != null) {
127116
throw new ApiProxy.ApplicationException(ErrorCode.TASK_ALREADY_EXISTS_VALUE);
128117
}
129118
} catch (SchedulerException e) {
@@ -132,32 +121,19 @@ private synchronized String scheduleTask(TaskQueueAddRequest.Builder addRequest)
132121

133122
TaskQueueRetryParameters retryParams = getRetryParameters(addRequest);
134123
long etaMillis = addRequest.getEtaUsec() / 1000L;
135-
136-
JobDataMap jobDataMap =
137-
newUrlFetchJobDataMap(taskName, getQueueName(), addRequest, retryParams);
138-
139-
JobDetail job =
140-
newJob(UrlFetchJob.class)
141-
.withIdentity(taskName, getQueueName())
142-
.usingJobData(jobDataMap)
143-
.build();
144-
145-
Trigger trigger =
146-
newTrigger()
147-
.withIdentity(taskName, getQueueName())
148-
.startAt(Date.from(Instant.ofEpochMilli(etaMillis)))
149-
.build();
150-
124+
SimpleTrigger trigger = new SimpleTrigger(taskName, getQueueName());
125+
trigger.setStartTime(new Date(etaMillis));
126+
JobDetail jd = newUrlFetchJobDetail(taskName, getQueueName(), addRequest, retryParams);
151127
try {
152-
scheduler.scheduleJob(job, trigger);
128+
scheduler.scheduleJob(jd, trigger);
153129
} catch (SchedulerException e) {
154130
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE, e.getMessage());
155131
}
156132
return taskName;
157133
}
158134

159135
// broken out to support testing
160-
JobDataMap newUrlFetchJobDataMap(
136+
JobDetail newUrlFetchJobDetail(
161137
String taskName,
162138
String queueName,
163139
TaskQueueAddRequest.Builder addRequest,
@@ -167,20 +143,18 @@ JobDataMap newUrlFetchJobDataMap(
167143
String host = header.getValue().toStringUtf8();
168144
if (host.startsWith("localhost:")) {
169145
return new UrlFetchJobDetail(
170-
taskName,
171-
queueName,
172-
addRequest,
173-
"http://" + host,
174-
callback,
175-
queueXmlEntry,
176-
retryParams)
177-
.getJobDataMap();
146+
taskName,
147+
queueName,
148+
addRequest,
149+
"http://" + host,
150+
callback,
151+
queueXmlEntry,
152+
retryParams);
178153
}
179154
}
180155
}
181156
return new UrlFetchJobDetail(
182-
taskName, queueName, addRequest, baseUrl, callback, queueXmlEntry, retryParams)
183-
.getJobDataMap();
157+
taskName, queueName, addRequest, baseUrl, callback, queueXmlEntry, retryParams);
184158
}
185159

186160
@Override
@@ -203,11 +177,8 @@ TaskQueueAddResponse add(TaskQueueAddRequest.Builder addRequest) {
203177
}
204178

205179
List<String> getSortedJobNames() throws SchedulerException {
206-
Set<JobKey> jobKeys = scheduler.getJobKeys(jobGroupEquals(getQueueName()));
207-
List<String> jobNameList = new ArrayList<>();
208-
for (JobKey jobKey : jobKeys) {
209-
jobNameList.add(jobKey.getName());
210-
}
180+
String[] jobNames = scheduler.getJobNames(getQueueName());
181+
List<String> jobNameList = Arrays.asList(jobNames);
211182
Collections.sort(jobNameList);
212183
return jobNameList;
213184
}
@@ -220,27 +191,23 @@ QueueStateInfo getStateInfo() {
220191
// Get the names of all jobs belonging to this queue (group).
221192
for (String jobName : getSortedJobNames()) {
222193
// Now get job details
223-
JobDetail jobDetail = scheduler.getJobDetail(jobKey(jobName, getQueueName()));
224-
if (jobDetail == null) {
194+
UrlFetchJobDetail jd = (UrlFetchJobDetail) scheduler.getJobDetail(jobName, getQueueName());
195+
if (jd == null) {
225196
// oops, gone, must have already run
226197
continue;
227198
}
228-
229-
UrlFetchJobDetail jd = new UrlFetchJobDetail(jobDetail.getJobDataMap());
230-
231-
List<? extends Trigger> triggers =
232-
scheduler.getTriggersOfJob(jobKey(jobName, getQueueName()));
233-
if (triggers.isEmpty()) {
199+
Trigger[] triggers = scheduler.getTriggersOfJob(jobName, getQueueName());
200+
if (triggers.length == 0) {
234201
// must have run in between the time we fetched the job detail and the time we fetched the
235202
// trigger
236203
continue;
237204
}
238-
if (triggers.size() != 1) {
205+
if (triggers.length != 1) {
239206
throw new IllegalStateException(
240207
"Multiple triggers for task " + jobName + " in queue " + getQueueName());
241208
}
242-
long execTime = triggers.get(0).getStartTime().toInstant().toEpochMilli();
243-
taskInfoList.add(new TaskStateInfo(jd.getTaskName(), execTime, jd.getAddRequest(), clock));
209+
long execTime = triggers[0].getStartTime().getTime();
210+
taskInfoList.add(new TaskStateInfo(jd.getName(), execTime, jd.getAddRequest(), clock));
244211
}
245212
} catch (SchedulerException e) {
246213
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
@@ -264,7 +231,7 @@ QueueStateInfo getStateInfo() {
264231
@Override
265232
boolean deleteTask(String taskName) {
266233
try {
267-
return scheduler.deleteJob(jobKey(taskName, getQueueName()));
234+
return scheduler.deleteJob(taskName, getQueueName());
268235
} catch (SchedulerException e) {
269236
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
270237
}
@@ -274,18 +241,20 @@ boolean deleteTask(String taskName) {
274241
@Override
275242
void flush() {
276243
try {
277-
Set<JobKey> jobKeys = scheduler.getJobKeys(jobGroupEquals(getQueueName()));
278-
scheduler.deleteJobs(new ArrayList<>(jobKeys));
244+
for (String name : scheduler.getJobNames(getQueueName())) {
245+
scheduler.deleteJob(name, getQueueName());
246+
}
279247
} catch (SchedulerException e) {
280248
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
281249
}
282250
}
283251

284-
private JobExecutionContext getExecutionContext(JobDetail jobDetail, SimpleTrigger trigger) {
252+
private JobExecutionContext getExecutionContext(UrlFetchJobDetail jobDetail) {
253+
Trigger trigger = new SimpleTrigger(jobDetail.getTaskName(), jobDetail.getQueueName());
254+
trigger.setJobDataMap(jobDetail.getJobDataMap());
285255
TriggerFiredBundle bundle =
286-
new TriggerFiredBundle(
287-
jobDetail, (OperableTrigger) trigger, null, false, null, null, null, null);
288-
return new JobExecutionContextImpl(scheduler, bundle, null);
256+
new TriggerFiredBundle(jobDetail, trigger, null, false, null, null, null, null);
257+
return new JobExecutionContext(scheduler, bundle, null);
289258
}
290259

291260
/**
@@ -300,19 +269,11 @@ boolean runTask(String taskName) {
300269
Job job;
301270
JobExecutionContext context;
302271
try {
303-
JobDetail jd = scheduler.getJobDetail(jobKey(taskName, getQueueName()));
272+
UrlFetchJobDetail jd = (UrlFetchJobDetail) scheduler.getJobDetail(taskName, getQueueName());
304273
if (jd == null) {
305274
return false;
306275
}
307-
// Reconstruct trigger for execution context - just needs to hold data map
308-
SimpleTrigger trigger =
309-
(SimpleTrigger)
310-
newTrigger()
311-
.withIdentity(taskName, getQueueName())
312-
.usingJobData(jd.getJobDataMap())
313-
.build();
314-
315-
context = getExecutionContext(jd, trigger);
276+
context = getExecutionContext(jd);
316277
job = (Job) jd.getJobClass().newInstance();
317278
} catch (SchedulerException e) {
318279
return false;

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/LocalTaskQueue.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import java.util.List;
6464
import java.util.Map;
6565
import java.util.Map.Entry;
66-
import java.util.Properties;
6766
import java.util.Random;
6867
import java.util.TreeMap;
6968
import org.jspecify.annotations.Nullable;
@@ -691,13 +690,7 @@ static Scheduler startScheduler(boolean disableAutoTaskExecution) {
691690
// TODO: Investigate config options for the scheduler like
692691
// threadpool size.
693692
try {
694-
StdSchedulerFactory factory = new StdSchedulerFactory();
695-
Properties props = new Properties();
696-
props.setProperty("org.quartz.scheduler.instanceName", "AppEngineLocalTaskQueue");
697-
props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
698-
props.setProperty("org.quartz.threadPool.threadCount", "10");
699-
factory.initialize(props);
700-
Scheduler scheduler = factory.getScheduler();
693+
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
701694
// When a scheduler is first created it is in standby mode, which means
702695
// it will accept and schedule tasks but won't ever run them.
703696
if (!disableAutoTaskExecution) {

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/UrlFetchJob.java

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,23 @@
1616

1717
package com.google.appengine.api.taskqueue.dev;
1818

19-
import static org.quartz.JobBuilder.newJob;
20-
import static org.quartz.TriggerBuilder.newTrigger;
21-
2219
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest;
2320
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueRetryParameters;
2421
import com.google.appengine.api.urlfetch.URLFetchServicePb.URLFetchRequest;
22+
import com.google.appengine.api.urlfetch.dev.LocalURLFetchService;
2523
import com.google.appengine.tools.development.Clock;
2624
import com.google.appengine.tools.development.LocalServerEnvironment;
2725
import com.google.apphosting.utils.config.QueueXml;
2826
import com.google.common.flogger.GoogleLogger;
2927
import java.text.DecimalFormat;
30-
import java.time.Instant;
3128
import java.util.Date;
3229
import org.quartz.Job;
33-
import org.quartz.JobDataMap;
34-
import org.quartz.JobDetail;
3530
import org.quartz.JobExecutionContext;
3631
import org.quartz.JobExecutionException;
3732
import org.quartz.Scheduler;
3833
import org.quartz.SchedulerException;
3934
import org.quartz.SimpleTrigger;
4035
import org.quartz.Trigger;
41-
import org.quartz.TriggerKey;
4236

4337
/**
4438
* Quartz {@link Job} implementation that hits a url. The url to hit, the http method to invoke,
@@ -91,9 +85,8 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
9185
throw new JobExecutionException(
9286
"Interrupted while waiting for server to initialize.", e, false);
9387
}
94-
95-
UrlFetchJobDetail jd = new UrlFetchJobDetail(context.getJobDetail().getJobDataMap());
96-
88+
Trigger trigger = context.getTrigger();
89+
UrlFetchJobDetail jd = (UrlFetchJobDetail) context.getJobDetail();
9790
URLFetchRequest fetchReq =
9891
newFetchRequest(
9992
jd.getTaskName(),
@@ -111,11 +104,10 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
111104
if ((status < 200 || status > 299) && canRetry(jd, firstTryMs)) {
112105
logger.atInfo().log(
113106
"Web hook at %s returned status code %d. Rescheduling...", fetchReq.getUrl(), status);
114-
reschedule(context.getScheduler(), context.getTrigger(), jd, firstTryMs, status);
107+
reschedule(context.getScheduler(), trigger, jd, firstTryMs, status);
115108
} else {
116109
try {
117-
Trigger trigger = context.getTrigger();
118-
context.getScheduler().unscheduleJob(trigger.getKey());
110+
context.getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
119111
} catch (SchedulerException e) {
120112
logger.atSevere().withCause(e).log("Unsubscription of task %s failed.", jd.getAddRequest());
121113
}
@@ -149,29 +141,16 @@ private void reschedule(
149141
long firstTryMs,
150142
int previousResponse) {
151143
// Builds a new job.
152-
UrlFetchJobDetail newJobData = jd.retry(firstTryMs, previousResponse);
153-
JobDataMap newJobDataMap = newJobData.getJobDataMap();
144+
UrlFetchJobDetail newJobDetail = jd.retry(firstTryMs, previousResponse);
154145

155146
// Build the new trigger from the old trigger
156-
TriggerKey triggerKey = trigger.getKey();
157-
Instant newStartTime =
158-
Instant.ofEpochMilli(clock.getCurrentTime() + newJobData.getRetryDelayMs());
159-
160-
SimpleTrigger newTrigger =
161-
(SimpleTrigger)
162-
newTrigger().withIdentity(triggerKey).startAt(Date.from(newStartTime)).build();
163-
164-
JobDetail newJob =
165-
newJob(UrlFetchJob.class)
166-
.withIdentity(jd.getTaskName(), jd.getQueueName())
167-
.usingJobData(newJobDataMap)
168-
.build();
169-
147+
SimpleTrigger newTrigger = new SimpleTrigger(trigger.getName(), trigger.getGroup());
148+
newTrigger.setStartTime(new Date(clock.getCurrentTime() + newJobDetail.getRetryDelayMs()));
170149
try {
171-
// Quartz 2.x doesn't allow 2 jobs with the same name so we need to first
150+
// Quartz doesn't allow 2 jobs with the same name so we need to first
172151
// unschedule the currently executing job before we reschedule
173-
scheduler.unscheduleJob(triggerKey);
174-
scheduler.scheduleJob(newJob, newTrigger);
152+
scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
153+
scheduler.scheduleJob(newJobDetail, newTrigger);
175154
} catch (SchedulerException e) {
176155
logger.atSevere().withCause(e).log("Reschedule of task %s failed.", jd.getAddRequest());
177156
}

0 commit comments

Comments
 (0)