Skip to content

Commit a57d956

Browse files
author
Jonathan Warren
committed
Merge pull request #242 from Atheros1/master
Modularize PyBitmessage into multiple files
2 parents 958cf03 + 32aaaf2 commit a57d956

10 files changed

Lines changed: 983 additions & 933 deletions

src/bitmessagemain.py

Lines changed: 81 additions & 933 deletions
Large diffs are not rendered by default.

src/class_addressGenerator.py

Lines changed: 277 additions & 0 deletions
Large diffs are not rendered by default.

src/class_singleCleaner.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import threading
2+
import shared
3+
import time
4+
from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers
5+
6+
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
7+
It cleans these data structures in memory:
8+
inventory (moves data to the on-disk sql database)
9+
10+
It cleans these tables on the disk:
11+
inventory (clears data more than 2 days and 12 hours old)
12+
pubkeys (clears pubkeys older than 4 weeks old which we have not used personally)
13+
14+
It resends messages when there has been no response:
15+
resends getpubkey messages in 4 days (then 8 days, then 16 days, etc...)
16+
resends msg messages in 4 days (then 8 days, then 16 days, etc...)
17+
18+
'''
19+
20+
21+
class singleCleaner(threading.Thread):
22+
23+
def __init__(self):
24+
threading.Thread.__init__(self)
25+
26+
def run(self):
27+
timeWeLastClearedInventoryAndPubkeysTables = 0
28+
29+
while True:
30+
shared.sqlLock.acquire()
31+
shared.UISignalQueue.put((
32+
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
33+
for hash, storedValue in shared.inventory.items():
34+
objectType, streamNumber, payload, receivedTime = storedValue
35+
if int(time.time()) - 3600 > receivedTime:
36+
t = (hash, objectType, streamNumber, payload, receivedTime)
37+
shared.sqlSubmitQueue.put(
38+
'''INSERT INTO inventory VALUES (?,?,?,?,?)''')
39+
shared.sqlSubmitQueue.put(t)
40+
shared.sqlReturnQueue.get()
41+
del shared.inventory[hash]
42+
shared.sqlSubmitQueue.put('commit')
43+
shared.UISignalQueue.put(('updateStatusBar', ''))
44+
shared.sqlLock.release()
45+
shared.broadcastToSendDataQueues((
46+
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
47+
# If we are running as a daemon then we are going to fill up the UI
48+
# queue which will never be handled by a UI. We should clear it to
49+
# save memory.
50+
if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'):
51+
shared.UISignalQueue.queue.clear()
52+
if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380:
53+
timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
54+
# inventory (moves data from the inventory data structure to
55+
# the on-disk sql database)
56+
shared.sqlLock.acquire()
57+
# inventory (clears pubkeys after 28 days and everything else
58+
# after 2 days and 12 hours)
59+
t = (int(time.time()) - lengthOfTimeToLeaveObjectsInInventory, int(
60+
time.time()) - lengthOfTimeToHoldOnToAllPubkeys)
61+
shared.sqlSubmitQueue.put(
62+
'''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''')
63+
shared.sqlSubmitQueue.put(t)
64+
shared.sqlReturnQueue.get()
65+
66+
# pubkeys
67+
t = (int(time.time()) - lengthOfTimeToHoldOnToAllPubkeys,)
68+
shared.sqlSubmitQueue.put(
69+
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''')
70+
shared.sqlSubmitQueue.put(t)
71+
shared.sqlReturnQueue.get()
72+
shared.sqlSubmitQueue.put('commit')
73+
74+
t = ()
75+
shared.sqlSubmitQueue.put(
76+
'''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent') ''') # If the message's folder='trash' then we'll ignore it.
77+
shared.sqlSubmitQueue.put(t)
78+
queryreturn = shared.sqlReturnQueue.get()
79+
for row in queryreturn:
80+
if len(row) < 5:
81+
shared.printLock.acquire()
82+
sys.stderr.write(
83+
'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
84+
time.sleep(3)
85+
shared.printLock.release()
86+
break
87+
toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber = row
88+
if status == 'awaitingpubkey':
89+
if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))):
90+
print 'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.'
91+
try:
92+
del neededPubkeys[
93+
toripe] # We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently.
94+
except:
95+
pass
96+
97+
shared.UISignalQueue.put((
98+
'updateStatusBar', 'Doing work necessary to again attempt to request a public key...'))
99+
t = (int(
100+
time.time()), pubkeyretrynumber + 1, toripe)
101+
shared.sqlSubmitQueue.put(
102+
'''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''')
103+
shared.sqlSubmitQueue.put(t)
104+
shared.sqlReturnQueue.get()
105+
shared.sqlSubmitQueue.put('commit')
106+
shared.workerQueue.put(('sendmessage', ''))
107+
else: # status == msgsent
108+
if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))):
109+
print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.'
110+
t = (int(
111+
time.time()), msgretrynumber + 1, 'msgqueued', ackdata)
112+
shared.sqlSubmitQueue.put(
113+
'''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''')
114+
shared.sqlSubmitQueue.put(t)
115+
shared.sqlReturnQueue.get()
116+
shared.sqlSubmitQueue.put('commit')
117+
shared.workerQueue.put(('sendmessage', ''))
118+
shared.UISignalQueue.put((
119+
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
120+
shared.sqlSubmitQueue.put('commit')
121+
shared.sqlLock.release()
122+
time.sleep(300)

0 commit comments

Comments
 (0)