1 # written by Yuan Yuan, Jie Yang
2 # see LICENSE.txt for license information
5 # New Tracker Checking Algorithm by Jie Yang
6 # ==========================
8 # Each time when a torrent checking thread starts, it uses one policy to select
9 # a torrent to check. The question turns to how to set the weight of these policies.
11 # Policy 1: Random 1/3
12 # Randomly select a torrent to collect (last_check < 5 min ago)
14 # Policy 2: Oldest (unknown) first 1/3
15 # Select the non-dead torrent which has not been checked for the longest time (last_check < 5 min ago)
17 # Policy 3: Popular (good) first 1/3
18 # Select the non-dead most popular (3*num_seeders+num_leechers) one which has not been checked in last N seconds
19 # (The default N = 4 hours, so at most 4h/torrentchecking_interval popular peers)
21 #===============================================================================
25 from threading import Thread
26 from random import sample
29 from BaseLib.Core.BitTornado.bencode import bdecode
30 from BaseLib.TrackerChecking.TrackerChecking import trackerChecking
31 from BaseLib.Core.CacheDB.sqlitecachedb import safe_dict
34 # LAYERVIOLATION: careful: uses two threads depending on code, make sure we have DB session per thread.
35 from BaseLib.Core.CacheDB.CacheDBHandler import TorrentDBHandler
36 from BaseLib.Core.DecentralizedTracking.mainlineDHTChecker import mainlineDHTChecker
37 #from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
41 class TorrentChecking(Thread):
# Constructor. `infohash` names the torrent to check; when it is None the thread
# body picks a torrent through selectPolicy() instead.
# NOTE(review): the embedded numbering of this listing skips 44, 46 and 48-49, so any
# statements at those positions (presumably base-class initialisation and a guard
# around the print below) are not visible here -- confirm against the complete file.
43 def __init__(self, infohash=None):
45 self.setName('TorrentChecking'+self.getName())
47 print >> sys.stderr, 'TorrentChecking: Started torrentchecking', threading.currentThread().getName()
50 self.infohash = infohash
# Retry limit compared against torrent["retried_times"] in updateTorrentInfo() and tooMuchRetry().
51 self.retryThreshold = 10
# Not read anywhere in the visible part of this file.
52 self.gnThreashold = 0.9
# Used by the thread body to look up the torrent's infohash after the tracker check.
53 self.mldhtchecker = mainlineDHTChecker.getInstance()
54 self.db_thread = None # if it is set, use another thread to access db
55 #self.db_thread = OverlayThreadingBridge.getInstance()
def selectPolicy(self):
    """Pick the torrent-selection policy for one checking run.

    Returns one of the strings "oldest", "random" or "popular", each with
    equal probability (the 1/3 weights listed in the header comment of
    this file).  The result is passed as `policy` to
    TorrentDBHandler.selectTorrentToCheck().
    """
    policies = ("oldest", "random", "popular")
    # sample() of size 1 yields a one-element list; unwrap it.
    return sample(policies, 1)[0]
# Opens the file named by torrent['torrent_path'] in binary mode and stores `data`
# under torrent['info'].
# NOTE(review): the embedded numbering of this listing skips 62, 65-69 and 71-75, so how
# `data` is produced from the file, whether `f` is closed, any error handling and the
# return statement are not visible here. The thread body assigns the result back to
# `torrent` and then tests `'info' not in torrent`, so presumably the record is
# returned without an 'info' key when reading fails -- confirm.
61 def readTorrent(self, torrent):
63 torrent_path = torrent['torrent_path']
64 f = open(torrent_path,'rb')
70 torrent['info'] = data
# Body of the thread's work routine: pick one torrent, check it against its tracker and
# the mainline DHT, and write the outcome back through TorrentDBHandler.
# NOTE(review): the `def` line that owns this body is absent from this listing (presumably
# `def run(self):`, as the class derives from Thread -- confirm). The embedded numbering has
# further gaps below, so the conditions around the debug prints, any early returns and any
# wait on `event` are not visible here.
77 """ Gets one torrent from good or unknown list and checks it """
81 print >> sys.stderr, "Torrent Checking: RUN", threading.currentThread().getName()
# return_value is passed to selectTorrentToCheck() and read back below through
# return_value['torrent']; it also carries `event` (presumably signalled by the DB side
# once the lookup is done -- confirm).
83 event = threading.Event()
84 return_value = safe_dict()
85 return_value['event'] = event
86 return_value['torrent'] = None
87 if self.infohash is None: # select torrent by a policy
88 policy = self.selectPolicy()
# Every DB call in this body appears twice: once queued on self.db_thread via add_task()
# and once called directly. Presumably an `if self.db_thread:` / `else:` pair that this
# listing omits chooses between them -- confirm.
90 self.db_thread.add_task(lambda:
91 TorrentDBHandler.getInstance().selectTorrentToCheck(policy=policy, return_value=return_value))
93 TorrentDBHandler.getInstance().selectTorrentToCheck(policy=policy, return_value=return_value)
94 else: # know which torrent to check
96 self.db_thread.add_task(lambda:TorrentDBHandler.getInstance().selectTorrentToCheck(infohash=self.infohash, return_value=return_value))
98 TorrentDBHandler.getInstance().selectTorrentToCheck(infohash=self.infohash, return_value=return_value)
101 torrent = return_value['torrent']
103 print >> sys.stderr, "Torrent Checking: get value from DB:", torrent
# A policy-selected torrent with ignored_times > 0 only has that counter decremented
# and stored through updateTracker() (presumably followed by a return that is not
# shown -- confirm).
108 if self.infohash is None and torrent['ignored_times'] > 0:
110 print >> sys.stderr, 'Torrent_checking: torrent: %s' % torrent
111 kw = { 'ignored_times': torrent['ignored_times']-1 }
113 self.db_thread.add_task(lambda:
114 TorrentDBHandler.getInstance().updateTracker(torrent['infohash'], kw))
116 TorrentDBHandler.getInstance().updateTracker(torrent['infohash'], kw)
119 # may block here because of the internet IO
120 torrent = self.readTorrent(torrent) # read the torrent
# No 'info' key after readTorrent(): the record is removed through deleteTorrent().
121 if 'info' not in torrent: #torrent has been deleted
123 self.db_thread.add_task(lambda:
124 TorrentDBHandler.getInstance().deleteTorrent(torrent['infohash']))
126 TorrentDBHandler.getInstance().deleteTorrent(torrent['infohash'])
129 # TODO: tracker checking also needs to be updated
131 print >> sys.stderr, "Tracker Checking"
132 trackerChecking(torrent)
134 # Must come after tracker check, such that if tracker dead and DHT still alive, the
135 # status is still set to good
136 self.mldhtchecker.lookup(torrent['infohash'])
138 self.updateTorrentInfo(torrent) # set the ignored_times
# Entries of the `kw` mapping handed to updateTorrent() below; the line that opens the
# mapping and the one that closes it are not present in this listing.
141 'last_check_time': int(time()),
142 'seeder': torrent['seeder'],
143 'leecher': torrent['leecher'],
144 'status': torrent['status'],
145 'ignored_times': torrent['ignored_times'],
146 'retried_times': torrent['retried_times'],
147 #'info': torrent['info']
# The label says "selectTorrentToCheck", but what is printed is the update payload.
151 print >> sys.stderr, "Torrent Checking: selectTorrentToCheck:", kw
154 self.db_thread.add_task(lambda:
155 TorrentDBHandler.getInstance().updateTorrent(torrent['infohash'], **kw))
157 TorrentDBHandler.getInstance().updateTorrent(torrent['infohash'], **kw)
# Without a separate db_thread, the handler used by this thread is closed here.
159 if not self.db_thread:
160 TorrentDBHandler.getInstance().close()
162 #===============================================================================
163 # def tooFast(self, torrent):
164 # interval_time = long(time()) - torrent["last_check_time"]
165 # if interval_time < 60 * 5:
168 #===============================================================================
def updateTorrentInfo(self, torrent):
    """Update the retry bookkeeping of *torrent* in place after a check.

    Reads torrent["status"] and rewrites "ignored_times", "retried_times"
    and, in one case, "status":

    * "good": ignored_times is reset to 0.
    * "unknown": once retried_times exceeds self.retryThreshold the
      torrent is marked "dead" with ignored_times 0; otherwise
      retried_times is incremented and ignored_times is set to the new
      retried_times.
    * "dead": retried_times is incremented while it is below
      self.retryThreshold.

    Any other status leaves the record untouched.  Returns None.
    """
    status = torrent["status"]
    if status == "good":
        torrent["ignored_times"] = 0
    elif status == "unknown":
        if torrent["retried_times"] > self.retryThreshold:  # set to dead
            torrent["ignored_times"] = 0
            torrent["status"] = "dead"
        else:
            # NOTE(review): this `else:` is restored; the listing this block
            # was recovered from dropped one line at exactly this position,
            # and without it the "dead" transition would immediately have
            # its ignored_times overwritten.  Confirm against upstream.
            torrent["retried_times"] += 1
            torrent["ignored_times"] = torrent["retried_times"]
    elif status == "dead":
        if torrent["retried_times"] < self.retryThreshold:
            torrent["retried_times"] += 1
def tooMuchRetry(self, torrent):
    """Return True when torrent["retried_times"] exceeds self.retryThreshold."""
    # NOTE(review): only the `def` line and the `if` test survive in the
    # listing this block was recovered from; returning the outcome of that
    # test is the assumed contract -- confirm against upstream.
    return torrent["retried_times"] > self.retryThreshold
# Manual test driver: takes the state directory from sys.argv[1], fills three entries of
# a `config` mapping, then builds one TorrentChecking that selects its torrent by policy
# (no infohash) and one for a fixed infohash.
# NOTE(review): the embedded numbering of this listing skips 193, 197, 199-202 and 205-207
# and stops at 208, so the creation of `config`, the call that uses init_db and whatever
# is done with each `t` are not visible here.
190 if __name__ == '__main__':
191 from BaseLib.Core.CacheDB.sqlitecachedb import init as init_db, str2bin
192 configure_dir = sys.argv[1]
194 config['state_dir'] = configure_dir
195 config['install_dir'] = '.'
196 config['peer_icon_path'] = '.'
198 t = TorrentChecking()
# Fixed sample infohash in text form, converted with str2bin() before use.
203 infohash_str = 'TkFX5S4qd2DPW63La/VObgOH/Nc='
204 infohash = str2bin(infohash_str)
208 t = TorrentChecking(infohash)