instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / UPnP / ssdp / ssdpclient.py
1 # Written by Ingar Arntzen
2 # see LICENSE.txt for license information
3
4 """
5 This module implements the SSDP deamon of a UPnP control point.
6 """
7 import time
8 import uuid as uuid_module
9 import ssdpmessage
10 import ssdpdaemon
11
12 _LOG_TAG = "SSDPClient"
13
14 ##############################################
15 # SSDP CLIENT DAEMON
16 ##############################################
17
18 class SSDPClient(ssdpdaemon.SSDPDaemon):
19
20     """
21     This implements the SSDP deamon of UPnP control points.
22     
23     This class is implemented in a non-blocking, event-based manner.
24     Execution is outsourced to the given task_runner.
25     """
26
27     def __init__(self, task_runner, logger=None):
28
29         ssdpdaemon.SSDPDaemon.__init__(self, task_runner, logger)
30
31         # Devices
32         self._ssdp_devices = {} # uuid : SSDPDevice
33
34         # Event Handlers
35         self._add_handler = lambda uuid: None
36         self._remove_handler = lambda uuid: None
37
38     ##############################################
39     # PUBLIC API
40     ##############################################
41
42
43     def set_add_handler(self, handler):
44         """Add handler is executed whener a device is added."""
45         self._add_handler = handler
46
47     def set_remove_handler(self, handler):
48         """Remove handler is executed whener a device is removed."""
49         self._remove_handler = handler
50
51     def get_ssdp_device(self, uuid):
52         """Given a uuid, get reference to the local representation of a
53         remote SSDP root device."""
54         return self._ssdp_devices.get(uuid, None)
55
56     def search(self, target="upnp:rootdevice"):
57         """Multicast a SSDP search for root devices on the local network. """
58         msg = ssdpmessage.SearchMessage()
59         msg.init(max_delay=3, st=target)
60         self.multicast(msg)
61
62     def close(self):
63         """Close sockets and cancel tasks."""
64         ssdpdaemon.SSDPDaemon.close(self)
65         for device in self._ssdp_devices.values():
66             device.close()
67
68     ##############################################
69     # OVERRIDE HANDLERS
70     ##############################################
71
72     def startup(self):
73         """Extending Startup by adding Search."""
74         ssdpdaemon.SSDPDaemon.startup(self)
75         self.search()
76
77     def handle_search(self, msg, sock_addr):
78         """Handlers the receipt of a SSDP Search message."""
79         self.log("IGNORE %s from %s" % (msg.type, sock_addr))
80
81     def handle_reply(self, msg, sock_addr):
82         """Handles the receipt of a SSDP Reply message."""
83         self._handle_announce_or_reply(msg, sock_addr)
84
85     def handle_announce(self, msg, sock_addr):
86         """Handles the receipt of a SSDP Announce message."""
87         self._handle_announce_or_reply(msg, sock_addr)
88
89     def _handle_announce_or_reply(self, msg, sock_addr):
90         """Handles the receipt of a SSDP Announce message
91         or a SSDP Reply message."""
92         # uuid
93         tokens = msg.usn.split(":")
94         if len(tokens) != 5:
95             # use only those announce messages that has a specific
96             # structure :
97             # "uuid:<uuid>::upnp:rootdevice"
98             self.log("IGNORE %s [%s]" % (msg.type, sock_addr[0]))
99             return
100         uuid = uuid_module.UUID(tokens[1])
101         # renew
102         if self._ssdp_devices.has_key(uuid):
103             self._renew_device(uuid, msg.max_age)
104         # new
105         else:
106             # target
107             if isinstance(msg, ssdpmessage.ReplyMessage):
108                 target = msg.st
109             elif isinstance(msg, ssdpmessage.AnnounceMessage):
110                 target = msg.nt
111             ssdp_device = SSDPDevice(self.task_runner,
112                                      uuid, msg.max_age, 
113                                      msg.location, target, 
114                                      msg.osversion, 
115                                      msg.productversion)
116             self._add_device(ssdp_device)
117
118     def handle_unannounce(self, msg, sock_addr):
119         """Handles the receipt of a SSDP UnAnnounce message."""
120         # Handle UnAnnounces for root devices exclusively.
121         # usn = "uuid:73721e4e-0a84-4985-97e2-974b2c50323b"
122         tokens = msg.usn.split(":")
123         if len(tokens) != 2:
124             self.log("IGNORE %s [%s]" % (msg.type, sock_addr[0]))
125             return
126         uuid = uuid_module.UUID(tokens[1])
127         self._remove_device(uuid)
128
129
130     ##############################################
131     # PRIVATE UTILITY
132     ##############################################
133        
134     def _handle_expiry(self, uuid):
135         """A device has expired, causing it to be removed."""
136         self._remove_device(uuid)
137
138     def _add_device(self, ssdp_device):
139         """Add new SSDP root device."""
140         uuid = ssdp_device.uuid
141         self._ssdp_devices[uuid] = ssdp_device
142         ssdp_device.set_expiry_handler(self._handle_expiry)
143         self.log("ADD [%d] %s" % (ssdp_device.max_age, uuid))
144         # Publish Event ADD
145         self.task_runner.add_task(self._add_handler, 
146                                   args=(uuid,ssdp_device.location))
147
148     def _renew_device(self, uuid, max_age):
149         """Receive announce from already known device."""
150         self._ssdp_devices[uuid].alive(max_age)
151         self.log("ALIVE [%d] %s" % (max_age, uuid))
152         
153     def _remove_device(self, uuid):
154         """Remove device."""
155         if self._ssdp_devices.has_key(uuid):
156             del self._ssdp_devices[uuid]
157             self.log("REMOVE %s" % (uuid))
158             # Publish Event REMOVE            
159             self.task_runner.add_task(self._remove_handler, 
160                                       args=(uuid,))
161
162
163 ##############################################
164 # SSDP DEVICE
165 ##############################################
166
167 class SSDPDevice:
168
169     """This represents a local view of a remote SSDP root device."""
170
171     def __init__(self, task_runner, 
172                  uuid, max_age, location, search_target, 
173                  os_version, product_version):
174
175         self.uuid = uuid
176         self.location = location
177         self.search_target = search_target
178         self.os_version = os_version
179         self.product_version = product_version
180         self.max_age = max_age
181         self.expiry = None
182         self._expired = False
183
184         self._task_runner = task_runner
185         self._expiry_handler = lambda uuid: None
186         self._task = None
187         self._new_timeout(max_age)
188
189     # Private Methods
190
191     def _new_timeout(self, max_age):
192         """Register a new liveness timeout for device."""
193         # Cancel old timeout.
194         if self._task:
195             self._task.cancel()
196         # Update expire
197         self.expiry = time.time() + max_age
198         # Create new timeout
199         self._task = self._task_runner.add_delay_task(
200             max_age, self._handle_timeout)
201         
202     def _handle_timeout(self):
203         """Timeout handler."""
204         self._expired = True
205         self._expiry_handler(self.uuid)
206
207     # Public Methods
208
209     def set_expiry_handler(self, handler):
210         """Set handler to be executed whenever device has been
211         timed out without any signs of liveness."""
212         self._expiry_handler = handler
213
214     def alive(self, max_age):
215         """Invoked whenever a signal is received that 
216         suggests that the remote device is live and well."""
217         self._new_timeout(max_age)
218
219     def is_alive(self):
220         """Check if device is alive (local view)."""
221         return not self._expired
222
223     def close(self):
224         """Cancel timeout task associated with device, if any. """
225         if self._task:
226             self._task.cancel()
227         
228
229 ##############################################
230 # MAIN
231 ##############################################
232
233 if __name__ == "__main__":
234     
235
236     class TestClient:
237         """TestClient wraps SSDPClient to add some event handlers."""
238
239         def __init__(self, ssdp_client):
240             self._ssdp_client = ssdp_client
241             self._ssdp_client.set_add_handler(self.add_handler)
242             self._ssdp_client.set_remove_handler(self.remove_handler)
243
244         def add_handler(self, uuid, location):
245             """Executed when device with given uuid has been added."""
246             print "ADD %s %s" % (uuid, location)
247
248         def remove_handler(self, uuid):
249             """Executed when device with given uuid has been removed."""
250             print "REMOVE %s" % uuid
251
252     class MockLogger:
253         """Mockup Logger object."""
254         def __init__(self):
255             pass
256         def log(self, log_tag, msg):
257             """Log to std out."""
258             print log_tag, msg
259
260     import BaseLib.UPnP.common.taskrunner as taskrunner
261     TR = taskrunner.TaskRunner()
262     CLIENT = SSDPClient(TR, MockLogger())
263     TEST = TestClient(CLIENT)
264     TR.add_task(CLIENT.startup)
265     try:
266         TR.run_forever()
267     except KeyboardInterrupt:
268         print
269         CLIENT.close()