1 # Written by Njaal Borch
2 # see LICENSE.txt for license information
5 from base64 import encodestring,decodestring
10 from BaseLib.Core.Overlay import permid
11 from BaseLib.Core.BitTornado.BT1.MessageID import *
13 from BaseLib.Core.ClosedSwarm import ClosedSwarm
15 class ClosedSwarmTest(unittest.TestCase):
18 self.keyfiles = [".node_a_keypair",".node_b_keypair",".torrent_keypair"]
19 for filename in self.keyfiles:
20 if not os.path.exists(filename):
21 keypair = permid.generate_keypair()
22 permid.save_keypair(keypair, filename)
24 self.node_a_keypair = permid.read_keypair(".node_a_keypair")
25 self.node_b_keypair = permid.read_keypair(".node_b_keypair")
26 self.torrent_keypair = permid.read_keypair(".torrent_keypair")
28 self.torrent_id = "1234"
31 self.node_a_pub_permid = str(self.node_a_keypair.pub().get_der())
32 self.node_b_pub_permid = str(self.node_b_keypair.pub().get_der())
33 self.torrent_pubkeys = [encodestring(str(self.torrent_keypair.pub().get_der())).replace("\n","")]
35 # Create the certificate for this torrent ("proof of access")
36 self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
38 self.node_a_pub_permid)
40 self.poa_b = ClosedSwarm.create_poa(self.torrent_id,
42 self.node_b_pub_permid)
44 self.cs_a = ClosedSwarm.ClosedSwarm(self.node_a_keypair,
49 self.cs_b = ClosedSwarm.ClosedSwarm(self.node_b_keypair,
56 for filename in self.keyfiles:
62 def _verify_poas(self, poa_a, poa_b):
63 self.assertEquals(poa_a.torrent_id, poa_b.torrent_id)
64 self.assertEquals(poa_a.torrent_pub_key, poa_b.torrent_pub_key)
65 self.assertEquals(poa_a.node_pub_key, poa_b.node_pub_key)
66 self.assertEquals(poa_a.signature, poa_b.signature)
67 self.assertEquals(poa_a.expire_time, poa_b.expire_time)
69 def test_poa_serialization(self):
72 serialized = self.poa_a.serialize()
73 deserialized = ClosedSwarm.POA.deserialize(serialized)
74 self._verify_poas(self.poa_a, deserialized)
77 self.poa_a.save("poa.tmp")
78 new_poa = ClosedSwarm.POA.load("poa.tmp")
81 # Also serialize/deserialize using lists
82 serialized = self.poa_a.serialize_to_list()
83 deserialized = self.poa_a.deserialize_from_list(serialized)
84 self._verify_poas(self.poa_a, deserialized)
94 expire_time = time.mktime(time.gmtime())+60 # Expire in one minute
96 self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
98 self.node_a_pub_permid,
99 expire_time=expire_time)
102 except ClosedSwarm.POAExpiredException:
103 self.fail("POA verify means expired, but it is not")
105 expire_time = time.mktime(time.gmtime())-1 # Expire one second ago
107 self.poa_a = ClosedSwarm.create_poa(self.torrent_id,
108 self.torrent_keypair,
109 self.node_a_pub_permid,
110 expire_time=expire_time)
113 self.fail("POA verify does not honor expire time")
114 except ClosedSwarm.POAExpiredException:
118 def test_basic(self):
119 self.assertFalse(self.cs_a.remote_node_authorized)
120 self.assertFalse(self.cs_b.remote_node_authorized)
122 def test_node_a_valid(self):
124 Test that the protocol works if only node A wants to be authorized
126 msg_1 = self.cs_a.a_create_challenge()
127 msg_2 = self.cs_b.b_create_challenge(msg_1)
128 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
129 msg_4 = self.cs_b.b_provide_poa_message(msg_3, i_am_seeding=True)
131 self.fail("Made POA message for node B even though it is seeding")
133 self.assertFalse(self.cs_a.is_remote_node_authorized())
134 self.assertTrue(self.cs_b.is_remote_node_authorized())
137 def test_poa_message_creation(self):
139 msg_1 = self.cs_a.a_create_challenge()
140 msg_2 = self.cs_b.b_create_challenge(msg_1)
143 msg = self.cs_a._create_poa_message(CS_POA_EXCHANGE_A, self.cs_a.my_nonce, self.cs_b.my_nonce)
145 self.cs_a._validate_poa_message(msg, self.cs_a.my_nonce, self.cs_b.my_nonce)
147 self.fail("_create_poa_message and _validate_poa_message do not agree: %s"%e)
150 def test_both_valid(self):
152 Test that the protocol works if both nodes wants to be authorized
154 msg_1 = self.cs_a.a_create_challenge()
155 nonce_a = self.cs_a.my_nonce
157 msg_2 = self.cs_b.b_create_challenge(msg_1)
158 nonce_b = self.cs_b.my_nonce
160 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
162 self.assertEquals(self.cs_a.remote_nonce, nonce_b, "A's remote nonce is wrong")
164 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
165 self.assertEquals(self.cs_b.remote_nonce, nonce_a, "B's remote nonce is wrong")
167 self.assertEquals(self.cs_a.my_nonce, self.cs_b.remote_nonce, "B's remote nonce is not A's nonce")
168 self.assertEquals(self.cs_a.remote_nonce, self.cs_b.my_nonce, "A's remote nonce is not B's nonce")
170 self.cs_a.a_check_poa_message(msg_4)
173 self.assertTrue(self.cs_a.is_remote_node_authorized())
174 self.assertTrue(self.cs_b.is_remote_node_authorized())
177 def test_not_fresh_node_a(self):
179 msg_1 = self.cs_a.a_create_challenge()
180 bad_msg_1 = [CS_CHALLENGE_A,
183 msg_2 = self.cs_b.b_create_challenge(bad_msg_1)
184 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
185 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
187 self.cs_a.a_check_poa_message(msg_4)
188 self.fail("Did not discover bad signature")
189 except ClosedSwarm.InvalidSignatureException,e:
192 # Nobody can succeed now, the challenges are bad
193 self.assertFalse(self.cs_a.is_remote_node_authorized())
194 self.assertFalse(self.cs_b.is_remote_node_authorized())
197 def test_not_fresh_node_b(self):
199 msg_1 = self.cs_a.a_create_challenge()
200 msg_2 = self.cs_b.b_create_challenge(msg_1)
201 bad_msg_2 = [CS_CHALLENGE_B,
204 msg_3 = self.cs_a.a_provide_poa_message(bad_msg_2)
205 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
207 self.cs_a.a_check_poa_message(msg_4)
208 self.fail("Failed to discover bad POA from B")
212 # Nobody can succeed now, the challenges are bad
213 self.assertFalse(self.cs_a.is_remote_node_authorized())
214 self.assertFalse(self.cs_b.is_remote_node_authorized())
217 def test_invalid_poa_node_a(self):
219 self.cs_a.poa = ClosedSwarm.POA("bad_poa_a", "stuff", "stuff2")
221 # Update to a bad POA
222 msg_1 = self.cs_a.a_create_challenge()
223 msg_2 = self.cs_b.b_create_challenge(msg_1)
224 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
225 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
226 self.cs_a.a_check_poa_message(msg_4)
228 self.assertTrue(self.cs_a.is_remote_node_authorized())
229 self.assertFalse(self.cs_b.is_remote_node_authorized())
232 def test_very_invalid_poa_node_a(self):
234 # Update to a bad POA
236 self.cs_a.set_poa("Wrong class!")
237 self.fail("Allows a string as POA!")
241 def test_invalid_swarm_node_b(self):
243 # Update to a bad POA
244 self.cs_b.poa = ClosedSwarm.POA("bad_poa_b", "stuff", "stuff2")
246 # Update to a bad POA
247 msg_1 = self.cs_a.a_create_challenge()
248 msg_2 = self.cs_b.b_create_challenge(msg_1)
249 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
250 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
252 self.cs_a.a_check_poa_message(msg_4)
253 self.fail("Node B failed to discover bad POA")
254 except ClosedSwarm.WrongSwarmException,e:
257 def test_invalid_poa_node_b(self):
258 self.cs_b.poa = ClosedSwarm.POA(self.torrent_id, "stuff", "stuff2")
260 # Update to a bad POA
261 msg_1 = self.cs_a.a_create_challenge()
262 msg_2 = self.cs_b.b_create_challenge(msg_1)
263 msg_3 = self.cs_a.a_provide_poa_message(msg_2)
264 msg_4 = self.cs_b.b_provide_poa_message(msg_3)
266 self.cs_a.a_check_poa_message(msg_4)
267 self.fail("Node B failed to discover bad POA")
268 except ClosedSwarm.InvalidPOAException,e:
272 if __name__ == "__main__":
274 print "Performing ClosedSwarm unit tests"