instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_buddycast2_datahandler.py
1 # Written by Jie Yang
2 # see LICENSE.txt for license information
3
4 # Arno, pychecker-ing: the addTarget and getTarget methods of JobQueue are
5 # no longer there, this code needs to be updated.
6
7 # 17/02/10 Boudewijn: this test reads a superpeer log to get actual
8 # buddycast messages.  However, these messages were written to the log
9 # using readableBuddyCastMsg(...) and are NOT made back into normal
10 # buddycast messages.  This causes some buddycast messages to be
11 # silently dropped.
12
13 import os
14 import sys
15 import unittest
16 from traceback import print_exc
17 from shutil import copy as copyFile, move
18 from time import sleep
19 import base64
20 import math
21
22 from BaseLib.Core.defaults import *
23 from BaseLib.Core.BuddyCast.buddycast import DataHandler, BuddyCastFactory
24 from BaseLib.Core.CacheDB.CacheDBHandler import *
25 from BaseLib.Category.Category import Category
26 from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
27 from BaseLib.Core.Statistics.Crawler import Crawler
28 from bak_tribler_sdb import *
29
30
# Fixture file locations, all built relative to FILES_DIR.
# NOTE(review): FILES_DIR is not defined in this file; presumably it comes
# from one of the star imports above (e.g. bak_tribler_sdb) -- confirm.
STATE_FILE_NAME_PATH = os.path.join(FILES_DIR, 'tribler.sdb-journal')
S_TORRENT_PATH_BACKUP = os.path.join(FILES_DIR, 'bak_single.torrent')
S_TORRENT_PATH = os.path.join(FILES_DIR, 'single.torrent')

M_TORRENT_PATH_BACKUP = os.path.join(FILES_DIR, 'bak_multiple.torrent')    
M_TORRENT_PATH = os.path.join(FILES_DIR, 'multiple.torrent')    
# Handed to SQLiteCacheDB.initDB() as its `busytimeout` argument in init().
BUSYTIMEOUT = 5000
38
39
def init():
    """Prepare the database that every test case in this module relies on.

    Calls init_bak_tribler_sdb() (imported from bak_tribler_sdb; presumably
    it restores a backup copy of the database -- confirm), initialises the
    SQLiteCacheDB singleton on TRIBLER_DB_PATH with BUSYTIMEOUT, and registers
    a Category instance with the TorrentDBHandler singleton.
    """
    init_bak_tribler_sdb()

    cache_db = SQLiteCacheDB.getInstance()
    cache_db.initDB(TRIBLER_DB_PATH, busytimeout=BUSYTIMEOUT)

    print >> sys.stderr, "OPENING DB", TRIBLER_DB_PATH

    # Obtain the handler first, then the category, matching the evaluation
    # order of a single chained call.
    torrent_handler = TorrentDBHandler.getInstance()
    torrent_handler.register(Category.getInstance('..'), '.')
50
51
class FakeSession:
    """Stub session object used by FakeLauchMany in place of a real session.

    Every method accepts whatever arguments the real call site passes and
    returns a fixed value.
    """

    # Class-level (shared) configuration dictionary; empty for the tests.
    sessconfig = {}

    def get_permid(self, *args, **kargs):
        """Return the fixed fake permid as base64-decoded bytes.

        All positional and keyword arguments are accepted and ignored.
        """
        # base64.decodestring() is a deprecated alias that was removed in
        # Python 3.9; b64decode() is available from Python 2.4 onwards and
        # decodes this constant (including its trailing newline) identically.
        return base64.b64decode('MG0CAQEEHR/bQNvwga7Ury5+8vg/DTGgmMpGCz35Zs/2iz7coAcGBSuBBAAaoUADPgAEAL2I5yVc1+dWVEx3nbriRKJmOSlQePZ9LU7yYQoGABMvU1uGHvqnT9t+53eaCGziV12MZ1g2p0GLmZP9\n')

    def get_moderationcast_moderations_per_have(self, *args, **kargs):
        """Return the constant 100; all arguments are ignored."""
        return 100

    def add_observer(self, *args, **kargs):
        """Accept and discard an observer registration (no-op)."""
        pass

    def get_votecast_recent_votes(self):
        """Return the 'votecast_recent_votes' entry of sessdefaults."""
        # sessdefaults is not defined in this file; presumably it arrives via
        # `from BaseLib.Core.defaults import *` -- confirm.
        return sessdefaults['votecast_recent_votes']

    def get_votecast_random_votes(self):
        """Return the 'votecast_random_votes' entry of sessdefaults."""
        return sessdefaults['votecast_random_votes']
68
69
class FakeLauchMany:
    """Stub "launchmany" object given to DataHandler and BuddyCastFactory.

    It carries a FakeSession, a Crawler instance, the database handler
    singletons and a FakeSecureOverlay as plain attributes.
    """
    
    def __init__(self):
        """Collect the fake session, the crawler and all handler singletons."""
        self.session = FakeSession()
        self.crawler = Crawler.get_instance(self.session)
        
        # Database handler singletons.  The torrent and bartercast handlers
        # additionally get a category / the fake session registered on them.
        self.my_db          = MyDBHandler.getInstance()
        self.peer_db        = PeerDBHandler.getInstance()
        self.torrent_db     = TorrentDBHandler.getInstance()
        self.torrent_db.register(Category.getInstance(),'.')
        self.mypref_db      = MyPreferenceDBHandler.getInstance()
        self.pref_db        = PreferenceDBHandler.getInstance()
        self.superpeer_db   = SuperPeerDBHandler.getInstance()
        self.friend_db      = FriendDBHandler.getInstance()
        self.bartercast_db  = BarterCastDBHandler.getInstance()
        self.bartercast_db.registerSession(self.session)
        self.secure_overlay = FakeSecureOverlay()
#        torrent_collecting_dir = os.path.abspath(config['torrent_collecting_dir'])
        # Fixed port number; nothing in this file opens a socket on it.
        self.listen_port = 1234

        self.channelcast_db = ChannelCastDBHandler.getInstance()
        self.channelcast_db.registerSession(self.session)

        self.votecast_db = VoteCastDBHandler.getInstance()
        self.votecast_db.registerSession(self.session)
        self.simi_db        = SimilarityDBHandler.getInstance()
        self.pops_db = PopularityDBHandler.getInstance()

    def get_ext_ip(self):
        """Return None: the fake has no external IP address to report."""
        return None
    
    def set_activity(self, NTFY_ACT_RECOMMEND, buf):
        """Ignore activity notifications (no-op)."""
        pass
103     
class FakeThread:
    """Thread stand-in whose join() returns straight away."""

    def join(self):
        """Return None immediately; there is nothing to wait for."""
        return None
107     
class FakeSecureOverlay:
    """Secure-overlay stub that has no address for any peer."""

    def get_dns_from_peerdb(self, permid):
        """Return None regardless of the permid asked for."""
        dns = None
        return dns
111     
class FakeOverlayBridge:
    """Synchronous overlay-bridge stub: a task runs inside add_task()."""

    def __init__(self):
        self.thread = FakeThread()

    def add_task(self, task, time=0, id=None):
        """Call `task` right away; `time` and `id` are accepted but unused.

        The string 'stop' is a sentinel and is skipped instead of called.
        Always returns None.
        """
        if not task == 'stop':
            task()
121
122
123 class TestBuddyCastDataHandler(unittest.TestCase):
124     
125     def setUp(self):
126         # prepare database
127
128         launchmany = FakeLauchMany()
129         self.overlay_bridge = TimedTaskQueue() 
130         #self.overlay_bridge = FakeOverlayBridge()
131         self.data_handler = DataHandler(launchmany, self.overlay_bridge, max_num_peers=2500)
132
133     def tearDown(self):
134         self.overlay_bridge.add_task('quit')
135         
136     def test_postInit(self):
137         #self.data_handler.postInit()
138         self.data_handler.postInit(1,50,0, 50)
139         #from time import sleep
140         
class TestBuddyCast(unittest.TestCase):
    """Feeds logged BuddyCast messages into a BuddyCastFactory instance."""

    def setUp(self):
        """Register the BuddyCast singleton with the fake collaborators."""
        fake_launchmany = FakeLauchMany()
        self.overlay_bridge = TimedTaskQueue()
        run_as_superpeer = False  # enable it to test superpeer
        self.bc = BuddyCastFactory.getInstance(superpeer=run_as_superpeer)
        self.bc.register(self.overlay_bridge, fake_launchmany,
                         None, None, None, True)

    def tearDown(self):
        """Queue the 'quit' task on the overlay bridge."""
        self.overlay_bridge.add_task('quit')
        print("Before join")

    def _drop_indices(self, database, index_names):
        """Issue one 'drop index' statement per name on `database`."""
        for index_name in index_names:
            database.execute_write('drop index ' + index_name)

    def remove_t_index(self):
        """Drop the torrent indices (Torrent_name_idx is left in place)."""
        torrent_indices = (
            'Torrent_length_idx',
            'Torrent_creation_date_idx',
            'Torrent_relevance_idx',
            'Torrent_num_seeders_idx',
            'Torrent_num_leechers_idx',
        )
        self._drop_indices(self.data_handler.torrent_db._db, torrent_indices)

    def remove_p_index(self):
        """Drop the peer indices."""
        peer_indices = (
            'Peer_name_idx',
            'Peer_ip_idx',
            'Peer_similarity_idx',
            'Peer_last_seen_idx',
            'Peer_last_connected_idx',
            'Peer_num_peers_idx',
            'Peer_num_torrents_idx',
        )
        self._drop_indices(self.data_handler.peer_db._db, peer_indices)

    def local_test(self):
        """Replay the superpeer log and time every gotBuddyCastMessage() call.

        Not collected by unittest on its own (the name lacks the 'test'
        prefix); test_start() invokes it.  The first exception raised while
        handling a message is printed and ends the replay.
        """
        self.remove_t_index()
        self.remove_p_index()

        from BaseLib.Test.log_parser import get_buddycast_data

        # NOTE(review): `time` and `bencode` are not imported by name in this
        # file; presumably they arrive through one of the star imports --
        # verify.
        costs = []
        self.data_handler.postInit(updatesim=False)
        log_path = os.path.join(FILES_DIR, 'superpeer120070902sp7001.log')
        for permid, selversion, msg in get_buddycast_data(log_path):
            message = bencode(msg)

            try:
                started = time()
                self.bc.gotBuddyCastMessage(message, permid, selversion)
                cost = time() - started
                costs.append(cost)
            except:
                print_exc()
                break

            # count, last, min, mean and max handling time so far
            print('got msg: %d %.2f %.2f %.2f %.2f' % (
                len(costs), cost, min(costs),
                sum(costs) / len(costs), max(costs)))
        # with all indices, min/avg/max:  0.00 1.78 4.57 seconds
        # without index, min/avg/max:  0.00 1.38 3.43 seconds  (58)
        print("Done")

    def test_start(self):
        # Any exception raised below is printed and turned into a test
        # failure by the assert_(False) in the handler.
        try:
            self.bc.olthread_register(start=False)
            self.data_handler = self.bc.data_handler
            self.local_test()
            print("Sleeping for 10 secs")
            sleep(10)
            print("Done2")
        except:
            print_exc()
            self.assert_(False)
226     
def test_suite():
    """Return a TestSuite holding both TestCase classes of this module."""
    suite = unittest.TestSuite()
    for case_class in (TestBuddyCastDataHandler, TestBuddyCast):
        suite.addTest(unittest.makeSuite(case_class))
    return suite
233
234     
def main():
    """Set up the test database via init(), then run test_suite()."""
    init()
    # unittest.main() looks up the suite factory by name in this module.
    unittest.main(defaultTest='test_suite')

if __name__ == '__main__':
    main()
241     
242