1 # Written by Njaal Borch
2 # see LICENSE.txt for license information
11 import random # for ping
12 from traceback import print_exc
14 import base64 # Must encode permid
17 from BaseLib.Core.BuddyCast.buddycast import BuddyCastFactory
25 Dummy logger due to code re-use and no use of logger in Tribler
30 def debug(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: DEBUG:" tag.
# NOTE(review): the chevron form of print is Python 2 only syntax.
32 print >> sys.stderr, "pdisc: DEBUG:", message
34 def info(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: INFO:" tag.
36 print >> sys.stderr, "pdisc: INFO:", message
38 def warning(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: WARNING:" tag.
40 print >> sys.stderr, "pdisc: WARNING:", message
42 def error(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: ERROR:" tag.
44 print >> sys.stderr, "pdisc: ERROR:", message
46 def fatal(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: FATAL:" tag.
# Only prints: nothing in the statement shown terminates the process.
48 print >> sys.stderr, "pdisc: FATAL:", message
50 def exception(self, message):
# Write `message` to sys.stderr behind the fixed "pdisc: EXCEPTION:" tag.
# NOTE(review): print_exc is imported at the top of the file, but the
# statement shown here prints only `message` - confirm that the active
# traceback is reported somewhere as well.
52 print >> sys.stderr, "pdisc: EXCEPTION:", message
59 This class allows nodes to communicate on a local network
64 def __init__(self, config, overlay_bridge, myport, myselversion, peerdb,
65 logger=None, capabilities=None):
# Constructor: remembers the collaborators handed in by the caller, creates
# the socket and joins the IPv6 and/or IPv4 multicast group named in `config`.
67 Initialize the multicast channel. Parameters:
68 - multicast_ipv4_enabled
69 - multicast_ipv6_enabled
71 - multicast_announce - True if the node should announce itself
72 - permid - The ID of the node
73 - multicast_ipv4_address
74 - multicast_ipv6_address
76 If both ipv4_enabled and ipv6_enabled is false, the channel
80 logger - Send logs (debug/info/warning/error/exceptions) to a logger
81 capabilities - Announce a set of capabilities for this node. Should
# Keep the caller-supplied values for later use by the other methods.
86 self.myselversion = myselversion
87 self.overlay_bridge = overlay_bridge
96 self.capabilities = capabilities
# Callback registries: a list of ANNOUNCE handlers (addAnnounceHandler) and
# a single NODE_ANNOUNCE handler (setNodeAnnounceHandler).
99 self.announceHandlers = []
100 self.on_node_announce = None
# nonce -> time.time() at which a PONG for that nonce arrived; filled in by
# the PONG branch of _data_came_in_callback and read by ping().
101 self.incoming_pongs = {}
# IPv4 is the default address family; AF_INET6 is selected when IPv6 is
# enabled in the config (presumably only if socket.has_ipv6 - the branch
# line between the warning and the assignment is not shown; confirm).
105 self.address_family = socket.AF_INET
106 if self.config['multicast_ipv6_enabled']:
107 if not socket.has_ipv6:
108 self.log.warning("Missing IPv6 support")
110 self.address_family = socket.AF_INET6
112 self.sock = socket.socket(self.address_family,
# presumably so that several processes on one host can bind the same
# multicast port - confirm.
115 self.sock.setsockopt(socket.SOL_SOCKET,
116 socket.SO_REUSEADDR, 1)
# host=None asks getaddrinfo for the local wildcard address on the
# configured multicast port; a failure is logged as "Error binding".
118 for res in socket.getaddrinfo(None,
119 self.config['multicast_port'],
121 socket.SOCK_DGRAM, 0,
124 af, socktype, proto, canonname, sa = res
129 self.log.exception("Error binding")
# The IPv6 join returns the interface indexes that could be joined; they
# are reused as scope ids when sending (see _sendMulticast, discoverNodes).
132 if self.config['multicast_ipv6_enabled']:
133 self.interfaces = self._joinMulticast(self.config['multicast_ipv6_address'],
134 self.config['multicast_port'],
138 self.log.exception("Exception during IPv6 multicast join")
# NOTE(review): the IPv4 join discards the return value, and in the lines
# shown self.interfaces is only assigned on the IPv6 path.
141 if self.config['multicast_ipv4_enabled']:
142 self._joinMulticast(self.config['multicast_ipv4_address'],
143 self.config['multicast_port'],
147 self.log.exception("Exception during IPv4 multicast join")
150 def _getCapabilities(self, elements):
# Extract the capability entries from the lines of a received message.
152 Return a list of capabilities from a list of elements - internal function
# A capability line is any element starting with "c:"; the two-character
# prefix is stripped before the value is collected. This mirrors the
# "\nc:%s" lines appended by sendNodeAnnounce.
155 for elem in elements:
156 if elem.startswith("c:"):
157 capabilities.append(elem[2:])
163 def _joinMulticast(self, addr, port, sock):
# Join the multicast group `addr`, handling the IPv6 and IPv4 address
# families separately according to what getaddrinfo reports.
165 Join a multicast channel - internal function
169 for res in socket.getaddrinfo(addr,
174 af, socktype, proto, canonname, sa = res
# --- IPv6: build the 16-byte group address by hand, then try to join on a
# range of interface indexes.
178 if af == socket.AF_INET6:
179 # Smurf, must manually reconstruct "::"???
180 # Count the number of colons in the address
181 num_colons = addr.count(":")
185 # Replace double colon with the appropriate number (7)
186 for i in range(num_colons, 8):
187 new_colons = "%s0:" % new_colons
189 addr = addr.replace("::", new_colons)
# Each colon-separated group is packed as one big-endian 16-bit word.
193 for l in addr.split(":"):
195 addr_pack = addr_pack + struct.pack('!H', word)
197 # Now we try to join the first 32 interfaces
198 # Not too nice, but it is absolutely portable :-)
# NOTE(review): range(1, 32) yields indexes 1..31, i.e. 31 interfaces, not
# the 32 the comment above mentions.
200 for i in range (1, 32):
# NOTE(review): 'l' packs a native C long, whose size is platform dependent;
# the interface member of struct ipv6_mreq is presumably an unsigned int -
# confirm the resulting buffer layout on 64-bit and big-endian hosts.
202 mreq = addr_pack + struct.pack('l', i)
204 # We're ready, at last
205 sock.setsockopt(socket.IPPROTO_IPV6,
206 socket.IPV6_JOIN_GROUP,
209 self.log.debug("Joined IPv6 multicast on interface %d"%i)
211 # We return the interface indexes that worked
# Joining on no interface at all is treated as a hard failure.
216 if len(interfaces) == 0:
217 self.log.fatal("Could not join on any interface")
218 raise Exception("Could not join multicast on any interface")
# --- IPv4: fold the dotted quad into one 32-bit integer and join on
# INADDR_ANY.
222 if af == socket.AF_INET:
# NOTE(review): `bytes` shadows the builtin of the same name, and
# string.split() only exists in the Python 2 string module.
226 bytes = map(int, string.split(addr, "."))
228 grpaddr = (grpaddr << 8) | byte
230 # Construct struct mreq from grpaddr and ifaddr
231 ifaddr = socket.INADDR_ANY
# NOTE(review): 'll' uses native-size longs here too, so the packed length
# depends on the platform - confirm it matches what IP_ADD_MEMBERSHIP
# expects.
232 mreq = struct.pack('ll',
233 socket.htonl(grpaddr),
234 socket.htonl(ifaddr))
236 # Add group membership
# NOTE(review): this branch configures self.sock, whereas the IPv6 branch
# above uses the `sock` parameter - confirm that the difference is intended.
238 self.sock.setsockopt(socket.IPPROTO_IP,
239 socket.IP_ADD_MEMBERSHIP,
242 self.log.exception("Exception joining IPv4 multicast")
247 def data_came_in(self, addr, data):
# Entry point for a received datagram: defers the real work to the overlay
# bridge instead of processing the message in the calling thread.
249 Callback function for arriving data. This is non-blocking
250 and will return immediately after queuing the operation for
251 later processing. Called by NetworkThread
253 # Must queue this for actual processing, we're not allowed
# The lambda captures addr and data so the task can be run later without
# arguments.
255 process_data_func = lambda:self._data_came_in_callback(addr, data)
# The second argument (0) is presumably the delay before the task runs -
# confirm against overlay_bridge.add_task.
256 self.overlay_bridge.add_task(process_data_func, 0)
259 def _data_came_in_callback(self, addr, data):
# Parse one received message and dispatch on its type. Messages are
# newline-separated text and the first line names the message type.
261 Handler function for when data arrives
264 self.log.debug("Got a message from %s"%str(addr))
267 elements = data.split("\n")
# NODE_DISCOVER: another node is looking for peers; answer with a
# NODE_ANNOUNCE sent directly to the sender's address.
269 if elements[0] == "NODE_DISCOVER":
270 if len(elements) < 3:
271 raise Exception("Too few elements")
273 # Only reply if I'm announcing
274 if not self.config["multicast_announce"]:
275 self.log.debug("Not announcing myself")
# elements[2] is used verbatim; discoverNodes() sends that line as
# "permid:<base64>", so the value logged below includes the prefix.
278 remotePermID = elements[2]
279 self.log.debug("Got node discovery from %s"%remotePermID)
280 # TODO: Do we reply to any node?
282 # Reply with information about me
283 permid_64 = base64.b64encode(self.config['permid']).replace("\n","")
284 msg = "NODE_ANNOUNCE\n%s"%permid_64
# One "c:<capability>" line per advertised capability.
287 if self.capabilities:
288 for capability in self.capabilities:
289 msg += "\nc:%s"%capability
291 self.sock.sendto(msg, addr)
293 self.log.error("Could not send announce message to %s: %s"%(str(addr), e))
296 elif elements[0] == "ANNOUNCE":
297 self.handleAnnounce(addr, elements)
298 elif elements[0] == "NODE_ANNOUNCE":
299 # Some node announced itself - handle callbacks if
# NOTE(review): elements[1] is handed to the callback still base64
# encoded, whereas discoverNodes() and handleAnnounce() decode the permid
# first - confirm which form handlers expect.
301 if self.on_node_announce:
303 self.on_node_announce(elements[1], addr,
304 self._getCapabilities(elements))
306 self.log.exception("Exception handling node announce")
# PING: reply only when the ping is addressed to this node's permid; the
# PONG echoes the encoded permid and the nonce back over multicast.
307 elif elements[0] == "PING":
308 permid = base64.b64decode(elements[1])
309 if permid == self.config["permid"]:
311 msg = "PONG\n%s\n%s"%(elements[1], elements[2])
312 self._sendMulticast(msg)
# PONG: record the arrival time, but only for nonces this node sent.
# NOTE(review): in the lines shown, self.outstanding_pings is only created
# inside ping(); a PONG received before the first ping() would raise
# AttributeError unless it is initialised elsewhere - confirm. has_key()
# is Python 2 only.
313 elif elements[0] == "PONG":
314 nonce = int(elements[2])
315 if self.outstanding_pings.has_key(nonce):
316 self.incoming_pongs[nonce] = time.time()
318 self.log.warning("Got bad discovery message from %s"%str(addr))
# The complete raw payload is included in this log line.
320 self.log.exception("Illegal message '%s' from '%s'"%(data, addr[0]))
323 def _send(self, addr, msg):
# Resolve `addr` on the configured multicast port and send `msg` to it.
325 Send a message - internal function
328 for res in socket.getaddrinfo(addr, self.config['multicast_port'],
332 af, socktype, proto, canonname, sa = res
# A new socket is created for every resolved address.
# NOTE(review): discoverNodes() adds this method's return value to the
# list it passes to select(), so the socket is presumably returned to the
# caller; no close() is visible here and _sendMulticast() ignores the
# return value - check for leaked descriptors.
334 sock = socket.socket(af, socktype)
# Only the first 8 characters of the message are included in the warning.
337 self.log.warning("Error sending '%s...' to %s: %s"%(msg[:8], str(sa), e))
341 def discoverNodes(self, timeout=3.0, requiredCapabilities=None):
# Send a NODE_DISCOVER over multicast, then collect the NODE_ANNOUNCE
# replies that arrive within `timeout` seconds.
343 Try to find nodes on the local network and return them in a list
344 of touples on the form
345 (permid, addr, capabilities)
347 Capabilities can be an empty list
349 if requiredCapabilities is specified, only nodes matching one
350 or more of these will be returned
354 # Create NODE_DISCOVER message
# The permid is base64 encoded (newlines removed) so that it fits on one
# line of the text protocol.
355 msg = "NODE_DISCOVER\nTr_OVERLAYSWARM node\npermid:%s"%\
356 base64.b64encode(self.config['permid']).replace("\n","")
358 # First send the discovery message
# The values returned by _send() are kept because the replies are read
# from them below.
361 if self.config['multicast_ipv4_enabled']:
362 sockList.append(self._send(self.config['multicast_ipv4_address'], msg))
# For IPv6 one message is sent per joined interface, addressed as
# "<group>%<interface index>".
364 if self.config['multicast_ipv6_enabled']:
365 for iface in self.interfaces:
366 sockList.append(self._send("%s%%%s"%(self.config['multicast_ipv6_address'], iface), msg))
369 endAt = time.time() + timeout
370 while time.time() < endAt:
372 # Wait for answers (these are unicast)
373 SelectList = sockList[:]
# select() waits at most 1.0 s, so the deadline above is re-checked at
# least once a second.
375 (InList, OutList, ErrList) = select.select(SelectList, [], [], 1.0)
378 self.log.warning("Select gives error...")
# Drain every socket that select() reported as readable.
380 while len(InList) > 0:
382 sock2 = InList.pop(0)
# At most 1450 bytes are read per datagram.
385 (data, addr) = sock2.recvfrom(1450)
# NOTE(review): "except X, e" is Python 2 only syntax.
386 except socket.error, e:
387 self.log.warning("Exception receiving: %s"%e)
391 self.log.warning("Unknown exception receiving")
# A valid reply has at least the type line and the permid line.
395 elements = data.split("\n")
396 if len(elements) < 2:
397 self.log.warning("Bad message from %s: %s"%(addr, data))
400 if elements[0] != "NODE_ANNOUNCE":
401 self.log.warning("Unknown message from %s: %s"%(addr, data))
# The decoded (raw) permid is what ends up in the returned tuples.
404 permid = base64.b64decode(elements[1])
405 self.log.info("Discovered node %s at (%s)"%(permid, str(addr)))
406 capabilities = self._getCapabilities(elements)
# Per the docstring, a node is kept when it offers at least one of the
# required capabilities; with no requirement every node is kept.
407 if requiredCapabilities:
409 for rc in requiredCapabilities:
410 if rc in capabilities:
415 nodeList.append((permid, addr, capabilities))
417 self.log.warning("Could not understand message: %s"%e)
421 def sendNodeAnnounce(self):
# Broadcast this node's NODE_ANNOUNCE on the enabled multicast channels.
424 Send a node announcement message on multicast
# Same layout as the unicast reply built in _data_came_in_callback: the
# type line, the base64 permid, then one "c:<capability>" line each.
428 msg = "NODE_ANNOUNCE\n%s"%\
429 base64.b64encode(self.config['permid']).replace("\n","")
431 if self.capabilities:
432 for capability in self.capabilities:
433 msg += "\nc:%s"%capability
435 self._sendMulticast(msg)
# NOTE(review): the log line below carries no detail about what failed.
437 self.log.error("Could not send announce message")
440 def setNodeAnnounceHandler(self, handler):
# Install the callback used for NODE_ANNOUNCE messages.
443 Add a handler function for multicast node announce messages
445 Will get a parameters (permid, address, capabilities)
# Only one handler is kept: a later call replaces the previous handler.
# NOTE(review): _data_came_in_callback passes elements[1] (the permid as
# received, still base64 encoded) as the first argument - confirm.
448 self.on_node_announce = handler
450 def addAnnounceHandler(self, handler):
# Register a callback for ANNOUNCE messages; handleAnnounce() calls every
# registered handler in the order they were added.
453 Add an announcement handler for announcement messages (not
457 The callback function will get parameters:
458 (permid, remote_address, parameter_list)
# No duplicate check is visible, so a handler added twice would presumably
# be called twice.
461 self.announceHandlers.append(handler)
463 def removeAnnouncehandler(self, handler):
# Unregister a callback previously added with addAnnounceHandler().
# NOTE(review): the name is spelled with a lower-case "h" ("Announcehandler"),
# unlike addAnnounceHandler; callers have to use this exact spelling.
466 Remove an announce handler (if present)
# list.remove() drops only the first matching entry.
470 self.announceHandlers.remove(handler)
472 #handler not in list, ignore
475 def handleAnnounce(self, addr, elements):
# Validate an ANNOUNCE message, convert its parameters and pass them to
# every registered announce handler.
478 Process an announcement and call any callback handlers
482 if elements[0] != "ANNOUNCE":
483 raise Exception("Announce handler called on non-announce: %s"%\
486 # Announce should be in the form:
488 # base64 encoded permid
# The minimum is three lines: type, permid and parameter count.
493 if len(elements) < 3:
494 raise Exception("Bad announce, too few elements in message")
497 permid = base64.b64decode(elements[1])
# NOTE(review): no check for a negative count is visible here - confirm.
498 numElements = int(elements[2])
500 raise Exception("Bad announce message")
# The message must contain as many parameter lines as it declares.
502 if len(elements) < 3 + numElements:
503 raise Exception("Incomplete announce message")
505 _list = elements[3:3+numElements]
507 # Loop over list to longs if numbers
# NOTE(review): the local name `list` shadows the builtin, and long() only
# exists in Python 2.
511 list.append(long(elem))
515 if len(self.announceHandlers) == 0:
516 self.log.warning("Got node-announce, but I'm missing announce handlers")
# Handlers receive the decoded permid, the sender address and the
# converted parameter list.
519 for handler in self.announceHandlers:
521 self.log.debug("Calling callback handler")
522 handler(permid, addr, list)
524 self.log.exception("Could not activate announce handler callback '%s'"%handler)
527 def handleOVERLAYSWARMAnnounce(self, permid, addr, params):
528 """ Callback function to handle multicast node announcements
530 This one will trigger an overlay connection and then initiate a buddycast
533 # todo: when the port or selversion change this will NOT be
534 # updated in the database. Solution: change the whole
535 # flag_peer_as_local_to_db into check_and_update_peer_in_db
536 # and let it check for the existance and current value of
537 # is_local, port, and selversion. (at no additional queries I
540 self.log.debug("Got Tr_OVERLAYSWARM announce!")
# params must hold exactly two items; this matches the
# [self.myport, self.myselversion] list this node sends further down.
541 port, selversion = params
543 if permid == self.config["permid"]:
544 self.log.debug("Discovered myself")
545 # Discovered myself, which is not interesting
# Per its docstring, flag_peer_as_local_to_db() returns True only when the
# peer already exists in the database.
548 if self.flag_peer_as_local_to_db(permid, True):
549 self.log.debug("node flagged as local")
553 # We could not update - this is a new node!
# The peer is stored with the sender's IP from the datagram and the port
# taken from the announce parameters.
556 self.log.debug("Adding peer at %s to database"%addr[0])
557 self.add_peer_to_db(permid, (addr[0], port), selversion)
# NOTE(review): this failure and the one below are printed straight to
# stderr rather than going through self.log like the rest of the method.
559 print >> sys.stderr, "pdisc: Could not add node:",e
562 self.flag_peer_as_local_to_db(permid, True)
563 self.log.debug("node flagged as local")
565 print >> sys.stderr, "pdisc: Could not flag node as local:",e
567 # Now trigger a buddycast exchange
568 bc_core = BuddyCastFactory.getInstance().buddycast_core
570 self.log.debug("Triggering buddycast")
571 bc_core.startBuddyCast(permid)
573 # Also announce myself so that the remote node can see me!
# NOTE(review): self.myport is presumably assigned in the constructor; that
# assignment is not among the lines shown - confirm.
574 params = [self.myport, self.myselversion]
575 self.log.debug("Sending announce myself")
577 self.sendAnnounce(params)
579 self.log.exception("Sending announcement")
581 def sendAnnounce(self, list):
# Build an ANNOUNCE message for the given parameters and send it over
# multicast.
# NOTE(review): the parameter name `list` shadows the builtin; it is part
# of the signature, so renaming it could break keyword callers.
584 Send an announce on local multicast, if enabled
591 # Create ANNOUNCE message
# Header layout: type line, base64 permid (newlines removed), parameter
# count. handleAnnounce() expects that many parameter lines to follow;
# they are presumably appended before the send - confirm.
592 msg = "ANNOUNCE\n%s\n%d\n"%\
593 (base64.b64encode(self.config['permid']).replace("\n",""), len(list))
598 self._sendMulticast(msg)
600 def _sendMulticast(self, msg):
# Send `msg` to every enabled multicast group.
603 Send a message buffer on the multicast channels
# The value returned by _send() is ignored on both paths.
607 if self.config['multicast_ipv4_enabled']:
608 self._send(self.config['multicast_ipv4_address'], msg)
# IPv6: one send per interface stored in self.interfaces, addressed as
# "<group>%<interface index>".
609 if self.config['multicast_ipv6_enabled']:
610 for iface in self.interfaces:
611 self._send("%s%%%s"%(self.config['multicast_ipv6_address'], iface), msg)
615 def ping(self, permid, numPings=3):
# Measure round-trip times to the node `permid` using multicast PING/PONG.
617 Ping a node and return (avg time, min, max) or (None, None, None) if no answer
618 Only one node can be pinged at the time - else this function will not work!
# The bookkeeping lives on the instance and is reset on every call, which
# is why concurrent pings cannot work.
621 self.outstanding_pings = {}
622 self.incoming_pongs = {}
624 # Send a PING via multicast and wait for a multicast response.
625 # Using multicast for both just in case it is different from
# Each ping carries a random nonce; the send time is stored under that
# nonce so the matching PONG can be paired with it later.
628 for i in range(0, numPings):
629 nonce = random.randint(0, 2147483647)
630 msg = "PING\n%s\n%s"%(base64.b64encode(permid).replace("\n",""), nonce)
631 self.outstanding_pings[nonce] = time.time()
632 self._sendMulticast(msg)
635 # Now we gather the results
# incoming_pongs is filled by the PONG branch of _data_came_in_callback.
638 if len(self.incoming_pongs) == 0:
639 return (None, None, None)
# Round-trip time = PONG arrival time minus PING send time, per nonce.
# NOTE(review): has_key() is Python 2 only.
645 for nonce in self.outstanding_pings.keys():
646 if self.incoming_pongs.has_key(nonce):
647 diff = self.incoming_pongs[nonce] - self.outstanding_pings[nonce]
# Clear the bookkeeping again before returning.
657 self.outstanding_pings = {}
658 self.incoming_pongs = {}
# NOTE(review): `min` and `max` are local names here and shadow the
# builtins inside this method.
659 return (avg, min, max)
661 def add_peer_to_db(self,permid,dns,selversion):
# Insert a newly discovered peer into the peer database.
# `dns` is an (ip, port) pair: handleOVERLAYSWARMAnnounce passes
# (addr[0], port), and it is indexed as dns[0] / dns[1] below.
662 # todo: should is_local be set to True?
# last_seen and last_connected are both set to the current time, in whole
# seconds.
663 now = int(time.time())
664 peer_data = {'permid':permid, 'ip':dns[0], 'port':dns[1], 'oversion':selversion, 'last_seen':now, 'last_connected':now}
# NOTE(review): self.peer_db is presumably set from the constructor's
# `peerdb` argument; that assignment is not among the lines shown - confirm.
665 self.peer_db.addPeer(permid, peer_data, update_dns=True, update_connected=True, commit=True)
667 def flag_peer_as_local_to_db(self, permid, is_local):
# Update the is_local flag of an existing peer record.
669 Sets the is_local flag for PERMID to IS_LOCAL if and only if
670 PERMID exists in the database, in this case it returns
671 True. Otherwise it returns False.
# Only the 'is_local' column is requested from the database.
673 peer = self.peer_db.getPeer(permid, ('is_local',))
# NOTE(review): this writes to stderr directly; whether a debug switch
# guards it cannot be seen in the lines shown - confirm.
675 print >>sys.stderr,"pdisc: flag_peer_as_local returns",peer
678 # Arno, 2010-02-09: Somehow return value is not std.
# getPeer() may hand back a list, which is treated as a special case.
679 if isinstance(peer,list):
# The database is only written when the stored flag differs from the
# requested value.
683 if not flag == is_local:
684 self.peer_db.setPeerLocalFlag(permid, is_local)
690 ##print >>sys.stderr,"pdisc: Flagging a peer as local"
691 # return self.peer_db.setPeerLocalFlag(permid, is_local)