f.close()
class StorageTest(unittest.TestCase):
- """
- Test suite for stand alone functions in storage.py.
- """
+
+ """Test suite for stand alone functions in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, None)
- def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(self):
+ def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(
+ self):
"""
Test return of correct id when sequential numeric named
subfolders only are present (1, 2, 3, ...).
id = storage.find_last_numeric_subfolder(self.path)
self.assertEqual(id, 5)
- def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(self):
+ def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(
+ self):
"""
Test return of correct id when nonsequential numeric named
subfolders only are present (32, 5, 423, ...).
class TreeTextFileAccessTest(unittest.TestCase):
- """
- Test suite for TreeTextFileAccess class in storage.py.
- """
+
+ """Test suite for TreeTextFileAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
path = "/tmp/ttfa-test"
# Create instance of class and add swarm.
a = storage.TreeTextFileAccess(self.path)
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a.add_swarm(s)
expected_swarm_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
self.assertEqual(os.path.isdir(expected_swarm_path), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_session_config = os.path.join(expected_session_path,
- "client_session.conf")
+ "client_session.conf")
self.assertEqual(os.path.isdir(expected_session_path), True)
self.assertEqual(os.path.isfile(expected_session_config), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add peer status message.
msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a.add_peer_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
- "peer_status.txt")
+ "peer_status.txt")
self.assertEqual(os.path.isfile(expected_message_file), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add status message.
msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a.add_status_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
- "status.txt")
+ "status.txt")
self.assertEqual(os.path.isfile(expected_message_file), True)
# Add swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a.add_swarm(s)
# Add client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a.add_client_session(cs)
# Add verbose message.
msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345", message_type="CHOKE")
a.add_verbose_message(msg)
expected_swarm_subfolder = 1
expected_session_subfolder = 1
expected_swarm_path = os.path.join(self.path,
- str(expected_swarm_subfolder))
+ str(expected_swarm_subfolder))
expected_session_path = os.path.join(expected_swarm_path,
- str(expected_session_subfolder))
+ str(expected_session_subfolder))
expected_message_file = os.path.join(expected_session_path,
- "verbose.txt")
+ "verbose.txt")
self.assertEqual(os.path.isfile(expected_message_file), True)
def test_add_verbose_message_number_of_lines(self):
self.assertEqual(True, True)
+
class SQLiteDatabaseAccessTest(unittest.TestCase):
- """
- Test suite for SQLiteDatabaseAccess class in storage.py.
- """
+
+ """Test suite for SQLiteDatabaseAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
database = "test.db"
expected_verbmsg_count = 0
def get_swarm_count(self):
- """ Retrieve number of entries in `swarms` table. """
+ """Retrieve number of entries in `swarms` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_session_count(self):
- """ Retrieve number of entries in `client_sessions` table. """
+ """Retrieve number of entries in `client_sessions` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_statmsg_count(self):
- """ Retrieve number of entries in `status_messages` table. """
+ """Retrieve number of entries in `status_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_pstatmsg_count(self):
- """ Retrieve number of entries in `peer_status_messages` table. """
+ """Retrieve number of entries in `peer_status_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def get_verbmsg_count(self):
- """ Retrieve number of entries in `verbose_messages` table. """
+ """Retrieve number of entries in `verbose_messages` table."""
conn = sqlite3.connect(self.database)
cursor = conn.cursor()
return count
def setUp(self):
- """ Create database file and instantiate objects. """
+ """Create database file and instantiate objects."""
# Create database file. Create tables and indices.
# TODO: Check exceptions.
sql_create_script_path = os.path.join(os.path.dirname(__file__),
- self.sql_create_script)
+ self.sql_create_script)
f = open(sql_create_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
- self.sql_init_script)
+ self.sql_init_script)
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["sqlite3", self.database], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
self.expected_verbmsg_count = 2
def tearDown(self):
- """ Close connection and remove database file. """
+ """Close connection and remove database file."""
os.remove(self.database)
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
- def test_add_verbose_message_new_entry_in_databas(self):
+ def test_add_verbose_message_new_entry_in_database(self):
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345",
+ message_type="CHOKE")
a = storage.SQLiteDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
class MySQLDatabaseAccessTest(unittest.TestCase):
- """
- Test suite for MySQLDatabaseAccess class in storage.py.
- """
+
+ """Test suite for MySQLDatabaseAccess class in storage.py."""
# Class specific variables. Initialized in setUp, used throughout tests.
database = "p2p_test"
expected_verbmsg_count = 0
def get_swarm_count(self):
- """ Retrieve number of entries in `swarms` table. """
-
+ """Retrieve number of entries in `swarms` table."""
self.cursor.execute('SELECT COUNT(id) FROM swarms')
count = self.cursor.fetchone()[0]
return count
def get_session_count(self):
- """ Retrieve number of entries in `client_sessions` table. """
+ """Retrieve number of entries in `client_sessions` table."""
self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
count = self.cursor.fetchone()[0]
return count
def get_statmsg_count(self):
- """ Retrieve number of entries in `status_messages` table. """
+ """Retrieve number of entries in `status_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM status_messages')
count = self.cursor.fetchone()[0]
return count
def get_pstatmsg_count(self):
- """ Retrieve number of entries in `peer_status_messages` table. """
+ """Retrieve number of entries in `peer_status_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
count = self.cursor.fetchone()[0]
return count
def get_verbmsg_count(self):
- """ Retrieve number of entries in `verbose_messages` table. """
+ """Retrieve number of entries in `verbose_messages` table."""
self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
count = self.cursor.fetchone()[0]
return count
def setUp(self):
- """ Create database and instantiate objects. """
+ """Create database and instantiate objects."""
# Create database. Create tables and indices.
# TODO: Check exceptions.
"--password=%s" %self.password], stdin=f)
ret = os.waitpid(p.pid, 0)[1]
f.close()
-
+
# Populate database.
# TODO: Check exceptions.
sql_init_script_path = os.path.join(os.path.dirname(__file__),
- self.sql_init_script)
+ self.sql_init_script)
f = open(sql_init_script_path, 'r')
p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
def test_add_swarm_new_entry_in_table(self):
# Add new swarm.
s = storage.Swarm(torrent_filename="fedora.torrent",
- data_size=102400)
+ data_size=102400)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_swarm(s)
def test_add_client_session_new_entry_in_table(self):
# Add new client session.
cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
- system_os="Linux", system_os_version="2.6.26",
- system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
+ system_os="Linux",
+ system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000,
+ public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300,
+ us_limit=200)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_client_session(cs)
def test_add_status_message_new_entry_in_table(self):
# Add new status message.
ts = datetime.datetime.strptime("2010-09-12 08:43:15",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
- num_peers=10, num_dht_peers=3, download_speed=102,
- upload_speed=99, download_size=10213, upload_size=3301)
+ num_peers=10, num_dht_peers=3,
+ download_speed=102, upload_speed=99,
+ download_size=10213, upload_size=3301)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_status_message(msg)
def test_add_peer_status_message_new_entry_in_table(self):
# Add new peer status message.
ts = datetime.datetime.strptime("2010-09-12 13:43:25",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
+ peer_ip="141.85.224.202",
+ peer_port="12345", download_speed=13,
+ upload_speed=98)
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_peer_status_message(msg)
self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
- def test_add_verbose_message_new_entry_in_databas(self):
+ def test_add_verbose_message_new_entry_in_database(self):
# Add new verbose message.
ts = datetime.datetime.strptime("2010-09-12 13:43:24",
- "%Y-%m-%d %H:%M:%S")
+ "%Y-%m-%d %H:%M:%S")
msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
+ transfer_direction="send",
+ peer_ip="141.85.224.202",
+ peer_port="12345",
+ message_type="CHOKE")
a = storage.MySQLDatabaseAccess(self.database)
a.connect()
a.add_verbose_message(msg)
self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
+
if __name__ == "__main__":
unittest.main()