Skip to content

Commit 9da3c3d

Browse files
committed
Merge branch 'AadityaKhurana-message_signature'
2 parents a93da8b + bdb6644 commit 9da3c3d

2 files changed

Lines changed: 133 additions & 54 deletions

File tree

bitcoinutils/keys.py

Lines changed: 92 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -528,74 +528,113 @@ class PublicKey:
528528
returns the corresponding P2trAddress object
529529
"""
530530

531-
def __init__(self, hex_str: str) -> None:
531+
def __init__(self, hex_str: str = None, message: str = None, signature: bytes = None) -> None:
532532
"""
533533
Parameters
534534
----------
535-
hex_str : str
535+
hex_str : str, optional
536536
the public key in hex string
537+
538+
In case of generating public key from message and signature:-
539+
message : str, optional
540+
The original message that was signed
541+
signature : bytes, optional
542+
A 65-byte Bitcoin signature (1-byte recovery ID + 64-byte ECDSA signature).
537543
538544
Raises
539545
------
540546
TypeError
541547
If first byte of public key (corresponding to SEC format) is
542548
invalid.
549+
If neither hex_str nor (message, signature) are provided
550+
ValueError
551+
If message is empty when attempting recovery
552+
If signature is not exactly 65 bytes
553+
If an invalid recovery ID is detected
543554
"""
544-
hex_str = hex_str.strip()
555+
if hex_str:
556+
hex_str = hex_str.strip()
545557

546-
# Normalize hex string by removing '0x' prefix and any whitespace
547-
if hex_str.lower().startswith('0x'):
548-
hex_str = hex_str[2:]
558+
# Normalize hex string by removing '0x' prefix and any whitespace
559+
if hex_str.lower().startswith('0x'):
560+
hex_str = hex_str[2:]
549561

550-
# expects key as hex string - SEC format
551-
first_byte_in_hex = hex_str[:2] # 2 hex chars = 1 byte
552-
hex_bytes = h_to_b(hex_str)
562+
# expects key as hex string - SEC format
563+
first_byte_in_hex = hex_str[:2] # 2 hex chars = 1 byte
564+
hex_bytes = h_to_b(hex_str)
553565

554-
taproot = False
566+
taproot = False
555567

556-
# check if compressed or not
557-
if len(hex_bytes) > 33:
558-
# uncompressed - SEC format: 0x04 + x + y coordinates (x,y are 32 byte
559-
# numbers)
568+
# check if compressed or not
569+
if len(hex_bytes) > 33:
570+
# uncompressed - SEC format: 0x04 + x + y coordinates (x,y are 32 byte
571+
# numbers)
560572

561-
# remove first byte and instantiate ecdsa key
562-
self.key = VerifyingKey.from_string(hex_bytes[1:], curve=SECP256k1)
563-
elif len(hex_bytes) > 31:
564-
# key is either compressed or in x-only taproot format
573+
# remove first byte and instantiate ecdsa key
574+
self.key = VerifyingKey.from_string(hex_bytes[1:], curve=SECP256k1)
575+
elif len(hex_bytes) > 31:
576+
# key is either compressed or in x-only taproot format
565577

566-
# taproot public keys are exactly 32 bytes
567-
if len(hex_bytes) == 32:
568-
taproot = True
578+
# taproot public keys are exactly 32 bytes
579+
if len(hex_bytes) == 32:
580+
taproot = True
569581

570-
# compressed - SEC FORMAT: 0x02|0x03 + x coordinate (if 02 then y
571-
# is even else y is odd. Calculate y and then instantiate the ecdsa key
572-
x_coord = int(hex_str[2:], 16)
582+
# compressed - SEC FORMAT: 0x02|0x03 + x coordinate (if 02 then y
583+
# is even else y is odd. Calculate y and then instantiate the ecdsa key
584+
x_coord = int(hex_str[2:], 16)
573585

574-
# y = modulo_square_root( (x**3 + 7) mod p ) -- there will be 2 y values
575-
y_values = sqrt_mod(
576-
(x_coord**3 + 7) % Secp256k1Params._p, Secp256k1Params._p, True
577-
)
586+
# y = modulo_square_root( (x**3 + 7) mod p ) -- there will be 2 y values
587+
y_values = sqrt_mod(
588+
(x_coord**3 + 7) % Secp256k1Params._p, Secp256k1Params._p, True
589+
)
578590

579-
assert y_values is not None
580-
# check SEC format's first byte to determine which of the 2 values to use
581-
if first_byte_in_hex == "02" or taproot:
582-
# y is the even value
583-
if y_values[0] % 2 == 0: # type: ignore
584-
y_coord = y_values[0] # type: ignore
591+
assert y_values is not None
592+
# check SEC format's first byte to determine which of the 2 values to use
593+
if first_byte_in_hex == "02" or taproot:
594+
# y is the even value
595+
if y_values[0] % 2 == 0: # type: ignore
596+
y_coord = y_values[0] # type: ignore
597+
else:
598+
y_coord = y_values[1] # type: ignore
599+
elif first_byte_in_hex == "03":
600+
# y is the odd value
601+
if y_values[0] % 2 == 0: # type: ignore
602+
y_coord = y_values[1] # type: ignore
603+
else:
604+
y_coord = y_values[0] # type: ignore
585605
else:
586-
y_coord = y_values[1] # type: ignore
587-
elif first_byte_in_hex == "03":
588-
# y is the odd value
589-
if y_values[0] % 2 == 0: # type: ignore
590-
y_coord = y_values[1] # type: ignore
591-
else:
592-
y_coord = y_values[0] # type: ignore
593-
else:
594-
raise TypeError("Invalid SEC compressed format")
595-
596-
uncompressed_hex = f"{x_coord:064x}{y_coord:064x}"
597-
uncompressed_hex_bytes = h_to_b(uncompressed_hex)
598-
self.key = VerifyingKey.from_string(uncompressed_hex_bytes, curve=SECP256k1)
606+
raise TypeError("Invalid SEC compressed format")
607+
608+
uncompressed_hex = f"{x_coord:064x}{y_coord:064x}"
609+
uncompressed_hex_bytes = h_to_b(uncompressed_hex)
610+
self.key = VerifyingKey.from_string(uncompressed_hex_bytes, curve=SECP256k1)
611+
elif message or signature:
612+
if not message:
613+
raise ValueError("Empty message provided for public key recovery.")
614+
615+
if(len(signature) != 65):
616+
raise ValueError("Invalid signature length, must be exactly 65 bytes")
617+
618+
# The compressed signature is of the format: recovery_id (1 byte) | r (32 bytes) | s (32 bytes)
619+
# We subtract the prefix(27) for uncompressed signatures and an additional 4 (31) for compressed signatures to get the recovery id
620+
recovery_id = signature[0] - 31
621+
if not (0 <= recovery_id <= 3): # A valid recovery ID is between 0 and 3
622+
raise ValueError(f"Invalid recovery ID: expected 31-34, got {signature[0]}")
623+
624+
signature = signature[1:] #Remove recovery id from signature
625+
626+
# All bitcoin signatures include the magic prefix. It is just a string
627+
# added to the message to distinguish Bitcoin-specific messages.
628+
message_magic = add_magic_prefix(message)
629+
# create message digest
630+
message_digest = hashlib.sha256(hashlib.sha256(message_magic).digest()).digest()
631+
632+
recovered_keys = VerifyingKey.from_public_key_recovery_with_digest(
633+
signature, message_digest, curve=SECP256k1, hashfunc = hashlib.sha256, sigdecode=sigdecode_string
634+
)
635+
self.key = recovered_keys[recovery_id]
636+
else:
637+
raise TypeError("Either 'hex_str' or ('message', 'signature') must be provided.")
599638

600639
@classmethod
601640
def from_hex(cls, hex_str: str) -> PublicKey:
@@ -665,11 +704,12 @@ def is_y_even(self) -> bool:
665704
return y % 2 == 0
666705

667706
@classmethod
668-
def from_message_signature(cls, signature):
669-
# TODO implement (add signature=None in __init__, etc.)
670-
# TODO plus does this apply to DER signatures as well?
671-
# return cls(signature=signature)
672-
raise BaseException("NO-OP!")
707+
def from_message_signature(cls, message, signature):
708+
"""Recovers a public key from a Bitcoin-signed message and a 65-byte compressed signature.
709+
"""
710+
#Note: Only works for compressed signatures because DER encoding does not contain the recovery id
711+
return cls(message=message, signature=signature)
712+
# raise BaseException("NO-OP!")
673713

674714
@classmethod
675715
def verify_message(cls, address: str, signature: str, message: str) -> bool:

tests/test_keys.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
)
2424
from bitcoinutils.script import Script
2525
from bitcoinutils.hdwallet import HDWallet
26+
from base64 import b64decode
2627

2728

2829
class TestPrivateKeys(unittest.TestCase):
@@ -72,6 +73,13 @@ def setUp(self):
7273
b"\x08\xa8\xfd\x17\xb4H\xa6\x85T\x19\x9cG\xd0\x8f\xfb\x10\xd4\xb8"
7374
)
7475
self.address = "1EHNa6Q4Jz2uvNExL497mE43ikXhwF6kZm"
76+
77+
# Message public key recovery tests
78+
self.valid_message = "Hello, Bitcoin!"
79+
# 65-byte Bitcoin signature (1-byte recovery ID + 64-byte ECDSA signature)
80+
self.valid_signature = b'\x1f\x0c\xfc\xd8V\xec27)\xa7\xfc\x02:\xda\xcfT\xb2*\x02\x16.\xe2s\x7f\x18[&^\xb3e\xee3"KN\xfct\x011Z[\x05\xb5\xea\n!\xe8\xce\x9em\x89/\xf2\xa0\x15\x83{\x7f\x9e\xba+\xb4\xf8&\x15'
81+
# Known valid public key corresponding to the message + signature
82+
self.expected_public_key = '02649abc7094d2783670255073ccfd132677555ca84045c5a005611f25ef51fdbf'
7583

7684
def test_pubkey_creation(self):
7785
pub1 = PublicKey(self.public_key_hex)
@@ -98,6 +106,38 @@ def test_pubkey_to_hash160(self):
98106
def test_pubkey_x_only(self):
99107
pub = PublicKey(self.public_key_hex)
100108
self.assertEqual(pub.to_x_only_hex(), self.public_key_hex[2:66])
109+
110+
#Tests for PublicKey recovery from message and signature
111+
def test_public_key_recovery_valid(self):
112+
"""Test successful public key recovery from a valid message and signature"""
113+
pubkey = PublicKey(message=self.valid_message, signature=self.valid_signature)
114+
self.assertEqual(pubkey.key.to_string("compressed").hex(), self.expected_public_key)
115+
116+
def test_invalid_signature_length(self):
117+
"""Test handling of invalid signature length (not 65 bytes)"""
118+
short_signature = self.valid_signature[:60] # Truncate signature to 60 bytes
119+
with self.assertRaises(ValueError) as context:
120+
PublicKey(message=self.valid_message, signature=short_signature)
121+
self.assertEqual(str(context.exception), "Invalid signature length, must be exactly 65 bytes")
122+
123+
def test_invalid_recovery_id(self):
124+
"""Test handling of an invalid recovery ID"""
125+
invalid_signature = bytes([50]) + self.valid_signature[1:] # Modify recovery ID to 50
126+
with self.assertRaises(ValueError) as context:
127+
PublicKey(message=self.valid_message, signature=invalid_signature)
128+
self.assertIn("Invalid recovery ID", str(context.exception))
129+
130+
def test_missing_parameters(self):
131+
"""Test that missing both hex_str and (message, signature) raises an error"""
132+
with self.assertRaises(TypeError) as context:
133+
PublicKey()
134+
self.assertEqual(str(context.exception), "Either 'hex_str' or ('message', 'signature') must be provided.")
135+
136+
def test_empty_message(self):
137+
"""Test handling of an empty message for public key recovery"""
138+
with self.assertRaises(ValueError) as context:
139+
PublicKey(message="", signature=self.valid_signature)
140+
self.assertEqual(str(context.exception), "Empty message provided for public key recovery.")
101141

102142

103143
class TestP2pkhAddresses(unittest.TestCase):
@@ -311,7 +351,6 @@ def test_legacy_address_from_mnemonic(self):
311351
hdw.from_path("m/44'/1'/0'/0/3")
312352
address = hdw.get_private_key().get_public_key().get_address()
313353
self.assertTrue(address.to_string(), self.legacy_address_m_44_1h_0h_0_3)
314-
315-
354+
316355
if __name__ == "__main__":
317356
unittest.main()

0 commit comments

Comments
 (0)