instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Test / test_status.py
1 # Written by Njaal Borch
2 # see LICENSE.txt for license information
3 import unittest
4 import threading
5
6 import time
7
8 from BaseLib.Core.Statistics.Status import Status
9 from BaseLib.Core.Statistics.Status import LivingLabReporter
10
11 class TestOnChangeStatusReporter(Status.OnChangeStatusReporter):
12     
13     name = None
14     value = None
15
16     def report(self, element):
17         self.name = element.name
18         self.value = element.value
19
20 class TestPeriodicStatusReporter(Status.PeriodicStatusReporter):
21     last_value = None
22
23     def report(self):
24         elements = self.get_elements()
25         # Actually report
26         assert len(elements) == 1
27         self.last_value = elements[0].get_value()
28
29 class StatusTest(unittest.TestCase):
30     """
31     Unit tests for the Status class
32
33     
34     """
35     
36     def setUp(self):
37         pass
38     def tearDown(self):
39         pass
40     
41     def testBasic(self):
42
43         status = Status.get_status_holder("UnitTest")
44         status.reset()
45         
46         self.assertNotEqual(status, None)
47
48         self.assertEquals(status.get_name(), "UnitTest")
49         
50     def testInt(self):
51         
52         status = Status.get_status_holder("UnitTest")
53         status.reset()
54         self.assertNotEqual(status, None)
55
56         i = status.create_status_element("TestInteger")
57         self.assertEquals(i.get_name(), "TestInteger")
58
59         x = status.get_status_element("TestInteger")
60         self.assertEquals(x, i)
61
62         # Test set and get values
63         for j in range(0,10):
64             i.set_value(j)
65             self.assertEquals(i.get_value(), j)
66
67         # Clean up
68         status.remove_status_element(i)
69         try:
70             status.get_status_element("TestInteger")
71             self.fail("Remove does not remove status element 'TestInteger'")
72         except Status.NoSuchElementException, e:
73             # Expected
74             pass
75
76     def testInvalid(self):
77         status = Status.get_status_holder("UnitTest")
78         status.reset()
79
80         try:
81             i = status.create_status_element(None)
82             self.fail("Does not throw exception with no name")
83         except AssertionError, e:
84             pass
85
86         try:
87             status.get_status_element(None)
88             self.fail("Invalid get_status_element does not throw exception")
89         except AssertionError,e:
90             pass
91
92         try:
93             status.remove_status_element(None)
94             self.fail("Invalid remove_status_element does not throw exception")
95         except AssertionError,e:
96             pass
97
98         elem = Status.StatusElement("name", "description")
99         try:
100             status.remove_status_element(elem)
101             self.fail("Invalid remove_status_element does not throw exception")
102         except Status.NoSuchElementException,e:
103             pass
104             
105         
106     def testPolicy_ON_CHANGE(self):
107
108         status = Status.get_status_holder("UnitTest")
109         status.reset()
110         reporter = TestOnChangeStatusReporter("On change")
111         status.add_reporter(reporter)
112         i = status.create_status_element("TestInteger")
113
114         for x in range(0, 10):
115             i.set_value(x)
116             if x != reporter.value:
117                 self.fail("Callback does not work for ON_CHANGE policy")
118             if reporter.name != "TestInteger":
119                 self.fail("On_Change callback get's the wrong parameter, got '%s', expected 'TestInteger'"%reporter.name)
120
121         # Clean up
122         status.remove_status_element(i)
123         
124
125     def testPolicy_PERIODIC(self):
126
127         status = Status.get_status_holder("UnitTest")
128         status.reset()
129
130         reporter = TestPeriodicStatusReporter("Periodic, 0.4sec", 0.4)
131         status.add_reporter(reporter)
132         i = status.create_status_element("TestInteger")
133
134         for x in range(0, 5):
135             i.set_value(x)
136             self.assertEquals(reporter.last_value, None) # Not updated yet
137             
138         time.sleep(1)
139         
140         assert reporter.last_value == 4
141
142         for x in range(5, 9):
143             self.assertEquals(reporter.last_value, 4) # Not updated yet
144             i.set_value(x)
145         time.sleep(1)
146
147         self.assertEquals(reporter.last_value, 8)
148
149         # Clean up
150         status.remove_status_element(i)
151
152         reporter.stop()
153
154     def test_LLReporter_element(self):
155
156         status = Status.get_status_holder("UnitTest")
157         status.reset()
158         reporter = TestLivingLabPeriodicReporter("Living lab test reporter", 1.0)
159         status.add_reporter(reporter)
160         i = status.create_status_element("TestInteger")
161         i.set_value(1233)
162
163         b = status.create_status_element("Binary")
164         b.set_value("".join([chr(n) for n in range(0, 255)]))
165         
166         reporter.wait_for_post(5.0)
167
168         reporter.stop()
169         time.sleep(1)
170         
171         self.assertEquals(len(reporter.get_errors()), 0)
172
173         status.remove_status_element(i)
174         status.remove_status_element(b)
175
176     def test_LLReporter_event(self):
177
178         status = Status.get_status_holder("UnitTest")
179         status.reset()
180         reporter = TestLivingLabPeriodicReporter("Living lab test reporter", 1.0)
181         status.add_reporter(reporter)
182         event = status.create_event("SomeEvent")
183         event.add_value("123")
184         event.add_value("456")
185         status.add_event(event)
186         
187         reporter.wait_for_post(5.0)
188
189         reporter.stop()
190         time.sleep(1)
191         
192         self.assertEquals(len(reporter.get_errors()), 0)
193
194         status.remove_event(event)
195
196 class TestLivingLabPeriodicReporter(LivingLabReporter.LivingLabPeriodicReporter):
197
198     def __init__(self, name, report_time):
199         self.errors = []
200         LivingLabReporter.LivingLabPeriodicReporter.__init__(self, name, report_time, "myid", self.count_errors)
201         self.xml = None
202         self.cond = threading.Condition()
203
204     def wait_for_post(self, timeout):
205         self.cond.acquire()
206         try:
207             if self.xml:
208                 return True
209         
210             self.cond.wait(timeout)
211             if self.xml:
212                 return True
213             raise Exception("Timeout")
214         finally:
215             self.cond.release()
216             
217
218     def post(self, xml):
219         # TODO: Check the XML?
220         print xml
221         self.xml = xml
222         self.cond.acquire()
223         self.cond.notifyAll()
224         self.cond.release()
225
226     def count_errors(self, zero, error):
227         print "ERROR",error
228         self.errors.append(error)
229
230     def get_errors(self):
231         return self.errors
232
233 if __name__ == "__main__":
234
235     print "Testing Status module"
236     
237     unittest.main()
238     
239     print "All done"