Add docstrings for classes and most functions. Limit maximum line length to 79 characters. Update alignment.
ch.setLevel(logging.DEBUG)
# Create formatter.
ch.setLevel(logging.DEBUG)
# Create formatter.
-formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s')
+formatter = logging.Formatter('%(filename)s:%(lineno)s - '
+ '%(levelname)s: %(message)s')
# Add formatter to console handler.
ch.setFormatter(formatter)
# Add formatter to console handler.
ch.setFormatter(formatter)
# Add console handler to logger.
logger.addHandler(ch)
# Add console handler to logger.
logger.addHandler(ch)
class SwarmDescription(object):
class SwarmDescription(object):
+
+ """Swarm Description class based on ConfigParser."""
+
+ """Initialize the data field as an instance of ConfigParser()."""
self.data = ConfigParser.ConfigParser()
def load(self, swarm_config_file):
self.data = ConfigParser.ConfigParser()
def load(self, swarm_config_file):
- """ Load configuration file and use ConfigParser.
- Expect configuration file without first section for swarm information.
+ """Load configuration file.
+ Expect configuration file without first section for swarm information.
+ Ignore tabs in configuration file.
NOSECTION = 'swarm'
try:
text = open(swarm_config_file).read()
NOSECTION = 'swarm'
try:
text = open(swarm_config_file).read()
self.data.readfp(f)
def store(self, ini_file):
self.data.readfp(f)
def store(self, ini_file):
- """ Write configuration information to file. """
+ """Write configuration information to file."""
f = open(ini_file, 'w')
self.data.write(f)
f.close()
def get_swarm(self):
f = open(ini_file, 'w')
self.data.write(f)
f.close()
def get_swarm(self):
- """ Return Swarm object containing information from config file. """
+ """Return Swarm object containing information from config file."""
swarm_data = dict(self.data.items('swarm'))
swarm = storage.Swarm(swarm_data['torrent_filename'],
swarm_data['data_size'],
swarm_data = dict(self.data.items('swarm'))
swarm = storage.Swarm(swarm_data['torrent_filename'],
swarm_data['data_size'],
return swarm
def get_session_entries(self):
return swarm
def get_session_entries(self):
- """ Return list of SessionEntry instances. """
+ """Return list of SessionEntry instances."""
session_names = self.data.sections()
session_names.remove('swarm')
session_list = []
session_names = self.data.sections()
session_names.remove('swarm')
session_list = []
return session_list
def update_session_entry_id(self, session_entry, cs_id):
return session_list
def update_session_entry_id(self, session_entry, cs_id):
- """ Add or modify client session id. """
+ """Add or modify client session id in self and session_entry."""
session_name = session_entry.data.sections()[0]
self.data.set(session_name, 'client_session_id', str(cs_id))
session_entry.data.set(session_name, 'client_session_id', str(cs_id))
def get_file_archives(self):
session_name = session_entry.data.sections()[0]
self.data.set(session_name, 'client_session_id', str(cs_id))
session_entry.data.set(session_name, 'client_session_id', str(cs_id))
def get_file_archives(self):
- """ Return a list containing all archives from swarm. """
+ """Return a list containing all archives from swarm."""
archives = []
for section in self.data.sections():
try:
archives = []
for section in self.data.sections():
try:
class SessionEntry(object):
class SessionEntry(object):
+
+ """Session Description class based on ConfigParser."""
+
def __init__(self, session):
def __init__(self, session):
+ """Initialize the data field as an instance of ConfigParser()."""
self.data = ConfigParser.ConfigParser()
self.data.add_section(session)
def get_session(self):
self.data = ConfigParser.ConfigParser()
self.data.add_section(session)
def get_session(self):
- """ Return name of the session. """
+ """Return name of the session."""
session = self.data.sections()[0]
cs_data = dict(self.data.items(session))
cs = storage.ClientSession(
session = self.data.sections()[0]
cs_data = dict(self.data.items(session))
cs = storage.ClientSession(
return cs
def get_session_id(self):
return cs
def get_session_id(self):
- """ Return client session id corresponding to the entry.
- If the session entry hasn't been asigned a session id raise error.
+ """Return client session id corresponding to the entry.
+ If the session entry hasn't been asigned a session id raise error.
section = self.data.sections()[0]
try:
cs_id = self.data.get(section, 'client_session_id')
section = self.data.sections()[0]
try:
cs_id = self.data.get(section, 'client_session_id')
logger.debug("No client session id for entry: %s" %section)
return None
logger.debug("No client session id for entry: %s" %section)
return None
class AccessConfig(object):
class AccessConfig(object):
+
+ """Access Configuration class based on ConfigParser."""
+
+ """Initialize the data field as an instance of ConfigParser()."""
self.data = ConfigParser.ConfigParser()
def load(self, access_config_file):
self.data = ConfigParser.ConfigParser()
def load(self, access_config_file):
- """ Load configuration file and use ConfigParser. """
+ """Load configuration file."""
try:
text = open(access_config_file).read()
except IOError:
try:
text = open(access_config_file).read()
except IOError:
self.data.readfp(f)
def store(self, ini_file):
self.data.readfp(f)
def store(self, ini_file):
- """ Write configuration information to file. """
+ """Write configuration information to file."""
f = open(ini_file, 'w')
self.data.write(f)
f.close()
def get_swarm_writer(self):
f = open(ini_file, 'w')
self.data.write(f)
f.close()
def get_swarm_writer(self):
- """ Return storage.SwarmWriter instance.
- For each section create coresponding storage.*Access instance
- and add to SwarmWriter.handlers list.
+ """Return storage.SwarmWriter instance.
+ For each section create coresponding storage.*Access instance
+ and add it to SwarmWriter.handlers list.
sw = storage.SwarmWriter()
for section in self.data.sections():
if section == 'mysql':
sw = storage.SwarmWriter()
for section in self.data.sections():
if section == 'mysql':
ch.setLevel(logging.DEBUG)
# Create formatter.
ch.setLevel(logging.DEBUG)
# Create formatter.
-formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s')
+formatter = logging.Formatter('%(filename)s:%(lineno)s - '
+ '%(levelname)s: %(message)s')
# Add formatter to console handler.
ch.setFormatter(formatter)
# Add formatter to console handler.
ch.setFormatter(formatter)
self.length = length
self.listen_port = listen_port
self.length = length
self.listen_port = listen_port
class SwarmDataAccess(object):
class SwarmDataAccess(object):
+
+ """Base class for swarm information storage.
+
+ Define main operations on storage objects: Swarm, ClientSession,
+ StatusMessage, PeerStatusMessage, VerboseMessage.
+ Each object has four corresponding operations: add, remove, get, update.
+ """
+
def update_verbose_message(self):
pass
def update_verbose_message(self):
pass
class FileAccess(SwarmDataAccess):
class FileAccess(SwarmDataAccess):
+
+ """Subclass of SwarmDataAccess providing access for file storage."""
+
def __init__(self, path):
def __init__(self, path):
+ """Add base_path attribute received as argument."""
def find_last_numeric_subfolder(path):
def find_last_numeric_subfolder(path):
- """
- Find last numeric folder in base_path folder.
+ """Find last numeric folder in base_path folder.
+
The last numeric folder is the last swarm_id.
"""
The last numeric folder is the last swarm_id.
"""
dir_list = []
pattern = re.compile("[0-9]+")
dir_list = []
pattern = re.compile("[0-9]+")
dir_list.sort()
return dir_list[len(dir_list)-1]
dir_list.sort()
return dir_list[len(dir_list)-1]
class TreeTextFileAccess(FileAccess):
class TreeTextFileAccess(FileAccess):
+
+ """Child class of FileAccess for directory tree structure access."""
+
def __init__(self, path):
def __init__(self, path):
+ """Use superclass method for initialisation of constructor."""
super(TreeTextFileAccess, self).__init__(path)
def add_swarm(self, swarm):
super(TreeTextFileAccess, self).__init__(path)
def add_swarm(self, swarm):
f.close()
def add_peer_status_message(self, msg):
f.close()
def add_peer_status_message(self, msg):
+ """Add peer status msg line to file. msg type is PeerStatusMessage."""
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
f.close()
def add_status_message(self, msg):
f.close()
def add_status_message(self, msg):
+ """Add status msg line to file. msg type is StatusMessage."""
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
f.close()
def add_verbose_message(self, msg):
f.close()
def add_verbose_message(self, msg):
+ """Add verbose msg line to file. msg type is VerboseMessage."""
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
# TODO: id is number of lines in file.
swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
session_path = os.path.join(swarm_path, str(msg.client_session_id))
f.close()
class DatabaseAccess(SwarmDataAccess):
f.close()
class DatabaseAccess(SwarmDataAccess):
+
+ """Subclass of SwarmDataAccess providing functions for database usage."""
+
def __init__(self, database):
def __init__(self, database):
+ """Add database attribute received as argument.
+
+ The argument may be a name or a list with options for connection.
+ """
+
self.database = database
def connect(self):
self.database = database
def connect(self):
+ """Initialize access attributes."""
self.conn = None
self.cursor = None
def disconnect(self):
self.conn = None
self.cursor = None
def disconnect(self):
+ """Close connection with database."""
self.cursor.close()
self.conn.close()
class SQLiteDatabaseAccess(DatabaseAccess):
self.cursor.close()
self.conn.close()
class SQLiteDatabaseAccess(DatabaseAccess):
- def __init___(self, database):
+
+ """Child class of DatabaseAccess for SQLite access."""
+
+ def __init__(self, database):
+ """Use superclass method for initialisation of constructor."""
super(SQLiteDatabaseAccess, self).__init__(database)
def connect(self):
super(SQLiteDatabaseAccess, self).__init__(database)
def connect(self):
+ """Connect to sqlite3 database."""
self.conn = sqlite3.connect(self.database)
self.cursor = self.conn.cursor()
# Use foreign key support if available.
self.conn = sqlite3.connect(self.database)
self.cursor = self.conn.cursor()
# Use foreign key support if available.
self.values = self.values + "'" + str(data_value) + "'" + ", "
def add_swarm(self, swarm):
self.values = self.values + "'" + str(data_value) + "'" + ", "
def add_swarm(self, swarm):
+ """Insert swarm in database. swarm type is Swarm."""
self.reset_query()
self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
self.append_to_insert_query("data_size", swarm.data_size)
self.reset_query()
self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
self.append_to_insert_query("data_size", swarm.data_size)
self.conn.commit()
def add_client_session(self, session):
self.conn.commit()
def add_client_session(self, session):
+ """Insert session in database. session type is Session."""
self.reset_query()
self.append_to_insert_query("swarm_id", session.swarm_id)
# TODO: search database for client ID
self.append_to_insert_query("btclient_id",
bittorrent_clients[session.btclient]['id'])
self.append_to_insert_query("system_os", session.system_os)
self.reset_query()
self.append_to_insert_query("swarm_id", session.swarm_id)
# TODO: search database for client ID
self.append_to_insert_query("btclient_id",
bittorrent_clients[session.btclient]['id'])
self.append_to_insert_query("system_os", session.system_os)
- self.append_to_insert_query("system_os_version", session.system_os_version)
+ self.append_to_insert_query("system_os_version",
+ session.system_os_version)
self.append_to_insert_query("system_ram", session.system_ram)
self.append_to_insert_query("system_cpu", session.system_cpu)
self.append_to_insert_query("public_ip", session.public_ip)
self.append_to_insert_query("system_ram", session.system_ram)
self.append_to_insert_query("system_cpu", session.system_cpu)
self.append_to_insert_query("public_ip", session.public_ip)
self.append_to_insert_query("start_time", session.start_time)
self.append_to_insert_query("dht_enabled", session.dht_enabled)
self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
self.append_to_insert_query("start_time", session.start_time)
self.append_to_insert_query("dht_enabled", session.dht_enabled)
self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
- self.append_to_insert_query("streaming_enabled", session.streaming_enabled)
+ self.append_to_insert_query("streaming_enabled",
+ session.streaming_enabled)
self.append_to_insert_query("features", session.features)
self.append_to_insert_query("description", session.description)
self.append_to_insert_query("features", session.features)
self.append_to_insert_query("description", session.description)
1000 * ts.microsecond)
def add_peer_status_message(self, msg):
1000 * ts.microsecond)
def add_peer_status_message(self, msg):
+ """Insert peer status message in database.
+ msg type is PeerStatusMessage.
+ """
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
def add_status_message(self, msg):
self.conn.commit()
def add_status_message(self, msg):
+ """Insert status msg in database. msg type is StatusMessage."""
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
def add_verbose_message(self, msg):
self.conn.commit()
def add_verbose_message(self, msg):
+ """Insert verbose msg in database. msg type is VerboseMessage."""
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
class MySQLDatabaseAccess(DatabaseAccess):
self.conn.commit()
class MySQLDatabaseAccess(DatabaseAccess):
- def __init___(self, database):
+
+ """Child class of DatabaseAccess for MySQL access."""
+
+ def __init__(self, database):
+ """Initialize the database attribute of the class.
+
+ Use superclass method for initialisation of constructor.
+ """
+
super(MySQLDatabaseAccess, self).__init__(database)
def connect(self):
super(MySQLDatabaseAccess, self).__init__(database)
def connect(self):
- # TODO Add support for reading connection information from config_file
+ """Connect to MySQL database."""
self.conn = MySQLdb.Connection(db=self.database['database'],
user=self.database['user'],
passwd=self.database['password'],
host=self.database['host'],
self.conn = MySQLdb.Connection(db=self.database['database'],
user=self.database['user'],
passwd=self.database['password'],
host=self.database['host'],
- port=self.database['port'])
+ port=int(self.database['port']))
self.cursor = self.conn.cursor()
self.conn.commit()
self.cursor = self.conn.cursor()
self.conn.commit()
self.values = self.values + "'" + str(data_value) + "'" + ", "
def add_swarm(self, swarm):
self.values = self.values + "'" + str(data_value) + "'" + ", "
def add_swarm(self, swarm):
+ """Insert swarm in database. swarm type is Swarm."""
self.reset_query()
self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
self.append_to_insert_query("data_size", swarm.data_size)
self.reset_query()
self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
self.append_to_insert_query("data_size", swarm.data_size)
self.conn.commit()
def add_client_session(self, session):
self.conn.commit()
def add_client_session(self, session):
+ """Insert session in database. session type is Session."""
self.reset_query()
self.append_to_insert_query("swarm_id", session.swarm_id)
# TODO: search database for client ID
self.append_to_insert_query("btclient_id",
bittorrent_clients[session.btclient]['id'])
self.append_to_insert_query("system_os", session.system_os)
self.reset_query()
self.append_to_insert_query("swarm_id", session.swarm_id)
# TODO: search database for client ID
self.append_to_insert_query("btclient_id",
bittorrent_clients[session.btclient]['id'])
self.append_to_insert_query("system_os", session.system_os)
- self.append_to_insert_query("system_os_version", session.system_os_version)
+ self.append_to_insert_query("system_os_version",
+ session.system_os_version)
self.append_to_insert_query("system_ram", session.system_ram)
self.append_to_insert_query("system_cpu", session.system_cpu)
self.append_to_insert_query("public_ip", session.public_ip)
self.append_to_insert_query("system_ram", session.system_ram)
self.append_to_insert_query("system_cpu", session.system_cpu)
self.append_to_insert_query("public_ip", session.public_ip)
self.append_to_insert_query("start_time", session.start_time)
self.append_to_insert_query("dht_enabled", session.dht_enabled)
self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
self.append_to_insert_query("start_time", session.start_time)
self.append_to_insert_query("dht_enabled", session.dht_enabled)
self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
- self.append_to_insert_query("streaming_enabled", session.streaming_enabled)
+ self.append_to_insert_query("streaming_enabled",
+ session.streaming_enabled)
self.append_to_insert_query("features", session.features)
self.append_to_insert_query("description", session.description)
self.append_to_insert_query("features", session.features)
self.append_to_insert_query("description", session.description)
%(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
def add_peer_status_message(self, msg):
%(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)
def add_peer_status_message(self, msg):
+ """Insert peer status message in database.
+ msg type is PeerStatusMessage.
+ """
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
def add_status_message(self, msg):
self.conn.commit()
def add_status_message(self, msg):
+ """Insert status msg in database. msg type is StatusMessage."""
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
def add_verbose_message(self, msg):
self.conn.commit()
def add_verbose_message(self, msg):
+ """Insert verbose msg in database. msg type is VerboseMessage."""
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.reset_query()
self.append_to_insert_query("client_session_id", msg.client_session_id)
self.conn.commit()
class SwarmWriter(object):
self.conn.commit()
class SwarmWriter(object):
- """
- Wrapper class for swarm storage write actions. Multiple *Access
- objects may be added to the clas resulting in multiple storage types.
+ """Wrapper class for swarm storage write actions.
+
+ Multiple *Access objects may be added to the clas resulting in multiple
+ storage types.
For example, adding a swarm could result in
* adding a table entry in a MySQL database (MySQLDatabaseAccess)
* adding a table entry in an SQLite database (SQLiteDatabaseAccess)
For example, adding a swarm could result in
* adding a table entry in a MySQL database (MySQLDatabaseAccess)
* adding a table entry in an SQLite database (SQLiteDatabaseAccess)
import storage
class SwarmDescriptionTest(unittest.TestCase):
import storage
class SwarmDescriptionTest(unittest.TestCase):
- """
- Test suite for SwarmDescription class in config.py.
- """
+
+ """Test suite for SwarmDescription class in config.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
config_file = "config.sample.ini"
# Class specific variables. Initialized in setUp, used throughout tests.
config_file = "config.sample.ini"
sd = None
def setUp(self):
sd = None
def setUp(self):
- self.test_data = {'swarm': {'torrent_filename': 'fedora-11-cds.torrent',
+ self.test_data = {'swarm': {'torrent_filename':
+ 'fedora-11-cds.torrent',
- 'description': 'BitTorrent Distribution Experiment, Fedora 11, i386 netinst'},
- 'p2p-next-01': {'bittorrent_client': 'libtorrent-rasterbar',
+ 'description': ('BitTorrent Distribution '
+ 'Experiment, Fedora 11, i386 netinst')},
+ 'p2p-next-01': {'bittorrent_client':
+ 'libtorrent-rasterbar',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '1999',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '1999',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
- 'start_time': 'Thu Aug 11 15:36:35 EEST 2010',
+ 'start_time':
+ 'Thu Aug 11 15:36:35 EEST 2010',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
- 'log_file': 'p2p-next-01.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz'},
- 'p2p-next-03': {'bittorrent_client': 'libtorrent-rasterbar',
+ 'log_file':
+ 'p2p-next-01.grid.pub.ro/btex-fe'
+ 'dora-11/dbex-fedora-cds.tar.gz'},
+ 'p2p-next-03': {'bittorrent_client':
+ 'libtorrent-rasterbar',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '2007',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '2007',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
- 'start_time': 'Thu Aug 11 15:36:35 EEST 2010',
+ 'start_time':
+ 'Thu Aug 11 15:36:35 EEST 2010',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
- 'log_file': 'p2p-next-03.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz'},
- 'p2p-next-04': {'bittorrent_client': 'libtorrent-rasterbar',
+ 'log_file':
+ 'p2p-next-03.grid.pub.ro/btex-fe'
+ 'dora-11/dbex-fedora-cds.tar.gz'},
+ 'p2p-next-04': {'bittorrent_client':
+ 'libtorrent-rasterbar',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '2007',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '2007',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
- 'start_time': 'Thu Aug 11 15:36:35 EEST 2010',
+ 'start_time':
+ 'Thu Aug 11 15:36:35 EEST 2010',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
- 'log_file': 'p2p-next-04.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz'},
- 'p2p-next-09': {'bittorrent_client': 'libtorrent-rasterbar',
+ 'log_file':
+ 'p2p-next-04.grid.pub.ro/btex-fe'
+ 'dora-11/dbex-fedora-cds.tar.gz'},
+ 'p2p-next-09': {'bittorrent_client':
+ 'libtorrent-rasterbar',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '1999',
'system_os': 'Linux',
'system_os_version': '2.6.26',
'system_ram': '1999',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
'public_port': '6881',
'ds_limit': '0',
'us_limit': '0',
- 'start_time': 'Thu Aug 11 15:36:35 EEST 2010',
+ 'start_time':
+ 'Thu Aug 11 15:36:35 EEST 2010',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
'dht_enabled': 'true',
'pxe_enabled': 'false',
'streaming_enabled': 'false',
'description': 'NCIT Cluster System',
- 'log_file': 'p2p-next-09.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz'}}
+ 'log_file':
+ 'p2p-next-09.grid.pub.ro/btex-fe'
+ 'dora-11/dbex-fedora-cds.tar.gz'}}
config_file_path = os.path.join(os.path.dirname(__file__),
self.config_file)
config_file_path = os.path.join(os.path.dirname(__file__),
self.config_file)
def test_get_swarm(self):
swarm = self.sd.get_swarm()
expected_swarm = storage.Swarm(
def test_get_swarm(self):
swarm = self.sd.get_swarm()
expected_swarm = storage.Swarm(
- self.test_data['swarm']['torrent_filename'],
- self.test_data['swarm']['data_size'],
- self.test_data['swarm']['description'])
+ self.test_data['swarm']['torrent_filename'],
+ self.test_data['swarm']['data_size'],
+ self.test_data['swarm']['description'])
self.assertEqual(swarm.torrent_filename,
expected_swarm.torrent_filename)
self.assertEqual(swarm.torrent_filename,
expected_swarm.torrent_filename)
def test_get_file_archives(self):
archives = self.sd.get_file_archives()
expected_archives = [
def test_get_file_archives(self):
archives = self.sd.get_file_archives()
expected_archives = [
- 'p2p-next-01.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
- 'p2p-next-03.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
- 'p2p-next-04.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
- 'p2p-next-09.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz']
+ 'p2p-next-01.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
+ 'p2p-next-03.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
+ 'p2p-next-04.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz',
+ 'p2p-next-09.grid.pub.ro/btex-fedora-11/dbex-fedora-cds.tar.gz']
self.assertEqual(archives.sort(),expected_archives.sort())
self.assertEqual(archives.sort(),expected_archives.sort())
class SessionEntryTest(unittest.TestCase):
class SessionEntryTest(unittest.TestCase):
- """
- Test suite for SessionEntry class in config.py.
- """
+
+ """Test suite for SessionEntry class in config.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
se = None
# Class specific variables. Initialized in setUp, used throughout tests.
se = None
cs_id = self.se.get_session_id()
self.assertEqual(str(cs_id), str(test_cs_id))
cs_id = self.se.get_session_id()
self.assertEqual(str(cs_id), str(test_cs_id))
class AccessConfigTest(unittest.TestCase):
class AccessConfigTest(unittest.TestCase):
- """
- Test suite for AccessConfig class in config.py.
- """
+
+ """Test suite for AccessConfig class in config.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
access_file = "access.sample.ini"
# Class specific variables. Initialized in setUp, used throughout tests.
access_file = "access.sample.ini"
self.assertEqual(expected_handlers, handlers)
self.assertEqual(expected_handlers, handlers)
if __name__ == "__main__":
unittest.main()
if __name__ == "__main__":
unittest.main()
f.close()
class StorageTest(unittest.TestCase):
f.close()
class StorageTest(unittest.TestCase):
- """
- Test suite for stand alone functions in storage.py.
- """
+
+ """Test suite for stand alone functions in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, None)
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, None)
- def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(self):
+ def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(
+ self):
"""
Test return of correct id when sequential numeric named
subfolders only are present (1, 2, 3, ...).
"""
Test return of correct id when sequential numeric named
subfolders only are present (1, 2, 3, ...).
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, 5)
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, 5)
- def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(self):
+ def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(
+ self):
"""
Test return of correct id when nonsequential numeric named
subfolders only are present (32, 5, 423, ...).
"""
Test return of correct id when nonsequential numeric named
subfolders only are present (32, 5, 423, ...).
class TreeTextFileAccessTest(unittest.TestCase):
class TreeTextFileAccessTest(unittest.TestCase):
- """
- Test suite for TreeTextFileAccess class in storage.py.
- """
+
+ """Test suite for TreeTextFileAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
# Create instance of class and add swarm.
a = storage.TreeTextFileAccess(self.path)
s = storage.Swarm(torrent_filename="fedora.torrent",
# Create instance of class and add swarm.
a = storage.TreeTextFileAccess(self.path)
s = storage.Swarm(torrent_filename="fedora.torrent",
a.add_swarm(s)
expected_swarm_subfolder = 1
expected_swarm_path = os.path.join(self.path,
a.add_swarm(s)
expected_swarm_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
self.assertEqual(os.path.isdir(expected_swarm_path), True)
expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
self.assertEqual(os.path.isdir(expected_swarm_path), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
a.add_client_session(cs)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_session_config = os.path.join(expected_session_path,
expected_session_config = os.path.join(expected_session_path,
self.assertEqual(os.path.isdir(expected_session_path), True)
self.assertEqual(os.path.isfile(expected_session_config), True)
self.assertEqual(os.path.isdir(expected_session_path), True)
self.assertEqual(os.path.isfile(expected_session_config), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add peer status message.
msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
a.add_client_session(cs)
# Add peer status message.
msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a.add_peer_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
a.add_peer_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
expected_message_file = os.path.join(expected_session_path,
self.assertEqual(os.path.isfile(expected_message_file), True)
self.assertEqual(os.path.isfile(expected_message_file), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add status message.
msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
a.add_client_session(cs)
# Add status message.
msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a.add_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
a.add_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
expected_message_file = os.path.join(expected_session_path,
self.assertEqual(os.path.isfile(expected_message_file), True)
self.assertEqual(os.path.isfile(expected_message_file), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add verbose message.
msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
a.add_client_session(cs)
# Add verbose message.
msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345", message_type="CHOKE")
a.add_verbose_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
a.add_verbose_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
expected_message_file = os.path.join(expected_session_path,
self.assertEqual(os.path.isfile(expected_message_file), True)
def test_add_verbose_message_number_of_lines(self):
self.assertEqual(True, True)
self.assertEqual(os.path.isfile(expected_message_file), True)
def test_add_verbose_message_number_of_lines(self):
self.assertEqual(True, True)
class SQLiteDatabaseAccessTest(unittest.TestCase):
class SQLiteDatabaseAccessTest(unittest.TestCase):
- """
- Test suite for SQLiteDatabaseAccess class in storage.py.
- """
+
+ """Test suite for SQLiteDatabaseAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
database = "test.db"
# Class specific variables. Initialized in setUp, used throughout tests.
database = "test.db"
expected_verbmsg_count = 0
def get_swarm_count(self):
expected_verbmsg_count = 0
def get_swarm_count(self):
- """ Retrieve number of entries in `swarms` table. """
+ """Retrieve number of entries in `swarms` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_session_count(self):
return count
def get_session_count(self):
- """ Retrieve number of entries in `client_sessions` table. """
+ """Retrieve number of entries in `client_sessions` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_statmsg_count(self):
return count
def get_statmsg_count(self):
- """ Retrieve number of entries in `status_messages` table. """
+ """Retrieve number of entries in `status_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_pstatmsg_count(self):
return count
def get_pstatmsg_count(self):
- """ Retrieve number of entries in `peer_status_messages` table. """
+ """Retrieve number of entries in `peer_status_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_verbmsg_count(self):
return count
def get_verbmsg_count(self):
- """ Retrieve number of entries in `verbose_messages` table. """
+ """Retrieve number of entries in `verbose_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def setUp(self):
return count
def setUp(self):
- """ Create database file and instantiate objects. """
+ """Create database file and instantiate objects."""
# Create database file. Create tables and indices.
# TODO: Check exceptions.
sql_create_script_path = os.path.join(os.path.dirname(__file__),
# Create database file. Create tables and indices.
# TODO: Check exceptions.
sql_create_script_path = os.path.join(os.path.dirname(__file__),
- self.sql_create_script)
+ self.sql_create_script)
f = open(sql_create_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
f = open(sql_create_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
self.expected_verbmsg_count = 2
def tearDown(self):
self.expected_verbmsg_count = 2
def tearDown(self):
- """ Close connection and remove database file. """
+ """Close connection and remove database file."""
os.remove(self.database)
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
os.remove(self.database)
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
- def test_add_verbose_message_new_entry_in_databas(self):
+ def test_add_verbose_message_new_entry_in_database(self):
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345",
+ message_type="CHOKE")
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
class MySQLDatabaseAccessTest(unittest.TestCase):
class MySQLDatabaseAccessTest(unittest.TestCase):
- """
- Test suite for MySQLDatabaseAccess class in storage.py.
- """
+
+ """Test suite for MySQLDatabaseAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
database = "p2p_test"
# Class specific variables. Initialized in setUp, used throughout tests.
database = "p2p_test"
expected_verbmsg_count = 0
def get_swarm_count(self):
expected_verbmsg_count = 0
def get_swarm_count(self):
- """ Retrieve number of entries in `swarms` table. """
-
+ """Retrieve number of entries in `swarms` table."""
self.cursor.execute('SELECT COUNT(id) FROM swarms')
count = self.cursor.fetchone()[0]
return count
def get_session_count(self):
self.cursor.execute('SELECT COUNT(id) FROM swarms')
count = self.cursor.fetchone()[0]
return count
def get_session_count(self):
- """ Retrieve number of entries in `client_sessions` table. """
+ """Retrieve number of entries in `client_sessions` table."""
self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
count = self.cursor.fetchone()[0]
return count
def get_statmsg_count(self):
self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
count = self.cursor.fetchone()[0]
return count
def get_statmsg_count(self):
- """ Retrieve number of entries in `status_messages` table. """
+ """Retrieve number of entries in `status_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM status_messages')
count = self.cursor.fetchone()[0]
return count
def get_pstatmsg_count(self):
self.cursor.execute('SELECT COUNT(id) FROM status_messages')
count = self.cursor.fetchone()[0]
return count
def get_pstatmsg_count(self):
- """ Retrieve number of entries in `peer_status_messages` table. """
+ """Retrieve number of entries in `peer_status_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
count = self.cursor.fetchone()[0]
return count
def get_verbmsg_count(self):
self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
count = self.cursor.fetchone()[0]
return count
def get_verbmsg_count(self):
- """ Retrieve number of entries in `verbose_messages` table. """
+ """Retrieve number of entries in `verbose_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
count = self.cursor.fetchone()[0]
return count
def setUp(self):
self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
count = self.cursor.fetchone()[0]
return count
def setUp(self):
- """ Create database and instantiate objects. """
+ """Create database and instantiate objects."""
# Create database. Create tables and indices.
# TODO: Check exceptions.
# Create database. Create tables and indices.
# TODO: Check exceptions.
"--password=%s" %self.password], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
f.close()
"--password=%s" %self.password], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
f.close()
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
- def test_add_verbose_message_new_entry_in_databas(self):
+ def test_add_verbose_message_new_entry_in_database(self):
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345",
+ message_type="CHOKE")
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
if __name__ == "__main__":
unittest.main()
if __name__ == "__main__":
unittest.main()