instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_closedswarm.py
1 # Written by Njaal Borch
2 # see LICENSE.txt for license information
3 #
4 import time
5 from base64 import encodestring,decodestring
6
7 import unittest
8
9 import os.path
10 from BaseLib.Core.Overlay import permid
11 from BaseLib.Core.BitTornado.BT1.MessageID import *
12
13 from BaseLib.Core.ClosedSwarm import ClosedSwarm
14
15 class ClosedSwarmTest(unittest.TestCase):
16
17     def setUp(self):
18         self.keyfiles = [".node_a_keypair",".node_b_keypair",".torrent_keypair"]
19         for filename in self.keyfiles:
20             if not os.path.exists(filename):
21                 keypair = permid.generate_keypair()
22                 permid.save_keypair(keypair, filename)
23                 
24         self.node_a_keypair = permid.read_keypair(".node_a_keypair")
25         self.node_b_keypair = permid.read_keypair(".node_b_keypair")
26         self.torrent_keypair = permid.read_keypair(".torrent_keypair")
27
28         self.torrent_id = "1234"
29
30         # Shortcuts
31         self.node_a_pub_permid = str(self.node_a_keypair.pub().get_der())
32         self.node_b_pub_permid = str(self.node_b_keypair.pub().get_der())
33         self.torrent_pubkeys = [encodestring(str(self.torrent_keypair.pub().get_der())).replace("\n","")]
34         
35         # Create the certificate for this torrent ("proof of access")
36         self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
37                                             self.torrent_keypair,
38                                             self.node_a_pub_permid)
39
40         self.poa_b = ClosedSwarm.create_poa(self.torrent_id,
41                                             self.torrent_keypair,
42                                             self.node_b_pub_permid)
43         
44         self.cs_a = ClosedSwarm.ClosedSwarm(self.node_a_keypair,
45                                             self.torrent_id,
46                                             self.torrent_pubkeys,
47                                             self.poa_a)
48         
49         self.cs_b = ClosedSwarm.ClosedSwarm(self.node_b_keypair,
50                                             self.torrent_id,
51                                             self.torrent_pubkeys,
52                                             self.poa_b)
53
54
55     def tearDown(self):
56         for filename in self.keyfiles:
57             try:
58                 os.remove(filename)
59             except:
60                 pass
61
62     def _verify_poas(self, poa_a, poa_b):
63         self.assertEquals(poa_a.torrent_id, poa_b.torrent_id)
64         self.assertEquals(poa_a.torrent_pub_key, poa_b.torrent_pub_key)
65         self.assertEquals(poa_a.node_pub_key, poa_b.node_pub_key)
66         self.assertEquals(poa_a.signature, poa_b.signature)
67         self.assertEquals(poa_a.expire_time, poa_b.expire_time)
68         
69     def test_poa_serialization(self):
70
71
72         serialized = self.poa_a.serialize()
73         deserialized = ClosedSwarm.POA.deserialize(serialized)
74         self._verify_poas(self.poa_a, deserialized)
75         deserialized.verify()
76
77         self.poa_a.save("poa.tmp")
78         new_poa = ClosedSwarm.POA.load("poa.tmp")
79         new_poa.verify()
80         
81         # Also serialize/deserialize using lists
82         serialized = self.poa_a.serialize_to_list()
83         deserialized = self.poa_a.deserialize_from_list(serialized)
84         self._verify_poas(self.poa_a, deserialized)
85         deserialized.verify()
86         
87         
88     def test_poa(self):
89         self.poa_a.verify()
90         self.poa_b.verify()
91
92
93         # Test poa expiretime
94         expire_time = time.mktime(time.gmtime())+60 # Expire in one minute
95         
96         self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
97                                             self.torrent_keypair,
98                                             self.node_a_pub_permid,
99                                             expire_time=expire_time)
100         try:
101             self.poa_a.verify()
102         except ClosedSwarm.POAExpiredException:
103             self.fail("POA verify means expired, but it is not")
104
105         expire_time = time.mktime(time.gmtime())-1 # Expire one second ago
106         
107         self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
108                                             self.torrent_keypair,
109                                             self.node_a_pub_permid,
110                                             expire_time=expire_time)
111         try:
112             self.poa_a.verify()
113             self.fail("POA verify does not honor expire time")
114         except ClosedSwarm.POAExpiredException:
115             pass
116
117
118     def test_basic(self):
119         self.assertFalse(self.cs_a.remote_node_authorized)
120         self.assertFalse(self.cs_b.remote_node_authorized)
121
122     def test_node_a_valid(self):
123         """
124         Test that the protocol works if only node A wants to be authorized
125         """
126         msg_1 = self.cs_a.a_create_challenge()
127         msg_2 = self.cs_b.b_create_challenge(msg_1)
128         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
129         msg_4 = self.cs_b.b_provide_poa_message(msg_3, i_am_seeding=True)
130         if msg_4:
131             self.fail("Made POA message for node B even though it is seeding")
132
133         self.assertFalse(self.cs_a.is_remote_node_authorized())
134         self.assertTrue(self.cs_b.is_remote_node_authorized())
135
136
137     def test_poa_message_creation(self):
138         
139         msg_1 = self.cs_a.a_create_challenge()
140         msg_2 = self.cs_b.b_create_challenge(msg_1)
141
142         
143         msg = self.cs_a._create_poa_message(CS_POA_EXCHANGE_A, self.cs_a.my_nonce, self.cs_b.my_nonce)
144         try:
145             self.cs_a._validate_poa_message(msg, self.cs_a.my_nonce, self.cs_b.my_nonce)
146         except Exception,e:
147             self.fail("_create_poa_message and _validate_poa_message do not agree: %s"%e)
148
149
150     def test_both_valid(self):
151         """
152         Test that the protocol works if both nodes wants to be authorized
153         """
154         msg_1 = self.cs_a.a_create_challenge()
155         nonce_a = self.cs_a.my_nonce
156         
157         msg_2 = self.cs_b.b_create_challenge(msg_1)
158         nonce_b = self.cs_b.my_nonce
159
160         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
161
162         self.assertEquals(self.cs_a.remote_nonce, nonce_b, "A's remote nonce is wrong")
163             
164         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
165         self.assertEquals(self.cs_b.remote_nonce, nonce_a, "B's remote nonce is wrong")
166
167         self.assertEquals(self.cs_a.my_nonce, self.cs_b.remote_nonce, "B's remote nonce is not A's nonce")
168         self.assertEquals(self.cs_a.remote_nonce, self.cs_b.my_nonce, "A's remote nonce is not B's nonce")
169
170         self.cs_a.a_check_poa_message(msg_4)
171         
172         
173         self.assertTrue(self.cs_a.is_remote_node_authorized())
174         self.assertTrue(self.cs_b.is_remote_node_authorized())
175
176         
177     def test_not_fresh_node_a(self):
178
179         msg_1 = self.cs_a.a_create_challenge()
180         bad_msg_1 = [CS_CHALLENGE_A,
181                      self.torrent_id,
182                      "badchallenge_a"]
183         msg_2 = self.cs_b.b_create_challenge(bad_msg_1)
184         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
185         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
186         try:
187             self.cs_a.a_check_poa_message(msg_4)
188             self.fail("Did not discover bad signature")
189         except ClosedSwarm.InvalidSignatureException,e:
190             pass
191
192         # Nobody can succeed now, the challenges are bad
193         self.assertFalse(self.cs_a.is_remote_node_authorized())
194         self.assertFalse(self.cs_b.is_remote_node_authorized())
195         
196
197     def test_not_fresh_node_b(self):
198
199         msg_1 = self.cs_a.a_create_challenge()
200         msg_2 = self.cs_b.b_create_challenge(msg_1)
201         bad_msg_2 = [CS_CHALLENGE_B,
202                      self.torrent_id,
203                      "badchallenge_b"]
204         msg_3 = self.cs_a.a_provide_poa_message(bad_msg_2)
205         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
206         try:
207             self.cs_a.a_check_poa_message(msg_4)
208             self.fail("Failed to discover bad POA from B")
209         except:
210             pass
211
212         # Nobody can succeed now, the challenges are bad
213         self.assertFalse(self.cs_a.is_remote_node_authorized())
214         self.assertFalse(self.cs_b.is_remote_node_authorized())
215
216
217     def test_invalid_poa_node_a(self):
218
219         self.cs_a.poa = ClosedSwarm.POA("bad_poa_a", "stuff", "stuff2")
220
221         # Update to a bad POA
222         msg_1 = self.cs_a.a_create_challenge()
223         msg_2 = self.cs_b.b_create_challenge(msg_1)
224         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
225         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
226         self.cs_a.a_check_poa_message(msg_4)
227         
228         self.assertTrue(self.cs_a.is_remote_node_authorized())
229         self.assertFalse(self.cs_b.is_remote_node_authorized())
230         
231
232     def test_very_invalid_poa_node_a(self):
233
234         # Update to a bad POA
235         try:
236             self.cs_a.set_poa("Wrong class!")
237             self.fail("Allows a string as POA!")
238         except:
239             pass
240
241     def test_invalid_swarm_node_b(self):
242
243         # Update to a bad POA
244         self.cs_b.poa = ClosedSwarm.POA("bad_poa_b", "stuff", "stuff2")
245
246         # Update to a bad POA
247         msg_1 = self.cs_a.a_create_challenge()
248         msg_2 = self.cs_b.b_create_challenge(msg_1)
249         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
250         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
251         try:
252             self.cs_a.a_check_poa_message(msg_4)
253             self.fail("Node B failed to discover bad POA")
254         except ClosedSwarm.WrongSwarmException,e:
255             pass
256
257     def test_invalid_poa_node_b(self):
258         self.cs_b.poa = ClosedSwarm.POA(self.torrent_id, "stuff", "stuff2")
259
260         # Update to a bad POA
261         msg_1 = self.cs_a.a_create_challenge()
262         msg_2 = self.cs_b.b_create_challenge(msg_1)
263         msg_3 = self.cs_a.a_provide_poa_message(msg_2)
264         msg_4 = self.cs_b.b_provide_poa_message(msg_3)
265         try:
266             self.cs_a.a_check_poa_message(msg_4)
267             self.fail("Node B failed to discover bad POA")
268         except ClosedSwarm.InvalidPOAException,e:
269             pass
270
271     
272 if __name__ == "__main__":
273
274     print "Performing ClosedSwarm unit tests"
275
276
277     unittest.main()
278
279     print "All done"
280