from threading import Thread
from CommanderBase import *
-MSGLEN = 4096
-DEBUG = True
-
class Commander(Thread):
def __init__(self, nodes_xml, swarm_xml):
Thread.__init__(self)
self.nodes = Nodes(nodes_xml);
self.swarm = Swarm(swarm_xml);
+ self.tc = TrafficControl("openvz");
+ self.sshc = paramiko.SSHClient()
+ self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ #self.sshc.load_system_host_keys()
self.Commander = CommanderBase();
+ self.commands = {'list': [self.list, "lists the nodes in xml file"]}
+
+# def list(self):
+# print "shit"
- def startNodeServer(self, node):
- print node.ssh_port, node.username, node.daemon_dir, node.public_address
- self.Commander.startServer(node.public_address,
- int(node.ssh_port),
- node.private_address,
- node.username);
+# def listCommands(self):
+# print "List of commands:"
+# for k, v in self.commands.items():
+# print "\t",k,"\t",v[1]
- def getNodeStatus(self, node):
- self.Commander.getClients(node.public_address,
- int(node.listen_port))
-
- def getNodeOutput(self, node):
- self.Commander.getOutput(node.public_address,
- int(node.listen_port))
-
+ def sendSSHComm(self, hostname, username, port, comm):
+ try:
+ self.sshc.connect(hostname=hostname,
+ username=username,
+ port=port)
+ stdin, stdout, stderr = self.sshc.exec_command(comm)
+ print stdout.readlines()
+ except Exception, e:
+ print e
+ finally:
+ self.sshc.close()
+
+ def sendMultipleSSHComm(self, hostname, username, port, comms):
+ try:
+ self.sshc.connect(hostname=hostname,
+ username=username,
+ port=port)
+ for c in comms:
+ stdin, stdout, stderr = self.sshc.exec_command(comm)
+ except Exception, e:
+ print e
+ finally:
+ self.sshc.close()
+
def applyNodeTC(self, node):
si = self.swarm.getSIByNode(node)
- self.Commander.applyTC(node.public_address,
+ self.tc.config(node.public_address,
node.public_port,
node.public_iface,
node.private_address,
node.private_port,
- node.private_iface,
- si.upload_limit,
- si.download_limit)
+ node.private_iface)
+ self.tc.set_upload_limit(si.upload_limit)
+ self.tc.set_download_limit(si.download_limit)
+
+ commands = self.tc.get_all_commands()
+ #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
+ #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)
+
+ def startNodeServer(self, node):
+ comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + node. private_address
+ print node.ssh_port, node.username, node.daemon_dir, node.public_address
+ self.sendSSHComm(node.public_address,
+ node.username,
+ int(node.ssh_port),
+ comm)
def startSI(self, si):
node = self.nodes.getNodeBySi(si)
+ if not node:
+ print "[ERROR] Swarm Instance unknown! Check XML config files!"
+
base_path = node.getClientBaseDir(si.btclient)
torrent_file = self.swarm.getTorrentFile()
config_data = { CLIENT : si.btclient,
- BASE_DIR : base_path,
- UP_LIMIT : si.download_limit,
- DL_LIMIT : si.upload_limit,
- PORT : node.public_port,
- DL_DIR : si.download_dir,
- LOG_DIR : si.log_dir,
- OUT_FILE : si.output_file,
- LOG_FILE : si.log_file,
- TORRENT : torrent_file }
+ BASE_DIR : base_path,
+ UP_LIMIT : si.download_limit,
+ DL_LIMIT : si.upload_limit,
+ PORT : node.public_port,
+ DL_DIR : si.download_dir,
+ LOG_DIR : si.log_dir,
+ OUT_FILE : si.output_file,
+ OUT_DIR : si.output_dir,
+ LOG_FILE : si.log_file,
+ TORRENT : torrent_file }
- ret = self.Commander.startClient(node.public_address,
- int(node.listen_port),
- config_data)
+ ret = self.Commander.start(node.public_address,
+ int(node.listen_port),
+ config_data)
print ">>>>>>>>", ret
if ret:
si.pid = ret
+ si.running = True
+
def stopSI(self, si):
#self.printDummyCommand(node.public_address, node.listen_port, START_MSG, config_data)
node = self.nodes.getNodeBySi(si)
- self.Commander.stop(node.public_address,
- int(node.listen_port),
- si.pid)
-
+ ret = self.Commander.stop(node.public_address,
+ int(node.listen_port),
+ si.pid)
+
+ def getNodeStatus(self, node):
+ self.Commander.getClients(node.public_address,
+ int(node.listen_port))
+
+ def getNodeOutput(self, node):
+ self.Commander.getOutput(node.public_address,
+ int(node.listen_port))
+
+ def nodeArchive(self, node):
+ self.Commander.archive(node.public_address,
+ int(node.listen_port))
+
def cleanNode(self, node):
pass
def getSIStatus(self, si):
pass
- def startAll(self):
+ def startAllServers(self):
for node in self.nodes.getNodes():
- #self.applyTC(node)
self.startNodeServer(node)
- #self.getNodeStatus(node)
+ def startAll(self):
for si in self.swarm.getSIs():
self.startSI(si)
def statusAll(self):
- for node in self.node.getNodes():
+ for node in self.nodes.getNodes():
self.getNodeStatus(node)
+
+ def archiveAll(self):
+ for node in self.nodes.getNodes():
+ self.nodeArchive(node);
def outputAll(self):
- for node in self.node.getNodes():
+ for node in self.nodes.getNodes():
self.getNodeOutput(node)
def stopAll(self):
def run(self):
self.printUsage()
+ #self.listCommands()
+ #(self.commands['list'][0])()
while True:
- k = raw_input("Choose option:")
+ k = raw_input("Choose option: ")
if k == '': break
- elif k == '1':
+ if k == '1':
+ self.startAllServers()
+ if k == '2':
self.startAll()
- elif k == '2':
+ if k == '3':
+ self.statusAll()
+ if k == '4':
+ self.outputAll()
+ if k == '5':
+ self.archiveAll()
+ if k == '6':
self.stopAll()
def printUsage(self):
print "=========Commander Options=========="
- print "1. Start Swarm"
- print "2. Stop Swarm"
+ print "1. Start Servers"
+ print "2. Start Swarm"
+ print "3. Status Swarm"
+ print "4. Output Swarm"
+ print "5. Archive Swarm"
+ print "6. Stop Swarm"
print "===================================="
if __name__ == "__main__":
from threading import Thread
paramiko.util.log_to_file('/tmp/paramiko.log')
-MSGLEN = 4096
-DEBUG = True
-
class CommanderBase():
- def __init__(self):
- self.tc = TrafficControl("openvz");
- self.sshc = paramiko.SSHClient()
- self.sshc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- #self.sshc.load_system_host_keys()
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- def sendSSHComm(self, hostname, username, port, comm):
- try:
- self.sshc.connect(hostname=hostname,
- username=username,
- port=port)
- stdin, stdout, stderr = self.sshc.exec_command(comm)
- print stdout.readlines()
- except Exception, e:
- print e
- finally:
- self.sshc.close()
-
- def sendMultipleSSHComm(self, hostname, username, port, comms):
- try:
- self.sshc.connect(hostname=hostname,
- username=username,
- port=port)
- for c in comms:
- stdin, stdout, stderr = self.sshc.exec_command(comm)
- except Exception, e:
- print e
- finally:
- self.sshc.close()
def sendComm(self, hostname, port, msg_type, config_data):
- #print "sendcomm ", config_data
try:
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((hostname, port))
self.sendMsg(pickle.dumps((msg_type, config_data)))
response = self.recvMsg()
msg = ''
while 1:
chunk = self.sock.recv(BUFFER_SIZE)
- if not chunk:
- break
- #raise RuntimeError, "socket connection broken"
- msg = msg + chunk
- if len(chunk) < BUFFER_SIZE:
- break
+ if not chunk:
+ break
+ msg = msg + chunk
+ if len(chunk) < BUFFER_SIZE:
+ break
dd = pickle.loads(msg)
return dd
-
- # chunk = self.sock.recv(MSGLEN)
- # if chunk == '':
- # raise RuntimeError, "socket connection broken"
- # msg = msg + chunk
- #
- # return msg
-
- def startServer(self, public_address, ssh_port, private_address, username):
- comm = SERVER_TYPE+" "+SERVER_DIR+SERVER_FILE+" "+"start " + private_address
- print comm
- self.printDummyCommand(public_address, ssh_port, username, comm)
- self.sendSSHComm(public_address, username, ssh_port, comm)
def getClients(self, public_address, listen_port):
- msg_data = self.sendComm(public_address,
- listen_port,
- GET_CLIENTS,
- "")
- # lista (id, type, metafile)
- print msg_data
+ msg_type, msg_data = self.sendComm(public_address,
+ listen_port,
+ GET_CLIENTS,
+ "")
+ if msg_type == ACK_MSG:
+ print "[GET_CLIENTS] Client list: ", msg_data
+ return msg_data
+ else:
+ print msg_data
+ return None
def getOutput(self, public_address, listen_port):
- msg_data = self.sendComm(public_address,
- listen_port,
- GET_OUTPUT,
- "")
- # lista (...
- print msg_data
-
- def applyTC(self, public_address, public_port, public_iface, private_address,
- private_port, private_iface, upload_limit, download_limit):
- si = self.swarm.getSIByNode(node)
- self.tc.config(public_address,
- public_port,
- public_iface,
- private_address,
- private_port,
- private_iface)
- self.tc.set_upload_limit(upload_limit)
- self.tc.set_download_limit(download_limit)
-
- commands = self.tc.get_all_commands()
- #self.printDummyCommand(node.public_address, node.public_port, node.ssh_port, commands)
- #sendMultipleSSHComm(node.public_address, node.username, node.ssh_port, commands)
-
-
- def startClient(self, public_address, listen_port, config_data):
msg_type, msg_data = self.sendComm(public_address,
- listen_port,
- START_MSG,
- config_data)
- #print listen_port
+ listen_port,
+ GET_OUTPUT,
+ "")
if msg_type == ACK_MSG:
+ print "[GET_OUTPUT] Client output: ", msg_data
return msg_data
else:
print msg_data
- return -1
+ return None
- def printDummyCommand(self, public_address, public_port, option1, data):
- print "----------------------"
- print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
- print "this data: %s" %(data)
- print "----------------------\n"
-
+ def archive(self, public_address, listen_port):
+ msg_type, msg_data = self.sendComm(public_address,
+ listen_port,
+ ARCHIVE,
+ "")
+ if msg_type == ACK_MSG:
+ print "[ARCHIVE] List of files: "
+ print msg_data
+ return msg_data
+ else:
+ print msg_data
+ return None
+
+ def start(self, public_address, listen_port, config_data):
+ msg_type, msg_data = self.sendComm(public_address,
+ listen_port,
+ START_MSG,
+ config_data)
+ if msg_type == ACK_MSG:
+ print "[START] started client with pid: ", msg_data
+ return msg_data
+ else:
+ print msg_data
+ return False
- def stopClient(self, public_adress, listen_port, client_pid):
+ def stop(self, public_address, listen_port, client_pid):
msg_type, msg_data = self.sendComm(public_address,
- listen_port,
- STOP_MSG,
- client_pid)
+ listen_port,
+ STOP_MSG,
+ client_pid)
if msg_type == ACK_MSG:
- "shutdown client OK"
+ print "[STOP] client shutdown OK"
+ return True
else:
- "error while shutting down client"
- print msg_data
-
+ print "error while shutting down client"
+ print msg_data
+ return None
+
def clean(self):
- pass
-
+ pass
+
+ def printDummyCommand(self, public_address, public_port, option1, data):
+ print "----------------------"
+ print "sending to: [%s:%s] with option %s" %(public_address, public_port, option1)
+ print "this data: %s" %(data)
+ print "----------------------\n"
+
\ No newline at end of file
def __init__(self, id, list):
self.id = id;
self.pid = -1;
+ self.running = False;
self.node_id = list[0];
self.btclient = list[1];
self.upload_limit = list[2];
self.daemon_dir = list[9];
self.daemon_file = list[10];
self.clients_base_dir = list[11];
-# self.clients = {};
-
+
def __str__(self):
return '[%s: %s: %s: %s: %s: %s %s: %s]' \
%(self.id, self.public_address, self.public_port, self.private_address, \
self.nodes.append(Node(id, list))
except IOError, e:
print e
- return None
def getNode(self, id):
for node in self.nodes:
self.swarm.append(SwarmInstance(id, list))
except IOError, e:
print e
- return None
def getSIByNode(self, node):
for si in self.swarm:
<daemon_file>Server.py</daemon_file>
<clients>
<client id="tribler">
- <base>/home/p2p/p2p-clients/tribler/</base>
+ <base>/home/p2p/p2p-clients/tribler</base>
</client>
<client id="libtorrent">
- <base>/home/p2p/p2p-clients/libtorrent/</base>
+ <base>/home/p2p/p2p-clients/libtorrent</base>
</client>
<client id="transmission">
- <base>/home/p2p/p2p-clients/transmission/</base>
+ <base>/usr/bin</base>
</client>
</clients>
</node>
<?xml version="1.0" encoding="ISO-8859-1"?>
<swarm>
<torrent_file>/home/p2p/p2p-meta/bbt316.torrent</torrent_file>
- <instance id="1">
+ <instance id="1">
<node>1</node>
<client>tribler</client>
<upload_limit>512</upload_limit>
<action type="start" delay="00:25:00" />
<action type="start" delay="end" />
</actions>
- </instance>
+ </instance>
</swarm>