Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions node/rustchain_v2_integrated_v2.2.1_rip200.py
Original file line number Diff line number Diff line change
Expand Up @@ -6860,8 +6860,7 @@ def send_sophiacheck_alert(alert_type, message, data):
def wallet_transfer_v2():
"""Transfer RTC between miner wallets - NOW WITH 2-PHASE COMMIT"""
# SECURITY: Require admin key for internal transfers
admin_key = request.headers.get("X-Admin-Key", "")
if not hmac.compare_digest(admin_key, os.environ.get("RC_ADMIN_KEY", "")):
if not is_admin(request):
return jsonify({
"error": "Unauthorized - admin key required",
"hint": "Use /wallet/transfer/signed for user transfers"
Expand Down Expand Up @@ -6959,8 +6958,7 @@ def wallet_transfer_v2():
@app.route('/pending/list', methods=['GET'])
def list_pending():
"""List all pending transfers"""
admin_key = request.headers.get("X-Admin-Key", "") or request.headers.get("X-API-Key", "")
if not hmac.compare_digest(admin_key, os.environ.get("RC_ADMIN_KEY", "")):
if not is_admin(request):
return jsonify({"error": "Unauthorized"}), 401

status_filter = request.args.get('status', 'pending')
Expand Down Expand Up @@ -7002,8 +7000,7 @@ def list_pending():
@app.route('/pending/void', methods=['POST'])
def void_pending():
"""Admin: Void a pending transfer before confirmation"""
admin_key = request.headers.get("X-Admin-Key", "")
if not hmac.compare_digest(admin_key, os.environ.get("RC_ADMIN_KEY", "")):
if not is_admin(request):
return jsonify({"error": "Unauthorized"}), 401

data = request.get_json()
Expand Down Expand Up @@ -7076,8 +7073,7 @@ def void_pending():
@app.route('/pending/confirm', methods=['POST'])
def confirm_pending():
"""Worker: Confirm pending transfers that have passed the delay period"""
admin_key = request.headers.get("X-Admin-Key", "")
if not hmac.compare_digest(admin_key, os.environ.get("RC_ADMIN_KEY", "")):
if not is_admin(request):
return jsonify({"error": "Unauthorized"}), 401

now = int(time.time())
Expand Down Expand Up @@ -7166,8 +7162,7 @@ def confirm_pending():
@app.route('/pending/integrity', methods=['GET'])
def check_integrity():
"""Check balance integrity: sum of ledger should match balances"""
admin_key = request.headers.get("X-Admin-Key", "") or request.headers.get("X-API-Key", "")
if not hmac.compare_digest(admin_key, os.environ.get("RC_ADMIN_KEY", "")):
if not is_admin(request):
return jsonify({"error": "Unauthorized"}), 401

with sqlite3.connect(DB_PATH) as db:
Expand Down
140 changes: 140 additions & 0 deletions node/tests/test_integrated_admin_fail_closed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# SPDX-License-Identifier: MIT

import importlib.util
import os
import sqlite3
import sys
import tempfile
import unittest
from contextlib import closing


NODE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
MODULE_PATH = os.path.join(NODE_DIR, "rustchain_v2_integrated_v2.2.1_rip200.py")
ADMIN_KEY = "0123456789abcdef0123456789abcdef"


class TestIntegratedAdminFailClosed(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._import_tmp = tempfile.TemporaryDirectory()
cls._prev_db_path = os.environ.get("RUSTCHAIN_DB_PATH")
cls._prev_admin_key = os.environ.get("RC_ADMIN_KEY")
os.environ["RUSTCHAIN_DB_PATH"] = os.path.join(cls._import_tmp.name, "import.db")
os.environ["RC_ADMIN_KEY"] = ADMIN_KEY

if NODE_DIR not in sys.path:
sys.path.insert(0, NODE_DIR)

spec = importlib.util.spec_from_file_location(
"rustchain_integrated_admin_fail_closed_test",
MODULE_PATH,
)
cls.mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(cls.mod)

@classmethod
def tearDownClass(cls):
if cls._prev_db_path is None:
os.environ.pop("RUSTCHAIN_DB_PATH", None)
else:
os.environ["RUSTCHAIN_DB_PATH"] = cls._prev_db_path
if cls._prev_admin_key is None:
os.environ.pop("RC_ADMIN_KEY", None)
else:
os.environ["RC_ADMIN_KEY"] = cls._prev_admin_key
cls._import_tmp.cleanup()

def setUp(self):
self._tmp = tempfile.TemporaryDirectory()
self.db_path = os.path.join(self._tmp.name, "admin_fail_closed.db")
self._prev_module_db = self.mod.DB_PATH
self._prev_admin_env = os.environ.get("RC_ADMIN_KEY")
self.mod.DB_PATH = self.db_path
os.environ["RC_ADMIN_KEY"] = ADMIN_KEY
self._init_db()
self.client = self.mod.app.test_client()

def tearDown(self):
self.mod.DB_PATH = self._prev_module_db
if self._prev_admin_env is None:
os.environ.pop("RC_ADMIN_KEY", None)
else:
os.environ["RC_ADMIN_KEY"] = self._prev_admin_env
self._tmp.cleanup()

def _init_db(self):
with closing(sqlite3.connect(self.db_path)) as db:
db.executescript(
"""
CREATE TABLE balances (
miner_id TEXT PRIMARY KEY,
amount_i64 INTEGER DEFAULT 0,
balance_rtc REAL DEFAULT 0
);
CREATE TABLE ledger (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
epoch INTEGER NOT NULL,
miner_id TEXT NOT NULL,
delta_i64 INTEGER NOT NULL,
reason TEXT
);
CREATE TABLE pending_ledger (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
epoch INTEGER NOT NULL,
from_miner TEXT NOT NULL,
to_miner TEXT NOT NULL,
amount_i64 INTEGER NOT NULL,
reason TEXT,
status TEXT DEFAULT 'pending',
created_at INTEGER NOT NULL,
confirms_at INTEGER NOT NULL,
tx_hash TEXT,
voided_by TEXT,
voided_reason TEXT,
confirmed_at INTEGER
);
CREATE UNIQUE INDEX idx_pending_ledger_tx_hash ON pending_ledger(tx_hash);
"""
)
db.execute(
"INSERT INTO balances (miner_id, amount_i64, balance_rtc) VALUES (?, ?, ?)",
("alice", 10_000_000, 10.0),
)
db.execute(
"INSERT INTO balances (miner_id, amount_i64, balance_rtc) VALUES (?, ?, ?)",
("bob", 0, 0.0),
)

def test_wallet_transfer_rejects_empty_header_when_admin_key_unset(self):
os.environ.pop("RC_ADMIN_KEY", None)

resp = self.client.post(
"/wallet/transfer",
json={"from_miner": "alice", "to_miner": "bob", "amount_rtc": 1},
)

self.assertEqual(resp.status_code, 401)
with closing(sqlite3.connect(self.db_path)) as db:
pending_count = db.execute("SELECT COUNT(*) FROM pending_ledger").fetchone()[0]
self.assertEqual(pending_count, 0)

def test_pending_confirm_rejects_empty_header_when_admin_key_unset(self):
os.environ.pop("RC_ADMIN_KEY", None)

resp = self.client.post("/pending/confirm", json={})

self.assertEqual(resp.status_code, 401)

def test_pending_integrity_rejects_empty_header_when_admin_key_unset(self):
os.environ.pop("RC_ADMIN_KEY", None)

resp = self.client.get("/pending/integrity")

self.assertEqual(resp.status_code, 401)


if __name__ == "__main__":
unittest.main()
Loading