From: Razvan Deaconescu Date: Mon, 22 Aug 2011 08:28:18 +0000 (+0300) Subject: ppf: Add SQLite access tests. X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=b9d88b8747549defbb219f5a62fcd66ebe364292;p=cs-p2p-next.git ppf: Add SQLite access tests. --- diff --git a/ppf/new/tests/test_storage.py b/ppf/new/tests/test_storage.py index ef4b097..b01def8 100644 --- a/ppf/new/tests/test_storage.py +++ b/ppf/new/tests/test_storage.py @@ -239,7 +239,7 @@ class TreeTextFileAccessTest(unittest.TestCase): public_port="50500", ds_limit=300, us_limit=200) a.add_client_session(cs) - # Add peer status message. + # Add status message. msg = storage.StatusMessage(swarm_id=1, client_session_id=1, num_peers=10, num_dht_peers=3, download_speed=102, upload_speed=99, download_size=10213, upload_size=3301) @@ -274,7 +274,7 @@ class TreeTextFileAccessTest(unittest.TestCase): public_port="50500", ds_limit=300, us_limit=200) a.add_client_session(cs) - # Add peer status message. + # Add verbose message. msg = storage.VerboseMessage(swarm_id=1, client_session_id=1, transfer_direction="send", peer_ip="141.85.224.202", peer_port="12345", message_type="CHOKE") @@ -303,11 +303,77 @@ class SQLiteDatabaseAccessTest(unittest.TestCase): database = "test.db" sql_create_script = "p2p-log-sqlite.sql" sql_init_script = "p2p-init-test.sql" - expected_swarms_count = 0 - expected_sessions_count = 0 + expected_swarm_count = 0 + expected_session_count = 0 expected_statmsg_count = 0 + expected_pstatmsg_count = 0 expected_verbmsg_count = 0 + def get_swarm_count(self): + """ Retrieve number of entries in `swarms` table. """ + conn = sqlite3.connect(self.database) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(id) FROM swarms') + count = cursor.fetchone()[0] + + cursor.close() + conn.close() + + return count + + def get_session_count(self): + """ Retrieve number of entries in `client_sessions` table. """ + conn = sqlite3.connect(self.database) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(id) FROM client_sessions') + count = cursor.fetchone()[0] + + cursor.close() + conn.close() + + return count + + def get_statmsg_count(self): + """ Retrieve number of entries in `status_messages` table. """ + conn = sqlite3.connect(self.database) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(id) FROM status_messages') + count = cursor.fetchone()[0] + + cursor.close() + conn.close() + + return count + + def get_pstatmsg_count(self): + """ Retrieve number of entries in `peer_status_messages` table. """ + conn = sqlite3.connect(self.database) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(id) FROM peer_status_messages') + count = cursor.fetchone()[0] + + cursor.close() + conn.close() + + return count + + def get_verbmsg_count(self): + """ Retrieve number of entries in `verbose_messages` table. """ + conn = sqlite3.connect(self.database) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(id) FROM verbose_messages') + count = cursor.fetchone()[0] + + cursor.close() + conn.close() + + return count + def setUp(self): """ Create database file and instantiate objects. """ # Create database file. Create tables and indices. @@ -329,9 +395,10 @@ class SQLiteDatabaseAccessTest(unittest.TestCase): f.close() # Initialize to number of table entries inserted in the init script. - self.expected_swarms_count = 2 - self.expected_sessions_count = 2 + self.expected_swarm_count = 2 + self.expected_session_count = 2 self.expected_statmsg_count = 2 + self.expected_pstatmsg_count = 2 self.expected_verbmsg_count = 2 def tearDown(self): @@ -339,22 +406,84 @@ class SQLiteDatabaseAccessTest(unittest.TestCase): os.remove(self.database) def test_add_swarm_new_entry_in_table(self): - self.assertEqual(True, False) + # Add new swarm. + s = storage.Swarm(torrent_filename="fedora.torrent", + data_size=102400) + a = storage.SQLiteDatabaseAccess(self.database) + a.connect() + a.add_swarm(s) + a.disconnect() + self.expected_swarm_count = self.expected_swarm_count + 1 + + # Select number of swarms. + swarm_count = self.get_swarm_count() + + self.assertEqual(swarm_count, self.expected_swarm_count) def test_add_client_session_new_entry_in_table(self): - self.assertEqual(True, False) + # Add new client session. + cs = storage.ClientSession(swarm_id=1, btclient="Tribler", + system_os="Linux", system_os_version="2.6.26", + system_ram=2048, system_cpu=3000, public_ip="141.85.224.201", + public_port="50500", ds_limit=300, us_limit=200) + a = storage.SQLiteDatabaseAccess(self.database) + a.connect() + a.add_client_session(cs) + a.disconnect() + self.expected_session_count = self.expected_session_count + 1 - def test_add_peer_status_message_new_entry_in_table(self): - self.assertEqual(True, False) + # Select number of swarms. + session_count = self.get_session_count() - def test_add_peer_status_message_new_entry_in_table(self): - self.assertEqual(True, False) + self.assertEqual(session_count, self.expected_session_count) def test_add_status_message_new_entry_in_table(self): - self.assertEqual(True, False) + # Add new status message. + msg = storage.StatusMessage(swarm_id=1, client_session_id=1, + num_peers=10, num_dht_peers=3, download_speed=102, + upload_speed=99, download_size=10213, upload_size=3301) + a = storage.SQLiteDatabaseAccess(self.database) + a.connect() + a.add_status_message(msg) + a.disconnect() + self.expected_statmsg_count = self.expected_statmsg_count + 1 + + # Select number of status messages. + statmsg_count = self.get_statmsg_count() + + self.assertEqual(statmsg_count, self.expected_statmsg_count) + + def test_add_peer_status_message_new_entry_in_table(self): + # Add new peer status message. + msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1, + peer_ip="141.85.224.202", peer_port="12345", + download_speed=13, upload_speed=98) + a = storage.SQLiteDatabaseAccess(self.database) + a.connect() + a.add_peer_status_message(msg) + a.disconnect() + self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1 + + # Select number of peer status messages. + pstatmsg_count = self.get_pstatmsg_count() + + self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count) def test_add_verbose_message_new_entry_in_databas(self): - self.assertEqual(True, False) + # Add new verbose message. + msg = storage.VerboseMessage(swarm_id=1, client_session_id=1, + transfer_direction="send", peer_ip="141.85.224.202", + peer_port="12345", message_type="CHOKE") + a = storage.SQLiteDatabaseAccess(self.database) + a.connect() + a.add_verbose_message(msg) + a.disconnect() + self.expected_verbmsg_count = self.expected_verbmsg_count + 1 + + # Select number of verbose messages. + verbmsg_count = self.get_verbmsg_count() + + self.assertEqual(verbmsg_count, self.expected_verbmsg_count) if __name__ == "__main__": unittest.main()