instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / DecentralizedTracking / kadtracker / test_querier.py
1 # Copyright (C) 2009 Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
4
5 from nose.tools import ok_, eq_
6
7 import sys
8 import time
9 import logging, logging_conf
10
11 import node
12 import identifier
13 import message
14 import minitwisted
15 import rpc_manager
16 import test_const as tc
17
18 import querier
19
# Set up logging for this test module (presumably a test-specific
# configuration -- see logging_conf.testing_setup) and grab the 'dht'
# logger that the tests below use for their "ignore this warning" notices.
logging_conf.testing_setup(__name__)
logger = logging.getLogger('dht')


# Opt-in switches read by TestQuerier. Both default to off so that a plain
# test run is fast and does not need network access.
RUN_CPU_INTENSIVE_TESTS = False
RUN_NETWORK_TESTS = False # Requires a running external DHT node
26
class TestQuery:
    """Tests for the callback dispatching of a single querier.Query.

    setup() builds one Query whose callbacks are the recording methods of
    this class; every test then feeds the Query an event and checks which
    ``got_*`` flags were raised.
    """

    def setup(self):
        """Create the canned messages, reset all flags and build the Query."""
        self.ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)

        # Canned incoming responses: encode as the server would, then
        # decode as the receiving side would.
        encoded_ping_r = message.OutgoingPingResponse(
            tc.SERVER_ID).encode(tc.TID)
        self.ping_r_in = message.IncomingMsg(encoded_ping_r)
        encoded_fn_r = message.OutgoingFindNodeResponse(
            tc.SERVER_ID, nodes2=tc.NODES).encode(tc.TID)
        self.fn_r_in = message.IncomingMsg(encoded_fn_r)

        # No callback has been fired yet.
        for flag_name in ('got_response',
                          'got_error',
                          'got_timeout',
                          'got_routing_response',
                          'got_routing_error',
                          'got_routing_timeout',
                          'got_routing_nodes_found'):
            setattr(self, flag_name, False)

        callbacks = (self.on_response,
                     self.on_error,
                     self.on_timeout,
                     self.on_routing_response,
                     self.on_routing_error,
                     self.on_routing_timeout,
                     self.on_routing_nodes_found)
        self.query = querier.Query(tc.TID, self.ping_msg.query,
                                   tc.SERVER_NODE, *callbacks)
        # The timeout task is attached by hand so that tests can fire or
        # cancel it explicitly.
        timeout_task = minitwisted.Task(1, self.on_timeout, tc.SERVER_NODE)
        self.query.timeout_task = timeout_task

    # --- recording callbacks: each one just raises its flag ---

    def on_response(self, response_msg, addr):
        self.got_response = True

    def on_error(self, error_msg, addr):
        self.got_error = True

    def on_timeout(self, addr):
        self.got_timeout = True

    def on_routing_response(self, node_):
        self.got_routing_response = True

    def on_routing_error(self, node_):
        self.got_routing_error = True

    def on_routing_timeout(self, node_):
        self.got_routing_timeout = True

    def on_routing_nodes_found(self, node_):
        self.got_routing_nodes_found = True

    # --- helpers ---

    def _incoming_pong(self):
        """Return a ping response as seen by the receiving side.

        The server side creates and encodes the response; the result is
        then decoded into an IncomingMsg, as the RPC manager would do.
        """
        encoded_pong = message.OutgoingPingResponse(
            tc.SERVER_ID).encode(tc.TID)
        return message.IncomingMsg(encoded_pong)

    def _fire_and_cancel_timeout(self):
        """Fire the query's timeout callbacks, then cancel the task."""
        timeout_task = self.query.timeout_task
        timeout_task.fire_callbacks()
        timeout_task.cancel()

    def _check_only_timeout_fired(self):
        """Check that, of the three plain callbacks, only timeout fired."""
        ok_(not self.got_response)
        ok_(not self.got_error)
        ok_(self.got_timeout)

    def _check_response_flags(self, fired):
        """Check the response, routing-response and nodes-found flags.

        All three must be set when *fired* is true and unset otherwise.
        """
        for flag in (self.got_response,
                     self.got_routing_response,
                     self.got_routing_nodes_found):
            if fired:
                ok_(flag)
            else:
                ok_(not flag)

    # --- tests ---
    # NOTE: test methods are documented with comments rather than
    # docstrings so that the test runner keeps reporting them by name.

    def test_fire_callback_on_response(self):
        # A valid response must fire on_response and nothing else.
        self.query.on_response_received(self._incoming_pong())
        ok_(self.got_response)
        ok_(not self.got_error)
        ok_(not self.got_timeout)

    def test_fire_callback_on_error(self):
        # The server creates the error, the receiving side decodes it and
        # the query is notified: only on_error must fire.
        encoded_error = message.OutgoingErrorMsg(
            message.GENERIC_E).encode(tc.TID)
        incoming_error = message.IncomingMsg(encoded_error)
        self.query.on_error_received(incoming_error)
        ok_(not self.got_response)
        ok_(self.got_error)

    def test_on_timeout(self):
        # Query.on_timeout must raise both the plain and the routing
        # timeout flags.
        ok_(not self.got_timeout)
        ok_(not self.got_routing_timeout)
        self.query.on_timeout()
        ok_(self.got_timeout)
        ok_(self.got_routing_timeout)

    def test_fire_callback_on_timeout(self):
        # Firing the timeout task directly must reach on_timeout only.
        self._fire_and_cancel_timeout()
        self._check_only_timeout_fired()

    def test_fire_callback_on_late_response(self):
        # The response arrives after the timeout fired: it is too late,
        # so on_response must not be called.
        self._fire_and_cancel_timeout()
        self.query.on_response_received(self._incoming_pong())
        logger.warning("**IGNORE WARNING LOG**")
        self._check_only_timeout_fired()

    def test_invalid_response_received(self):
        # Corrupt the response part of the canned ping response; the
        # query must not report it as a response.
        self.ping_r_in._msg_dict[message.RESPONSE] = 'zz'
        ok_(not self.got_response)
        logger.warning("**IGNORE WARNING LOG**")
        self.query.on_response_received(self.ping_r_in)
        ok_(not self.got_response)

    def test_response_contains_nodes(self):
        # Trick the query into accepting a find_node response.
        self.query.query = message.FIND_NODE
        self._check_response_flags(False)
        self.query.on_response_received(self.fn_r_in)
        self._check_response_flags(True)

    def test_different_node_id(self):
        # The query expects a response from SERVER_NODE; here the expected
        # node is replaced by one with the same address but CLIENT_ID, so
        # the ID carried by the response does not match and the response
        # must be dropped.
        self.query.node = node.Node(tc.SERVER_ADDR, tc.CLIENT_ID)
        self._check_response_flags(False)
        self.query.on_response_received(self.fn_r_in)
        self._check_response_flags(False)

    def teardown(self):
        """Nothing to clean up."""
        pass
165
class TestQuerier:
    """Tests for querier.Querier and querier.QuerierMock.

    setup() creates three queriers:

    * ``querier_mock`` -- a QuerierMock, driven by hand.
    * ``querier`` -- a Querier on top of a real RPCManager, without
      routing callbacks.
    * ``querier_routing`` -- same as ``querier`` but with the four routing
      callbacks of this class registered.

    Tests guarded by RUN_NETWORK_TESTS need a running external DHT node.
    Test methods are documented with comments rather than docstrings so
    that the test runner keeps reporting them by name.
    """

    def setup(self):
        """Reset all flags, build the queriers and start the reactor."""
        if RUN_NETWORK_TESTS:
            time.sleep(1) # Reduce test interdependence
        self.got_response = False
        self.got_timeout = False
        self.got_error = False
        self.found_nodes = False

        self.got_routing_response = False
        self.got_routing_error = False
        self.got_routing_timeout = False
        self.got_routing_nodes_found = False

        self.querier_mock = querier.QuerierMock(tc.CLIENT_ID)

        self.r = minitwisted.ThreadedReactor(task_interval=.01)
        self.rpc_m = rpc_manager.RPCManager(self.r,
                                            tc.CLIENT_ADDR[1])
        self.querier = querier.Querier(self.rpc_m,
                                       tc.CLIENT_NODE)
        self.querier_routing = querier.Querier(self.rpc_m,
                                               tc.CLIENT_NODE)
        self.querier_routing.set_on_response_received_callback(
            self.on_routing_response)
        self.querier_routing.set_on_error_received_callback(
            self.on_routing_error)
        self.querier_routing.set_on_timeout_callback(
            self.on_routing_timeout)
        self.querier_routing.set_on_nodes_found_callback(
            self.on_routing_nodes_found)

        self.r.start()

    # --- recording callbacks: each one just raises its flag ---

    def on_response(self, response_msg, node_):
        self.got_response = True

    def on_timeout(self, node_):
        self.got_timeout = True

    def on_error(self, error_msg, node_):
        self.got_error = True

    def on_routing_response(self, node_):
        self.got_routing_response = True

    def on_routing_error(self, node_):
        self.got_routing_error = True

    def on_routing_timeout(self, node_):
        self.got_routing_timeout = True

    def on_routing_nodes_found(self, node_):
        self.got_routing_nodes_found = True

    # --- tests and helpers ---

    def test_generate_tids(self):
        # Consecutive TIDs must be a two-character little-endian counter:
        # low byte first, then high byte. The CPU-intensive variant goes
        # past 2**16 values, so it also covers the counter wrapping around.
        num_tids = 1000
        if RUN_CPU_INTENSIVE_TESTS:
            num_tids = pow(2, 16) + 2 #CPU intensive
        for i in xrange(num_tids):
            # Explicit floor division: same result as '/' on Python 2 ints.
            eq_(self.querier._next_tid(),
                chr(i % 256) + chr((i // 256) % 256))

    def send_query_and_get_response(self, querier_, later_delay=0):
        """Send a query through *querier_* and check a response arrives.

        With a non-zero *later_delay*, two find_node queries are scheduled
        via ``send_query_later``; otherwise a single ping is sent at once.
        When *querier_* is the mock, the response is fabricated here and
        handed to the querier directly. For real queriers a DHT node must
        be running at the target address.

        Fails unless ``on_response`` was called and ``on_timeout`` was not.
        """
        ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)
        msg = message.OutgoingFindNodeQuery(tc.CLIENT_ID,
                                            tc.CLIENT_ID)
        using_mock = querier_ is self.querier_mock
        if later_delay:
            # The query is sent twice just to have two elements in the
            # querier's pending list for the external address.
            for _ in (1, 2):
                querier_.send_query_later(later_delay,
                                          msg,
                                          tc.EXTERNAL_NODE,
                                          self.on_response,
                                          self.on_timeout,
                                          self.on_error,
                                          tc.TIMEOUT_DELAY)
        else:
            # The mock is queried at SERVER_NODE, real queriers at the
            # external node.
            if using_mock:
                node_ = tc.SERVER_NODE
            else:
                node_ = tc.EXTERNAL_NODE
            querier_.send_query(ping_msg, node_,
                                self.on_response,
                                self.on_timeout, self.on_error,
                                timeout_delay=tc.TIMEOUT_DELAY)
        # TID assigned by the querier to its first query.
        msg_tid = '\0\0'
        if using_mock:
            # The server gets the query and creates the response...
            pong_msg = message.OutgoingPingResponse(tc.SERVER_ID)
            pong_msg_data = pong_msg.encode(msg_tid)
            # ...the client decodes it and notifies the querier.
            pong_msg = message.IncomingMsg(pong_msg_data)
            querier_.on_response_received(pong_msg, tc.SERVER_ADDR)
        if later_delay:
            # Nothing can have happened before the delayed send.
            ok_(not self.got_response)
            ok_(not self.got_timeout)
            time.sleep(later_delay*2)
        time.sleep(tc.TIMEOUT_DELAY+.1)
        ### It crashed (timeout_task.cancelled??????)

        #TODO2: move the 'real' tests to integration

        # For real queriers a DHT node must be running on the peer address.
        ok_(self.got_response)
        ok_(not self.got_timeout)

    def send_query_and_get_error(self, querier):
        """Send a ping and, for the mock, feed an error message back.

        No assertion is made on the outcome (the original check is
        disabled; it needs a DHT node running on the peer address).

        NOTE(review): this helper is not called by any test in this file
        and its message calls (``OutgoingPingQuery()`` without an ID,
        ``OutgoingErrorMsg(tid, error)``, ``encode()`` without a TID,
        ``message.decode``) differ from how the same classes are used
        everywhere else here -- it looks written against an older message
        API. Confirm before relying on it. The *querier* parameter also
        shadows the ``querier`` module inside this method.
        """
        ping_msg = message.OutgoingPingQuery()
        querier.send_query(ping_msg, tc.EXTERNAL_NODE,
                           self.on_response,
                           self.on_timeout, self.on_error,
                           timeout_delay=tc.TIMEOUT_DELAY)
        if querier is self.querier_mock:
            # the server creates the response
            error_msg = message.OutgoingErrorMsg(ping_msg.tid,
                                                 message.GENERIC_E)
            error_data = error_msg.encode()
            # rpc_m decodes the response received
            _, _, error_msg_dict = message.decode(error_data)
            # rpc_m notifies of the message (callback)
            querier.on_error_received(error_msg_dict, tc.EXTERNAL_NODE)
        time.sleep(tc.TIMEOUT_DELAY + .1)

        ### It crashed (timeout_task.cancelled??????)

        #TODO2: move the 'real' tests to integration

    def send_query_and_get_timeout(self, querier):
        """Ping DEAD_NODE through *querier* and check the query times out.

        For the mock the timeout task is fired by hand; real queriers are
        simply given a bit more than TIMEOUT_DELAY to time out.

        Fails unless ``on_timeout`` was called and ``on_response`` was not.
        """
        ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)
        query = querier.send_query(ping_msg, tc.DEAD_NODE,
                                   self.on_response,
                                   self.on_timeout, self.on_error,
                                   timeout_delay=tc.TIMEOUT_DELAY)
        if querier is self.querier_mock:
            query.timeout_task.fire_callbacks()
        time.sleep(tc.TIMEOUT_DELAY + .1)
        assert not self.got_response and self.got_timeout

    def test_send_query_mock(self):
        # The mock has no routing callbacks registered.
        self.send_query_and_get_response(self.querier_mock)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)
        ok_(not self.got_routing_timeout)

    def test_send_query(self):
        # The plain querier has no routing callbacks registered.
        if RUN_NETWORK_TESTS:
            self.send_query_and_get_response(self.querier)
            ok_(not self.got_routing_response)
            ok_(not self.got_routing_nodes_found)
            ok_(not self.got_routing_timeout)

    def test_send_query_routing(self):
        # A ping response must reach the routing response callback, but
        # not the nodes-found one.
        if RUN_NETWORK_TESTS:
            self.send_query_and_get_response(self.querier_routing)
            ok_(self.got_routing_response)
            ok_(not self.got_routing_nodes_found)
            ok_(not self.got_routing_timeout)

    def test_send_query_timeout_mock(self):
        self.send_query_and_get_timeout(self.querier_mock)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)
        ok_(not self.got_routing_timeout)

    def test_send_query_timeout(self):
        self.send_query_and_get_timeout(self.querier)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)
        ok_(not self.got_routing_timeout)

    def test_send_query_timeout_routing(self):
        # Only the querier with routing callbacks reports the timeout to
        # the routing side.
        self.send_query_and_get_timeout(self.querier_routing)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)
        ok_(self.got_routing_timeout)

    def test_send_query_later(self):
        # Delayed find_node queries: both the routing response and the
        # nodes-found callbacks are expected.
        if RUN_NETWORK_TESTS:
            self.send_query_and_get_response(self.querier_routing, .001)
            ok_(self.got_routing_response)
            ok_(self.got_routing_nodes_found)
            ok_(not self.got_routing_timeout)

    def test_unsolicited_response(self):
        # A query to SERVER_NODE is pending (first TID: \0\0) and a
        # response arrives carrying tc.TID instead (presumably a different
        # TID -- see test_const). It must not be matched to the query.

        # client
        ping = message.OutgoingPingQuery(tc.CLIENT_ID)
        # Callback order is (on_response, on_timeout, on_error), as in
        # every other send_query call in this file.
        self.querier.send_query(ping,
                                tc.SERVER_NODE,
                                self.on_response,
                                self.on_timeout,
                                self.on_error,
                                tc.TIMEOUT_DELAY)
        ok_(not self.got_response)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)
        # server
        pong = message.OutgoingPingResponse(tc.SERVER_ID)
        pong_in = message.IncomingMsg(pong.encode(tc.TID))
        # client
        self.querier.on_response_received(pong_in,
                                          tc.SERVER_ADDR)
        ok_(not self.got_response)
        ok_(not self.got_routing_response)
        ok_(not self.got_routing_nodes_found)

    def test_error(self):
        # Smoke test: an error with no pending query must not raise.
        # NOTE(review): an outgoing (not decoded) message is passed here.
        msg = message.OutgoingErrorMsg(message.GENERIC_E)
        self.querier.on_error_received(msg, tc.SERVER_ADDR)

    def teardown(self):
        """Stop the three queriers created in setup()."""
        self.querier_mock.stop()
        self.querier.stop()
        self.querier_routing.stop()
414         
415