Skip to content

Commit fc9853e

Browse files
Copilot and EdgeTypE committed
Add 5 new statistical test plugins with comprehensive tests
Co-authored-by: EdgeTypE <34396598+EdgeTypE@users.noreply.github.com>
1 parent 22f73a3 commit fc9853e

11 files changed

Lines changed: 2123 additions & 0 deletions
Lines changed: 236 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,236 @@
1+
"""Chi-Square test plugin for uniformity of byte distribution.
2+
3+
The Chi-Square test checks whether the observed byte frequency distribution
4+
significantly deviates from the expected uniform distribution. A low p-value
5+
indicates the data is non-random or biased.
6+
"""
7+
8+
import math
9+
from collections import Counter
10+
from typing import Dict, Any
11+
12+
try:
13+
from ..plugin_api import BytesView, TestResult, TestPlugin
14+
except Exception:
15+
from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore
16+
17+
18+
class ChiSquareTest(TestPlugin):
    """Chi-Square goodness-of-fit test for byte frequency uniformity.

    The observed count of every byte value (0-255) is compared against the
    count expected under a uniform distribution (``n / 256``).  The resulting
    statistic is converted to a p-value with 255 degrees of freedom; the test
    passes when the p-value is strictly greater than ``alpha``.

    Two modes are supported:

    * batch: ``run(data, params)`` evaluates one buffer in a single call;
    * streaming: call ``update(chunk, params)`` repeatedly, then
      ``finalize(params)`` to obtain the result for everything seen so far.
    """

    # Number of categories in the test: one per possible byte value.
    _NUM_BINS = 256
    # Significance level used when ``params`` carries no "alpha" entry.
    _DEFAULT_ALPHA = 0.01

    def __init__(self):
        """Initialize the plugin with empty streaming accumulators."""
        self._counter = Counter()
        self._total_bytes = 0

    def describe(self) -> str:
        """Return a one-line human-readable description of the plugin."""
        return "Chi-Square test for uniformity of byte distribution"

    def run(self, data: BytesView, params: dict) -> TestResult:
        """Run the Chi-Square test in batch mode.

        Args:
            data: View whose ``to_bytes()`` yields the bytes to analyse.
            params: Options mapping; ``"alpha"`` (default 0.01) is the
                significance level.

        Returns:
            The ``TestResult`` for ``data``.
        """
        data_bytes = data.to_bytes()
        return self._build_result(Counter(data_bytes), len(data_bytes), params)

    def update(self, chunk: bytes, params: dict) -> None:
        """Fold one chunk of raw bytes into the streaming accumulators.

        Empty chunks are ignored.  ``params`` is accepted but not used here.
        """
        if not chunk:
            return
        self._counter.update(chunk)
        self._total_bytes += len(chunk)

    def finalize(self, params: dict) -> TestResult:
        """Finish a streaming run and return its ``TestResult``.

        The accumulators are reset before the result is computed, so the
        same instance can be reused for a new stream afterwards.

        Args:
            params: Options mapping; ``"alpha"`` (default 0.01) is the
                significance level.
        """
        counter, n = self._counter, self._total_bytes

        # Reset accumulators for possible reuse.
        self._counter = Counter()
        self._total_bytes = 0

        return self._build_result(counter, n, params)

    def _build_result(self, counter: Counter, n: int, params: dict) -> TestResult:
        """Compute the statistic and p-value from byte counts.

        Shared by ``run`` and ``finalize`` so both modes produce identical
        results for identical input.

        Args:
            counter: Mapping of byte value to number of occurrences.
            n: Total number of bytes counted.
            params: Options mapping (``None`` is treated as empty).
        """
        if n == 0:
            # No data: nothing contradicts uniformity, report a trivial pass.
            return TestResult(
                test_name="chi_square",
                passed=True,
                p_value=1.0,
                category="statistical",
                metrics={"total_bytes": 0, "chi_square_statistic": 0.0},
            )

        # Expected count per byte value under a uniform distribution.
        expected = n / float(self._NUM_BINS)

        # Contribution of the byte values that actually occurred.
        chi_square = sum(
            (count - expected) ** 2 / expected for count in counter.values()
        )

        # Byte values that never occurred have an observed count of 0, so
        # each contributes (0 - expected)**2 / expected == expected.
        observed_bytes = len(counter)
        missing_bytes = self._NUM_BINS - observed_bytes
        if missing_bytes > 0:
            chi_square += missing_bytes * expected

        df = self._NUM_BINS - 1

        # Upper-tail probability of a statistic at least this large.
        p_value = 1.0 - self._chi_square_cdf(chi_square, df)

        alpha = float((params or {}).get("alpha", self._DEFAULT_ALPHA))
        passed = p_value > alpha

        return TestResult(
            test_name="chi_square",
            passed=passed,
            p_value=p_value,
            category="statistical",
            metrics={
                "total_bytes": n,
                "chi_square_statistic": chi_square,
                "degrees_of_freedom": df,
                "unique_bytes": observed_bytes,
            },
            p_values={"chi_square": p_value},
        )

    def _chi_square_cdf(self, x: float, df: int) -> float:
        """Return the chi-square cumulative distribution function at ``x``.

        For ``df > 100`` the Wilson-Hilferty cube-root transformation maps
        the statistic to an approximately standard normal variate.  For
        smaller ``df`` the regularized lower incomplete gamma function
        ``P(df / 2, x / 2)`` is evaluated.

        Args:
            x: Value of the chi-square statistic.
            df: Degrees of freedom (must be positive).

        Returns:
            ``0.0`` when ``x <= 0``, otherwise the CDF value.
        """
        if x <= 0:
            return 0.0

        if df > 100:
            # Wilson-Hilferty: (X/df)**(1/3) is approximately normal with
            # mean 1 - 2/(9*df) and variance 2/(9*df).
            cube_root_var = 2.0 / (9.0 * df)
            z = ((x / df) ** (1.0 / 3.0) - (1.0 - cube_root_var)) / math.sqrt(
                cube_root_var
            )
            return self._normal_cdf(z)

        return self._gamma_cdf(x / 2.0, df / 2.0)

    def _gamma_cdf(self, x: float, k: float) -> float:
        """Return the regularized lower incomplete gamma function P(k, x).

        Args:
            x: Evaluation point.
            k: Shape parameter (must be positive).

        Returns:
            ``0.0`` when ``x <= 0``, otherwise a value clamped to [0, 1].
        """
        if x <= 0:
            return 0.0

        # The series converges quickly for x < k + 1 and the continued
        # fraction for x >= k + 1.  Using the continued fraction below that
        # threshold converges slowly and divides by zero when x + 1 == k.
        if x < k + 1.0:
            value = self._gamma_series(x, k)
        else:
            value = 1.0 - self._gamma_cf(x, k)

        # Guard against rounding pushing the result just outside [0, 1].
        return min(1.0, max(0.0, value))

    def _gamma_series(self, x: float, k: float) -> float:
        """Evaluate P(k, x) by its power-series expansion.

        Intended for ``0 < x < k + 1`` with ``k > 0``, where the series
        converges rapidly.
        """
        max_iter = 1000
        epsilon = 1e-10

        total = 1.0 / k
        term = total

        for n in range(1, max_iter):
            term *= x / (k + n)
            total += term
            # Relative tolerance: stop once the new term no longer matters.
            if abs(term) < abs(total) * epsilon:
                break

        return total * math.exp(-x + k * math.log(x) - math.lgamma(k))

    def _gamma_cf(self, x: float, k: float) -> float:
        """Evaluate the regularized upper incomplete gamma function Q(k, x).

        Uses a continued fraction evaluated with the modified Lentz
        algorithm.  Intended for ``x >= k + 1`` with ``x > 0``.
        """
        max_iter = 1000
        epsilon = 1e-10

        # Smallest magnitude allowed for a denominator in Lentz's method.
        tiny = 1e-30
        b = x + 1.0 - k
        if abs(b) < tiny:
            # Only reachable when called outside the intended range.
            b = tiny
        c = 1.0 / tiny
        d = 1.0 / b
        h = d

        for i in range(1, max_iter):
            a = -i * (i - k)
            b += 2.0
            d = a * d + b
            if abs(d) < tiny:
                d = tiny
            c = b + a / c
            if abs(c) < tiny:
                c = tiny
            d = 1.0 / d
            delta = d * c
            h *= delta
            if abs(delta - 1.0) < epsilon:
                break

        return h * math.exp(-x + k * math.log(x) - math.lgamma(k))

    def _normal_cdf(self, x: float) -> float:
        """Return the standard normal cumulative distribution function.

        Computed as ``erfc(-x / sqrt(2)) / 2`` with the standard library's
        complementary error function, which stays accurate in both tails.
        """
        return 0.5 * math.erfc(-x / math.sqrt(2.0))

0 commit comments

Comments
 (0)