Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 162 additions & 19 deletions rips/rustchain-core/src/anti_spoof/network_challenge.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"""

import hashlib
import hmac
import json
import os
import secrets
Expand All @@ -28,6 +27,28 @@
from typing import Optional, Dict, List, Tuple
from enum import Enum

try:
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
except ImportError: # pragma: no cover - exercised only in minimal installs
InvalidSignature = Exception
Ed25519PrivateKey = None
Ed25519PublicKey = None
Encoding = None
NoEncryption = None
PrivateFormat = None
PublicFormat = None


class ChallengeType(Enum):
FULL = 0x00 # All hardware tests
TIMEBASE = 0x01 # PowerPC timebase only
Expand All @@ -51,6 +72,7 @@ class Challenge:
"""A cryptographic challenge sent to a validator"""
challenge_id: str
challenge_type: int
target_pubkey: str # Intended responder
nonce: bytes # 32 bytes of randomness
timestamp: int # Unix timestamp in milliseconds
timeout_ms: int # Response must arrive within this window
Expand All @@ -63,6 +85,7 @@ def to_bytes(self) -> bytes:
return (
self.challenge_id.encode() +
struct.pack('>B', self.challenge_type) +
self.target_pubkey.encode() +
self.nonce +
struct.pack('>Q', self.timestamp) +
struct.pack('>I', self.timeout_ms) +
Expand Down Expand Up @@ -125,6 +148,67 @@ class ValidationResult:
serial_ok: bool
failure_reasons: List[str]


def _require_ed25519() -> None:
if Ed25519PrivateKey is None or Ed25519PublicKey is None:
raise RuntimeError("cryptography is required for Ed25519 challenge signatures")


def _clean_hex(value: str) -> str:
value = value.strip()
if value.startswith(("0x", "0X")):
return value[2:]
return value


def _load_ed25519_private_key(private_key) -> Ed25519PrivateKey:
_require_ed25519()
if isinstance(private_key, Ed25519PrivateKey):
return private_key
if isinstance(private_key, str):
private_key = bytes.fromhex(_clean_hex(private_key))
if not isinstance(private_key, bytes) or len(private_key) != 32:
raise ValueError("Ed25519 private key must be 32 raw bytes or hex")
return Ed25519PrivateKey.from_private_bytes(private_key)


def _private_key_bytes(private_key) -> bytes:
private_key = _load_ed25519_private_key(private_key)
return private_key.private_bytes(
encoding=Encoding.Raw,
format=PrivateFormat.Raw,
encryption_algorithm=NoEncryption(),
)


def derive_validator_pubkey(private_key) -> str:
"""Return the hex Ed25519 public key for a raw private key."""
private_key = _load_ed25519_private_key(private_key)
return private_key.public_key().public_bytes(
encoding=Encoding.Raw,
format=PublicFormat.Raw,
).hex()


def generate_validator_private_key() -> bytes:
"""Generate raw Ed25519 private-key bytes for a validator instance."""
_require_ed25519()
return _private_key_bytes(Ed25519PrivateKey.generate())


def _verify_ed25519(pubkey_hex: str, signature: bytes, payload: bytes) -> bool:
if Ed25519PublicKey is None:
return False
if not pubkey_hex or not signature:
return False
try:
public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(_clean_hex(pubkey_hex)))
public_key.verify(signature, payload)
return True
except (ValueError, InvalidSignature):
return False


class AntiSpoofValidator:
"""
Validates challenge responses to detect emulators.
Expand Down Expand Up @@ -169,34 +253,59 @@ def generate_challenge(
self,
target_pubkey: str,
expected_hardware: Dict,
challenger_privkey: bytes, # For signing
challenger_privkey: bytes, # Raw Ed25519 private key for signing
challenge_type: ChallengeType = ChallengeType.FULL
) -> Challenge:
"""Generate a new challenge for a validator"""
private_key = _load_ed25519_private_key(challenger_privkey)

challenge = Challenge(
challenge_id=secrets.token_hex(16),
challenge_type=challenge_type.value,
target_pubkey=target_pubkey,
nonce=secrets.token_bytes(32),
timestamp=int(time.time() * 1000),
timeout_ms=self._get_timeout_for_hardware(expected_hardware),
expected_hardware=expected_hardware,
challenger_pubkey=hashlib.sha256(challenger_privkey).hexdigest()[:40],
challenger_pubkey=derive_validator_pubkey(private_key),
signature=b'' # Will be filled
)

# Sign the challenge
challenge.signature = hmac.new(
challenger_privkey,
challenge.to_bytes(),
hashlib.sha256
).digest()
challenge.signature = private_key.sign(challenge.to_bytes())

# Store for later validation
self.challenge_history[challenge.challenge_id] = challenge

return challenge

def validate_challenge_signature(self, challenge: Challenge) -> bool:
"""Verify that the advertised challenger public key signed the challenge."""
return _verify_ed25519(
challenge.challenger_pubkey,
challenge.signature,
challenge.to_bytes(),
)

def sign_response(self, response: ChallengeResponse, responder_privkey: bytes) -> ChallengeResponse:
"""Sign a response and bind it to the responder Ed25519 public key."""
private_key = _load_ed25519_private_key(responder_privkey)
responder_pubkey = derive_validator_pubkey(private_key)
if response.responder_pubkey and response.responder_pubkey != responder_pubkey:
raise ValueError("response responder_pubkey does not match responder private key")
response.responder_pubkey = responder_pubkey
response.response_hash = response.hash()
response.signature = private_key.sign(response.to_bytes())
return response

def validate_response_signature(self, response: ChallengeResponse) -> bool:
"""Verify that the advertised responder public key signed the response."""
return _verify_ed25519(
response.responder_pubkey,
response.signature,
response.to_bytes(),
)

def _get_timeout_for_hardware(self, hardware: Dict) -> int:
"""Calculate appropriate timeout based on hardware age"""
tier = hardware.get('tier', 'modern')
Expand Down Expand Up @@ -225,6 +334,21 @@ def validate_response(
failures = []
confidence = 100.0

challenge_signature_ok = self.validate_challenge_signature(challenge)
if not challenge_signature_ok:
failures.append("Challenge signature invalid or not bound to challenger public key")
confidence -= 100.0

response_signature_ok = self.validate_response_signature(response)
if not response_signature_ok:
failures.append("Response signature invalid or not bound to responder public key")
confidence -= 100.0

target_identity_ok = response.responder_pubkey == challenge.target_pubkey
if not target_identity_ok:
failures.append("Response signer does not match challenge target public key")
confidence -= 100.0

# 1. Check timing window
response_time = response.response_timestamp - challenge.timestamp
timing_ok = self._check_timing(response_time, challenge.timeout_ms, failures)
Expand Down Expand Up @@ -262,12 +386,19 @@ def validate_response(

# 6. Verify response hash
computed_hash = response.hash()
if computed_hash != response.response_hash:
response_hash_ok = computed_hash == response.response_hash
if not response_hash_ok:
failures.append("Response hash mismatch - tampered data")
confidence -= 50.0

# Final determination
valid = confidence >= 50.0
valid = (
confidence >= 50.0
and challenge_signature_ok
and response_signature_ok
and target_identity_ok
and response_hash_ok
)

return ValidationResult(
valid=valid,
Expand Down Expand Up @@ -438,8 +569,20 @@ class NetworkChallengeProtocol:
MAX_FAILURES_BEFORE_SLASH = 3 # 3 failures = slashed
FAILURE_PENALTY_PERCENT = 10 # 10% reward penalty per failure

def __init__(self, validator_pubkey: str, hardware_profile: Dict):
self.pubkey = validator_pubkey
def __init__(
self,
validator_pubkey: Optional[str],
hardware_profile: Dict,
validator_private_key: Optional[bytes] = None,
):
if validator_private_key is None:
validator_private_key = generate_validator_private_key()
self._signing_key = _load_ed25519_private_key(validator_private_key)
derived_pubkey = derive_validator_pubkey(self._signing_key)
if validator_pubkey and _clean_hex(validator_pubkey) != derived_pubkey:
raise ValueError("validator_pubkey does not match validator_private_key")

self.pubkey = derived_pubkey
self.hardware = hardware_profile
self.validator = AntiSpoofValidator()
self.pending_challenges: Dict[str, Challenge] = {}
Expand All @@ -458,13 +601,10 @@ def should_challenge(self, block_height: int, target_pubkey: str) -> bool:

def create_challenge(self, target_pubkey: str, target_hardware: Dict) -> Challenge:
"""Create a challenge for another validator"""
# Use pubkey as signing key for demo (use real keys in production)
privkey = hashlib.sha256(self.pubkey.encode()).digest()

challenge = self.validator.generate_challenge(
target_pubkey=target_pubkey,
expected_hardware=target_hardware,
challenger_privkey=privkey,
challenger_privkey=self._signing_key,
challenge_type=ChallengeType.FULL
)

Expand Down Expand Up @@ -569,7 +709,8 @@ def print_economic_analysis():
validator = AntiSpoofValidator()

# Generate challenge
privkey = secrets.token_bytes(32)
privkey = generate_validator_private_key()
responder_privkey = generate_validator_private_key()
challenge = validator.generate_challenge(
target_pubkey="target_validator_pubkey",
expected_hardware=expected_hardware,
Expand All @@ -594,10 +735,11 @@ def print_economic_analysis():
jitter_variance=25, # 2.5% variance - natural
pipeline_cycles=1200,
response_hash=b'',
responder_pubkey="responder_key",
responder_pubkey="",
signature=b''
)
real_response.response_hash = real_response.hash()
validator.sign_response(real_response, responder_privkey)

print("\n --- REAL HARDWARE RESPONSE ---")
result = validator.validate_response(challenge, real_response)
Expand All @@ -620,10 +762,11 @@ def print_economic_analysis():
jitter_variance=1, # Too consistent! Emulator detected
pipeline_cycles=1200,
response_hash=b'',
responder_pubkey="emulator_key",
responder_pubkey="",
signature=b''
)
emu_response.response_hash = emu_response.hash()
validator.sign_response(emu_response, responder_privkey)

print("\n --- EMULATOR RESPONSE ---")
result = validator.validate_response(challenge, emu_response)
Expand Down
Loading