Version: 1.0 | Role: Senior Big Data Architect & Research Lead | Target: Developer Team Implementation Guide | Audience: Workshop / SIGMOD Paper Prototype
- Project Overview & Goals
- System Architecture Summary
- HIFUN DSL Definition (JSON-Based Subset)
- Proposed Directory Structure
- Phase 1 — Environment Setup
- Phase 2 — Data Preparation
- Phase 3 — Core Logic Implementation
- Phase 4 — ML Pipeline
- Phase 5 — Execution Engine Integration
- Phase 6 — Evaluation & Experiments
- Algorithm Pseudocode Reference
- Baseline vs. Learned Model: Experimental Instructions
- Risk Register & Mitigation
- References
Modern applications execute mixed workloads over both relational stores (SQL) and graph/document stores (NoSQL). A single high-level HIFUN query may contain subexpressions that are best served by different runtimes — Spark SQL for joins and aggregations, and GraphFrames/GraphX for traversals and motif finding.
Manually specifying execution engines per subquery is error-prone. This system automatically decomposes a HIFUN query and routes each subexpression to the optimal engine using a trained ML classifier.
| # | Question |
|---|---|
| RQ1 | Can a lightweight ML model (decision tree, XGBoost) reliably predict whether a HIFUN subexpression runs faster as SQL or graph? |
| RQ2 | Which features (cardinality, degree, selectivity, historical runtime) are most predictive? |
| RQ3 | How much improvement does learned routing provide over rule-based heuristics? |
- Correctness: Results of routed queries match reference (naive in-memory) execution.
- Latency: Learned routing achieves ≥10% median latency improvement over rule-based baseline on mixed-workload queries.
- Model inference time: <10ms per subexpression (does not dominate planning overhead).
- Reproducibility: All experiments runnable from a single shell script with publicly available datasets.
┌─────────────────────────────────────────────────────────────┐
│ APPLICATION LAYER │
│ User submits JSON-DSL HIFUN Query │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ QUERY INTELLIGENCE LAYER │
│ ┌───────────┐ ┌────────────────┐ ┌──────────────────┐ │
│ │ Parser │→ │ Decomposer │→ │ Feature Extractor│ │
│ │ (AST Gen) │ │(Subexpressions)│ │ (Vector Builder) │ │
│ └───────────┘ └────────────────┘ └──────────────────┘ │
│ │ │
│ ┌─────────────────┐ │
│ │ ML Router Model │ │
│ │ (XGBoost/DTree) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ SQL Route │ Graph Route
▼ ▼
┌──────────────────────┐ ┌──────────────────────────┐
│ Spark SQL Engine │ │ GraphFrames / GraphX │
│ (Catalyst Optimizer)│ │ (BFS, Motif Finding) │
└──────────────────────┘ └──────────────────────────┘
│ │
└──────────────┬───────────────┘
▼
┌───────────────────────┐
│ Result Composer │
│ (Join / Aggregate │
│ Partial Results) │
└───────────────────────┘
│
┌───────────────────────┐
│ Final Output │
└───────────────────────┘
│
┌───────────────────────┐
│ Runtime Logger │
│ (SQLite / JSON DB) │ ←── Feeds retraining
└───────────────────────┘
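The Runtime Logger box in the diagram is not specified further in this blueprint; a minimal stdlib-only sketch of what it could look like follows. The table layout and method names here are assumptions, not part of the spec:

```python
import json
import sqlite3
import time

class RuntimeLogger:
    """Minimal runtime logger backed by SQLite (stdlib only)."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            """CREATE TABLE IF NOT EXISTS runs (
                   sub_id TEXT, engine TEXT, features TEXT,
                   elapsed_ms REAL, ts REAL)"""
        )

    def log(self, sub_id: str, engine: str, features: list, elapsed_ms: float) -> None:
        # Store the feature vector as JSON so retraining can recover it later
        self.conn.execute(
            "INSERT INTO runs VALUES (?, ?, ?, ?, ?)",
            (sub_id, engine, json.dumps(features), elapsed_ms, time.time()),
        )
        self.conn.commit()

    def history(self, sub_id: str) -> list:
        return self.conn.execute(
            "SELECT engine, elapsed_ms FROM runs WHERE sub_id = ? ORDER BY ts",
            (sub_id,),
        ).fetchall()
```

The same table doubles as the retraining feed: `collection_script.py` can read `runs` back instead of maintaining a separate store.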
Because a full HIFUN parser is complex, this prototype uses a JSON-based DSL that captures the five essential operator types: FILTER, MAP, JOIN, TRAVERSAL, and AGGREGATE.
{
"query_id": "string",
"description": "human-readable label",
"operations": [
{
"op_id": "string (unique per subexpression)",
"type": "FILTER | MAP | JOIN | TRAVERSAL | AGGREGATE",
"source": "table_name or graph_name",
"fields": ["col1", "col2"],
"predicate": {
"column": "string",
"operator": "= | > | < | >= | <= | IN | LIKE",
"value": "scalar or list"
},
"join": {
"right_source": "table_name",
"left_key": "column",
"right_key": "column",
"join_type": "INNER | LEFT | RIGHT"
},
"traversal": {
"start_vertex_filter": {"column": "string", "value": "scalar"},
"edge_label": "string",
"direction": "IN | OUT | BOTH",
"max_hops": "integer",
"return_fields": ["col1", "col2"]
},
"aggregate": {
"group_by": ["col1"],
"functions": [{"func": "SUM | COUNT | AVG | MAX | MIN", "column": "col"}]
},
"depends_on": ["op_id1", "op_id2"]
}
]
}

Example A — Pure Relational Join Query
{
"query_id": "q_tpch_001",
"description": "TPC-H orders joined with customers filtered by region",
"operations": [
{
"op_id": "s1",
"type": "FILTER",
"source": "customer",
"fields": ["c_custkey", "c_name", "c_nationkey"],
"predicate": {"column": "c_regionkey", "operator": "=", "value": 2},
"depends_on": []
},
{
"op_id": "s2",
"type": "JOIN",
"source": "orders",
"fields": ["o_orderkey", "o_custkey", "o_totalprice"],
"join": {
"right_source": "s1",
"left_key": "o_custkey",
"right_key": "c_custkey",
"join_type": "INNER"
},
"depends_on": ["s1"]
},
{
"op_id": "s3",
"type": "AGGREGATE",
"source": "s2",
"fields": ["c_name"],
"aggregate": {
"group_by": ["c_name"],
"functions": [{"func": "SUM", "column": "o_totalprice"}]
},
"depends_on": ["s2"]
}
]
}

Example B — Mixed Traversal + Relational Query
{
"query_id": "q_snb_002",
"description": "Friends of a person (graph) joined with post counts (SQL)",
"operations": [
{
"op_id": "s1",
"type": "TRAVERSAL",
"source": "social_graph",
"traversal": {
"start_vertex_filter": {"column": "person_id", "value": 123},
"edge_label": "KNOWS",
"direction": "BOTH",
"max_hops": 2,
"return_fields": ["person_id", "name"]
},
"depends_on": []
},
{
"op_id": "s2",
"type": "AGGREGATE",
"source": "posts",
"fields": ["creator_id"],
"aggregate": {
"group_by": ["creator_id"],
"functions": [{"func": "COUNT", "column": "post_id"}]
},
"depends_on": []
},
{
"op_id": "s3",
"type": "JOIN",
"source": "s1",
"fields": ["person_id", "name", "post_count"],
"join": {
"right_source": "s2",
"left_key": "person_id",
"right_key": "creator_id",
"join_type": "LEFT"
},
"depends_on": ["s1", "s2"]
}
]
}

| Operator | Best Engine (Rule) | Key Feature Signals |
|---|---|---|
| FILTER | SQL | High selectivity → SQL; low selectivity over graph → GRAPH |
| MAP | SQL | Always SQL unless on graph vertices |
| JOIN | SQL | Large cardinality → SQL |
| TRAVERSAL | GRAPH | High avg degree + low hops → GRAPH |
| AGGREGATE | SQL (usually) | Post-traversal aggregates may stay in GRAPH |
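These heuristics can be encoded directly as a fallback router. A minimal sketch follows; the concrete thresholds (degree ≥ 5, hops ≤ 3, selectivity < 0.05) are illustrative assumptions, not values fixed by this blueprint:

```python
def rule_based_engine(op_type: str, selectivity: float = 1.0,
                      avg_degree: float = 0.0, max_hops: int = 0,
                      after_traversal: bool = False) -> str:
    """Encode the heuristics table with illustrative (assumed) thresholds."""
    if op_type == "TRAVERSAL":
        # High average degree with few hops favours the graph engine
        return "GRAPH" if avg_degree >= 5 and max_hops <= 3 else "SQL"
    if op_type == "AGGREGATE" and after_traversal:
        # Post-traversal aggregates may stay in GRAPH
        return "GRAPH"
    if op_type == "FILTER" and avg_degree > 0 and selectivity < 0.05:
        # Low-selectivity filter over a graph source
        return "GRAPH"
    # MAP, JOIN, and everything else default to SQL
    return "SQL"
```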
hifun_router/
│
├── README.md # Project overview + quickstart
├── requirements.txt # Python dependencies
├── docker-compose.yml # Spark + Jupyter environment
├── Makefile # Shortcut commands (setup, train, evaluate)
│
├── config/
│ ├── spark_config.py # SparkSession settings (local/cluster)
│ ├── paths.py # Dataset and model paths
│ └── feature_schema.json # Canonical feature vector definition
│
├── dsl/
│ ├── schema.json # JSON DSL schema (jsonschema format)
│ ├── validator.py # DSL schema validation
│ └── sample_queries/
│ ├── tpch_queries.json # TPC-H sample DSL queries
│ ├── snb_queries.json # SNB mixed queries
│ └── synthetic_queries.json # Generated test queries
│
├── parser/
│ ├── __init__.py
│ ├── dsl_parser.py # JSON DSL → internal AST (Python dict tree)
│ └── ast_nodes.py # Dataclasses: QueryNode, FilterNode, JoinNode, etc.
│
├── decomposer/
│ ├── __init__.py
│ └── query_decomposer.py # QueryDecomposer class — splits AST into subexpressions
│
├── features/
│ ├── __init__.py
│ ├── feature_extractor.py # FeatureExtractor class — builds feature vectors
│ ├── stats_collector.py # Collects cardinality, degree stats from Spark/Hive
│ └── historical_store.py # SQLite-based runtime history lookup
│
├── model/
│ ├── __init__.py
│ ├── trainer.py # Training pipeline: load data, fit, CV, save
│ ├── predictor.py # ModelPredictor class — loads artifact, runs inference
│ ├── feature_importance.py # SHAP analysis and ablation
│ └── artifacts/
│ ├── classifier_v1.pkl # Trained XGBoost model
│ └── feature_schema_v1.json # Feature names matching model artifact
│
├── execution/
│ ├── __init__.py
│ ├── sql_generator.py # SQLGenerator — DSL subexpression → Spark SQL/DataFrame
│ ├── graph_generator.py # GraphGenerator — DSL subexpression → GraphFrames code
│ └── result_composer.py # ResultComposer — merges partial DataFrames
│
├── router/
│ ├── __init__.py
│ └── hybrid_router.py # HybridRouter — orchestrates full query execution
│
├── data/
│ ├── raw/ # Raw downloaded datasets
│ ├── parquet/ # Preprocessed Parquet tables
│ ├── graphs/ # Edge lists as Parquet (GraphFrames input)
│ └── stats/ # Precomputed column stats JSON files
│
├── training_data/
│ ├── labeled_runs.csv # (op_id, features..., actual_sql_ms, actual_graph_ms, label)
│ └── collection_script.py # Runs both engines per subexpression to generate labels
│
├── experiments/
│ ├── run_baselines.py # Runs always-SQL, always-GRAPH, rule-based
│ ├── run_learned.py # Runs ML-routed execution
│ ├── ablation_study.py # Feature ablation experiments
│ └── results/ # Output CSVs + plots
│
├── tests/
│ ├── test_parser.py
│ ├── test_decomposer.py
│ ├── test_features.py
│ ├── test_correctness.py # Compares ML-routed output vs reference executor
│ └── reference_executor.py # Naive in-memory Python executor for correctness checks
│
└── notebooks/
├── 01_data_exploration.ipynb
├── 02_feature_analysis.ipynb
├── 03_model_training.ipynb
└── 04_results_visualization.ipynb
| Tool | Version | Purpose |
|---|---|---|
| Python | 3.10+ | Core language |
| Apache Spark | 3.4+ | SQL + GraphFrames runtime |
| GraphFrames | 0.8.3+ | Graph processing on Spark |
| PySpark | 3.4+ | Python Spark bindings |
| scikit-learn | 1.4+ | Decision tree baseline model |
| XGBoost | 2.0+ | Primary ML classifier |
| SHAP | 0.44+ | Feature importance analysis |
| pandas | 2.0+ | Local data manipulation |
| SQLite3 | (stdlib) | Historical runtime store |
| jsonschema | 4.x | DSL query validation |
| Docker | 24+ | Reproducible environment (optional) |
# Clone repo
git clone https://github.com/your-org/hifun_router.git
cd hifun_router
# Create virtual environment
python -m venv .venv && source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt

requirements.txt:
pyspark==3.4.2
graphframes==0.8.3
xgboost==2.0.3
scikit-learn==1.4.0
shap==0.44.0
pandas==2.1.4
numpy==1.26.3
jsonschema==4.21.1
matplotlib==3.8.2
seaborn==0.13.1
pyarrow==14.0.2
lightgbm==4.2.0
from pyspark.sql import SparkSession
def get_spark_session(app_name: str = "HIFUN_Router") -> SparkSession:
spark = (
SparkSession.builder
.appName(app_name)
.master("local[*]")
.config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12")
.config("spark.sql.shuffle.partitions", "8")
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "4g")
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
    return spark

| Dataset | Type | Use Case | Source |
|---|---|---|---|
| TPC-H (SF 1–5) | Relational | Join-heavy SQL queries | tpc.org |
| LDBC SNB (Interactive) | Graph + Relational | Mixed traversal + SQL | ldbcouncil.org |
| IMDB Public Dump | Relational + Graph | Real-world relationships | imdb.com/interfaces |
| Synthetic Power-Law Graphs | Graph | Degree distribution control | NetworkX / SNAP |
TPC-H → Parquet:
# Generate TPC-H data using dbgen
cd tpch-kit && ./dbgen -s 1 -f
# Convert to Parquet via PySpark
python data/scripts/tpch_to_parquet.py --input ./tpch-kit --output data/parquet/tpch/

LDBC SNB → Parquet + GraphFrames Edge List:
python data/scripts/snb_to_parquet.py \
--input data/raw/snb/ \
--tables data/parquet/snb/ \
  --edges data/graphs/snb_edges.parquet

Synthetic Graph Generator:
# data/scripts/generate_synthetic.py
import networkx as nx
import pandas as pd
def generate_powerlaw_graph(n_nodes, avg_degree, seed=42):
G = nx.barabasi_albert_graph(n_nodes, avg_degree // 2, seed=seed)
edges = pd.DataFrame(G.edges(), columns=["src", "dst"])
vertices = pd.DataFrame({"id": list(G.nodes()), "attr1": range(n_nodes)})
return vertices, edges
# Vary avg_degree in [2, 5, 10, 20, 50] to stress-test routing thresholds

For each table and graph, precompute and save to data/stats/<name>_stats.json:
def compute_table_stats(spark, table_name: str, parquet_path: str) -> dict:
df = spark.read.parquet(parquet_path)
stats = {
"table_name": table_name,
"row_count": df.count(),
"column_count": len(df.columns),
"columns": {}
}
    # One full column scan per statistic — acceptable for one-off precomputation
    for col in df.columns:
stats["columns"][col] = {
"null_count": df.filter(df[col].isNull()).count(),
"distinct_count": df.select(col).distinct().count(),
"min": df.agg({col: "min"}).collect()[0][0],
"max": df.agg({col: "max"}).collect()[0][0],
}
return stats
def compute_graph_stats(spark, edge_parquet: str) -> dict:
    edges = spark.read.parquet(edge_parquet)
    # Out-degree per source vertex; note vertex_count below counts only
    # vertices with at least one outgoing edge
    degree_df = edges.groupBy("src").count()
    stats_row = degree_df.selectExpr(
        "avg(`count`) as avg_degree",
        "max(`count`) as max_degree",
        "stddev(`count`) as stddev_degree",
        "count(*) as vertex_count"
    ).collect()[0]
return {
"avg_degree": stats_row.avg_degree,
"max_degree": stats_row.max_degree,
"stddev_degree": stats_row.stddev_degree,
"vertex_count": stats_row.vertex_count,
"edge_count": edges.count()
    }

Responsibilities: Validate incoming JSON DSL query, convert to an internal Python AST (tree of QueryNode dataclasses).
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
@dataclass
class QueryNode:
op_id: str
op_type: str # FILTER, MAP, JOIN, TRAVERSAL, AGGREGATE
source: str
fields: List[str]
predicate: Optional[Dict] = None
join: Optional[Dict] = None
traversal: Optional[Dict] = None
aggregate: Optional[Dict] = None
depends_on: List[str] = field(default_factory=list)
class DSLParser:
def parse(self, query_json: dict) -> List[QueryNode]:
"""
1. Validate JSON against schema (jsonschema.validate).
2. For each operation in query_json["operations"]:
a. Instantiate a QueryNode dataclass.
b. Append to ordered list.
3. Return list of QueryNodes in dependency order (topological sort).
"""
...
def _topological_sort(self, nodes: List[QueryNode]) -> List[QueryNode]:
"""
Kahn's algorithm on depends_on edges.
Ensures parent subexpressions are processed before children.
"""
        ...

Responsibilities: Takes the parsed list of QueryNodes and groups them into candidate routing units — contiguous subexpressions of the same logical type that can be dispatched independently or together.
| Rule | Action |
|---|---|
| A TRAVERSAL node is always its own candidate unit | Isolate for GRAPH routing |
| Contiguous FILTER + JOIN + AGGREGATE chains form one SQL candidate unit | Group for SQL routing |
| A MAP node is merged with its parent unit | Avoid fragmentation |
| Nodes with no shared dependencies are candidates to run in parallel | Mark parallelizable=True |
CLASS QueryDecomposer:
METHOD decompose(nodes: List[QueryNode]) -> List[SubExpression]:
STEP 1: Build dependency graph (adjacency list from depends_on).
STEP 2: Identify TRAVERSAL nodes -> each becomes its own SubExpression.
STEP 3: For non-TRAVERSAL nodes, apply contiguous grouping:
- BFS from root nodes (no dependencies).
- While current chain is FILTER/JOIN/MAP/AGGREGATE:
-> Continue adding to current group.
- When TRAVERSAL is encountered OR dependency from a TRAVERSAL result:
-> Close current group as one SubExpression.
-> Start new group after the TRAVERSAL.
STEP 4: Assign each SubExpression:
- sub_id: unique identifier
- nodes: list of QueryNodes in this unit
- primary_op_type: dominant operator (TRAVERSAL or RELATIONAL)
- depends_on_subs: sub_ids this unit depends on
- parallelizable: True if no shared upstream dependency
STEP 5: Return List[SubExpression]
SubExpression dataclass:
@dataclass
class SubExpression:
sub_id: str
nodes: List[QueryNode]
primary_op_type: str # "RELATIONAL" or "TRAVERSAL"
depends_on_subs: List[str]
parallelizable: bool
    estimated_output_rows: int = 0  # filled by FeatureExtractor

Responsibilities: Given a SubExpression and precomputed table/graph statistics, build a fixed-length numeric feature vector for the ML classifier.
The canonical feature vector has 22 features, defined in config/feature_schema.json:
{
"features": [
{"name": "op_count_filter", "type": "int", "description": "# FILTER nodes in subexpression"},
{"name": "op_count_join", "type": "int", "description": "# JOIN nodes in subexpression"},
{"name": "op_count_traversal", "type": "int", "description": "# TRAVERSAL nodes in subexpression"},
{"name": "op_count_aggregate", "type": "int", "description": "# AGGREGATE nodes in subexpression"},
{"name": "op_count_map", "type": "int", "description": "# MAP nodes in subexpression"},
{"name": "ast_depth", "type": "int", "description": "Max depth of AST subtree"},
{"name": "has_traversal", "type": "bool", "description": "1 if any TRAVERSAL node present"},
{"name": "max_hops", "type": "int", "description": "Max traversal hops (0 if none)"},
{"name": "input_cardinality_log", "type": "float", "description": "log10(estimated input rows)"},
{"name": "output_cardinality_log", "type": "float", "description": "log10(estimated output rows)"},
{"name": "selectivity", "type": "float", "description": "Fraction of rows passing filters [0,1]"},
{"name": "avg_degree", "type": "float", "description": "Avg out-degree of source graph (0 if relational)"},
{"name": "max_degree", "type": "float", "description": "Max out-degree of source graph (0 if relational)"},
{"name": "degree_skew", "type": "float", "description": "stddev/avg of degree distribution (0 if relational)"},
{"name": "num_projected_columns", "type": "int", "description": "# output columns in subexpression"},
{"name": "has_index", "type": "bool", "description": "1 if predicate column has index"},
{"name": "join_fanout", "type": "float", "description": "Estimated output rows / input rows for join (1.0 if no join)"},
{"name": "estimated_shuffle_bytes_log", "type": "float", "description": "log10(rows_out * avg_row_bytes) if SQL"},
{"name": "estimated_traversal_ops", "type": "float", "description": "start_vertices * avg_degree^max_hops (0 if SQL)"},
{"name": "hist_avg_runtime_ms", "type": "float", "description": "Avg runtime of similar past subexpressions (-1 if unknown)"},
{"name": "hist_runtime_variance", "type": "float", "description": "Variance of past runtimes (-1 if unknown)"},
{"name": "num_tables_joined", "type": "int", "description": "# distinct tables referenced in subexpression"}
]
}

CLASS FeatureExtractor:
CONSTRUCTOR(stats_dir, history_db_path):
- Load all JSON stats files from stats_dir into self.stats dict.
- Connect to SQLite history DB.
METHOD extract(sub_expr: SubExpression) -> np.ndarray:
STEP 1 — Query Shape Features:
op_count_filter = count nodes where op_type == "FILTER"
op_count_join = count nodes where op_type == "JOIN"
op_count_traversal = count nodes where op_type == "TRAVERSAL"
op_count_aggregate = count nodes where op_type == "AGGREGATE"
op_count_map = count nodes where op_type == "MAP"
ast_depth = compute max depth by BFS from root node
has_traversal = 1 if op_count_traversal > 0 else 0
max_hops = max(node.traversal.max_hops for traversal nodes, default 0)
STEP 2 — Data Statistics Features:
source = sub_expr.nodes[0].source
table_stats = self.stats.get(source, {})
input_cardinality_log = log10(table_stats["row_count"] + 1)
selectivity = compute_selectivity(sub_expr, table_stats)
output_cardinality_log = log10(input_cardinality * selectivity + 1)
avg_degree = graph_stats["avg_degree"] if source is graph else 0.0
max_degree = graph_stats["max_degree"] if source is graph else 0.0
degree_skew = graph_stats["stddev_degree"] / avg_degree if avg_degree > 0 else 0.0
num_projected_columns = len(sub_expr.nodes[-1].fields)
has_index = check stats for index on predicate column -> 1 or 0
join_fanout = estimated_output_rows / input_cardinality (JOIN nodes only)
STEP 3 — Engine Cost Proxies:
avg_row_bytes = num_projected_columns * 8 # rough estimate
estimated_shuffle_bytes_log = log10(output_cardinality * avg_row_bytes + 1)
IF has_traversal:
start_vertices = apply_filter_selectivity(input_cardinality)
estimated_traversal_ops = start_vertices * (avg_degree ** max_hops)
ELSE:
estimated_traversal_ops = 0.0
num_tables_joined = count distinct source names in sub_expr.nodes
STEP 4 — Historical Features:
fingerprint = hash(sorted(op_types) + source_name)
history_row = query SQLite WHERE fingerprint = fingerprint ORDER BY ts DESC
IF history_row:
hist_avg_runtime_ms = history_row["avg_runtime_ms"]
hist_runtime_variance = history_row["variance_ms"]
ELSE:
hist_avg_runtime_ms = hist_runtime_variance = -1.0
STEP 5 — Assemble vector:
RETURN np.array([all 22 values in schema order], dtype=float32)
For each query in the training set:
FOR each query JSON file in dsl/sample_queries/:
1. Parse -> AST -> Decompose -> List of SubExpressions.
2. FOR each SubExpression s:
a. Execute s via SQLGenerator -> Spark SQL -> measure wall-clock ms.
b. Execute s via GraphGenerator -> GraphFrames -> measure wall-clock ms.
c. IF sql_ms < graph_ms: label = "SQL" ELSE label = "GRAPH".
d. Extract feature vector via FeatureExtractor.
e. Append row to training_data/labeled_runs.csv:
(sub_id, query_id, dataset, ...22 features..., sql_ms, graph_ms, label)
Columns in labeled_runs.csv:
sub_id, query_id, dataset, op_count_filter, op_count_join, op_count_traversal,
op_count_aggregate, op_count_map, ast_depth, has_traversal, max_hops,
input_cardinality_log, output_cardinality_log, selectivity, avg_degree,
max_degree, degree_skew, num_projected_columns, has_index, join_fanout,
estimated_shuffle_bytes_log, estimated_traversal_ops, hist_avg_runtime_ms,
hist_runtime_variance, num_tables_joined, sql_runtime_ms, graph_runtime_ms, label
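The per-subexpression labeling step can be sketched as a small timing helper. Here `run_sql` and `run_graph` are hypothetical callables wrapping the two generators plus an action that forces execution (e.g. `df.count()`); the real script writes the 22 named feature columns rather than the generic `f{i}` keys used below:

```python
import time
from typing import Callable

def label_subexpression(sub_id: str, query_id: str, dataset: str,
                        features: list,
                        run_sql: Callable[[], object],
                        run_graph: Callable[[], object]) -> dict:
    """Time both engines for one subexpression and emit a labeled row."""
    t0 = time.perf_counter()
    run_sql()
    sql_ms = (time.perf_counter() - t0) * 1000
    t0 = time.perf_counter()
    run_graph()
    graph_ms = (time.perf_counter() - t0) * 1000
    return {
        "sub_id": sub_id, "query_id": query_id, "dataset": dataset,
        **{f"f{i}": v for i, v in enumerate(features)},
        "sql_runtime_ms": sql_ms, "graph_runtime_ms": graph_ms,
        # Label is simply whichever engine finished first
        "label": "SQL" if sql_ms < graph_ms else "GRAPH",
    }
```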
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib
FEATURE_COLS = [...] # 22 feature names from schema
def train(labeled_data_path: str, model_out: str, cv_folds: int = 5):
df = pd.read_csv(labeled_data_path)
X = df[FEATURE_COLS].values
y = (df["label"] == "GRAPH").astype(int).values # 0=SQL, 1=GRAPH
# Model 1: Decision Tree (interpretable baseline)
dt = DecisionTreeClassifier(max_depth=6, min_samples_leaf=10)
dt_scores = cross_val_score(dt, X, y, cv=StratifiedKFold(cv_folds), scoring="f1")
print(f"Decision Tree CV F1: {dt_scores.mean():.3f} +/- {dt_scores.std():.3f}")
# Model 2: XGBoost (primary classifier)
    xgb_clf = xgb.XGBClassifier(
        n_estimators=200, max_depth=5, learning_rate=0.05,
        subsample=0.8, colsample_bytree=0.8, random_state=42,
        eval_metric="logloss"  # use_label_encoder was removed in XGBoost 2.x
    )
xgb_scores = cross_val_score(xgb_clf, X, y, cv=StratifiedKFold(cv_folds), scoring="f1")
print(f"XGBoost CV F1: {xgb_scores.mean():.3f} +/- {xgb_scores.std():.3f}")
# Train final model on all data
xgb_clf.fit(X, y)
joblib.dump(xgb_clf, model_out)
    print(f"Model saved to {model_out}")

import shap
import matplotlib.pyplot as plt
def generate_shap_report(model, X_train, feature_names, output_dir):
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_train)
shap.summary_plot(shap_values, X_train, feature_names=feature_names, show=False)
plt.savefig(f"{output_dir}/shap_summary.pdf", bbox_inches="tight")
    plt.close()

import joblib
import numpy as np
import time
class ModelPredictor:
def __init__(self, model_path: str):
self.model = joblib.load(model_path)
    def predict(self, feature_vector: np.ndarray) -> str:
        """Returns 'SQL' or 'GRAPH'; target inference time is <10ms."""
        t0 = time.perf_counter()
        pred = self.model.predict(feature_vector.reshape(1, -1))[0]
        elapsed_ms = (time.perf_counter() - t0) * 1000
        # Warn rather than crash if inference exceeds the 10ms budget
        # (e.g. on the first call while the model warms up)
        if elapsed_ms >= 10:
            print(f"WARNING: inference took {elapsed_ms:.1f}ms (budget: 10ms)")
        return "GRAPH" if pred == 1 else "SQL"

Translates a SubExpression with label SQL into PySpark DataFrame operations:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
class SQLGenerator:
def __init__(self, spark: SparkSession, parquet_dir: str):
self.spark = spark
self.parquet_dir = parquet_dir
self._cache: dict = {} # sub_id -> DataFrame (for composed results)
def generate(self, sub_expr) -> DataFrame:
df = None
for node in sub_expr.nodes:
if node.op_type == "FILTER":
df = self._apply_filter(node, df)
elif node.op_type == "JOIN":
df = self._apply_join(node, df)
elif node.op_type == "AGGREGATE":
df = self._apply_aggregate(node, df)
elif node.op_type == "MAP":
df = df.select(node.fields)
return df
    def _apply_filter(self, node, df) -> DataFrame:
        if df is None:
            df = self.spark.read.parquet(f"{self.parquet_dir}/{node.source}")
        p = node.predicate
        col = F.col(p["column"])
        # Honour the DSL operator instead of forcing a string-equality check
        if p["operator"] == "IN":
            return df.filter(col.isin(p["value"]))
        if p["operator"] == "LIKE":
            return df.filter(col.like(p["value"]))
        cmp = {"=": col == p["value"], ">": col > p["value"], "<": col < p["value"],
               ">=": col >= p["value"], "<=": col <= p["value"]}
        return df.filter(cmp[p["operator"]])
def _apply_join(self, node, df) -> DataFrame:
right_source = node.join["right_source"]
right_df = (self._cache.get(right_source) or
self.spark.read.parquet(f"{self.parquet_dir}/{right_source}"))
return df.join(right_df,
df[node.join["left_key"]] == right_df[node.join["right_key"]],
how=node.join["join_type"].lower())
def _apply_aggregate(self, node, df) -> DataFrame:
agg_exprs = [
getattr(F, fn["func"].lower())(fn["column"]).alias(f"{fn['func']}_{fn['column']}")
for fn in node.aggregate["functions"]
]
        return df.groupBy(node.aggregate["group_by"]).agg(*agg_exprs)

Translates a SubExpression with TRAVERSAL nodes into GraphFrames operations:
from graphframes import GraphFrame
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
class GraphGenerator:
def __init__(self, spark: SparkSession, graph_dir: str):
self.spark = spark
self.graph_dir = graph_dir
def generate(self, sub_expr) -> DataFrame:
for node in sub_expr.nodes:
if node.op_type == "TRAVERSAL":
return self._apply_traversal(node)
raise ValueError("No TRAVERSAL node found in graph subexpression")
def _apply_traversal(self, node) -> DataFrame:
t = node.traversal
vertices = self.spark.read.parquet(
f"{self.graph_dir}/{node.source}_vertices.parquet")
edges = self.spark.read.parquet(
f"{self.graph_dir}/{node.source}_edges.parquet")
gf = GraphFrame(vertices, edges)
start_filter = t["start_vertex_filter"]
result = gf.bfs(
fromExpr=f"{start_filter['column']} = {start_filter['value']}",
toExpr="id IS NOT NULL",
edgeFilter=f"relationship = '{t['edge_label']}'",
maxPathLength=t["max_hops"]
)
        # BFS returns struct columns (from, e0, ..., to); project the requested
        # fields from the destination vertex struct
        return result.select([F.col(f"to.{f}").alias(f) for f in t["return_fields"]])

class ResultComposer:
def compose(self, partial_results: dict, query_nodes: list) -> DataFrame:
"""
Merges partial DataFrames from SQL and GRAPH engines
according to the original depends_on relationship in the DSL.
Algorithm:
1. Build result map: {sub_id -> DataFrame}.
2. For any JOIN node that spans sub_ids:
a. Fetch left and right DataFrames from result map.
b. Perform Spark join using join keys from DSL.
c. Replace both entries in map with the composed result.
3. Return final composed DataFrame.
"""
        ...

import time
class HybridRouter:
def __init__(self, spark, predictor, sql_gen, graph_gen, composer,
feature_extractor, logger):
self.spark = spark
self.predictor = predictor
self.sql_gen = sql_gen
self.graph_gen = graph_gen
self.composer = composer
self.feature_extractor = feature_extractor
self.logger = logger
def execute(self, query_json: dict):
# 1. Parse
nodes = DSLParser().parse(query_json)
# 2. Decompose
sub_expressions = QueryDecomposer().decompose(nodes)
# 3. Route + Execute
partial_results = {}
for sub in sub_expressions:
fv = self.feature_extractor.extract(sub)
engine = self.predictor.predict(fv)
t0 = time.perf_counter()
if engine == "SQL":
df = self.sql_gen.generate(sub)
else:
df = self.graph_gen.generate(sub)
elapsed_ms = (time.perf_counter() - t0) * 1000
partial_results[sub.sub_id] = df
self.logger.log(sub, engine, fv, elapsed_ms)
# 4. Compose
        return self.composer.compose(partial_results, nodes)

| Metric | Measurement Method |
|---|---|
| Wall-clock latency (median, p95) | time.perf_counter() in HybridRouter |
| Total shuffle bytes | Spark REST API: GET /api/v1/applications/{id}/stages — shuffleWriteBytes |
| CPU time | Spark executor metrics via SparkContext listener |
| Model inference time | Logged in ModelPredictor.predict() |
| Routing accuracy | Fraction of subexpressions where model chose the faster engine |
| Correctness | Row count + SHA256 checksum vs reference executor |
Stage 1 — Microbenchmarks (Decision Boundary Mapping)
Goal: Understand at what values of avg_degree and selectivity the routing decision flips.
# experiments/microbenchmark.py
for avg_degree in [2, 5, 10, 20, 50]:
for selectivity in [0.01, 0.05, 0.1, 0.25, 0.5, 1.0]:
generate_synthetic_graph(avg_degree=avg_degree)
run_single_traversal_subexpression(selectivity=selectivity)
record(sql_ms, graph_ms)
# Plot heatmap: avg_degree vs selectivity -> winner engine

Stage 2 — End-to-End Mixed Queries
# experiments/run_learned.py
for dataset in ["tpch_sf1", "snb_interactive", "imdb", "synthetic"]:
for query in load_queries(dataset):
result = HybridRouter().execute(query)
latency_ms = measure()
shuffle_bytes = measure()
        record(dataset, query_id, "learned", latency_ms, shuffle_bytes)

Stage 3 — Robustness & Online Retraining
- Evaluate a model trained on TPC-H applied to SNB (cross-dataset generalization).
- Measure accuracy degradation on out-of-distribution queries.
- Retrain with 20% new SNB-labeled samples; measure recovery in routing accuracy.
FUNCTION run_query(query_json):
nodes <- DSLParser.parse(query_json)
sub_exprs <- QueryDecomposer.decompose(nodes)
results <- {}
PARALLEL_GROUPS <- group sub_exprs by dependency level
FOR each group G in PARALLEL_GROUPS (in topological order):
PARALLEL FOR each sub IN G:
features <- FeatureExtractor.extract(sub)
engine <- ModelPredictor.predict(features) // "SQL" or "GRAPH"
IF engine == "SQL":
df <- SQLGenerator.generate(sub)
ELSE:
df <- GraphGenerator.generate(sub)
results[sub.sub_id] <- df
RuntimeLogger.log(sub.sub_id, engine, features, elapsed_ms)
final_df <- ResultComposer.compose(results, nodes)
RETURN final_df
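The `group sub_exprs by dependency level` step in `run_query` can be sketched as a pure-Python helper. The id strings and the shape of the `deps` mapping (`sub_id -> list of upstream sub_ids`) are assumptions for illustration:

```python
from collections import defaultdict
from typing import Dict, List

def group_by_dependency_level(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group sub-expression ids into levels; ids in one level share no
    upstream dependency and can be dispatched in parallel."""
    levels: Dict[str, int] = {}

    def level_of(sid: str) -> int:
        # Level = 1 + deepest parent; roots (no parents) sit at level 0
        if sid not in levels:
            parents = deps.get(sid, [])
            levels[sid] = 0 if not parents else 1 + max(level_of(p) for p in parents)
        return levels[sid]

    buckets: Dict[int, List[str]] = defaultdict(list)
    for sid in deps:
        buckets[level_of(sid)].append(sid)
    return [sorted(buckets[lvl]) for lvl in sorted(buckets)]
```

For Example B above, `{"s1": [], "s2": [], "s3": ["s1", "s2"]}` yields two levels: `[["s1", "s2"], ["s3"]]`, so the traversal and the aggregate can run concurrently before the final join.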
FUNCTION compute_selectivity(sub_expr, table_stats):
IF no FILTER node in sub_expr:
RETURN 1.0
filter_node <- first FILTER node
col_stats <- table_stats["columns"][filter_node.predicate["column"]]
op <- filter_node.predicate["operator"]
IF op == "=":
RETURN 1.0 / col_stats["distinct_count"]
ELIF op IN [">", "<", ">=", "<="]:
col_range <- col_stats["max"] - col_stats["min"]
pred_val <- filter_node.predicate["value"]
IF op IN [">", ">="]:
RETURN (col_stats["max"] - pred_val) / col_range
ELSE:
RETURN (pred_val - col_stats["min"]) / col_range
ELIF op == "IN":
RETURN len(filter_node.predicate["value"]) / col_stats["distinct_count"]
ELSE:
RETURN 0.1 // conservative default
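A runnable version of the selectivity pseudocode above, with clamping added so range predicates outside [min, max] cannot yield fractions below 0 or above 1:

```python
from typing import Optional

def compute_selectivity(predicate: Optional[dict], col_stats: dict) -> float:
    """Estimate the fraction of rows passing a filter predicate."""
    if predicate is None:
        return 1.0  # no FILTER node in the subexpression
    op, val = predicate["operator"], predicate["value"]
    if op == "=":
        return 1.0 / col_stats["distinct_count"]
    if op in (">", "<", ">=", "<="):
        col_range = col_stats["max"] - col_stats["min"]
        if col_range <= 0:
            return 1.0  # degenerate single-value column
        if op in (">", ">="):
            frac = (col_stats["max"] - val) / col_range
        else:
            frac = (val - col_stats["min"]) / col_range
        return min(max(frac, 0.0), 1.0)  # clamp out-of-range predicates
    if op == "IN":
        return min(len(val) / col_stats["distinct_count"], 1.0)
    return 0.1  # conservative default (e.g. LIKE)
```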
python experiments/run_baselines.py \
--strategy always_sql \
--queries dsl/sample_queries/ \
  --output experiments/results/always_sql.csv

Implementation: Override HybridRouter to always call SQLGenerator, bypassing the ML predictor entirely.
python experiments/run_baselines.py \
--strategy always_graph \
--queries dsl/sample_queries/ \
  --output experiments/results/always_graph.csv

# Rule: TRAVERSAL nodes -> GRAPH; everything else -> SQL
def rule_based_route(sub_expr) -> str:
    return "GRAPH" if sub_expr.primary_op_type == "TRAVERSAL" else "SQL"

python experiments/run_baselines.py \
--strategy rule_based \
--queries dsl/sample_queries/ \
  --output experiments/results/rule_based.csv

# Step 1: Train (if not already done)
python model/trainer.py \
--data training_data/labeled_runs.csv \
--output model/artifacts/classifier_v1.pkl
# Step 2: Evaluate
python experiments/run_learned.py \
--model model/artifacts/classifier_v1.pkl \
--queries dsl/sample_queries/ \
  --output experiments/results/learned_routing.csv

python experiments/compare_results.py \
--results experiments/results/ \
--metrics latency_ms shuffle_bytes routing_accuracy \
  --output experiments/results/comparison_table.csv

Expected Output Table Format:
| Strategy | Median Latency (ms) | p95 Latency (ms) | Shuffle Bytes (MB) | Routing Accuracy |
|---|---|---|---|---|
| Always SQL | baseline | baseline | baseline | — |
| Always Graph | — | — | — | — |
| Rule-Based | — | — | — | — |
| Learned (XGBoost) | target: -15% | target: -10% | target: -20% | target: >85% |
python experiments/ablation_study.py \
--model model/artifacts/classifier_v1.pkl \
--data training_data/labeled_runs.csv \
--drop_features avg_degree selectivity input_cardinality_log hist_avg_runtime_ms \
  --output experiments/results/ablation.csv

For each feature group: retrain the model with that group removed and compare CV F1 against the full 22-feature model. Report the delta per removed group to identify the most critical features.
| Risk | Likelihood | Impact | Mitigation Strategy |
|---|---|---|---|
| Labeling cost — running both engines per subexpression is slow | High | High | Sample 500–1000 subexpressions; use SF=1 for TPC-H during label collection |
| Poor cardinality estimates degrade feature quality | Medium | High | Add histogram-based sampling; use approx_count_distinct in Spark |
| GraphFrames incompatibility with Spark version | Medium | Medium | Pin versions in requirements.txt; validate setup in Docker first |
| Correctness failures in result composition | Medium | High | Unit tests with reference executor; validate row counts + SHA256 checksums |
| Model overfits to TPC-H, fails on SNB | Medium | Medium | Stratified sampling across datasets; online retraining stage (Stage 3) |
| BFS explodes for high-degree nodes in graph traversal | Low | High | Cap max_hops <= 3; add vertex count limit to GraphFrames BFS call |
- Spyratos, N. et al. — HIFUN: A High-Level Functional Query Language for Big Data Analytics. (HIFUN foundational semantics — cite in Introduction and System Design.)
- Yu, X. et al. — Cost-Based or Learning-Based? A Hybrid Query Optimizer for Query Plan Selection. PVLDB. (Hybrid learned optimizer context — cite in Related Work.)
- Anneser, C. et al. — Learned Query Optimization for Any SQL Database (AutoSteer). (Learned SQL optimizer — cite in Related Work.)
- Li, Y. et al. — A Learned Cost Model for Big Data Query Processing / LOOPLINE. (Deep cost model comparison — cite in Related Work.)
- Ye, J. et al. — LEAP: A Low-Cost Spark SQL Query Optimizer. (Spark-based optimization — directly relevant, cite in Related Work.)
End of Blueprint — Version 1.0
Recommended Next Steps for the Team:
- Set up environment per Phase 1. Confirm Spark + GraphFrames run locally in Docker.
- Ingest TPC-H SF=1 and SNB-small per the Phase 2 ingestion scripts.
- Implement DSLParser + QueryDecomposer with full unit tests before touching ML code.
- Run label collection on 10 sample queries to validate the pipeline before scaling up.
- Train the first model iteration and verify that inference time stays under 10ms.
- Run all four routing strategies and produce the comparison table for the paper.