diff --git a/rips/rustchain-core/src/anti_spoof/network_challenge.py b/rips/rustchain-core/src/anti_spoof/network_challenge.py index 42d3a4ac6..4ec0060f0 100644 --- a/rips/rustchain-core/src/anti_spoof/network_challenge.py +++ b/rips/rustchain-core/src/anti_spoof/network_challenge.py @@ -18,7 +18,6 @@ """ import hashlib -import hmac import json import os import secrets @@ -28,6 +27,28 @@ from typing import Optional, Dict, List, Tuple from enum import Enum +try: + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, + ) + from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + ) +except ImportError: # pragma: no cover - exercised only in minimal installs + InvalidSignature = Exception + Ed25519PrivateKey = None + Ed25519PublicKey = None + Encoding = None + NoEncryption = None + PrivateFormat = None + PublicFormat = None + + class ChallengeType(Enum): FULL = 0x00 # All hardware tests TIMEBASE = 0x01 # PowerPC timebase only @@ -51,6 +72,7 @@ class Challenge: """A cryptographic challenge sent to a validator""" challenge_id: str challenge_type: int + target_pubkey: str # Intended responder nonce: bytes # 32 bytes of randomness timestamp: int # Unix timestamp in milliseconds timeout_ms: int # Response must arrive within this window @@ -63,6 +85,7 @@ def to_bytes(self) -> bytes: return ( self.challenge_id.encode() + struct.pack('>B', self.challenge_type) + + self.target_pubkey.encode() + self.nonce + struct.pack('>Q', self.timestamp) + struct.pack('>I', self.timeout_ms) + @@ -125,6 +148,67 @@ class ValidationResult: serial_ok: bool failure_reasons: List[str] + +def _require_ed25519() -> None: + if Ed25519PrivateKey is None or Ed25519PublicKey is None: + raise RuntimeError("cryptography is required for Ed25519 challenge signatures") + + +def _clean_hex(value: str) -> str: + value = value.strip() + if value.startswith(("0x", "0X")): + return value[2:] + return value + + +def _load_ed25519_private_key(private_key) -> Ed25519PrivateKey: + _require_ed25519() + if isinstance(private_key, Ed25519PrivateKey): + return private_key + if isinstance(private_key, str): + private_key = bytes.fromhex(_clean_hex(private_key)) + if not isinstance(private_key, bytes) or len(private_key) != 32: + raise ValueError("Ed25519 private key must be 32 raw bytes or hex") + return Ed25519PrivateKey.from_private_bytes(private_key) + + +def _private_key_bytes(private_key) -> bytes: + private_key = _load_ed25519_private_key(private_key) + return private_key.private_bytes( + encoding=Encoding.Raw, + format=PrivateFormat.Raw, + encryption_algorithm=NoEncryption(), + ) + + +def derive_validator_pubkey(private_key) -> str: + """Return the hex Ed25519 public key for a raw private key.""" + private_key = _load_ed25519_private_key(private_key) + return private_key.public_key().public_bytes( + encoding=Encoding.Raw, + format=PublicFormat.Raw, + ).hex() + + +def generate_validator_private_key() -> bytes: + """Generate raw Ed25519 private-key bytes for a validator instance.""" + _require_ed25519() + return _private_key_bytes(Ed25519PrivateKey.generate()) + + +def _verify_ed25519(pubkey_hex: str, signature: bytes, payload: bytes) -> bool: + if Ed25519PublicKey is None: + return False + if not pubkey_hex or not signature: + return False + try: + public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(_clean_hex(pubkey_hex))) + public_key.verify(signature, payload) + return True + except (ValueError, InvalidSignature): + return False + + class AntiSpoofValidator: """ Validates challenge responses to detect emulators. @@ -169,34 +253,59 @@ def generate_challenge( self, target_pubkey: str, expected_hardware: Dict, - challenger_privkey: bytes, # For signing + challenger_privkey: bytes, # Raw Ed25519 private key for signing challenge_type: ChallengeType = ChallengeType.FULL ) -> Challenge: """Generate a new challenge for a validator""" + private_key = _load_ed25519_private_key(challenger_privkey) challenge = Challenge( challenge_id=secrets.token_hex(16), challenge_type=challenge_type.value, + target_pubkey=target_pubkey, nonce=secrets.token_bytes(32), timestamp=int(time.time() * 1000), timeout_ms=self._get_timeout_for_hardware(expected_hardware), expected_hardware=expected_hardware, - challenger_pubkey=hashlib.sha256(challenger_privkey).hexdigest()[:40], + challenger_pubkey=derive_validator_pubkey(private_key), signature=b'' # Will be filled ) # Sign the challenge - challenge.signature = hmac.new( - challenger_privkey, - challenge.to_bytes(), - hashlib.sha256 - ).digest() + challenge.signature = private_key.sign(challenge.to_bytes()) # Store for later validation self.challenge_history[challenge.challenge_id] = challenge return challenge + def validate_challenge_signature(self, challenge: Challenge) -> bool: + """Verify that the advertised challenger public key signed the challenge.""" + return _verify_ed25519( + challenge.challenger_pubkey, + challenge.signature, + challenge.to_bytes(), + ) + + def sign_response(self, response: ChallengeResponse, responder_privkey: bytes) -> ChallengeResponse: + """Sign a response and bind it to the responder Ed25519 public key.""" + private_key = _load_ed25519_private_key(responder_privkey) + responder_pubkey = derive_validator_pubkey(private_key) + if response.responder_pubkey and response.responder_pubkey != responder_pubkey: + raise ValueError("response responder_pubkey does not match responder private key") + response.responder_pubkey = responder_pubkey + response.response_hash = response.hash() + response.signature = private_key.sign(response.to_bytes()) + return response + + def validate_response_signature(self, response: ChallengeResponse) -> bool: + """Verify that the advertised responder public key signed the response.""" + return _verify_ed25519( + response.responder_pubkey, + response.signature, + response.to_bytes(), + ) + def _get_timeout_for_hardware(self, hardware: Dict) -> int: """Calculate appropriate timeout based on hardware age""" tier = hardware.get('tier', 'modern') @@ -225,6 +334,21 @@ def validate_response( failures = [] confidence = 100.0 + challenge_signature_ok = self.validate_challenge_signature(challenge) + if not challenge_signature_ok: + failures.append("Challenge signature invalid or not bound to challenger public key") + confidence -= 100.0 + + response_signature_ok = self.validate_response_signature(response) + if not response_signature_ok: + failures.append("Response signature invalid or not bound to responder public key") + confidence -= 100.0 + + target_identity_ok = response.responder_pubkey == challenge.target_pubkey + if not target_identity_ok: + failures.append("Response signer does not match challenge target public key") + confidence -= 100.0 + # 1. Check timing window response_time = response.response_timestamp - challenge.timestamp timing_ok = self._check_timing(response_time, challenge.timeout_ms, failures) @@ -262,12 +386,19 @@ def validate_response( # 6. Verify response hash computed_hash = response.hash() - if computed_hash != response.response_hash: + response_hash_ok = computed_hash == response.response_hash + if not response_hash_ok: failures.append("Response hash mismatch - tampered data") confidence -= 50.0 # Final determination - valid = confidence >= 50.0 + valid = ( + confidence >= 50.0 + and challenge_signature_ok + and response_signature_ok + and target_identity_ok + and response_hash_ok + ) return ValidationResult( valid=valid, @@ -438,8 +569,20 @@ class NetworkChallengeProtocol: MAX_FAILURES_BEFORE_SLASH = 3 # 3 failures = slashed FAILURE_PENALTY_PERCENT = 10 # 10% reward penalty per failure - def __init__(self, validator_pubkey: str, hardware_profile: Dict): - self.pubkey = validator_pubkey + def __init__( + self, + validator_pubkey: Optional[str], + hardware_profile: Dict, + validator_private_key: Optional[bytes] = None, + ): + if validator_private_key is None: + validator_private_key = generate_validator_private_key() + self._signing_key = _load_ed25519_private_key(validator_private_key) + derived_pubkey = derive_validator_pubkey(self._signing_key) + if validator_pubkey and _clean_hex(validator_pubkey) != derived_pubkey: + raise ValueError("validator_pubkey does not match validator_private_key") + + self.pubkey = derived_pubkey self.hardware = hardware_profile self.validator = AntiSpoofValidator() self.pending_challenges: Dict[str, Challenge] = {} @@ -458,13 +601,10 @@ def should_challenge(self, block_height: int, target_pubkey: str) -> bool: def create_challenge(self, target_pubkey: str, target_hardware: Dict) -> Challenge: """Create a challenge for another validator""" - # Use pubkey as signing key for demo (use real keys in production) - privkey = hashlib.sha256(self.pubkey.encode()).digest() - challenge = self.validator.generate_challenge( target_pubkey=target_pubkey, expected_hardware=target_hardware, - challenger_privkey=privkey, + challenger_privkey=self._signing_key, challenge_type=ChallengeType.FULL ) @@ -569,7 +709,8 @@ def print_economic_analysis(): validator = AntiSpoofValidator() # Generate challenge - privkey = secrets.token_bytes(32) + privkey = generate_validator_private_key() + responder_privkey = generate_validator_private_key() challenge = validator.generate_challenge( target_pubkey="target_validator_pubkey", expected_hardware=expected_hardware, @@ -594,10 +735,11 @@ def print_economic_analysis(): jitter_variance=25, # 2.5% variance - natural pipeline_cycles=1200, response_hash=b'', - responder_pubkey="responder_key", + responder_pubkey="", signature=b'' ) real_response.response_hash = real_response.hash() + validator.sign_response(real_response, responder_privkey) print("\n --- REAL HARDWARE RESPONSE ---") result = validator.validate_response(challenge, real_response) @@ -620,10 +762,11 @@ def print_economic_analysis(): jitter_variance=1, # Too consistent! Emulator detected pipeline_cycles=1200, response_hash=b'', - responder_pubkey="emulator_key", + responder_pubkey="", signature=b'' ) emu_response.response_hash = emu_response.hash() + validator.sign_response(emu_response, responder_privkey) print("\n --- EMULATOR RESPONSE ---") result = validator.validate_response(challenge, emu_response) diff --git a/tests/test_network_challenge_signatures.py b/tests/test_network_challenge_signatures.py new file mode 100644 index 000000000..5c087a34d --- /dev/null +++ b/tests/test_network_challenge_signatures.py @@ -0,0 +1,140 @@ +# SPDX-License-Identifier: MIT +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + +import pytest + + +pytest.importorskip("cryptography") + +ROOT = Path(__file__).resolve().parents[1] +MODULE_PATH = ROOT / "rips" / "rustchain-core" / "src" / "anti_spoof" / "network_challenge.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location("network_challenge_under_test", MODULE_PATH) + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +def _hardware_profile(): + return { + "tier": "modern", + "openfirmware": {"serial_number": "G84243AZQ6P"}, + } + + +def _valid_response(module, challenge, responder_private_key): + response = module.ChallengeResponse( + challenge_id=challenge.challenge_id, + response_timestamp=challenge.timestamp + 1000, + timebase_value=173470036125283, + cache_l1_ticks=150, + cache_l2_ticks=450, + cache_ratio=3.0, + memory_ticks=15000, + thermal_celsius=43, + hardware_serial="G84243AZQ6P", + jitter_variance=25, + pipeline_cycles=1200, + response_hash=b"", + responder_pubkey="", + signature=b"", + ) + response.response_hash = response.hash() + return module.AntiSpoofValidator().sign_response(response, responder_private_key) + + +def test_protocol_rejects_mismatched_registered_pubkey(): + module = _load_module() + private_key = module.generate_validator_private_key() + public_key = module.derive_validator_pubkey(private_key) + + protocol = module.NetworkChallengeProtocol(public_key, _hardware_profile(), private_key) + + assert protocol.pubkey == public_key + with pytest.raises(ValueError, match="validator_pubkey"): + module.NetworkChallengeProtocol("00" * 32, _hardware_profile(), private_key) + + +def test_challenge_signature_is_bound_to_advertised_pubkey(): + module = _load_module() + private_key = module.generate_validator_private_key() + validator = module.AntiSpoofValidator() + challenge = validator.generate_challenge( + target_pubkey="target", + expected_hardware=_hardware_profile(), + challenger_privkey=private_key, + ) + + assert challenge.challenger_pubkey == module.derive_validator_pubkey(private_key) + assert validator.validate_challenge_signature(challenge) is True + + challenge.signature = b"\x00" * 64 + assert validator.validate_challenge_signature(challenge) is False + + +def test_validate_response_rejects_forged_response_signature(): + module = _load_module() + challenger_private_key = module.generate_validator_private_key() + responder_private_key = module.generate_validator_private_key() + validator = module.AntiSpoofValidator() + challenge = validator.generate_challenge( + target_pubkey=module.derive_validator_pubkey(responder_private_key), + expected_hardware=_hardware_profile(), + challenger_privkey=challenger_private_key, + ) + response = _valid_response(module, challenge, responder_private_key) + + good_result = validator.validate_response(challenge, response) + assert good_result.valid is True + assert good_result.failure_reasons == [] + + response.signature = b"\x00" * 64 + result = validator.validate_response(challenge, response) + + assert result.valid is False + assert any("Response signature invalid" in reason for reason in result.failure_reasons) + + +def test_validate_response_rejects_responder_that_is_not_challenge_target(): + module = _load_module() + challenger_private_key = module.generate_validator_private_key() + victim_private_key = module.generate_validator_private_key() + attacker_private_key = module.generate_validator_private_key() + validator = module.AntiSpoofValidator() + challenge = validator.generate_challenge( + target_pubkey=module.derive_validator_pubkey(victim_private_key), + expected_hardware=_hardware_profile(), + challenger_privkey=challenger_private_key, + ) + response = _valid_response(module, challenge, attacker_private_key) + + result = validator.validate_response(challenge, response) + + assert result.valid is False + assert any("does not match challenge target" in reason for reason in result.failure_reasons) + + +def test_validate_response_rejects_forged_challenge_signature(): + module = _load_module() + challenger_private_key = module.generate_validator_private_key() + responder_private_key = module.generate_validator_private_key() + validator = module.AntiSpoofValidator() + challenge = validator.generate_challenge( + target_pubkey=module.derive_validator_pubkey(responder_private_key), + expected_hardware=_hardware_profile(), + challenger_privkey=challenger_private_key, + ) + response = _valid_response(module, challenge, responder_private_key) + + challenge.signature = b"\x00" * 64 + result = validator.validate_response(challenge, response) + + assert result.valid is False + assert any("Challenge signature invalid" in reason for reason in result.failure_reasons)