
feat: resume interrupted dataset generation runs (sync + async engine)#526

Open
przemekboruta wants to merge 14 commits into NVIDIA-NeMo:main from przemekboruta:main

Conversation

@przemekboruta
Contributor

@przemekboruta przemekboruta commented Apr 13, 2026

Summary

Closes #525

Adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(). When resume=True, generation picks up from where the interrupted run left off — for both the sync and async engines.

```python
dd = DataDesigner(...)
dd.add_column(...)

# First run — interrupted mid-way
results = dd.create(config_builder, num_records=10_000)

# After restart — picks up from the last completed batch/row-group
results = dd.create(config_builder, num_records=10_000, resume=True)
```

Changes

| Layer | Change |
| --- | --- |
| ArtifactStorage | resume: bool = False field; resolved_dataset_name skips timestamp logic on resume; new clear_partial_results() |
| DatasetBatchManager.start() | New start_batch and initial_actual_num_records params (default 0, no breakage) |
| DatasetBuilder.build() | New resume param; _load_resume_state() reads and validates metadata.json; _build_with_resume() skips completed batches (sync); _build_async() skips completed row groups (async) |
| RowGroupBufferManager.__init__() | New initial_actual_num_records and initial_total_num_batches params to seed counters on resume |
| DatasetBuilder._find_completed_row_group_ids() | New helper — scans parquet-files/ for batch_*.parquet to determine which async row groups are already done |
| finalize_row_group closure | Now writes incremental metadata.json after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted |
| DataDesigner.create() | Exposes resume, passes it through to ArtifactStorage and builder.build() |

Validation and error cases

  • Missing metadata.json → DatasetGenerationError (interrupted before any batch completed)
  • num_records mismatch → DatasetGenerationError
  • buffer_size mismatch → DatasetGenerationError
  • Dataset already complete → warning logged, returns existing path (both engines)
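The validation flow above can be sketched as follows. This is a hypothetical illustration: the helper name, metadata fields, and error messages are not the actual data-designer implementation.

```python
import json
from pathlib import Path


class DatasetGenerationError(Exception):
    pass


def load_resume_state(dataset_dir: Path, num_records: int, buffer_size: int) -> dict:
    """Read metadata.json and check it is compatible with the requested run."""
    metadata_path = dataset_dir / "metadata.json"
    if not metadata_path.exists():
        # Interrupted before any batch completed: nothing to resume from.
        raise DatasetGenerationError("cannot resume: metadata.json not found")
    state = json.loads(metadata_path.read_text())
    if state["num_records"] != num_records:
        raise DatasetGenerationError(
            f"num_records mismatch: requested {num_records}, "
            f"original run used {state['num_records']}"
        )
    if state["buffer_size"] != buffer_size:
        raise DatasetGenerationError(
            f"buffer_size mismatch: requested {buffer_size}, "
            f"original run used {state['buffer_size']}"
        )
    return state
```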

Test plan

  • test_resolved_dataset_name_resume_uses_existing_folder
  • test_resolved_dataset_name_resume_raises_when_no_existing_folder
  • test_resolved_dataset_name_resume_raises_when_folder_is_empty
  • test_clear_partial_results_removes_partial_folder
  • test_clear_partial_results_is_noop_when_no_partial_folder
  • test_start_with_start_batch
  • test_start_with_initial_actual_num_records
  • test_start_with_start_batch_and_initial_actual_num_records
  • test_start_default_values_unchanged
  • test_build_resume_raises_without_metadata
  • test_build_resume_raises_on_num_records_mismatch
  • test_build_resume_raises_on_buffer_size_mismatch
  • test_build_resume_logs_warning_when_already_complete
  • test_find_completed_row_group_ids_empty_dir
  • test_find_completed_row_group_ids_with_files
  • test_find_completed_row_group_ids_ignores_non_batch_files
  • test_build_async_resume_logs_warning_when_already_complete
  • test_build_async_resume_raises_without_metadata
  • test_initial_actual_num_records
  • test_initial_total_num_batches_reflected_in_metadata

- ArtifactStorage gains a `resume: bool = False` field
- resolved_dataset_name skips timestamp logic when resume=True,
  returning the existing dataset folder name as-is
- Raises ArtifactStorageError on resume=True when the target folder
  is absent or empty (no data to resume from)
- New clear_partial_results() removes in-flight partial results
  left over from an interrupted run

Fixes NVIDIA-NeMo#525
DatasetBatchManager.start() now accepts:
- start_batch: int = 0  — first batch index to process
- initial_actual_num_records: int = 0  — records already on disk

Both default to 0 so all existing call sites are unaffected.
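A minimal sketch of the counter seeding described above; the class and attribute names are illustrative, not the engine's actual DatasetBatchManager internals.

```python
class BatchManager:
    def __init__(self):
        self._current_batch_number = 0
        self._actual_num_records = 0

    def reset(self):
        # Zero out counters (file handling omitted in this sketch).
        self._current_batch_number = 0
        self._actual_num_records = 0

    def start(self, start_batch: int = 0, initial_actual_num_records: int = 0):
        # Defaults of 0 reproduce the pre-existing fresh-run behavior,
        # so existing call sites are unaffected.
        self.reset()
        self._current_batch_number = start_batch
        self._actual_num_records = initial_actual_num_records
```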

Fixes NVIDIA-NeMo#525
- build() gains a resume: bool = False parameter
- _load_resume_state() reads metadata.json and validates that
  num_records and buffer_size match the original run
- _build_with_resume() skips completed batches, clears in-flight
  partial results, and continues from the first incomplete batch
- Raises DatasetGenerationError with clear messages for:
  - missing metadata.json (interrupted before first batch completes)
  - num_records mismatch
  - buffer_size mismatch
  - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported)
- Logs a warning and returns early when dataset is already complete

Fixes NVIDIA-NeMo#525
- create() gains resume: bool = False
- _create_resource_provider() passes resume to ArtifactStorage
- builder.build() receives the resume flag

Fixes NVIDIA-NeMo#525
Covers:
- ArtifactStorage.resolved_dataset_name with resume=True
- ArtifactStorage.clear_partial_results()
- DatasetBatchManager.start() with start_batch and
  initial_actual_num_records
- DatasetBuilder.build(resume=True): missing metadata, num_records
  mismatch, buffer_size mismatch, already-complete detection

Fixes NVIDIA-NeMo#525
@przemekboruta przemekboruta requested a review from a team as a code owner April 13, 2026 11:15
@greptile-apps
Contributor

greptile-apps bot commented Apr 13, 2026

Greptile Summary

This PR adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(), enabling generation to pick up from the last completed batch (sync) or row group (async) after an interrupted run. Both engines are now supported: the sync path reads num_completed_batches from metadata.json and seeds the batch loop with start_batch, while the async path derives ground-truth state from the filesystem via _find_completed_row_group_ids() — correctly handling the write-metadata crash window.

The two previously-flagged P1 issues (processors re-running on an already-complete dataset, and stale initial_actual_num_records from metadata in the async crash window) are now addressed: generated=False guards the run_after_generation call, and the async path sources both counters from the filesystem rather than from potentially-lagging metadata.

Confidence Score: 5/5

This PR is safe to merge — the resume logic is well-structured, the async crash-window is handled via filesystem truth rather than lagging metadata, and both previously-flagged P1 issues have been correctly resolved.

The sync path correctly seeds DatasetBatchManager with start_batch and initial_actual_num_records from metadata.json. The async path avoids the metadata lag by scanning parquet-files/batch_*.parquet directly, computing actual record counts from completed row-group IDs. The generated flag properly guards run_after_generation(), preventing destructive processor re-runs on already-complete datasets. Test coverage is thorough across both paths including the crash-window scenario.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic: _build_with_resume (sync) and updated _build_async (async) both correctly gate run_after_generation behind the generated flag; async path correctly sources both counters from filesystem via _find_completed_row_group_ids; _load_resume_state correctly validates run-parameter compatibility. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds resume field and clear_partial_results(); resolved_dataset_name correctly short-circuits timestamp logic on resume and raises ArtifactStorageError when no existing folder is found. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | Adds start_batch and initial_actual_num_records to start(); correctly applies them after reset() so subsequent state is seeded with resume values. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds initial_actual_num_records and initial_total_num_batches constructor params to seed counters for resumed async runs; straightforward and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Threads resume through to ArtifactStorage and builder.build(); public API changes are minimal and non-breaking. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Comprehensive coverage of resume paths including crash-window scenario, num_records/buffer_size mismatch, already-complete detection, and processor-skip guard; module-level imports appear mid-file (E402) but logic is sound. |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py | New tests cover all combinations of start_batch and initial_actual_num_records; default-unchanged test guards regressions. |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | Tests cover resume flag's three states (existing folder, missing folder, empty folder) and clear_partial_results happy/noop paths. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant U as User
    participant DD as DataDesigner.create()
    participant AS as ArtifactStorage
    participant DB as DatasetBuilder.build()
    participant BM as DatasetBatchManager
    participant FS as Filesystem

    U->>DD: create(resume=True)
    DD->>AS: ArtifactStorage(resume=True)
    AS->>FS: check artifact_path/dataset_name exists?
    FS-->>AS: exists (resume) or raise ArtifactStorageError
    AS-->>DD: storage with resolved_dataset_name

    DD->>DB: build(resume=True)
    DB->>AS: clear_partial_results()
    AS->>FS: rmtree(tmp-partial-parquet-files/) if exists

    alt Sync path
        DB->>DB: _load_resume_state() → read metadata.json
        DB->>BM: start(num_records, buffer_size, start_batch=N, initial_actual_num_records=M)
        BM->>BM: reset() then set _current_batch_number=N, _actual_num_records=M
        loop batches N..total
            DB->>DB: _run_batch(batch_idx)
            DB->>BM: finish_batch()
            BM->>AS: write_metadata(num_completed_batches=batch_idx+1)
        end
        DB->>BM: finish()
        DB-->>DD: generated=True → run_after_generation()

    else Async path
        DB->>FS: glob(parquet-files/batch_*.parquet)
        FS-->>DB: completed_row_group_ids (filesystem truth)
        DB->>DB: compute initial_actual_num_records from completed ids
        DB->>BM: start(num_records, buffer_size, start_batch=0)
        loop row groups (skipping completed)
            DB->>BM: process row group
            DB->>AS: finalize_row_group → write_metadata (incremental)
        end
        DB-->>DD: generated=True → run_after_generation()
    end

    alt Already complete
        DB-->>DD: generated=False → skip run_after_generation()
    end
```


…INE=1)

- Add _find_completed_row_group_ids() to scan parquet-files/ for already-written
  row groups by parsing batch_*.parquet filenames
- _build_async() now accepts resume=True: loads metadata, finds completed row groups,
  clears partial results, and logs progress; returns early if all row groups are done
- _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and
  initial_total_num_batches so the scheduler only processes remaining row groups
  and RowGroupBufferManager starts from the correct counts
- RowGroupBufferManager.__init__ gains initial_actual_num_records and
  initial_total_num_batches params to seed the counters on resume
- finalize_row_group closure now writes incremental metadata after each checkpoint
  so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1
- Add tests for all new paths
@przemekboruta przemekboruta changed the title from "feat: resume interrupted dataset generation runs (sync engine)" to "feat: resume interrupted dataset generation runs (sync + async engine)" on Apr 13, 2026
…set already complete

_build_with_resume and _build_async now return False when the dataset is already
complete (early-return path), True otherwise. build() skips
_processor_runner.run_after_generation() on False, preventing processors from
calling shutil.rmtree and rewriting an already-finalized dataset.

Fixes the issue raised in review: greptile P1 comment on PR NVIDIA-NeMo#526.
…sync resume

Metadata can lag by one row group if a crash occurs between
move_partial_result_to_final_file_path and write_metadata. Using
len(completed_ids) from the filesystem scan instead of
state.num_completed_batches ensures the final metadata reflects the
actual number of parquet files present, not the potentially stale
metadata count.
@github-actions
Contributor

Issue #525 has been triaged. The linked issue check is being re-evaluated.

@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 13, 2026
…efore first batch)

When a run is interrupted before any row group or batch completes, metadata.json
is never written. Previously resume=True would raise DatasetGenerationError in
this case. Now build() detects the missing file, logs an info message, clears
any leftover partial results and falls back to a clean fresh run.

This is the common scenario for small datasets (fewer records than buffer_size)
where all records fit in a single row group.
…ync resume

In the crash window (row group written to disk but write_metadata crashed before
updating the file), both initial_total_num_batches and initial_actual_num_records
now use the filesystem-discovered completed_ids as source of truth.  Previously
initial_actual_num_records was read from potentially stale metadata, causing
actual_num_records in the final metadata to be undercounted by one row group.

Also adds a test covering the partial-resume crash-window scenario.
@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 16, 2026
@github-actions
Contributor

Code Review: PR #526 — Resume interrupted dataset generation runs (sync + async engine)

Summary

This PR adds a resume: bool = False parameter to DataDesigner.create() and DatasetBuilder.build(), enabling users to resume interrupted dataset generation from the last completed batch (sync) or row group (async). The implementation touches 5 source files and 4 test files across the data-designer-engine and data-designer packages.

Scope: ~860 additions, ~16 deletions across 10 files (including a plan doc and comprehensive tests).

The feature is well-designed: it leverages existing metadata.json checkpoints, validates run-parameter compatibility, handles edge cases (already-complete, no-metadata, parameter mismatch), and correctly separates the sync and async resume paths. The plan diverged from implementation in a positive way — the async engine now supports resume (the plan initially deferred it).

Findings

High Severity

(H1) _load_resume_state return value discarded in async resume path
dataset_builder.py:411 — In _build_async, when resume=True, the call self._load_resume_state(num_records, buffer_size) is made for validation only — the returned _ResumeState is discarded. This is intentional (the async path derives state from the filesystem instead), but it's confusing. The validation-only intent should be made explicit, e.g. by extracting a _validate_resume_params() method or assigning to _ with a comment. As-is, a future maintainer might remove the "unused" call and break parameter validation for async resume.

Medium Severity

(M1) _find_completed_row_group_ids parses batch filenames with split("_", 1)[1]
dataset_builder.py:381 — The glob pattern is batch_*.parquet and the ID is extracted via p.stem.split("_", 1)[1]. This works for batch_00000.parquet: the stem batch_00000 splits to "00000", and int("00000") = 0. However, if a file like batch_00000_extra.parquet appeared (e.g., from a future format change), split("_", 1)[1] would yield "00000_extra" and int() would raise ValueError, which is caught. This is acceptable but fragile. Consider using a regex r"^batch_(\d+)$" on the stem for robustness.
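The regex approach the reviewer suggests could look like the following sketch; the function name and file names are illustrative, not the PR's actual code.

```python
import re
from pathlib import Path

# Anchored pattern: the stem must be exactly "batch_" followed by digits.
_BATCH_RE = re.compile(r"^batch_(\d+)$")


def completed_row_group_ids(paths) -> set[int]:
    """Extract row-group IDs from batch_*.parquet filenames, ignoring variants."""
    ids = set()
    for p in paths:
        m = _BATCH_RE.match(Path(p).stem)
        if m:
            ids.add(int(m.group(1)))
    return ids


print(completed_row_group_ids(
    ["batch_00000.parquet", "batch_00003.parquet",
     "batch_00000_extra.parquet", "notes.txt"]
))  # → {0, 3}
```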

(M2) initial_actual_num_records calculation assumes uniform batch sizes
dataset_builder.py:418-420 — The async resume path computes initial_actual_num_records as:

sum(min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids)

This formula assumes each row group was written with exactly min(buffer_size, remaining) rows, ignoring dropped rows. If the original run dropped rows within a row group (e.g., due to generation failures), the actual count would be lower. However, actual_num_records in the sync path also counts written records (not requested), and the metadata from write_metadata stores the true post-drop count. This means the filesystem-derived count may overestimate vs. what was actually written. The comment at line 414 acknowledges metadata may lag, but the formula's assumption about no drops could lead to inflated actual_num_records in the final metadata when some rows were dropped in completed groups.
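For concreteness, a small worked example of the reviewer's formula under its uniform-size, no-dropped-rows assumption (numbers chosen for illustration):

```python
buffer_size = 100
num_records = 250          # row groups hold 100, 100, 50 records
completed_ids = [0, 2]     # groups 0 and 2 already on disk

initial = sum(
    min(buffer_size, num_records - rg_id * buffer_size)
    for rg_id in completed_ids
)
# group 0 contributes min(100, 250) = 100; group 2 contributes min(100, 50) = 50
print(initial)  # → 150
```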

(M3) batch_manager.start() calls reset() which deletes files on resume path
dataset_batch_manager.py:177 — start() calls self.reset() which sets _current_batch_number = 0 and _actual_num_records = 0, then immediately overrides them. The reset(delete_files=False) call is harmless here (it doesn't delete files), but it does zero out internal state that's immediately overwritten. While functionally correct, this coupling is subtle — if reset() ever gains side effects beyond zeroing counters, the resume path would break silently.

Low Severity

(L1) Plan/implementation divergence: async engine support
The plan document (plans/525/resume-interrupted-runs.md) states in the Design Decisions table: "Async engine: Raise DatasetGenerationError if DATA_DESIGNER_ASYNC_ENGINE=1 with resume=True" and in Trade-offs: "Resume support for async engine: deferred to a follow-up." The implementation fully supports async resume. The plan should be updated to reflect the actual implementation.

(L2) _ResumeState.buffer_size field is redundant
dataset_builder.py:91 — _ResumeState stores buffer_size but it's always set to the same buffer_size parameter that was already validated. The field is never read after construction in _build_with_resume — the method uses the buffer_size parameter directly. The field could be removed to avoid confusion.

(L3) Incremental metadata writes add I/O overhead to async engine
dataset_builder.py:443 — write_metadata is now called after every row group checkpoint in finalize_row_group. For large datasets with many small row groups, this adds per-row-group disk I/O. The trade-off (resumability vs. performance) is reasonable, but worth noting in documentation or the PR description. The final write_metadata call at line 478 is documented as redundant ("overwrites the last incremental write with identical content") — good.
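The incremental-checkpoint pattern this finding refers to can be sketched as below. Names are hypothetical and the real closure also finalizes parquet files; only the per-row-group metadata write is shown.

```python
import json
from pathlib import Path


def finalize_row_group(out_dir: Path, rg_id: int, records_written: int) -> None:
    """Update metadata.json after each row group so an interrupted run is resumable."""
    meta_path = out_dir / "metadata.json"
    meta = (
        json.loads(meta_path.read_text())
        if meta_path.exists()
        else {"num_completed_batches": 0, "actual_num_records": 0}
    )
    meta["num_completed_batches"] += 1
    meta["actual_num_records"] += records_written
    # One small JSON write per row group: the I/O overhead the reviewer notes.
    meta_path.write_text(json.dumps(meta))
```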

(L4) Test file has mid-file imports
test_dataset_builder.py:927-429 — The resume test section re-imports json, Path, and ArtifactStorage with underscore-prefixed aliases (_json, _Path, _ArtifactStorage) mid-file. While this works, it's unconventional and potentially confusing. Standard practice is to add imports at the top of the file.

(L5) No validation of start_batch or initial_actual_num_records bounds
dataset_batch_manager.py:165-166 — The new start_batch and initial_actual_num_records parameters have no validation (e.g., start_batch >= 0, start_batch <= num_batches, initial_actual_num_records >= 0). Since these are only called from internal resume code that validates upstream, this is acceptable — but defensive checks would prevent misuse if the method is called from new paths in the future.
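The defensive checks suggested in L5 might look like this sketch (helper name is hypothetical):

```python
def validate_resume_counters(
    start_batch: int, initial_actual_num_records: int, num_batches: int
) -> None:
    """Reject out-of-range resume counters before seeding the batch manager."""
    if start_batch < 0 or start_batch > num_batches:
        raise ValueError(
            f"start_batch must be in [0, {num_batches}], got {start_batch}"
        )
    if initial_actual_num_records < 0:
        raise ValueError("initial_actual_num_records must be non-negative")
```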

Positive Observations

  • Comprehensive test coverage: 20+ new test cases covering validation errors, already-complete detection, async/sync paths, filesystem-vs-metadata crash window scenarios, and processor non-invocation on skip.
  • Clean separation of sync/async resume: The sync path uses _build_with_resume with DatasetBatchManager, while the async path extends _build_async with skip_row_groups and filesystem-based counters. No shared mutable state between the two paths.
  • Filesystem as source of truth for async: The decision to derive initial_actual_num_records from the filesystem rather than potentially-stale metadata (lines 414-420) handles the crash window correctly and is well-documented.
  • Graceful degradation for missing metadata: The build() method at line 188 handles the case where metadata.json is missing (interrupted before any batch completed) by logging and restarting fresh, rather than raising an error. This is a UX improvement over the plan's original "raise error" approach.
  • No breaking changes: All new parameters default to their pre-existing behavior (resume=False, start_batch=0, initial_actual_num_records=0).
  • Incremental metadata writes enable async resumability — a meaningful improvement over the plan's deferred-async-resume decision.

Verdict

Approve with suggestions. The implementation is solid, well-tested, and handles edge cases thoughtfully. The high-severity finding (H1) is a readability/maintainability concern rather than a correctness bug — the discarded return value works because _load_resume_state raises on validation failure. The medium-severity findings (M1-M3) are minor robustness concerns. None of these block merging, but H1 and M2 are worth addressing before or shortly after merge.

@github-actions github-actions bot removed the agent-review Trigger agentic CI review label Apr 16, 2026
