From 17d4165f02087cd75cf511f656c7b10ae48d008c Mon Sep 17 00:00:00 2001 From: Marius Sandu-Popa Date: Thu, 22 Apr 2010 18:27:42 +0300 Subject: [PATCH] autorun: changed Commander; updated xml config files --- autorun/commander/Commander.py | 160 +++++++++++++++++++-------- autorun/commander/CommanderBase.py | 168 +++++++++++------------------ autorun/commander/XMLParser.py | 6 +- autorun/xml/nodes.xml | 6 +- autorun/xml/swarm.xml | 4 +- 5 files changed, 183 insertions(+), 161 deletions(-) diff --git a/autorun/commander/Commander.py b/autorun/commander/Commander.py index 5d2b955..0464143 100644 --- a/autorun/commander/Commander.py +++ b/autorun/commander/Commander.py @@ -5,93 +5,145 @@ from XMLParser import * from threading import Thread from CommanderBase import * -MSGLEN = 4096 -DEBUG = True - class Commander(Thread): def __init__(self, nodes_xml, swarm_xml): Thread.__init__(self) self.nodes = Nodes(nodes_xml); self.swarm = Swarm(swarm_xml); + self.tc = TrafficControl("openvz"); + self.sshc = paramiko.SSHClient() + self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + #self.sshc.load_system_host_keys() self.Commander = CommanderBase(); + self.commands = {'list': [self.list, "lists the nodes in xml file"]} + +# def list(self): +# print "shit" - def startNodeServer(self, node): - print node.ssh_port, node.username, node.daemon_dir, node.public_address - self.Commander.startServer(node.public_address, - int(node.ssh_port), - node.private_address, - node.username); +# def listCommands(self): +# print "List of commands:" +# for k, v in self.commands.items(): +# print "\t",k,"\t",v[1] - def getNodeStatus(self, node): - self.Commander.getClients(node.public_address, - int(node.listen_port)) - - def getNodeOutput(self, node): - self.Commander.getOutput(node.public_address, - int(node.listen_port)) - + def sendSSHComm(self, hostname, username, port, comm): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + stdin, stdout, stderr = self.sshc.exec_command(comm) + print stdout.readlines() + except Exception, e: + print e + finally: + self.sshc.close() + + def sendMultipleSSHComm(self, hostname, username, port, comms): + try: + self.sshc.connect(hostname=hostname, + username=username, + port=port) + for c in comms: + stdin, stdout, stderr = self.sshc.exec_command(comm) + except Exception, e: + print e + finally: + self.sshc.close() + def applyNodeTC(self, node): si = self.swarm.getSIByNode(node) - self.Commander.applyTC(node.public_address, + self.tc.config(node.public_address, node.public_port, node.public_iface, node.private_address, node.private_port, - node.private_iface, - si.upload_limit, - si.download_limit) + node.private_iface) + self.tc.set_upload_limit(si.upload_limit) + self.tc.set_download_limit(si.download_limit) + + commands = self.tc.get_all_commands() + #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) + #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) + + def startNodeServer(self, node): + comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + node. private_address + print node.ssh_port, node.username, node.daemon_dir, node.public_address + self.sendSSHComm(node.public_address, + node.username, + int(node.ssh_port), + comm) def startSI(self, si): node = self.nodes.getNodeBySi(si) + if not node: + print "[ERROR] Swarm Instance unknown! Check XML config files!" + base_path = node.getClientBaseDir(si.btclient) torrent_file = self.swarm.getTorrentFile() config_data = { CLIENT : si.btclient, - BASE_DIR : base_path, - UP_LIMIT : si.download_limit, - DL_LIMIT : si.upload_limit, - PORT : node.public_port, - DL_DIR : si.download_dir, - LOG_DIR : si.log_dir, - OUT_FILE : si.output_file, - LOG_FILE : si.log_file, - TORRENT : torrent_file } + BASE_DIR : base_path, + UP_LIMIT : si.download_limit, + DL_LIMIT : si.upload_limit, + PORT : node.public_port, + DL_DIR : si.download_dir, + LOG_DIR : si.log_dir, + OUT_FILE : si.output_file, + OUT_DIR : si.output_dir, + LOG_FILE : si.log_file, + TORRENT : torrent_file } - ret = self.Commander.startClient(node.public_address, - int(node.listen_port), - config_data) + ret = self.Commander.start(node.public_address, + int(node.listen_port), + config_data) print ">>>>>>>>", ret if ret: si.pid = ret + si.running = True + def stopSI(self, si): #self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data) node = self.nodes.getNodeBySi(si) - self.Commander.stop(node.public_address, - int(node.listen_port), - si.pid) - + ret = self.Commander.stop(node.public_address, + int(node.listen_port), + si.pid) + + def getNodeStatus(self, node): + self.Commander.getClients(node.public_address, + int(node.listen_port)) + + def getNodeOutput(self, node): + self.Commander.getOutput(node.public_address, + int(node.listen_port)) + + def nodeArchive(self, node): + self.Commander.archive(node.public_address, + int(node.listen_port)) + def cleanNode(self, node): pass def getSIStatus(self, si): pass - def startAll(self): + def startAllServers(self): for node in self.nodes.getNodes(): - #self.applyTC(node) self.startNodeServer(node) - #self.getNodeStatus(node) + def startAll(self): for si in self.swarm.getSIs(): self.startSI(si) def statusAll(self): - for node in self.node.getNodes(): + for node in self.nodes.getNodes(): self.getNodeStatus(node) + + def archiveAll(self): + for node in self.nodes.getNodes(): + self.nodeArchive(node); def outputAll(self): - for node in self.node.getNodes(): + for node in self.nodes.getNodes(): self.getNodeOutput(node) def stopAll(self): @@ -100,18 +152,32 @@ class Commander(Thread): def run(self): self.printUsage() + #self.listCommands() + #(self.commands['list'][0])() while True: - k = raw_input("Choose option:") + k = raw_input("Choose option: ") if k == '': break - elif k == '1': + if k == '1': + self.startAllServers() + if k == '2': self.startAll() - elif k == '2': + if k == '3': + self.statusAll() + if k == '4': + self.outputAll() + if k == '5': + self.archiveAll() + if k == '6': self.stopAll() def printUsage(self): print "=========Commander Options==========" - print "1. Start Swarm" - print "2. Stop Swarm" + print "1. Start Servers" + print "2. Start Swarm" + print "3. Status Swarm" + print "4. Output Swarm" + print "5. Archive Swarm" + print "6. Stop Swarm" print "====================================" if __name__ == "__main__": diff --git a/autorun/commander/CommanderBase.py b/autorun/commander/CommanderBase.py index 85ef7d5..afcd96a 100644 --- a/autorun/commander/CommanderBase.py +++ b/autorun/commander/CommanderBase.py @@ -8,44 +8,11 @@ from TrafficControl import * from threading import Thread paramiko.util.log_to_file('/tmp/paramiko.log') -MSGLEN = 4096 -DEBUG = True - class CommanderBase(): - def __init__(self): - self.tc = TrafficControl("openvz"); - self.sshc = paramiko.SSHClient() - self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - #self.sshc.load_system_host_keys() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - def sendSSHComm(self, hostname, username, port, comm): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - stdin, stdout, stderr = self.sshc.exec_command(comm) - print stdout.readlines() - except Exception, e: - print e - finally: - self.sshc.close() - - def sendMultipleSSHComm(self, hostname, username, port, comms): - try: - self.sshc.connect(hostname=hostname, - username=username, - port=port) - for c in comms: - stdin, stdout, stderr = self.sshc.exec_command(comm) - except Exception, e: - print e - finally: - self.sshc.close() def sendComm(self, hostname, port, msg_type, config_data): - #print "sendcomm ", config_data try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((hostname, port)) self.sendMsg(pickle.dumps((msg_type, config_data))) response = self.recvMsg() @@ -68,93 +35,84 @@ class CommanderBase(): msg = '' while 1: chunk = self.sock.recv(BUFFER_SIZE) - if not chunk: - break - #raise RuntimeError, "socket connection broken" - msg = msg + chunk - if len(chunk) < BUFFER_SIZE: - break + if not chunk: + break + msg = msg + chunk + if len(chunk) < BUFFER_SIZE: + break dd = pickle.loads(msg) return dd - - # chunk = self.sock.recv(MSGLEN) - # if chunk == '': - # raise RuntimeError, "socket connection broken" - # msg = msg + chunk - # - # return msg - - def startServer(self, public_address, ssh_port, private_address, username): - comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + private_address - print comm - self.printDummyCommand(public_address, ssh_port, username, comm) - self.sendSSHComm(public_address, username, ssh_port, comm) def getClients(self, public_address, listen_port): - msg_data = self.sendComm(public_address, - listen_port, - GET_CLIENTS, - "") - # lista (id, type, metafile) - print msg_data + msg_type, msg_data = self.sendComm(public_address, + listen_port, + GET_CLIENTS, + "") + if msg_type == ACK_MSG: + print "[GET_CLIENTS] Client list: ", msg_data + return msg_data + else: + print msg_data + return None def getOutput(self, public_address, listen_port): - msg_data = self.sendComm(public_address, - listen_port, - GET_OUTPUT, - "") - # lista (... - print msg_data - - def applyTC(self, public_address, public_port, public_iface, private_address, - private_port, private_iface, upload_limit, download_limit): - si = self.swarm.getSIByNode(node) - self.tc.config(public_address, - public_port, - public_iface, - private_address, - private_port, - private_iface) - self.tc.set_upload_limit(upload_limit) - self.tc.set_download_limit(download_limit) - - commands = self.tc.get_all_commands() - #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) - #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) - - - def startClient(self, public_address, listen_port, config_data): msg_type, msg_data = self.sendComm(public_address, - listen_port, - START_MSG, - config_data) - #print listen_port + listen_port, + GET_OUTPUT, + "") if msg_type == ACK_MSG: + print "[GET_OUTPUT] Client output: ", msg_data return msg_data else: print msg_data - return -1 + return None - def printDummyCommand(self, public_address, public_port, option1, data): - print "----------------------" - print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) - print "this data: %s" %(data) - print "----------------------\n" - + def archive(self, public_address, listen_port): + msg_type, msg_data = self.sendComm(public_address, + listen_port, + ARCHIVE, + "") + if msg_type == ACK_MSG: + print "[ARCHIVE] List of files: " + print msg_data + return msg_data + else: + print msg_data + return None + + def start(self, public_address, listen_port, config_data): + msg_type, msg_data = self.sendComm(public_address, + listen_port, + START_MSG, + config_data) + if msg_type == ACK_MSG: + print "[START] started client with pid: ", msg_data + return msg_data + else: + print msg_data + return False - def stopClient(self, public_adress, listen_port, client_pid): + def stop(self, public_address, listen_port, client_pid): msg_type, msg_data = self.sendComm(public_address, - listen_port, - STOP_MSG, - client_pid) + listen_port, + STOP_MSG, + client_pid) if msg_type == ACK_MSG: - "shutdown client OK" + print "[STOP] client shutdown OK" + return True else: - "error while shutting down client" - print msg_data - + print "error while shutting down client" + print msg_data + return None + def clean(self): - pass - + pass + + def printDummyCommand(self, public_address, public_port, option1, data): + print "----------------------" + print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) + print "this data: %s" %(data) + print "----------------------\n" + \ No newline at end of file diff --git a/autorun/commander/XMLParser.py b/autorun/commander/XMLParser.py index 82c1fe9..532486b 100644 --- a/autorun/commander/XMLParser.py +++ b/autorun/commander/XMLParser.py @@ -4,6 +4,7 @@ class SwarmInstance: def __init__(self, id, list): self.id = id; self.pid = -1; + self.running = False; self.node_id = list[0]; self.btclient = list[1]; self.upload_limit = list[2]; @@ -37,8 +38,7 @@ class Node: self.daemon_dir = list[9]; self.daemon_file = list[10]; self.clients_base_dir = list[11]; -# self.clients = {}; - + def __str__(self): return '[%s: %s: %s: %s: %s: %s %s: %s]' \ %(self.id, self.public_address, self.public_port, self.private_address, \ @@ -73,7 +73,6 @@ class Nodes: self.nodes.append(Node(id, list)) except IOError, e: print e - return None def getNode(self, id): for node in self.nodes: @@ -102,7 +101,6 @@ class Swarm: self.swarm.append(SwarmInstance(id, list)) except IOError, e: print e - return None def getSIByNode(self, node): for si in self.swarm: diff --git a/autorun/xml/nodes.xml b/autorun/xml/nodes.xml index b5e973a..0e549e9 100644 --- a/autorun/xml/nodes.xml +++ b/autorun/xml/nodes.xml @@ -14,13 +14,13 @@ Server.py - /home/p2p/p2p-clients/tribler/ + /home/p2p/p2p-clients/tribler - /home/p2p/p2p-clients/libtorrent/ + /home/p2p/p2p-clients/libtorrent - /home/p2p/p2p-clients/transmission/ + /usr/bin diff --git a/autorun/xml/swarm.xml b/autorun/xml/swarm.xml index 4899737..c9669c5 100644 --- a/autorun/xml/swarm.xml +++ b/autorun/xml/swarm.xml @@ -1,7 +1,7 @@ /home/p2p/p2p-meta/bbt316.torrent - + 1 tribler 512 @@ -19,5 +19,5 @@ - + -- 2.20.1