instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / TrackerChecking / TorrentChecking.py
1 # written by Yuan Yuan, Jie Yang
2 # see LICENSE.txt for license information
3 #
4 #  
# New Tracker Checking Algorithm by Jie Yang
6 # ==========================
7
8 # Each time when a torrent checking thread starts, it uses one policy to select
9 # a torrent to check. The question turns to how to set the weight of these policies.
10 #
11 # Policy 1: Random 1/3
12 #   Randomly select a torrent to collect (last_check < 5 min ago)
13 #
14 # Policy 2: Oldest (unknown) first  1/3
#   Select the non-dead torrent which has not been checked for the longest time (last_check < 5 min ago)
16 #
17 # Policy 3: Popular (good) first    1/3
18 #   Select the non-dead most popular (3*num_seeders+num_leechers) one which has not been checked in last N seconds
#   (The default N = 4 hours, so at most 4h/torrentchecking_interval popular torrents)
20 #
21 #===============================================================================
22
23 import sys
24 import threading
25 from threading import Thread
26 from random import sample
27 from time import time
28
29 from BaseLib.Core.BitTornado.bencode import bdecode
30 from BaseLib.TrackerChecking.TrackerChecking import trackerChecking
31 from BaseLib.Core.CacheDB.sqlitecachedb import safe_dict
32
33
34 # LAYERVIOLATION: careful: uses two threads depending on code, make sure we have DB session per thread.
35 from BaseLib.Core.CacheDB.CacheDBHandler import TorrentDBHandler
36 from BaseLib.Core.DecentralizedTracking.mainlineDHTChecker import mainlineDHTChecker
37 #from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
38
39 DEBUG = False
40
class TorrentChecking(Thread):
    """Daemon thread that checks a single torrent and stores the outcome.

    With infohash=None the torrent is chosen by
    TorrentDBHandler.selectTorrentToCheck using a policy returned by
    selectPolicy(); otherwise the torrent with the given infohash is
    checked.  A check consists of trackerChecking() followed by a mainline
    DHT lookup, after which the status/counters are written back to the DB.
    """

    def __init__(self, infohash=None):
        Thread.__init__(self)
        self.setName('TorrentChecking' + self.getName())
        if DEBUG:
            print >> sys.stderr, 'TorrentChecking: Started torrentchecking', threading.currentThread().getName()
        self.setDaemon(True)

        self.infohash = infohash
        # retried_times above this value means "give up" (see tooMuchRetry)
        self.retryThreshold = 10
        # NOTE(review): never read inside this class; kept (with its original
        # spelling) in case other code refers to it.
        self.gnThreashold = 0.9
        self.mldhtchecker = mainlineDHTChecker.getInstance()
        # If set, DB calls are queued on this object via add_task() instead
        # of being executed on the checking thread itself (see _dbCall).
        self.db_thread = None
        #self.db_thread = OverlayThreadingBridge.getInstance()

    def selectPolicy(self):
        """Return "oldest", "random" or "popular", chosen uniformly at random."""
        policies = ["oldest", "random", "popular"]
        return sample(policies, 1)[0]

    def readTorrent(self, torrent):
        """Attach the decoded .torrent metainfo to *torrent* and return it.

        torrent is a dict that must contain 'torrent_path'.  On success,
        torrent['info'] is set to the bdecoded metainfo dict with its own
        'info' section removed (i.e. only the remaining top-level keys).

        This is deliberately best-effort: on any failure (missing key,
        unreadable file, undecodable data, no 'info' section) the dict is
        returned without an 'info' entry being added, which run() treats
        as "the torrent file is gone".  The same dict object is always
        returned.
        """
        try:
            # 'with' guarantees the file is closed even if read() fails.
            with open(torrent['torrent_path'], 'rb') as f:
                raw = f.read()
            data = bdecode(raw)
            # Explicit check instead of 'assert', which is stripped under -O.
            if 'info' not in data:
                return torrent
            del data['info']
            torrent['info'] = data
        except Exception:
            # Best effort by design; the caller inspects 'info' instead.
            pass
        return torrent

    def run(self):
        """ Gets one torrent from good or unknown list and checks it """
        try:
            if DEBUG:
                print >> sys.stderr, "Torrent Checking: RUN", threading.currentThread().getName()

            torrent = self._selectTorrent()
            if DEBUG:
                print >> sys.stderr, "Torrent Checking: get value from DB:", torrent

            if not torrent:
                return

            # Only policy-selected torrents honour the back-off counter; an
            # explicitly requested infohash is always checked.  Skipping a
            # torrent just consumes one unit of its ignored_times.
            if self.infohash is None and torrent['ignored_times'] > 0:
                if DEBUG:
                    print >> sys.stderr, 'Torrent_checking: torrent: %s' % torrent
                self._dbCall('updateTracker', torrent['infohash'],
                             {'ignored_times': torrent['ignored_times'] - 1})
                return

            # May block on I/O while the .torrent file is read.
            torrent = self.readTorrent(torrent)
            if 'info' not in torrent:    # torrent file missing or unreadable
                self._dbCall('deleteTorrent', torrent['infohash'])
                return

            # TODO: tracker checking also needs to be update
            if DEBUG:
                print >> sys.stderr, "Tracker Checking"
            trackerChecking(torrent)

            # Must come after tracker check, such that if tracker dead and DHT still alive, the
            # status is still set to good
            self.mldhtchecker.lookup(torrent['infohash'])

            self.updateTorrentInfo(torrent)            # set the ignored_times

            kw = {
                'last_check_time': int(time()),
                'seeder': torrent['seeder'],
                'leecher': torrent['leecher'],
                'status': torrent['status'],
                'ignored_times': torrent['ignored_times'],
                'retried_times': torrent['retried_times'],
                }

            if DEBUG:
                print >> sys.stderr, "Torrent Checking: selectTorrentToCheck:", kw

            self._dbCall('updateTorrent', torrent['infohash'], **kw)
        finally:
            # Only close the handler when this thread used it directly
            # (presumably because DB sessions are per thread -- see the
            # LAYERVIOLATION note at the imports).
            if not self.db_thread:
                TorrentDBHandler.getInstance().close()

    def _selectTorrent(self, timeout=60.0):
        """Ask the DB for the torrent to check; wait up to *timeout* seconds.

        Passes a safe_dict holding an Event under 'event' and None under
        'torrent' to TorrentDBHandler.selectTorrentToCheck, which is
        expected to fill in 'torrent' and set the event (presumably -- the
        handler is not visible here).  Returns whatever is in 'torrent'
        once the event is set or the timeout expires (None if unset).
        """
        event = threading.Event()
        return_value = safe_dict()
        return_value['event'] = event
        return_value['torrent'] = None
        if self.infohash is None:   # select torrent by a policy
            self._dbCall('selectTorrentToCheck',
                         policy=self.selectPolicy(), return_value=return_value)
        else:   # know which torrent to check
            self._dbCall('selectTorrentToCheck',
                         infohash=self.infohash, return_value=return_value)
        event.wait(timeout)
        return return_value['torrent']

    def _dbCall(self, method_name, *args, **kwargs):
        """Call TorrentDBHandler.getInstance().<method_name>(*args, **kwargs).

        When self.db_thread is set the call is queued with its add_task();
        otherwise it runs synchronously on the current thread.  The handler
        instance is looked up inside the task, i.e. on the thread that runs
        it.  Arguments are bound at call time, so later rebinding of the
        caller's local variables cannot change what a queued task does.
        """
        def task():
            getattr(TorrentDBHandler.getInstance(), method_name)(*args, **kwargs)

        if self.db_thread:
            self.db_thread.add_task(task)
        else:
            task()

    def updateTorrentInfo(self, torrent):
        """Update torrent's status and retry/back-off counters in place.

        - "good":    ignored_times is reset to 0.
        - "unknown": if tooMuchRetry() the torrent becomes "dead" (with
                     ignored_times 0); otherwise retried_times is increased
                     and ignored_times set to it, so policy-based selection
                     skips the torrent that many times (see run()).
        - "dead":    retried_times is increased, capped at retryThreshold.
        Any other status is left untouched.
        """
        status = torrent["status"]
        if status == "good":
            torrent["ignored_times"] = 0
        elif status == "unknown":
            if self.tooMuchRetry(torrent):    # set to dead
                torrent["ignored_times"] = 0
                torrent["status"] = "dead"
            else:
                torrent["retried_times"] += 1
                torrent["ignored_times"] = torrent["retried_times"]
        elif status == "dead":
            if torrent["retried_times"] < self.retryThreshold:
                torrent["retried_times"] += 1

    def tooMuchRetry(self, torrent):
        """Return True if torrent['retried_times'] exceeds self.retryThreshold."""
        return torrent["retried_times"] > self.retryThreshold
188
189
if __name__ == '__main__':
    # Manual smoke test against the database in the given state directory:
    # first one policy-selected check, then one check of a fixed infohash.
    # Usage: python TorrentChecking.py <state_dir>
    if len(sys.argv) < 2:
        # Previously sys.argv[1] raised a bare IndexError here.
        print >> sys.stderr, 'usage: %s <state_dir>' % sys.argv[0]
        sys.exit(1)

    from BaseLib.Core.CacheDB.sqlitecachedb import init as init_db, str2bin
    configure_dir = sys.argv[1]
    config = {}
    config['state_dir'] = configure_dir
    config['install_dir'] = '.'
    config['peer_icon_path'] = '.'
    init_db(config)

    # 1) let the thread pick a torrent itself (infohash=None)
    t = TorrentChecking()
    t.start()
    t.join()

    # 2) check one specific torrent
    infohash_str = 'TkFX5S4qd2DPW63La/VObgOH/Nc='
    infohash = str2bin(infohash_str)
    t = TorrentChecking(infohash)
    t.start()
    t.join()
211