instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_rquery_reply_active.py
1 # Written by Arno Bakker
2 # see LICENSE.txt for license information
3
4 import unittest
5 import sys
6 import time
7 from time import sleep
8 from types import StringType, DictType
9
10 from BaseLib.Core.TorrentDef import TorrentDef
11 from BaseLib.Core.BitTornado.bencode import bencode,bdecode
12 from BaseLib.Core.BitTornado.BT1.MessageID import QUERY, QUERY_REPLY, getMessageName
13
14 from olconn import OLConnection
15 from BaseLib.Test.test_as_server import TestAsServer
16
# Debug switch; not referenced by the code visible in this file.
DEBUG = True

# Fixed values placed in the fake search hit that the test sends to Tribler
# and later expects to get back unchanged in the query callback.
LENGTH, LEECHERS, SEEDERS = 481, 22, 11
CATEGORY = ' Video'
23
class TestQueryReplyActive(TestAsServer):

    """
    Testing QUERY_REPLY message of Query extension V1

    This test checks how the Tribler code responds to good and bad
    QUERY_REPLY messages. The test opens an overlay connection to the
    Tribler client, makes the client send a QUERY over that connection
    and then replies with good and bad QUERY_REPLY messages.

    This test allows authoritative answers from superpeers.

    WARNING: Each of the singtest_ methods should be tested by running the
    TestCase in a separate Python interpreter to prevent problems with our
    singleton classes, e.g. SuperPeerDB, etc.
    """

    # Seconds to wait after opening the overlay connection before asking
    # Tribler to send the query.
    CONNECT_WAIT = 3
    # Seconds to wait after sending a good reply before probing whether the
    # connection is still open.
    GOOD_REPLY_WAIT = 10

    def setUpPreSession(self):
        """ override TestAsServer: additionally enable remote querying """
        print >> sys.stderr, "test: Pre Tribler Init"
        TestAsServer.setUpPreSession(self)
        print >> sys.stderr, "test: Pre Tribler Init: config_path", self.config_path
        # Enable remote querying
        self.config.set_remote_query(True)

    def setUpPostSession(self):
        """ override TestAsServer: remember both sides' permids as strings """
        TestAsServer.setUpPostSession(self)
        self.hispermid = str(self.his_keypair.pub().get_der())
        self.my_permid = str(self.my_keypair.pub().get_der())

    def pretest_simple(self, keyword):
        """ Prepare a test that makes Tribler send a 'SIMPLE <keyword>' query. """
        self.pretest_q('SIMPLE', keyword)

    def pretest_simpleplustorrents(self, keyword):
        """ Prepare a test that makes Tribler send a 'SIMPLE+METADATA <keyword>' query. """
        self.pretest_q('SIMPLE+METADATA', keyword)

    def pretest_q(self, queryprefix, keyword):
        """
        Build the torrent used as search hit, connect to Tribler and make it
        send the query '<queryprefix> <keyword>' to its connected peers.

        Sets self.content_name, self.tdef, self.openconn and self.query for
        use by the rest of the test.
        """
        query = queryprefix + ' ' + keyword

        self.content_name = keyword.upper() + ' S22E44'
        self.tdef = TorrentDef()
        self.tdef.set_tracker('http://localhost:0/announce')
        self.tdef.set_piece_length(2 ** 15)
        self.tdef.create_live(self.content_name, 2 ** 16)
        self.tdef.finalize()

        # 1. First connect to Tribler
        self.openconn = OLConnection(self.my_keypair, 'localhost', self.hisport)
        time.sleep(self.CONNECT_WAIT)

        # 2. Make Tribler send query
        self.query = query
        self.session.query_connected_peers(query, self.query_usercallback, max_peers_to_query=10)

    def query_usercallback(self, permid, query, hits):
        """
        Callback passed to query_connected_peers(): checks that the reported
        hits are the ones this test sent for the query it started.

        NOTE(review): presumably invoked on a Tribler thread rather than the
        unittest thread, in which case a failing assertion here would not
        fail the test -- confirm.
        """
        print >> sys.stderr, "test: query_usercallback:", repr(permid), repr(query), repr(hits)

        self.assertEqual(query, self.query)
        self.assertEqual(permid, self.my_permid)
        self.check_good_qreply(hits)

        # TODO: if SIMPLE+METADATA: check torrent now in db.

    #
    # Good SIMPLE QUERY, builds on TestQueryReply code
    #
    def singtest_good_simple_reply(self):
        self.pretest_simple('hallo')
        self._test_qreply(self.create_good_simple_reply, True)

    #
    # Good SIMPLE+METADATA QUERY, builds on TestQueryReply code
    #
    def singtest_good_simpleplustorrents_reply(self):
        self.pretest_simpleplustorrents('hallo')
        self._test_qreply(self.create_good_simpleplustorrents_reply, True)

    #
    # Good SIMPLE QUERY Unicode, builds on TestQueryReply code
    #
    def singtest_good_simple_reply_unicode(self):
        self.pretest_simple(u'Ch\u00e8rie')
        self._test_qreply(self.create_good_simple_reply, True)

    #
    # Good SIMPLE+METADATA QUERY Unicode, builds on TestQueryReply code
    #
    def singtest_good_simpleplustorrents_reply_unicode(self):
        self.pretest_simpleplustorrents(u'Ch\u00e8rie')
        self._test_qreply(self.create_good_simpleplustorrents_reply, True)

    #
    # Bad QUERY_REPLY to a SIMPLE QUERY: payload is not bdecodable,
    # builds on TestQueryReply code
    #
    def singtest_bad_not_bdecodable(self):
        self.pretest_simple('hallo')
        self._test_qreply(self.create_not_bdecodable, False)

    #
    # Bad QUERY_REPLY to a SIMPLE+METADATA QUERY: embedded torrent file is
    # not bdecodable, builds on TestQueryReply code
    #
    def singtest_bad_not_bdecodable_torrentfile(self):
        self.pretest_simpleplustorrents('hallo')
        self._test_qreply(self.create_not_bdecodable_torrentfile, False)

    ### TODO: send different valid answers so consensus not reached

    #
    # Main test code
    #
    def _test_qreply(self, gen_qreply, good):
        """
        Receive Tribler's QUERY on the open connection, answer it with the
        message produced by gen_qreply(query id) and check Tribler's reaction.

        gen_qreply -- callable taking the query id and returning the complete
                      QUERY_REPLY message to send.
        good       -- True if Tribler is expected to accept the reply and keep
                      the connection open, False if it is expected to close it.

        The connection is always closed on exit, also when a check fails.
        """
        print >> sys.stderr, "test: waiting for reply"
        conn = self.openconn
        try:
            msg = conn.recv()
            self.assertTrue(len(msg) > 0)
            print >> sys.stderr, "test: Received overlay message", getMessageName(msg[0])
            self.assertEqual(msg[0], QUERY)
            query_id = self.check_rquery(msg[1:])

            resp = gen_qreply(query_id)
            print >> sys.stderr, "test: sending QUERY_REPLY"
            conn.send(resp)
            if good:
                time.sleep(self.GOOD_REPLY_WAIT)
                # the other side should not have closed the connection, as
                # this is all valid, so this should not throw an exception:
                conn.send('bla')
            else:
                # the other side should not like this and close the connection
                self.assertEqual(len(conn.recv()), 0)
        finally:
            # Do not leak the socket when one of the checks above fails.
            conn.close()

    def create_good_simple_reply_dict(self, id):
        """
        Return the (not yet bencoded) reply dict for query `id`, containing a
        single hit keyed by the infohash of self.tdef. 'torrent_size' is left
        for the caller to fill in.
        """
        record = {
            'content_name': self.content_name.encode("UTF-8"),
            'length': LENGTH,
            'leecher': LEECHERS,
            'seeder': SEEDERS,
            'category': CATEGORY,
            # OLPROTO_PROTO_ELEVENTH
            # 'torrent_size' is set later by the callers
            'channel_permid': '$' * 83,
            'channel_name': 'Nitin Channel',
        }
        return {'id': id, 'a': {self.tdef.get_infohash(): record}}

    def _create_sized_reply_dict(self, id):
        """
        Return (reply dict, hit record, bencoded metainfo) where the hit
        record already carries 'torrent_size' = length of the bencoded
        metainfo of self.tdef.
        """
        d = self.create_good_simple_reply_dict(id)
        bmetainfo = bencode(self.tdef.get_metainfo())
        record = d['a'][self.tdef.get_infohash()]
        record['torrent_size'] = len(bmetainfo)
        return d, record, bmetainfo

    def create_good_simple_reply(self, id):
        """ Return a valid QUERY_REPLY message (without metadata) for query `id`. """
        d, _record, _bmetainfo = self._create_sized_reply_dict(id)
        return QUERY_REPLY + bencode(d)

    def create_good_simpleplustorrents_reply(self, id):
        """ Return a valid QUERY_REPLY message that also embeds the torrent file. """
        d, record, bmetainfo = self._create_sized_reply_dict(id)
        record['metatype'] = 'application/x-tribler-stream'
        record['metadata'] = bmetainfo
        return QUERY_REPLY + bencode(d)

    def check_good_qreply(self, hits):
        """
        Check that `hits` holds exactly the one hit this test sent, with all
        fields intact (and the torrent file too for SIMPLE+METADATA queries).
        """
        infohash = self.tdef.get_infohash()
        self.assertEqual(len(hits), 1)
        self.assertTrue(infohash in hits)
        hit = hits[infohash]
        self.assertEqual(hit['content_name'], self.content_name)
        self.assertEqual(hit['length'], LENGTH)
        self.assertEqual(hit['leecher'], LEECHERS)
        self.assertEqual(hit['seeder'], SEEDERS)
        self.assertEqual(hit['category'], CATEGORY)

        # OLPROTO_VERSION_ELEVENTH
        bmetainfo = bencode(self.tdef.get_metainfo())
        self.assertEqual(hit['torrent_size'], len(bmetainfo))
        if self.query.startswith('SIMPLE+METADATA'):
            self.assertEqual(hit['metadata'], bmetainfo)

    def create_not_bdecodable(self, id):
        """ Return a QUERY_REPLY whose payload is not valid bencoding (`id` is unused). """
        return QUERY_REPLY + "bla"

    def create_not_bdecodable_torrentfile(self, id):
        """
        Return a well-formed QUERY_REPLY for query `id` whose embedded
        'metadata' (the torrent file) is not valid bencoding.
        """
        d = self.create_good_simple_reply_dict(id)
        record = d['a'][self.tdef.get_infohash()]
        metadata = 'bla'
        # consistent with metadata. Should be named "metasize"
        record['torrent_size'] = len(metadata)
        record['metadata'] = metadata
        return QUERY_REPLY + bencode(d)

    def check_rquery(self, data):
        """
        Check that `data` is the bencoded payload of the QUERY this test made
        Tribler send, and return the query id it carries.
        """
        d = bdecode(data)
        self.assertTrue(isinstance(d, dict))
        self.assertTrue('q' in d)
        q = d['q']
        self.assertTrue(isinstance(q, str))
        self.assertTrue('id' in d)
        query_id = d['id']
        self.assertTrue(isinstance(query_id, str))

        self.assertEqual(q, self.query.encode("UTF-8"))
        return query_id
242
243
def test_suite():
    """
    Build the suite for unittest.main(): a single TestQueryReplyActive test
    whose method name is taken from the command line.

    Returns an empty suite, after printing a usage message, unless exactly
    one command-line argument was given.
    """
    suite = unittest.TestSuite()
    # We should run the tests in a separate Python interpreter to prevent
    # problems with our singleton classes, e.g. SuperPeerDB, etc.
    if len(sys.argv) != 2:
        print("Usage: python test_rquery_reply_active.py <method name>")
    else:
        suite.addTest(TestQueryReplyActive(sys.argv[1]))

    return suite
254
def main():
    """Run the suite returned by test_suite() through unittest.main()."""
    # Only the program name is handed to unittest; the method-name argument
    # stays in sys.argv, where test_suite() reads it.
    unittest_argv = [sys.argv[0]]
    unittest.main(defaultTest='test_suite', argv=unittest_argv)


if __name__ == '__main__':
    main()