restructured Commander
authorMarius Sandu-Popa <sandupopamarius@gmail.com>
Fri, 16 Apr 2010 10:55:58 +0000 (13:55 +0300)
committerMarius Sandu-Popa <sandupopamarius@gmail.com>
Fri, 16 Apr 2010 10:56:15 +0000 (13:56 +0300)
autorun/commander/Commander.py
autorun/commander/CommanderBase.py [new file with mode: 0644]

index 5b47e87..c99b299 100644 (file)
@@ -1,12 +1,9 @@
 import sys, socket
-import pickle
-import paramiko
 import os
 from Util import *
 from XMLParser import *
-from TrafficControl import *
 from threading import Thread
-paramiko.util.log_to_file('/tmp/paramiko.log')
+from CommanderBase import *
 
 MSGLEN = 4096
 DEBUG = True
@@ -16,96 +13,42 @@ class Commander(Thread):
                Thread.__init__(self)
                self.nodes = Nodes(nodes_xml);
                self.swarm = Swarm(swarm_xml);
-               self.tc = TrafficControl("openvz");
-               self.sshc = paramiko.SSHClient()
-               self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-               #self.sshc.load_system_host_keys()
-               self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-               
-       def sendSSHComm(self, hostname, username, port, comm):
-               try:
-                       self.sshc.connect(hostname=hostname,  
-                                       username=username, 
-                                       port=port)
-                       stdin, stdout, stderr = self.sshc.exec_command(comm)
-                       print stdout.readlines()
-               except Exception, e:
-                       print e
-               finally:
-                       self.sshc.close()
-       
-       def sendMultipleSSHComm(self, hostname, username, port, comms):
-               try:
-                       self.sshc.connect(hostname=hostname,  
-                                       username=username, 
-                                       port=port)
-                       for c in comms:
-                               stdin, stdout, stderr = self.sshc.exec_command(comm)
-               except Exception, e:
-                       print e
-               finally: 
-                       self.sshc.close()
-       
-       def sendComm(self, hostname, port, msg_type, config_data):
-               try:
-                       self.sock.connect((hostname, port))
-                       self.sendMsg(pickle.dumps((msg_type, config_data)))
-                       response = self.recvMsg()
-                       return response
-               except Exception, e:
-                       print "Ups: Could not complete request"
-                       print e
-               finally:
-                       self.sock.close()
-
-       def sendMsg(self, msg):
-               totalsent = 0
-               while totalsent < len(msg):
-                       sent = self.sock.send(msg[totalsent:])
-                       if sent == 0:
-                               raise RuntimeError, "socket connection broken"
-                       totalsent = totalsent + sent
+               self.Commander = CommanderBase();       
        
-       def recvMsg(self):
-               msg = ''
-               chunk = self.sock.recv(MSGLEN)
-               if chunk == '':
-                       raise RuntimeError, "socket connection broken"
-               msg = msg + chunk
-               
-               return msg
-                       
-       def startDaemon(self, node):
-               print node.daemon_dir
-               comm = SERVER_TYPE+" "+SERVER_DIR+ " "+SERVER_FILE+" "+"start"   
-                       #+ node.public_address + " " + node.listen_port
-               print comm
-               #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, comm)
-               #self.sendSSHComm(node.public_address, node.username, int(node.ssh_port), comm)
-               #self.sendSSHComm(node.public_address, node.username, int(node.ssh_port), "ps -ef | grep Server")
+       def startNodeServer(self, node):
+               print node.ssh_port, node.username, node.daemon_dir, node.public_address
+               self.Commander.startServer(node.public_address, 
+                                               int(node.ssh_port), 
+                                               node.private_address, 
+                                               node.username);
 
        def getNodeStatus(self, node):  
                msg_data = self.sendComm(node.public_address, 
                                        int(node.listen_port), 
                                        GET_CLIENTS, 
                                        "")
-               print msg_data          
+               # lista (id, type, metafile)
+               print msg_data                  
+
+       def getNodeOutput(self, node):
+               msg_data = self.sendComm(node.public_address, 
+                                       int(node.listen_port), 
+                                       GET_OUTPUT, 
+                                       "")
+               # lista (
+               print msg_data
                
-       def applyTC(self, node):
+       def applyNodeTC(self, node):
                si = self.swarm.getSIByNode(node)
-               self.tc.config(node.public_address, 
-                               node.public_port, 
-                               node.public_iface,
-                               node.private_address, 
-                               node.private_port, 
-                               node.private_iface)
-               self.tc.set_upload_limit(si.upload_limit)
-               self.tc.set_download_limit(si.download_limit)
-               
-               commands = self.tc.get_all_commands()
-               #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
-               #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)
-       
+               self.Commander.applyTC(node.public_address, 
+                                       node.public_port, 
+                                       node.public_iface,
+                                       node.private_address, 
+                                       node.private_port, 
+                                       node.private_iface,
+                                       si.upload_limit,
+                                       si.download_limit)
+
        def startSI(self, si):
                node = self.nodes.getNodeBySi(si)
                base_path = node.getClientBaseDir(si.btclient)
@@ -121,39 +64,20 @@ class Commander(Thread):
                                OUT_FILE : si.output_file,
                                LOG_FILE : si.log_file,
                                TORRENT : torrent_file }
-               
-               #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data)
-               msg_type, msg_data = self.sendComm(node.public_address, 
-                                       int(node.listen_port), 
-                                       START_MSG, 
-                                       config_data)
-               if(msg_type == ACK_MSG):
-                       si.pid = msg_data
        
-               else:
-                       print msg_data          
-       
-       def printDummyCommand(self, public_address, public_port, option1, data):
-               print "----------------------"
-               print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
-               print "this data: %s" %(data)
-               print "----------------------\n"
-               
+               ret = self.Commander.startClient(node.public_address, 
+                                               int(node.listen_port), 
+                                               config_data)    
+               if ret:
+                       si.pid = msg_data
        
        def stopSI(self, si):   
                #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data)
                node = self.nodes.getNodeBySi(si)
-               msg_type, msg_data = self.sendComm(node.public_address, 
-                                       int(node.listen_port), 
-                                       STOP_MSG,
-                                       si.pid) 
+               self.Commander.stop(node.public_address, 
+                               int(node.listen_port), 
+                               si.pid) 
                        
-               if(msg_type == ACK_MSG):
-                       "shutdown client OK"
-               else:
-                       "error while shutting down client"
-                       print msg_data          
-
        def cleanNode(self, node):
                pass    
 
@@ -163,10 +87,10 @@ class Commander(Thread):
        def startAll(self):
                for node in self.nodes.getNodes():
                        #self.applyTC(node)
-                       self.startDaemon(node)
+                       self.startNodeServer(node)
                
                for si in self.swarm.getSIs():
-                       self.sendSIStart(si)
+                       self.startSI(si)
                        
        def run(self):
                self.printUsage()
diff --git a/autorun/commander/CommanderBase.py b/autorun/commander/CommanderBase.py
new file mode 100644 (file)
index 0000000..f0d503f
--- /dev/null
@@ -0,0 +1,147 @@
+import sys, socket
+import pickle
+import paramiko
+import os
+from Util import *
+from XMLParser import *
+from TrafficControl import *
+from threading import Thread
+paramiko.util.log_to_file('/tmp/paramiko.log')
+
+MSGLEN = 4096
+DEBUG = True
+
+class CommanderBase():
+       def __init__(self):
+               self.tc = TrafficControl("openvz");
+               self.sshc = paramiko.SSHClient()
+               self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+               #self.sshc.load_system_host_keys()
+               self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+               
+       def sendSSHComm(self, hostname, username, port, comm):
+               try:
+                       self.sshc.connect(hostname=hostname,  
+                                       username=username, 
+                                       port=port)
+                       stdin, stdout, stderr = self.sshc.exec_command(comm)
+                       print stdout.readlines()
+               except Exception, e:
+                       print e
+               finally:
+                       self.sshc.close()
+       
+       def sendMultipleSSHComm(self, hostname, username, port, comms):
+               try:
+                       self.sshc.connect(hostname=hostname,  
+                                       username=username, 
+                                       port=port)
+                       for c in comms:
+                               stdin, stdout, stderr = self.sshc.exec_command(comm)
+               except Exception, e:
+                       print e
+               finally: 
+                       self.sshc.close()
+       
+       def sendComm(self, hostname, port, msg_type, config_data):
+               try:
+                       self.sock.connect((hostname, port))
+                       self.sendMsg(pickle.dumps((msg_type, config_data)))
+                       response = self.recvMsg()
+                       return response
+               except Exception, e:
+                       print "Ups: Could not complete request"
+                       print e
+               finally:
+                       self.sock.close()
+
+       def sendMsg(self, msg):
+               totalsent = 0
+               while totalsent < len(msg):
+                       sent = self.sock.send(msg[totalsent:])
+                       if sent == 0:
+                               raise RuntimeError, "socket connection broken"
+                       totalsent = totalsent + sent
+       
+       def recvMsg(self):
+               msg = ''
+               chunk = self.sock.recv(MSGLEN)
+               if chunk == '':
+                       raise RuntimeError, "socket connection broken"
+               msg = msg + chunk
+               
+               return msg
+                       
+       def startServer(self, public_address, ssh_port, private_address, username):
+               comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + private_address
+               print comm
+               self.printDummyCommand(public_address, ssh_port, username, comm)
+               self.sendSSHComm(public_address, username, ssh_port, comm)
+
+       def getClients(self, public_address, listen_port):      
+               msg_data = self.sendComm(public_address, 
+                                       listen_port, 
+                                       GET_CLIENTS, 
+                                       "")
+               # lista (id, type, metafile)
+               print msg_data                  
+
+       def getOutput(self, public_address, listen_port):
+               msg_data = self.sendComm(public_address, 
+                                       listen_port, 
+                                       GET_OUTPUT, 
+                                       "")
+               # lista (...
+               print msg_data
+               
+       def applyTC(self, public_address, public_port, public_iface, private_address, 
+                               private_port, private_iface, upload_limit, download_limit):
+               si = self.swarm.getSIByNode(node)
+               self.tc.config(public_address, 
+                               public_port, 
+                               public_iface,
+                               private_address, 
+                               private_port, 
+                               private_iface)
+               self.tc.set_upload_limit(upload_limit)
+               self.tc.set_download_limit(download_limit)
+               
+               commands = self.tc.get_all_commands()
+               #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
+               #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)
+
+
+       def startClient(self, public_address, listen_port, config_data):        
+               msg_type, msg_data = self.sendComm(public_address, 
+                                                       listen_port, 
+                                                       START_MSG, 
+                                                       config_data)
+               print listen_port
+               if msg_type == ACK_MSG:
+                       return msg_data
+               else:
+                       print msg_data
+                       return -1
+       
+       def printDummyCommand(self, public_address, public_port, option1, data):
+               print "----------------------"
+               print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
+               print "this data: %s" %(data)
+               print "----------------------\n"
+               
+       
+       def stopClient(self, public_adress, listen_port, client_pid):   
+               msg_type, msg_data = self.sendComm(public_address, 
+                                               listen_port, 
+                                               STOP_MSG,
+                                               client_pid) 
+                       
+               if msg_type == ACK_MSG:
+                       "shutdown client OK"
+               else:
+                       "error while shutting down client"
+                       print msg_data          
+
+       def clean(self):
+               pass    
+