Draft
Changes from 31 commits
Commits
53 commits
5cd3c0f
docs: add transformers resume design spec
kevssim Mar 27, 2026
91eeaeb
docs: refine transformers resume design spec
kevssim Mar 27, 2026
6eebda8
docs: trim resume state fields
kevssim Mar 27, 2026
cdd9c1b
docs: add npu resume compatibility requirements
kevssim Mar 27, 2026
1542492
chore: ignore local worktrees
kevssim Mar 27, 2026
9883118
wip
kevssim Mar 30, 2026
d41a634
wip
kevssim Mar 30, 2026
21f9918
wip
kevssim Mar 30, 2026
1e59531
fix
kevssim Mar 30, 2026
9bb3f39
wip
kevssim Mar 30, 2026
fdf1f71
fix
kevssim Mar 30, 2026
6cf5160
wip
kevssim Mar 30, 2026
144ffe6
Merge branch 'modelscope:main' into resume_from_ckpt
kevssim Mar 31, 2026
e21f870
lint
kevssim Mar 31, 2026
3359209
Merge branch 'resume_from_ckpt' of https://github.com/kevssim/twinkle…
kevssim Mar 31, 2026
70ebe50
wip
kevssim Mar 31, 2026
483778d
wip
kevssim Mar 31, 2026
039789b
wip
kevssim Mar 31, 2026
54de1a4
wip
kevssim Mar 31, 2026
920ab86
wip
kevssim Mar 31, 2026
ffd6304
lint
kevssim Mar 31, 2026
582bd41
wip
kevssim Mar 31, 2026
9cb6106
wip
kevssim Apr 1, 2026
c0cf72e
wip
kevssim Apr 1, 2026
505a75c
wip
kevssim Apr 1, 2026
a222b5b
fix
kevssim Apr 1, 2026
7499e00
wip
kevssim Apr 1, 2026
cd0b094
doc
kevssim Apr 1, 2026
abf2c2f
wip
kevssim Apr 1, 2026
8bf7a6a
lint
kevssim Apr 1, 2026
27e76c6
Merge remote-tracking branch 'origin/main' into resume_from_ckpt
kevssim Apr 2, 2026
5d68910
Merge remote-tracking branch 'origin' into resume_from_ckpt
kevssim Apr 16, 2026
9326e64
wip
kevssim Apr 16, 2026
670f0c1
feat: add resume_from_checkpoint abstract method to TwinkleModel base
kevssim Apr 21, 2026
784730c
feat(dataloader): add resume_from_checkpoint wrapping skip_consumed_s…
kevssim Apr 21, 2026
3db38e9
feat(transformers): replace load_training_state/read_training_progres…
kevssim Apr 21, 2026
94679d5
feat(megatron): add resume_from_checkpoint and save trainer_state.json
kevssim Apr 21, 2026
832ce87
refactor(cookbook): use model.resume_from_checkpoint API
kevssim Apr 21, 2026
e3a3cd6
feat(types): replace training state request types with ResumeFromChec…
kevssim Apr 21, 2026
a3effab
feat(server): replace training state endpoints with /resume_from_chec…
kevssim Apr 21, 2026
383336d
feat(client): replace training state methods with resume_from_checkpoint
kevssim Apr 21, 2026
54a1db6
docs: update checkpoint/resume documentation for unified API
kevssim Apr 21, 2026
597cbd9
fix: remove stale load_training_state references from __init__.py, mu…
kevssim Apr 21, 2026
c55ab9f
fix(transformers): pass correct file paths to _load_scaler_state and …
kevssim Apr 21, 2026
8f76b7b
fix: guard rng_state.pt existence check, add Config extra=allow to Re…
kevssim Apr 21, 2026
4ffa5c7
wip
kevssim Apr 21, 2026
0b43055
wip
kevssim Apr 21, 2026
c8bc9ab
wip
kevssim Apr 21, 2026
8c0399e
wip
kevssim Apr 21, 2026
94af275
Merge remote-tracking branch 'origin/main' into resume_from_ckpt
kevssim Apr 21, 2026
10b4a20
refactor: delete resume_utils.py, inline logic in fsdp2.py, update docs
kevssim Apr 21, 2026
3df191a
wip
kevssim Apr 23, 2026
deeb648
Merge remote-tracking branch 'origin/main' into resume_from_ckpt
kevssim Apr 24, 2026
19 changes: 19 additions & 0 deletions client_tools/client_generator.py
@@ -448,6 +448,7 @@ def generate_models():
GetStateDictResponse,
GetTrainConfigsResponse,
SaveResponse,
TrainingProgressResponse,
)


@@ -617,6 +618,24 @@ def load(self, name: str, **kwargs) -> None:
)
response.raise_for_status()

def load_training_state(self, name: str, **kwargs) -> Dict[str, Any]:
Collaborator comment: What is the difference between `load_training_state` and `read_training_progress`? Could the two be merged into one?

"""Load optimizer, scheduler, scaler, RNG, and progress metadata from a checkpoint."""
response = http_post(
url=f'{self.server_url}/load_training_state',
json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs}
)
response.raise_for_status()
return TrainingProgressResponse(**response.json()).result

def read_training_progress(self, name: str, **kwargs) -> Dict[str, Any]:
"""Read progress-only checkpoint metadata for resume-only-model flows."""
response = http_post(
url=f'{self.server_url}/read_training_progress',
json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs}
)
response.raise_for_status()
return TrainingProgressResponse(**response.json()).result

def apply_patch(self, patch_cls: str, **kwargs) -> None:
"""Apply a patch to the model."""
response = http_post(
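The two client methods above differ in scope: `read_training_progress` only parses checkpoint metadata, while `load_training_state` additionally restores optimizer-related state on the server. The progress-only path can be modeled with plain dicts; a minimal sketch, assuming the `trainer_state.json` field names used elsewhere in this PR (the local helper below is illustrative, not the twinkle implementation):

```python
# Illustrative sketch of the progress-only read path: parse trainer_state.json
# metadata without touching any optimizer state. Field names (cur_step,
# consumed_train_samples, gradient_accumulation_steps) are taken from this PR;
# the helper itself is hypothetical.
import json
from pathlib import Path
from tempfile import TemporaryDirectory


def read_training_progress(checkpoint_dir: str) -> dict:
    # Progress-only: read the small JSON metadata file next to the weights.
    state_file = Path(checkpoint_dir) / 'trainer_state.json'
    return json.loads(state_file.read_text())


with TemporaryDirectory() as ckpt:
    (Path(ckpt) / 'trainer_state.json').write_text(json.dumps({
        'cur_step': 40,
        'consumed_train_samples': 320,
        'gradient_accumulation_steps': 2,
    }))
    progress = read_training_progress(ckpt)

print(progress['consumed_train_samples'])  # 320
```

Merging the two into one endpoint (as later commits in this PR do with `resume_from_checkpoint`) then reduces to a flag controlling whether the optimizer state is also loaded.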
24 changes: 18 additions & 6 deletions cookbook/client/twinkle/self_host/self_congnition.py
@@ -99,21 +99,29 @@ def train():
# model.set_lr_scheduler('LinearLR')

# Step 6: Optionally resume from a previous checkpoint
consumed_train_samples = 0
global_step = 0
if resume_path:
logger.info(f'Resuming training from {resume_path}')
model.load(resume_path, load_optimizer=True)
logger.info(f'Resuming model weights from {resume_path}')
model.load(resume_path)
trainer_state = model.load_training_state(resume_path)
dataloader.skip_consumed_samples(trainer_state['consumed_train_samples'])
consumed_train_samples = int(trainer_state['consumed_train_samples'])
global_step = int(trainer_state['cur_step'])

# Step 7: Run the training loop
logger.info(model.get_train_configs().model_dump())

for epoch in range(3):
logger.info(f'Starting epoch {epoch}')
for step, batch in enumerate(dataloader):
for _, batch in enumerate(dataloader):
# Forward pass + backward pass (computes gradients)
model.forward_backward(inputs=batch)

# Step
model.clip_grad_and_step()
consumed_train_samples += len(batch)
global_step += 1
# Equal to the following steps:
# # Clip gradients to prevent exploding gradients (max norm = 1.0)
# model.clip_grad_norm(1.0)
@@ -125,13 +133,17 @@ def train():
# model.lr_step()

# Log the loss every 2 steps (aligned with gradient accumulation)
if step % 2 == 0:
if global_step % 2 == 0:
# Print metric
metric = model.calculate_metric(is_training=True)
logger.info(f'Current is step {step} of {len(dataloader)}, metric: {metric.result}')
logger.info(f'Step {global_step} of {len(dataloader)}, metric: {metric.result}')

# Step 8: Save the trained checkpoint
twinkle_path = model.save(name=f'twinkle-epoch-{epoch}', save_optimizer=True)
twinkle_path = model.save(
name=f'twinkle-epoch-{epoch}',
save_optimizer=True,
consumed_train_samples=consumed_train_samples,
)
logger.info(f'Saved checkpoint: {twinkle_path}')

# Step 9: Upload the checkpoint to ModelScope Hub
120 changes: 83 additions & 37 deletions cookbook/transformers/fsdp2.py
@@ -1,3 +1,5 @@
from pathlib import Path

from peft import LoraConfig
from tqdm import tqdm

@@ -8,77 +10,121 @@
from twinkle.model import TransformersModel
from twinkle.preprocessor import SelfCognitionProcessor

# Construct a device_mesh, fsdp_size=2, dp=4
device_mesh = DeviceMesh.from_sizes(fsdp_size=2, dp_size=4)
# use torchrun mode
twinkle.initialize(mode='local', global_device_mesh=device_mesh)
from resume_utils import resume_from_checkpoint

logger = get_logger()

MODEL_ID = 'ms://Qwen/Qwen3.5-4B'
DATASET_ID = 'ms://swift/self-cognition'
TEMPLATE_NAME = 'Qwen3_5Template'
MODEL_NAME = 'twinkle大模型'
MODEL_AUTHOR = 'ModelScope社区'
FSDP_SIZE = 2
DP_SIZE = 4
BATCH_SIZE = 8
LEARNING_RATE = 1e-4
GRADIENT_ACCUMULATION_STEPS = 2
LOG_INTERVAL = 20
EVAL_INTERVAL = 40
EVAL_SAMPLES = 100
TRAIN_SAMPLES = 1000

OUTPUT_DIR = './output/fsdp2'
RESUME_FROM_CHECKPOINT = None
RESUME_ONLY_MODEL = False
IGNORE_DATA_SKIP = False
ADAPTER_NAME = 'default'

# Construct a device_mesh
device_mesh = DeviceMesh.from_sizes(fsdp_size=FSDP_SIZE, dp_size=DP_SIZE)
# use torchrun mode
twinkle.initialize(mode='local', global_device_mesh=device_mesh)


def eval(model):
# 100 Samples
dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(100)))
dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
def build_dataset(num_samples: int) -> Dataset:
dataset = Dataset(dataset_meta=DatasetMeta(DATASET_ID, data_slice=range(num_samples)))
dataset.set_template(TEMPLATE_NAME, model_id=MODEL_ID)
dataset.map(SelfCognitionProcessor(MODEL_NAME, MODEL_AUTHOR))
dataset.encode()
dataloader = DataLoader(dataset=dataset, batch_size=8)
for step, batch in tqdm(enumerate(dataloader)):
return dataset


def save_checkpoint(model: TransformersModel, checkpoint_name: str, consumed_train_samples: int):
model.save(
checkpoint_name,
output_dir=OUTPUT_DIR,
adapter_name=ADAPTER_NAME,
save_optimizer=True,
consumed_train_samples=consumed_train_samples,
)


def evaluate(model):
dataloader = DataLoader(dataset=build_dataset(EVAL_SAMPLES), batch_size=BATCH_SIZE)
for batch in tqdm(dataloader):
model.forward_only(inputs=batch)
model.calculate_loss()
metrics = model.calculate_metric(is_training=False)
return metrics
return model.calculate_metric(is_training=False)


def train():
# 1000 samples
dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(1000)))
# Set template to prepare encoding
dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
# Preprocess the dataset to standard format
dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
# Encode dataset
dataset.encode()
dataset = build_dataset(TRAIN_SAMPLES)
# Global batch size = 8 across 8 GPUs (fsdp_size=2 x dp_size=4), so 1 sample per GPU
dataloader = DataLoader(dataset=dataset, batch_size=8)
dataloader = DataLoader(dataset=dataset, batch_size=BATCH_SIZE)
# Use a TransformersModel
model = TransformersModel(model_id='ms://Qwen/Qwen3.5-4B')
model = TransformersModel(model_id=MODEL_ID)
model.model._no_split_modules = {'Qwen3_5DecoderLayer'}

lora_config = LoraConfig(r=8, lora_alpha=32, target_modules='all-linear')

# Add a lora to model, with name `default`
# Comment this to use full-parameter training
model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2)
model.add_adapter_to_model(ADAPTER_NAME, lora_config, gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS)
# Add Optimizer for lora `default`
model.set_optimizer(optimizer_cls='AdamW', lr=1e-4)
model.set_optimizer(optimizer_cls='AdamW', lr=LEARNING_RATE)
# Add LRScheduler for lora `default`
model.set_lr_scheduler(
scheduler_cls='CosineWarmupScheduler', num_warmup_steps=5, num_training_steps=len(dataloader))

consumed_train_samples = 0
if RESUME_FROM_CHECKPOINT:
checkpoint_path = Path(RESUME_FROM_CHECKPOINT).expanduser().resolve()
consumed_train_samples = resume_from_checkpoint(
model=model,
dataloader=dataloader,
checkpoint_path=checkpoint_path,
resume_only_model=RESUME_ONLY_MODEL,
ignore_data_skip=IGNORE_DATA_SKIP,
adapter_name=ADAPTER_NAME,
)

logger.info(get_device_placement())
# Print the training config
logger.info(model.get_train_configs())
logger.info(f'Total steps: {len(dataloader)}')
loss_metric = 99.0
optimizer_group = model.optimizer_group[ADAPTER_NAME]
best_loss = float('inf')
# lora: 8G * 8
# full: 18G * 8
for step, batch in enumerate(dataloader):
for batch in dataloader:
# Do forward and backward
model.forward_backward(inputs=batch)
# Step
model.clip_grad_and_step()
if step % 20 == 0:
consumed_train_samples += BATCH_SIZE
cur_step = optimizer_group.cur_step
if cur_step % LOG_INTERVAL == 0:
# Print metric
metric = model.calculate_metric(is_training=True)
logger.info(f'Current is step {step} of {len(dataloader)}, metric: {metric}')
if step > 0 and step % 40 == 0:
metrics = eval(model)
logger.info(f'Step {cur_step} of {len(dataloader)}, metric: {metric}')
if cur_step > 0 and cur_step % EVAL_INTERVAL == 0:
metrics = evaluate(model)
logger.info(f'Eval metric: {metrics}')
metrics['step'] = step
if loss_metric > float(metrics['loss']):
model.save(f'checkpoint-{step}')
loss_metric = float(metrics['loss'])
model.save(f'last-checkpoint')
metrics['step'] = cur_step
current_loss = float(metrics['loss'])
if current_loss < best_loss:
save_checkpoint(model, f'checkpoint-{cur_step}', consumed_train_samples)
best_loss = current_loss
save_checkpoint(model, 'last-checkpoint', consumed_train_samples)


if __name__ == '__main__':
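The training loop in this file advances two counters at different rates: `consumed_train_samples` grows once per batch, while the optimizer step counter advances once per gradient-accumulation window. A self-contained sketch of that assumed relationship (this models, not reuses, `optimizer_group.cur_step`):

```python
# Sketch of the bookkeeping assumed by the fsdp2.py loop: samples accrue
# every batch, optimizer steps every GRADIENT_ACCUMULATION_STEPS batches.
# This is an illustration of the assumed semantics, not twinkle internals.
BATCH_SIZE = 8
GRADIENT_ACCUMULATION_STEPS = 2

consumed_train_samples = 0
cur_step = 0
for batch_idx in range(1, 7):  # six batches
    consumed_train_samples += BATCH_SIZE
    if batch_idx % GRADIENT_ACCUMULATION_STEPS == 0:
        cur_step += 1  # one optimizer step per accumulation window

print(consumed_train_samples, cur_step)  # 48 3
```

Saving both values in the checkpoint (as `save_checkpoint(...)` above does for `consumed_train_samples`) is what lets a resume restore the dataloader position and the scheduler step independently.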
55 changes: 55 additions & 0 deletions cookbook/transformers/resume_utils.py
@@ -0,0 +1,55 @@
from pathlib import Path
from typing import Any, Optional

from twinkle import get_logger


logger = get_logger()


def _build_model_kwargs(adapter_name: str) -> dict:
if not adapter_name:
return {}
return {'adapter_name': adapter_name}


def resume_from_checkpoint(
model: Any,
dataloader: Any,
checkpoint_path: Path,
*,
resume_only_model: bool,
ignore_data_skip: bool,
adapter_name: Optional[str] = None) -> int:
adapter_name = adapter_name or ''
checkpoint_dir = str(checkpoint_path)
model_kwargs = _build_model_kwargs(adapter_name)
if model_kwargs:
# Load adapter checkpoint.
model.load(
name=checkpoint_path.name,
output_dir=str(checkpoint_path.parent),
**model_kwargs,
)

if resume_only_model:
# Only load model weights, optionally skip data.
if ignore_data_skip:
logger.info('Resumed weights only and restarted progress from step 0.')
return 0
progress = model.read_training_progress(checkpoint_dir, **model_kwargs)
# Skip consumed samples in dataloader and move optimizer to the right step.
consumed_train_samples = int(progress['consumed_train_samples'])
dataloader.skip_consumed_samples(consumed_train_samples)
optimizer_group = model.optimizer_group[adapter_name]
optimizer_group.cur_step = progress['cur_step']
optimizer_group.gradient_accumulation_steps = progress['gradient_accumulation_steps']
logger.info(f'Skipped {consumed_train_samples} consumed samples.')
return consumed_train_samples

# Load full training state, including model weights, optimizer states, and training progress.
trainer_state = model.load_training_state(checkpoint_dir, **model_kwargs)
consumed_train_samples = int(trainer_state['consumed_train_samples'])
dataloader.skip_consumed_samples(consumed_train_samples)
logger.info(f'Restored full training state from step {trainer_state["cur_step"]}.')
return consumed_train_samples
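The skip performed by `dataloader.skip_consumed_samples(...)` above amounts to dropping whole consumed batches. A sketch of the arithmetic, assuming checkpoints are written on batch boundaries (the helper name is illustrative, not part of twinkle):

```python
# Illustrative only: how many whole batches a resume needs to skip, given
# the consumed_train_samples count stored in trainer_state.json. Assumes
# checkpoints land on batch boundaries, so the division is exact.
def batches_to_skip(consumed_train_samples: int, batch_size: int) -> int:
    return consumed_train_samples // batch_size


print(batches_to_skip(320, 8))  # 40
```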
13 changes: 13 additions & 0 deletions docs/source_en/Components/Model/TransformersModel.md
@@ -50,3 +50,16 @@ for data in dataloader:
model.forward_backward(...)
model.clip_grad_and_step(..., gradient_accumulation_steps=16)
```

## Checkpoint and Resume

`TransformersModel.save()` can save either weights only or a resumable training checkpoint.

- `model.save(name, save_optimizer=True, consumed_train_samples=...)` saves weights together with optimizer, scheduler, scaler, RNG, and `trainer_state.json`.
- `model.load(name, output_dir=..., adapter_name=...)` restores LoRA / adapter model weights.
- `model.read_training_progress(checkpoint_dir, ...)` reads checkpoint metadata such as `cur_step`, `gradient_accumulation_steps`, and `consumed_train_samples`.
(Comment thread: kevssim marked this conversation as resolved; outdated.)
- `model.load_training_state(checkpoint_dir, ...)` restores optimizer-related state and returns the training progress dictionary.

For full-parameter training, restore model weights by constructing `TransformersModel` with the checkpoint path as `model_id`, for example `TransformersModel(model_id='./output/fsdp2/last-checkpoint')`, and then call `load_training_state(...)` to restore optimizer state and training progress.

For end-to-end resume logic, including dataloader skipping, refer to `cookbook/transformers/fsdp2.py` and `cookbook/transformers/resume_utils.py`.
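A `trainer_state.json` produced by a resumable save might look like the following; the field names come from the list above, while the values are illustrative:

```json
{
  "cur_step": 120,
  "consumed_train_samples": 960,
  "gradient_accumulation_steps": 2
}
```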