1 # Copyright (C) 2009 Flutra Osmani, Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
5 from nose.tools import eq_, ok_, assert_raises
6 import test_const as tc
7 import logging, logging_conf
12 from routing_manager import RoutingManagerMock
15 from identifier import Id, ID_SIZE_BYTES
# Initialise logging through the project's logging_conf helper, then fetch
# the logger named 'dht' that the tests in this module write debug output to.
18 logging_conf.testing_setup(__name__)
19 logger = logging.getLogger('dht')
22 class TestLookupQueue:
25 self.lookup = lookup_manager._LookupQueue(tc.INFO_HASH_ZERO, 4)
def test_add_pop1(self):
    # Offers six candidate nodes (twice), then checks that exactly four of
    # them come back out in the asserted order and that the queue stays
    # empty afterwards, even when the same candidates are offered again.
    candidates = (
        tc.NODES_LD_IH[157][0],
        tc.NODES_LD_IH[158][1],
        tc.NODES_LD_IH[154][2],
        tc.NODES_LD_IH[159][3],
        tc.NODES_LD_IH[158][4],
        tc.NODES_LD_IH[152][5],
    )
    # Just the 4 closest nodes are added; the second add has no effect
    # (duplicates are ignored).
    for _ in range(2):
        self.lookup.add(candidates)
    expected_order = (
        tc.NODES_LD_IH[152][5],
        tc.NODES_LD_IH[154][2],
        tc.NODES_LD_IH[157][0],
        tc.NODES_LD_IH[158][1],
    )
    for expected in expected_order:
        eq_(self.lookup.pop_closest_node(), expected)
    # Now the queue is empty
    assert_raises(IndexError, self.lookup.pop_closest_node)
    # Nodes that were already offered are ignored when added again,
    # so the queue is still empty.
    self.lookup.add(candidates)
    assert_raises(IndexError, self.lookup.pop_closest_node)
def _test_add_pop2(self):
    # Interleaves adds and pops on the lookup queue and checks the order in
    # which nodes come back out.
    # NOTE(review): the leading underscore presumably keeps the test runner
    # from collecting this case -- confirm whether it is disabled on purpose.
    queue = self.lookup
    queue.add(tc.NODES[3:6])
    for idx in (3, 4):
        eq_(queue.pop_closest_node(), tc.NODES[idx])
    queue.add(tc.NODES[2:3])
    for idx in (2, 5):
        eq_(queue.pop_closest_node(), tc.NODES[idx])
    # Every node added so far (2, 3, 4, 5) has been popped
    assert_raises(IndexError, queue.pop_closest_node)
    # This add only affects 0,1,6,7
    queue.add(tc.NODES)
    for idx in (0, 1, 6, 7):
        eq_(queue.pop_closest_node(), tc.NODES[idx])
66 class TestGetPeersLookup:
68 def _callback(self, peers):
69 self.got_peers = peers
73 querier_ = querier.QuerierMock(tc.CLIENT_ID)
74 bootstrap_nodes = RoutingManagerMock(
75 ).get_closest_rnodes(tc.INFO_HASH_ZERO)
76 self.lookup = lookup_manager.GetPeersLookup(tc.CLIENT_ID,
86 def _test_complete(self):
88 """Start sends two parallel queries to the closest
89 bootstrap nodes (to the INFO_HASH)
92 # Ongoing queries to (sorted: oldest first):
94 # Queued nodes to query (sorted by log_distance to info_hash):
96 # Notice 159-2 is kicked out from the queue
98 eq_(self.lookup.num_parallel_queries, 2)
99 nodes = [tc.NODES_LD_IH[157][5],
100 tc.NODES_LD_IH[152][6],
101 tc.NODES_LD_IH[158][7]]
102 self.lookup._on_response(*_gen_nodes_args(
103 tc.NODES_LD_IH[157][3],
105 eq_(self.lookup._get_announce_candidates(),
106 [tc.NODES_LD_IH[157][3],
108 # This response triggers a new query (to 152-6)
109 eq_(self.lookup.num_parallel_queries, 2)
110 # Ongoing queries to (sorted: oldest first):
112 # Queued nodes to query (sorted by log_distance to info_hash):
113 # 157-5, 158-1, 158-7, 159-0
114 self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
115 eq_(self.lookup.num_parallel_queries, 2)
116 # This timeout triggers a new query (to 157-5)
117 eq_(self.lookup.num_parallel_queries, 2)
118 # Ongoing queries to (sorted: oldest first):
120 # Queued nodes to query (sorted by log_distance to info_hash):
121 # 158-1, 158-7, 159-0
122 self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
123 # This timeout triggers a new query (to 158-1)
124 eq_(self.lookup.num_parallel_queries, 2)
125 # Ongoing queries to (sorted: oldest first):
127 # Queued nodes to query (sorted by log_distance to info_hash):
129 nodes = [tc.NODES_LD_IH[151][8],
130 tc.NODES_LD_IH[150][9]]
131 self.lookup._on_response(*_gen_nodes_args(
132 tc.NODES_LD_IH[152][6],
134 eq_(self.lookup._get_announce_candidates(),
135 [tc.NODES_LD_IH[152][6],
136 tc.NODES_LD_IH[157][3],
138 # This response triggers a new query (to 150-9)
139 eq_(self.lookup.num_parallel_queries, 2)
140 # Ongoing queries to (sorted: oldest first):
142 # Queued nodes to query (sorted by log_distance to info_hash):
143 # 151-8, 158-7, 159-0
144 nodes = [tc.NODES_LD_IH[151][10],
145 tc.NODES_LD_IH[151][11],
146 tc.NODES_LD_IH[156][12],
147 tc.NODES_LD_IH[156][13],
149 self.lookup._on_response(*_gen_nodes_args(
150 tc.NODES_LD_IH[157][5],
152 eq_(self.lookup._get_announce_candidates(),
153 [tc.NODES_LD_IH[152][6],
154 tc.NODES_LD_IH[157][3],
155 tc.NODES_LD_IH[157][5],
157 # This response triggers a new query (to 151-8)
158 eq_(self.lookup.num_parallel_queries, 2)
159 # Ongoing queries to (sorted: oldest first):
161 # Queued nodes to query (sorted by log_distance to info_hash):
162 # 151-10, 151-11, 156-12, 156-13
163 # Notice that the lookup queue size limit is 4, therefore
164 # 158-7 and 159-0 are removed from the queue
165 self.lookup._on_error(None, tc.NODES_LD_IH[151][8])
166 # This error triggers a new query (to 151-10)
167 eq_(self.lookup.num_parallel_queries, 2)
168 # Ongoing queries to (sorted: oldest first):
170 # Queued nodes to query (sorted by log_distance to info_hash):
171 # 151-11, 156-12, 156-13
172 self.lookup._on_timeout(tc.NODES_LD_IH[151][8])
173 # This timeout triggers a new query (to 151-11)
174 eq_(self.lookup.num_parallel_queries, 2)
175 # Ongoing queries to (sorted: oldest first):
177 # Queued nodes to query (sorted by log_distance to info_hash):
179 nodes = [tc.NODES_LD_IH[144][14],
180 tc.NODES_LD_IH[145][15],
181 tc.NODES_LD_IH[145][16],
182 tc.NODES_LD_IH[145][17],
184 self.lookup._on_response(*_gen_nodes_args(
185 tc.NODES_LD_IH[151][10],
187 eq_(self.lookup._get_announce_candidates(), [tc.NODES_LD_IH[151][10],
188 tc.NODES_LD_IH[152][6],
189 tc.NODES_LD_IH[157][3],
191 # This response triggers a new query (to 144-14)
192 eq_(self.lookup.num_parallel_queries, 2)
193 # Ongoing queries to (sorted: oldest first):
195 # Queued nodes to query (sorted by log_distance to info_hash):
196 # Notice 156-13 is removed
197 # 145-15, 145-16, 145-17, 156-12
199 ok_(not self.got_peers)
200 self.lookup._on_response(*_gen_peers_args(
201 tc.NODES_LD_IH[144][14],
203 eq_(self.lookup._get_announce_candidates(), [tc.NODES_LD_IH[144][14],
204 tc.NODES_LD_IH[151][10],
205 tc.NODES_LD_IH[152][6],
208 self.got_peers = False
209 # The response with peers halves parallelism to 1.
210 # No new query is triggered.
211 eq_(self.lookup.num_parallel_queries, 1)
212 # Ongoing queries to (sorted: oldest first):
214 # Queued nodes to query (sorted by log_distance to info_hash):
215 # 145-15, 145-16, 156-12
216 self.lookup._on_timeout(tc.NODES_LD_IH[151][11])
217 # This timeout triggers a new query (to 145-15)
218 eq_(self.lookup.num_parallel_queries, 1)
219 # Ongoing queries to (sorted: oldest first):
221 # Queued nodes to query (sorted by log_distance to info_hash):
222 # 145-16, 145-17, 156-12
224 ok_(not self.got_peers)
225 self.lookup._on_response(*_gen_peers_args(
226 tc.NODES_LD_IH[145][15],
228 # This response triggers a new query (to 145-16)
229 # The parallelism is not halved (remains 1).
230 eq_(self.lookup.num_parallel_queries, 1)
231 # Ongoing queries to (sorted: oldest first):
233 # Queued nodes to query (sorted by log_distance to info_hash):
235 eq_(self.lookup._get_announce_candidates(), [tc.NODES_LD_IH[144][14],
236 tc.NODES_LD_IH[145][15],
237 tc.NODES_LD_IH[151][10],
240 self.got_peers = False
241 self.lookup._on_timeout(tc.NODES_LD_IH[145][16])
242 # This timeout triggers a new query (to 145-17)
243 eq_(self.lookup.num_parallel_queries, 1)
244 # Ongoing queries to (sorted: oldest first):
246 # Queued nodes to query (sorted by log_distance to info_hash):
248 self.lookup._on_timeout(tc.NODES_LD_IH[145][17])
249 # This timeout triggers a new query (to 156-12)
251 eq_(self.lookup.num_parallel_queries, 1)
252 # Ongoing queries to (sorted: oldest first):
254 # Queued nodes to query (sorted by log_distance to info_hash):
256 nodes = [tc.NODES_LD_IH[144][18],
257 tc.NODES_LD_IH[145][19],
259 self.lookup._on_response(*_gen_nodes_args(
260 tc.NODES_LD_IH[156][12],
262 eq_(self.lookup._get_announce_candidates(), [tc.NODES_LD_IH[144][14],
263 tc.NODES_LD_IH[145][15],
264 tc.NODES_LD_IH[151][10],
266 # This response triggers a new query (to 144-18)
267 eq_(self.lookup.num_parallel_queries, 1)
268 # Ongoing queries to (sorted: oldest first):
270 # Queued nodes to query (sorted by log_distance to info_hash):
273 ok_(not self.got_peers)
274 self.lookup._on_response(*_gen_peers_args(
275 tc.NODES_LD_IH[144][18],
277 eq_(self.lookup._get_announce_candidates(), [tc.NODES_LD_IH[144][14],
278 tc.NODES_LD_IH[144][18],
279 tc.NODES_LD_IH[145][15],
282 self.got_peers = False
283 # This response triggers a new query (to 145-19)
284 eq_(self.lookup.num_parallel_queries, 0)
285 # Ongoing queries to (sorted: oldest first):
287 # Queued nodes to query (sorted by log_distance to info_hash):
289 ok_(not self.lookup.is_done)
290 self.lookup._on_timeout(tc.NODES_LD_IH[145][19])
292 eq_(self.lookup.num_parallel_queries, 0)
293 ok_(self.lookup.is_done)
295 def test_dont_query_myself(self):
296 logger.debug('test start')
298 # Ongoing queries to (sorted: oldest first):
300 # Queued nodes to query (sorted by log_distance to info_hash):
302 # Notice 159-2 is kicked out from the queue
303 eq_(self.lookup.num_parallel_queries, 2)
304 nodes = [Node(tc.CLIENT_ADDR, self.lookup._my_id)]
305 self.lookup._on_response(*_gen_nodes_args(
306 tc.NODES_LD_IH[157][3],
308 eq_(self.lookup._get_announce_candidates(),
309 [tc.NODES_LD_IH[157][3],
311 # This response triggers a new query to 158-1 (ignoring myself)
312 eq_(self.lookup.num_parallel_queries, 2)
313 # Ongoing queries to (sorted: oldest first):
315 # Queued nodes to query (sorted by log_distance to info_hash):
317 self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
318 # This timeout triggers a new query (to 159-0)
319 eq_(self.lookup.num_parallel_queries, 2)
320 self.lookup._on_timeout(tc.NODES_LD_IH[158][1])
321 # No more nodes to send queries to
322 eq_(self.lookup.num_parallel_queries, 1)
323 ok_(not self.lookup.is_done)
324 self.lookup._on_timeout(tc.NODES_LD_IH[159][0])
325 # No more nodes to send queries to
326 eq_(self.lookup.num_parallel_queries, 0)
327 ok_(self.lookup.is_done)
330 class TestLookupManager:
332 def _on_got_peers(self, peers):
333 self.got_peers = peers
337 self.got_peers = None
338 querier_ = querier.QuerierMock(tc.CLIENT_ID)
339 routing_m = RoutingManagerMock()
340 self.bootstrap_nodes = routing_m.get_closest_rnodes(
342 self.lm = lookup_manager.LookupManager(tc.CLIENT_ID,
346 self.lookup = self.lm.get_peers(tc.INFO_HASH, self._on_got_peers,
# Reports a timeout for every bootstrap node and asserts that the lookup
# is flagged as done.
# NOTE(review): indentation is lost in this listing; the ok_() check is
# presumably meant to run once, after the loop has finished -- confirm.
349 def test_all_nodes_timeout(self):
350 for node_ in self.bootstrap_nodes:
351 self.lookup._on_timeout(node_)
352 ok_(self.lookup.is_done)
354 def test_peers(self):
355 self.lookup._on_response(*_gen_peers_args(
356 self.bootstrap_nodes[0],
358 for node_ in self.bootstrap_nodes[1:]:
359 self.lookup._on_timeout(node_)
360 ok_(self.lookup.is_done)
364 def _gen_nodes_args(node_, nodes):
365 out_msg = message.OutgoingGetPeersResponse(
368 nodes2=nodes).encode(tc.TID)
369 in_msg = message.IncomingMsg(out_msg)
370 in_msg.sanitize_response(message.GET_PEERS)
373 def _gen_peers_args(node_, peers):
374 out_msg = message.OutgoingGetPeersResponse(
377 peers=peers).encode(tc.TID)
378 in_msg = message.IncomingMsg(out_msg)
379 in_msg.sanitize_response(message.GET_PEERS)