--- /dev/null
+import os,sys
+import signal
+import logging
+from Util import *
+
+from BitTorrentClientRun import *
+from TransmissionRun import *
+from TriblerRun import *
+from HrktorrentRun import *
+from TriblerStatusParser import *
+
+class BTClientsControl:
+
+ """
+ Controls BitTorrent Clients and manages their output.
+ 2010, Adriana Draghici, adriana.draghici@cti.pub.ro
+ """
+ processes_fd = {} # keeps lists of file descriptors for each process
+ processes_info = {} # keeps lists of BT clients name, torrent name and status log name for each process
+
+ logger = None
+
+ def __init__(self):
+ logging.basicConfig(level = logging.DEBUG)
+ self.logger = logging.getLogger('autorun.BTClientsControl')
+ self.logger.setLevel(logging.DEBUG)
+
+ def start_bt_client(self, bt_client_data):
+ """ Starts a process for a BitTorrent client and returns its pid.
+ @return: -1, if any error is encountered
+ """
+
+ btcr = None
+
+ if bt_client_data[CLIENT] == TRANSMISSION:
+ btcr = TransmissionRun(bt_client_data[BASE_DIR])
+ elif bt_client_data[CLIENT] == TRIBLER:
+ btcr = TriblerRun(bt_client_data[BASE_DIR])
+ elif bt_client_data[CLIENT] == HRKTORRENT:
+ btcr = HrktorrentRun(bt_client_data[BASE_DIR])
+ else:
+ self.error_msg = "Unknown BitTorrent client: " + bt_client_data[CLIENT] + "."
+ return -1
+ try:
+ btcr.config_run(bt_client_data[DL_DIR], bt_client_data[OUT_DIR],
+ bt_client_data[OUT_FILE], bt_client_data[LOG_DIR],
+ bt_client_data[LOG_FILE], bt_client_data[PORT],
+ bt_client_data[DL_LIMIT], bt_client_data[UP_LIMIT],
+ bt_client_data[TORRENT], )
+
+ btcr.start()
+ [pid, log_fd, output_fd] = btcr.run_client(btcr.simple_run_command, bt_client_data[CLIENT])
+ self.processes_fd[pid] = [log_fd, output_fd]
+ self.processes_info[pid] = [bt_client_data[CLIENT],
+ bt_client_data[TORRENT],
+ bt_client_data[OUT_DIR] + "/" + bt_client_data[OUT_FILE]]
+
+ self.logger.debug(" started client with pid = " + str(pid))
+ return pid
+
+ except Exception, ex:
+ self.logger.error(" Exception occured: " + str(ex))
+ self.error_msg = "Error occurred when starting the BT Client: " + str(ex) +"."
+ return -1
+
+ def stop_bt_client(self, pid):
+ """ Stops a BT client by killing it."""
+
+ self.error_msg = ""
+ try:
+ int_pid = int(pid)
+ if self.check_pid(pid) == -1:
+ return -1
+ os.kill(int_pid, signal.SIGKILL) # kill generates zombies
+ os.wait()
+ self.processes_fd[int_pid][0].close()
+ self.processes_fd[int_pid][1].close()
+
+ del self.processes_fd[int_pid]
+ del self.processes_info[int_pid]
+
+ self.logger.debug(" killed process with pid = " + str(pid))
+ except Exception ,ex:
+ self.logger.error(" Exception occured: " + str(ex))
+ self.error_msg = "Could not kill process with pid " + str(pid)
+ return -1
+
+ def resume_bt_client(self, pid):
+ self.error_msg = ""
+ try:
+ if self.check_pid(int(pid)) == -1:
+ return -1
+ os.kill(int(pid),signal.SIGCONT)
+ self.logger.debug("resumed process with pid = " + str(pid))
+ return 0
+ except Exception ,ex:
+ self.logger.error(" Exception occured: " + str(ex))
+ self.error_msg = "Could not resume process with pid " + str(pid)
+ return -1
+
+ def suspend_bt_client(self, pid):
+ self.error_msg = ""
+ try:
+ if self.check_pid(pid) == -1:
+ return -1
+ os.kill(int(pid),signal.SIGSTOP)
+ self.logger.debug(" Suspended process with pid = " + str(pid))
+ return 0
+ except Exception ,ex:
+ self.logger.error(" Exception occured: " + str(ex))
+ self.error_msg = "Could not suspend process with pid " + str(pid)
+ return -1
+
+ def check_pid(self, pid):
+ try:
+ int_pid = int(pid)
+ if int_pid == -1 or int_pid not in self.processes_fd.keys():
+ raise Exception
+ except Exception,ex: # conversion exception might occur if pid is not a number
+ self.error_msg = "Invalid pid " + str(pid)
+ return -1
+ return 0
+
+
+ def get_error_msg(self):
+ return self.error_msg
import os.path
import time
import pickle
-import signal
import struct
import logging
from daemon import Daemon
from Util import *
-from Util import SERVER_HOST, SERVER_PORT
-from BitTorrentClientRun import *
-from TransmissionRun import *
-from TriblerRun import *
-from HrktorrentRun import *
-from TriblerStatusParser import *
+from Util import SERVER_PORT
+from BTClientsControl import BTClientsControl
-class MyDaemon(Daemon):
+class ServerDaemon(Daemon):
"""
Server class
2010, Adriana Draghici, adriana.draghici@cti.pub.ro
ip = ""
port = 0
- processes_fd = {} # keeps lists of file descriptors for each process
- processes_info = {} # keeps lists of BT clients name, torrent name and status log name for each process
logger = None
- START_ERR1 = -1 # returned when starting a client, if that client is not supported
- START_ERR2 = -2 # an exception occured when creating a client process
def __init__(self, pidfile, ip='', stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
Daemon.__init__(self, pidfile, stdin, stdout, stderr)
self.logger.debug(" sent message: " + str(data))
- def save_download_info(self, data):
- """ Stores data about log files in a text file. """
- try:
- f = open(SESSIONS_FILE, 'a')
- #f.write ("# session " + strftime("%d-%m-%Y %H:%M:%S", localtime())+ "\n")
- num_lines = len(data.keys())
- if PORT in data:
- num_lines = num_lines -1;
- if BASE_DIR in data:
- num_lines = num_lines -1;
-
- f.write(str(num_lines)+"\n")
- for k in data.keys():
- if k == PORT or k == BASE_DIR:
- continue
- f.write(str(k)+":"+str(data[k])+"\n")
-
- f.close()
- except Exception, ex:
- self.logger(" Exception while saving transfer info: " + str(ex))
- raise
-
-
- def create_archive_file(file_path):
- try:
- print "Archive file: ", file_path
-
- if os.path.isfile(file_path):
- # archive file
- archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
- command = "zip " + archive_name + " " + file_path
- print command
- os.system(command)
- # remove file
- command = "rm " + file_path
- os.system(command)
- return 0
- return -1
- except Exception,ex:
- raise
-
-
- def create_archive_dir_logs(dir_path):
- try:
- if not os.path.isfile(dir_path):
- # archive all log files
- archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
- command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
- print command
- os.system(command)
-
- # remove log files
- command = "rm " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
- os.system(command)
- return 0
- return -1
- except Exception,ex:
- raise
-
- def add_to_output_msg(self, file_list, info_dict):
- """ Constructs a list with log files info - paths, client, metafile
- @param file_list - list to which the created logs info list is appended. """
-
- if CLIENT not in info_dict and TORRENT not in info_dict:
- return -1
- info_list = [info_dict[CLIENT],info_dict[TORRENT]]
- if LOG_DIR in info_dict:
- info_list.append((info_dict[LOG_DIR], LOG_DIR))
- if OUT_DIR in info_dict:
- info_list.append((info_dict[OUT_DIR], OUT_DIR))
- if OUT_FILE in info_dict:
- info_list.append((info_dict[OUT_FILE], OUT_FILE))
- if LOG_FILE in info_dict:
- info_list.append((info_dict[LOG_FILE], LOG_FILE))
-
- self.logger.debug(" read transfer log info: " + str(info_list))
-
- file_list.append (info_list)
- return 0 # success
-
- def archive_files(self, info_dict, archives_list):
- """ Archives status and verbose message log files. """
- err_msg = ""
- try:
- if LOG_DIR in info_dict:
- if info_dict[CLIENT] == HRKTORRENT:
- if create_archive_dir_logs(info_dict[DL_DIR]) < 0:
- err_msg = "Could not archive files from dir " + file_path
- else: archives_list.append(info_dict[DL_DIR] + "/*" )
- else:
- if LOG_FILE in info_dict:
- file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE]
- if create_archive_file(file_path) < 0:
- err_msg = "File does not exist: " + file_path
- else: archives_list.append(file_path)
-
- if OUT_DIR in info_dict and OUT_FILE in info_dict:
- file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE]
- if create_archive_file(file_path) < 0:
- err_msg = "File does not exist: " + file_path
- else: archives_list.append(file_path)
-
- if err_msg != "":
- self.logger.error(err_msg)
-
- except Exception, ex:
- self.logger.error("Exception while archiving files: " + str(ex))
- raise
-
- def read_download_info(self, file_list, create_archive = False, archives_list = []):
- """ Reads all the contents of the file that stores info about
- self.logger files and folders.
- """
- if not os.path.exists(SESSIONS_FILE):
- self.logger.error(" No sessions_file")
- return -1
- else:
- f = open(SESSIONS_FILE,"r")
- line = f.readline()
- size = 0
- info_dict = {}
- dicts = [] #list with all info_dict dictionaries
- try:
- while line != '':
- if size == 0 :
- if info_dict != {} and info_dict not in dicts:
- if self.add_to_output_msg(file_list, info_dict) < 0:
- raise Exception
- dicts.append(info_dict)
- if create_archive:
- self.archive_files(info_dict,archives_list)
-
- size = int(line)
- info_dict = {}
- else:
- parts = line.strip().split(':')
- if len(parts) != 2:
- raise Exception
- info_dict[parts[0]] = parts[1]
- size = size - 1
- line = f.readline()
-
- if size == 0 and info_dict != {} and info_dict not in dicts:
- if self.add_to_output_msg(file_list, info_dict) < 0:
- raise Exception
- if create_archive:
- self.archive_files(info_dict, archives_list)
-
- except Exception, e:
- self.logger.error(" wrong file type for sessions file\n" + str(e))
- f.close()
- return -1
-
- f.close()
- return 0
-
-
- def get_client_status(self, transfer_id, line_parts = []):
- """
- Runs a parser for the obtaining the last line from the
- torrent transfer's status file (output file).
- @param transfer_id identifier for the transfer
- @param line_parts list with line's components
- @return error message if any
- """
-
- if int(transfer_id) not in self.processes_info:
- return "Invalid transfer id: " + str(transfer_id)
- status_file_path = self.processes_info[transfer_id][2]
- client = self.processes_info[transfer_id][0]
- if client == TRIBLER:
- parser = TriblerStatusParser(status_file_path)
- elif client == HRKTORRENT:
- parser = TriblerStatusParser(status_file_path)
- else:
- return "Functionality not supported for client "+ client
- line_parts.append(parser.parse_last_status_line(""))
- if line_parts == "":
- return "Error occured while reading status file " + status_file_path
- return ""
-
- def start_bt_client(self, bt_client_data):
- """ Starts a process for a BitTorrent client and returns its pid.
- @return: -1, if any error is encountered
- """
-
- btcr = None
-
- if bt_client_data[CLIENT] == TRANSMISSION:
- btcr = TransmissionRun(bt_client_data[BASE_DIR])
- elif bt_client_data[CLIENT] == TRIBLER:
- btcr = TriblerRun(bt_client_data[BASE_DIR])
- elif bt_client_data[CLIENT] == HRKTORRENT:
- btcr = HrktorrentRun(bt_client_data[BASE_DIR])
- else:
- return self.START_ERR1
- try:
- btcr.config_run(bt_client_data[DL_DIR], bt_client_data[OUT_DIR],
- bt_client_data[OUT_FILE], bt_client_data[LOG_DIR],
- bt_client_data[LOG_FILE], bt_client_data[PORT],
- bt_client_data[DL_LIMIT], bt_client_data[UP_LIMIT],
- bt_client_data[TORRENT], )
-
- btcr.start()
- [pid, log_fd, output_fd] = btcr.run_client(btcr.simple_run_command,
- bt_client_data[CLIENT])
- self.processes_fd[pid] = [log_fd, output_fd]
- self.processes_info[pid] = [bt_client_data[CLIENT],
- bt_client_data[TORRENT],
- bt_client_data[OUT_DIR] + "/" + bt_client_data[OUT_FILE]]
-
- self.logger.debug(" started client with pid = " + str(pid))
- return pid
-
- except Exception, ex:
- self.logger.error(" Exception occured: " + str(ex))
- return self.START_ERR2
-
- def stop_bt_client(self, pid):
- """ Stops a BT client by killing it."""
- try:
- int_pid = int(pid)
- if int_pid == -1 or int_pid not in self.processes_fd.keys():
- return "Invalid pid " + str(pid)
- os.kill(int_pid, signal.SIGKILL) # kill generates zombies
- os.wait()
- self.processes_fd[int_pid][0].close()
- self.processes_fd[int_pid][1].close()
-
- del self.processes_fd[int_pid]
- del self.processes_info[int_pid]
-
- self.logger.debug(" killed process with pid = " + str(pid))
- return ""
-
- except Exception ,ex:
- self.logger.error(" Exception occured: " + str(ex))
- return "Could not kill process with pid " + str(pid)
def set_linger(self,sock, l_onoff, l_linger):
"""Sets the SO_LINGER value on a socket."""
def do_Server(self, ip, port):
""" Accepts socket connections and receives messages from commander."""
+ btcc = BTClientsControl()
+ err_msg = ""
+
try:
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.logger.debug( " host ip = %s, port = %s"%(ip,SERVER_PORT))
self.serversocket.listen(10) #max 10 requests
self.set_linger(self.serversocket,1, 0)
except Exception, e:
- self.logger.error("Exception occured while connecting: " + str(e))
+ self.logger.error(" Exception occured while connecting: " + str(e))
return
while(1):
try:
self.logger.debug(" accepted connection from " +str(address))
msg = self.recv_pickled_data(clientsock)
- err_msg = ''
response = ''
if msg[0] == START_MSG:
bt_client_data = msg[1]
- response = self.start_bt_client(bt_client_data) #returns client_pid
- if response == self.START_ERR2:
- err_msg = "Error occurred while starting client"
- elif response == self.START_ERR1:
- err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported."
- else:
- try:
- self.save_download_info(bt_client_data)
- except Exception:
- err_msg = "Client started but transfer info could not be saved."
-
+ response = btcc.start_bt_client(bt_client_data) #returns client_pid
+ if response < -1:
+ err_msg = btcc.get_error_msg()
+
+ #try:
+ # self.save_download_info(bt_client_data)
+ #except Exception:
+ # err_msg = "Client started but transfer info could not be saved."
elif msg[0] == STOP_MSG:
client_pid = msg[1]
- err_msg = self.stop_bt_client(client_pid)
-
- elif msg[0] == GET_OUTPUT:
- response = []
- if self.read_download_info(response) < 0:
- err_msg = "Error encountered while reading file"
-
- elif msg[0] == GET_CLIENTS:
- response = []
- print self.processes_info
- try:
- for k in self. processes_info.keys():
- response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file
- except Exception:
- err_msg = "Error: could not retrive running clients list"
-
- elif msg[0] == ARCHIVE:
- response = []
- if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING
- try :
- if self.read_download_info([], True, response) < 0: # read file path and create archives
- err_msg = "Error: could not archive log files."
- else: # erase sessions file
- f = open(SESSIONS_FILE, "w")
- f.close()
- except Exception , ex:
- err_msg = "Error: could not archive log files."
- else:
- err_msg = "Cannot archive logs while clients are running"
-
- elif msg[0] == GET_STATUS:
- line_parts = []
- err_msg = self.get_client_status(msg[1], line_parts)
- if len(err_msg) == 0:
- response = {}
-
- print len(line_parts)
- if len(line_parts[0]) >= 7:
- response[TIMESTAMP] = line_parts[0][0]
- response[NUM_PEERS] = line_parts[0][1]
- response[DHT] = line_parts[0][2]
- response[DL_SPEED] = line_parts[0][3]
- response[UP_SPEED] = line_parts[0][4]
- response[DL_SIZE] = line_parts[0][5]
- response[ETA] = line_parts[0][6]
+ response = btcc.stop_bt_client(client_pid)
+ if response < -1:
+ err_msg = btcc.get_error_msg()
+ else: response = ""
+
+ elif msg[0] == RESUME_MSG:
+ client_pid = msg[1]
+ response = btcc.resume_bt_client(client_pid)
+ if response < -1:
+ err_msg = btcc.get_error_msg()
+ else: response = ""
+ elif msg[0] == SUSPEND_MSG:
+ client_pid = msg[1]
+ response = btcc.suspend_bt_client(client_pid)
+ if response < -1:
+ err_msg = btcc.get_error_msg()
+ else: response = ""
+
else:
err_msg = "Error: wrong message type";
clientsock.close()
except Exception, ex:
- self.logger.error("Exception occured: "+ str(ex))
+ self.logger.error(" Exception occured: "+ str(ex))
if __name__ == "__main__":
if len(sys.argv) >= 2:
-
-
if 'start' == sys.argv[1]:
if(len(sys.argv) != 3):
print "usage:\n\t %s start <host_ip> " % sys.argv[0]
sys.exit(2)
- daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
+ daemon = ServerDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
daemon.start()
elif 'stop' == sys.argv[1]:
- daemon = MyDaemon(DAEMON_PID_PATH)
+ daemon = ServerDaemon(DAEMON_PID_PATH)
daemon.stop()
elif 'restart' == sys.argv[1]:
- daemon = MyDaemon(DAEMON_PID_PATH)
+ daemon = ServerDaemon(DAEMON_PID_PATH)
daemon.restart()
elif 'no' == sys.argv[1]: #no daemon
- daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
+ daemon = ServerDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
daemon.do_Server(sys.argv[2],"")
else:
print "Unknown command"