From 05d0645758c3ce7f56aed37dfaaec0506eff17e1 Mon Sep 17 00:00:00 2001 From: Marius Sandu-Popa Date: Wed, 3 Feb 2010 14:27:02 +0200 Subject: [PATCH] fixed bugs commander; updated xml parser and files --- bt_comm/client/Client.py | 150 +++++++++++++++++-------------- bt_comm/client/TrafficControl.py | 6 +- bt_comm/client/run.sh | 2 + bt_comm/xml/XMLParser.py | 50 ++++++++--- bt_comm/xml/nodes.xml | 8 +- bt_comm/xml/swarm.xml | 35 +++----- 6 files changed, 143 insertions(+), 108 deletions(-) create mode 100755 bt_comm/client/run.sh diff --git a/bt_comm/client/Client.py b/bt_comm/client/Client.py index dbd4c78..57c3e1f 100644 --- a/bt_comm/client/Client.py +++ b/bt_comm/client/Client.py @@ -12,6 +12,7 @@ MSGLEN = 1024 class Commander(Thread): def __init__(self, nodes_xml, swarm_xml): + Thread.__init__(self) self.nodes = Nodes(nodes_xml); self.swarm = Swarm(swarm_xml); self.tc = TrafficControl("openvz"); @@ -19,105 +20,120 @@ class Commander(Thread): self.sshc.load_system_host_keys() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - def send_sshcomm(self, hostname, username, port, comm): + def sendSSHComm(self, hostname, username, port, comm): try: self.sshc.connect(hostname=hostname, username=username, port=port) stdin, stdout, stderr = self.sshc.exec_command(comm) - self.sshc.close() except Exception as e: print e + finally: + self.sshc.close() - def send_multiple_sshcom(self, hostname, username, port, comms): + def sendMultipleSSHComm(self, hostname, username, port, comms): try: self.sshc.connect(hostname=hostname, username=username, port=port) for c in comms: stdin, stdout, stderr = self.sshc.exec_command(comm) - self.sshc.close() except Exception as e: print e + finally: + self.sshc.close() - def send_comm(self, hostname, port, msg_type, config_data): - self.sock.connect((hostname, port)) - self.send(msg_type) - response = self.recv_msg() - if response == ACK_MSG: - self.send_msg(pickle.dumps(config_data)) + def sendComm(self, hostname, port, msg_type, config_data): + try: + self.sock.connect((hostname, port)) + self.send(msg_type) response = self.recv_msg() - self.sock.close() - - return response + if response == ACK_MSG: + self.sendMsg(pickle.dumps(config_data)) + response = self.recv_msg() + print response + except Exception as e: + print e + finally: + self.sock.close() - def send(self, msg): + def sendMsg(self, msg): totalsent = 0 while totalsent < len(msg): sent = self.sock.send(msg[totalsent:]) if sent == 0: raise RuntimeError, "socket connection broken" totalsent = totalsent + sent + + def recvMsg(self): + msg = '' + chunk = self.sock.recv(MSGLEN) + if chunk == '': + raise RuntimeError, "socket connection broken" + msg = msg + chunk + + return msg - def start_daemon(self, node): - comm = ""; - send_sshcomm(node.public_address, node.username, node.ssh_port, comm) + def startDaemon(self, node): + comm = "python StartMeUpBaby"; + self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, comm) + #sendSSHComm(node.public_address, node.username, node.ssh_port, comm) - def apply_tc(self, node): - si = swarm.getSIByNode(node) - tc.config(node.public_address, node.public_port, node.public_iface, \ + def applyTC(self, node): + si = self.swarm.getSIByNode(node) + self.tc.config(node.public_address, node.public_port, node.public_iface, \ node.private_address, node.private_port, node.private_iface) - tc.set_upload_limit(si.upload_limit) - tc.set_download_limit(si.downoad_limit) + self.tc.set_upload_limit(si.upload_limit) + self.tc.set_download_limit(si.download_limit) - commands = tc.get_all_commands() - send_multiple_sshcomm(node.public_address, node.username, node.ssh_port, commands) - - def send_start(self, node): - - config_data = [] - #~ config_data = [{CLIENT:"tribler", FILE:"Tribler/Tools/cmdline.py", - #~ RUN_TYPE:"script", - #~ INTERPRETER:"python", PREFIX:"PYTHONPATH=.",SUFFIX:"", - #~ UP_LIMIT_OPTION:"",DL_LIMIT_OPTION:"", PORT_OPTION:"-p", - #~ LOG_DIR_OPTION:"-l",DL_DIR_OPTION:"-d"}] - response = send_comm(self, node.public_address, node.public_port, START_MSG, config_data) + commands = self.tc.get_all_commands() + self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands) + #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands) + + def sendSIStart(self, si): + node = self.nodes.getNodeBySi(si) + base_path = node.getClientBaseDir(si.btclient) + torrent_file = self.swarm.getTorrentFile() + config_data = [{'client':si.btclient, + 'base_path':base_path, + 'download_dir':si.download_dir, + 'log_dir':si.log_dir, + 'torrent_file':torrent_file, + 'output_dir':si.output_dir, }] + self.printDummyCommand(node.public_address, node.public_port, START_MSG, config_data) + #sendComm(node.public_address, node.public_port, START_MSG, config_data) + + def printDummyCommand(self, public_address, public_port, option1, data): + print "----------------------" + print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1) + print "this data: %s" %(data) + print "----------------------\n" - def send_stop(self, node): + + def sendSIStop(self, node): pass - def send_status(self, node): + def sendSIStatus(self, node): pass + def startAll(self): + for node in self.nodes.getNodes(): + self.applyTC(node) + self.startDaemon(node) + + for si in self.swarm.getSIs(): + self.sendSIStart(si) + def run(self): - while(1): - k = raw_input('enter choice:') + self.printUsage() + while True: + k = raw_input("Choose option:") if k == '': break - - #~ def recv_msg(self): - #~ msg = '' - #~ chunk = self.sock.recv(MSGLEN) - #~ if chunk == '': - #~ raise RuntimeError, "socket connection broken" - #~ msg = msg + chunk - #~ print msg - #~ return msg - -class TestCommander: - def __init__(self): - nodes = Nodes("../xml/nodes.xml"); - swarm = Swarm("../xml/swarm.xml"); - #print nodes.getNode("2"); - -if __name__ == "__main__": - - tc = TestCommander() + elif k == '1': + self.startAll() - - #~ # test config - #~ config_data = [{CLIENT:"tribler", FILE:"Tribler/Tools/cmdline.py", - #~ RUN_TYPE:"script", - #~ INTERPRETER:"python", PREFIX:"PYTHONPATH=.",SUFFIX:"", - #~ UP_LIMIT_OPTION:"",DL_LIMIT_OPTION:"", PORT_OPTION:"-p", - #~ LOG_DIR_OPTION:"-l",DL_DIR_OPTION:"-d"}] - - #~ response = s.send_command(CONFIG_MSG, config_data) - #~ print response + def printUsage(self): + print "=========Commander Options==========" + print "1. Start Swarm" + print "====================================" +if __name__ == "__main__": + c = Commander("../xml/nodes.xml", "../xml/swarm.xml") + c.start() diff --git a/bt_comm/client/TrafficControl.py b/bt_comm/client/TrafficControl.py index 1399e03..1abe51f 100644 --- a/bt_comm/client/TrafficControl.py +++ b/bt_comm/client/TrafficControl.py @@ -45,16 +45,16 @@ class TrafficControl: def get_upload_limit_commands(self): upload_limit_commands = [] upload_limit_commands.append("tc qdisc add dev %s root handle 1: htb default 90" % (self.vm_iface)) - upload_limit_commands.append("tc class add dev %s parent 1: classid 1:1 htb rate %dkb ceil %dkb" % (self.vm_iface, self.upload_limit, self.upload_limit)) + upload_limit_commands.append("tc class add dev %s parent 1: classid 1:1 htb rate %skb ceil %skb" % (self.vm_iface, self.upload_limit, self.upload_limit)) upload_limit_commands.append("tc qdisc add dev %s parent 1:1 handle 10: sfq perturb 10" % (self.vm_iface)) - upload_limit_commands.append("tc filter add dev %s parent 1:0 protocol ip u32 match ip dst %s match ip dport %d 0xffff classid 1:1" % (self.vm_iface, self.vm_address, self.vm_port)) + upload_limit_commands.append("tc filter add dev %s parent 1:0 protocol ip u32 match ip dst %s match ip dport %s 0xffff classid 1:1" % (self.vm_iface, self.vm_address, self.vm_port)) return upload_limit_commands def get_download_limit_commands(self): download_limit_commands = [] download_limit_commands.append("tc qdisc add dev %s root handle 1: htb default 90" % (self.host_iface)) - download_limit_commands.append("tc class add dev %s parent 1: classid 1:1 htb rate %dkb ceil %dkb" % (self.host_iface, self.download_limit, self.download_limit)) + download_limit_commands.append("tc class add dev %s parent 1: classid 1:1 htb rate %skb ceil %skb" % (self.host_iface, self.download_limit, self.download_limit)) download_limit_commands.append("tc qdisc add dev %s parent 1:1 handle 10: sfq perturb 10" % (self.host_iface)) download_limit_commands.append("tc filter add dev %s parent 1:0 protocol ip u32 match ip src %s classid 1:1" % (self.host_iface, self.vm_address)) diff --git a/bt_comm/client/run.sh b/bt_comm/client/run.sh new file mode 100755 index 0000000..9a41b17 --- /dev/null +++ b/bt_comm/client/run.sh @@ -0,0 +1,2 @@ +#!/bin/bash +PYTHONPATH=/home/marius/cs-p2p-next/bt_comm/:/home/marius/cs-p2p-next/bt_comm/xml/ python Client.py diff --git a/bt_comm/xml/XMLParser.py b/bt_comm/xml/XMLParser.py index a9e0cda..7f6565c 100644 --- a/bt_comm/xml/XMLParser.py +++ b/bt_comm/xml/XMLParser.py @@ -9,7 +9,9 @@ class SwarmInstance: self.download_limit = list[3]; self.download_dir = list[4]; self.log_dir = list[5]; - self.output_dir = list[6]; + self.log_file = list[6]; + self.output_dir = list[7]; + self.output_file = list[8]; def __str__(self): return '[%s: %s: %s: %s: %s: %s: %s: %s]' \ @@ -34,13 +36,16 @@ class Node: return '[%s: %s: %s: %s: %s: %s %s: %s]' \ %(self.id, self.public_address, self.public_port, self.private_address, \ self.private_port, self.ssh_port, self.username, self.clients_base_dir); + + def getClientBaseDir(self, client): + return self.clients_base_dir[client] class Nodes: def __init__(self, nodes_xml): try: tree = etree.parse(nodes_xml) root = tree.getroot() - self.nodes = {} + self.nodes = [] for elem in root: id = elem.get("id") list = [elem2.text for elem2 in elem[:len(elem)-1]] @@ -49,35 +54,56 @@ class Nodes: client_id = elem2.get("id"); client_paths [client_id]=elem2[0].text; list.append(client_paths); - self.nodes[id] = Node(id, list) + self.nodes.append(Node(id, list)) except IOError as e: print e return None def getNode(self, id): - return self.nodes[id]; + for node in self.nodes: + if node.id == id: + return node + + def getNodeBySi(self, si): + for node in self.nodes: + if node.id == si.node_id: + return node + + def getNodes(self): + return self.nodes + class Swarm: def __init__(self, swarm_xml): try: tree = etree.parse(swarm_xml) root = tree.getroot() - self.swarm = {} + self.swarm = [] self.torrent_file = root[0].text for elem in root[1:]: id = elem.get("id") list = [elem2.text for elem2 in elem[:len(elem)-1]] - self.swarm[id] = SwarmInstance(id, list); - print self.swarm[id] + self.swarm.append(SwarmInstance(id, list)) except IOError as e: print e return None def getSIByNode(self, node): - for si in self.swarm.values(): + for si in self.swarm: if si.node_id == node.id: return si - -#sw = Swarm("swarm.xml"); -#print sw.torrent_file -#print sw.getSIByNode("1") + + def getTorrentFile(self): + return self.torrent_file + + def getSIs(self): + return self.swarm + +#~ swarm = Swarm("swarm.xml") +#~ for si in swarm.getSIs(): + #~ print si + +#~ nodes = Nodes("nodes.xml") +#~ for node in nodes.getNodes(): + #~ print node + #~ print node.getClientBaseDir('libtorrent') \ No newline at end of file diff --git a/bt_comm/xml/nodes.xml b/bt_comm/xml/nodes.xml index 68b1303..3f7c0e9 100644 --- a/bt_comm/xml/nodes.xml +++ b/bt_comm/xml/nodes.xml @@ -3,10 +3,10 @@ 141.85.224.201 10150 - + eth0 172.30.10.0 10150 - + venet0 10122 gogu @@ -25,10 +25,10 @@ 141.85.224.202 10250 - + eth0 172.30.20.0 10250 - + venet0 10222 gicu diff --git a/bt_comm/xml/swarm.xml b/bt_comm/xml/swarm.xml index 3a665b9..3862fbe 100644 --- a/bt_comm/xml/swarm.xml +++ b/bt_comm/xml/swarm.xml @@ -3,12 +3,14 @@ cucu 1 - triber + tribler 512 256 - /this/dir - - + /download/dir + /log/dir + log_file + /output/dir + output_file @@ -21,11 +23,13 @@ 2 transmission - - - - - + 128 + 64 + /download/dir + /log/dir + log_file + /output/dir + output_file @@ -33,17 +37,4 @@ - - 3 - libtorrent - - - - - - - - - - -- 2.20.1