public_port="50500", ds_limit=300, us_limit=200)
a.add_client_session(cs)
- # Add peer status message.
+ # Add status message.
msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
num_peers=10, num_dht_peers=3, download_speed=102,
upload_speed=99, download_size=10213, upload_size=3301)
public_port="50500", ds_limit=300, us_limit=200)
a.add_client_session(cs)
- # Add peer status message.
+ # Add verbose message.
msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
transfer_direction="send", peer_ip="141.85.224.202",
peer_port="12345", message_type="CHOKE")
database = "test.db"
sql_create_script = "p2p-log-sqlite.sql"
sql_init_script = "p2p-init-test.sql"
- expected_swarms_count = 0
- expected_sessions_count = 0
+ expected_swarm_count = 0
+ expected_session_count = 0
expected_statmsg_count = 0
+ expected_pstatmsg_count = 0
expected_verbmsg_count = 0
+ def get_swarm_count(self):
+ """ Retrieve number of entries in `swarms` table. """
+ conn = sqlite3.connect(self.database)
+ cursor = conn.cursor()
+
+ cursor.execute('SELECT COUNT(id) FROM swarms')
+ count = cursor.fetchone()[0]
+
+ cursor.close()
+ conn.close()
+
+ return count
+
+ def get_session_count(self):
+ """ Retrieve number of entries in `client_sessions` table. """
+ conn = sqlite3.connect(self.database)
+ cursor = conn.cursor()
+
+ cursor.execute('SELECT COUNT(id) FROM client_sessions')
+ count = cursor.fetchone()[0]
+
+ cursor.close()
+ conn.close()
+
+ return count
+
+ def get_statmsg_count(self):
+ """ Retrieve number of entries in `status_messages` table. """
+ conn = sqlite3.connect(self.database)
+ cursor = conn.cursor()
+
+ cursor.execute('SELECT COUNT(id) FROM status_messages')
+ count = cursor.fetchone()[0]
+
+ cursor.close()
+ conn.close()
+
+ return count
+
+ def get_pstatmsg_count(self):
+ """ Retrieve number of entries in `peer_status_messages` table. """
+ conn = sqlite3.connect(self.database)
+ cursor = conn.cursor()
+
+ cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
+ count = cursor.fetchone()[0]
+
+ cursor.close()
+ conn.close()
+
+ return count
+
+ def get_verbmsg_count(self):
+ """ Retrieve number of entries in `verbose_messages` table. """
+ conn = sqlite3.connect(self.database)
+ cursor = conn.cursor()
+
+ cursor.execute('SELECT COUNT(id) FROM verbose_messages')
+ count = cursor.fetchone()[0]
+
+ cursor.close()
+ conn.close()
+
+ return count
+
def setUp(self):
""" Create database file and instantiate objects. """
# Create database file. Create tables and indices.
f.close()
# Initialize to number of table entries inserted in the init script.
- self.expected_swarms_count = 2
- self.expected_sessions_count = 2
+ self.expected_swarm_count = 2
+ self.expected_session_count = 2
self.expected_statmsg_count = 2
+ self.expected_pstatmsg_count = 2
self.expected_verbmsg_count = 2
def tearDown(self):
os.remove(self.database)
def test_add_swarm_new_entry_in_table(self):
- self.assertEqual(True, False)
+ # Add new swarm.
+ s = storage.Swarm(torrent_filename="fedora.torrent",
+ data_size=102400)
+ a = storage.SQLiteDatabaseAccess(self.database)
+ a.connect()
+ a.add_swarm(s)
+ a.disconnect()
+ self.expected_swarm_count = self.expected_swarm_count + 1
+
+ # Select number of swarms.
+ swarm_count = self.get_swarm_count()
+
+ self.assertEqual(swarm_count, self.expected_swarm_count)
def test_add_client_session_new_entry_in_table(self):
- self.assertEqual(True, False)
+ # Add new client session.
+ cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
+ system_os="Linux", system_os_version="2.6.26",
+ system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
+ public_port="50500", ds_limit=300, us_limit=200)
+ a = storage.SQLiteDatabaseAccess(self.database)
+ a.connect()
+ a.add_client_session(cs)
+ a.disconnect()
+ self.expected_session_count = self.expected_session_count + 1
- def test_add_peer_status_message_new_entry_in_table(self):
- self.assertEqual(True, False)
+ # Select number of swarms.
+ session_count = self.get_session_count()
- def test_add_peer_status_message_new_entry_in_table(self):
- self.assertEqual(True, False)
+ self.assertEqual(session_count, self.expected_session_count)
def test_add_status_message_new_entry_in_table(self):
- self.assertEqual(True, False)
+ # Add new status message.
+ msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
+ num_peers=10, num_dht_peers=3, download_speed=102,
+ upload_speed=99, download_size=10213, upload_size=3301)
+ a = storage.SQLiteDatabaseAccess(self.database)
+ a.connect()
+ a.add_status_message(msg)
+ a.disconnect()
+ self.expected_statmsg_count = self.expected_statmsg_count + 1
+
+ # Select number of status messages.
+ statmsg_count = self.get_statmsg_count()
+
+ self.assertEqual(statmsg_count, self.expected_statmsg_count)
+
+ def test_add_peer_status_message_new_entry_in_table(self):
+ # Add new peer status message.
+ msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
+ peer_ip="141.85.224.202", peer_port="12345",
+ download_speed=13, upload_speed=98)
+ a = storage.SQLiteDatabaseAccess(self.database)
+ a.connect()
+ a.add_peer_status_message(msg)
+ a.disconnect()
+ self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1
+
+ # Select number of peer status messages.
+ pstatmsg_count = self.get_pstatmsg_count()
+
+ self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
def test_add_verbose_message_new_entry_in_databas(self):
- self.assertEqual(True, False)
+ # Add new verbose message.
+ msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
+ transfer_direction="send", peer_ip="141.85.224.202",
+ peer_port="12345", message_type="CHOKE")
+ a = storage.SQLiteDatabaseAccess(self.database)
+ a.connect()
+ a.add_verbose_message(msg)
+ a.disconnect()
+ self.expected_verbmsg_count = self.expected_verbmsg_count + 1
+
+ # Select number of verbose messages.
+ verbmsg_count = self.get_verbmsg_count()
+
+ self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
if __name__ == "__main__":
unittest.main()