instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / DecentralizedTracking / kadtracker / test_minitwisted.py
1 # Copyright (C) 2009 Raul Jimenez
2 # Released under GNU LGPL 2.1
3 # See LICENSE.txt for more information
4
5 from __future__ import with_statement
6 import threading
7 import time
8
9 import logging, logging_conf
10
11 from nose.tools import eq_, ok_, assert_raises
12 import test_const as tc
13
14 logging_conf.testing_setup(__name__)
15 logger = logging.getLogger('dht')
16
17
18 from minitwisted import Task, TaskManager, \
19      ThreadedReactor, ThreadedReactorMock, \
20      ThreadedReactorSocketError
21
22
23 ADDRS= (tc.CLIENT_ADDR, tc.SERVER_ADDR)
24 DATA = 'testing...'
25
26
27 class TestTaskManager:
28     
29     def callback_f(self, callback_id):
30         self.callback_order.append(callback_id)
31         
32     def setup(self):
33         # Order in which callbacks have been fired
34         self.callback_order = []
35         self.task_m = TaskManager()
36
37     def test_simple(self):
38         for i in xrange(5):
39             self.task_m.add(Task(.01, self.callback_f, i))
40         while True:
41             task = self.task_m.consume_task()
42             if task is None:
43                 break
44             task.fire_callback()
45         logger.debug('%s' % self.callback_order)
46         assert self.callback_order == []
47         time.sleep(.01)
48         while True:
49             task = self.task_m.consume_task()
50             if task is None:
51                 break
52             task.fire_callbacks() 
53         assert self.callback_order == range(5)
54
55     def test_cancel(self):
56         for i in xrange(5):
57             self.task_m.add(Task(.1, self.callback_f, i))
58         c_task = Task(.1, self.callback_f, 5)
59         self.task_m.add(c_task)
60         for i in xrange(6,10):
61             self.task_m.add(Task(.1, self.callback_f, i))
62         while True:
63             task = self.task_m.consume_task()
64             if task is None:
65                 break
66             task.fire_callback()
67         logger.debug('%s' % self.callback_order)
68         assert self.callback_order == []
69         ok_(not c_task.cancelled)
70         c_task.cancel()
71         ok_(c_task.cancelled)
72         
73         time.sleep(.1)
74         while True:
75             task = self.task_m.consume_task()
76             if task is None:
77                 break
78             task.fire_callbacks()
79         logger.debug('%s' % self.callback_order)
80         assert self.callback_order == [0,1,2,3,4,  6,7,8,9]
81         # task 5 was cancelled        
82
83     def test_different_delay(self):
84 #         NOTICE: this test might fail if your configuration
85 #         (interpreter/processor) is too slow
86         
87         task_delays = (1, 1, 1, .5, 1, 1, 2, 1, 1, 1,
88                        1, 1.5, 1, 1, 1, 1, .3)
89                        
90         expected_list = ([],
91                          ['a', 16, 3, 'b'], #9 is cancelled
92                          ['a', 0, 1, 2, 4, 5, 7, 8, 10, 12, 13, 15, 'c', 'b'],
93                          ['a', 11, 'c', 'b'],
94                          ['a', 6, 'c', 'b'],
95             )
96         tasks = [Task(delay, self.callback_f, i) \
97                  for i, delay in enumerate(task_delays)]
98         for task in tasks:
99             self.task_m.add(task)
100
101         for i, expected in enumerate(expected_list):
102             while True:
103                 task = self.task_m.consume_task()
104                 if task is None:
105                     break
106                 task.fire_callbacks()
107             logger.debug('#: %d, result: %s, expected: %s' % (i,
108                                               self.callback_order, expected))
109             assert self.callback_order == expected
110             self.callback_order = []
111             self.task_m.add(Task(0, self.callback_f, 'a'))
112             self.task_m.add(Task(.5, self.callback_f, 'b'))
113             self.task_m.add(Task(1, self.callback_f, 'c'))
114             time.sleep(.5)
115             tasks[9].cancel() # too late (already fired) 
116             tasks[14].cancel() # should be cancelled
117
118     def _callback1(self, arg1, arg2):
119         if arg1 == 1 and arg2 == 2:
120             self.callback_order.append(1)
121     def _callback2(self, arg1, arg2):
122         if arg1 == 1 and arg2 == 2:
123             self.callback_order.append(2)
124     
125     def test_callback_list(self):
126         self.task_m.add(Task(tc.TASK_INTERVAL/2,
127                               [self._callback1, self._callback2],
128                               1, 2))
129         ok_(self.task_m.consume_task() is None)
130         eq_(self.callback_order, [])
131         time.sleep(tc.TASK_INTERVAL)
132         self.task_m.consume_task().fire_callbacks()
133         eq_(self.callback_order, [1,2])
134
135 class TestMinitwisted:
136
137     def on_datagram_received(self, data, addr):
138         with self.lock:
139             self.datagrams_received.append((data, addr))
140
141     def callback_f(self, callback_id):
142         with self.lock:
143             self.callback_order.append(callback_id)
144             
145     def setup(self):
146         self.lock = threading.Lock()
147         self.datagrams_received = []
148         self.callback_order = []
149         self.client_r = ThreadedReactor(task_interval=tc.TASK_INTERVAL)
150         self.server_r = ThreadedReactor(task_interval=tc.TASK_INTERVAL)
151         self.client_r.listen_udp(tc.CLIENT_ADDR[1], self.on_datagram_received)
152         self.server_r.listen_udp(tc.SERVER_ADDR[1], self.on_datagram_received)
153         self.client_r.start()
154         self.server_r.start()
155
156     def test_listen_upd(self):
157         r = ThreadedReactor()
158         r.start()
159         logger.warning(''.join(
160             ('TESTING LOGS ** IGNORE EXPECTED WARNING ** ',
161              '(udp_listen has not been called)')))
162         self.client_r.sendto(DATA, tc.SERVER_ADDR)
163         while 1: #waiting for data
164             with self.lock:
165                 if self.datagrams_received:
166                     break
167             time.sleep(tc.TASK_INTERVAL)
168         with self.lock:
169             first_datagram = self.datagrams_received.pop(0)
170             logger.debug('first_datagram: %s, %s' % (
171                     first_datagram,
172                     (DATA, tc.CLIENT_ADDR)))
173             assert first_datagram, (DATA, tc.CLIENT_ADDR)
174         r.stop()
175             
176     def test_network_callback(self):
177         self.client_r.sendto(DATA, tc.SERVER_ADDR)
178         time.sleep(tc.TASK_INTERVAL)
179         with self.lock:
180             first_datagram = self.datagrams_received.pop(0)
181             logger.debug('first_datagram: %s, %s' % (
182                     first_datagram,
183                     (DATA, tc.CLIENT_ADDR)))
184             assert first_datagram, (DATA, tc.CLIENT_ADDR)
185
186     def test_block_flood(self):
187         from floodbarrier import MAX_PACKETS_PER_PERIOD as FLOOD_LIMIT
188         for _ in xrange(FLOOD_LIMIT):
189             self.client_r.sendto(DATA, tc.SERVER_ADDR)
190         for _ in xrange(10):
191             self.client_r.sendto(DATA, tc.SERVER_ADDR)
192             logger.warning(
193                 "TESTING LOGS ** IGNORE EXPECTED WARNING **")
194         time.sleep(tc.TASK_INTERVAL)
195         with self.lock:
196             logger.debug('datagram processed: %d/%d' % (
197                               len(self.datagrams_received),
198                               FLOOD_LIMIT))
199             assert len(self.datagrams_received) <= FLOOD_LIMIT
200
201     def test_call_later(self):
202         self.client_r.call_later(.13, self.callback_f, 1)
203         self.client_r.call_later(.11, self.callback_f, 2)
204         self.client_r.call_later(.01, self.callback_f, 3)
205         task4 = self.client_r.call_later(.01, self.callback_f, 4)
206         task4.cancel()
207         time.sleep(.03)
208         with self.lock:
209             logger.debug('callback_order: %s' % self.callback_order)
210             assert self.callback_order == [3]
211             self.callback_order = []
212         self.client_r.call_now(self.callback_f, 5)
213         time.sleep(.03)
214         with self.lock:
215             logger.debug('callback_order: %s' % self.callback_order)
216             assert self.callback_order == [5]
217             self.callback_order = []
218         task6 = self.client_r.call_later(.03, self.callback_f, 6)
219         task6.cancel()
220         time.sleep(.1)
221         with self.lock:
222             logger.debug('callback_order: %s' % self.callback_order)
223             assert self.callback_order == [2, 1]
224
225     def test_network_and_delayed(self):
226         self.client_r.call_later(.2, self.callback_f, 0)
227         self.client_r.call_now(self.callback_f, 1)
228         task2 = self.client_r.call_later(.2, self.callback_f, 2)
229         with self.lock:
230             eq_(self.callback_order, [])
231         time.sleep(.1)
232
233         with self.lock:
234             logger.debug('callback_order: %s' % self.callback_order)
235             assert self.callback_order == [1]
236             self.callback_order = []
237             assert not self.datagrams_received
238         self.server_r.sendto(DATA, tc.CLIENT_ADDR)
239         time.sleep(.02) # wait for network interruption
240         with self.lock:
241             logger.debug('callback_order: %s' % self.callback_order)
242             assert self.callback_order == []
243             logger.debug('callback_order: %s' % self.callback_order)
244             assert self.datagrams_received.pop(0) == (DATA, tc.SERVER_ADDR)
245             task2.cancel() #inside critical region??
246         time.sleep(.1) # wait for task 0 (task 2 should be cancelled)
247         with self.lock:
248             assert self.callback_order == [0]
249             assert not self.datagrams_received
250
251     def test_sendto_socket_error(self): 
252         logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
253         self.client_r.sendto('z', (tc.NO_ADDR[0], 0))
254
255     def teardown(self):
256         self.client_r.stop()
257         self.server_r.stop()
258
259 class TestSocketErrors:
260
261     def _callback(self, *args, **kwargs):
262         self.callback_fired = True
263     
264     def setup(self):
265         self.callback_fired = False
266         self.r = ThreadedReactorSocketError()
267         self.r.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
268
269     def test_sendto(self):
270         logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
271         self.r.sendto('z', tc.NO_ADDR)
272
273     def test_recvfrom(self):
274         self.r.start()
275         r2 = ThreadedReactor()
276         r2.listen_udp(tc.SERVER_ADDR[1], lambda x,y:None)
277         logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
278         r2.sendto('z', tc.CLIENT_ADDR)
279         # self.r will call recvfrom (which raises socket.error)
280         time.sleep(tc.TASK_INTERVAL)
281         ok_(not self.callback_fired)
282         self.r.stop()
283
284     def test_sendto_too_large_data_string(self):
285         logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
286         self.r.sendto('z'*12345, tc.NO_ADDR)
287             
288
289
290         
291 class TestMockThreadedReactor:
292
293     def setup(self):
294         pass
295
296     def _callback(self, *args):
297         pass
298
299     def test_mock_threaded_reactor(self):
300         '''
301         Just making sure that the interface is the same
302
303         '''
304         r = ThreadedReactor(task_interval=.1)
305         rm = ThreadedReactorMock(task_interval=.1)
306
307         r.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
308         rm.listen_udp(tc.CLIENT_ADDR[1], lambda x,y:None)
309
310         r.start()
311         rm.start()
312
313         r.sendto(DATA, tc.CLIENT_ADDR)
314         rm.sendto(DATA, tc.CLIENT_ADDR)
315         
316         r.call_later(.1, self._callback)
317         rm.call_later(.1, self._callback)
318 #        time.sleep(.002)
319         r.stop()
320         rm.stop()