Thread's dataflow integration for incremental code analysis, using ReCoco (a Rust-only fork of CocoIndex) for content-addressed caching and dependency tracking.
thread-flow bridges Thread's imperative AST analysis engine with ReCoco's declarative dataflow framework, enabling persistent incremental updates and multi-backend storage. It provides:
- ✅ Content-Addressed Caching: 50x+ performance gains via automatic incremental updates
- ✅ Dependency Tracking: File-level and symbol-level dependency graph management
- ✅ Multi-Backend Storage: Postgres (CLI), D1 (Edge), and in-memory (testing)
- ✅ Dual Deployment: Single codebase compiles to CLI (Rayon parallelism) and Edge (tokio async)
- ✅ Language Extractors: Built-in support for Rust, Python, TypeScript, and Go
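The content-addressed caching bullet boils down to: key each file's analysis result by a fingerprint of its contents, so an unchanged file is a cache hit regardless of timestamps. A minimal std-only sketch of the idea, using `DefaultHasher` as a stand-in for the Blake3 fingerprints thread-flow actually uses (the `AnalysisCache` type and its methods are illustrative, not the crate's API):

```rust
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Compute a content fingerprint for a file's source text.
/// (thread-flow uses Blake3; DefaultHasher keeps this sketch std-only.)
fn fingerprint(source: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    source.hash(&mut hasher);
    hasher.finish()
}

/// A toy content-addressed cache: analysis results keyed by fingerprint.
struct AnalysisCache {
    results: HashMap<u64, String>, // fingerprint -> cached analysis
    hits: usize,
    misses: usize,
}

impl AnalysisCache {
    fn new() -> Self {
        Self { results: HashMap::new(), hits: 0, misses: 0 }
    }

    /// Return the cached analysis, or run `analyze` on a miss and store it.
    fn get_or_analyze(&mut self, source: &str, analyze: impl Fn(&str) -> String) -> String {
        let key = fingerprint(source);
        if let Some(cached) = self.results.get(&key) {
            self.hits += 1;
            return cached.clone();
        }
        self.misses += 1;
        let result = analyze(source);
        self.results.insert(key, result.clone());
        result
    }
}

fn main() {
    let mut cache = AnalysisCache::new();
    let analyze = |src: &str| format!("{} lines", src.lines().count());

    cache.get_or_analyze("fn main() {}\n", analyze);              // new content: miss
    cache.get_or_analyze("fn main() {}\n", analyze);              // unchanged: hit
    cache.get_or_analyze("fn main() { println!(); }\n", analyze); // edited: miss

    assert_eq!(cache.hits, 1);
    assert_eq!(cache.misses, 2);
}
```

Because the key is derived from content rather than paths or mtimes, a re-saved-but-identical file still hits the cache, which is where the incremental speedups come from.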
```
┌─────────────────────────────────────────────────────────────┐
│ thread-flow Crate                                           │
├─────────────────────────────────────────────────────────────┤
│ Incremental System                                          │
│ ├─ Analyzer: Change detection & invalidation                │
│ ├─ Extractors: Language-specific dependency parsing         │
│ │ ├─ Rust: use declarations, pub use re-exports             │
│ │ ├─ Python: import/from...import statements                │
│ │ ├─ TypeScript: ES6 imports, CommonJS requires             │
│ │ └─ Go: import blocks, module path resolution              │
│ ├─ Graph: BFS traversal, topological sort, cycles           │
│ └─ Storage: Backend abstraction with factory pattern        │
│     ├─ Postgres: Connection pooling, prepared statements    │
│     ├─ D1: Cloudflare REST API, HTTP client                 │
│     └─ InMemory: Testing and development                    │
├─────────────────────────────────────────────────────────────┤
│ ReCoco Integration                                          │
│ ├─ Bridge: Adapts Thread → ReCoco operators                 │
│ ├─ Flows: Declarative analysis pipeline builder             │
│ └─ Runtime: CLI (Rayon) vs Edge (tokio) strategies          │
└─────────────────────────────────────────────────────────────┘
```
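The Graph component's "BFS traversal" for invalidation can be pictured with a small sketch: starting from a changed file, walk the reverse dependency edges breadth-first to collect every transitively affected file. This is a toy model of the technique, not thread-flow's actual graph API:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Reverse dependency graph: file -> files that depend on it.
type Dependents = HashMap<&'static str, Vec<&'static str>>;

/// BFS from the changed files, collecting all transitively affected files.
fn affected_files(graph: &Dependents, changed: &[&'static str]) -> HashSet<&'static str> {
    let mut seen: HashSet<&'static str> = changed.iter().copied().collect();
    let mut queue: VecDeque<&'static str> = changed.iter().copied().collect();
    let mut affected = HashSet::new();

    while let Some(file) = queue.pop_front() {
        for &dep in graph.get(file).into_iter().flatten() {
            if seen.insert(dep) {
                // First time we reach this file: it must be re-analyzed.
                affected.insert(dep);
                queue.push_back(dep);
            }
        }
    }
    affected
}

fn main() {
    // lib.rs is used by main.rs and api.rs; api.rs is used by handlers.rs
    let mut graph = Dependents::new();
    graph.insert("src/lib.rs", vec!["src/main.rs", "src/api.rs"]);
    graph.insert("src/api.rs", vec!["src/handlers.rs"]);

    let affected = affected_files(&graph, &["src/lib.rs"]);
    assert_eq!(affected.len(), 3); // main.rs, api.rs, handlers.rs
}
```

The `seen` set makes the walk terminate even when the graph contains cycles, which is why the traversal stays O(V+E).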
Add to your Cargo.toml:

```toml
[dependencies]
thread-flow = { version = "0.1", features = ["postgres-backend", "parallel"] }
```

Basic usage:

```rust
use thread_flow::incremental::{
    create_backend, BackendType, BackendConfig,
    IncrementalAnalyzer,
};
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create storage backend
    let backend = create_backend(
        BackendType::Postgres,
        BackendConfig::Postgres {
            database_url: std::env::var("DATABASE_URL")?,
        },
    ).await?;

    // Initialize analyzer
    let mut analyzer = IncrementalAnalyzer::new(backend);

    // Analyze changes
    let files = vec![
        PathBuf::from("src/main.rs"),
        PathBuf::from("src/lib.rs"),
    ];
    let result = analyzer.analyze_changes(&files).await?;

    println!("Changed: {} files", result.changed_files.len());
    println!("Affected: {} files", result.affected_files.len());
    println!("Cache hit rate: {:.1}%", result.cache_hit_rate * 100.0);
    println!("Analysis time: {}µs", result.analysis_time_us);

    Ok(())
}
```

Extracting dependencies from a single file:

```rust
use thread_flow::incremental::extractors::{RustDependencyExtractor, LanguageDetector};
use std::path::Path;

async fn extract_dependencies(file_path: &Path) -> Result<Vec<DependencyEdge>, Box<dyn std::error::Error>> {
    let source = tokio::fs::read_to_string(file_path).await?;

    // Detect language
    let detector = LanguageDetector::new();
    let lang = detector.detect_from_path(file_path)?;

    // Extract dependencies
    let extractor = RustDependencyExtractor::new();
    let edges = extractor.extract(file_path, &source)?;
    Ok(edges)
}
```

Invalidating and re-analyzing dependents after a change:

```rust
use thread_flow::incremental::IncrementalAnalyzer;
use std::path::PathBuf;

async fn handle_file_change(
    analyzer: &mut IncrementalAnalyzer,
    changed_file: PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
    // Invalidate dependents
    let affected = analyzer.invalidate_dependents(&[changed_file.clone()]).await?;
    println!("Invalidated {} dependent files", affected.len());

    // Re-analyze affected files
    let mut files_to_analyze = vec![changed_file];
    files_to_analyze.extend(affected);

    let result = analyzer.reanalyze_invalidated(&files_to_analyze).await?;
    println!("Re-analyzed {} files in {}µs",
        result.changed_files.len(),
        result.analysis_time_us
    );
    Ok(())
}
```

| Feature | Description | Default |
|---|---|---|
| `postgres-backend` | Postgres storage with connection pooling | ✅ |
| `d1-backend` | Cloudflare D1 backend for edge deployment | ❌ |
| `parallel` | Rayon-based parallelism (CLI only) | ✅ |
| `caching` | Query result caching with Moka | ❌ |
| `recoco-minimal` | Local file source for ReCoco | ✅ |
| `recoco-postgres` | PostgreSQL target for ReCoco | ✅ |
| `worker` | Edge deployment optimizations | ❌ |
CLI Deployment (recommended):

```toml
thread-flow = { version = "0.1", features = ["postgres-backend", "parallel"] }
```

Edge Deployment (Cloudflare Workers):

```toml
thread-flow = { version = "0.1", features = ["d1-backend", "worker"] }
```

Testing:

```toml
[dev-dependencies]
thread-flow = "0.1" # InMemory backend always available
```

The CLI deployment uses Postgres for persistent storage with Rayon for CPU-bound parallelism:

```rust
use thread_flow::incremental::{create_backend, BackendType, BackendConfig};

let backend = create_backend(
    BackendType::Postgres,
    BackendConfig::Postgres {
        database_url: "postgresql://localhost/thread".to_string(),
    },
).await?;

// Configure for CLI
// - Rayon parallel processing enabled via `parallel` feature
// - Connection pooling via deadpool-postgres
// - Batch operations for improved throughput
```

Performance targets:
- Storage latency: <10ms p95
- Cache hit rate: >90%
- Parallel speedup: 3-4x on quad-core
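The parallel speedup comes from fanning per-file work (fingerprinting, extraction) out across cores. As a dependency-free illustration of that pattern, files can be fingerprinted on scoped std threads; in the real crate the `parallel` feature uses Rayon's work-stealing pool instead of this boilerplate, and `fingerprint`/`fingerprint_all` here are invented names:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread;

/// Stand-in content fingerprint (thread-flow uses Blake3).
fn fingerprint(source: &str) -> u64 {
    let mut h = DefaultHasher::new();
    source.hash(&mut h);
    h.finish()
}

/// Fingerprint each file's contents on its own thread.
/// Illustrative only: Rayon's `par_iter` replaces this in a real build.
fn fingerprint_all(sources: &[&str]) -> Vec<u64> {
    thread::scope(|s| {
        // Spawn one task per file...
        let handles: Vec<_> = sources
            .iter()
            .map(|src| s.spawn(move || fingerprint(src)))
            .collect();
        // ...then join them in order, preserving input order in the output.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let sources = ["fn a() {}", "fn b() {}"];
    let parallel = fingerprint_all(&sources);
    let serial: Vec<u64> = sources.iter().map(|s| fingerprint(s)).collect();
    assert_eq!(parallel, serial); // same results, computed concurrently
}
```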
The Edge deployment uses Cloudflare D1 for distributed storage with tokio async I/O:

```rust
use thread_flow::incremental::{create_backend, BackendType, BackendConfig};

let backend = create_backend(
    BackendType::D1,
    BackendConfig::D1 {
        account_id: std::env::var("CF_ACCOUNT_ID")?,
        database_id: std::env::var("CF_DATABASE_ID")?,
        api_token: std::env::var("CF_API_TOKEN")?,
    },
).await?;

// Configure for Edge
// - HTTP API client for D1 REST API
// - Async-first with tokio runtime
// - No filesystem access (`worker` feature)
```

Performance targets:
- Storage latency: <50ms p95
- Cache hit rate: >90%
- Horizontal scaling across edge locations
Comprehensive API docs and integration guides:
- Incremental System: See incremental module docs
- D1 Integration: See docs/api/D1_INTEGRATION_API.md
- ReCoco Bridge: See bridge module docs
- Language Extractors: See extractors module docs
Run examples with:

```shell
# Observability instrumentation
cargo run --example observability_example

# D1 local testing (requires D1 emulator)
cargo run --example d1_local_test

# D1 integration testing (requires D1 credentials)
cargo run --example d1_integration_test --features d1-backend
```

Run the test suite with:

```shell
# Run all tests
cargo nextest run --all-features

# Run incremental system tests
cargo nextest run -p thread-flow --test incremental_integration_tests

# Run backend-specific tests
cargo nextest run -p thread-flow --test incremental_postgres_tests --features postgres-backend
cargo nextest run -p thread-flow --test incremental_d1_tests --features d1-backend

# Run performance regression tests
cargo nextest run -p thread-flow --test performance_regression_tests
```

Run benchmarks with:

```shell
# Fingerprint performance
cargo bench --bench fingerprint_benchmark

# D1 profiling (requires credentials)
cargo bench --bench d1_profiling --features d1-backend

# Load testing
cargo bench --bench load_test
```

- Fingerprint computation: <5µs per file (Blake3)
- Dependency extraction: 1-10ms per file (language-dependent)
- Graph traversal: O(V+E) for BFS invalidation
- Cache hit rate: >90% typical, >95% ideal
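Alongside BFS invalidation, the graph layer's topological sort determines a safe re-analysis order (dependencies before dependents) and doubles as cycle detection. A sketch of that idea using Kahn's algorithm over a toy graph model (illustrative, not the crate's API):

```rust
use std::collections::{HashMap, VecDeque};

/// Forward dependency graph: file -> files it depends on.
/// Returns files ordered so dependencies come before their dependents,
/// or None if the graph contains a cycle (Kahn's algorithm).
fn analysis_order(
    deps: &HashMap<&'static str, Vec<&'static str>>,
) -> Option<Vec<&'static str>> {
    let mut in_degree: HashMap<&'static str, usize> = HashMap::new();
    let mut dependents: HashMap<&'static str, Vec<&'static str>> = HashMap::new();

    for (&file, file_deps) in deps {
        *in_degree.entry(file).or_insert(0) += file_deps.len();
        for &d in file_deps {
            in_degree.entry(d).or_insert(0);
            dependents.entry(d).or_default().push(file);
        }
    }

    // Start with files that depend on nothing.
    let mut queue: VecDeque<&'static str> = in_degree
        .iter()
        .filter(|&(_, &deg)| deg == 0)
        .map(|(&f, _)| f)
        .collect();

    let mut order = Vec::new();
    while let Some(file) = queue.pop_front() {
        order.push(file);
        for &dep in dependents.get(file).into_iter().flatten() {
            let deg = in_degree.get_mut(dep).unwrap();
            *deg -= 1;
            if *deg == 0 {
                queue.push_back(dep);
            }
        }
    }

    // Any file left with nonzero in-degree sits on a cycle.
    if order.len() == in_degree.len() { Some(order) } else { None }
}

fn main() {
    let mut deps = HashMap::new();
    deps.insert("src/main.rs", vec!["src/lib.rs"]);
    deps.insert("src/api.rs", vec!["src/lib.rs"]);

    let order = analysis_order(&deps).unwrap();
    let pos = |f: &str| order.iter().position(|&x| x == f).unwrap();
    assert!(pos("src/lib.rs") < pos("src/main.rs"));
    assert!(pos("src/lib.rs") < pos("src/api.rs"));

    // A dependency cycle is reported as None rather than looping forever.
    let mut cyclic = HashMap::new();
    cyclic.insert("a.rs", vec!["b.rs"]);
    cyclic.insert("b.rs", vec!["a.rs"]);
    assert!(analysis_order(&cyclic).is_none());
}
```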
| Backend | Read Latency (p95) | Write Latency (p95) | Throughput |
|---|---|---|---|
| InMemory | <1ms | <1ms | 10K+ ops/sec |
| Postgres | <10ms | <15ms | 1K+ ops/sec |
| D1 | <50ms | <100ms | 100+ ops/sec |
| Language | Parse Time (p95) | Complexity |
|---|---|---|
| Rust | 2-5ms | High (macros, visibility) |
| TypeScript | 1-3ms | Medium (ESM + CJS) |
| Python | 1-2ms | Low (simple imports) |
| Go | 1-3ms | Medium (module resolution) |
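To make the extractor column concrete: for Rust, the extractor's job is turning `use` declarations and `pub use` re-exports into dependency edges. The naive, line-based sketch below shows the shape of that output only; the real extractor walks a tree-sitter AST, which is what makes the Rust case "High" complexity (macros, visibility, nested groups). `extract_use_paths` is an invented helper:

```rust
/// Naive, line-based sketch of Rust `use`-declaration extraction.
/// The real extractor parses an AST; this handles only the simple
/// one-line cases, to illustrate the dependency edges being produced.
fn extract_use_paths(source: &str) -> Vec<String> {
    source
        .lines()
        .filter_map(|line| {
            let line = line.trim();
            // Accept both `use ...;` and `pub use ...;` re-exports.
            let rest = line
                .strip_prefix("pub use ")
                .or_else(|| line.strip_prefix("use "))?;
            Some(rest.trim_end_matches(';').to_string())
        })
        .collect()
}

fn main() {
    let src = "use std::fmt;\npub use crate::graph::DependencyEdge;\nfn main() {}\n";
    let paths = extract_use_paths(src);
    assert_eq!(paths, vec!["std::fmt", "crate::graph::DependencyEdge"]);
}
```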
```shell
# Install development tools
mise install

# Run tests
cargo nextest run --all-features

# Run linting
cargo clippy --all-features

# Format code
cargo fmt
```

- Service-Library Dual Architecture: Features consider both library API design AND service deployment
- Test-First Development: Tests → Approve → Fail → Implement (mandatory)
- Constitutional Compliance: All changes must adhere to Thread Constitution v2.0.0
See CLAUDE.md for complete development guidelines.
AGPL-3.0-or-later
- thread-ast-engine: Core AST parsing and pattern matching
- thread-language: Language definitions and tree-sitter parsers
- thread-services: High-level service interfaces
- recoco: ReCoco dataflow engine (Rust-only CocoIndex fork)
Status: Production-ready (Phase 5 complete)
Maintainer: Knitli Inc. <knitli@knit.li>
Contributors: Claude Sonnet 4.5 <noreply@anthropic.com>