autorun: changed Commander; updated xml config files
authorMarius Sandu-Popa <sandupopamarius@gmail.com>
Thu, 22 Apr 2010 15:27:42 +0000 (18:27 +0300)
committerMarius Sandu-Popa <sandupopamarius@gmail.com>
Thu, 22 Apr 2010 15:27:54 +0000 (18:27 +0300)
autorun/commander/Commander.py
autorun/commander/CommanderBase.py
autorun/commander/XMLParser.py
autorun/xml/nodes.xml
autorun/xml/swarm.xml

index 5d2b955..0464143 100644 (file)
@@ -5,93 +5,145 @@ from XMLParser import *
 from threading import Thread
 from CommanderBase import *
 
-MSGLEN = 4096
-DEBUG = True
-
 class Commander(Thread):
        def __init__(self, nodes_xml, swarm_xml):
                Thread.__init__(self)
                self.nodes = Nodes(nodes_xml);
                self.swarm = Swarm(swarm_xml);
+               self.tc = TrafficControl("openvz");
+               self.sshc = paramiko.SSHClient()
+               self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+               #self.sshc.load_system_host_keys()      
                self.Commander = CommanderBase();       
+               self.commands = {'list': [self.list, "lists the nodes in xml file"]}
+               
+#      def list(self):
+#              print "shit"
        
-       def startNodeServer(self, node):
-               print node.ssh_port, node.username, node.daemon_dir, node.public_address
-               self.Commander.startServer(node.public_address, 
-                                               int(node.ssh_port), 
-                                               node.private_address, 
-                                               node.username);
+#      def listCommands(self):
+#              print "List of commands:"
+#              for k, v in self.commands.items():
+#                      print "\t",k,"\t",v[1]
 
-       def getNodeStatus(self, node):  
-               self.Commander.getClients(node.public_address, 
-                                       int(node.listen_port)) 
-
-       def getNodeOutput(self, node):
-               self.Commander.getOutput(node.public_address, 
-                                       int(node.listen_port)) 
-               
+       def sendSSHComm(self, hostname, username, port, comm):
+               try:
+                       self.sshc.connect(hostname=hostname,  
+                                                       username=username, 
+                                                       port=port)
+                       stdin, stdout, stderr = self.sshc.exec_command(comm)
+                       print stdout.readlines()
+               except Exception, e:
+                       print e
+               finally:
+                       self.sshc.close()
+       
+       def sendMultipleSSHComm(self, hostname, username, port, comms):
+               try:
+                       self.sshc.connect(hostname=hostname,  
+                                                       username=username, 
+                                                       port=port)
+                       for c in comms:
+                               stdin, stdout, stderr = self.sshc.exec_command(comm)
+               except Exception, e:
+                       print e
+               finally: 
+                       self.sshc.close()
+       
        def applyNodeTC(self, node):
                si = self.swarm.getSIByNode(node)
-               self.Commander.applyTC(node.public_address, 
+               self.tc.config(node.public_address, 
                                        node.public_port, 
                                        node.public_iface,
                                        node.private_address, 
                                        node.private_port, 
-                                       node.private_iface,
-                                       si.upload_limit,
-                                       si.download_limit)
+                                       node.private_iface)
+               self.tc.set_upload_limit(si.upload_limit)
+               self.tc.set_download_limit(si.download_limit)
+               
+               commands = self.tc.get_all_commands()
+               #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
+               #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)       
+       
+       def startNodeServer(self, node):
+               comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + node. private_address
+               print node.ssh_port, node.username, node.daemon_dir, node.public_address
+               self.sendSSHComm(node.public_address, 
+                                               node.username, 
+                                               int(node.ssh_port), 
+                                               comm)
 
        def startSI(self, si):
                node = self.nodes.getNodeBySi(si)
+               if not node:
+                       print "[ERROR] Swarm Instance unknown! Check XML config files!"
+               
                base_path = node.getClientBaseDir(si.btclient)
                torrent_file = self.swarm.getTorrentFile()
                
                config_data = { CLIENT : si.btclient, 
-                               BASE_DIR : base_path, 
-                               UP_LIMIT : si.download_limit,
-                               DL_LIMIT : si.upload_limit,
-                               PORT : node.public_port,
-                               DL_DIR : si.download_dir, 
-                               LOG_DIR : si.log_dir, 
-                               OUT_FILE : si.output_file,
-                               LOG_FILE : si.log_file,
-                               TORRENT : torrent_file }
+                                       BASE_DIR : base_path, 
+                                       UP_LIMIT : si.download_limit,
+                                       DL_LIMIT : si.upload_limit,
+                                       PORT : node.public_port,
+                                       DL_DIR : si.download_dir, 
+                                       LOG_DIR : si.log_dir, 
+                                       OUT_FILE : si.output_file,
+                                       OUT_DIR : si.output_dir,
+                                       LOG_FILE : si.log_file,
+                                       TORRENT : torrent_file }
        
-               ret = self.Commander.startClient(node.public_address, 
-                                               int(node.listen_port), 
-                                               config_data)    
+               ret = self.Commander.start(node.public_address, 
+                                                       int(node.listen_port), 
+                                                       config_data)    
                print ">>>>>>>>", ret
                if ret:
                        si.pid = ret
+                       si.running = True
+               
        
        def stopSI(self, si):   
                #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data)
                node = self.nodes.getNodeBySi(si)
-               self.Commander.stop(node.public_address, 
-                               int(node.listen_port), 
-                               si.pid) 
-                       
+               ret = self.Commander.stop(node.public_address, 
+                                                       int(node.listen_port), 
+                                                       si.pid)
+
+       def getNodeStatus(self, node):  
+               self.Commander.getClients(node.public_address, 
+                                       int(node.listen_port)) 
+
+       def getNodeOutput(self, node):
+               self.Commander.getOutput(node.public_address, 
+                                       int(node.listen_port)) 
+               
+       def nodeArchive(self, node):
+               self.Commander.archive(node.public_address, 
+                                       int(node.listen_port))
+
        def cleanNode(self, node):
                pass    
 
        def getSIStatus(self, si):
                pass    
        
-       def startAll(self):
+       def startAllServers(self):
                for node in self.nodes.getNodes():
-                       #self.applyTC(node)
                        self.startNodeServer(node)
-                       #self.getNodeStatus(node)                       
 
+       def startAll(self):
                for si in self.swarm.getSIs():
                        self.startSI(si)
 
        def statusAll(self):
-               for node in self.node.getNodes():
+               for node in self.nodes.getNodes():
                        self.getNodeStatus(node)
+
+       def archiveAll(self):
+               for node in self.nodes.getNodes():
+                       self.nodeArchive(node);
        
        def outputAll(self):
-               for node in self.node.getNodes():
+               for node in self.nodes.getNodes():
                        self.getNodeOutput(node)
 
        def stopAll(self):
@@ -100,18 +152,32 @@ class Commander(Thread):
                        
        def run(self):
                self.printUsage()
+               #self.listCommands()
+               #(self.commands['list'][0])()
                while True:
-                       k = raw_input("Choose option:")
+                       k = raw_input("Choose option: ")
                        if k == '': break
-                       elif k == '1':
+                       if k == '1': 
+                               self.startAllServers()
+                       if k == '2': 
                                self.startAll()
-                       elif k == '2':
+                       if k == '3':
+                               self.statusAll()
+                       if k == '4':
+                               self.outputAll()
+                       if k == '5':
+                               self.archiveAll()
+                       if k == '6':
                                self.stopAll()
        
        def printUsage(self):
                print "=========Commander Options=========="
-               print "1. Start Swarm"
-               print "2. Stop Swarm"
+               print "1. Start Servers"
+               print "2. Start Swarm"
+               print "3. Status Swarm"
+               print "4. Output Swarm"
+               print "5. Archive Swarm"
+               print "6. Stop Swarm"
                print "===================================="
 
 if __name__ == "__main__":
index 85ef7d5..afcd96a 100644 (file)
@@ -8,44 +8,11 @@ from TrafficControl import *
 from threading import Thread
 paramiko.util.log_to_file('/tmp/paramiko.log')
 
-MSGLEN = 4096
-DEBUG = True
-
 class CommanderBase():
-       def __init__(self):
-               self.tc = TrafficControl("openvz");
-               self.sshc = paramiko.SSHClient()
-               self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-               #self.sshc.load_system_host_keys()
-               self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-               
-       def sendSSHComm(self, hostname, username, port, comm):
-               try:
-                       self.sshc.connect(hostname=hostname,  
-                                       username=username, 
-                                       port=port)
-                       stdin, stdout, stderr = self.sshc.exec_command(comm)
-                       print stdout.readlines()
-               except Exception, e:
-                       print e
-               finally:
-                       self.sshc.close()
-       
-       def sendMultipleSSHComm(self, hostname, username, port, comms):
-               try:
-                       self.sshc.connect(hostname=hostname,  
-                                       username=username, 
-                                       port=port)
-                       for c in comms:
-                               stdin, stdout, stderr = self.sshc.exec_command(comm)
-               except Exception, e:
-                       print e
-               finally: 
-                       self.sshc.close()
        
        def sendComm(self, hostname, port, msg_type, config_data):
-               #print "sendcomm ", config_data
                try:
+                       self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        self.sock.connect((hostname, port))
                        self.sendMsg(pickle.dumps((msg_type, config_data)))
                        response = self.recvMsg()
@@ -68,93 +35,84 @@ class CommanderBase():
                msg = ''
                while 1:
                        chunk = self.sock.recv(BUFFER_SIZE)
-                       if not chunk:
-                               break
-                       #raise RuntimeError, "socket connection broken"
-                       msg = msg + chunk
-                       if len(chunk) < BUFFER_SIZE:
-                               break
+                       if not chunk:
+                               break
+                       msg = msg + chunk
+                       if len(chunk) < BUFFER_SIZE:
+                               break
 
                dd = pickle.loads(msg)
                return dd
-               
-       #       chunk = self.sock.recv(MSGLEN)
-       #       if chunk == '':
-       #               raise RuntimeError, "socket connection broken"
-       #       msg = msg + chunk
-       #       
-       #       return msg
-                       
-       def startServer(self, public_address, ssh_port, private_address, username):
-               comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + private_address
-               print comm
-               self.printDummyCommand(public_address, ssh_port, username, comm)
-               self.sendSSHComm(public_address, username, ssh_port, comm)
 
        def getClients(self, public_address, listen_port):      
-               msg_data = self.sendComm(public_address, 
-                                       listen_port, 
-                                       GET_CLIENTS, 
-                                       "")
-               # lista (id, type, metafile)
-               print msg_data                  
+               msg_type, msg_data = self.sendComm(public_address, 
+                                                                               listen_port, 
+                                                                               GET_CLIENTS, 
+                                                                               "")
+               if msg_type == ACK_MSG:
+                       print "[GET_CLIENTS] Client list: ", msg_data 
+                       return msg_data
+               else:
+                       print msg_data
+                       return None
 
        def getOutput(self, public_address, listen_port):
-               msg_data = self.sendComm(public_address, 
-                                       listen_port, 
-                                       GET_OUTPUT, 
-                                       "")
-               # lista (...
-               print msg_data
-               
-       def applyTC(self, public_address, public_port, public_iface, private_address, 
-                               private_port, private_iface, upload_limit, download_limit):
-               si = self.swarm.getSIByNode(node)
-               self.tc.config(public_address, 
-                               public_port, 
-                               public_iface,
-                               private_address, 
-                               private_port, 
-                               private_iface)
-               self.tc.set_upload_limit(upload_limit)
-               self.tc.set_download_limit(download_limit)
-               
-               commands = self.tc.get_all_commands()
-               #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
-               #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)
-
-
-       def startClient(self, public_address, listen_port, config_data):        
                msg_type, msg_data = self.sendComm(public_address, 
-                                                       listen_port, 
-                                                       START_MSG, 
-                                                       config_data)
-               #print listen_port
+                                                                               listen_port, 
+                                                                               GET_OUTPUT, 
+                                                                               "")
                if msg_type == ACK_MSG:
+                       print "[GET_OUTPUT] Client output: ", msg_data 
                        return msg_data
                else:
                        print msg_data
-                       return -1
+                       return None
        
-       def printDummyCommand(self, public_address, public_port, option1, data):
-               print "----------------------"
-               print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
-               print "this data: %s" %(data)
-               print "----------------------\n"
-               
+       def archive(self, public_address, listen_port):
+               msg_type, msg_data = self.sendComm(public_address, 
+                                                                               listen_port, 
+                                                                               ARCHIVE, 
+                                                                               "")
+               if msg_type == ACK_MSG:
+                       print "[ARCHIVE] List of files: "
+                       print  msg_data 
+                       return msg_data
+               else:
+                       print msg_data
+                       return None
+
+       def start(self, public_address, listen_port, config_data):      
+               msg_type, msg_data = self.sendComm(public_address, 
+                                                                               listen_port, 
+                                                                               START_MSG, 
+                                                                               config_data)
+               if msg_type == ACK_MSG:
+                       print "[START] started client with pid: ", msg_data 
+                       return msg_data
+               else:
+                       print msg_data
+                       return False
        
-       def stopClient(self, public_adress, listen_port, client_pid):   
+       def stop(self, public_address, listen_port, client_pid):
                msg_type, msg_data = self.sendComm(public_address, 
-                                               listen_port, 
-                                               STOP_MSG,
-                                               client_pid) 
+                                                                               listen_port, 
+                                                                               STOP_MSG,
+                                                                               client_pid) 
                        
                if msg_type == ACK_MSG:
-                       "shutdown client OK"
+                       print "[STOP] client shutdown OK"
+                       return True
                else:
-                       "error while shutting down client"
-                       print msg_data          
-
+                       print "error while shutting down client"
+                       print msg_data
+                       return None
+       
        def clean(self):
-               pass    
-
+               pass
+       
+       def printDummyCommand(self, public_address, public_port, option1, data):
+               print "----------------------"
+               print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
+               print "this data: %s" %(data)
+               print "----------------------\n"
+       
\ No newline at end of file
index 82c1fe9..532486b 100644 (file)
@@ -4,6 +4,7 @@ class SwarmInstance:
        def __init__(self, id, list):
                self.id = id;
                self.pid = -1;
+               self.running = False;
                self.node_id = list[0];
                self.btclient = list[1];
                self.upload_limit = list[2];
@@ -37,8 +38,7 @@ class Node:
                self.daemon_dir = list[9];
                self.daemon_file = list[10];
                self.clients_base_dir = list[11];
-#              self.clients = {};
-       
+               
        def __str__(self):
                return '[%s: %s: %s: %s: %s: %s %s: %s]' \
                        %(self.id, self.public_address, self.public_port, self.private_address, \
@@ -73,7 +73,6 @@ class Nodes:
                                self.nodes.append(Node(id, list))
                except IOError, e:
                        print e
-                       return None
        
        def getNode(self, id):
                for node in self.nodes:
@@ -102,7 +101,6 @@ class Swarm:
                                self.swarm.append(SwarmInstance(id, list))
                except IOError, e:
                        print e
-                       return None
                        
        def  getSIByNode(self, node):
                for si in self.swarm:
index b5e973a..0e549e9 100644 (file)
        <daemon_file>Server.py</daemon_file>
        <clients>
            <client id="tribler">
-               <base>/home/p2p/p2p-clients/tribler/</base>
+               <base>/home/p2p/p2p-clients/tribler</base>
            </client>
            <client id="libtorrent">
-               <base>/home/p2p/p2p-clients/libtorrent/</base>
+               <base>/home/p2p/p2p-clients/libtorrent</base>
            </client>
            <client id="transmission">
-               <base>/home/p2p/p2p-clients/transmission/</base>
+               <base>/usr/bin</base>
            </client>
         </clients>
     </node>
index 4899737..c9669c5 100644 (file)
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="ISO-8859-1"?>
 <swarm>
     <torrent_file>/home/p2p/p2p-meta/bbt316.torrent</torrent_file>
-    <instance id="1">
+     <instance id="1">
        <node>1</node>
        <client>tribler</client>
        <upload_limit>512</upload_limit>
@@ -19,5 +19,5 @@
            <action type="start" delay="00:25:00" />
            <action type="start" delay="end" />
        </actions>
-    </instance>
+    </instance>  
 </swarm>