# Written by Arno Bakker
# see LICENSE.txt for license information

# TODO: let one hit to SIMPLE+METADATA be P2PURL
\r
import os
import shutil
import sys
import tempfile
import time
import unittest
from copy import deepcopy
from types import StringType, DictType, IntType

from M2Crypto import EC

# Project imports are kept in their original relative order: the star
# imports below may rebind names, so reordering them could change behaviour.
from BaseLib.Core.Utilities.Crypto import sha
from BaseLib.Test.test_as_server import TestAsServer
from olconn import OLConnection
from BaseLib.Core.API import *
from BaseLib.Core.BitTornado.bencode import bencode,bdecode
from BaseLib.Core.BitTornado.BT1.MessageID import *
from BaseLib.Core.BuddyCast.moderationcast_util import validChannelCastMsg, validVoteCastMsg
from BaseLib.Core.BuddyCast.channelcast import ChannelCastCore
from BaseLib.Core.BuddyCast.buddycast import BuddyCastCore
from BaseLib.Core.BuddyCast.votecast import VoteCastCore
from BaseLib.Core.CacheDB.sqlitecachedb import str2bin,bin2str
\r
class TestChannels(TestAsServer):
    """
    Tests of the ChannelCast and VoteCast overlay messages, the CHANNEL
    variants of the QUERY message, and local vote bookkeeping.

    All subtests run from a single test method (see _test_all) so that
    only one client is started per run.

    NOTE(review): the copy of this file that was reviewed had lost its
    indentation and its short lines. Statements tagged NOTE(review) below
    were re-added from context and must be confirmed against version
    control.
    """

    def setUpPreSession(self):
        """ override TestAsServer """
        TestAsServer.setUpPreSession(self)

        # Flag the gossip cores as running under TestAsServer.
        for core in (BuddyCastCore, ChannelCastCore, VoteCastCore):
            core.TESTASSERVER = True

        self.config.set_buddycast(True)
        self.config.set_start_recommender(True)
        self.config.set_bartercast(True)
        self.config.set_remote_query(True)
        self.config.set_crawler(False)
        self.config.set_torrent_collecting_dir(
            os.path.join(self.config_path, "tmp_torrent_collecting"))

        # Write superpeers.txt and DB schema
        self.install_path = tempfile.mkdtemp()
        spdir = os.path.join(self.install_path, LIBRARYNAME, 'Core')
        statsdir = os.path.join(spdir, 'Statistics')
        # makedirs also creates the intermediate LIBRARYNAME and 'Core' dirs.
        os.makedirs(statsdir)

        superpeerfilename = os.path.join(spdir, 'superpeer.txt')
        print >> sys.stderr,"test: writing empty superpeers to",superpeerfilename
        f = open(superpeerfilename, "w")
        try:
            # NOTE(review): the original file content was lost; a lone
            # comment line is assumed to be read as "no superpeers".
            f.write('# empty\n')
        finally:
            f.close()

        self.config.set_install_dir(self.install_path)

        srcfiles = [os.path.join(LIBRARYNAME,"schema_sdb_v5.sql")]
        for srcfile in srcfiles:
            sfn = os.path.join('..','..',srcfile)
            dfn = os.path.join(self.install_path,srcfile)
            print >>sys.stderr,"test: copying",sfn,dfn
            shutil.copyfile(sfn,dfn)

    def setUpPostSession(self):
        """ override TestAsServer """
        TestAsServer.setUpPostSession(self)

        self.mypermid = str(self.my_keypair.pub().get_der())
        self.hispermid = str(self.his_keypair.pub().get_der())

    def setupDB(self,nickname):
        """
        Set the session nickname, open the torrent/channelcast/votecast
        DB handlers and fill them with three own-channel torrents and a
        few votes.
        """
        # Change at runtime. Must be set before DB inserts
        self.session.set_nickname(nickname)

        self.torrent_db = self.session.open_dbhandler(NTFY_TORRENTS)
        self.channelcast_db = self.session.open_dbhandler(NTFY_CHANNELCAST)
        self.votecast_db = self.session.open_dbhandler(NTFY_VOTECAST)

        # Add some torrents belonging to own channel
        tdef1, self.bmetainfo1 = self._add_own_torrent('sumfilename1','Hallo S01E10')
        self.infohash1 = tdef1.get_infohash()

        tdef2, self.bmetainfo2 = self._add_own_torrent('sumfilename2','Hallo S02E01')
        self.infohash2 = tdef2.get_infohash()
        self.torrenthash2 = sha(self.bmetainfo2).digest()

        tdef3, self.bmetainfo3 = self._add_own_torrent('sumfilename3','Halo Demo')
        self.infohash3 = tdef3.get_infohash()
        self.torrenthash3 = sha(self.bmetainfo3).digest()

        # Now, add some votes
        mod_id = "MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAIV8h+eS+vQ+0uqZNv3MYYTLo5s0JP+cmkvJ7U4JAHhfRv1wCqZSKIuY7Q+3ESezhRnnmmX4pbOVhKTU"
        voter_id = "MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAIV8h+eS+vQ+0uqZNv3MYYTLo5s0JP+cmkvJ7U4JAHhfRv1wCqZSKIuY7Q+3ESezhRnnmmX4pbOVhKTX"
        self.votecast_db.subscribe(mod_id)
        self.votecast_db.spam(voter_id)
        vote = {'mod_id':mod_id, 'voter_id':voter_id, 'vote':1, 'time_stamp':132314}
        self.votecast_db.addVote(vote)

    def _add_own_torrent(self, filename, title):
        """
        Create a default torrent, store it in the torrent DB and add it to
        the own channel. Returns (tdef, bencoded metainfo).
        """
        tdef, bmetainfo = self.get_default_torrent(filename, title)
        self.torrent_db.addExternalTorrent(tdef, extra_info={"filename":filename})
        self.channelcast_db.addOwnTorrent(tdef)
        return tdef, bmetainfo

    def tearDown(self):
        """
        Run the base-class teardown, then close the DB handlers opened by
        setupDB() and remove the temporary install directory.
        """
        TestAsServer.tearDown(self)
        # setupDB() may never have run if the test failed early, so only
        # close the handlers that actually exist.
        for attr in ('torrent_db', 'channelcast_db', 'votecast_db'):
            handler = getattr(self, attr, None)
            if handler is not None:
                self.session.close_dbhandler(handler)
        # The directory comes from tempfile.mkdtemp(), which never cleans up.
        install_path = getattr(self, 'install_path', None)
        if install_path is not None:
            shutil.rmtree(install_path, ignore_errors=True)

    def get_default_torrent(self,filename,title,paths=None):
        """
        Build a minimal torrent named `title`.

        With paths=None the torrent is single-file; otherwise paths[0] and
        paths[1] become the two entries of a multi-file torrent.
        Returns (TorrentDef, bencoded metainfo).
        """
        metainfo = {}
        metainfo['announce'] = 'http://localhost:0/announce'
        metainfo['announce-list'] = []
        metainfo['creation date'] = int(time.time())
        metainfo['encoding'] = 'UTF-8'

        info = {}
        info['name'] = title.encode("UTF-8")
        info['piece length'] = 2 ** 16
        info['pieces'] = '*' * 20
        if paths is None:
            info['length'] = 481
        else:
            # NOTE(review): the per-file lengths were lost; 201 + 280 is
            # assumed because it matches the single-file length of 481.
            d1 = {'path': [paths[0].encode("UTF-8")], 'length': 201}
            d2 = {'path': [paths[1].encode("UTF-8")], 'length': 280}
            info['files'] = [d1,d2]

        metainfo['info'] = info
        path = os.path.join(self.config.get_torrent_collecting_dir(),filename)
        tdef = TorrentDef.load_from_dict(metainfo)
        # NOTE(review): `path` was computed but its use was lost; saving
        # the torrent into the collecting dir is assumed - confirm.
        tdef.save(path)
        return tdef, bencode(metainfo)

    def singtest_plain_nickname(self):
        self._test_all("nick")

    def singtest_unicode_nickname(self):
        self._test_all(u"nick\u00f3")

    def _test_all(self,nickname):
        """
            I want to start a Tribler client once and then connect to
            it many times. So there must be only one test method
            to prevent setUp() from creating a new client every time.

            The code is constructed so unittest will show the name of the
            (sub)test where the error occurred in the traceback it prints.
        """
        self.setupDB(nickname)

        # test ChannelCast
        self.subtest_channelcast()

        # test VoteCast
        self.subtest_votecast()

        # test ChannelQuery-keyword
        self.subtest_channel_keyword_query(nickname)

        # test ChannelQuery-permid
        self.subtest_channel_permid_query(nickname)

        # test voting
        self.subtest_voting()

    def subtest_voting(self):
        """
        Apply unsubscribe/spam/subscribe on my permid in sequence and check
        the vote that getVote() reports after each step.
        """
        mine = bin2str(self.mypermid)
        his = bin2str(self.hispermid)
        db = self.votecast_db
        steps = [
            (db.unsubscribe, None),
            (db.spam, -1),
            (db.subscribe, 2),
            (db.unsubscribe, None),
            (db.spam, -1),
        ]
        for action, expected in steps:
            action(mine)
            got = db.getVote(mine, his)
            self.assertEqual(got, expected,
                "after %s(): vote is %r, expected %r" % (action.__name__, got, expected))

    def check_chquery_reply(self, data, nickname):
        """
        Check a bencoded channel QUERY_REPLY payload: it must be a dict
        with 'a' and 'id' keys, 'a' must be a non-empty valid ChannelCast
        message and every record must carry the expected publisher.
        """
        d = bdecode(data)
        self.assert_(isinstance(d, dict))
        self.assert_('a' in d)
        self.assert_('id' in d)
        self.assert_(isinstance(d['id'], str))

        answers = d['a']
        self.assert_(validChannelCastMsg(answers))
        self.assert_(len(answers) > 0)
        expected_name = nickname.encode("UTF-8")
        for val in answers.itervalues():
            self.assert_(val['publisher_name'] == expected_name)
            self.assert_(val['publisher_id'] == self.hispermid)

    def _run_channel_query(self, uq, nickname):
        """
        Send the unicode query `uq` as a QUERY message on a fresh overlay
        connection and validate the QUERY_REPLY that comes back.
        """
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            data = {'q': uq.encode("UTF-8"), 'id': 'b' * 20}
            # NOTE(review): the send()/recv()/close() calls were lost in
            # the reviewed copy and are restored from context.
            s.send(QUERY + bencode(data))
            resp = s.recv()
            #print >> sys.stderr, "printing resp", resp

            # Fail with a clear assertion instead of an IndexError below.
            self.assert_(len(resp) > 0)
            print >>sys.stderr,"test: chquery: got",getMessageName(resp[0])
            self.assert_(resp[0]==QUERY_REPLY)
            self.check_chquery_reply(resp[1:],nickname)
            print >>sys.stderr,"test:",repr(bdecode(resp[1:]))
        finally:
            s.close()

    def subtest_channel_permid_query(self,nickname):
        print >>sys.stderr,"test: chquery permid-----------------------------"
        self._run_channel_query(u'CHANNEL p '+ bin2str(self.hispermid), nickname)

    def subtest_channel_keyword_query(self,nickname):
        print >>sys.stderr,"test: chquery keyword-----------------------------"
        self._run_channel_query(u'CHANNEL k '+nickname, nickname)

    def subtest_votecast(self):
        """
        Send one well-formed VOTECAST message and check the VOTECAST reply,
        then send three malformed ones that must be rejected.
        """
        print >>sys.stderr,"test: votecast-----------------------------"
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            # NOTE(review): the instance is not used afterwards; it is kept
            # in case constructing it has side effects - confirm.
            vcast = VoteCastCore(None, s, self.session, None, log = '', dnsindb = None)

            #Send Good VoteCast message
            vdata = {self.hispermid:{'vote':-1,'time_stamp':12345345}}
            print >> sys.stderr, "Test Good VoteCast", repr(vdata)
            s.send(VOTECAST+bencode(vdata))
            resp = s.recv()
            #print >> sys.stderr, "printing resp", resp

            self.assert_(len(resp) > 0)
            print >>sys.stderr,"test: votecast: got",getMessageName(resp[0])
            self.assert_(resp[0]==VOTECAST)
            vdata_rcvd = bdecode(resp[1:])
            print >>sys.stderr, "test: votecast: got msg", repr(vdata_rcvd)
            self.assert_(validVoteCastMsg(vdata_rcvd))
        finally:
            s.close()

        #Now, send bad VoteCast messages
        # The other side should close the connection

        #Bad time_stamp: it can only be an int
        vdata = {bin2str(self.hispermid):{'vote':-1,'time_stamp':'halo'}}
        self.subtest_bad_votecast(vdata)

        #Bad Vote: Vote can only be -1 or 2
        vdata = {bin2str(self.hispermid):{'vote':-15,'time_stamp':12345345}}
        self.subtest_bad_votecast(vdata)

        # Bad Message format ... Correct format is 'time_stamp'
        vdata = {bin2str(self.hispermid):{'vote':-15,'timestamp':12345345}}
        self.subtest_bad_votecast(vdata)

        print>>sys.stderr, "End of votecast test"

    def subtest_bad_votecast(self, vdata):
        """ Send a malformed VOTECAST message; recv() must come back empty. """
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            vcast = VoteCastCore(None, s, self.session, None, log = '', dnsindb = None)
            print >> sys.stderr, "Test Bad VoteCast", repr(vdata)
            s.send(VOTECAST+bencode(vdata))
            # The other side should close the connection: nothing to read.
            self.assert_(len(s.recv())==0)
        finally:
            s.close()

    def subtest_channelcast(self):
        """
        Send an empty CHANNELCAST message and check the CHANNELCAST reply,
        then replay corrupted copies of that reply, which must be rejected.
        """
        print >>sys.stderr,"test: channelcast----------------------"
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            # NOTE(review): instance unused afterwards; kept in case
            # constructing it has side effects - confirm.
            chcast = ChannelCastCore(None, s, self.session, None, log = '', dnsindb = None)

            #Send Empty ChannelCast message
            chdata = {}
            print >> sys.stderr, "Test Good ChannelCast", repr(chdata)
            s.send(CHANNELCAST+bencode(chdata))
            resp = s.recv()

            self.assert_(len(resp) > 0)
            print >>sys.stderr,"test: channelcast: got",getMessageName(resp[0])
            self.assert_(resp[0]==CHANNELCAST)
            chdata_rcvd = bdecode(resp[1:])
            print >>sys.stderr, "test: channelcast: got msg", repr(chdata_rcvd)
            self.assert_(validChannelCastMsg(chdata_rcvd))
        finally:
            s.close()

        #Now, send a bad ChannelCast message.
        # The other side should close the connection
        # Create bad message by manipulating a good one
        for field, bad_value in (('infohash', 234), ('torrentname', 1231)):
            chdata = deepcopy(chdata_rcvd)
            for v in chdata.values():
                v[field] = bad_value
            self.subtest_bad_channelcast(chdata)

        #bad signature.. temporarily disabled.
        # Got to enable when signature validation in validChannelCastMsg are enabled
        # chdata = deepcopy(chdata_rcvd)
        # value_list = chdata.values()
        # if len(value_list)>0:
        #     chdata['sdfg234sadf'] = value_list[0]
        #     self.subtest_bad_channelcast(chdata)

        #Bad message format
        chdata = {'2343ww34':''}
        self.subtest_bad_channelcast(chdata)

        print>>sys.stderr, "End of channelcast test---------------------------"

    def subtest_bad_channelcast(self, chdata):
        """ Send a malformed CHANNELCAST message; recv() must come back empty. """
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            chcast = ChannelCastCore(None, s, self.session, None, log = '', dnsindb = None)
            print >> sys.stderr, "Test Bad ChannelCast", repr(chdata)
            s.send(CHANNELCAST+bencode(chdata))
            # The other side should close the connection: nothing to read.
            self.assert_(len(s.recv())==0)
        finally:
            s.close()
\r
def test_suite():
    """
    Build the suite for unittest.main(defaultTest='test_suite').

    The name of the single test method to run is taken from sys.argv[1].
    When the argument count is wrong a usage line is printed and an empty
    suite is returned.
    """
    suite = unittest.TestSuite()
    # We should run the tests in a separate Python interpreter to prevent
    # problems with our singleton classes, e.g. PeerDB, etc.
    if len(sys.argv) != 2:
        # Name the script as it was actually invoked; the old hard-coded
        # file name was misspelled.
        if sys.argv:
            prog = sys.argv[0]
        else:
            prog = "test_channelcast.py"
        sys.stdout.write("Usage: python %s <method name>\n" % prog)
    else:
        suite.addTest(TestChannels(sys.argv[1]))

    return suite
\r
def main():
    """
    Run the suite built by test_suite().

    Only the program name is handed to unittest so that it does not try to
    interpret the test-method argument itself.
    """
    unittest.main(defaultTest='test_suite',argv=[sys.argv[0]])

if __name__ == "__main__":
    main()
\r