instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_channelcast.py
1 # Written by Arno Bakker\r
2 # see LICENSE.txt for license information\r
3 \r
4 # TODO: let one hit to SIMPLE+METADATA be P2PURL\r
5 import unittest\r
6 import os\r
7 import sys\r
8 import time\r
9 import tempfile\r
10 import shutil\r
11 from BaseLib.Core.Utilities.Crypto import sha\r
12 from types import StringType, DictType, IntType\r
13 from M2Crypto import EC\r
14 from copy import deepcopy\r
15 from BaseLib.Test.test_as_server import TestAsServer\r
16 from olconn import OLConnection\r
17 from BaseLib.Core.API import *\r
18 from BaseLib.Core.BitTornado.bencode import bencode,bdecode\r
19 from BaseLib.Core.BitTornado.BT1.MessageID import *\r
20 from BaseLib.Core.BuddyCast.moderationcast_util import validChannelCastMsg, validVoteCastMsg\r
21 from BaseLib.Core.BuddyCast.channelcast import ChannelCastCore\r
22 from BaseLib.Core.BuddyCast.buddycast import BuddyCastCore\r
23 from BaseLib.Core.BuddyCast.votecast import VoteCastCore\r
24 from BaseLib.Core.CacheDB.sqlitecachedb import str2bin,bin2str\r
25 \r
# Module-level debug switch; nothing in the visible part of this file reads it.
DEBUG=True
27 \r
class TestChannels(TestAsServer):
    """
    Tests for the ChannelCast and VoteCast overlay messages, the
    'CHANNEL k'/'CHANNEL p' remote queries and the vote bookkeeping of the
    votecast DB handler, all run against one session started by TestAsServer.

    Only the singtest_* methods are entry points; which one runs is chosen
    by the method name passed on the command line (see test_suite()).
    """

    def setUpPreSession(self):
        """ override TestAsServer

        Enables the overlay features under test in the session config and
        prepares a temporary install dir holding an empty superpeer file and
        the DB schema.
        """
        TestAsServer.setUpPreSession(self)
        self.config.set_buddycast(True)
        BuddyCastCore.TESTASSERVER = True
        ChannelCastCore.TESTASSERVER = True
        VoteCastCore.TESTASSERVER = True
        self.config.set_start_recommender(True)
        self.config.set_bartercast(True)
        self.config.set_remote_query(True)
        self.config.set_crawler(False)
        self.config.set_torrent_collecting_dir(os.path.join(self.config_path, "tmp_torrent_collecting"))

        # Write superpeers.txt and DB schema
        self.install_path = tempfile.mkdtemp()
        spdir = os.path.join(self.install_path, LIBRARYNAME, 'Core')
        os.makedirs(spdir)

        statsdir = os.path.join(self.install_path, LIBRARYNAME, 'Core', 'Statistics')
        os.makedirs(statsdir)

        superpeerfilename = os.path.join(spdir, 'superpeer.txt')
        print >> sys.stderr,"test: writing empty superpeers to",superpeerfilename
        f = open(superpeerfilename, "w")
        try:
            # Only a comment line, so no superpeers are configured.
            f.write('# Leeg')
        finally:
            f.close()

        self.config.set_install_dir(self.install_path)

        # The schema is looked up relative to the current working directory.
        srcfiles = [os.path.join(LIBRARYNAME,"schema_sdb_v5.sql")]
        for srcfile in srcfiles:
            sfn = os.path.join('..','..',srcfile)
            dfn = os.path.join(self.install_path,srcfile)
            print >>sys.stderr,"test: copying",sfn,dfn
            shutil.copyfile(sfn,dfn)


    def setUpPostSession(self):
        """ override TestAsServer

        Caches the DER-encoded public keys of both key pairs as permids.
        """
        TestAsServer.setUpPostSession(self)

        self.mypermid = str(self.my_keypair.pub().get_der())
        self.hispermid = str(self.his_keypair.pub().get_der())


    def setupDB(self,nickname):
        """
        Set the session nickname, open the torrent/channelcast/votecast DB
        handlers and fill them with three own-channel torrents and some votes.

        Failures while inserting are printed and otherwise ignored, as in the
        original best-effort behaviour.
        """
        # Change at runtime. Must be set before DB inserts
        self.session.set_nickname(nickname)

        self.torrent_db = self.session.open_dbhandler(NTFY_TORRENTS)
        self.channelcast_db = self.session.open_dbhandler(NTFY_CHANNELCAST)
        self.votecast_db = self.session.open_dbhandler(NTFY_VOTECAST)
        try:
            # Add some torrents belonging to own channel
            tdef1, self.bmetainfo1 = self._add_own_torrent('sumfilename1','Hallo S01E10')
            self.infohash1 = tdef1.get_infohash()

            tdef2, self.bmetainfo2 = self._add_own_torrent('sumfilename2','Hallo S02E01')
            self.infohash2 = tdef2.get_infohash()
            self.torrenthash2 = sha(self.bmetainfo2).digest()

            tdef3, self.bmetainfo3 = self._add_own_torrent('sumfilename3','Halo Demo')
            self.infohash3 = tdef3.get_infohash()
            self.torrenthash3 = sha(self.bmetainfo3).digest()

            # Now, add some votes
            subscribed_permid = "MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAIV8h+eS+vQ+0uqZNv3MYYTLo5s0JP+cmkvJ7U4JAHhfRv1wCqZSKIuY7Q+3ESezhRnnmmX4pbOVhKTU"
            spam_permid = "MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAIV8h+eS+vQ+0uqZNv3MYYTLo5s0JP+cmkvJ7U4JAHhfRv1wCqZSKIuY7Q+3ESezhRnnmmX4pbOVhKTX"
            self.votecast_db.subscribe(subscribed_permid)
            self.votecast_db.spam(spam_permid)
            vote = {'mod_id':subscribed_permid, 'voter_id':spam_permid,'vote':1, 'time_stamp':132314}
            self.votecast_db.addVote(vote)
        except Exception:
            # Imported here so the handler does not depend on a name that
            # only a star-import might provide.
            from traceback import print_exc
            print_exc()


    def _add_own_torrent(self, filename, title):
        """
        Create a default torrent, register it in the torrent DB and add it to
        the own channel. Returns (tdef, bencoded metainfo).
        """
        tdef, bmetainfo = self.get_default_torrent(filename, title)
        self.torrent_db.addExternalTorrent(tdef, extra_info={"filename":filename})
        self.channelcast_db.addOwnTorrent(tdef)
        return tdef, bmetainfo


    def tearDown(self):
        """
        Tear down the server, close every DB handler setupDB() opened and
        remove the temporary install dir created in setUpPreSession().
        """
        TestAsServer.tearDown(self)
        # setupDB() may not have run (or may have failed early), so only
        # close the handlers that actually exist.
        for attr in ('torrent_db', 'channelcast_db', 'votecast_db'):
            handler = getattr(self, attr, None)
            if handler is not None:
                self.session.close_dbhandler(handler)

        install_path = getattr(self, 'install_path', None)
        if install_path is not None:
            shutil.rmtree(install_path, ignore_errors=True)


    def get_default_torrent(self,filename,title,paths=None):
        """
        Build a minimal torrent named `title`, save it as `filename` in the
        torrent collecting dir and return (tdef, bencoded metainfo).

        Without `paths` a single-file torrent of 481 bytes is produced; with
        `paths` a two-file torrent using paths[0] (201 bytes) and paths[1]
        (280 bytes).
        """
        info = {
            'name': title.encode("UTF-8"),
            'piece length': 2 ** 16,
            'pieces': '*' * 20,
        }
        if paths is None:
            info['length'] = 481
        else:
            info['files'] = [
                {'path': [paths[0].encode("UTF-8")], 'length': 201},
                {'path': [paths[1].encode("UTF-8")], 'length': 280},
            ]

        metainfo = {
            'announce': 'http://localhost:0/announce',
            'announce-list': [],
            'creation date': int(time.time()),
            'encoding': 'UTF-8',
            'info': info,
        }
        path = os.path.join(self.config.get_torrent_collecting_dir(),filename)
        tdef = TorrentDef.load_from_dict(metainfo)
        tdef.save(path)
        return tdef, bencode(metainfo)


    def singtest_plain_nickname(self):
        """ Run all subtests with an ASCII nickname. """
        self._test_all("nick")

    def singtest_unicode_nickname(self):
        """ Run all subtests with a non-ASCII unicode nickname. """
        self._test_all(u"nick\u00f3")


    def _test_all(self,nickname):
        """
            I want to start a Tribler client once and then connect to
            it many times. So there must be only one test method
            to prevent setUp() from creating a new client every time.

            The code is constructed so unittest will show the name of the
            (sub)test where the error occurred in the traceback it prints.
        """

        self.setupDB(nickname)

        # test ChannelCast
        self.subtest_channelcast()

        # test VoteCast
        self.subtest_votecast()

        # test ChannelQuery-keyword
        self.subtest_channel_keyword_query(nickname)

        # test ChannelQuery-permid
        self.subtest_channel_permid_query(nickname)

        #test voting
        self.subtest_voting()

    def subtest_voting(self):
        """
        Apply unsubscribe/spam/subscribe to my own permid in sequence and
        check the vote the DB reports after each step.
        """
        my_permid = bin2str(self.mypermid)
        his_permid = bin2str(self.hispermid)
        # (action, vote expected from getVote afterwards)
        steps = [
            (self.votecast_db.unsubscribe, None),
            (self.votecast_db.spam, -1),
            (self.votecast_db.subscribe, 2),
            (self.votecast_db.unsubscribe, None),
            (self.votecast_db.spam, -1),
        ]
        for action, expected in steps:
            action(my_permid)
            vote = self.votecast_db.getVote(my_permid, his_permid)
            self.assertEqual(vote, expected,
                             "after %s: got %r, expected %r" % (action.__name__, vote, expected))

    def _assert_reply_type(self, resp, expected_id, label):
        """
        Fail unless `resp` is non-empty and starts with message id
        `expected_id`; `label` names the subtest in the log output.
        """
        # An empty reply would otherwise surface as an IndexError on resp[0].
        self.assert_(len(resp) > 0, "%s: no reply received" % label)
        print >>sys.stderr,"test: %s: got" % label,getMessageName(resp[0])
        self.assert_(resp[0]==expected_id)

    def _assert_connection_dropped(self, core_class, msg_id, label, payload):
        """
        Send `payload` as a `msg_id` message over a fresh connection and
        assert that nothing comes back (the other side should close the
        connection on a bad message).
        """
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            # Instantiated exactly as before; the instance itself is unused.
            # NOTE(review): confirm whether the constructor is needed at all.
            core_class(None, s, self.session, None, log = '', dnsindb = None)
            print >> sys.stderr, label, repr(payload)
            s.send(msg_id+bencode(payload))
            self.assert_(len(s.recv())==0)
        finally:
            s.close()

    def _run_channel_query(self, query, nickname):
        """
        Send the unicode `query` as a QUERY message over a fresh connection
        and validate the QUERY_REPLY against `nickname`.
        """
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            data = {'q': query.encode("UTF-8"), 'id': 'b' * 20}
            s.send(QUERY + bencode(data))
            resp = s.recv()
            self._assert_reply_type(resp, QUERY_REPLY, "chquery")
            self.check_chquery_reply(resp[1:],nickname)
            print >>sys.stderr,"test:",repr(bdecode(resp[1:]))
        finally:
            s.close()

    def check_chquery_reply(self, data, nickname):
        """
        Validate a bencoded channel query reply: it must be a dict with a
        string 'id' and a non-empty, valid channelcast dict under 'a' whose
        entries are all published by `nickname` with the server's permid.
        """
        d = bdecode(data)
        self.assert_(isinstance(d, dict))
        self.assert_('a' in d)
        self.assert_('id' in d)
        self.assert_(isinstance(d['id'], str))
        hits = d['a']
        self.assert_(validChannelCastMsg(hits)==True)
        self.assert_(len(hits) > 0)
        expected_name = nickname.encode("UTF-8")
        for val in hits.values():
            self.assert_(val['publisher_name'] == expected_name)
            self.assert_(val['publisher_id'] == self.hispermid)

    def subtest_channel_permid_query(self,nickname):
        """ Query the server's channel by permid ('CHANNEL p'). """
        print >>sys.stderr,"test: chquery permid-----------------------------"
        self._run_channel_query(u'CHANNEL p '+ bin2str(self.hispermid), nickname)

    def subtest_channel_keyword_query(self,nickname):
        """ Query the server's channel by keyword ('CHANNEL k'), using the nickname. """
        print >>sys.stderr,"test: chquery keyword-----------------------------"
        self._run_channel_query(u'CHANNEL k '+nickname, nickname)

    def subtest_votecast(self):
        """
        Send one good VOTECAST message and check the valid VOTECAST reply,
        then send three malformed ones that must get the connection closed.
        """
        print >>sys.stderr,"test: votecast-----------------------------"
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            # Instantiated exactly as before; the instance itself is unused.
            VoteCastCore(None, s, self.session, None, log = '', dnsindb = None)

            #Send Good VoteCast message
            # NOTE(review): the good message is keyed by the raw permid while
            # the bad ones below use bin2str(permid) - confirm this is intended.
            vdata = {self.hispermid:{'vote':-1,'time_stamp':12345345}}
            print >> sys.stderr, "Test Good VoteCast", repr(vdata)
            s.send(VOTECAST+bencode(vdata))
            resp = s.recv()
            self._assert_reply_type(resp, VOTECAST, "votecast")
            vdata_rcvd = bdecode(resp[1:])
            print >>sys.stderr, "test: votecast: got msg", repr(vdata_rcvd)
            self.assert_(validVoteCastMsg(vdata_rcvd)==True)
        finally:
            s.close()

        #Now, send bad VoteCast messages
        # The other side should close the connection

        #Bad time_stamp: it can only int
        vdata = {bin2str(self.hispermid):{'vote':-1,'time_stamp':'halo'}}
        self.subtest_bad_votecast(vdata)

        #Bad Vote: Vote can only -1 or 2
        vdata = {bin2str(self.hispermid):{'vote':-15,'time_stamp':12345345}}
        self.subtest_bad_votecast(vdata)

        # Bad Message format ... Correct format is 'time_stamp'
        vdata = {bin2str(self.hispermid):{'vote':-15,'timestamp':12345345}}
        self.subtest_bad_votecast(vdata)

        print>>sys.stderr, "End of votecast test"

    def subtest_bad_votecast(self, vdata):
        """ Send the invalid VOTECAST payload `vdata`; no reply may come back. """
        self._assert_connection_dropped(VoteCastCore, VOTECAST, "Test Bad VoteCast", vdata)

    def subtest_channelcast(self):
        """
        Send an empty CHANNELCAST message and check the valid CHANNELCAST
        reply, then send corrupted variants of that reply which must get the
        connection closed.
        """
        print >>sys.stderr,"test: channelcast----------------------"
        s = OLConnection(self.my_keypair,'localhost',self.hisport)
        try:
            # Instantiated exactly as before; the instance itself is unused.
            ChannelCastCore(None, s, self.session, None, log = '', dnsindb = None)

            #Send Empty ChannelCast message
            chdata = {}
            print >> sys.stderr, "Test Good ChannelCast", repr(chdata)
            s.send(CHANNELCAST+bencode(chdata))
            resp = s.recv()
            self._assert_reply_type(resp, CHANNELCAST, "channelcast")
            chdata_rcvd = bdecode(resp[1:])
            print >>sys.stderr, "test: channelcast: got msg", repr(chdata_rcvd)
            self.assert_(validChannelCastMsg(chdata_rcvd)==True)
        finally:
            s.close()

        #Now, send a bad ChannelCast message.
        # The other side should close the connection
        # Create bad message by manipulating a good one
        #bad infohash
        chdata = deepcopy(chdata_rcvd)
        for v in chdata.values():
            v['infohash'] = 234
        self.subtest_bad_channelcast(chdata)

        #bad torrentname
        chdata = deepcopy(chdata_rcvd)
        for v in chdata.values():
            v['torrentname'] = 1231
        self.subtest_bad_channelcast(chdata)

        #bad signature.. temporarily disabled.
        # Got to enable when signature validation in validChannelCastMsg are enabled
#        chdata = deepcopy(chdata_rcvd)
#        value_list = chdata.values()
#        if len(value_list)>0:
#            chdata['sdfg234sadf'] = value_list[0]
#            self.subtest_bad_channelcast(chdata)

        #Bad message format
        chdata = {'2343ww34':''}
        self.subtest_bad_channelcast(chdata)

        print>>sys.stderr, "End of channelcast test---------------------------"


    def subtest_bad_channelcast(self, chdata):
        """ Send the invalid CHANNELCAST payload `chdata`; no reply may come back. """
        self._assert_connection_dropped(ChannelCastCore, CHANNELCAST, "Test Bad ChannelCast", chdata)


356 \r
357 \r
358 def test_suite():\r
359     suite = unittest.TestSuite()\r
360     # We should run the tests in a separate Python interpreter to prevent \r
361     # problems with our singleton classes, e.g. PeerDB, etc.\r
362     if len(sys.argv) != 2:\r
363         print "Usage: python test_channelcasst.py <method name>"\r
364     else:\r
365         suite.addTest(TestChannels(sys.argv[1]))\r
366     \r
367     return suite\r
368 \r
def main():
    """
    Hand control to unittest with test_suite() as the default test.

    Only the program name is forwarded as argv, so unittest does not try to
    interpret the method-name argument that test_suite() reads itself.
    """
    program_name_only = [sys.argv[0]]
    unittest.main(argv=program_name_only, defaultTest='test_suite')
371 \r
# Run the test selected on the command line when executed as a script.
if __name__ == "__main__":
    main()