diff --git a/cmd/kafscale-cli/main.go b/cmd/kafscale-cli/main.go new file mode 100644 index 0000000..d8c9921 --- /dev/null +++ b/cmd/kafscale-cli/main.go @@ -0,0 +1,379 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "path" + "slices" + "strconv" + "strings" + "time" + + metadatapb "github.com/KafScale/platform/pkg/gen/metadata" + "github.com/KafScale/platform/pkg/metadata" + "github.com/KafScale/platform/pkg/protocol" + "github.com/KafScale/platform/pkg/storage" +) + +var ( + newS3Client = storage.NewS3Client + newEtcdStore = metadata.NewEtcdStore + newMemoryS3 = func() storage.S3Client { return storage.NewMemoryS3Client() } +) + +type restoreConfig struct { + SourceTopic string + SourceNamespace string + TargetTopic string + TargetNamespace string + RestoreTo time.Time + Partitions []int32 +} + +func main() { + ctx := context.Background() + if err := run(ctx, os.Args[1:], os.Stdout, os.Stderr); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func run(ctx context.Context, args []string, stdout io.Writer, stderr io.Writer) error { + if len(args) == 0 { + writeUsage(stderr) + return fmt.Errorf("command required") + } + switch args[0] { + case "restore": + return runRestoreCommand(ctx, args[1:], stdout) + case "-h", "--help", "help": + writeUsage(stdout) + return nil + default: + writeUsage(stderr) + return fmt.Errorf("unknown command %q", args[0]) + } +} + +func writeUsage(w io.Writer) { + _, _ = fmt.Fprintln(w, "usage: kafscale-cli restore --topic --target-topic --to ") +} + +func runRestoreCommand(ctx context.Context, args []string, stdout io.Writer) error { + fs := flag.NewFlagSet("restore", flag.ContinueOnError) + fs.SetOutput(io.Discard) + + var ( + sourceTopic = fs.String("topic", "", "Source topic to recover from") + sourceNamespace = fs.String("namespace", envOrDefault("KAFSCALE_NAMESPACE", "default"), "Source namespace") + targetTopic = fs.String("target-topic", "", "Target topic to create and populate") + targetNamespace = fs.String("target-namespace", "", "Target namespace (defaults to source namespace)") + restoreToRaw = fs.String("to", "", "Restore cutoff in RFC3339 format") + partitionsRaw = fs.String("partitions", "", "Optional comma-separated partition list") + ) + if err := fs.Parse(args); err != nil { + return err + } + if *sourceTopic == "" { + return fmt.Errorf("--topic is required") + } + if *targetTopic == "" { + return fmt.Errorf("--target-topic is required") + } + if *restoreToRaw == "" { + return fmt.Errorf("--to is required") + } + restoreTo, err := time.Parse(time.RFC3339, *restoreToRaw) + if err != nil { + return fmt.Errorf("parse --to: %w", err) + } + partitions, err := parsePartitions(*partitionsRaw) + if err != nil { + return err + } + if *targetNamespace == "" { + *targetNamespace = 
*sourceNamespace + } + + s3Client, err := s3ClientFromEnv(ctx) + if err != nil { + return err + } + + store, err := metadataStoreFromEnv(ctx) + if err != nil { + return err + } + defer func() { _ = store.EtcdClient().Close() }() + + return executeRestore(ctx, stdout, restoreConfig{ + SourceTopic: *sourceTopic, + SourceNamespace: *sourceNamespace, + TargetTopic: *targetTopic, + TargetNamespace: *targetNamespace, + RestoreTo: restoreTo, + Partitions: partitions, + }, s3Client, store) +} + +func s3ClientFromEnv(ctx context.Context) (storage.S3Client, error) { + if envBoolDefault("KAFSCALE_USE_MEMORY_S3", false) { + return newMemoryS3(), nil + } + return newS3Client(ctx, storage.S3Config{ + Bucket: strings.TrimSpace(os.Getenv("KAFSCALE_S3_BUCKET")), + Region: strings.TrimSpace(os.Getenv("KAFSCALE_S3_REGION")), + Endpoint: strings.TrimSpace(os.Getenv("KAFSCALE_S3_ENDPOINT")), + ForcePathStyle: envBoolDefault("KAFSCALE_S3_PATH_STYLE", strings.TrimSpace(os.Getenv("KAFSCALE_S3_ENDPOINT")) != ""), + AccessKeyID: strings.TrimSpace(os.Getenv("KAFSCALE_S3_ACCESS_KEY")), + SecretAccessKey: strings.TrimSpace(os.Getenv("KAFSCALE_S3_SECRET_KEY")), + SessionToken: strings.TrimSpace(os.Getenv("KAFSCALE_S3_SESSION_TOKEN")), + KMSKeyARN: strings.TrimSpace(os.Getenv("KAFSCALE_S3_KMS_ARN")), + MaxConnections: envIntDefault("KAFSCALE_S3_CONCURRENCY", 16), + }) +} + +func metadataStoreFromEnv(ctx context.Context) (*metadata.EtcdStore, error) { + return newEtcdStore(ctx, metadata.ClusterMetadata{}, metadata.EtcdStoreConfig{ + Endpoints: splitCSV(strings.TrimSpace(os.Getenv("KAFSCALE_ETCD_ENDPOINTS"))), + Username: strings.TrimSpace(os.Getenv("KAFSCALE_ETCD_USERNAME")), + Password: strings.TrimSpace(os.Getenv("KAFSCALE_ETCD_PASSWORD")), + DialTimeout: 5 * time.Second, + }) +} + +func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3Client storage.S3Client, store *metadata.EtcdStore) error { + if store == nil { + return fmt.Errorf("metadata store required") + } + if s3Client == nil { + return fmt.Errorf("s3 client required") + } + + sourceMeta, err := store.Metadata(ctx, []string{cfg.SourceTopic}) + if err != nil { + return err + } + if len(sourceMeta.Topics) == 0 || sourceMeta.Topics[0].ErrorCode != 0 { + return metadata.ErrUnknownTopic + } + + sourcePartitions := make(map[int32]struct{}, len(sourceMeta.Topics[0].Partitions)) + for _, partition := range sourceMeta.Topics[0].Partitions { + sourcePartitions[partition.Partition] = struct{}{} + } + for _, partition := range cfg.Partitions { + if _, ok := sourcePartitions[partition]; !ok { + return fmt.Errorf("partition %d not present in source topic %s", partition, cfg.SourceTopic) + } + } + + sourceCfg, err := store.FetchTopicConfig(ctx, cfg.SourceTopic) + if err != nil { + return err + } + + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ + Name: cfg.TargetTopic, + NumPartitions: sourceCfg.Partitions, + ReplicationFactor: int16(sourceCfg.ReplicationFactor), + }); err != nil { + return err + } + + targetCfg := cloneTopicConfig(sourceCfg) + targetCfg.Name = cfg.TargetTopic + targetCfg.CreatedAt = time.Now().UTC().Format(time.RFC3339) + if err := store.UpdateTopicConfig(ctx, targetCfg); err != nil { + return err + } + + result, err := storage.RecoverTopicToTimestamp(ctx, s3Client, storage.TopicRecoveryConfig{ + SourceNamespace: cfg.SourceNamespace, + SourceTopic: cfg.SourceTopic, + TargetNamespace: cfg.TargetNamespace, + TargetTopic: cfg.TargetTopic, + RestoreTo: cfg.RestoreTo, + Partitions: cfg.Partitions, + }) + if err != nil { + 
return err + } + + targetMeta, err := store.Metadata(ctx, []string{cfg.TargetTopic}) + if err != nil { + return err + } + if len(targetMeta.Topics) == 0 || targetMeta.Topics[0].ErrorCode != 0 { + return metadata.ErrUnknownTopic + } + + recoveredByPartition := make(map[int32]storage.RecoveredPartition, len(result.Partitions)) + for _, partition := range result.Partitions { + recoveredByPartition[partition.Partition] = partition + if partition.LastOffset >= 0 { + if err := store.UpdateOffsets(ctx, cfg.TargetTopic, partition.Partition, partition.LastOffset); err != nil { + return err + } + } + } + + if err := writePartitionStates(ctx, store, cfg.TargetTopic, targetMeta.Topics[0].Partitions, recoveredByPartition); err != nil { + return err + } + + _, _ = fmt.Fprintf(stdout, "restored %s to %s up to %s\n", cfg.SourceTopic, cfg.TargetTopic, cfg.RestoreTo.UTC().Format(time.RFC3339)) + for _, partition := range result.Partitions { + _, _ = fmt.Fprintf(stdout, "partition=%d segments=%d last_offset=%d\n", partition.Partition, partition.SegmentsCopied, partition.LastOffset) + } + return nil +} + +func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic string, partitions []protocol.MetadataPartition, recovered map[int32]storage.RecoveredPartition) error { + for _, partition := range partitions { + summary, ok := recovered[partition.Partition] + state := &metadatapb.PartitionState{ + Topic: topic, + Partition: partition.Partition, + LeaderBroker: fmt.Sprintf("%d", partition.Leader), + LeaderEpoch: partition.LeaderEpoch, + } + if ok && summary.LastOffset >= 0 { + state.LogEndOffset = summary.LastOffset + 1 + state.HighWatermark = summary.LastOffset + 1 + if len(summary.Segments) > 0 { + last := summary.Segments[len(summary.Segments)-1] + state.ActiveSegment = path.Base(last.TargetKey) + state.Segments = make([]*metadatapb.SegmentInfo, 0, len(summary.Segments)) + for _, segment := range summary.Segments { + state.Segments = append(state.Segments, &metadatapb.SegmentInfo{ + BaseOffset: segment.BaseOffset, + SizeBytes: segment.SizeBytes, + CreatedAt: segment.CreatedAt.UTC().Format(time.RFC3339), + }) + } + } + } + payload, err := metadata.EncodePartitionState(state) + if err != nil { + return err + } + putCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + _, err = store.EtcdClient().Put(putCtx, metadata.PartitionStateKey(topic, partition.Partition), string(payload)) + cancel() + if err != nil { + return err + } + } + return nil +} + +func cloneTopicConfig(cfg *metadatapb.TopicConfig) *metadatapb.TopicConfig { + if cfg == nil { + return nil + } + cloned := &metadatapb.TopicConfig{ + Name: cfg.Name, + Partitions: cfg.Partitions, + ReplicationFactor: cfg.ReplicationFactor, + RetentionMs: cfg.RetentionMs, + RetentionBytes: cfg.RetentionBytes, + SegmentBytes: cfg.SegmentBytes, + CreatedAt: cfg.CreatedAt, + Config: make(map[string]string, len(cfg.Config)), + } + for key, value := range cfg.Config { + cloned.Config[key] = value + } + return cloned +} + +func parsePartitions(raw string) ([]int32, error) { + if strings.TrimSpace(raw) == "" { + return nil, nil + } + parts := splitCSV(raw) + out := make([]int32, 0, len(parts)) + seen := make(map[int32]struct{}, len(parts)) + for _, part := range parts { + parsed, err := strconv.ParseInt(part, 10, 32) + if err != nil { + return nil, fmt.Errorf("parse partition %q: %w", part, err) + } + val := int32(parsed) + if _, ok := seen[val]; ok { + continue + } + seen[val] = struct{}{} + out = append(out, val) + } + slices.Sort(out) + return out, 
nil +} + +func splitCSV(raw string) []string { + if strings.TrimSpace(raw) == "" { + return nil + } + parts := strings.Split(raw, ",") + out := make([]string, 0, len(parts)) + for _, part := range parts { + val := strings.TrimSpace(part) + if val != "" { + out = append(out, val) + } + } + return out +} + +func envOrDefault(key string, fallback string) string { + if val := strings.TrimSpace(os.Getenv(key)); val != "" { + return val + } + return fallback +} + +func envBoolDefault(key string, fallback bool) bool { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return fallback + } + switch strings.ToLower(raw) { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return fallback + } +} + +func envIntDefault(key string, fallback int) int { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return fallback + } + val, err := strconv.Atoi(raw) + if err != nil || val <= 0 { + return fallback + } + return val +} diff --git a/cmd/kafscale-cli/main_test.go b/cmd/kafscale-cli/main_test.go new file mode 100644 index 0000000..0a547a9 --- /dev/null +++ b/cmd/kafscale-cli/main_test.go @@ -0,0 +1,367 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
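+//
+// The tests below start an embedded etcd via internal/testutil, swap the
+// package-level constructor hooks (newS3Client, newEtcdStore, newMemoryS3) for
+// in-memory or pre-built clients, and drive the restore command end to end
+// without any external S3 or etcd dependency.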
+ +package main + +import ( + "bytes" + "context" + "errors" + "strings" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/KafScale/platform/internal/testutil" + metadatapb "github.com/KafScale/platform/pkg/gen/metadata" + "github.com/KafScale/platform/pkg/metadata" + "github.com/KafScale/platform/pkg/protocol" + "github.com/KafScale/platform/pkg/storage" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestRunHelpAndUnknownCommand(t *testing.T) { + var stdout bytes.Buffer + if err := run(context.Background(), []string{"help"}, &stdout, &bytes.Buffer{}); err != nil { + t.Fatalf("run help: %v", err) + } + if !strings.Contains(stdout.String(), "usage: kafscale-cli restore") { + t.Fatalf("unexpected help output: %s", stdout.String()) + } + + err := run(context.Background(), []string{"wat"}, &bytes.Buffer{}, &bytes.Buffer{}) + if err == nil || !strings.Contains(err.Error(), "unknown command") { + t.Fatalf("expected unknown command error, got %v", err) + } +} + +func TestRunRestoreCommandUsesInjectedClients(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + store, err := metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + }, metadata.EtcdStoreConfig{Endpoints: endpoints}) + if err != nil { + t.Fatalf("NewEtcdStore: %v", err) + } + defer func() { _ = store.EtcdClient().Close() }() + + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ + Name: "orders", + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + t.Fatalf("CreateTopic: %v", err) + } + + s3 := storage.NewMemoryS3Client() + artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ + { + BaseOffset: 0, + LastOffsetDelta: 0, + MessageCount: 1, + Bytes: make([]byte, 70), + }, + }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) + if err != nil { + t.Fatalf("BuildSegment: %v", err) + } + if err := s3.UploadSegment(ctx, "default/orders/0/segment-00000000000000000000.kfs", artifact.SegmentBytes); err != nil { + t.Fatalf("UploadSegment: %v", err) + } + if err := s3.UploadIndex(ctx, "default/orders/0/segment-00000000000000000000.index", artifact.IndexBytes); err != nil { + t.Fatalf("UploadIndex: %v", err) + } + + prevS3 := newS3Client + prevStore := newEtcdStore + prevMemory := newMemoryS3 + t.Cleanup(func() { + newS3Client = prevS3 + newEtcdStore = prevStore + newMemoryS3 = prevMemory + }) + newS3Client = func(context.Context, storage.S3Config) (storage.S3Client, error) { + return s3, nil + } + newEtcdStore = func(context.Context, metadata.ClusterMetadata, metadata.EtcdStoreConfig) (*metadata.EtcdStore, error) { + return store, nil + } + newMemoryS3 = func() storage.S3Client { return s3 } + + t.Setenv("KAFSCALE_S3_BUCKET", "bucket") + t.Setenv("KAFSCALE_S3_REGION", "us-east-1") + t.Setenv("KAFSCALE_ETCD_ENDPOINTS", strings.Join(endpoints, ",")) + + var stdout bytes.Buffer + if err := run(context.Background(), []string{ + "restore", + "--topic", "orders", + "--target-topic", "orders-wrapper", + "--to", "2026-05-13T12:05:00Z", + }, &stdout, &bytes.Buffer{}); err != nil { + t.Fatalf("run restore: %v", err) + } + if !strings.Contains(stdout.String(), "orders-wrapper") { + t.Fatalf("unexpected restore output: %s", stdout.String()) + } +} + +func TestRunRestoreCommandRequiresTopic(t *testing.T) { + err := runRestoreCommand(context.Background(), []string{ + "--target-topic", "orders-restore", + "--to", 
"2026-05-13T12:05:00Z", + }, &bytes.Buffer{}) + if err == nil || !strings.Contains(err.Error(), "--topic is required") { + t.Fatalf("expected missing topic error, got %v", err) + } +} + +func TestS3ClientFromEnvUsesMemoryToggle(t *testing.T) { + prevMemory := newMemoryS3 + t.Cleanup(func() { newMemoryS3 = prevMemory }) + + mem := storage.NewMemoryS3Client() + newMemoryS3 = func() storage.S3Client { return mem } + t.Setenv("KAFSCALE_USE_MEMORY_S3", "true") + + client, err := s3ClientFromEnv(context.Background()) + if err != nil { + t.Fatalf("s3ClientFromEnv: %v", err) + } + if client != mem { + t.Fatal("expected injected memory s3 client") + } +} + +func TestMetadataStoreFromEnvPassesConfig(t *testing.T) { + prevStore := newEtcdStore + t.Cleanup(func() { newEtcdStore = prevStore }) + + t.Setenv("KAFSCALE_ETCD_ENDPOINTS", "http://a:2379,http://b:2379") + t.Setenv("KAFSCALE_ETCD_USERNAME", "user") + t.Setenv("KAFSCALE_ETCD_PASSWORD", "pass") + + var got metadata.EtcdStoreConfig + sentinel := errors.New("stop") + newEtcdStore = func(_ context.Context, _ metadata.ClusterMetadata, cfg metadata.EtcdStoreConfig) (*metadata.EtcdStore, error) { + got = cfg + return nil, sentinel + } + + err := func() error { + _, err := metadataStoreFromEnv(context.Background()) + return err + }() + if !errors.Is(err, sentinel) { + t.Fatalf("expected sentinel error, got %v", err) + } + if len(got.Endpoints) != 2 || got.Endpoints[0] != "http://a:2379" || got.Endpoints[1] != "http://b:2379" { + t.Fatalf("unexpected endpoints: %+v", got.Endpoints) + } + if got.Username != "user" || got.Password != "pass" { + t.Fatalf("unexpected auth config: %+v", got) + } +} + +func TestExecuteRestoreCreatesRecoveredTopic(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + store, err := metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + }, metadata.EtcdStoreConfig{Endpoints: endpoints}) + if err != nil { + t.Fatalf("NewEtcdStore: %v", err) + } + defer func() { _ = store.EtcdClient().Close() }() + + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ + Name: "orders", + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + t.Fatalf("CreateTopic: %v", err) + } + cfg, err := store.FetchTopicConfig(ctx, "orders") + if err != nil { + t.Fatalf("FetchTopicConfig: %v", err) + } + cfg.RetentionMs = 60000 + cfg.Config = map[string]string{"cleanup.policy": "delete"} + if err := store.UpdateTopicConfig(ctx, cfg); err != nil { + t.Fatalf("UpdateTopicConfig: %v", err) + } + + s3 := storage.NewMemoryS3Client() + artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ + { + BaseOffset: 0, + LastOffsetDelta: 0, + MessageCount: 1, + Bytes: make([]byte, 70), + }, + }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) + if err != nil { + t.Fatalf("BuildSegment: %v", err) + } + if err := s3.UploadSegment(ctx, "default/orders/0/segment-00000000000000000000.kfs", artifact.SegmentBytes); err != nil { + t.Fatalf("UploadSegment: %v", err) + } + if err := s3.UploadIndex(ctx, "default/orders/0/segment-00000000000000000000.index", artifact.IndexBytes); err != nil { + t.Fatalf("UploadIndex: %v", err) + } + + var stdout bytes.Buffer + if err := executeRestore(ctx, &stdout, restoreConfig{ + SourceTopic: "orders", + SourceNamespace: "default", + TargetTopic: "orders-restored", + TargetNamespace: "default", + RestoreTo: time.Date(2026, 5, 13, 12, 
5, 0, 0, time.UTC), + }, s3, store); err != nil { + t.Fatalf("executeRestore: %v", err) + } + if !strings.Contains(stdout.String(), "restored orders to orders-restored") { + t.Fatalf("unexpected stdout: %s", stdout.String()) + } + + meta, err := store.Metadata(ctx, []string{"orders-restored"}) + if err != nil { + t.Fatalf("Metadata: %v", err) + } + if len(meta.Topics) != 1 || meta.Topics[0].ErrorCode != 0 || meta.Topics[0].Topic == nil || *meta.Topics[0].Topic != "orders-restored" { + t.Fatalf("unexpected target topic metadata: %+v", meta.Topics) + } + + targetCfg, err := store.FetchTopicConfig(ctx, "orders-restored") + if err != nil { + t.Fatalf("FetchTopicConfig target: %v", err) + } + if targetCfg.RetentionMs != 60000 { + t.Fatalf("expected retention to be copied, got %d", targetCfg.RetentionMs) + } + if targetCfg.Config["cleanup.policy"] != "delete" { + t.Fatalf("expected config to be copied, got %+v", targetCfg.Config) + } + + nextOffset, err := store.NextOffset(ctx, "orders-restored", 0) + if err != nil { + t.Fatalf("NextOffset: %v", err) + } + if nextOffset != 1 { + t.Fatalf("expected next offset 1, got %d", nextOffset) + } + + if _, err := s3.DownloadSegment(ctx, "default/orders-restored/0/segment-00000000000000000000.kfs", nil); err != nil { + t.Fatalf("DownloadSegment restored: %v", err) + } + + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 3 * time.Second}) + if err != nil { + t.Fatalf("new etcd client: %v", err) + } + defer func() { _ = cli.Close() }() + + resp, err := cli.Get(ctx, metadata.PartitionStateKey("orders-restored", 0)) + if err != nil { + t.Fatalf("get partition state: %v", err) + } + if len(resp.Kvs) != 1 { + t.Fatalf("expected partition state, got %d", len(resp.Kvs)) + } + state, err := metadata.DecodePartitionState(resp.Kvs[0].Value) + if err != nil { + t.Fatalf("DecodePartitionState: %v", err) + } + if state.LogEndOffset != 1 || state.HighWatermark != 1 { + t.Fatalf("unexpected partition offsets: %+v", state) + } + if state.ActiveSegment != "segment-00000000000000000000.kfs" { + t.Fatalf("unexpected active segment: %+v", state) + } +} + +func TestCloneTopicConfig(t *testing.T) { + cfg := &metadatapb.TopicConfig{ + Name: "orders", + Partitions: 1, + ReplicationFactor: 1, + RetentionMs: 42, + Config: map[string]string{"a": "b"}, + } + cloned := cloneTopicConfig(cfg) + if cloned == cfg { + t.Fatal("expected clone to allocate a new object") + } + cloned.Config["a"] = "c" + if cfg.Config["a"] != "b" { + t.Fatal("expected source config map to stay untouched") + } +} + +func TestParsePartitions(t *testing.T) { + partitions, err := parsePartitions("2,0,2,1") + if err != nil { + t.Fatalf("parsePartitions: %v", err) + } + if want := []int32{0, 1, 2}; len(partitions) != len(want) || partitions[0] != want[0] || partitions[1] != want[1] || partitions[2] != want[2] { + t.Fatalf("unexpected partitions: %+v", partitions) + } +} + +func TestExecuteRestoreRejectsUnknownPartition(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + store, err := metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + Topics: []protocol.MetadataTopic{ + { + Topic: kmsg.StringPtr("orders"), + Partitions: []protocol.MetadataPartition{ + {Partition: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}, + }, + }, + }, + }, metadata.EtcdStoreConfig{Endpoints: endpoints}) + if err != nil { + t.Fatalf("NewEtcdStore: %v", err) + } 
+ defer func() { _ = store.EtcdClient().Close() }() + + err = executeRestore(ctx, &bytes.Buffer{}, restoreConfig{ + SourceTopic: "orders", + SourceNamespace: "default", + TargetTopic: "orders-restored", + TargetNamespace: "default", + RestoreTo: time.Now().UTC(), + Partitions: []int32{99}, + }, storage.NewMemoryS3Client(), store) + if err == nil || !strings.Contains(err.Error(), "partition 99") { + t.Fatalf("expected unknown partition error, got %v", err) + } +} diff --git a/docs/operations.md b/docs/operations.md index 83abcb3..11f1926 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -233,6 +233,27 @@ When the KafScale operator manages etcd, each cluster pod runs ```restore init c The restore image must include `/bin/sh` and `etcdctl`. Override with `KAFSCALE_OPERATOR_ETCD_SNAPSHOT_ETCDCTL_IMAGE` if you use a custom image. +### Topic Recovery On The DR Spine + +For broker topic data, KafScale now supports **segment-granular recovery into a new topic** on the DR side. This is intentionally not an in-place rollback on the primary cluster. + +Use `kafscale-cli restore` to create a fresh target topic, copy `.kfs` segment/index pairs up to a cutoff timestamp, and set the recovered topic's next offsets: + +```bash +kafscale-cli restore \ + --topic orders \ + --target-topic orders-restore-20260513 \ + --to 2026-05-13T14:23:00Z +``` + +Operational semantics: + +- Recovery runs against the existing KafScale S3 + etcd control plane, including `KAFSCALE_S3_BUCKET`, `KAFSCALE_S3_REGION`, `KAFSCALE_S3_ENDPOINT`, `KAFSCALE_S3_PATH_STYLE`, and `KAFSCALE_ETCD_ENDPOINTS`. +- The target topic must be new. KafScale refuses to restore over an existing persisted topic. +- Recovery is **segment-granular**. The cutoff uses the immutable segment creation time, then copies contiguous segment/index pairs up to the first segment created after that timestamp. +- Offsets are preserved inside the recovered topic so replay, validation, and downstream cutover can happen without rewriting the source topic. +- The safer pattern is restore, validate, then cut consumers or downstream jobs over deliberately. + ### Consumer Offsets After Restore Etcd restores recover committed consumer offsets. If a consumer has **no committed offsets**, it may start at the end and see zero records even though data exists in S3. In production: diff --git a/pkg/storage/recovery.go b/pkg/storage/recovery.go new file mode 100644 index 0000000..743bc3b --- /dev/null +++ b/pkg/storage/recovery.go @@ -0,0 +1,306 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "path" + "sort" + "strconv" + "strings" + "time" +) + +const segmentHeaderLen = 32 + +// TopicRecoveryConfig defines a segment-granular topic restore into a new topic. 
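+// An empty Partitions slice restores every partition found under the source
+// prefix, and RestoreTo is compared against each segment's immutable creation
+// timestamp, so the cutoff is segment-granular rather than record-granular.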
+type TopicRecoveryConfig struct { + SourceNamespace string + SourceTopic string + TargetNamespace string + TargetTopic string + RestoreTo time.Time + Partitions []int32 +} + +// RecoveredSegment describes one copied segment/index pair. +type RecoveredSegment struct { + Partition int32 + BaseOffset int64 + LastOffset int64 + SizeBytes int64 + CreatedAt time.Time + SourceKey string + TargetKey string + SourceIndex string + TargetIndex string +} + +// RecoveredPartition summarizes copied data for one partition. +type RecoveredPartition struct { + Partition int32 + SegmentsCopied int + LastOffset int64 + Segments []RecoveredSegment +} + +// TopicRecoveryResult is the outcome of a restore run. +type TopicRecoveryResult struct { + SourceNamespace string + SourceTopic string + TargetNamespace string + TargetTopic string + RestoreTo time.Time + SegmentsCopied int + Partitions []RecoveredPartition +} + +type sourceSegment struct { + RecoveredSegment +} + +// RecoverTopicToTimestamp copies immutable segment/index pairs from one topic to +// another up to the first segment created after the requested cutoff. +func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecoveryConfig) (*TopicRecoveryResult, error) { + if s3 == nil { + return nil, fmt.Errorf("s3 client required") + } + if cfg.SourceTopic == "" { + return nil, fmt.Errorf("source topic required") + } + if cfg.TargetTopic == "" { + return nil, fmt.Errorf("target topic required") + } + if cfg.RestoreTo.IsZero() { + return nil, fmt.Errorf("restore timestamp required") + } + if cfg.SourceNamespace == "" { + cfg.SourceNamespace = "default" + } + if cfg.TargetNamespace == "" { + cfg.TargetNamespace = cfg.SourceNamespace + } + if cfg.SourceNamespace == cfg.TargetNamespace && cfg.SourceTopic == cfg.TargetTopic { + return nil, fmt.Errorf("target topic must differ from source topic") + } + + targetPrefix := path.Join(cfg.TargetNamespace, cfg.TargetTopic) + existing, err := s3.ListSegments(ctx, targetPrefix) + if err != nil { + return nil, err + } + for _, obj := range existing { + if strings.HasSuffix(obj.Key, ".kfs") { + return nil, fmt.Errorf("target topic already has persisted segments under %s", targetPrefix) + } + } + + sourcePrefix := path.Join(cfg.SourceNamespace, cfg.SourceTopic) + objects, err := s3.ListSegments(ctx, sourcePrefix) + if err != nil { + return nil, err + } + + allowedPartitions := make(map[int32]struct{}, len(cfg.Partitions)) + for _, partition := range cfg.Partitions { + allowedPartitions[partition] = struct{}{} + } + + segmentsByPartition := make(map[int32][]sourceSegment) + for _, obj := range objects { + if !strings.HasSuffix(obj.Key, ".kfs") { + continue + } + segment, err := inspectSourceSegment(ctx, s3, obj, cfg.SourceNamespace, cfg.SourceTopic) + if err != nil { + return nil, err + } + if len(allowedPartitions) > 0 { + if _, ok := allowedPartitions[segment.Partition]; !ok { + continue + } + } + segmentsByPartition[segment.Partition] = append(segmentsByPartition[segment.Partition], segment) + } + + partitions := make([]int32, 0, len(segmentsByPartition)) + for partition := range segmentsByPartition { + partitions = append(partitions, partition) + } + sort.Slice(partitions, func(i, j int) bool { return partitions[i] < partitions[j] }) + + result := &TopicRecoveryResult{ + SourceNamespace: cfg.SourceNamespace, + SourceTopic: cfg.SourceTopic, + TargetNamespace: cfg.TargetNamespace, + TargetTopic: cfg.TargetTopic, + RestoreTo: cfg.RestoreTo.UTC(), + Partitions: make([]RecoveredPartition, 0, len(partitions)), 
+ } + + for _, partition := range partitions { + segments := segmentsByPartition[partition] + sort.Slice(segments, func(i, j int) bool { return segments[i].BaseOffset < segments[j].BaseOffset }) + + summary := RecoveredPartition{ + Partition: partition, + LastOffset: -1, + Segments: make([]RecoveredSegment, 0, len(segments)), + } + for _, segment := range segments { + if segment.CreatedAt.After(cfg.RestoreTo) { + break + } + + segmentBytes, err := s3.DownloadSegment(ctx, segment.SourceKey, nil) + if err != nil { + return nil, err + } + indexBytes, err := s3.DownloadIndex(ctx, segment.SourceIndex) + if err != nil { + return nil, err + } + + targetSegmentKey := segmentObjectKey(cfg.TargetNamespace, cfg.TargetTopic, partition, segment.BaseOffset) + targetIndexKey := segmentIndexKey(cfg.TargetNamespace, cfg.TargetTopic, partition, segment.BaseOffset) + if err := s3.UploadSegment(ctx, targetSegmentKey, segmentBytes); err != nil { + return nil, err + } + if err := s3.UploadIndex(ctx, targetIndexKey, indexBytes); err != nil { + return nil, err + } + + copied := segment.RecoveredSegment + copied.TargetKey = targetSegmentKey + copied.TargetIndex = targetIndexKey + summary.Segments = append(summary.Segments, copied) + summary.SegmentsCopied++ + summary.LastOffset = copied.LastOffset + result.SegmentsCopied++ + } + result.Partitions = append(result.Partitions, summary) + } + + return result, nil +} + +func inspectSourceSegment(ctx context.Context, s3 S3Client, obj S3Object, namespace string, topic string) (sourceSegment, error) { + partition, baseOffset, err := parseSegmentLocation(obj.Key, namespace, topic) + if err != nil { + return sourceSegment{}, err + } + if obj.Size < segmentFooterLen { + return sourceSegment{}, fmt.Errorf("segment %s too small", obj.Key) + } + + headerBytes, err := s3.DownloadSegment(ctx, obj.Key, &ByteRange{Start: 0, End: segmentHeaderLen - 1}) + if err != nil { + return sourceSegment{}, err + } + createdAt, err := parseSegmentHeaderCreatedAt(headerBytes) + if err != nil { + return sourceSegment{}, err + } + + start := obj.Size - segmentFooterLen + footerBytes, err := s3.DownloadSegment(ctx, obj.Key, &ByteRange{Start: start, End: obj.Size - 1}) + if err != nil { + return sourceSegment{}, err + } + lastOffset, err := parseSegmentFooter(footerBytes) + if err != nil { + return sourceSegment{}, err + } + + return sourceSegment{ + RecoveredSegment: RecoveredSegment{ + Partition: partition, + BaseOffset: baseOffset, + LastOffset: lastOffset, + SizeBytes: obj.Size, + CreatedAt: createdAt, + SourceKey: obj.Key, + SourceIndex: segmentIndexKey(namespace, topic, partition, baseOffset), + }, + }, nil +} + +func parseSegmentLocation(key string, namespace string, topic string) (int32, int64, error) { + prefix := path.Join(namespace, topic) + "/" + if !strings.HasPrefix(key, prefix) { + return 0, 0, fmt.Errorf("segment %s not under %s", key, prefix) + } + trimmed := strings.TrimPrefix(key, prefix) + parts := strings.Split(trimmed, "/") + if len(parts) != 2 { + return 0, 0, fmt.Errorf("segment %s has unexpected layout", key) + } + partition, err := strconv.ParseInt(parts[0], 10, 32) + if err != nil { + return 0, 0, fmt.Errorf("parse partition from %s: %w", key, err) + } + baseOffset, ok := parseSegmentBaseOffset(parts[1]) + if !ok { + return 0, 0, fmt.Errorf("parse base offset from %s", key) + } + return int32(partition), baseOffset, nil +} + +func parseSegmentHeaderCreatedAt(data []byte) (time.Time, error) { + if len(data) < segmentHeaderLen { + return time.Time{}, fmt.Errorf("header too 
small") + } + reader := bytes.NewReader(data) + magic := make([]byte, len(segmentMagic)) + if _, err := reader.Read(magic); err != nil { + return time.Time{}, err + } + if string(magic) != segmentMagic { + return time.Time{}, fmt.Errorf("invalid segment magic") + } + var version uint16 + if err := binary.Read(reader, binary.BigEndian, &version); err != nil { + return time.Time{}, err + } + var flags uint16 + if err := binary.Read(reader, binary.BigEndian, &flags); err != nil { + return time.Time{}, err + } + var baseOffset int64 + if err := binary.Read(reader, binary.BigEndian, &baseOffset); err != nil { + return time.Time{}, err + } + var messageCount int32 + if err := binary.Read(reader, binary.BigEndian, &messageCount); err != nil { + return time.Time{}, err + } + var createdAtMillis int64 + if err := binary.Read(reader, binary.BigEndian, &createdAtMillis); err != nil { + return time.Time{}, err + } + return time.UnixMilli(createdAtMillis).UTC(), nil +} + +func segmentObjectKey(namespace string, topic string, partition int32, baseOffset int64) string { + return path.Join(namespace, topic, fmt.Sprintf("%d", partition), fmt.Sprintf("segment-%020d.kfs", baseOffset)) +} + +func segmentIndexKey(namespace string, topic string, partition int32, baseOffset int64) string { + return path.Join(namespace, topic, fmt.Sprintf("%d", partition), fmt.Sprintf("segment-%020d.index", baseOffset)) +} diff --git a/pkg/storage/recovery_test.go b/pkg/storage/recovery_test.go new file mode 100644 index 0000000..47e2de6 --- /dev/null +++ b/pkg/storage/recovery_test.go @@ -0,0 +1,110 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package storage + +import ( + "context" + "testing" + "time" +) + +func TestRecoverTopicToTimestampCopiesEligibleSegments(t *testing.T) { + s3 := NewMemoryS3Client() + older := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC) + newer := older.Add(30 * time.Minute) + + uploadRecoverySegment(t, s3, "default", "orders", 0, 0, older) + uploadRecoverySegment(t, s3, "default", "orders", 0, 1, newer) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: older.Add(10 * time.Minute), + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 1 { + t.Fatalf("expected 1 copied segment, got %d", result.SegmentsCopied) + } + if len(result.Partitions) != 1 { + t.Fatalf("expected 1 partition summary, got %d", len(result.Partitions)) + } + partition := result.Partitions[0] + if partition.Partition != 0 { + t.Fatalf("expected partition 0, got %d", partition.Partition) + } + if partition.LastOffset != 0 { + t.Fatalf("expected last offset 0, got %d", partition.LastOffset) + } + if partition.SegmentsCopied != 1 { + t.Fatalf("expected 1 copied segment for partition, got %d", partition.SegmentsCopied) + } + + segKey := "default/orders-restore/0/segment-00000000000000000000.kfs" + if _, err := s3.DownloadSegment(context.Background(), segKey, nil); err != nil { + t.Fatalf("download restored segment: %v", err) + } + if _, err := s3.DownloadIndex(context.Background(), "default/orders-restore/0/segment-00000000000000000000.index"); err != nil { + t.Fatalf("download restored index: %v", err) + } + if _, err := s3.DownloadSegment(context.Background(), "default/orders-restore/0/segment-00000000000000000001.kfs", nil); err == nil { + t.Fatal("expected newer segment to be excluded") + } +} + +func TestRecoverTopicToTimestampRejectsExistingTarget(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC) + uploadRecoverySegment(t, s3, "default", "orders", 0, 0, created) + uploadRecoverySegment(t, s3, "default", "orders-restore", 0, 0, created) + + _, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: created.Add(time.Minute), + }) + if err == nil { + t.Fatal("expected existing target restore to fail") + } +} + +func uploadRecoverySegment(t *testing.T, s3 *MemoryS3Client, namespace string, topic string, partition int32, baseOffset int64, created time.Time) { + t.Helper() + + artifact, err := BuildSegment(SegmentWriterConfig{IndexIntervalMessages: 1}, []RecordBatch{ + { + BaseOffset: baseOffset, + LastOffsetDelta: 0, + MessageCount: 1, + Bytes: make([]byte, 70), + }, + }, created) + if err != nil { + t.Fatalf("BuildSegment: %v", err) + } + + if err := s3.UploadSegment(context.Background(), segmentObjectKey(namespace, topic, partition, baseOffset), artifact.SegmentBytes); err != nil { + t.Fatalf("UploadSegment: %v", err) + } + if err := s3.UploadIndex(context.Background(), segmentIndexKey(namespace, topic, partition, baseOffset), artifact.IndexBytes); err != nil { + t.Fatalf("UploadIndex: %v", err) + } +}
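Reviewer supplement, not part of the change: a minimal sketch of driving the new `storage.RecoverTopicToTimestamp` helper directly from Go instead of through `kafscale-cli`, assuming the in-memory S3 client and the `namespace/topic/partition/segment-<base offset>.kfs` layout used throughout this diff. A real invocation would pass a client built by `storage.NewS3Client` against the production bucket and the usual `KAFSCALE_S3_*` settings.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/KafScale/platform/pkg/storage"
)

func main() {
	ctx := context.Background()

	// Illustrative only: an empty in-memory client stands in for the S3-backed
	// client that kafscale-cli builds from the KAFSCALE_S3_* environment variables.
	s3 := storage.NewMemoryS3Client()

	result, err := storage.RecoverTopicToTimestamp(ctx, s3, storage.TopicRecoveryConfig{
		SourceNamespace: "default",
		SourceTopic:     "orders",
		TargetTopic:     "orders-restore", // TargetNamespace defaults to SourceNamespace
		RestoreTo:       time.Date(2026, 5, 13, 12, 5, 0, 0, time.UTC),
		// Partitions left nil: every partition found under default/orders is eligible.
	})
	if err != nil {
		log.Fatalf("recover: %v", err)
	}
	for _, p := range result.Partitions {
		fmt.Printf("partition=%d segments=%d last_offset=%d\n", p.Partition, p.SegmentsCopied, p.LastOffset)
	}
}
```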