-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfetch_data_blocks_transactions.py
More file actions
156 lines (120 loc) · 5.73 KB
/
fetch_data_blocks_transactions.py
File metadata and controls
156 lines (120 loc) · 5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import json
import shlex
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd
class NexaCLI:
    """Run ``nexa-cli`` RPC commands as child processes and return their text output.

    Every invocation passes the stored credentials as ``-rpcuser=...`` and
    ``-rpcpassword=...`` followed by the RPC command and its arguments.
    """

    def __init__(self, cli_path, rpc_user, rpc_password):
        """Remember the executable location and the RPC credentials.

        Args:
            cli_path: Path to the ``nexa-cli`` executable.
            rpc_user: Value sent as ``-rpcuser``.
            rpc_password: Value sent as ``-rpcpassword``.
        """
        # Kept in its historical double-quoted form for any code that reads it.
        self.cli_path = f'"{cli_path}"'
        # Unquoted path: used as argv[0], where no shell quoting is needed.
        self._executable = str(cli_path)
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password

    def run_command(self, command):
        """Execute one RPC command and return its standard output.

        The process is started from an argument list without a shell, so
        spaces or shell metacharacters in the path, the credentials, or the
        arguments are passed through literally instead of being interpreted.

        Args:
            command: Either a command string such as ``"getblock <hash> 1"``
                (split with ``shlex.split``, so quoted arguments stay intact)
                or an iterable of already-separated arguments.

        Returns:
            The stripped standard output, or ``""`` when the command printed
            nothing to stdout, could not be parsed, or could not be started.
            Any text on stderr (when stdout is empty) is printed as an error.
        """
        try:
            if isinstance(command, str):
                extra_args = shlex.split(command)
            else:
                extra_args = [str(arg) for arg in command]
            result = subprocess.run(
                [
                    self._executable,
                    f"-rpcuser={self.rpc_user}",
                    f"-rpcpassword={self.rpc_password}",
                    *extra_args,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # OSError: executable missing / not runnable.
            # ValueError: unbalanced quotes in `command` or undecodable output.
            print(f"Exception occurred: {e}")
            return ""

        if result.stdout:
            return result.stdout.strip()
        if result.stderr:
            print(f"Error: {result.stderr.strip()}")
        # Always hand back a string so callers can use str methods safely.
        return ""
def get_latest_block_height(cli):
    """Query the node's current block count with ``getblockcount``.

    Args:
        cli: Object exposing ``run_command(command)`` that returns the
            command's text output.

    Returns:
        The count as an ``int`` when the output consists only of digits;
        otherwise ``None``, after printing the unexpected output.
    """
    raw_count = cli.run_command("getblockcount")

    # Anything that is not a plain run of digits is treated as a failure.
    if not raw_count.isdigit():
        print(f"Failed to retrieve block count: {raw_count}")
        return None

    return int(raw_count.strip())
def get_block_data(cli, block_height):
    """Fetch one block, by height, as parsed JSON.

    The height is first resolved to a hash with ``getblockhash``; the block
    itself is then requested with ``getblock <hash> 1``.

    Args:
        cli: Object exposing ``run_command(command)`` that returns text.
        block_height: Height of the block to look up.

    Returns:
        ``{block_height: parsed_block}`` on success, or an empty dict when
        the hash lookup yields nothing or the block output is not valid JSON
        (a diagnostic is printed in both failure cases).
    """
    fetched = {}
    hash_text = cli.run_command(f"getblockhash {block_height}").strip()

    if hash_text:
        raw_block = cli.run_command(f"getblock {hash_text} 1")
        try:
            fetched[block_height] = json.loads(raw_block)
        except json.JSONDecodeError as e:
            print(f"JSONDecodeError for block {block_height}: {e}")
            print(f"Command output: {raw_block}")
    else:
        print(f"Failed to retrieve block hash for height {block_height}")

    return fetched
def get_transaction_data(cli, txid):
    """Fetch a single transaction as parsed JSON via ``getrawtransaction <txid> 1``.

    Args:
        cli: Object exposing ``run_command(command)`` that returns text.
        txid: Identifier of the transaction to look up.

    Returns:
        The decoded JSON value, or ``None`` when the output contains
        ``error code: -5``, is empty, or cannot be parsed as JSON. Each
        failure prints a diagnostic first.
    """
    raw_tx = cli.run_command(f"getrawtransaction {txid} 1")

    # Classify the two "nothing usable came back" cases before parsing.
    problem = None
    if "error code: -5" in raw_tx:
        problem = f"getrawtransaction failed for {txid}, transaction might not be indexed or is not in the mempool."
    elif not raw_tx:
        problem = f"Error: No output for transaction {txid}"

    if problem is not None:
        print(problem)
        return None

    try:
        return json.loads(raw_tx)
    except json.JSONDecodeError as e:
        print(f"JSONDecodeError for transaction {txid}: {e}")
        print(f"Command output: {raw_tx}")
        return None
def get_latest_n_blocks(cli, n, num_processes=None):
    """Fetch the ``n`` most recent blocks in parallel.

    Args:
        cli: Client passed to ``get_latest_block_height`` and, in the worker
            processes, to ``get_block_data``.
        n: Number of blocks to fetch, counting back from the latest height.
            Values above the number of available blocks are clamped with a
            warning.
        num_processes: ``max_workers`` for the ``ProcessPoolExecutor``.

    Returns:
        A dict mapping block height to the parsed block data. Blocks whose
        retrieval failed are omitted. Returns ``{}`` when the latest height
        cannot be determined.
    """
    latest_block_height = get_latest_block_height(cli)
    if latest_block_height is None:
        return {}

    # Heights start at 0, so `latest + 1` blocks exist in total.
    available = latest_block_height + 1
    if n > available:
        print(f"Warning: Requested {n} blocks, but only {available} are available. Adjusting to {available}.")
        n = available

    block_heights = [latest_block_height - i for i in range(n)]
    total = len(block_heights)
    print(f"Starting block data retrieval with {num_processes} parallel processes...")

    blocks_dict = {}
    if not block_heights:
        # Nothing requested (n <= 0): skip spinning up a worker pool.
        return blocks_dict

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = {
            executor.submit(get_block_data, cli, block_height): block_height
            for block_height in block_heights
        }
        for completed, future in enumerate(as_completed(futures), start=1):
            blocks_dict.update(future.result())

            # Report every 10th completion and the final one.
            if completed % 10 == 0 or completed == total:
                # Take the height from the submission map: a failed fetch
                # returns {} and has no key to read it from.
                block_height = futures[future]
                percentage = (completed / total) * 100
                print(f"Retrieved block {block_height} ({completed}/{total}) - {percentage:.2f}% complete")

    return blocks_dict
def get_all_transactions(cli, blocks_dict, num_processes=None):
    """Fetch every transaction referenced by the given blocks, in parallel.

    Args:
        cli: Client forwarded to ``get_transaction_data`` in the workers.
        blocks_dict: Mapping whose values are block dicts; the ids listed
            under each block's ``'txid'`` key (if present) are fetched.
        num_processes: ``max_workers`` for the ``ProcessPoolExecutor``.

    Returns:
        A list of the truthy results returned by ``get_transaction_data``,
        in completion order.
    """
    # Flatten the per-block id lists into one work list.
    txids = []
    for block in blocks_dict.values():
        txids.extend(block.get('txid', []))
    total = len(txids)

    print(f"Starting transaction data retrieval with {num_processes} parallel processes...")

    transactions_data = []
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        pending = [executor.submit(get_transaction_data, cli, txid) for txid in txids]
        for done, finished in enumerate(as_completed(pending), start=1):
            fetched = finished.result()
            if fetched:
                transactions_data.append(fetched)

            # Progress line on the 1st, 11th, 21st, ... completion.
            if (done - 1) % 10 == 0:
                percentage = (done / total) * 100
                print(f"Processed transaction ({done}/{total}) - {percentage:.2f}% complete")

    return transactions_data
def save_block_data(df_blocks, save_json=False):
    """Optionally write the block table to ``nexa_last_blocks.json``.

    Args:
        df_blocks: Object with a pandas-style ``to_json(path, orient=...)``
            method (the script passes a ``DataFrame``).
        save_json: When falsy, nothing is written or printed.
    """
    if not save_json:
        return

    target = "nexa_last_blocks.json"
    df_blocks.to_json(target, orient='records')
    print(f"Block data saved to {target}")
def save_transaction_data(transactions_data, save_json=False):
    """Optionally dump the transactions to ``nexa_transactions.json``.

    Args:
        transactions_data: JSON-serializable collection of transactions.
        save_json: When falsy, nothing is written or printed.
    """
    if not save_json:
        return

    target = "nexa_transactions.json"
    with open(target, "w") as out_file:
        json.dump(transactions_data, out_file, indent=4)
    print(f"Transaction data saved to {target}")
def main():
    """Fetch the latest blocks and their transactions, then save both as JSON."""
    client = NexaCLI(
        "C:/Program Files/Nexa/daemon/nexa-cli",
        "myusername",
        "mypassword",
    )
    block_count = 100
    workers = 8  # You can adjust this number based on your system's capabilities

    # Blocks: fetch, tabulate (one row per height), show, and persist.
    blocks = get_latest_n_blocks(client, block_count, workers)
    block_table = pd.DataFrame.from_dict(blocks, orient='index')
    print(block_table)
    save_block_data(block_table, save_json=True)

    # Transactions referenced by those blocks.
    transactions = get_all_transactions(client, blocks, workers)
    save_transaction_data(transactions, save_json=True)


if __name__ == "__main__":
    main()