Skip to content

Commit b42a645

Browse files
luxlCopilot
authored andcommitted
fix: prevent circular dependency and wrong startNodes in backfill dependent workflow
- Problem 1: Replace single-level self-dep check with ThreadLocal visited set to detect and skip indirect circular dependencies (A→B→A), preventing StackOverflowError when allLevelDependent=true - Problem 2: Set startNodes=null for downstream workflows; upstream task node codes are not valid in a different workflow definition - Add tests for OFFLINE skip, not-found skip, and startNodes null assertion Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8073719 commit b42a645

2 files changed

Lines changed: 173 additions & 75 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 94 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import java.time.ZonedDateTime;
4242
import java.util.ArrayList;
4343
import java.util.Collections;
44+
import java.util.HashSet;
4445
import java.util.List;
46+
import java.util.Set;
4547
import java.util.stream.Collectors;
4648

4749
import lombok.extern.slf4j.Slf4j;
@@ -55,6 +57,8 @@
5557
@Component
5658
public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<BackfillWorkflowDTO, List<Integer>> {
5759

60+
private static final ThreadLocal<Set<Long>> BACKFILL_VISITING_WORKFLOWS = new ThreadLocal<>();
61+
5862
@Autowired
5963
private CommandDao commandDao;
6064

@@ -183,89 +187,104 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
183187

184188
private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
185189
final List<String> backfillTimeList) {
186-
// 1) Query downstream dependent workflows for the current workflow
187-
final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
188-
final long upstreamWorkflowCode = upstreamWorkflow.getCode();
189-
190-
List<DependentWorkflowDefinition> downstreamDefinitions =
191-
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
192-
193-
if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
194-
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
195-
return;
190+
final boolean isRootCall = BACKFILL_VISITING_WORKFLOWS.get() == null;
191+
if (isRootCall) {
192+
final Set<Long> visitedCodes = new HashSet<>();
193+
visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
194+
BACKFILL_VISITING_WORKFLOWS.set(visitedCodes);
196195
}
196+
try {
197+
final Set<Long> visitedCodes = BACKFILL_VISITING_WORKFLOWS.get();
197198

198-
// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
199-
// backfill
200-
final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
201-
.map(DateUtils::stringToZoneDateTime)
202-
.collect(Collectors.toList());
199+
// 1) Query downstream dependent workflows for the current workflow
200+
final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
201+
final long upstreamWorkflowCode = upstreamWorkflow.getCode();
203202

204-
// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
205-
for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
206-
long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
203+
List<DependentWorkflowDefinition> downstreamDefinitions =
204+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
207205

208-
// Prevent self dependency
209-
if (downstreamCode == upstreamWorkflowCode) {
210-
log.warn("Skip self dependent workflow {}", downstreamCode);
211-
continue;
206+
if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
207+
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
208+
return;
212209
}
213210

214-
WorkflowDefinition downstreamWorkflow =
215-
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
216-
if (downstreamWorkflow == null) {
217-
log.warn("Skip dependent workflow {}, definition not found", downstreamCode);
218-
continue;
211+
// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
212+
// backfill
213+
final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
214+
.map(DateUtils::stringToZoneDateTime)
215+
.collect(Collectors.toList());
216+
217+
// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
218+
for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
219+
long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();
220+
221+
// Prevent self-dependency and circular dependency chains
222+
if (visitedCodes.contains(downstreamCode)) {
223+
log.warn("Skip circular dependent workflow {}", downstreamCode);
224+
continue;
225+
}
226+
227+
WorkflowDefinition downstreamWorkflow =
228+
workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
229+
if (downstreamWorkflow == null) {
230+
log.warn("Skip dependent workflow {}, definition not found", downstreamCode);
231+
continue;
232+
}
233+
234+
if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
235+
log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
236+
continue;
237+
}
238+
239+
// Currently, reuse the same business date list as upstream for downstream backfill;
240+
// later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
241+
// (taskParams).
242+
BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
243+
boolean allLevelDependent = originalParams.isAllLevelDependent();
244+
ComplementDependentMode downstreamDependentMode =
245+
allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
246+
247+
BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
248+
.runMode(originalParams.getRunMode())
249+
.backfillDateList(upstreamBackfillDates)
250+
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
251+
// Control whether downstream will continue triggering its own dependencies based on
252+
// allLevelDependent flag
253+
.backfillDependentMode(downstreamDependentMode)
254+
.allLevelDependent(allLevelDependent)
255+
.executionOrder(originalParams.getExecutionOrder())
256+
.build();
257+
258+
BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
259+
.loginUser(backfillWorkflowDTO.getLoginUser())
260+
.workflowDefinition(downstreamWorkflow)
261+
.startNodes(null)
262+
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
263+
.taskDependType(backfillWorkflowDTO.getTaskDependType())
264+
.execType(backfillWorkflowDTO.getExecType())
265+
.warningType(backfillWorkflowDTO.getWarningType())
266+
.warningGroupId(downstreamWorkflow.getWarningGroupId())
267+
.runMode(dependentParams.getRunMode())
268+
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
269+
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
270+
.tenantCode(backfillWorkflowDTO.getTenantCode())
271+
.environmentCode(backfillWorkflowDTO.getEnvironmentCode())
272+
.startParamList(backfillWorkflowDTO.getStartParamList())
273+
.dryRun(backfillWorkflowDTO.getDryRun())
274+
.backfillParams(dependentParams)
275+
.build();
276+
277+
log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
278+
downstreamCode, upstreamWorkflowCode, backfillTimeList);
279+
280+
// 4) Mark as visiting before recursive trigger to detect cycles
281+
visitedCodes.add(downstreamCode);
282+
execute(dependentBackfillDTO);
219283
}
220-
221-
if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
222-
log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
223-
continue;
284+
} finally {
285+
if (isRootCall) {
286+
BACKFILL_VISITING_WORKFLOWS.remove();
224287
}
225-
226-
// Currently, reuse the same business date list as upstream for downstream backfill;
227-
// later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
228-
// (taskParams).
229-
BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
230-
boolean allLevelDependent = originalParams.isAllLevelDependent();
231-
ComplementDependentMode downstreamDependentMode =
232-
allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;
233-
234-
BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
235-
.runMode(originalParams.getRunMode())
236-
.backfillDateList(upstreamBackfillDates)
237-
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
238-
// Control whether downstream will continue triggering its own dependencies based on
239-
// allLevelDependent flag
240-
.backfillDependentMode(downstreamDependentMode)
241-
.allLevelDependent(allLevelDependent)
242-
.executionOrder(originalParams.getExecutionOrder())
243-
.build();
244-
245-
BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
246-
.loginUser(backfillWorkflowDTO.getLoginUser())
247-
.workflowDefinition(downstreamWorkflow)
248-
.startNodes(backfillWorkflowDTO.getStartNodes())
249-
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
250-
.taskDependType(backfillWorkflowDTO.getTaskDependType())
251-
.execType(backfillWorkflowDTO.getExecType())
252-
.warningType(backfillWorkflowDTO.getWarningType())
253-
.warningGroupId(downstreamWorkflow.getWarningGroupId())
254-
.runMode(dependentParams.getRunMode())
255-
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
256-
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
257-
.tenantCode(backfillWorkflowDTO.getTenantCode())
258-
.environmentCode(backfillWorkflowDTO.getEnvironmentCode())
259-
.startParamList(backfillWorkflowDTO.getStartParamList())
260-
.dryRun(backfillWorkflowDTO.getDryRun())
261-
.backfillParams(dependentParams)
262-
.build();
263-
264-
log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
265-
downstreamCode, upstreamWorkflowCode, backfillTimeList);
266-
267-
// 4) Recursively call execute to trigger downstream backfill
268-
execute(dependentBackfillDTO);
269288
}
270289
}
271290
}

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ public void testDoBackfillDependentWorkflow_WithDownstream_AllLevelDependent() t
169169
Assertions.assertEquals(ComplementDependentMode.ALL_DEPENDENT, capturedParams.getBackfillDependentMode());
170170
Assertions.assertTrue(capturedParams.isAllLevelDependent());
171171
Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size());
172+
Assertions.assertNull(captured.getStartNodes());
172173
}
173174

174175
@Test
@@ -232,5 +233,83 @@ public void testDoBackfillDependentWorkflow_WithDownstream_SingleLevelDependent(
232233
Assertions.assertEquals(ComplementDependentMode.OFF_MODE, capturedParams.getBackfillDependentMode());
233234
Assertions.assertFalse(capturedParams.isAllLevelDependent());
234235
Assertions.assertEquals(backfillTimeList.size(), capturedParams.getBackfillDateList().size());
236+
Assertions.assertNull(captured.getStartNodes());
237+
}
238+
239+
@Test
240+
public void testDoBackfillDependentWorkflow_SkipWorkflowNotFound() throws Exception {
241+
long upstreamCode = 1000L;
242+
long downstreamCode = 2000L;
243+
244+
WorkflowDefinition upstreamWorkflow =
245+
WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build();
246+
247+
BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
248+
.runMode(RunMode.RUN_MODE_SERIAL)
249+
.backfillDateList(Collections.<ZonedDateTime>emptyList())
250+
.backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
251+
.allLevelDependent(true)
252+
.executionOrder(ExecutionOrder.ASC_ORDER)
253+
.build();
254+
255+
BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder()
256+
.workflowDefinition(upstreamWorkflow)
257+
.backfillParams(params)
258+
.build();
259+
260+
DependentWorkflowDefinition dep = new DependentWorkflowDefinition();
261+
dep.setWorkflowDefinitionCode(downstreamCode);
262+
263+
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
264+
.thenReturn(Collections.singletonList(dep));
265+
when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.empty());
266+
267+
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
268+
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class);
269+
method.setAccessible(true);
270+
271+
method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"));
272+
273+
verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any());
274+
}
275+
276+
@Test
277+
public void testDoBackfillDependentWorkflow_SkipOfflineWorkflow() throws Exception {
278+
long upstreamCode = 3000L;
279+
long downstreamCode = 4000L;
280+
281+
WorkflowDefinition upstreamWorkflow =
282+
WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build();
283+
284+
WorkflowDefinition offlineDownstream =
285+
WorkflowDefinition.builder().code(downstreamCode).releaseState(ReleaseState.OFFLINE).build();
286+
287+
BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
288+
.runMode(RunMode.RUN_MODE_SERIAL)
289+
.backfillDateList(Collections.<ZonedDateTime>emptyList())
290+
.backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
291+
.allLevelDependent(true)
292+
.executionOrder(ExecutionOrder.ASC_ORDER)
293+
.build();
294+
295+
BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder()
296+
.workflowDefinition(upstreamWorkflow)
297+
.backfillParams(params)
298+
.build();
299+
300+
DependentWorkflowDefinition dep = new DependentWorkflowDefinition();
301+
dep.setWorkflowDefinitionCode(downstreamCode);
302+
303+
when(workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamCode))
304+
.thenReturn(Collections.singletonList(dep));
305+
when(workflowDefinitionDao.queryByCode(downstreamCode)).thenReturn(Optional.of(offlineDownstream));
306+
307+
Method method = BackfillWorkflowExecutorDelegate.class.getDeclaredMethod(
308+
"doBackfillDependentWorkflow", BackfillWorkflowDTO.class, List.class);
309+
method.setAccessible(true);
310+
311+
method.invoke(backfillWorkflowExecutorDelegate, dto, Collections.singletonList("2026-02-01 00:00:00"));
312+
313+
verify(backfillWorkflowExecutorDelegate, never()).execute(org.mockito.ArgumentMatchers.any());
235314
}
236315
}

0 commit comments

Comments
 (0)