From 53fe9e3aec5b336629c126b477301c5a2944d204 Mon Sep 17 00:00:00 2001 From: Marius Sandu-Popa Date: Fri, 16 Apr 2010 13:55:58 +0300 Subject: [PATCH] restructured Commander --- autorun/commander/Commander.py | 152 ++++++++--------------------- autorun/commander/CommanderBase.py | 147 ++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 114 deletions(-) create mode 100644 autorun/commander/CommanderBase.py diff --git a/autorun/commander/Commander.py b/autorun/commander/Commander.py index 5b47e87..c99b299 100644 --- a/autorun/commander/Commander.py +++ b/autorun/commander/Commander.py @@ -1,12 +1,9 @@ import sys, socket -import pickle -import paramiko import os from Util import * from XMLParser import * -from TrafficControl import * from threading import Thread -paramiko.util.log_to_file('/tmp/paramiko.log') +from CommanderBase import * MSGLEN = 4096 DEBUG = True @@ -16,96 +13,42 @@ class Commander(Thread): Thread.__init__(self) self.nodes = Nodes(nodes_xml); self.swarm = Swarm(swarm_xml); - self.tc = TrafficControl("openvz"); - self.sshc = paramiko.SSHClient() - self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - #self.sshc.load_system_host_keys() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - def sendSSHComm(self, hostname, username, port, comm): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - stdin, stdout, stderr = self.sshc.exec_command(comm) - print stdout.readlines() - except Exception, e: - print e - finally: - self.sshc.close() - - def sendMultipleSSHComm(self, hostname, username, port, comms): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - for c in comms: - stdin, stdout, stderr = self.sshc.exec_command(comm) - except Exception, e: - print e - finally: - self.sshc.close() - - def sendComm(self, hostname, port, msg_type, config_data): - try: - self.sock.connect((hostname, port)) - self.sendMsg(pickle.dumps((msg_type, config_data))) - response = self.recvMsg() - return response - except Exception, e: - print "Ups: Could not complete request" - print e - finally: - self.sock.close() - - def sendMsg(self, msg): - totalsent = 0 - while totalsent < len(msg): - sent = self.sock.send(msg[totalsent:]) - if sent == 0: - raise RuntimeError, "socket connection broken" - totalsent = totalsent + sent + self.Commander = CommanderBase(); - def recvMsg(self): - msg = '' - chunk = self.sock.recv(MSGLEN) - if chunk == '': - raise RuntimeError, "socket connection broken" - msg = msg + chunk - - return msg - - def startDaemon(self, node): - print node.daemon_dir - comm = SERVER_TYPE+" "+SERVER_DIR+ " "+SERVER_FILE+" "+"start" - #+ node.public_address + " " + node.listen_port - print comm - #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, comm) - #self.sendSSHComm(node.public_address, node.username, int(node.ssh_port), comm) - #self.sendSSHComm(node.public_address, node.username, int(node.ssh_port), "ps -ef | grep Server") + def startNodeServer(self, node): + print node.ssh_port, node.username, node.daemon_dir, node.public_address + self.Commander.startServer(node.public_address, + int(node.ssh_port), + node.private_address, + node.username); def getNodeStatus(self, node): msg_data = self.sendComm(node.public_address, int(node.listen_port), GET_CLIENTS, "") - print msg_data + # lista (id, type, metafile) + print msg_data + + def getNodeOutput(self, node): + msg_data = self.sendComm(node.public_address, + int(node.listen_port), + GET_OUTPUT, + "") + # lista ( + print msg_data - def applyTC(self, node): + def applyNodeTC(self, node): si = self.swarm.getSIByNode(node) - self.tc.config(node.public_address, - node.public_port, - node.public_iface, - node.private_address, - node.private_port, - node.private_iface) - self.tc.set_upload_limit(si.upload_limit) - self.tc.set_download_limit(si.download_limit) - - commands = self.tc.get_all_commands() - #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) - #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) - + self.Commander.applyTC(node.public_address, + node.public_port, + node.public_iface, + node.private_address, + node.private_port, + node.private_iface, + si.upload_limit, + si.download_limit) + def startSI(self, si): node = self.nodes.getNodeBySi(si) base_path = node.getClientBaseDir(si.btclient) @@ -121,39 +64,20 @@ class Commander(Thread): OUT_FILE : si.output_file, LOG_FILE : si.log_file, TORRENT : torrent_file } - - #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data) - msg_type, msg_data = self.sendComm(node.public_address, - int(node.listen_port), - START_MSG, - config_data) - if(msg_type == ACK_MSG): - si.pid = msg_data - else: - print msg_data - - def printDummyCommand(self, public_address, public_port, option1, data): - print "----------------------" - print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) - print "this data: %s" %(data) - print "----------------------\n" - + ret = self.Commander.startClient(node.public_address, + int(node.listen_port), + config_data) + if ret: + si.pid = msg_data def stopSI(self, si): #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data) node = self.nodes.getNodeBySi(si) - msg_type, msg_data = self.sendComm(node.public_address, - int(node.listen_port), - STOP_MSG, - si.pid) + self.Commander.stop(node.public_address, + int(node.listen_port), + si.pid) - if(msg_type == ACK_MSG): - "shutdown client OK" - else: - "error while shutting down client" - print msg_data - def cleanNode(self, node): pass @@ -163,10 +87,10 @@ class Commander(Thread): def startAll(self): for node in self.nodes.getNodes(): #self.applyTC(node) - self.startDaemon(node) + self.startNodeServer(node) for si in self.swarm.getSIs(): - self.sendSIStart(si) + self.startSI(si) def run(self): self.printUsage() diff --git a/autorun/commander/CommanderBase.py b/autorun/commander/CommanderBase.py new file mode 100644 index 0000000..f0d503f --- /dev/null +++ b/autorun/commander/CommanderBase.py @@ -0,0 +1,147 @@ +import sys, socket +import pickle +import paramiko +import os +from Util import * +from XMLParser import * +from TrafficControl import * +from threading import Thread +paramiko.util.log_to_file('/tmp/paramiko.log') + +MSGLEN = 4096 +DEBUG = True + +class CommanderBase(): + def __init__(self): + self.tc = TrafficControl("openvz"); + self.sshc = paramiko.SSHClient() + self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + #self.sshc.load_system_host_keys() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def sendSSHComm(self, hostname, username, port, comm): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + stdin, stdout, stderr = self.sshc.exec_command(comm) + print stdout.readlines() + except Exception, e: + print e + finally: + self.sshc.close() + + def sendMultipleSSHComm(self, hostname, username, port, comms): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + for c in comms: + stdin, stdout, stderr = self.sshc.exec_command(comm) + except Exception, e: + print e + finally: + self.sshc.close() + + def sendComm(self, hostname, port, msg_type, config_data): + try: + self.sock.connect((hostname, port)) + self.sendMsg(pickle.dumps((msg_type, config_data))) + response = self.recvMsg() + return response + except Exception, e: + print "Ups: Could not complete request" + print e + finally: + self.sock.close() + + def sendMsg(self, msg): + totalsent = 0 + while totalsent < len(msg): + sent = self.sock.send(msg[totalsent:]) + if sent == 0: + raise RuntimeError, "socket connection broken" + totalsent = totalsent + sent + + def recvMsg(self): + msg = '' + chunk = self.sock.recv(MSGLEN) + if chunk == '': + raise RuntimeError, "socket connection broken" + msg = msg + chunk + + return msg + + def startServer(self, public_address, ssh_port, private_address, username): + comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + private_address + print comm + self.printDummyCommand(public_address, ssh_port, username, comm) + self.sendSSHComm(public_address, username, ssh_port, comm) + + def getClients(self, public_address, listen_port): + msg_data = self.sendComm(public_address, + listen_port, + GET_CLIENTS, + "") + # lista (id, type, metafile) + print msg_data + + def getOutput(self, public_address, listen_port): + msg_data = self.sendComm(public_address, + listen_port, + GET_OUTPUT, + "") + # lista (... + print msg_data + + def applyTC(self, public_address, public_port, public_iface, private_address, + private_port, private_iface, upload_limit, download_limit): + si = self.swarm.getSIByNode(node) + self.tc.config(public_address, + public_port, + public_iface, + private_address, + private_port, + private_iface) + self.tc.set_upload_limit(upload_limit) + self.tc.set_download_limit(download_limit) + + commands = self.tc.get_all_commands() + #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) + #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) + + + def startClient(self, public_address, listen_port, config_data): + msg_type, msg_data = self.sendComm(public_address, + listen_port, + START_MSG, + config_data) + print listen_port + if msg_type == ACK_MSG: + return msg_data + else: + print msg_data + return -1 + + def printDummyCommand(self, public_address, public_port, option1, data): + print "----------------------" + print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) + print "this data: %s" %(data) + print "----------------------\n" + + + def stopClient(self, public_adress, listen_port, client_pid): + msg_type, msg_data = self.sendComm(public_address, + listen_port, + STOP_MSG, + client_pid) + + if msg_type == ACK_MSG: + "shutdown client OK" + else: + "error while shutting down client" + print msg_data + + def clean(self): + pass + -- 2.20.1