From 0a2a4804b154e84838f46f4da5c1d55abdb1c612 Mon Sep 17 00:00:00 2001 From: Marius Sandu-Popa Date: Thu, 29 Apr 2010 17:20:18 +0300 Subject: [PATCH] upgraded Commander --- autorun/commander/Commander.py | 265 ++++++++++++++++------------- autorun/commander/CommanderBase.py | 30 +--- autorun/commander/XMLParser.py | 8 +- 3 files changed, 166 insertions(+), 137 deletions(-) diff --git a/autorun/commander/Commander.py b/autorun/commander/Commander.py index 0464143..b942f74 100644 --- a/autorun/commander/Commander.py +++ b/autorun/commander/Commander.py @@ -1,5 +1,6 @@ import sys, socket import os +import re from Util import * from XMLParser import * from threading import Thread @@ -15,68 +16,99 @@ class Commander(Thread): self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #self.sshc.load_system_host_keys() self.Commander = CommanderBase(); - self.commands = {'list': [self.list, "lists the nodes in xml file"]} - -# def list(self): -# print "shit" + self.commands = { + 'list': [self.simpleC, 'Lists the nodes in xml file.'], + 'boot':[self.nodeC, 'Bootstrap server. Options: all | x .'], + 'start': [self.clientC, 'Start clients. Options: all | x | x,y .'], + 'stop': [self.clientC, 'Stop clients. Options: all | x | x,y .'], + 'archive': [self.nodeC, 'Archive output information of clients.'+\ + 'Options: all | x .'], + 'getclients': [self.nodeC, 'Get running clients. Options: all | x .'], + 'getoutput': [self.nodeC, 'Get clients output. Options: all | x .'], + 'help': [self.simpleC, "Show This Message."], + 'exit': [self.simpleC, "Exit Commander."] + } -# def listCommands(self): -# print "List of commands:" -# for k, v in self.commands.items(): -# print "\t",k,"\t",v[1] - - def sendSSHComm(self, hostname, username, port, comm): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - stdin, stdout, stderr = self.sshc.exec_command(comm) - print stdout.readlines() - except Exception, e: - print e - finally: - self.sshc.close() - - def sendMultipleSSHComm(self, hostname, username, port, comms): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - for c in comms: - stdin, stdout, stderr = self.sshc.exec_command(comm) - except Exception, e: - print e - finally: - self.sshc.close() + def simpleC(self, comm, argv): + if comm == 'list': + for node in self.nodes.getNodes(): + self.printNodeInfo(node) + if comm == 'help': + print "List of commands:" + keys = self.commands.keys() + keys.sort() + for k in keys: + if len(k)<7: + print " " + k + "\t\t- " + self.commands[k][1] + else: + print " " + k + "\t- " + self.commands[k][1] + print "(where x is for node id and y is for client id)" + if comm=='exit': + sys.exit() - def applyNodeTC(self, node): - si = self.swarm.getSIByNode(node) - self.tc.config(node.public_address, - node.public_port, - node.public_iface, - node.private_address, - node.private_port, - node.private_iface) - self.tc.set_upload_limit(si.upload_limit) - self.tc.set_download_limit(si.download_limit) - - commands = self.tc.get_all_commands() - #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) - #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) + def nodeC(self, comm, argv): + argc = len(argv) + callbacks = {'boot':self.startNodeServer, + 'archive':self.nodeArchive, + 'getoutput':self.getNodeOutput, + 'getclients':self.getNodeStatus} + if argc == 0 or argc == 2: + print "Invalid arguments" + return + if argv[0] == 'all': + for node in self.nodes.getNodes(): + print callbacks[comm](node) + elif argc==1: + node = self.nodes.getNode(argv[0]) + if not node: + print "Unknown node id!" + return + print callbacks[comm](node) + def clientC(self, comm, argv): + argc = len(argv) + callbacks = {'start':self.startSI, + 'stop':self.stopSI} + if argc == 0: + print "Invalid arguments" + return + if argv[0] == 'all': + for si in self.swarm.getSIs(): + node = self.nodes.getNodeBySi(si) + if not node: + print "Incompatible XML files!" + return + print callbacks[comm](node, si) + elif argc==1: + node = self.nodes.getNode(argv[0]) + if not node: + print "Unknown node id!" + return + si = self.swarm.getSIByNode(node) + if not si: + print "Unknown client id!" + return + print callbacks[comm](node, si) + else: + node = self.nodes.getNode(argv[0]) + if not node: + print "Unknown node id!" + return + si = self.swarm.getSI(argv[1]) + if not si: + print "Unknown client id!" + return + print callbacks[comm](node, si) + def startNodeServer(self, node): - comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + node. private_address - print node.ssh_port, node.username, node.daemon_dir, node.public_address + comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+\ + " "+"start " + node. private_address self.sendSSHComm(node.public_address, node.username, int(node.ssh_port), comm) - def startSI(self, si): - node = self.nodes.getNodeBySi(si) - if not node: - print "[ERROR] Swarm Instance unknown! Check XML config files!" - + def startSI(self, node, si): base_path = node.getClientBaseDir(si.btclient) torrent_file = self.swarm.getTorrentFile() @@ -98,88 +130,93 @@ class Commander(Thread): print ">>>>>>>>", ret if ret: si.pid = ret - si.running = True - - def stopSI(self, si): - #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data) - node = self.nodes.getNodeBySi(si) - ret = self.Commander.stop(node.public_address, + def stopSI(self, node, si): + return self.Commander.stop(node.public_address, int(node.listen_port), si.pid) def getNodeStatus(self, node): - self.Commander.getClients(node.public_address, + return self.Commander.getClients(node.public_address, int(node.listen_port)) def getNodeOutput(self, node): - self.Commander.getOutput(node.public_address, + return self.Commander.getOutput(node.public_address, int(node.listen_port)) def nodeArchive(self, node): - self.Commander.archive(node.public_address, + return self.Commander.archive(node.public_address, int(node.listen_port)) def cleanNode(self, node): pass - - def getSIStatus(self, si): - pass - def startAllServers(self): - for node in self.nodes.getNodes(): - self.startNodeServer(node) - - def startAll(self): - for si in self.swarm.getSIs(): - self.startSI(si) - - def statusAll(self): - for node in self.nodes.getNodes(): - self.getNodeStatus(node) - - def archiveAll(self): - for node in self.nodes.getNodes(): - self.nodeArchive(node); + def sendSSHComm(self, hostname, username, port, comm): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + stdin, stdout, stderr = self.sshc.exec_command(comm) + print stdout.readlines() + except Exception, e: + print e + finally: + self.sshc.close() + + def sendMultipleSSHComm(self, hostname, username, port, comms): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + for c in comms: + stdin, stdout, stderr = self.sshc.exec_command(comm) + except Exception, e: + print e + finally: + self.sshc.close() - def outputAll(self): - for node in self.nodes.getNodes(): - self.getNodeOutput(node) + def applyNodeTC(self, node): + si = self.swarm.getSIByNode(node) + self.tc.config(node.public_address, + node.public_port, + node.public_iface, + node.private_address, + node.private_port, + node.private_iface) + self.tc.set_upload_limit(si.upload_limit) + self.tc.set_download_limit(si.download_limit) + + commands = self.tc.get_all_commands() + #~ self.sendMultipleSSHComm(node.public_address, + #~ node.username, + #~ node.ssh_port, + #~ commands) - def stopAll(self): - for si in self.swarm.getSIs(): - self.stopSI(si) - def run(self): - self.printUsage() - #self.listCommands() - #(self.commands['list'][0])() + self.about() while True: - k = raw_input("Choose option: ") - if k == '': break - if k == '1': - self.startAllServers() - if k == '2': - self.startAll() - if k == '3': - self.statusAll() - if k == '4': - self.outputAll() - if k == '5': - self.archiveAll() - if k == '6': - self.stopAll() + try: + input = raw_input("commander> ") + ret = re.match(r'[a-z]+(\s+all|\s+[0-9]+,[0-9]+|\s+[0-9]+)?$', input) + if not ret: + raise KeyError + argv = re.split(r'\W+', input) + (self.commands[argv[0]][0])(argv[0], argv[1:]) + except KeyError: + print "Unknown command" + + def printNodeInfo(self, node): + si = self.swarm.getSIByNode(node) + print 'node ' + node.id + ' (' + node.public_address +\ + ":" +node.public_port + " -> " + node.private_address +\ + ":" +node.listen_port +' ):' + print ' - client ' + si.id +' (' + si.btclient + ', Down:' + si.download_limit + \ + " KB/s" + ", Up:" +si.upload_limit + " KB/s" + ' )' + + def about(self): + print "Commander version 1.0.3" + print "Enter \"help\" for instructions" - def printUsage(self): - print "=========Commander Options==========" - print "1. Start Servers" - print "2. Start Swarm" - print "3. Status Swarm" - print "4. Output Swarm" - print "5. Archive Swarm" - print "6. Stop Swarm" - print "====================================" - if __name__ == "__main__": c = Commander("../xml/nodes.xml", "../xml/swarm.xml") c.start() diff --git a/autorun/commander/CommanderBase.py b/autorun/commander/CommanderBase.py index afcd96a..a6cd43b 100644 --- a/autorun/commander/CommanderBase.py +++ b/autorun/commander/CommanderBase.py @@ -50,11 +50,9 @@ class CommanderBase(): GET_CLIENTS, "") if msg_type == ACK_MSG: - print "[GET_CLIENTS] Client list: ", msg_data return msg_data else: - print msg_data - return None + error("get_clients", msg_data) def getOutput(self, public_address, listen_port): msg_type, msg_data = self.sendComm(public_address, @@ -62,11 +60,9 @@ class CommanderBase(): GET_OUTPUT, "") if msg_type == ACK_MSG: - print "[GET_OUTPUT] Client output: ", msg_data return msg_data else: - print msg_data - return None + error("get_output", msg_data) def archive(self, public_address, listen_port): msg_type, msg_data = self.sendComm(public_address, @@ -74,12 +70,9 @@ class CommanderBase(): ARCHIVE, "") if msg_type == ACK_MSG: - print "[ARCHIVE] List of files: " - print msg_data return msg_data else: - print msg_data - return None + error("archive", msg_data) def start(self, public_address, listen_port, config_data): msg_type, msg_data = self.sendComm(public_address, @@ -87,11 +80,9 @@ class CommanderBase(): START_MSG, config_data) if msg_type == ACK_MSG: - print "[START] started client with pid: ", msg_data return msg_data else: - print msg_data - return False + error("start_client", msg_data) def stop(self, public_address, listen_port, client_pid): msg_type, msg_data = self.sendComm(public_address, @@ -100,19 +91,14 @@ class CommanderBase(): client_pid) if msg_type == ACK_MSG: - print "[STOP] client shutdown OK" return True else: - print "error while shutting down client" - print msg_data - return None + error("stop_client", msg_data) def clean(self): pass - def printDummyCommand(self, public_address, public_port, option1, data): - print "----------------------" - print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) - print "this data: %s" %(data) - print "----------------------\n" + def error(self, op, msg): + print "[Server Error]: The server denied the " + op + " operation" + print "[Server Error]: " + msg \ No newline at end of file diff --git a/autorun/commander/XMLParser.py b/autorun/commander/XMLParser.py index 532486b..0077364 100644 --- a/autorun/commander/XMLParser.py +++ b/autorun/commander/XMLParser.py @@ -4,7 +4,7 @@ class SwarmInstance: def __init__(self, id, list): self.id = id; self.pid = -1; - self.running = False; + #~ self.running = False; self.node_id = list[0]; self.btclient = list[1]; self.upload_limit = list[2]; @@ -26,6 +26,7 @@ class SwarmInstance: class Node: def __init__(self, id, list): self.id = id; + #~ self.running = False; self.public_address = list[0]; self.public_port = list[1]; self.public_iface = list[2]; @@ -106,6 +107,11 @@ class Swarm: for si in self.swarm: if si.node_id == node.id: return si + + def getSI(self, id): + for si in self.swarm: + if si.id == id: + return si def getTorrentFile(self): return self.torrent_file -- 2.20.1