instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_g2g.py
1 # Written by Arno Bakker
2 # see LICENSE.txt for license information
3 #
4
5 import unittest
6 import os
7 import sys
8 import time
9 import socket
10 from traceback import print_exc
11 from types import DictType,StringType,IntType
12
13 from BaseLib.Test.test_as_server import TestAsServer
14 from olconn import OLConnection
15 from btconn import BTConnection
16 from BaseLib.Core.TorrentDef import TorrentDef
17 from BaseLib.Core.DownloadConfig import DownloadStartupConfig
18 from BaseLib.Core.BitTornado.bencode import bencode,bdecode
19 from BaseLib.Core.BitTornado.BT1.MessageID import *
20 from BaseLib.Core.DownloadConfig import *
21
22 DEBUG=True
23 G2G_ID = 235
24
25 class TestG2G(TestAsServer):
26     """ 
27     Testing EXTEND G2G message V2:
28
29     See BitTornado/BT1/Connecter.py
30     """
31
32     def setUp(self):
33         """ override TestAsServer """
34         TestAsServer.setUp(self)
35         print >>sys.stderr,"test: Giving MyLaunchMany time to startup"
36         time.sleep(3)
37         print >>sys.stderr,"test: MyLaunchMany should have started up"
38     
39     def setUpPostSession(self):
40         """ override TestAsServer """
41         TestAsServer.setUpPostSession(self)
42
43         # Let Tribler start downloading an non-functioning torrent, so
44         # we can talk to a normal download engine.
45         
46         self.torrentfn = os.path.join('extend_hs_dir','dummydata.merkle.torrent')
47         tdef = TorrentDef.load(self.torrentfn)
48
49         dscfg = DownloadStartupConfig()
50         dscfg.set_dest_dir(self.config_path)
51         dscfg.set_video_event_callback(self.vod_ready_callback)
52         
53         self.d = self.session.start_download(tdef,dscfg)
54         
55         # This is the infohash of the torrent in test/extend_hs_dir
56         self.infohash = '\xccg\x07\xe2\x9e!]\x16\xae{\xb8\x10?\xf9\xa5\xf9\x07\xfdBk'
57         self.mylistenport = 4810
58
59     def vod_ready_callback(self,d,event,params):
60         pass
61
62     def test_all(self):
63         """ 
64             I want to start a Tribler client once and then connect to
65             it many times. So there must be only one test method
66             to prevent setUp() from creating a new client every time.
67
68             The code is constructed so unittest will show the name of the
69             (sub)test where the error occured in the traceback it prints.
70         """
71         self.subtest_good_tribler_g2g_v2()
72         self.subtest_bad_g2g_v2()
73
74     #
75     # Good g2g_v2 message
76     #
77     def subtest_good_tribler_g2g_v2(self):
78         self._test_good(self.create_good_tribler_extend_hs_v2,infohash=self.infohash)
79         
80         # We've said we're a Tribler peer, and we initiated the connection, so 
81         # now *we* should now try to establish an overlay-swarm connection.
82         s = OLConnection(self.my_keypair,'localhost',self.hisport,mylistenport=self.mylistenport)
83         # the connection should be intact, so this should not throw an
84         # exception:
85         time.sleep(5)
86         s.send('bla')
87         s.close()
88
89     def _test_good(self,msg_gen_func,options=None,infohash=None,g2g_id=G2G_ID):
90         if options is None and infohash is None:
91             s = BTConnection('localhost',self.hisport)
92         elif options is None:
93             s = BTConnection('localhost',self.hisport,user_infohash=infohash)
94         elif infohash is None:
95             s = BTConnection('localhost',self.hisport,user_option_pattern=options)
96         else:
97             s = BTConnection('localhost',self.hisport,user_option_pattern=options,user_infohash=infohash)
98             
99         if DEBUG:
100             print "test: Creating test HS message",msg_gen_func,"g2g_id",g2g_id
101         msg = msg_gen_func(g2g_id=g2g_id)
102         s.send(msg)
103         s.read_handshake_medium_rare()
104
105         # Send our g2g_v2 message to Tribler
106         msg = self.create_good_g2g_v2(g2g_id=g2g_id)
107         s.send(msg)
108         
109         time.sleep(5)
110
111         # Tribler should send an EXTEND HS message back
112         try:
113             s.s.settimeout(10.0)
114             resp = s.recv()
115             self.assert_(len(resp) > 0)
116             self.assert_(resp[0] == EXTEND)
117             self.check_tribler_extend_hs_v2(resp[1:])
118         except socket.timeout:
119             print >> sys.stderr,"test: Timeout, bad, peer didn't reply with EXTEND HS message"
120             self.assert_(False)
121
122         # Tribler should send an g2g_v2 message after a while
123         print "test: Setting 60 second timeout to see if Tribler sends periodic g2g_v2"
124         
125         # Extreme h4xor
126         connlist = self.d.sd.dow.connecter.connections.values()[:]
127         piece = '\xab' * (2 ** 14)
128         for conn in connlist:
129             conn.queue_g2g_piece_xfer(0,0,piece)
130         
131         try:
132             s.s.settimeout(70.0)
133             while True:
134                 resp = s.recv()
135                 self.assert_(len(resp) > 0)
136                 print "test: Tribler returns",getMessageName(resp[0])
137                 if resp[0] == EXTEND:
138                     self.check_g2g_v2(resp[1:],g2g_id=g2g_id)
139                     s.close()
140                     break
141         except socket.timeout:
142             print >> sys.stderr,"test: Timeout, bad, peer didn't reply with EXTEND g2g_v2 message"
143             self.assert_(False)
144
145         
146     def create_good_tribler_extend_hs_v2(self,g2g_id=G2G_ID):
147         d = {}
148         d['m'] = {'Tr_OVERLAYSWARM':253,'Tr_G2G_v2':g2g_id}
149         d['p'] = self.mylistenport
150         d['v'] = 'Tribler 4.2.0'
151         d['e'] = 0
152         bd = bencode(d)
153         return EXTEND+chr(0)+bd
154
155     def check_tribler_extend_hs_v2(self,data):
156         self.assert_(data[0] == chr(0))
157         d = bdecode(data[1:])
158         self.assert_(type(d) == DictType)
159         self.assert_('m' in d.keys())
160         m = d['m']
161         self.assert_(type(m) == DictType)
162         self.assert_('Tr_OVERLAYSWARM' in m.keys())
163         val = m['Tr_OVERLAYSWARM']
164         self.assert_(type(val) == IntType)
165         self.assert_(val == 253)
166         self.assert_('Tr_G2G_v2' in m.keys())
167         val = m['Tr_G2G_v2']
168         self.assert_(type(val) == IntType)
169         self.assert_(val == G2G_ID)
170
171     def create_good_g2g_v2(self,g2g_id=G2G_ID):
172         d = {'0':'d','1':'b'}
173         bd = bencode(d)
174         return EXTEND+chr(g2g_id)+bd
175
176     def check_g2g_v2(self,data,g2g_id):
177         self.assert_(data[0] == chr(g2g_id))
178         d = bdecode(data[1:])
179         
180         print >>sys.stderr,"test: l is",`d`
181         
182         self.assert_(type(d) == DictType)
183         for k,v in d.iteritems():
184             self.assert_(type(k) == StringType)
185             self.assert_(type(v) == StringType)
186             self.assert_(ord(k) > 0)
187             self.assert_(ord(v) <= 100)
188             
189     #
190     # Bad EXTEND handshake message
191     #    
192     def subtest_bad_g2g_v2(self):
193         methods = [self.create_empty,
194             self.create_ext_id_not_byte,
195             self.create_not_bdecodable,
196             self.create_not_dict1,
197             self.create_not_dict2,
198             self.create_key_not_int,
199             self.create_val_not_str,
200             self.create_val_too_big]
201
202         for m in methods:
203             self._test_bad(m)
204
205     #
206     # Main test code for bad EXTEND g2g_v2 messages
207     #
208     def _test_bad(self,gen_drequest_func):
209         options = '\x00\x00\x00\x00\x00\x10\x00\x00'
210         s = BTConnection('localhost',self.hisport,user_option_pattern=options,user_infohash=self.infohash)
211         print >> sys.stderr,"\ntest: ",gen_drequest_func
212         
213         hsmsg = self.create_good_tribler_extend_hs_v2()
214         s.send(hsmsg)
215         
216         msg = gen_drequest_func()
217         s.send(msg)
218         time.sleep(5)
219         
220         # the other side should not like this and close the connection
221         try:
222             s.s.settimeout(10.0)
223             s.read_handshake_medium_rare(close_ok = True)
224             while True:
225                 resp = s.recv()
226                 if len(resp) > 0:
227                     print >>sys.stderr,"test: Got",getMessageName(resp[0]),"from peer"
228                     self.assert_(resp[0] == EXTEND or resp[0]==UNCHOKE)
229                 else:
230                     self.assert_(len(resp)==0)
231                     s.close()
232                     break
233         except socket.timeout:
234             print >> sys.stderr,"test: Timeout, bad, peer didn't close connection"
235             self.assert_(False)
236
237     #
238     # Bad message creators
239     # 
240     def create_empty(self):
241         return EXTEND+chr(G2G_ID)
242
243     def create_ext_id_not_byte(self):
244         return EXTEND+'Hallo kijkbuiskinderen'
245     
246     def create_not_bdecodable(self):
247         return EXTEND+chr(G2G_ID)+"bla"
248
249     def create_not_dict1(self):
250         d = 481
251         bd = bencode(d)
252         return EXTEND+chr(G2G_ID)+bd
253
254     def create_not_dict2(self):
255         d = []
256         bd = bencode(d)
257         return EXTEND+chr(G2G_ID)+bd
258
259     def create_key_not_int(self):
260         d = {'hallo':'d'}
261         bd = bencode(d)
262         return EXTEND+chr(G2G_ID)+bd
263         
264     def create_val_not_str(self):
265         d = {'481':481}
266         bd = bencode(d)
267         return EXTEND+chr(G2G_ID)+bd
268
269     def create_val_too_big(self):
270         d = {'481':chr(129)}
271         bd = bencode(d)
272         return EXTEND+chr(G2G_ID)+bd
273
274
275 def test_suite():
276     suite = unittest.TestSuite()
277     suite.addTest(unittest.makeSuite(TestG2G))
278     
279     return suite
280
281 if __name__ == "__main__":
282     unittest.main()
283