From 835c7928a7f8f40d06cc817d9853a4ee16824417 Mon Sep 17 00:00:00 2001 From: Adriana Draghici Date: Thu, 6 May 2010 20:18:01 +0000 Subject: [PATCH] autorun: changes to support hrktorrent --- autorun/PROTOCOL | 7 +- autorun/Util.py | 49 ++++-- autorun/server/BitTorrentClientRun.py | 19 ++- autorun/server/Client.py | 19 +-- autorun/server/HrktorrentRun.py | 4 +- autorun/server/Server.py | 220 +++++++++++++------------- 6 files changed, 173 insertions(+), 145 deletions(-) diff --git a/autorun/PROTOCOL b/autorun/PROTOCOL index 287c984..54d3873 100644 --- a/autorun/PROTOCOL +++ b/autorun/PROTOCOL @@ -56,9 +56,10 @@ Serverul pastreaza un fisier cu info despre toate fisierele downloadate: DL_LIMIT:"256", PORT:"9999", DL_DIR:"/this/dir", - LOG_DIR:"/this/dir", - OUT_FILE: "output_file", ??? nu stim exact scopul lui - LOG_FILE:"log_file", + LOG_DIR:"/this/dir", !!! "" dc nu trebuie specificat + LOG_FILE:"log_file", !!! "" dc nu trebuie specificat (eg. pt hrktorrent) + OUT_DIR:"/this/dir", + OUT_FILE: "output_file", TORRENT: "torrent file" } <----- tuplu (ACK, pid_client) / (ERROR, error_msg) diff --git a/autorun/Util.py b/autorun/Util.py index 279afce..a27ced2 100644 --- a/autorun/Util.py +++ b/autorun/Util.py @@ -4,9 +4,8 @@ DAEMON_PID_PATH = "/var/tmp/autorun-server.pid" SERVER_DIR = "/home/p2p/cs-p2p-next/autorun/server/" SERVER_FILE = "Server.py" SERVER_TYPE = "python" -SERVER_PORT = 10004 +SERVER_PORT = 10004 SERVER_HOST = "172.16.20.3" - import os def get_sessions_file_path(): dirpath = os.environ['HOME']+"/"+".autorun/" @@ -60,19 +59,41 @@ ETA = "eta" DHT = "dht" NUM_PEERS = "num_peers" + import os.path import time def create_archive_file(file_path): - print "Archive file: ", file_path - if os.path.isfile(file_path): - # archive file - archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip" - command = "zip " + archive_name + " " + file_path - print command - os.system(command) - # remove file - command = "rm " + file_path - os.system(command) - return 0 - return -1 + try: + print "Archive file: ", file_path + + if os.path.isfile(file_path): + # archive file + archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip" + command = "zip " + archive_name + " " + file_path + print command + os.system(command) + # remove file + command = "rm " + file_path + os.system(command) + return 0 + return -1 + except Exception,ex: + raise + + +def create_archive_dir_logs(dir_path): + try: + if not os.path.isfile(dir_path): + # archive all log files + archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip" + command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log" + print command + os.system(command) + # remove log files + command = "rm " + dir_path + "libtorrent *" + " " + dir_path + "/*log" + os.system(command) + return 0 + return -1 + except Exception,ex: + raise diff --git a/autorun/server/BitTorrentClientRun.py b/autorun/server/BitTorrentClientRun.py index c336e03..7115372 100644 --- a/autorun/server/BitTorrentClientRun.py +++ b/autorun/server/BitTorrentClientRun.py @@ -64,7 +64,6 @@ class BitTorrentClientRun: try: # split command args = shlex.split(command) - # remove redirectation parameters for i in range(0, len(args)): if args[i].find(">") > -1 : @@ -72,18 +71,18 @@ class BitTorrentClientRun: args.pop(i) break; #self.my_logger.debug(" BitTorrentClientRun: command =" + str(args)) - - log_redirect = open(self.log_dir+"/"+self.log_file,"a") - output_redirect = open(self.output_dir+"/"+self.output_file,"a") - + output_redirect = open(self.output_dir+"/"+self.output_file,"w") self.my_logger.debug(" output redirect to file " + self.output_dir+"/"+self.output_file) - self.my_logger.debug(" log redirect to file " + self.log_dir+"/"+self.log_file) - + log_redirect = open("/tmp/err","w") # because hrktorrent does not have logs at stderr output + if self.log_file != "": + log_redirect = open(self.log_dir+"/"+self.log_file,"w") + self.my_logger.debug(" log redirect to file " + self.log_dir+"/"+self.log_file) p=subprocess.Popen(args, shell=False, #does not create sh process - stdout=output_redirect, - stderr=log_redirect) + stdout=output_redirect, + stderr=log_redirect) + pid = p.pid - self.my_logger.debug(" BitTorrentClientRun: pid =" + str(pid)) + self.my_logger.debug(" BitTorrentClientRun: pid = " + str(pid)) return [pid, log_redirect, output_redirect] except Exception, ex: diff --git a/autorun/server/Client.py b/autorun/server/Client.py index e139ba4..a4d6c1a 100644 --- a/autorun/server/Client.py +++ b/autorun/server/Client.py @@ -110,26 +110,26 @@ def test_all_commands(torrent_file): BASE_DIR:"/home/p2p/p2p-clients/hrktorrent", TORRENT: "/home/p2p/p2p-meta/" + torrent_name+".torrent", DL_DIR: "/home/p2p/p2p-dld/hrktorrent", - LOG_DIR: "/home/p2p/p2p-log/hrktorrent", + LOG_DIR: "/home/p2p/p2p-dld/hrktorrent", OUT_DIR: "/home/p2p/p2p-log/hrktorrent", OUT_FILE: torrent_name + ".out", #status messages - LOG_FILE: "hrktorrent-" + torrent_name + ".log", #verbose messages + LOG_FILE: "", #verbose messages } #print s.send_command(GET_OUTPUT, "") - response = s.send_command(START_MSG, start_data_tribler) + """response = s.send_command(START_MSG, start_data_tribler) print response pid1 = response[1] print pid1 - - time.sleep(7) + """ + """time.sleep(7) s = MySocket() s.connect(SERVER_HOST, SERVER_PORT) - response = s.send_command(GET_STATUS, pid1) + response = s.send_command(ARCHIVE,"") print response - + """ """response = s.send_command(START_MSG, start_data_transmission) print response pid2 = response[1] @@ -140,18 +140,19 @@ def test_all_commands(torrent_file): response = s.send_command(GET_CLIENTS,"") print response """ - time.sleep(5) + """time.sleep(3)""" """ s = MySocket() s.connect(SERVER_HOST, SERVER_PORT) response = s.send_command(STOP_MSG, pid2) print response """ + """ s = MySocket() s.connect(SERVER_HOST, SERVER_PORT) response = s.send_command(STOP_MSG, pid1) print response - + """ def test_send_recv(): # test 1 diff --git a/autorun/server/HrktorrentRun.py b/autorun/server/HrktorrentRun.py index 43e79b7..540f494 100644 --- a/autorun/server/HrktorrentRun.py +++ b/autorun/server/HrktorrentRun.py @@ -15,8 +15,8 @@ DEBUG = False class HrktorrentRun(BitTorrentClientRun): def __init__(self, base_path): BitTorrentClientRun.__init__(self, base_path, - "$base_path/hrktorrent --download_dir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file", - "$base_path/hrktorrent --download_dir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file") + "$base_path/hrktorrent --downloaddir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file", + "$base_path/hrktorrent --downloaddir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file") def main(): diff --git a/autorun/server/Server.py b/autorun/server/Server.py index 9da44d2..bd00d64 100644 --- a/autorun/server/Server.py +++ b/autorun/server/Server.py @@ -97,7 +97,7 @@ class MyDaemon(Daemon): f.close() except Exception, ex: self.logger(" Exception while saving transfer info: " + str(ex)) - raise Exception + raise def add_to_output_msg(self, file_list, info_dict): """ Constructs a list with self.logger file info - paths, client, metafile @@ -122,34 +122,32 @@ class MyDaemon(Daemon): def archive_files(self, info_dict, archives_list): """ Archives status and verbose message log files. """ - log_path = "" - out_path = "" err_msg = "" try: if LOG_DIR in info_dict: - log_path = info_dict[LOG_DIR] - if LOG_FILE in info_dict: - file_path = log_path + "/" + info_dict[LOG_FILE] - if create_archive_file(file_path) < -1: - err_msg = "File does not exist: ", file_path - else: archives_list.append(file_path) - - if OUT_DIR in info_dict: - out_path = info_dict[OUT_DIR] - else: - out_path = log_path - - if OUT_FILE in info_dict: - file_path = out_path + "/" + info_dict[OUT_FILE] - if create_archive_file(file_path) < -1: - err_msg = "File does not exist: ", file_path + if info_dict[CLIENT] == HRKTORRENT: + if create_archive_dir_logs(info_dict[DL_DIR]) < 0: + err_msg = "Could not archive files from dir " + file_path + else: archives_list.append(info_dict[DL_DIR] + "/*" ) + else: + if LOG_FILE in info_dict: + file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE] + if create_archive_file(file_path) < 0: + err_msg = "File does not exist: " + file_path + else: archives_list.append(file_path) + + if OUT_DIR in info_dict and OUT_FILE in info_dict: + file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE] + if create_archive_file(file_path) < 0: + err_msg = "File does not exist: " + file_path else: archives_list.append(file_path) + if err_msg != "": - self.logger.error(" " + err_msg) + self.logger.error(err_msg) + except Exception, ex: self.logger.error("Exception while archiving files: " + str(ex)) - raise Exception - + raise def read_download_info(self, file_list, create_archive = False, archives_list = []): """ Reads all the contents of the file that stores info about @@ -166,7 +164,6 @@ class MyDaemon(Daemon): dicts = [] #list with all info_dict dictionaries try: while line != '': - print line if size == 0 : if info_dict != {} and info_dict not in dicts: if self.add_to_output_msg(file_list, info_dict) < 0: @@ -261,20 +258,24 @@ class MyDaemon(Daemon): def stop_bt_client(self, pid): """ Stops a BT client by killing it.""" - - int_pid = int(pid) - if int_pid == -1 or int_pid not in self.processes_fd.keys(): - return "Invalid pid " + str(pid) - os.kill(int_pid, signal.SIGKILL) # kill generates zombies - os.wait() - self.processes_fd[int_pid][0].close() - self.processes_fd[int_pid][1].close() - - del self.processes_fd[int_pid] - del self.processes_info[int_pid] + try: + int_pid = int(pid) + if int_pid == -1 or int_pid not in self.processes_fd.keys(): + return "Invalid pid " + str(pid) + os.kill(int_pid, signal.SIGKILL) # kill generates zombies + os.wait() + self.processes_fd[int_pid][0].close() + self.processes_fd[int_pid][1].close() + + del self.processes_fd[int_pid] + del self.processes_info[int_pid] - self.logger.debug(" killed process with pid = " + str(pid)) - return "" + self.logger.debug(" killed process with pid = " + str(pid)) + return "" + + except Exception ,ex: + self.logger.error(" Exception occured: " + str(ex)) + return "Could not kill process with pid " + str(pid) def set_linger(self,sock, l_onoff, l_linger): """Sets the SO_LINGER value on a socket.""" @@ -296,85 +297,89 @@ class MyDaemon(Daemon): self.logger.error("Exception occured while connecting: " + str(e)) return while(1): - self.logger.debug(" accepting connections") - (clientsock, address) = self.serversocket.accept(); - self.logger.debug(" accepted connection from " +str(address)) - - msg = self.recv_pickled_data(clientsock) + try: + self.logger.debug(" accepting connections") + (clientsock, address) = self.serversocket.accept(); + self.logger.debug(" accepted connection from " +str(address)) + + msg = self.recv_pickled_data(clientsock) + + err_msg = '' + response = '' + + if msg[0] == START_MSG: + bt_client_data = msg[1] + response = self.start_bt_client(bt_client_data) #returns client_pid + if response == self.START_ERR2: + err_msg = "Error occurred while starting client" + elif response == self.START_ERR1: + err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported." + else: + try: + self.save_download_info(bt_client_data) + except Exception: + err_msg = "Client started but transfer info could not be saved." + + elif msg[0] == STOP_MSG: + client_pid = msg[1] + err_msg = self.stop_bt_client(client_pid) - err_msg = '' - response = '' - if msg[0] == START_MSG: - bt_client_data = msg[1] - response = self.start_bt_client(bt_client_data) #returns client_pid - if response == self.START_ERR1: - err_msg = "Error occurred while starting client" - elif response == self.START_ERR2: - err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported." - else: + elif msg[0] == GET_OUTPUT: + response = [] + if self.read_download_info(response) < 0: + err_msg = "Error encountered while reading file" + + elif msg[0] == GET_CLIENTS: + response = [] + print self.processes_info try: - self.save_download_info(bt_client_data) + for k in self. processes_info.keys(): + response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file except Exception: - err_msg = "Client started but transfer info could not be saved." - - elif msg[0] == STOP_MSG: - client_pid = msg[1] - err_msg = self.stop_bt_client(client_pid) + err_msg = "Error: could not retrive running clients list" - elif msg[0] == GET_OUTPUT: - response = [] - if self.read_download_info(response) < 0: - err_msg = "Error encountered while reading file" - - elif msg[0] == GET_CLIENTS: - response = [] - print self.processes_info - try: - for k in self. processes_info.keys(): - response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file - except Exception: - err_msg = "Error: could not retrive running clients list" - - elif msg[0] == ARCHIVE: - response = [] - if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING - try : - if self.read_download_info([], True, response) < 0: # read file path and create archives + elif msg[0] == ARCHIVE: + response = [] + if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING + try : + if self.read_download_info([], True, response) < 0: # read file path and create archives + err_msg = "Error: could not archive log files." + else: # erase sessions file + f = open(SESSIONS_FILE, "w") + f.close() + except Exception , ex: err_msg = "Error: could not archive log files." - else: # erase sessions file - f = open(SESSIONS_FILE, "w") - f.close() - except Exception , ex: - err_msg = "Error: could not archive log files." + else: + err_msg = "Cannot archive logs while clients are running" + + elif msg[0] == GET_STATUS: + line_parts = [] + err_msg = self.get_client_status(msg[1], line_parts) + if len(err_msg) == 0: + response = {} + + print len(line_parts) + if len(line_parts[0]) >= 7: + response[TIMESTAMP] = line_parts[0][0] + response[NUM_PEERS] = line_parts[0][1] + response[DHT] = line_parts[0][2] + response[DL_SPEED] = line_parts[0][3] + response[UP_SPEED] = line_parts[0][4] + response[DL_SIZE] = line_parts[0][5] + response[ETA] = line_parts[0][6] + else: - err_msg = "Cannot archive logs while clients are running" - elif msg[0] == GET_STATUS: - line_parts = [] - err_msg = self.get_client_status(msg[1], line_parts) - if len(err_msg) == 0: - response = {} - - print len(line_parts) - if len(line_parts[0]) >= 7: - response[TIMESTAMP] = line_parts[0][0] - response[NUM_PEERS] = line_parts[0][1] - response[DHT] = line_parts[0][2] - response[DL_SPEED] = line_parts[0][3] - response[UP_SPEED] = line_parts[0][4] - response[DL_SIZE] = line_parts[0][5] - response[ETA] = line_parts[0][6] - - else: - err_msg = "Error: wrong message type"; + err_msg = "Error: wrong message type"; - if err_msg != '': - self.send_pickled_data(clientsock,(ERROR_MSG,err_msg)) - self.logger.debug(" Sending error message: " + err_msg) - else: - self.send_pickled_data(clientsock, (ACK_MSG, response)) - - clientsock.close() + if err_msg != '': + self.send_pickled_data(clientsock,(ERROR_MSG,err_msg)) + self.logger.debug(" Sending error message: " + err_msg) + else: + self.send_pickled_data(clientsock, (ACK_MSG, response)) + clientsock.close() + except Exception, ex: + self.logger.error("Exception occured: "+ str(ex)) if __name__ == "__main__": @@ -407,3 +412,4 @@ if __name__ == "__main__": sys.exit(2) + -- 2.20.1