1 # Copyright (C) 2009 Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
5 from nose.tools import ok_, eq_
9 import logging, logging_conf
16 import test_const as tc
logging_conf.testing_setup(__name__)
logger = logging.getLogger('dht')

# Opt-in switches for the expensive tests; both are off by default.
RUN_CPU_INTENSIVE_TESTS = False
# Requires a running external DHT node
RUN_NETWORK_TESTS = False
30 self.ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)
31 ping_r_out = message.OutgoingPingResponse(tc.SERVER_ID)
32 self.ping_r_in = message.IncomingMsg(ping_r_out.encode(tc.TID))
33 fn_r_out = message.OutgoingFindNodeResponse(tc.SERVER_ID,
35 self.fn_r_in = message.IncomingMsg(fn_r_out.encode(tc.TID))
37 self.got_response = False
38 self.got_error = False
39 self.got_timeout = False
41 self.got_routing_response = False
42 self.got_routing_error = False
43 self.got_routing_timeout = False
44 self.got_routing_nodes_found = False
46 self.query = querier.Query(tc.TID, self.ping_msg.query, tc.SERVER_NODE,
50 self.on_routing_response,
51 self.on_routing_error,
52 self.on_routing_timeout,
53 self.on_routing_nodes_found)
54 self.query.timeout_task = minitwisted.Task(1, self.on_timeout,
def on_response(self, response_msg, addr):
    """Flag that the response callback was invoked.

    ``response_msg`` and ``addr`` are accepted to match the callback
    signature but are not inspected.
    """
    self.got_response = True
def on_error(self, error_msg, addr):
    """Flag that the error callback was invoked.

    ``error_msg`` and ``addr`` are accepted to match the callback
    signature but are not inspected.
    """
    # NOTE(review): this callback had no body; setting ``got_error``
    # mirrors the other flag-setting callbacks of this fixture and is
    # the flag the error-callback test asserts on -- confirm.
    self.got_error = True
def on_timeout(self, addr):
    """Flag that the timeout callback was invoked; ``addr`` is unused."""
    self.got_timeout = True
def on_routing_response(self, node_):
    """Flag that the routing-level response callback ran; ``node_`` is unused."""
    self.got_routing_response = True
def on_routing_error(self, node_):
    """Flag that the routing-level error callback ran; ``node_`` is unused."""
    self.got_routing_error = True
def on_routing_timeout(self, node_):
    """Flag that the routing-level timeout callback ran; ``node_`` is unused."""
    self.got_routing_timeout = True
def on_routing_nodes_found(self, node_):
    """Flag that the routing-level nodes-found callback ran; ``node_`` is unused."""
    self.got_routing_nodes_found = True
def test_fire_callback_on_response(self):
    """A ping response must set the response flag and leave the
    error and timeout flags untouched."""
    # Server side: build the pong and encode it with the test TID.
    encoded_pong = message.OutgoingPingResponse(tc.SERVER_ID).encode(tc.TID)
    # Receiving side (rpc_m's job): decode the encoded response.
    incoming_pong = message.IncomingMsg(encoded_pong)
    # The query is notified of the message, which triggers the callback.
    self.query.on_response_received(incoming_pong)
    ok_(self.got_response)
    ok_(not self.got_error)
    ok_(not self.got_timeout)
def test_fire_callback_on_error(self):
    """An incoming error message must set the error flag and must not
    set the response flag."""
    # Server side: build the error message and encode it with the test TID.
    error_data = message.OutgoingErrorMsg(message.GENERIC_E).encode(tc.TID)
    # Receiving side (rpc_m's job): decode the encoded error.
    incoming_error = message.IncomingMsg(error_data)
    # The query is notified of the error, which triggers the callback.
    self.query.on_error_received(incoming_error)
    # One ok_ per flag, like the sibling tests: a failure then names the
    # offending flag, and the checks are not stripped under ``python -O``.
    ok_(not self.got_response)
    ok_(self.got_error)
def test_on_timeout(self):
    """Calling the query's ``on_timeout`` must set both the plain and
    the routing timeout flags."""
    timeout_flags = ('got_timeout', 'got_routing_timeout')
    # Neither flag may be set before the timeout is triggered.
    for flag_name in timeout_flags:
        ok_(not getattr(self, flag_name))
    self.query.on_timeout()
    for flag_name in timeout_flags:
        ok_(getattr(self, flag_name))
108 def test_fire_callback_on_timeout(self):
109 self.query.timeout_task.fire_callbacks()
110 self.query.timeout_task.cancel()
111 assert not self.got_response and not self.got_error \
114 def test_fire_callback_on_late_response(self):
115 self.query.timeout_task.fire_callbacks()
116 self.query.timeout_task.cancel()
117 # the server creates the response
118 pong_msg = message.OutgoingPingResponse(tc.SERVER_ID)
119 pong_data = pong_msg.encode(tc.TID)
120 # rpc_m decodes the response received
121 pong_msg = message.IncomingMsg(pong_data)
122 # querier notifies of the message (but it's too late)
123 self.query.on_response_received(pong_msg)
125 "**IGNORE WARNING LOG**")
126 assert not self.got_response and not self.got_error \
129 def test_invalid_response_received(self):
130 # Response received is invalid
131 self.ping_r_in._msg_dict[message.RESPONSE] = 'zz'
132 ok_(not self.got_response)
134 "**IGNORE WARNING LOG**")
135 self.query.on_response_received(self.ping_r_in)
136 ok_(not self.got_response)
def test_response_contains_nodes(self):
    """A find-node response must set the response flag and both the
    routing response and routing nodes-found flags."""
    watched_flags = ('got_response',
                     'got_routing_response',
                     'got_routing_nodes_found')
    # Trick query to accept find node response
    self.query.query = message.FIND_NODE
    # Nothing has fired yet.
    for flag_name in watched_flags:
        ok_(not getattr(self, flag_name))
    self.query.on_response_received(self.fn_r_in)
    # All three callbacks are expected to have run.
    for flag_name in watched_flags:
        ok_(getattr(self, flag_name))
149 def test_different_node_id(self):
150 # We are expecting response from SERVER_NODE
151 # Here we test if the response contains an ID
152 # different to SERVER_ID
153 self.query.node = node.Node(tc.SERVER_ADDR,
155 ok_(not self.got_response)
156 ok_(not self.got_routing_response)
157 ok_(not self.got_routing_nodes_found)
158 self.query.on_response_received(self.fn_r_in)
159 ok_(not self.got_response)
160 ok_(not self.got_routing_response)
161 ok_(not self.got_routing_nodes_found)
169 if RUN_NETWORK_TESTS:
170 time.sleep(1) # Reduce test interdependence
171 self.got_response = False
172 self.got_timeout = False
173 self.got_error = False
174 self.found_nodes = False
176 self.got_routing_response = False
177 self.got_routing_error = False
178 self.got_routing_timeout = False
179 self.got_routing_nodes_found = False
181 self.querier_mock = querier.QuerierMock(tc.CLIENT_ID)
183 self.r = minitwisted.ThreadedReactor(task_interval=.01)
184 self.rpc_m = rpc_manager.RPCManager(self.r,
186 self.querier = querier.Querier(self.rpc_m,
188 self.querier_routing = querier.Querier(self.rpc_m,
190 self.querier_routing.set_on_response_received_callback(
191 self.on_routing_response)
192 self.querier_routing.set_on_error_received_callback(
193 self.on_routing_error)
194 self.querier_routing.set_on_timeout_callback(
195 self.on_routing_timeout)
196 self.querier_routing.set_on_nodes_found_callback(
197 self.on_routing_nodes_found)
def on_response(self, response_msg, node_):
    """Flag that the response callback was invoked.

    ``response_msg`` and ``node_`` are accepted to match the callback
    signature but are not inspected.
    """
    self.got_response = True
def on_timeout(self, node_):
    """Flag that the timeout callback was invoked; ``node_`` is unused."""
    self.got_timeout = True
def on_error(self, error_msg, node_):
    """Flag that the error callback was invoked.

    ``error_msg`` and ``node_`` are accepted to match the callback
    signature but are not inspected.
    """
    self.got_error = True
def on_routing_response(self, node_):
    """Flag that the routing-level response callback ran; ``node_`` is unused."""
    self.got_routing_response = True
def on_routing_error(self, node_):
    """Flag that the routing-level error callback ran; ``node_`` is unused."""
    self.got_routing_error = True
def on_routing_timeout(self, node_):
    """Flag that the routing-level timeout callback ran; ``node_`` is unused."""
    self.got_routing_timeout = True
def on_routing_nodes_found(self, node_):
    """Flag that the routing-level nodes-found callback ran; ``node_`` is unused."""
    self.got_routing_nodes_found = True
def test_generate_tids(self):
    """Check that ``_next_tid`` yields consecutive two-character TIDs.

    The i-th expected TID is ``chr(low) + chr(high)``, where ``low`` is
    ``i % 256`` and ``high`` is ``(i // 256) % 256``.  The loop runs
    ``2**16 + 2`` times, so the high byte wraps back to zero at the
    end.  The whole check only runs when ``RUN_CPU_INTENSIVE_TESTS``
    is enabled.
    """
    # NOTE(review): the loop is treated as guarded by the switch; run
    # unguarded it would hit an unbound ``num_tids`` whenever the
    # switch is off -- confirm against the intended nesting.
    if not RUN_CPU_INTENSIVE_TESTS:
        return
    num_tids = pow(2, 16) + 2  # CPU intensive
    for i in xrange(num_tids):
        # divmod floors, so the high byte stays an int even where ``/``
        # means true division (chr() rejects floats).
        high, low = divmod(i, 256)
        eq_(self.querier._next_tid(), chr(low) + chr(high % 256))
235 def send_query_and_get_response(self, querier_, later_delay=0):
236 ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)
237 msg = message.OutgoingFindNodeQuery(tc.CLIENT_ID,
240 task = querier_.send_query_later(later_delay,
247 # This second query is just to have two elements
248 # in the querier_.pending[tc.EXTERNAL_ADDR] list
249 task = querier_.send_query_later(later_delay,
257 node_ = (querier_ == self.querier_mock) and tc.SERVER_NODE
258 query = querier_.send_query(ping_msg, node_ or tc.EXTERNAL_NODE,
260 self.on_timeout, self.on_error,
261 timeout_delay=tc.TIMEOUT_DELAY)
262 # querier creates TID
264 if querier_ is self.querier_mock:
266 # the server creates the response
267 pong_msg = message.OutgoingPingResponse(tc.SERVER_ID)
268 pong_msg_data = pong_msg.encode(msg_tid)
269 # the client gets the response
270 # rpc_m decodes msg and calls callback
271 pong_msg = message.IncomingMsg(pong_msg_data)
272 querier_.on_response_received(pong_msg, tc.SERVER_ADDR)
274 ok_(not self.got_response)
275 ok_(not self.got_timeout)
276 time.sleep(later_delay*2)
277 time.sleep(tc.TIMEOUT_DELAY+.1)
278 ### It crashed (timeout_task.cancelled??????)
281 #TODO2: move the 'real' tests to integration
283 ###############################################
284 ### A DHT node must be running on peer_addr ###
285 ###############################################
286 ok_(self.got_response)
287 ok_(not self.got_timeout)
288 ###############################################
289 ###############################################
291 def send_query_and_get_error(self, querier):
294 ping_msg = message.OutgoingPingQuery()
295 query = querier.send_query(ping_msg, tc.EXTERNAL_NODE,
297 self.on_timeout, self.on_error,
298 timeout_delay=tc.TIMEOUT_DELAY)
299 if querier is self.querier_mock:
300 # the server creates the response
301 error_msg = message.OutgoingErrorMsg(ping_msg.tid,
303 error_data = error_msg.encode()
304 # rpc_m decodes the response received
305 _, _, error_msg_dict = message.decode(error_data)
306 # rpc_m notifies of the message (callback)
307 querier.on_error_received(error_msg_dict, tc.EXTERNAL_NODE)
308 time.sleep(tc.TIMEOUT_DELAY + .1)
310 ### It crashed (timeout_task.cancelled??????)
313 #TODO2: move the 'real' tests to integration
315 ###############################################
316 ### A DHT node must be running on peer_addr ###
317 ###############################################
318 ########## assert self.got_response and not self.got_timeout
319 ###############################################
320 ###############################################
324 def send_query_and_get_timeout(self, querier):
325 ping_msg = message.OutgoingPingQuery(tc.CLIENT_ID)
326 query = querier.send_query(ping_msg, tc.DEAD_NODE,
328 self.on_timeout, self.on_error,
329 timeout_delay=tc.TIMEOUT_DELAY)
330 if querier is self.querier_mock:
331 query.timeout_task.fire_callbacks()
332 time.sleep(tc.TIMEOUT_DELAY + .1)
333 assert not self.got_response and self.got_timeout
def test_send_query_mock(self):
    """Run the query/response scenario against the mock querier and
    check that no routing flag was set."""
    self.send_query_and_get_response(self.querier_mock)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found,
                         self.got_routing_timeout):
        ok_(not routing_flag)
def test_send_query(self):
    """Run the query/response scenario against ``self.querier`` and
    check that no routing flag was set.

    Does nothing unless ``RUN_NETWORK_TESTS`` is enabled.
    """
    if not RUN_NETWORK_TESTS:
        return
    self.send_query_and_get_response(self.querier)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found,
                         self.got_routing_timeout):
        ok_(not routing_flag)
def test_send_query_routing(self):
    """Run the query/response scenario against ``self.querier_routing``.

    Expects the routing response flag to be set and the routing
    nodes-found and timeout flags to stay clear.  Does nothing unless
    ``RUN_NETWORK_TESTS`` is enabled.
    """
    if not RUN_NETWORK_TESTS:
        return
    self.send_query_and_get_response(self.querier_routing)
    ok_(self.got_routing_response)
    for routing_flag in (self.got_routing_nodes_found,
                         self.got_routing_timeout):
        ok_(not routing_flag)
def test_send_query_timeout_mock(self):
    """Run the timeout scenario against the mock querier and check
    that no routing flag was set."""
    self.send_query_and_get_timeout(self.querier_mock)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found,
                         self.got_routing_timeout):
        ok_(not routing_flag)
def test_send_query_timeout(self):
    """Run the timeout scenario against ``self.querier`` and check
    that no routing flag was set."""
    self.send_query_and_get_timeout(self.querier)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found,
                         self.got_routing_timeout):
        ok_(not routing_flag)
def test_send_query_timeout_routing(self):
    """Run the timeout scenario against ``self.querier_routing``.

    Expects only the routing timeout flag to be set; the routing
    response and nodes-found flags must stay clear.
    """
    self.send_query_and_get_timeout(self.querier_routing)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found):
        ok_(not routing_flag)
    ok_(self.got_routing_timeout)
def test_send_query_later(self):
    """Run the query/response scenario on ``self.querier_routing`` with
    a ``later_delay`` of 0.001.

    Expects the routing response and nodes-found flags to be set and
    the routing timeout flag to stay clear.  Does nothing unless
    ``RUN_NETWORK_TESTS`` is enabled.
    """
    if not RUN_NETWORK_TESTS:
        return
    self.send_query_and_get_response(self.querier_routing, .001)
    for routing_flag in (self.got_routing_response,
                         self.got_routing_nodes_found):
        ok_(routing_flag)
    ok_(not self.got_routing_timeout)
380 def test_unsolicited_response(self):
381 # We have a pending response from NO_ADDR (TID \0\0)
382 # but we get a response with different TID
385 ping = message.OutgoingPingQuery(tc.CLIENT_ID)
386 self.querier.send_query(ping,
392 ok_(not self.got_response)
393 ok_(not self.got_routing_response)
394 ok_(not self.got_routing_nodes_found)
396 pong = message.OutgoingPingResponse(tc.SERVER_ID)
397 pong_in = message.IncomingMsg(pong.encode(tc.TID))
399 self.querier.on_response_received(pong_in,
401 ok_(not self.got_response)
402 ok_(not self.got_routing_response)
403 ok_(not self.got_routing_nodes_found)
def test_error(self):
    """Pass a generic error message from ``tc.SERVER_ADDR`` to
    ``self.querier``.

    No flag is asserted afterwards; the test only requires that the
    call completes without raising.
    """
    generic_error = message.OutgoingErrorMsg(message.GENERIC_E)
    sender_addr = tc.SERVER_ADDR
    self.querier.on_error_received(generic_error, sender_addr)
411 self.querier_mock.stop()
413 self.querier_routing.stop()