instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Multicast / Multicast.py
1 # Written by Njaal Borch
2 # see LICENSE.txt for license information
3
4 import socket
5 import threading
6 import struct
7 import select
8 import string
9 import sys
10 import time
11 import random # for ping
12 from traceback import print_exc
13
14 import base64 # Must encode permid
15
16
17 from BaseLib.Core.BuddyCast.buddycast import BuddyCastFactory
18
19
20 DEBUG = False
21
22 class MyLogger:
23
24     """
25     Dummy logger due to code re-use and no use of logger in Tribler
26     
27     """
28     enabled = DEBUG
29     
30     def debug(self, message):
31         if self.enabled:
32             print >> sys.stderr, "pdisc: DEBUG:", message
33     
34     def info(self, message):
35         if self.enabled:
36             print >> sys.stderr, "pdisc: INFO:", message
37
38     def warning(self, message):
39         if self.enabled:
40             print >> sys.stderr, "pdisc: WARNING:", message
41
42     def error(self, message):
43         if self.enabled:
44             print >> sys.stderr, "pdisc: ERROR:", message
45
46     def fatal(self, message):
47         if self.enabled:
48             print >> sys.stderr, "pdisc: FATAL:", message
49
50     def exception(self, message):
51         if self.enabled:
52             print >> sys.stderr, "pdisc: EXCEPTION:", message
53             import traceback
54             traceback.print_exc()
55             
56 class Multicast:
57
58     """
59     This class allows nodes to communicate on a local network
60     using IP multicast
61     
62     """
63
64     def __init__(self, config, overlay_bridge, myport, myselversion, peerdb,
65                  logger=None, capabilities=None):
66         """
67         Initialize the multicast channel.  Parameters:
68           - multicast_ipv4_enabled
69           - multicast_ipv6_enabled
70           - multicast_port
71           - multicast_announce - True if the node should announce itself
72           - permid - The ID of the node
73           - multicast_ipv4_address
74           - multicast_ipv6_address
75           
76         If both ipv4_enabled and ipv6_enabled is false, the channel
77         will not do anything.
78
79         Other parameters:
80         logger - Send logs (debug/info/warning/error/exceptions) to a logger
81         capabilities - Announce a set of capabilities for this node.  Should
82                        be a list
83         
84         """
85         self.myport = myport
86         self.myselversion = myselversion
87         self.overlay_bridge = overlay_bridge
88         self.peer_db = peerdb
89         
90         if logger:
91             self.log = logger
92         else:
93             self.log = MyLogger()
94
95         self.config = config
96         self.capabilities = capabilities
97
98         self.enabled = False
99         self.announceHandlers = []
100         self.on_node_announce = None
101         self.incoming_pongs = {}
102
103         self.interfaces = []
104
105         self.address_family = socket.AF_INET
106         if self.config['multicast_ipv6_enabled']:
107             if not socket.has_ipv6:
108                 self.log.warning("Missing IPv6 support")
109             else:
110                 self.address_family = socket.AF_INET6
111         
112         self.sock = socket.socket(self.address_family,
113                                   socket.SOCK_DGRAM)
114         
115         self.sock.setsockopt(socket.SOL_SOCKET,
116                              socket.SO_REUSEADDR, 1)
117                 
118         for res in socket.getaddrinfo(None,
119                                       self.config['multicast_port'],
120                                       self.address_family,
121                                       socket.SOCK_DGRAM, 0,
122                                       socket.AI_PASSIVE):
123             
124             af, socktype, proto, canonname, sa = res
125                 
126             try:
127                 self.sock.bind(sa)
128             except:
129                 self.log.exception("Error binding")
130
131         try:
132             if self.config['multicast_ipv6_enabled']:
133                 self.interfaces = self._joinMulticast(self.config['multicast_ipv6_address'],
134                                                       self.config['multicast_port'],
135                                                       self.sock)
136                 self.enabled = True
137         except:
138             self.log.exception("Exception during IPv6 multicast join")
139
140         try:
141             if self.config['multicast_ipv4_enabled']:
142                 self._joinMulticast(self.config['multicast_ipv4_address'],
143                                     self.config['multicast_port'],
144                                     self.sock)
145                 self.enabled = True
146         except:
147             self.log.exception("Exception during IPv4 multicast join")
148
149         
150     def _getCapabilities(self, elements):
151         """
152         Return a list of capabilities from a list of elements - internal function
153         """
154         capabilities = []
155         for elem in elements:
156             if elem.startswith("c:"):
157                 capabilities.append(elem[2:])
158         return capabilities
159
160     def getSocket(self):
161         return self.sock
162
163     def _joinMulticast(self, addr, port, sock):
164         """
165         Join a multicast channel - internal function
166         """
167         import struct
168         
169         for res in socket.getaddrinfo(addr,
170                                       port,
171                                       socket.AF_UNSPEC,
172                                       socket.SOCK_DGRAM):
173             
174             af, socktype, proto, canonname, sa = res
175             
176             break
177
178         if af == socket.AF_INET6:
179             # Smurf, must manually reconstruct "::"???
180             # Count the number of colons in the address
181             num_colons = addr.count(":")
182             
183             new_colons = ":"
184             
185             # Replace double colon with the appropriate number (7)
186             for i in range(num_colons, 8):
187                 new_colons = "%s0:" % new_colons
188                 
189             addr = addr.replace("::", new_colons)
190                 
191             addr_pack = ''
192         
193             for l in addr.split(":"):
194                 word = int(l,16)
195                 addr_pack = addr_pack + struct.pack('!H', word)
196
197             # Now we try to join the first 32 interfaces
198             # Not too nice, but it is absolutely portable :-)
199             interfaces = []
200             for i in range (1, 32):
201                 try:
202                     mreq = addr_pack + struct.pack('l', i)
203                 
204                     # We're ready, at last
205                     sock.setsockopt(socket.IPPROTO_IPV6,
206                                     socket.IPV6_JOIN_GROUP,
207                                     mreq)
208                     ok = True
209                     self.log.debug("Joined IPv6 multicast on interface %d"%i)
210
211                     # We return the interface indexes that worked
212                     interfaces.append(i)
213                 except Exception,e:
214                     pass
215
216             if len(interfaces) == 0:
217                 self.log.fatal("Could not join on any interface")
218                 raise Exception("Could not join multicast on any interface")
219
220             return interfaces
221
222         if af == socket.AF_INET:
223             
224             addr_pack = ''
225             grpaddr = 0
226             bytes = map(int, string.split(addr, "."))
227             for byte in bytes:
228                 grpaddr = (grpaddr << 8) | byte
229                 
230             # Construct struct mreq from grpaddr and ifaddr
231             ifaddr = socket.INADDR_ANY
232             mreq = struct.pack('ll',
233                                socket.htonl(grpaddr),
234                                socket.htonl(ifaddr))
235             
236             # Add group membership
237             try:
238                 self.sock.setsockopt(socket.IPPROTO_IP,
239                                      socket.IP_ADD_MEMBERSHIP,
240                                      mreq)
241             except Exception,e:
242                 self.log.exception("Exception joining IPv4 multicast")
243                 
244             return []
245
246
247     def data_came_in(self, addr, data):
248         """
249         Callback function for arriving data.  This is non-blocking
250         and will return immediately after queuing the operation for
251         later processing. Called by NetworkThread
252         """
253         # Must queue this for actual processing, we're not allowed
254         # to block here
255         process_data_func = lambda:self._data_came_in_callback(addr, data)
256         self.overlay_bridge.add_task(process_data_func, 0)
257         
258         
259     def _data_came_in_callback(self, addr, data):
260         """
261         Handler function for when data arrives
262         """
263         
264         self.log.debug("Got a message from %s"%str(addr))
265         # Look at message
266         try:
267             elements = data.split("\n")
268
269             if elements[0] == "NODE_DISCOVER":
270                 if len(elements) < 3:
271                     raise Exception("Too few elements")
272
273                 # Only reply if I'm announcing
274                 if not self.config["multicast_announce"]:
275                     self.log.debug("Not announcing myself")
276                     return
277
278                 remotePermID = elements[2]
279                 self.log.debug("Got node discovery from %s"%remotePermID)
280                 # TODO: Do we reply to any node?
281
282                 # Reply with information about me
283                 permid_64 = base64.b64encode(self.config['permid']).replace("\n","")
284                 msg = "NODE_ANNOUNCE\n%s"%permid_64
285
286                 # Add capabilities
287                 if self.capabilities:
288                     for capability in self.capabilities:
289                         msg += "\nc:%s"%capability
290                 try:
291                     self.sock.sendto(msg, addr)
292                 except Exception,e:
293                     self.log.error("Could not send announce message to %s: %s"%(str(addr), e))
294                     return
295                 
296             elif elements[0] == "ANNOUNCE":
297                 self.handleAnnounce(addr, elements)
298             elif elements[0] == "NODE_ANNOUNCE":
299                 # Some node announced itself - handle callbacks if
300                 # the app wants it
301                 if self.on_node_announce:
302                     try:
303                         self.on_node_announce(elements[1], addr,
304                                               self._getCapabilities(elements))
305                     except Exception,e:
306                         self.log.exception("Exception handling node announce")
307             elif elements[0] == "PING":
308                 permid = base64.b64decode(elements[1])
309                 if permid == self.config["permid"]:
310                     # I should reply
311                     msg = "PONG\n%s\n%s"%(elements[1], elements[2])
312                     self._sendMulticast(msg)
313             elif elements[0] == "PONG":
314                 nonce = int(elements[2])
315                 if self.outstanding_pings.has_key(nonce):
316                     self.incoming_pongs[nonce] = time.time()
317             else:
318                 self.log.warning("Got bad discovery message from %s"%str(addr))
319         except Exception,e:
320             self.log.exception("Illegal message '%s' from '%s'"%(data, addr[0]))
321
322                
323     def _send(self, addr, msg):
324         """
325         Send a message - internal function
326         """
327         
328         for res in socket.getaddrinfo(addr, self.config['multicast_port'],
329                                       socket.AF_UNSPEC,
330                                       socket.SOCK_DGRAM):
331             
332             af, socktype, proto, canonname, sa = res
333         try:
334             sock =  socket.socket(af, socktype)
335             sock.sendto(msg, sa)
336         except Exception,e:
337             self.log.warning("Error sending '%s...' to %s: %s"%(msg[:8], str(sa), e))
338
339         return sock
340
341     def discoverNodes(self, timeout=3.0, requiredCapabilities=None):
342         """
343         Try to find nodes on the local network and return them in a list
344         of touples on the form
345         (permid, addr, capabilities)
346
347         Capabilities can be an empty list
348
349         if requiredCapabilities is specified, only nodes matching one
350         or more of these will be returned
351         
352         """
353
354         # Create NODE_DISCOVER message
355         msg = "NODE_DISCOVER\nTr_OVERLAYSWARM node\npermid:%s"%\
356               base64.b64encode(self.config['permid']).replace("\n","")
357
358         # First send the discovery message
359         addrList = []
360         sockList = []
361         if self.config['multicast_ipv4_enabled']:
362             sockList.append(self._send(self.config['multicast_ipv4_address'], msg))
363             
364         if self.config['multicast_ipv6_enabled']:
365             for iface in self.interfaces:
366                 sockList.append(self._send("%s%%%s"%(self.config['multicast_ipv6_address'], iface), msg))
367             
368         nodeList = []
369         endAt = time.time() + timeout
370         while time.time() < endAt:
371
372             # Wait for answers (these are unicast)
373             SelectList = sockList[:]
374
375             (InList, OutList, ErrList) = select.select(SelectList, [], [], 1.0)
376
377             if len(ErrList) < 0:
378                 self.log.warning("Select gives error...")
379
380             while len(InList) > 0:
381
382                 sock2 = InList.pop(0)
383
384                 try:
385                     (data, addr) = sock2.recvfrom(1450)
386                 except socket.error, e:
387                     self.log.warning("Exception receiving: %s"%e)
388                     continue
389                 except Exception,e:
390                     print_exc()
391                     self.log.warning("Unknown exception receiving")
392                     continue
393
394                 try:
395                     elements = data.split("\n")
396                     if len(elements) < 2:
397                         self.log.warning("Bad message from %s: %s"%(addr, data))
398                         continue
399
400                     if elements[0] != "NODE_ANNOUNCE":
401                         self.log.warning("Unknown message from %s: %s"%(addr, data))
402                         continue
403
404                     permid = base64.b64decode(elements[1])
405                     self.log.info("Discovered node %s at (%s)"%(permid, str(addr)))
406                     capabilities = self._getCapabilities(elements)
407                     if requiredCapabilities:
408                         ok = False
409                         for rc in requiredCapabilities:
410                             if rc in capabilities:
411                                 ok = True
412                                 break
413                         if not ok:
414                             continue
415                     nodeList.append((permid, addr, capabilities))
416                 except Exception,e:
417                     self.log.warning("Could not understand message: %s"%e)
418                         
419         return nodeList
420
421     def sendNodeAnnounce(self):
422
423         """
424         Send a node announcement message on multicast
425
426         """
427
428         msg = "NODE_ANNOUNCE\n%s"%\
429               base64.b64encode(self.config['permid']).replace("\n","")
430
431         if self.capabilities:
432             for capability in self.capabilities:
433                 msg += "\nc:%s"%capability
434         try:
435             self._sendMulticast(msg)
436         except:
437             self.log.error("Could not send announce message")
438
439
440     def setNodeAnnounceHandler(self, handler):
441
442         """
443         Add a handler function for multicast node announce messages
444
445         Will get a parameters (permid, address, capabilities)
446         
447         """
448         self.on_node_announce = handler
449         
450     def addAnnounceHandler(self, handler):
451
452         """
453         Add an announcement handler for announcement messages (not
454
455         node discovery)
456
457         The callback function will get parameters:
458            (permid, remote_address, parameter_list)
459
460         """
461         self.announceHandlers.append(handler)
462
463     def removeAnnouncehandler(self, handler):
464
465         """
466         Remove an announce handler (if present)
467         
468         """
469         try:
470             self.announceHandlers.remove(handler)
471         except:
472             #handler not in list, ignore
473             pass
474         
475     def handleAnnounce(self, addr, elements):
476
477         """
478         Process an announcement and call any callback handlers
479         
480         """
481
482         if elements[0] != "ANNOUNCE":
483             raise Exception("Announce handler called on non-announce: %s"%\
484                             elements[0])
485
486         # Announce should be in the form:
487         # ANNOUNCE
488         # base64 encoded permid
489         # numElements
490         # element1
491         # element2
492         # ...
493         if len(elements) < 3:
494             raise Exception("Bad announce, too few elements in message")
495
496         try:
497             permid = base64.b64decode(elements[1])
498             numElements = int(elements[2])
499         except:
500             raise Exception("Bad announce message")
501
502         if len(elements) < 3 + numElements:
503             raise Exception("Incomplete announce message")
504         
505         _list = elements[3:3+numElements]
506         
507         # Loop over list to longs if numbers
508         list = []
509         for elem in _list:
510             if elem.isdigit():
511                 list.append(long(elem))
512             else:
513                 list.append(elem)
514
515         if len(self.announceHandlers) == 0:
516             self.log.warning("Got node-announce, but I'm missing announce handlers")
517             
518         # Handle the message
519         for handler in self.announceHandlers:
520             try:
521                 self.log.debug("Calling callback handler")
522                 handler(permid, addr, list)
523             except:
524                 self.log.exception("Could not activate announce handler callback '%s'"%handler)
525         
526
527     def handleOVERLAYSWARMAnnounce(self, permid, addr, params):
528         """ Callback function to handle multicast node announcements
529
530         This one will trigger an overlay connection and then initiate a buddycast
531         exchange
532         """
533         # todo: when the port or selversion change this will NOT be
534         # updated in the database. Solution: change the whole
535         # flag_peer_as_local_to_db into check_and_update_peer_in_db
536         # and let it check for the existance and current value of
537         # is_local, port, and selversion. (at no additional queries I
538         # might add)
539
540         self.log.debug("Got Tr_OVERLAYSWARM announce!")
541         port, selversion = params
542
543         if permid == self.config["permid"]:
544             self.log.debug("Discovered myself")
545             # Discovered myself, which is not interesting
546             return
547
548         if self.flag_peer_as_local_to_db(permid, True):
549             self.log.debug("node flagged as local")
550             # Updated ok
551             return
552
553         # We could not update - this is a new node!
554         try:
555             try:
556                 self.log.debug("Adding peer at %s to database"%addr[0])
557                 self.add_peer_to_db(permid, (addr[0], port), selversion)
558             except Exception,e:
559                 print >> sys.stderr, "pdisc: Could not add node:",e
560
561             try:
562                 self.flag_peer_as_local_to_db(permid, True)
563                 self.log.debug("node flagged as local")
564             except Exception,e:
565                 print >> sys.stderr, "pdisc: Could not flag node as local:",e
566
567             # Now trigger a buddycast exchange
568             bc_core = BuddyCastFactory.getInstance().buddycast_core
569             if bc_core:
570                 self.log.debug("Triggering buddycast")
571                 bc_core.startBuddyCast(permid)
572         finally:
573                 # Also announce myself so that the remote node can see me!
574                 params = [self.myport, self.myselversion]
575                 self.log.debug("Sending announce myself")
576                 try:
577                     self.sendAnnounce(params)
578                 except:
579                     self.log.exception("Sending announcement")
580         
581     def sendAnnounce(self, list):
582
583         """
584         Send an announce on local multicast, if enabled
585         
586         """
587
588         if not self.enabled:
589             return
590
591         # Create ANNOUNCE message
592         msg = "ANNOUNCE\n%s\n%d\n"%\
593               (base64.b64encode(self.config['permid']).replace("\n",""), len(list))
594
595         for elem in list:
596             msg += "%s\n"%elem
597
598         self._sendMulticast(msg)
599
600     def _sendMulticast(self, msg):
601
602         """
603         Send a message buffer on the multicast channels
604         
605         """
606         
607         if self.config['multicast_ipv4_enabled']:
608             self._send(self.config['multicast_ipv4_address'], msg)
609         if self.config['multicast_ipv6_enabled']:
610             for iface in self.interfaces:
611                 self._send("%s%%%s"%(self.config['multicast_ipv6_address'], iface), msg)
612
613         
614
615     def ping(self, permid, numPings=3):
616         """
617         Ping a node and return (avg time, min, max) or (None, None, None) if no answer
618         Only one node can be pinged at the time - else this function will not work!
619         """
620
621         self.outstanding_pings = {}
622         self.incoming_pongs = {}
623         
624         # Send a PING via multicast and wait for a multicast response.
625         # Using multicast for both just in case it is different from
626         # unicast
627
628         for i in range(0, numPings):
629             nonce = random.randint(0, 2147483647)
630             msg = "PING\n%s\n%s"%(base64.b64encode(permid).replace("\n",""), nonce)
631             self.outstanding_pings[nonce] = time.time()
632             self._sendMulticast(msg)
633             time.sleep(0.250)
634             
635         # Now we gather the results
636         time.sleep(0.5)
637
638         if len(self.incoming_pongs) == 0:
639             return (None, None, None)
640         
641         max = 0
642         min = 2147483647
643         total = 0
644         num = 0
645         for nonce in self.outstanding_pings.keys():
646             if self.incoming_pongs.has_key(nonce):
647                 diff = self.incoming_pongs[nonce] - self.outstanding_pings[nonce]
648                 if diff > max:
649                     max = diff
650                 if diff < min:
651                     min = diff
652                 total += diff
653                 num += 1
654
655         avg = total/num
656
657         self.outstanding_pings = {}
658         self.incoming_pongs = {}
659         return (avg, min, max)
660
661     def add_peer_to_db(self,permid,dns,selversion):    
662         # todo: should is_local be set to True?
663         now = int(time.time())
664         peer_data = {'permid':permid, 'ip':dns[0], 'port':dns[1], 'oversion':selversion, 'last_seen':now, 'last_connected':now}
665         self.peer_db.addPeer(permid, peer_data, update_dns=True, update_connected=True, commit=True)
666
667     def flag_peer_as_local_to_db(self, permid, is_local):
668         """
669         Sets the is_local flag for PERMID to IS_LOCAL if and only if
670         PERMID exists in the database, in this case it returns
671         True. Otherwise it returns False.
672         """
673         peer = self.peer_db.getPeer(permid, ('is_local',))
674         
675         print >>sys.stderr,"pdisc: flag_peer_as_local returns",peer
676         
677         if not peer is None:
678             # Arno, 2010-02-09: Somehow return value is not std.
679             if isinstance(peer,list):
680                 flag = peer[0]
681             else:
682                 flag = peer
683             if not flag == is_local:
684                 self.peer_db.setPeerLocalFlag(permid, is_local)
685             return True
686         return False
687             
688         # if is_local:
689         #     pass
690             ##print >>sys.stderr,"pdisc: Flagging a peer as local"
691         # return self.peer_db.setPeerLocalFlag(permid, is_local)
692