1 # Copyright (C) 2009 Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
5 from __future__ import with_statement
9 import logging, logging_conf
11 from nose.tools import eq_, ok_, assert_raises
12 import test_const as tc
14 logging_conf.testing_setup(__name__)
15 logger = logging.getLogger('dht')
18 from minitwisted import Task, TaskManager, \
19 ThreadedReactor, ThreadedReactorMock, \
20 ThreadedReactorSocketError
23 ADDRS= (tc.CLIENT_ADDR, tc.SERVER_ADDR)
27 class TestTaskManager:
29 def callback_f(self, callback_id):
30 self.callback_order.append(callback_id)
33 # Order in which callbacks have been fired
34 self.callback_order = []
35 self.task_m = TaskManager()
37 def test_simple(self):
39 self.task_m.add(Task(.01, self.callback_f, i))
41 task = self.task_m.consume_task()
45 logger.debug('%s' % self.callback_order)
46 assert self.callback_order == []
49 task = self.task_m.consume_task()
53 assert self.callback_order == range(5)
55 def test_cancel(self):
57 self.task_m.add(Task(.1, self.callback_f, i))
58 c_task = Task(.1, self.callback_f, 5)
59 self.task_m.add(c_task)
60 for i in xrange(6,10):
61 self.task_m.add(Task(.1, self.callback_f, i))
63 task = self.task_m.consume_task()
67 logger.debug('%s' % self.callback_order)
68 assert self.callback_order == []
69 ok_(not c_task.cancelled)
75 task = self.task_m.consume_task()
79 logger.debug('%s' % self.callback_order)
80 assert self.callback_order == [0,1,2,3,4, 6,7,8,9]
81 # task 5 was cancelled
83 def test_different_delay(self):
84 # NOTICE: this test might fail if your configuration
85 # (interpreter/processor) is too slow
87 task_delays = (1, 1, 1, .5, 1, 1, 2, 1, 1, 1,
88 1, 1.5, 1, 1, 1, 1, .3)
91 ['a', 16, 3, 'b'], #9 is cancelled
92 ['a', 0, 1, 2, 4, 5, 7, 8, 10, 12, 13, 15, 'c', 'b'],
96 tasks = [Task(delay, self.callback_f, i) \
97 for i, delay in enumerate(task_delays)]
101 for i, expected in enumerate(expected_list):
103 task = self.task_m.consume_task()
106 task.fire_callbacks()
107 logger.debug('#: %d, result: %s, expected: %s' % (i,
108 self.callback_order, expected))
109 assert self.callback_order == expected
110 self.callback_order = []
111 self.task_m.add(Task(0, self.callback_f, 'a'))
112 self.task_m.add(Task(.5, self.callback_f, 'b'))
113 self.task_m.add(Task(1, self.callback_f, 'c'))
115 tasks[9].cancel() # too late (already fired)
116 tasks[14].cancel() # should be cancelled
118 def _callback1(self, arg1, arg2):
119 if arg1 == 1 and arg2 == 2:
120 self.callback_order.append(1)
121 def _callback2(self, arg1, arg2):
122 if arg1 == 1 and arg2 == 2:
123 self.callback_order.append(2)
125 def test_callback_list(self):
126 self.task_m.add(Task(tc.TASK_INTERVAL/2,
127 [self._callback1, self._callback2],
129 ok_(self.task_m.consume_task() is None)
130 eq_(self.callback_order, [])
131 time.sleep(tc.TASK_INTERVAL)
132 self.task_m.consume_task().fire_callbacks()
133 eq_(self.callback_order, [1,2])
135 class TestMinitwisted:
137 def on_datagram_received(self, data, addr):
139 self.datagrams_received.append((data, addr))
141 def callback_f(self, callback_id):
143 self.callback_order.append(callback_id)
146 self.lock = threading.Lock()
147 self.datagrams_received = []
148 self.callback_order = []
149 self.client_r = ThreadedReactor(task_interval=tc.TASK_INTERVAL)
150 self.server_r = ThreadedReactor(task_interval=tc.TASK_INTERVAL)
151 self.client_r.listen_udp(tc.CLIENT_ADDR[1], self.on_datagram_received)
152 self.server_r.listen_udp(tc.SERVER_ADDR[1], self.on_datagram_received)
153 self.client_r.start()
154 self.server_r.start()
156 def test_listen_upd(self):
157 r = ThreadedReactor()
159 logger.warning(''.join(
160 ('TESTING LOGS ** IGNORE EXPECTED WARNING ** ',
161 '(udp_listen has not been called)')))
162 self.client_r.sendto(DATA, tc.SERVER_ADDR)
163 while 1: #waiting for data
165 if self.datagrams_received:
167 time.sleep(tc.TASK_INTERVAL)
169 first_datagram = self.datagrams_received.pop(0)
170 logger.debug('first_datagram: %s, %s' % (
172 (DATA, tc.CLIENT_ADDR)))
173 assert first_datagram, (DATA, tc.CLIENT_ADDR)
176 def test_network_callback(self):
177 self.client_r.sendto(DATA, tc.SERVER_ADDR)
178 time.sleep(tc.TASK_INTERVAL)
180 first_datagram = self.datagrams_received.pop(0)
181 logger.debug('first_datagram: %s, %s' % (
183 (DATA, tc.CLIENT_ADDR)))
184 assert first_datagram, (DATA, tc.CLIENT_ADDR)
186 def test_block_flood(self):
187 from floodbarrier import MAX_PACKETS_PER_PERIOD as FLOOD_LIMIT
188 for _ in xrange(FLOOD_LIMIT):
189 self.client_r.sendto(DATA, tc.SERVER_ADDR)
191 self.client_r.sendto(DATA, tc.SERVER_ADDR)
193 "TESTING LOGS ** IGNORE EXPECTED WARNING **")
194 time.sleep(tc.TASK_INTERVAL)
196 logger.debug('datagram processed: %d/%d' % (
197 len(self.datagrams_received),
199 assert len(self.datagrams_received) <= FLOOD_LIMIT
201 def test_call_later(self):
202 self.client_r.call_later(.13, self.callback_f, 1)
203 self.client_r.call_later(.11, self.callback_f, 2)
204 self.client_r.call_later(.01, self.callback_f, 3)
205 task4 = self.client_r.call_later(.01, self.callback_f, 4)
209 logger.debug('callback_order: %s' % self.callback_order)
210 assert self.callback_order == [3]
211 self.callback_order = []
212 self.client_r.call_now(self.callback_f, 5)
215 logger.debug('callback_order: %s' % self.callback_order)
216 assert self.callback_order == [5]
217 self.callback_order = []
218 task6 = self.client_r.call_later(.03, self.callback_f, 6)
222 logger.debug('callback_order: %s' % self.callback_order)
223 assert self.callback_order == [2, 1]
225 def test_network_and_delayed(self):
226 self.client_r.call_later(.2, self.callback_f, 0)
227 self.client_r.call_now(self.callback_f, 1)
228 task2 = self.client_r.call_later(.2, self.callback_f, 2)
230 eq_(self.callback_order, [])
234 logger.debug('callback_order: %s' % self.callback_order)
235 assert self.callback_order == [1]
236 self.callback_order = []
237 assert not self.datagrams_received
238 self.server_r.sendto(DATA, tc.CLIENT_ADDR)
239 time.sleep(.02) # wait for network interruption
241 logger.debug('callback_order: %s' % self.callback_order)
242 assert self.callback_order == []
243 logger.debug('callback_order: %s' % self.callback_order)
244 assert self.datagrams_received.pop(0) == (DATA, tc.SERVER_ADDR)
245 task2.cancel() #inside critical region??
246 time.sleep(.1) # wait for task 0 (task 2 should be cancelled)
248 assert self.callback_order == [0]
249 assert not self.datagrams_received
251 def test_sendto_socket_error(self):
252 logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
253 self.client_r.sendto('z', (tc.NO_ADDR[0], 0))
259 class TestSocketErrors:
261 def _callback(self, *args, **kwargs):
262 self.callback_fired = True
265 self.callback_fired = False
266 self.r = ThreadedReactorSocketError()
267 self.r.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
269 def test_sendto(self):
270 logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
271 self.r.sendto('z', tc.NO_ADDR)
273 def test_recvfrom(self):
275 r2 = ThreadedReactor()
276 r2.listen_udp(tc.SERVER_ADDR[1], lambda x,y:None)
277 logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
278 r2.sendto('z', tc.CLIENT_ADDR)
279 # self.r will call recvfrom (which raises socket.error)
280 time.sleep(tc.TASK_INTERVAL)
281 ok_(not self.callback_fired)
284 def test_sendto_too_large_data_string(self):
285 logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
286 self.r.sendto('z'*12345, tc.NO_ADDR)
291 class TestMockThreadedReactor:
296 def _callback(self, *args):
299 def test_mock_threaded_reactor(self):
301 Just making sure that the interface is the same
304 r = ThreadedReactor(task_interval=.1)
305 rm = ThreadedReactorMock(task_interval=.1)
307 r.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
308 rm.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
313 r.sendto(DATA, tc.CLIENT_ADDR)
314 rm.sendto(DATA, tc.CLIENT_ADDR)
316 r.call_later(.1, self._callback)
317 rm.call_later(.1, self._callback)