
Commit d735252
Add advanced statistical and ML plugins to PatternLab
Introduces new plugins for advanced statistical tests (Dieharder, TestU01, DFT spectral, entropy), cryptographic structure detection (ECB, frequency, known constants), and machine-learning-based anomaly/label detection (autoencoder, LSTM/GRU, classifier). Updates the CLI to support these plugins and to handle numpy and bytes values in JSON output. The engine now gracefully skips missing/unregistered tests. Adds corresponding tests for the new plugins and removes obsolete report/test_results JSON files.
1 parent 297f9fc commit d735252

34 files changed: 3331 additions & 1099 deletions

patternlab/cli.py

Lines changed: 35 additions & 2 deletions
@@ -5,7 +5,24 @@
 import os
 from .engine import Engine
 from .plugin_api import serialize_testresult
-
+
+def _json_default(obj):
+    """Fallback serializer to convert numpy scalars/arrays and bytes into JSON-friendly types."""
+    try:
+        import numpy as _np  # type: ignore
+        if isinstance(obj, _np.generic):
+            return obj.item()
+        if isinstance(obj, _np.ndarray):
+            return obj.tolist()
+    except Exception:
+        pass
+    if isinstance(obj, (bytes, bytearray)):
+        try:
+            return bytes(obj).hex()
+        except Exception:
+            return str(obj)
+    return str(obj)
+
 try:
     import yaml  # optional dependency for YAML config files
 except Exception:
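
For reference, a minimal round-trip sketch of what _json_default buys (the payload values below are hypothetical plugin outputs, not from the commit):

    import json
    import numpy as np

    payload = {"p_value": np.float64(0.42), "spectrum": np.arange(3), "magic": b"\x89PNG"}
    print(json.loads(json.dumps(payload, default=_json_default)))
    # {'p_value': 0.42, 'spectrum': [0, 1, 2], 'magic': '89504e47'}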
@@ -83,6 +100,21 @@ def analyze(input_file, output_file, xor_value, discover, config_path, profile,
         "fft_spectral",
         "autocorrelation",
         "linear_complexity",
+        # Advanced tests added by integration
+        "diehard_birthday_spacings",
+        "diehard_overlapping_sums",
+        "diehard_3d_spheres",
+        "testu01_smallcrush",
+        "dft_spectral_advanced",
+        "hurst_exponent",
+        # Crypto-specific analysis plugins (ECB/frequency/known-constants)
+        "ecb_detector",
+        "frequency_pattern",
+        "known_constants_search",
+        # Machine-learning based anomaly detection plugins
+        "lstm_gru_anomaly",
+        "autoencoder_anomaly",
+        "classifier_labeler",
     ]

     file_conf = {}
@@ -160,7 +192,8 @@ def analyze(input_file, output_file, xor_value, discover, config_path, profile,

     # Write output directly as JSON following the new schema
     with open(output_file, 'w', encoding='utf-8') as f:
-        json.dump(output, f, indent=2)
+        # Use custom default serializer to handle numpy types and bytes produced by plugins
+        json.dump(output, f, indent=2, default=_json_default)

     click.echo(f"Analysis complete. Results written to {output_file}")

patternlab/engine.py

Lines changed: 5 additions & 1 deletion
@@ -589,7 +589,11 @@ def _analyze_impl(self, input_bytes: bytes, config: Dict[str, Any]) -> Dict[str,
                 raw_results.append({"test_name": rem.get('name'), "status": "skipped", "reason": "budget_exhausted"})
                 break

-            tp = self._tests[c['name']]
+            # Allow missing/unregistered tests to be gracefully skipped instead of raising KeyError.
+            tp = self._tests.get(c['name'])
+            if tp is None:
+                raw_results.append({"test_name": c.get('name'), "status": "skipped", "reason": "test_not_registered"})
+                continue
             reqs = getattr(tp, 'requires', []) or []
             missing_reqs: List[str] = []
             # Lazy caches for data views that may be expensive
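
To illustrate the new engine behavior, a toy sketch (the test names here are hypothetical) of how an unregistered test degrades to a skipped entry rather than a KeyError:

    tests = {"entropy": object()}  # stand-in for self._tests
    raw_results = []
    for c in [{"name": "entropy"}, {"name": "no_such_test"}]:
        tp = tests.get(c["name"])
        if tp is None:
            raw_results.append({"test_name": c.get("name"), "status": "skipped",
                                "reason": "test_not_registered"})
            continue
    print(raw_results)
    # [{'test_name': 'no_such_test', 'status': 'skipped', 'reason': 'test_not_registered'}]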
New file: autoencoder anomaly plugin

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+"""
+Autoencoder-based anomaly-detection plugin.
+
+- Derives from TestPlugin.
+- Uses BytesView.
+- Supports both the batch (run) and streaming (update/finalize) APIs.
+- Uses a fast stub mode by default (use_stub=True).
+- For a real model, lazy-loads a Keras model from params['model_path'].
+"""
+from __future__ import annotations
+
+from typing import Dict, Any
+import time
+
+try:
+    from ..plugin_api import TestPlugin, TestResult, BytesView
+except Exception:
+    from patternlab.plugin_api import TestPlugin, TestResult, BytesView  # type: ignore
+
+
+class AutoencoderAnomalyPlugin(TestPlugin):
+    """
+    Params:
+      - model_path: optional path to a saved Keras autoencoder
+      - window_size: int (default 1024)
+      - downsample: int
+      - reconstruction_threshold: float (anomaly threshold)
+      - use_stub: bool (default True)
+      - max_buffer_bytes: int for the streaming buffer
+    """
+
+    requires = []
+
+    def __init__(self):
+        self._buffer = bytearray()
+        self._model = None
+        self._model_path = None
+
+    def describe(self) -> str:
+        return "Autoencoder-based reconstruction anomaly detection (batch + streaming)."
+
+    def _load_model(self, model_path: str):
+        if self._model is not None and self._model_path == model_path:
+            return self._model
+        try:
+            import tensorflow as tf  # type: ignore
+            model = tf.keras.models.load_model(model_path)
+            self._model = model
+            self._model_path = model_path
+            return model
+        except Exception as e:
+            raise RuntimeError(f"failed_to_load_model:{e}")
+
+    def _bytes_to_series(self, b: BytesView, window_size: int, downsample: int = 1):
+        mv = b.data
+        arr = mv.tobytes()
+        if downsample <= 1:
+            series = [float(x) / 255.0 for x in arr[:window_size]]
+        else:
+            series = []
+            step = downsample
+            for i in range(0, min(len(arr), window_size * step), step):
+                block = arr[i:i+step]
+                if not block:
+                    break
+                series.append(sum(block) / (len(block) * 255.0))
+        # Zero-pad or truncate so the series is always exactly window_size long
+        if len(series) < window_size:
+            series += [0.0] * (window_size - len(series))
+        else:
+            series = series[:window_size]
+        return series
+
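As an aside, a vectorized equivalent of _bytes_to_series under the same [0, 1] normalization (illustrative only, not part of the commit; assumes numpy is available):

    import numpy as np

    def bytes_to_series_np(raw: bytes, window_size: int, downsample: int = 1) -> np.ndarray:
        # Scale bytes to [0, 1], average over non-overlapping blocks of size
        # `downsample`, then zero-pad or truncate to exactly window_size samples.
        a = np.frombuffer(raw, dtype=np.uint8)[:window_size * max(1, downsample)]
        a = a.astype(np.float32) / 255.0
        if downsample > 1:
            n = len(a) // downsample
            a = a[:n * downsample].reshape(n, downsample).mean(axis=1)
        out = np.zeros(window_size, dtype=np.float32)
        out[:min(window_size, len(a))] = a[:window_size]
        return out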
+    def run(self, data: BytesView, params: Dict[str, Any]) -> TestResult:
+        start = time.time()
+        use_stub = params.get("use_stub", True)
+        window_size = int(params.get("window_size", min(len(data), 1024)))
+        downsample = int(params.get("downsample", 1))
+        threshold = float(params.get("reconstruction_threshold", 0.02))
+        inference_timeout_ms = int(params.get("inference_timeout_ms", 5000))
+
+        series = self._bytes_to_series(data, window_size, downsample)
+
+        if use_stub:
+            # Simple reconstruction via a moving-average filter as a stub
+            smoothed = []
+            k = 5
+            for i in range(len(series)):
+                window = series[max(0, i - k + 1):i + 1]
+                smoothed.append(sum(window) / len(window))
+            mse = sum((a - b) ** 2 for a, b in zip(series, smoothed)) / (len(series) or 1)
+            passed = mse < threshold
+            tr = TestResult(
+                test_name="autoencoder_anomaly",
+                passed=bool(passed),
+                p_value=max(0.0, min(1.0, 1.0 - mse)),
+                category="ml_anomaly",
+                metrics={"reconstruction_mse": mse, "method": "stub"},
+                time_ms=(time.time() - start) * 1000.0,
+                bytes_processed=len(data),
+            )
+            return tr
+
+        model_path = params.get("model_path")
+        if not model_path:
+            raise ValueError("model_path is required when use_stub=False")
+
+        model = self._load_model(model_path)
+
+        import numpy as np
+        x = np.array(series, dtype=np.float32).reshape((1, len(series), 1))
+        t0 = time.time()
+        preds = model.predict(x, verbose=0)
+        t1 = time.time()
+        elapsed_ms = (t1 - t0) * 1000.0
+        # Note: the timeout is checked after predict() returns, not enforced preemptively
+        if elapsed_ms > inference_timeout_ms:
+            raise RuntimeError("inference_timeout")
+
+        # Interpret the model output as a reconstruction of the input series
+        recon = preds.reshape(-1)[:len(series)]
+        mse = float(((recon - np.array(series)) ** 2).mean())
+        passed = mse < threshold
+        tr = TestResult(
+            test_name="autoencoder_anomaly",
+            passed=bool(passed),
+            p_value=max(0.0, min(1.0, 1.0 - mse)),
+            category="ml_anomaly",
+            metrics={"reconstruction_mse": mse, "method": "model"},
+            time_ms=(time.time() - start) * 1000.0,
+            bytes_processed=len(data),
+        )
+        return tr
+
+    def update(self, chunk: bytes, params: Dict[str, Any]) -> None:
+        max_buffer = int(params.get("max_buffer_bytes", 10 * 1024 * 1024))
+        if not isinstance(chunk, (bytes, bytearray)):
+            raise ValueError("chunk must be bytes")
+        self._buffer.extend(chunk)
+        if len(self._buffer) > max_buffer:
+            # Keep only the most recent max_buffer bytes
+            extra = len(self._buffer) - max_buffer
+            del self._buffer[:extra]
+
+    def finalize(self, params: Dict[str, Any]) -> TestResult:
+        bv = BytesView(bytes(self._buffer))
+        params = dict(params)
+        params.setdefault("use_stub", True)
+        tr = self.run(bv, params)
+        tr.metrics.setdefault("streaming", True)
+        return tr
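
A minimal usage sketch of the stub path (assumes patternlab is importable and that BytesView wraps raw bytes, exactly as finalize() constructs it above):

    import os
    from patternlab.plugin_api import BytesView

    plugin = AutoencoderAnomalyPlugin()
    data = BytesView(os.urandom(4096))  # random input: expect a noisy reconstruction
    result = plugin.run(data, {"use_stub": True, "window_size": 1024})
    print(result.passed, result.metrics["reconstruction_mse"])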
New file: classifier labeler plugin

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
+"""
+Classifier labeler plugin.
+
+- Derives from TestPlugin.
+- Uses BytesView.
+- Supports both streaming and batch analysis.
+- Lazy-loads scikit-learn models (joblib) or falls back to stub heuristics.
+"""
+from __future__ import annotations
+
+from typing import Dict, Any, Optional, List
+import time
+
+try:
+    from ..plugin_api import TestPlugin, TestResult, BytesView
+except Exception:
+    from patternlab.plugin_api import TestPlugin, TestResult, BytesView  # type: ignore
+
+
+class ClassifierLabelerPlugin(TestPlugin):
+    """
+    Params:
+      - model_path: optional path to an sklearn joblib file
+      - labels: optional list of labels expected by the model (default common set)
+      - use_stub: bool (default True) -> quick heuristic
+      - inference_timeout_ms: int
+      - max_buffer_bytes: int for streaming
+    """
+
+    requires = []
+
+    def __init__(self):
+        self._buffer = bytearray()
+        self._model = None
+        self._model_path = None
+
+    def describe(self) -> str:
+        return "Classifier-based labeler (ransomware/encrypted/etc.) using pre-trained sklearn models."
+
+    def _load_model(self, model_path: str):
+        if self._model is not None and self._model_path == model_path:
+            return self._model
+        try:
+            import joblib  # type: ignore
+            model = joblib.load(model_path)
+            # Ensure the model supports predict (predict_proba is optional, see run())
+            if not hasattr(model, "predict"):
+                raise RuntimeError("model_missing_predict")
+            self._model = model
+            self._model_path = model_path
+            return model
+        except Exception as e:
+            raise RuntimeError(f"failed_to_load_model:{e}")
+
+    def _extract_features(self, b: BytesView, n_bins: int = 64):
+        # Simple, fast normalized histogram of byte values; suitable for sklearn classifiers
+        mv = b.data
+        arr = mv.tobytes()
+        bins = [0] * n_bins
+        if arr:
+            step = 256 // n_bins
+            for v in arr:
+                idx = min(n_bins - 1, v // step)
+                bins[idx] += 1
+            total = len(arr)
+            features = [c / total for c in bins]
+        else:
+            features = [0.0] * n_bins
+        return features
+
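As an aside, an equivalent vectorized feature extractor (illustrative only, not part of the commit; assumes numpy is available):

    import numpy as np

    def extract_features_np(raw: bytes, n_bins: int = 64) -> np.ndarray:
        # Normalized histogram of byte values over n_bins equal-width buckets
        if not raw:
            return np.zeros(n_bins, dtype=np.float64)
        vals = np.frombuffer(raw, dtype=np.uint8)
        idx = np.minimum(vals // (256 // n_bins), n_bins - 1)
        return np.bincount(idx, minlength=n_bins) / len(vals)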
+    def run(self, data: BytesView, params: Dict[str, Any]) -> TestResult:
+        start = time.time()
+        use_stub = params.get("use_stub", True)
+        model_path = params.get("model_path")
+        labels: Optional[List[str]] = params.get("labels")
+        inference_timeout_ms = int(params.get("inference_timeout_ms", 2000))
+
+        features = self._extract_features(data, n_bins=int(params.get("n_bins", 64)))
+
+        if use_stub:
+            # Heuristic: a near-uniform histogram suggests encrypted data; heavy mass
+            # in the first and last bins suggests a repeated-marker (ransomware-like) pattern.
+            import math
+            entropy = 0.0
+            for p in features:
+                if p > 0:
+                    entropy -= p * math.log2(p)
+            # Entropy over n_bins bins is at most log2(n_bins) (6 bits for the default
+            # 64 bins), so compare against that maximum rather than a fixed 8-bit bound.
+            max_entropy = math.log2(len(features)) if len(features) > 1 else 1.0
+            is_encrypted = entropy > 0.875 * max_entropy
+            is_ransomware = (features[0] > 0.2 and features[-1] > 0.01) if len(features) > 1 else False
+            detected = "encrypted" if is_encrypted else ("ransomware" if is_ransomware else "unknown")
+            p_value = min(1.0, entropy / max_entropy)
+            tr = TestResult(
+                test_name="classifier_labeler",
+                passed=detected not in ("encrypted", "ransomware"),
+                p_value=float(p_value),
+                category="ml_label",
+                metrics={"label": detected, "entropy": entropy, "method": "stub"},
+                time_ms=(time.time() - start) * 1000.0,
+                bytes_processed=len(data),
+            )
+            return tr
+
+        if not model_path:
+            raise ValueError("model_path is required when use_stub=False")
+
+        model = self._load_model(model_path)
+
+        import numpy as np
+        x = np.array(features, dtype=np.float32).reshape((1, -1))
+
+        t0 = time.time()
+        # predict_proba may not be available for all models; fall back to predict
+        probs = None
+        label = None
+        try:
+            if hasattr(model, "predict_proba"):
+                probs = model.predict_proba(x)[0].tolist()
+                classes = getattr(model, "classes_", None)
+                if classes is None:
+                    classes = list(range(len(probs)))
+                # choose the top-probability class
+                top_idx = int(np.argmax(probs))
+                label = str(classes[top_idx])
+            else:
+                pred = model.predict(x)[0]
+                label = str(pred)
+                probs = None
+        except Exception as e:
+            raise RuntimeError(f"inference_failed:{e}")
+        t1 = time.time()
+        elapsed_ms = (t1 - t0) * 1000.0
+        if elapsed_ms > inference_timeout_ms:
+            raise RuntimeError("inference_timeout")
+
+        metrics = {"label": label, "method": "model"}
+        if probs is not None:
+            metrics["probabilities"] = probs
+        tr = TestResult(
+            test_name="classifier_labeler",
+            passed=(label not in ("ransomware", "encrypted")),
+            p_value=(1.0 - max(probs)) if probs is not None else None,
+            category="ml_label",
+            metrics=metrics,
+            time_ms=(time.time() - start) * 1000.0,
+            bytes_processed=len(data),
+        )
+        return tr
+
+    def update(self, chunk: bytes, params: Dict[str, Any]) -> None:
+        max_buffer = int(params.get("max_buffer_bytes", 10 * 1024 * 1024))
+        if not isinstance(chunk, (bytes, bytearray)):
+            raise ValueError("chunk must be bytes")
+        self._buffer.extend(chunk)
+        if len(self._buffer) > max_buffer:
+            # Keep only the most recent max_buffer bytes
+            extra = len(self._buffer) - max_buffer
+            del self._buffer[:extra]
+
+    def finalize(self, params: Dict[str, Any]) -> TestResult:
+        bv = BytesView(bytes(self._buffer))
+        params = dict(params)
+        params.setdefault("use_stub", True)
+        tr = self.run(bv, params)
+        tr.metrics.setdefault("streaming", True)
+        return tr
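
A streaming usage sketch of the stub path (imports as in the plugin above; the chunk contents are arbitrary):

    plugin = ClassifierLabelerPlugin()
    for chunk in (b"\x00" * 1024, b"\xff" * 1024):
        plugin.update(chunk, {})
    result = plugin.finalize({"use_stub": True})
    print(result.metrics["label"], result.metrics.get("streaming"))
    # With half the bytes 0x00 and half 0xFF, the stub heuristic labels this "ransomware".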
