instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / DecentralizedTracking / kadtracker / test_lookup_manager.py
1 # Copyright (C) 2009 Flutra Osmani, Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
4
5 from nose.tools import eq_, ok_, assert_raises
6 import test_const as tc
7 import logging, logging_conf
8
9 import time
10
11 import querier
12 from routing_manager import RoutingManagerMock
13 import lookup_manager
14 import message
15 from identifier import Id, ID_SIZE_BYTES
16 from node import Node
17
# Configure logging through the project's logging_conf helper before any
# test runs, then fetch the logger named 'dht' that the tests below write to.
logging_conf.testing_setup(__name__)
logger = logging.getLogger('dht')
20
21
class TestLookupQueue:
    """Exercise lookup_manager._LookupQueue built with a size argument of 4."""

    def setup(self):
        """Create a fresh queue targeting INFO_HASH_ZERO for every test."""
        self.lookup = lookup_manager._LookupQueue(tc.INFO_HASH_ZERO, 4)

    def test_add_pop1(self):
        """Only the four closest nodes are kept; re-added nodes are ignored."""
        by_distance = tc.NODES_LD_IH
        queue = self.lookup
        candidates = (
            by_distance[157][0],
            by_distance[158][1],
            by_distance[154][2],
            by_distance[159][3],
            by_distance[158][4],
            by_distance[152][5],
        )
        # Six nodes go in, but only the four closest ones are kept.
        queue.add(candidates)
        # Adding the very same nodes again changes nothing (duplicates are
        # ignored).
        queue.add(candidates)
        closest_first = (
            by_distance[152][5],
            by_distance[154][2],
            by_distance[157][0],
            by_distance[158][1],
        )
        for expected in closest_first:
            eq_(queue.pop_closest_node(), expected)
        # Nothing is left to pop at this point.
        assert_raises(IndexError, queue.pop_closest_node)
        # Nodes the queue has already seen are ignored, so it stays empty.
        queue.add(candidates)
        assert_raises(IndexError, queue.pop_closest_node)

    def _test_add_pop2(self):
        """Interleave add() and pop_closest_node() on slices of tc.NODES.

        The leading underscore in the name keeps this scenario out of the
        regular test run.
        """
        queue = self.lookup
        all_nodes = tc.NODES

        queue.add(all_nodes[3:6])
        for index in (3, 4):
            eq_(queue.pop_closest_node(), all_nodes[index])

        queue.add(all_nodes[2:3])
        for index in (2, 5):
            eq_(queue.pop_closest_node(), all_nodes[index])

        # The queue has been drained.
        assert_raises(IndexError, queue.pop_closest_node)

        # Of the whole list, only nodes 0, 1, 6 and 7 are new to the queue.
        queue.add(all_nodes)
        for index in (0, 1, 6, 7):
            eq_(queue.pop_closest_node(), all_nodes[index])
64
65
class TestGetPeersLookup:
    """Drive a GetPeersLookup by hand with responses, timeouts and errors.

    Each test calls the lookup's private handlers (_on_response,
    _on_timeout, _on_error) directly and then checks num_parallel_queries,
    the announce candidates and the is_done flag.

    Node labels in the comments read "<log distance>-<index>", matching
    tc.NODES_LD_IH[<log distance>][<index>].
    """

    def _callback(self, peers):
        """Remember the peers the lookup reported through its callback."""
        self.got_peers = peers

    def setup(self):
        """Build a lookup for INFO_HASH_ZERO on top of the mock routing table.

        The literal 2 passed to GetPeersLookup is presumably the number of
        parallel queries (the tests expect num_parallel_queries == 2 right
        after start()) -- confirm against lookup_manager.
        """
        self.got_peers = None
        querier_ = querier.QuerierMock(tc.CLIENT_ID)
        bootstrap_nodes = RoutingManagerMock(
            ).get_closest_rnodes(tc.INFO_HASH_ZERO)
        self.lookup = lookup_manager.GetPeersLookup(tc.CLIENT_ID,
                                                    querier_,
                                                    2,
                                                    tc.INFO_HASH_ZERO,
                                                    self._callback,
                                                    bootstrap_nodes)

    def test_n(self):
        """Placeholder: passes as long as setup() itself does not raise."""
        pass

    def _test_complete(self):
        """Walk a whole lookup step by step, from start() towards the end.

        Start sends two parallel queries to the closest bootstrap nodes
        (to the INFO_HASH).

        The leading underscore in the name keeps this scenario out of the
        regular test run. It also stops at a bare ``return`` part-way
        through; the statements after it are kept for reference only.
        """
        logger.debug('test start')
        self.lookup.start()
        # Ongoing queries to (sorted: oldest first):
        # 155-4, 157-3,
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 158-1, 159-0
        # Notice 159-2 is kicked out from the queue
        eq_(self.lookup.num_parallel_queries, 2)
        nodes = [tc.NODES_LD_IH[157][5],
                 tc.NODES_LD_IH[152][6],
                 tc.NODES_LD_IH[158][7]]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[157][3],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[157][3],
             ])
        # This response triggers a new query (to 152-6)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 155-4, 152-6
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 157-5, 158-1, 158-7, 159-0
        self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
        # This timeout triggers a new query (to 157-5)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 155-4, 157-5
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 158-1, 158-7, 159-0
        # NOTE(review): 155-4 has just timed out, yet it is still listed as
        # ongoing above and is timed out a second time below -- confirm
        # which node was meant here.
        self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
        # This timeout triggers a new query (to 158-1)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 152-6, 158-1
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 158-7, 159-0
        nodes = [tc.NODES_LD_IH[151][8],
                 tc.NODES_LD_IH[150][9]]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[152][6],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[152][6],
             tc.NODES_LD_IH[157][3],
             ])
        # This response triggers a new query (to 150-9)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 157-5, 150-9
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 151-8, 158-7, 159-0
        nodes = [tc.NODES_LD_IH[151][10],
                 tc.NODES_LD_IH[151][11],
                 tc.NODES_LD_IH[156][12],
                 tc.NODES_LD_IH[156][13],
                 ]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[157][5],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[152][6],
             tc.NODES_LD_IH[157][3],
             tc.NODES_LD_IH[157][5],
             ])
        # This response triggers a new query (to 151-8)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 150-9, 151-8
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 151-10, 151-11, 156-12, 156-13
        # Notice that the lookup queue size limit is 4, therefore
        # 158-7 and 159-0 are removed from the queue
        self.lookup._on_error(None, tc.NODES_LD_IH[151][8])
        # This error triggers a new query (to 151-10)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 150-9, 151-10
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 151-11, 156-12, 156-13
        # NOTE(review): the lists around this call imply that 150-9 is the
        # query ending here, but the call names 151-8 (which already
        # errored above) -- confirm.
        self.lookup._on_timeout(tc.NODES_LD_IH[151][8])
        # This timeout triggers a new query (to 151-11)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 151-10, 151-11
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 156-12, 156-13
        nodes = [tc.NODES_LD_IH[144][14],
                 tc.NODES_LD_IH[145][15],
                 tc.NODES_LD_IH[145][16],
                 tc.NODES_LD_IH[145][17],
                 ]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[151][10],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[151][10],
             tc.NODES_LD_IH[152][6],
             tc.NODES_LD_IH[157][3],
             ])
        # This response triggers a new query (to 144-14)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 151-11, 144-14
        # Queued nodes to query (sorted by log_distance to info_hash):
        # Notice 156-13 is removed
        # 145-15, 145-16, 145-17, 156-12
        peers = [tc.NO_ADDR]
        ok_(not self.got_peers)
        self.lookup._on_response(*_gen_peers_args(
                tc.NODES_LD_IH[144][14],
                peers))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[144][14],
             tc.NODES_LD_IH[151][10],
             tc.NODES_LD_IH[152][6],
             ])
        ok_(self.got_peers)
        self.got_peers = False
        # The response with peers halves parallelism to 1.
        # No new query is triggered.
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 151-11
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 145-15, 145-16, 145-17, 156-12
        self.lookup._on_timeout(tc.NODES_LD_IH[151][11])
        # This timeout triggers a new query (to 145-15)
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 145-15
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 145-16, 145-17, 156-12
        peers = [tc.NO_ADDR]
        ok_(not self.got_peers)
        self.lookup._on_response(*_gen_peers_args(
                tc.NODES_LD_IH[145][15],
                peers))
        # This response triggers a new query (to 145-16)
        # The parallelism is not halved (remains 1).
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 145-16
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 145-17, 156-12
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[144][14],
             tc.NODES_LD_IH[145][15],
             tc.NODES_LD_IH[151][10],
             ])
        ok_(self.got_peers)
        self.got_peers = False
        self.lookup._on_timeout(tc.NODES_LD_IH[145][16])
        # This timeout triggers a new query (to 145-17)
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 145-17
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 156-12
        self.lookup._on_timeout(tc.NODES_LD_IH[145][17])
        # This timeout triggers a new query (to 156-12)

        # The scenario deliberately stops here. Everything below this
        # ``return`` is unreachable and is kept only as a record of the
        # intended tail of the scenario.
        return
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 156-12
        # Queued nodes to query (sorted by log_distance to info_hash):
        #
        nodes = [tc.NODES_LD_IH[144][18],
                 tc.NODES_LD_IH[145][19],
                 ]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[156][12],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[144][14],
             tc.NODES_LD_IH[145][15],
             tc.NODES_LD_IH[151][10],
             ])
        # This response triggers a new query (to 144-18)
        eq_(self.lookup.num_parallel_queries, 1)
        # Ongoing queries to (sorted: oldest first):
        # 144-18
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 145-19
        peers = [tc.NO_ADDR]
        ok_(not self.got_peers)
        self.lookup._on_response(*_gen_peers_args(
                tc.NODES_LD_IH[144][18],
                peers))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[144][14],
             tc.NODES_LD_IH[144][18],
             tc.NODES_LD_IH[145][15],
             ])
        ok_(self.got_peers)
        self.got_peers = False
        # This response triggers a new query (145-19)
        # NOTE(review): the assertion below expects 0 parallel queries
        # although 145-19 is listed as ongoing -- confirm before this tail
        # is re-enabled.
        eq_(self.lookup.num_parallel_queries, 0)
        # Ongoing queries to (sorted: oldest first):
        # 145-19
        # Queued nodes to query (sorted by log_distance to info_hash):
        #
        ok_(not self.lookup.is_done)
        self.lookup._on_timeout(tc.NODES_LD_IH[145][19])
        # THE END
        eq_(self.lookup.num_parallel_queries, 0)
        ok_(self.lookup.is_done)

    def test_dont_query_myself(self):
        """A response that lists this client's own node must not be queried.

        After that response the lookup is expected to move on to the next
        queued node, and to finish once every remaining query has timed out.
        """
        logger.debug('test start')
        self.lookup.start()
        # Ongoing queries to (sorted: oldest first):
        # 155-4, 157-3,
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 158-1, 159-0
        # Notice 159-2 is kicked out from the queue
        eq_(self.lookup.num_parallel_queries, 2)
        nodes = [Node(tc.CLIENT_ADDR, self.lookup._my_id)]
        self.lookup._on_response(*_gen_nodes_args(
                tc.NODES_LD_IH[157][3],
                nodes))
        eq_(self.lookup._get_announce_candidates(),
            [tc.NODES_LD_IH[157][3],
             ])
        # This response triggers a new query to 158-1 (ignoring myself)
        eq_(self.lookup.num_parallel_queries, 2)
        # Ongoing queries to (sorted: oldest first):
        # 155-4, 158-1
        # Queued nodes to query (sorted by log_distance to info_hash):
        # 159-0
        self.lookup._on_timeout(tc.NODES_LD_IH[155][4])
        # This timeout triggers a new query (to 159-0)
        eq_(self.lookup.num_parallel_queries, 2)
        self.lookup._on_timeout(tc.NODES_LD_IH[158][1])
        # No more nodes to send queries to
        eq_(self.lookup.num_parallel_queries, 1)
        ok_(not self.lookup.is_done)
        self.lookup._on_timeout(tc.NODES_LD_IH[159][0])
        # No more nodes to send queries to
        eq_(self.lookup.num_parallel_queries, 0)
        ok_(self.lookup.is_done)
328
329         
class TestLookupManager:
    """Check that lookups started through LookupManager.get_peers finish."""

    def _on_got_peers(self, peers):
        # Callback handed to get_peers(); keeps the latest peers it received.
        self.got_peers = peers

    def setup(self):
        """Build a LookupManager over mocks and start one get_peers lookup."""
        self.got_peers = None
        mock_querier = querier.QuerierMock(tc.CLIENT_ID)
        mock_routing = RoutingManagerMock()
        self.bootstrap_nodes = mock_routing.get_closest_rnodes(
            tc.INFO_HASH_ZERO)
        manager = lookup_manager.LookupManager(
            tc.CLIENT_ID, mock_querier, mock_routing, 2)
        self.lm = manager
        self.lookup = manager.get_peers(
            tc.INFO_HASH, self._on_got_peers, tc.BT_PORT)

    def teardown(self):
        """Stop the manager created in setup()."""
        self.lm.stop()

    def test_all_nodes_timeout(self):
        """The lookup is done once every bootstrap node has timed out."""
        lookup = self.lookup
        for rnode in self.bootstrap_nodes:
            lookup._on_timeout(rnode)
        ok_(lookup.is_done)

    def test_peers(self):
        """One node answers with peers and the others time out: done."""
        lookup = self.lookup
        responder = self.bootstrap_nodes[0]
        msg, sender = _gen_peers_args(responder, [tc.NO_ADDR])
        lookup._on_response(msg, sender)
        for rnode in self.bootstrap_nodes[1:]:
            lookup._on_timeout(rnode)
        ok_(lookup.is_done)
363         
def _gen_nodes_args(node_, nodes):
    """Build the (message, node) pair for a get_peers response with nodes.

    An OutgoingGetPeersResponse carrying node_.id, tc.TOKEN and the given
    nodes (as ``nodes2``) is encoded with tc.TID, parsed back as an
    IncomingMsg and sanitized as a GET_PEERS response. The result can be
    unpacked straight into a lookup's _on_response().
    """
    response = message.OutgoingGetPeersResponse(node_.id,
                                                tc.TOKEN,
                                                nodes2=nodes)
    encoded = response.encode(tc.TID)
    incoming = message.IncomingMsg(encoded)
    incoming.sanitize_response(message.GET_PEERS)
    return incoming, node_
372
def _gen_peers_args(node_, peers):
    """Build the (message, node) pair for a get_peers response with peers.

    An OutgoingGetPeersResponse carrying node_.id, tc.TOKEN and the given
    peers is encoded with tc.TID, parsed back as an IncomingMsg and
    sanitized as a GET_PEERS response. The result can be unpacked straight
    into a lookup's _on_response().
    """
    response = message.OutgoingGetPeersResponse(node_.id,
                                                tc.TOKEN,
                                                peers=peers)
    encoded = response.encode(tc.TID)
    incoming = message.IncomingMsg(encoded)
    incoming.sanitize_response(message.GET_PEERS)
    return incoming, node_
381