instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_social_overlap.py
1 # Written by Arno Bakker
2 # see LICENSE.txt for license information
3
4 import unittest
5 import os
6 import sys
7 import wx
8 import time
9 from BaseLib.Core.Utilities.Crypto import sha
10 from types import StringType, DictType
11 from threading import Thread
12 from M2Crypto import Rand,EC
13
14 from BaseLib.Test.test_as_server import TestAsServer
15 from olconn import OLConnection
16 from BaseLib.Core.BitTornado.bencode import bencode,bdecode
17 from BaseLib.Core.BitTornado.BT1.MessageID import *
18
19 from BaseLib.Core.SocialNetwork.OverlapMsgHandler import ICON_MAX_SIZE
20
21 DEBUG=True
22
23 class wxServer(Thread):
24     def __init__(self):
25         Thread.__init__(self)
26         self.setDaemon(True)
27         
28         app = wx.App(0)
29         app.MainLoop()
30
31
32 class TestSocialOverlap(TestAsServer):
33     """ 
34     Testing SOCIAL_OVERLAP message of Social Network extension V1
35     """
36     
37     def setUp(self):
38         """ override TestAsServer """
39         TestAsServer.setUp(self)
40         Rand.load_file('randpool.dat', -1)
41
42     def setUpPreSession(self):
43         """ override TestAsServer """
44         TestAsServer.setUpPreSession(self)
45         # Enable social networking
46         self.config.set_social_networking(True)
47
48         # Give him a usericon to send
49         fn = self.make_filename('usericon-ok.jpg')
50         f = open(fn,"rb")
51         data = f.read()
52         f.close()
53         self.config.set_mugshot(data,'image/jpeg')
54
55
56     def setUpPostSession(self):
57         """ override TestAsServer """
58         TestAsServer.setUpPostSession(self)
59
60         self.mypermid = str(self.my_keypair.pub().get_der())
61         self.hispermid = str(self.his_keypair.pub().get_der())        
62         self.myhash = sha(self.mypermid).digest()
63
64
65
66     def tearDown(self):
67         """ override TestAsServer """
68         TestAsServer.tearDown(self)
69         try:
70             os.remove('randpool.dat')
71         except:
72             pass
73
74     def test_all(self):
75         """ 
76             I want to start a Tribler client once and then connect to
77             it many times. So there must be only one test method
78             to prevent setUp() from creating a new client every time.
79
80             The code is constructed so unittest will show the name of the
81             (sub)test where the error occured in the traceback it prints.
82         """
83         # 1. test good SOCIAL_OVERLAP
84         #self.subtest_good_soverlap('Beurre Alexander Lucas')
85         self.subtest_good_soverlap(u'esta\xe7\xe3o04')
86
87         # 2. test various bad SOCIAL_OVERLAP messages
88         self.subtest_bad_not_bdecodable()
89         self.subtest_bad_not_dict1()
90         self.subtest_bad_not_dict2()
91         self.subtest_bad_empty_dict()
92         self.subtest_bad_wrong_dict_keys()
93
94         self.subtest_bad_persinfo()
95
96
97     #
98     # Good SOCIAL_OVERLAP
99     #
100     def subtest_good_soverlap(self,name):
101         """ 
102             test good SOCIAL_OVERLAP messages
103         """
104         print >>sys.stderr,"test: good SOCIAL_OVERLAP"
105         s = OLConnection(self.my_keypair,'localhost',self.hisport)
106         msg = self.create_good_soverlap(name)
107         s.send(msg)
108         resp = s.recv()
109         self.assert_(resp[0] == SOCIAL_OVERLAP)
110         self.check_soverlap(resp[1:])
111         time.sleep(10)
112         # the other side should not have closed the connection, as
113         # this is all valid, so this should not throw an exception:
114         s.send('bla')
115         s.close()
116
117     def create_good_soverlap(self,name=None):
118         d = {}
119         [pi_sig,pi] = self.create_good_persinfo(name)
120
121         d['persinfo'] = pi
122         return self.create_payload(d)
123
124     def create_good_persinfo(self,name=None):
125         pi = {}
126         if name is not None:
127             pi['name'] = name.encode("UTF-8")
128         else:
129             pi['name'] = 'Beurre Alexander Lucas'
130         pi['icontype'] = 'image/jpeg'
131         pi['icondata'] = self.read_usericon_ok()
132         sig = None
133         return [sig,pi]
134
135     def read_usericon_ok(self):
136         return self.read_file(self.make_filename('usericon-ok.jpg'))
137
138     def make_filename(self,filename):
139         """ Test assume to be run from new Tribler/Test """
140         return filename
141
142     def read_file(self,filename):
143         f = open( filename, 'rb')
144         data = f.read()
145         f.close()
146         return data
147     
148     def create_payload(self,r):
149         return SOCIAL_OVERLAP+bencode(r)
150
151     def check_soverlap(self,data):
152         d = bdecode(data)
153         self.assert_(type(d) == DictType)
154         self.assert_(d.has_key('persinfo'))
155         self.check_persinfo(d['persinfo'])
156
157     def check_persinfo(self,d):
158         self.assert_(type(d) == DictType)
159         print >>sys.stderr,"test: persinfo: keys is",d.keys()
160
161         self.assert_(d.has_key('name'))
162         self.assert_(isinstance(d['name'],str))
163         self.assert_(d.has_key('icontype'))
164         if d.has_key('icontype'):
165             print >>sys.stderr,"test: persinfo: HAS ICON"
166         self.assert_(d.has_key('icondata'))
167         self.check_usericon(d['icontype'],d['icondata'])
168
169     def check_usericon(self,icontype,icondata):
170         self.assert_(type(icontype) == StringType)
171         self.assert_(type(icondata) == StringType)
172         idx = icontype.find('/')
173         ridx = icontype.rfind('/')
174         self.assert_(idx != -1)
175         self.assert_(idx == ridx)
176         self.assert_(len(icondata) <= ICON_MAX_SIZE)
177         print >>sys.stderr,"test: check_usericon: len icon is",len(icondata)
178
179     # Bad soverlap
180     #    
181     def subtest_bad_not_bdecodable(self):
182         self._test_bad(self.create_not_bdecodable)
183
184     def subtest_bad_not_dict1(self):
185         self._test_bad(self.create_not_dict1)
186
187     def subtest_bad_not_dict2(self):
188         self._test_bad(self.create_not_dict2)
189
190     def subtest_bad_empty_dict(self):
191         self._test_bad(self.create_empty_dict)
192
193     def subtest_bad_wrong_dict_keys(self):
194         self._test_bad(self.create_wrong_dict_keys)
195
196     #
197     # Bad 'persinfo' 
198     #
199     def subtest_bad_persinfo(self):
200         """ Cut a corner """
201         methods = [
202             self.make_persinfo_not_dict1,
203             self.make_persinfo_not_dict2,
204             self.make_persinfo_empty_dict,
205             self.make_persinfo_wrong_dict_keys,
206             self.make_persinfo_name_not_str,
207             self.make_persinfo_icontype_not_str,
208             self.make_persinfo_icontype_noslash,
209             self.make_persinfo_icondata_not_str,
210             self.make_persinfo_icondata_too_big ]
211         for method in methods:
212             # Hmmm... let's get dirty
213             print >> sys.stderr,"\ntest: ",method,
214             func = lambda: self.create_bad_persinfo(method)
215             self._test_bad(func)
216
217     def _test_bad(self,gen_soverlap_func):
218         print >>sys.stderr,"test: bad SOCIAL_OVERLAP",gen_soverlap_func
219         s = OLConnection(self.my_keypair,'localhost',self.hisport)
220         msg = gen_soverlap_func()
221         s.send(msg)
222         time.sleep(5)
223         # the other side should not like this and close the connection
224         self.assert_(len(s.recv())==0)
225         s.close()
226
227     def create_not_bdecodable(self):
228         return SOCIAL_OVERLAP+"bla"
229
230     def create_not_dict1(self):
231         soverlap = 481
232         return self.create_payload(soverlap)
233
234     def create_not_dict2(self):
235         soverlap = []
236         return self.create_payload(soverlap)
237
238     def create_empty_dict(self):
239         soverlap = {}
240         return self.create_payload(soverlap)
241
242     def create_wrong_dict_keys(self):
243         soverlap = {}
244         soverlap['bla1'] = '\x00\x00\x00\x00\x00\x30\x00\x00'
245         soverlap['bla2'] = '\x00\x00\x00\x00\x00\x30\x00\x00'
246         return self.create_payload(soverlap)
247
248
249     #
250     # Bad persinfo
251     #
252     def create_bad_persinfo(self,gen_persinfo_func):
253         soverlap = {}
254         pi = gen_persinfo_func()
255         soverlap['persinfo'] = pi
256         return self.create_payload(soverlap)
257
258     def make_persinfo_not_dict1(self):
259         return 481
260
261     def make_persinfo_not_dict2(self):
262         return []
263
264     def make_persinfo_empty_dict(self):
265         return {}
266
267     def make_persinfo_wrong_dict_keys(self):
268         pi = {}
269         pi['bla1'] = '\x00\x00\x00\x00\x00\x30\x00\x00'
270         pi['bla2'] = '\x00\x00\x00\x00\x00\x30\x00\x00'
271         return pi
272
273     def make_persinfo_name_not_str(self):
274         [sig,pi] = self.create_good_persinfo()
275         pi['name'] = 481
276         return pi
277
278     def make_persinfo_icontype_not_str(self):
279         [sig,pi] = self.create_good_persinfo()
280         pi['icontype'] = 481
281         return pi
282
283     def make_persinfo_icontype_noslash(self):
284         [sig,pi] = self.create_good_persinfo()
285         pi['icontype'] = 'image#jpeg'
286         return pi
287
288     def make_persinfo_icondata_not_str(self):
289         [sig,pi] = self.create_good_persinfo()
290         pi['icondata'] = 481
291         return pi
292
293     def make_persinfo_icondata_too_big(self):
294         [sig,pi] = self.create_good_persinfo()
295         pi['icondata'] = "".zfill(ICON_MAX_SIZE+100)
296         return pi
297
298 def test_suite():
299     suite = unittest.TestSuite()
300     suite.addTest(unittest.makeSuite(TestSocialOverlap))
301     
302     return suite
303
304 def sign_data(plaintext,keypair):
305     digest = sha(plaintext).digest()
306     return keypair.sign_dsa_asn1(digest)
307
308 def verify_data(plaintext,permid,blob):
309     pubkey = EC.pub_key_from_der(permid)
310     digest = sha(plaintext).digest()
311     return pubkey.verify_dsa_asn1(digest,blob)
312
313
314 if __name__ == "__main__":
315     unittest.main()
316