# see LICENSE.txt for license information

import sys
import os
import threading
from time import sleep
from base64 import encodestring, decodestring
from traceback import print_exc, print_stack

import apsw

from BaseLib.Core.simpledefs import INFOHASH_LENGTH
from BaseLib.__init__ import LIBRARYNAME
from BaseLib.Core.Utilities.unicode import dunno2unicode

# ONLY USE APSW >= 3.5.9-r1
#support_version = (3,5,9)
#support_version = (3,3,13)
#apsw_version = tuple([int(r) for r in apsw.apswversion().split('-')[0].split('.')])
#assert apsw_version >= support_version, "Required APSW Version >= %d.%d.%d."%support_version + " But your version is %d.%d.%d.\n"%apsw_version + \
#        "Please download and install it from http://code.google.com/p/apsw/"

# Changed from 4 to 5 by Andrea for subtitles support
CURRENT_MAIN_DB_VERSION = 5

TEST_SQLITECACHEDB_UPGRADE = False
CREATE_SQL_FILE = None
CREATE_SQL_FILE_POSTFIX = os.path.join(LIBRARYNAME, 'schema_sdb_v'+str(CURRENT_MAIN_DB_VERSION)+'.sql')
DB_FILE_NAME = 'tribler.sdb'
DB_DIR_NAME = 'sqlite'    # db file path = DB_DIR_NAME/DB_FILE_NAME
DEFAULT_BUSY_TIMEOUT = 10000
MAX_SQL_BATCHED_TO_TRANSACTION = 1000    # don't change it unless carefully tested. A transaction with 1000 batched updates took 1.5 seconds
NULL = None
DEBUG = False
SHOW_ALL_EXECUTE = False
config_dir = None    # set by init(), used by checkDB() to detect an interrupted upgrade

class Warning(Exception):
    pass

def init(config, db_exception_handler = None):
    """ create sqlite database """
    global CREATE_SQL_FILE
    global config_dir
    torrent_dir = os.path.abspath(config['torrent_collecting_dir'])
    config_dir = config['state_dir']
    install_dir = config['install_dir']
    CREATE_SQL_FILE = os.path.join(install_dir,CREATE_SQL_FILE_POSTFIX)
    sqlitedb = SQLiteCacheDB.getInstance(db_exception_handler)

    if config['superpeer']:
        sqlite_db_path = ':memory:'
    else:
        sqlite_db_path = os.path.join(config_dir, DB_DIR_NAME, DB_FILE_NAME)
    print >>sys.stderr,"cachedb: init: SQL FILE",sqlite_db_path

    icon_dir = os.path.abspath(config['peer_icon_path'])

    sqlitedb.initDB(sqlite_db_path, CREATE_SQL_FILE)    # the first place to create db in Tribler
    return sqlitedb

def done(config_dir):
    SQLiteCacheDB.getInstance().close()

def make_filename(config_dir,filename):
    if config_dir is None:
        return filename
    else:
        return os.path.join(config_dir,filename)

def bin2str(bin):
    # the database stores permids and infohashes as base64 with newlines stripped
    return encodestring(bin).replace("\n","")

def str2bin(str):
    return decodestring(str)
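
# Round-trip sanity check (illustrative values): bin2str/str2bin must be
# inverses for arbitrary binary strings such as permids and infohashes.
#   >>> raw = '\x00\x01\xfe\xff' * 5          # 20 bytes, like an infohash
#   >>> str2bin(bin2str(raw)) == raw
#   True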

def print_exc_plus():
    """
    Print the usual traceback information, followed by a listing of all the
    local variables in each frame.
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52215
    http://initd.org/pub/software/pysqlite/apsw/3.3.13-r1/apsw.html#augmentedstacktraces
    """
    tb = sys.exc_info()[2]
    stack = []
    while tb:
        stack.append(tb.tb_frame)
        tb = tb.tb_next
    print_exc()
    print >> sys.stderr, "Locals by frame, innermost last"
    for frame in stack:
        print >> sys.stderr
        print >> sys.stderr, "Frame %s in %s at line %s" % (frame.f_code.co_name,
                                                            frame.f_code.co_filename,
                                                            frame.f_lineno)
        for key, value in frame.f_locals.items():
            print >> sys.stderr, "\t%20s = " % key,
            # We have to be careful not to cause a new error in our error
            # printer! Calling str() on an unknown object could cause an
            # error we don't want.
            try:
                print >> sys.stderr, value
            except:
                print >> sys.stderr, "<ERROR WHILE PRINTING VALUE>"

class safe_dict(dict):
    def __init__(self, *args, **kw):
        self.lock = threading.RLock()
        dict.__init__(self, *args, **kw)

    def __getitem__(self, key):
        self.lock.acquire()
        try:
            return dict.__getitem__(self, key)
        finally:
            self.lock.release()

    def __setitem__(self, key, value):
        self.lock.acquire()
        try:
            dict.__setitem__(self, key, value)
        finally:
            self.lock.release()

    def __delitem__(self, key):
        self.lock.acquire()
        try:
            dict.__delitem__(self, key)
        finally:
            self.lock.release()

    def __contains__(self, key):
        self.lock.acquire()
        try:
            return dict.__contains__(self, key)
        finally:
            self.lock.release()

    def values(self):
        self.lock.acquire()
        try:
            return dict.values(self)
        finally:
            self.lock.release()
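
# Illustrative use: safe_dict makes individual dict operations atomic across
# threads, which is how the per-thread cursor and transaction-cache tables
# below are shared. Compound check-then-act sequences still need an outer lock.
#   cursors = safe_dict()
#   cursors[threading.currentThread().getName()] = object()    # hypothetical value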

class SQLiteCacheDBBase:
    lock = threading.RLock()

    def __init__(self,db_exception_handler=None):
        self.exception_handler = db_exception_handler
        self.cursor_table = safe_dict()    # {thread_name:cur}
        self.cache_transaction_table = safe_dict()    # {thread_name:[sql]}
        self.class_variables = safe_dict({'db_path':None,'busytimeout':None})    # busytimeout is in milliseconds

        self.permid_id = safe_dict()
        self.infohash_id = safe_dict()
        self.show_execute = False

        # TODO: should these shared tables be protected to be thread-safe?
        self.status_table = None
        self.category_table = None
        self.src_table = None
        self.applied_pragma_sync_norm = False

    def close(self, clean=False):
        # only close the connection object in this thread, don't close other thread's connection object
        thread_name = threading.currentThread().getName()
        cur = self.getCursor(create=False)

        if cur:
            con = cur.getconnection()
            cur.close()
            con.close()
            con = None
            del self.cursor_table[thread_name]
            # Arno, 2010-01-25: Remove entry in cache_transaction_table for this thread
            if thread_name in self.cache_transaction_table.keys():
                del self.cache_transaction_table[thread_name]

        if clean:    # used for test suite
            self.permid_id = safe_dict()
            self.infohash_id = safe_dict()
            self.exception_handler = None
            self.class_variables = safe_dict({'db_path':None,'busytimeout':None})
            self.cursor_table = safe_dict()
            self.cache_transaction_table = safe_dict()

    # --------- static functions --------
    def getCursor(self, create=True):
        thread_name = threading.currentThread().getName()
        curs = self.cursor_table
        cur = curs.get(thread_name, None)    # return [cur, cur, lib] or None
        #print >> sys.stderr, '-------------- getCursor::', len(curs), time(), curs.keys()
        if cur is None and create:
            self.openDB(self.class_variables['db_path'], self.class_variables['busytimeout'])    # create a new db obj for this thread
            cur = curs.get(thread_name)
        return cur

    def openDB(self, dbfile_path=None, busytimeout=DEFAULT_BUSY_TIMEOUT):
        """
        Open a SQLite database. Only one database can be opened; subsequent
        calls in the same thread reuse the cached cursor.
        @dbfile_path       The path to store the database file.
                           Set dbfile_path=':memory:' to create a db in memory.
        @busytimeout       Set the maximum time, in milliseconds, that SQLite will wait if the database is locked.
        """

        # already opened a db in this thread, reuse it
        thread_name = threading.currentThread().getName()
        #print >>sys.stderr,"sqlcachedb: openDB",dbfile_path,thread_name
        if thread_name in self.cursor_table:
            #assert dbfile_path == None or self.class_variables['db_path'] == dbfile_path
            return self.cursor_table[thread_name]

        assert dbfile_path, "You must specify the path of database file"

        if dbfile_path.lower() != ':memory:':
            db_dir,db_filename = os.path.split(dbfile_path)
            if db_dir and not os.path.isdir(db_dir):
                os.makedirs(db_dir)

        con = apsw.Connection(dbfile_path)
        con.setbusytimeout(busytimeout)

        cur = con.cursor()
        self.cursor_table[thread_name] = cur

        if not self.applied_pragma_sync_norm:
            # http://www.sqlite.org/pragma.html
            # When synchronous is NORMAL, the SQLite database engine will still
            # pause at the most critical moments, but less often than in FULL
            # mode. There is a very small (though non-zero) chance that a power
            # failure at just the wrong time could corrupt the database in
            # NORMAL mode. But in practice, you are more likely to suffer a
            # catastrophic disk failure or some other unrecoverable hardware
            # fault.
            self.applied_pragma_sync_norm = True
            cur.execute("PRAGMA synchronous = NORMAL;")

        return cur
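
    # Per-thread connection pattern in a nutshell (instance and path are
    # hypothetical): the first openDB() call in a thread creates and caches a
    # cursor; later calls return the cached one before the path assert is hit.
    #   cur = db.openDB('/tmp/test.sdb')    # opens a connection for this thread
    #   cur is db.openDB()                  # True: cached cursor is reused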

    def createDBTable(self, sql_create_table, dbfile_path, busytimeout=DEFAULT_BUSY_TIMEOUT):
        """
        Create a SQLite database.
        @sql_create_table  The sql statements to create tables in the database.
                           Every statement must end with a ';'.
        @dbfile_path       The path to store the database file. Set dbfile_path=':memory:' to create a db in memory.
        @busytimeout       Set the maximum time, in milliseconds, that SQLite will wait if the database is locked.
                           Default = 10000 milliseconds
        """
        cur = self.openDB(dbfile_path, busytimeout)
        cur.execute(sql_create_table)    # it is suggested to include begin & commit in the script

    def initDB(self, sqlite_filepath,
               create_sql_filename = None,
               busytimeout = DEFAULT_BUSY_TIMEOUT,
               check_version = True,
               current_db_version = CURRENT_MAIN_DB_VERSION):
        """
        Create and initialize a SQLite database given a sql script.
        Only one db can be opened. If the given sqlite_filepath differs from
        the already opened DB file, raise an exception.
        @sqlite_filepath       Path of the database file to open or create.
        @create_sql_filename   The path of the sql script that creates the tables.
                               Every statement must end with a ';'.
        @busytimeout           Set the maximum time, in milliseconds, to wait and retry
                               if the database is locked. Default = 10000 milliseconds
        """
        if create_sql_filename is None:
            create_sql_filename = CREATE_SQL_FILE
        self.lock.acquire()
        try:
            # verify db path identity
            class_db_path = self.class_variables['db_path']
            if sqlite_filepath is None:    # reuse the opened db file?
                if class_db_path is not None:    # yes, reuse it
                    # reuse the busytimeout
                    return self.openDB(class_db_path, self.class_variables['busytimeout'])
                else:    # no db file opened
                    raise Exception, "You must specify the path of the database file when opening it for the first time"

            if class_db_path is None:    # the first time this db path is opened, store it
                # open the db if it exists (by converting from bsd) and is not broken, otherwise create a new one
                # it will update the db if necessary by checking the version number
                self.safelyOpenTriblerDB(sqlite_filepath, create_sql_filename, busytimeout, check_version=check_version, current_db_version=current_db_version)

                self.class_variables = {'db_path': sqlite_filepath, 'busytimeout': int(busytimeout)}

                return self.openDB()    # return the cursor, won't reopen the db

            elif sqlite_filepath != class_db_path:    # not the first time; check it is the same file
                raise Exception, "Only one database file can be opened. You have opened %s and are trying to open %s." % (class_db_path, sqlite_filepath)
        finally:
            self.lock.release()

    def safelyOpenTriblerDB(self, dbfile_path, sql_create, busytimeout=DEFAULT_BUSY_TIMEOUT, check_version=False, current_db_version=None):
        """
        Open the db if possible, otherwise create a new one.
        Update the db if necessary by checking the version number.

        Pseudo code:
            try:
                if sqlite db doesn't exist:
                    raise Error
                open sqlite db
                read sqlite_db_version
                if sqlite_db_version doesn't exist:
                    raise Error
            except:
                close and delete sqlite db if possible
                create new sqlite db file without sqlite_db_version
                write sqlite_db_version at last
                read sqlite_db_version
                # must ensure these steps after except will not fail, otherwise force to exit

            if sqlite_db_version < current_db_version:
                updateDB(sqlite_db_version, current_db_version)
                update sqlite_db_version at last
        """
        try:
            if not os.path.isfile(dbfile_path):
                raise Warning("No existing database found. Attempting to create a new database %s" % repr(dbfile_path))

            cur = self.openDB(dbfile_path, busytimeout)
            if check_version:
                sqlite_db_version = self.readDBVersion()
                if sqlite_db_version == NULL or int(sqlite_db_version) < 1:
                    raise NotImplementedError
        except Exception, exception:
            if isinstance(exception, Warning):
                # user friendly warning to log the creation of a new database
                print >>sys.stderr, exception
            else:
                # user unfriendly exception message because something went wrong
                print_exc()

            if os.path.isfile(dbfile_path):
                self.close(clean=True)
                os.remove(dbfile_path)

            if os.path.isfile(sql_create):
                f = open(sql_create)
                sql_create_tables = f.read()
                f.close()
            else:
                raise Exception, "Cannot open sql script at %s" % os.path.realpath(sql_create)

            self.createDBTable(sql_create_tables, dbfile_path, busytimeout)
            if check_version:
                sqlite_db_version = self.readDBVersion()

        if check_version:
            self.checkDB(sqlite_db_version, current_db_version)

    def checkDB(self, db_ver, curr_ver):
        # read MyDB and check the version number.
        if not db_ver or not curr_ver:
            self.updateDB(db_ver,curr_ver)
            return
        db_ver = int(db_ver)
        curr_ver = int(curr_ver)
        #print "check db", db_ver, curr_ver
        if db_ver != curr_ver or \
                (not config_dir is None and os.path.exists(os.path.join(config_dir, "upgradingdb.txt"))):
            self.updateDB(db_ver,curr_ver)

    def updateDB(self,db_ver,curr_ver):
        pass    # overridden by SQLiteCacheDBV5 below

    def readDBVersion(self):
        cur = self.getCursor()
        sql = u"select value from MyInfo where entry='version'"
        res = self.fetchone(sql)
        if res:
            find = list(res)
            return find[0]    # throw error if something wrong
        else:
            return None

    def writeDBVersion(self, version, commit=True):
        sql = u"UPDATE MyInfo SET value=? WHERE entry='version'"
        self.execute_write(sql, [version], commit=commit)

    def show_sql(self, switch):
        # temporarily show the sql being executed
        self.show_execute = switch

    # --------- generic functions -------------

    def _execute(self, sql, args=None):
        cur = self.getCursor()

        if SHOW_ALL_EXECUTE or self.show_execute:
            thread_name = threading.currentThread().getName()
            print >> sys.stderr, '===', thread_name, '===\n', sql, '\n-----\n', args, '\n======\n'
        try:
            if args is None:
                return cur.execute(sql)
            else:
                return cur.execute(sql, args)
        except Exception, msg:
            print_exc()
            print >> sys.stderr, "cachedb: execute error:", Exception, msg
            thread_name = threading.currentThread().getName()
            print >> sys.stderr, '===', thread_name, '===\nSQL Type:', type(sql), '\n-----\n', sql, '\n-----\n', args, '\n======\n'
            # ARNODB: re-raise the exception so that _transaction can roll
            # back or recommit; swallowing it here was a reported bug (Johan).
            raise

    def execute_read(self, sql, args=None):
        # this is only called for reading. If you want to write to the db, always use execute_write or executemany
        return self._execute(sql, args)

    def execute_write(self, sql, args=None, commit=True):
        self.cache_transaction(sql, args)
        if commit:
            self.commit()

    def executemany(self, sql, args, commit=True):
        thread_name = threading.currentThread().getName()
        if thread_name not in self.cache_transaction_table:
            self.cache_transaction_table[thread_name] = []
        all = [(sql, arg) for arg in args]
        self.cache_transaction_table[thread_name].extend(all)
        if commit:
            self.commit()

    def cache_transaction(self, sql, args=None):
        thread_name = threading.currentThread().getName()
        if thread_name not in self.cache_transaction_table:
            self.cache_transaction_table[thread_name] = []
        self.cache_transaction_table[thread_name].append((sql, args))

    def transaction(self, sql=None, args=None):
        if sql:
            self.cache_transaction(sql, args)

        thread_name = threading.currentThread().getName()

        n = 0
        sql_full = ''
        arg_list = []
        sql_queue = self.cache_transaction_table.get(thread_name,None)
        if sql_queue:
            while True:
                try:
                    _sql,_args = sql_queue.pop(0)
                except IndexError:
                    break

                _sql = _sql.strip()
                if not _sql:
                    continue
                if not _sql.endswith(';'):
                    _sql += ';'
                sql_full += _sql + '\n'
                if _args != None:
                    arg_list += list(_args)
                n += 1

                # if too many sql statements are cached, split them into batches to prevent processing and locking the DB for a long time
                # TODO: optimize the value of MAX_SQL_BATCHED_TO_TRANSACTION
                if n % MAX_SQL_BATCHED_TO_TRANSACTION == 0:
                    self._transaction(sql_full, arg_list)
                    sql_full = ''
                    arg_list = []

            if sql_full:
                self._transaction(sql_full, arg_list)

    def _transaction(self, sql, args=None):
        if sql:
            sql = 'BEGIN TRANSACTION; \n' + sql + 'COMMIT TRANSACTION;'
            try:
                self._execute(sql, args)
            except Exception,e:
                self.commit_retry_if_busy_or_rollback(e,0,sql=sql)
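
    # Illustrative trace (statements hypothetical): three writes cached by one
    # thread are flushed as a single batch, so _transaction receives
    #   BEGIN TRANSACTION;
    #   UPDATE Peer SET name=? WHERE peer_id=?;
    #   INSERT INTO Torrent (infohash) VALUES (?);
    #   DELETE FROM Peer WHERE peer_id=?;
    #   COMMIT TRANSACTION;
    # with arg_list carrying the parameters in the same order.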

    def commit_retry_if_busy_or_rollback(self,e,tries,sql=None):
        """
        Arno:
        SQL_BUSY errors happen at the beginning of the experiment,
        very quickly after startup (e.g. 0.001 s), so the busy timeout
        is not honoured for some reason. After the initial errors,
        they no longer occur.
        """
        print >>sys.stderr,"sqlcachedb: commit_retry: after",str(e),repr(sql)

        if str(e).startswith("BusyError"):
            try:
                self._execute("COMMIT")
            except Exception, e2:
                if tries < 5:    #self.max_commit_retries
                    # Spec is unclear whether the next commit will also have
                    # 'busytimeout' seconds to try to get a write lock.
                    # The backoff doubles each retry: 0.04, 0.08, 0.16, 0.32, 0.64 s.
                    sleep(pow(2.0,tries+2)/100.0)
                    self.commit_retry_if_busy_or_rollback(e2,tries+1)
                else:
                    self.rollback(tries)
        else:
            self.rollback(tries)
            m = "cachedb: TRANSACTION ERROR "+threading.currentThread().getName()+' '+str(e)
            raise Exception, m

    def rollback(self, tries):
        print_exc()
        try:
            self._execute("ROLLBACK")
        except Exception, e:
            # May be harmless, see above. Unfortunately they don't specify
            # what the error is when an attempt is made to roll back
            # an automatically rolled back transaction.
            m = "cachedb: ROLLBACK ERROR "+threading.currentThread().getName()+' '+str(e)
            #print >> sys.stderr, 'SQLite Database', m
            raise Exception, m

    def commit(self):
        # flush this thread's cached statements as one transaction
        self.transaction()

    # -------- Write Operations --------
    def insert(self, table_name, commit=True, **argv):
        if len(argv) == 1:
            sql = 'INSERT INTO %s (%s) VALUES (?);'%(table_name, argv.keys()[0])
        else:
            questions = '?,'*len(argv)
            sql = 'INSERT INTO %s %s VALUES (%s);'%(table_name, tuple(argv.keys()), questions[:-1])
        self.execute_write(sql, argv.values(), commit)

    def insertMany(self, table_name, values, keys=None, commit=True):
        """ values must be a list of tuples """
        questions = u'?,'*len(values[0])
        if keys is None:
            sql = u'INSERT INTO %s VALUES (%s);'%(table_name, questions[:-1])
        else:
            sql = u'INSERT INTO %s %s VALUES (%s);'%(table_name, tuple(keys), questions[:-1])
        self.executemany(sql, values, commit=commit)
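
    # Usage sketch (values hypothetical):
    #   db.insert('Peer', permid=bin2str(permid), name=u'alice')
    #   db.insertMany('InvertedIndex', [(u'foo', 42), (u'bar', 42)])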

    def update(self, table_name, where=None, commit=True, **argv):
        sql = u'UPDATE %s SET '%table_name
        arg = []
        for k,v in argv.iteritems():
            if type(v) is tuple:    # ('operator', value), e.g. ('+', 1)
                sql += u'%s %s ?,' % (k, v[0])
                arg.append(v[1])
            else:
                sql += u'%s=?,' % k
                arg.append(v)
        sql = sql[:-1]    # strip trailing comma
        if where != None:
            sql += u' where %s'%where
        self.execute_write(sql, arg, commit)

    def delete(self, table_name, commit=True, **argv):
        sql = u'DELETE FROM %s WHERE '%table_name
        for k,v in argv.iteritems():
            if type(v) is tuple:    # ('operator', value), e.g. ('<', 10)
                sql += u'%s %s ? AND ' % (k, v[0])
            else:
                sql += u'%s=? AND ' % k
        sql = sql[:-5]    # strip trailing ' AND '
        self.execute_write(sql, argv.values(), commit)
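
    # Usage sketch (values hypothetical); a tuple value supplies the operator:
    #   db.update('Torrent', where='torrent_id=1', relevance=0)
    #   db.delete('Peer', peer_id=42, friend=0)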

    # -------- Read Operations --------
    def size(self, table_name):
        num_rec_sql = u"SELECT count(*) FROM %s;"%table_name
        result = self.fetchone(num_rec_sql)
        return result

    def fetchone(self, sql, args=None):
        # returns NULL: if the result is null
        # returns None: if no matching result is found
        find = self.execute_read(sql, args)
        if not find:
            return NULL
        find = list(find)
        if len(find) == 0:
            return None
        return find[0][0]    # first column of the first matching row

    def fetchall(self, sql, args=None, retry=0):
        res = self.execute_read(sql, args)
        if res != None:
            find = list(res)
            return find
        else:
            return []    # should it return None?

    def getOne(self, table_name, value_name, where=None, conj='and', **kw):
        """ value_name could be a string, a tuple of strings, or '*'
        """
        if isinstance(value_name, tuple):
            value_names = u",".join(value_name)
        elif isinstance(value_name, list):
            value_names = u",".join(value_name)
        else:
            value_names = value_name

        if isinstance(table_name, tuple):
            table_names = u",".join(table_name)
        elif isinstance(table_name, list):
            table_names = u",".join(table_name)
        else:
            table_names = table_name

        sql = u'select %s from %s'%(value_names, table_names)

        if where or kw:
            sql += u' where '
        if where:
            sql += where
            if kw:
                sql += u' %s '%conj
        if kw:
            arg = []
            for k,v in kw.iteritems():
                if type(v) is tuple:    # ('operator', value), e.g. ('>', 5)
                    operator = v[0]
                    arg.append(v[1])
                else:
                    operator = u'='
                    arg.append(v)
                sql += u' %s %s ? ' % (k, operator)
                sql += conj
            sql = sql[:-len(conj)]
        else:
            arg = None

        # print >> sys.stderr, 'SQL: %s %s' % (sql, arg)
        return self.fetchone(sql,arg)
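
    # Usage sketch (values hypothetical):
    #   db.getOne('Peer', 'name', peer_id=42)
    #   db.getOne('Torrent', 'count(*)', status_id=(u'<', 3))    # tuple = (operator, value)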

    def getAll(self, table_name, value_name, where=None, group_by=None, having=None, order_by=None, limit=None, offset=None, conj='and', **kw):
        """ value_name could be a string, or a tuple of strings
            order by is represented as order_by
            group by is represented as group_by
        """
        if isinstance(value_name, tuple):
            value_names = u",".join(value_name)
        elif isinstance(value_name, list):
            value_names = u",".join(value_name)
        else:
            value_names = value_name

        if isinstance(table_name, tuple):
            table_names = u",".join(table_name)
        elif isinstance(table_name, list):
            table_names = u",".join(table_name)
        else:
            table_names = table_name

        sql = u'select %s from %s'%(value_names, table_names)

        if where or kw:
            sql += u' where '
        if where:
            sql += where
            if kw:
                sql += u' %s '%conj
        if kw:
            arg = []
            for k,v in kw.iteritems():
                if type(v) is tuple:    # ('operator', value)
                    operator = v[0]
                    arg.append(v[1])
                else:
                    operator = u'='
                    arg.append(v)
                sql += u' %s %s ?' % (k, operator)
                sql += conj
            sql = sql[:-len(conj)]
        else:
            arg = None

        if group_by != None:
            sql += u' group by ' + group_by
        if having != None:
            sql += u' having ' + having
        if order_by != None:
            sql += u' order by ' + order_by    # add desc after order_by to sort in descending order, e.g. 'last_seen desc'
        if limit != None:
            sql += u' limit %d'%limit
        if offset != None:
            sql += u' offset %d'%offset

        try:
            return self.fetchall(sql, arg) or []
        except Exception, msg:
            print >> sys.stderr, "sqldb: Wrong getAll sql statement:", sql
            raise Exception, msg
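
    # Usage sketch (values hypothetical):
    #   db.getAll('Torrent', ('torrent_id','infohash'), status_id=1,
    #             order_by='torrent_id desc', limit=50)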

    # ----- Tribler DB operations ----

    #------------- useful functions for multiple handlers ----------
    def insertPeer(self, permid, update=True, commit=True, **argv):
        """ Insert a peer. permid is the binary permid.
        If the peer is already in db and update is True, update the peer.
        """
        peer_id = self.getPeerID(permid)
        peer_existed = False
        if 'name' in argv:
            argv['name'] = dunno2unicode(argv['name'])
        if peer_id != None:
            peer_existed = True
            if update:
                where = u'peer_id=%d'%peer_id
                self.update('Peer', where, commit=commit, **argv)
        else:
            self.insert('Peer', permid=bin2str(permid), commit=commit, **argv)
        return peer_existed

    def deletePeer(self, permid=None, peer_id=None, force=True, commit=True):
        if peer_id is None:
            peer_id = self.getPeerID(permid)

        deleted = False
        if peer_id != None:
            if force:
                self.delete('Peer', peer_id=peer_id, commit=commit)
            else:
                self.delete('Peer', peer_id=peer_id, friend=0, superpeer=0, commit=commit)
            deleted = not self.hasPeer(permid, check_db=True)
            if deleted and permid in self.permid_id:
                self.permid_id.pop(permid)
        return deleted

    def getPeerID(self, permid):
        assert isinstance(permid, str), permid
        # permid must be binary
        if permid in self.permid_id:
            return self.permid_id[permid]

        sql_get_peer_id = "SELECT peer_id FROM Peer WHERE permid==?"
        peer_id = self.fetchone(sql_get_peer_id, (bin2str(permid),))
        if peer_id != None:
            self.permid_id[permid] = peer_id
        return peer_id

    def hasPeer(self, permid, check_db=False):
        if not check_db:
            return bool(self.getPeerID(permid))
        else:
            permid_str = bin2str(permid)
            sql_get_peer_id = "SELECT peer_id FROM Peer WHERE permid==?"
            peer_id = self.fetchone(sql_get_peer_id, (permid_str,))
            return peer_id is not None

    def insertInfohash(self, infohash, check_dup=False, commit=True):
        """ Insert an infohash. infohash is binary """
        assert isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
        assert len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
        if infohash in self.infohash_id:
            if check_dup:
                print >> sys.stderr, 'sqldb: infohash to insert already exists', `infohash`
            return

        infohash_str = bin2str(infohash)
        sql_insert_torrent = "INSERT INTO Torrent (infohash) VALUES (?)"
        self.execute_write(sql_insert_torrent, (infohash_str,), commit)

    def deleteInfohash(self, infohash=None, torrent_id=None, commit=True):
        assert infohash is None or isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
        assert infohash is None or len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
        if torrent_id is None:
            torrent_id = self.getTorrentID(infohash)

        if torrent_id != None:
            self.delete('Torrent', torrent_id=torrent_id, commit=commit)
            if infohash in self.infohash_id:
                self.infohash_id.pop(infohash)

    def getTorrentID(self, infohash):
        assert isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
        assert len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
        if infohash in self.infohash_id:
            return self.infohash_id[infohash]

        sql_get_torrent_id = "SELECT torrent_id FROM Torrent WHERE infohash==?"
        tid = self.fetchone(sql_get_torrent_id, (bin2str(infohash),))
        if tid != None:
            self.infohash_id[infohash] = tid
        return tid

    def getInfohash(self, torrent_id):
        sql_get_infohash = "SELECT infohash FROM Torrent WHERE torrent_id==?"
        arg = (torrent_id,)
        ret = self.fetchone(sql_get_infohash, arg)
        if ret:
            ret = str2bin(ret)
        return ret

    def getTorrentStatusTable(self):
        if self.status_table is None:
            st = self.getAll('TorrentStatus', ('lower(name)', 'status_id'))
            self.status_table = dict(st)
        return self.status_table

    def getTorrentCategoryTable(self):
        # The key is in lower case
        if self.category_table is None:
            ct = self.getAll('Category', ('lower(name)', 'category_id'))
            self.category_table = dict(ct)
        return self.category_table

    def getTorrentSourceTable(self):
        # Don't use lower case because some URLs are case sensitive
        if self.src_table is None:
            st = self.getAll('TorrentSource', ('name', 'source_id'))
            self.src_table = dict(st)
        return self.src_table
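
    # Illustrative contents (values as seeded by the default v5 schema):
    #   self.getTorrentStatusTable()  ->  {u'unknown': 0, u'good': 1, u'dead': 2}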

    def test(self):
        res1 = self.getAll('Category', '*')
        res2 = len(self.getAll('Peer', 'name', 'name is not NULL'))
        return (res1, res2)

class SQLiteCacheDBV5(SQLiteCacheDBBase):
    def updateDB(self, fromver, tover):

        # bring database up to version 2, if necessary
        if fromver < 2:
            sql = """

-- Patch for BuddyCast 4

ALTER TABLE MyPreference ADD COLUMN click_position INTEGER DEFAULT -1;
ALTER TABLE MyPreference ADD COLUMN reranking_strategy INTEGER DEFAULT -1;
ALTER TABLE Preference ADD COLUMN click_position INTEGER DEFAULT -1;
ALTER TABLE Preference ADD COLUMN reranking_strategy INTEGER DEFAULT -1;
CREATE TABLE ClicklogSearch (
                     peer_id INTEGER DEFAULT 0,
                     torrent_id INTEGER DEFAULT 0,
                     term_id INTEGER DEFAULT 0,
                     term_order INTEGER DEFAULT 0
                     );
CREATE INDEX idx_search_term ON ClicklogSearch (term_id);
CREATE INDEX idx_search_torrent ON ClicklogSearch (torrent_id);

CREATE TABLE ClicklogTerm (
                    term_id INTEGER PRIMARY KEY AUTOINCREMENT DEFAULT 0,
                    term VARCHAR(255) NOT NULL,
                    times_seen INTEGER DEFAULT 0 NOT NULL
                    );
CREATE INDEX idx_terms_term ON ClicklogTerm(term);
"""

            self.execute_write(sql, commit=False)

        if fromver < 3:
            sql = """
-- Patch for Local Peer Discovery

ALTER TABLE Peer ADD COLUMN is_local integer DEFAULT 0;
"""
            self.execute_write(sql, commit=False)

        if fromver < 4:
            sql = """
-- V2: Patch for VoteCast

DROP TABLE IF EXISTS ModerationCast;
DROP INDEX IF EXISTS moderationcast_idx;

DROP TABLE IF EXISTS Moderators;
DROP INDEX IF EXISTS moderators_idx;

DROP TABLE IF EXISTS VoteCast;
DROP INDEX IF EXISTS votecast_idx;

CREATE TABLE VoteCast (
mod_id text,
voter_id text,
vote integer,
time_stamp integer
);

CREATE INDEX mod_id_idx
on VoteCast
(mod_id);

CREATE INDEX voter_id_idx
on VoteCast
(voter_id);

CREATE UNIQUE INDEX votecast_idx
ON VoteCast
(mod_id, voter_id);

--- patch for BuddyCast 5 : Creation of Popularity table and relevant stuff

CREATE TABLE Popularity (
                         torrent_id INTEGER,
                         peer_id INTEGER,
                         msg_receive_time NUMERIC,
                         size_calc_age NUMERIC,
                         num_seeders INTEGER DEFAULT 0,
                         num_leechers INTEGER DEFAULT 0,
                         num_of_sources INTEGER DEFAULT 0
                     );

CREATE INDEX Message_receive_time_idx
  ON Popularity
   (msg_receive_time);

CREATE INDEX Size_calc_age_idx
  ON Popularity
   (size_calc_age);

CREATE INDEX Number_of_seeders_idx
  ON Popularity
   (num_seeders);

CREATE INDEX Number_of_leechers_idx
  ON Popularity
   (num_leechers);

CREATE UNIQUE INDEX Popularity_idx
  ON Popularity
   (torrent_id, peer_id, msg_receive_time);

-- v4: Patch for ChannelCast, Search

CREATE TABLE ChannelCast (
publisher_id text,
publisher_name text,
infohash text,
torrenthash text,
torrentname text,
time_stamp integer,
signature text
);

CREATE INDEX pub_id_idx
on ChannelCast
(publisher_id);

CREATE INDEX pub_name_idx
on ChannelCast
(publisher_name);

CREATE INDEX infohash_ch_idx
on ChannelCast
(infohash);

----------------------------------------

CREATE TABLE InvertedIndex (
word text NOT NULL,
torrent_id integer
);

CREATE INDEX word_idx
on InvertedIndex
(word);

CREATE UNIQUE INDEX invertedindex_idx
on InvertedIndex
(word,torrent_id);

----------------------------------------

-- Set all similarity to zero because we are using a new similarity
-- function and the old values no longer correspond to the new ones
UPDATE Peer SET similarity = 0;
UPDATE Torrent SET relevance = 0;
"""
            self.execute_write(sql, commit=False)

        if fromver < 5:
            sql = """
--------------------------------------
-- Creating Subtitles (future RichMetadata) DB
----------------------------------
CREATE TABLE Metadata (
  metadata_id integer PRIMARY KEY ASC AUTOINCREMENT NOT NULL,
  publisher_id text NOT NULL,
  infohash text NOT NULL,
  description text,
  timestamp integer NOT NULL,
  signature text NOT NULL,
  UNIQUE (publisher_id, infohash),
  FOREIGN KEY (publisher_id, infohash)
    REFERENCES ChannelCast(publisher_id, infohash)
    ON DELETE CASCADE -- the fk constraint is not enforced by sqlite
);

CREATE INDEX infohash_md_idx
on Metadata(infohash);

CREATE INDEX pub_md_idx
on Metadata(publisher_id);


CREATE TABLE Subtitles (
  metadata_id_fk integer,
  subtitle_lang text NOT NULL,
  subtitle_location text,
  checksum text NOT NULL,
  UNIQUE (metadata_id_fk,subtitle_lang),
  FOREIGN KEY (metadata_id_fk)
    REFERENCES Metadata(metadata_id)
    ON DELETE CASCADE, -- the fk constraint is not enforced by sqlite

  -- ISO639-2 uses 3 characters for lang codes
  CONSTRAINT lang_code_length
    CHECK ( length(subtitle_lang) == 3 )
);


CREATE INDEX metadata_sub_idx
on Subtitles(metadata_id_fk);

-- Stores the subtitles that peers have as an integer bitmask
CREATE TABLE SubtitlesHave (
  metadata_id_fk integer,
  peer_id text NOT NULL,
  have_mask integer NOT NULL,
  received_ts integer NOT NULL, -- timestamp indicating when the mask was received
  UNIQUE (metadata_id_fk, peer_id),
  FOREIGN KEY (metadata_id_fk)
    REFERENCES Metadata(metadata_id)
    ON DELETE CASCADE, -- the fk constraint is not enforced by sqlite

  -- 32 bit unsigned integer
  CONSTRAINT have_mask_length
    CHECK (have_mask >= 0 AND have_mask < 4294967296)
);

CREATE INDEX subtitles_have_idx
on SubtitlesHave(metadata_id_fk);

-- this index can boost queries
-- ordered by timestamp on the SubtitlesHave DB
CREATE INDEX subtitles_have_ts
on SubtitlesHave(received_ts);
"""
            self.execute_write(sql, commit=False)

        # update the version number stepwise, so if this works we store it
        # regardless of later, potentially failing updates
        self.writeDBVersion(CURRENT_MAIN_DB_VERSION, commit=False)
        self.commit()

        # now start the process of parsing the torrents to insert into
        # the InvertedIndex table
        if TEST_SQLITECACHEDB_UPGRADE:
            state_dir = "."
        else:
            from BaseLib.Core.Session import Session
            session = Session.get_instance()
            state_dir = session.get_state_dir()
        tmpfilename = os.path.join(state_dir,"upgradingdb.txt")
        if fromver < 4 or os.path.exists(tmpfilename):
            def upgradeTorrents():
                # fetch some un-inserted torrents to put into the InvertedIndex
                sql = """
                SELECT torrent_id, name, torrent_file_name
                FROM Torrent
                WHERE torrent_id NOT IN (SELECT DISTINCT torrent_id FROM InvertedIndex)
                AND torrent_file_name IS NOT NULL
                LIMIT 20
                """
                records = self.fetchall(sql)

                if len(records) == 0:
                    # the upgrade is complete, hence delete the temp file
                    os.remove(tmpfilename)
                    if DEBUG: print >> sys.stderr, "DB Upgrade: temp-file deleted", tmpfilename
                    return

                for torrent_id, name, torrent_file_name in records:
                    try:
                        abs_filename = os.path.join(session.get_torrent_collecting_dir(), torrent_file_name)
                        if not os.path.exists(abs_filename):
                            raise RuntimeError(".torrent file not found. Use fallback.")
                        torrentdef = TorrentDef.load(abs_filename)
                        torrent_name = torrentdef.get_name_as_unicode()
                        keywords = Set(split_into_keywords(torrent_name))
                        for filename in torrentdef.get_files_as_unicode():
                            keywords.update(split_into_keywords(filename))
                    except:
                        # failure... most likely the .torrent file is invalid;
                        # use keywords from the torrent name
                        # stored in the database
                        torrent_name = dunno2unicode(name)
                        keywords = Set(split_into_keywords(torrent_name))

                    # store the keywords in the InvertedIndex
                    # table in the database
                    if len(keywords) > 0:
                        values = [(keyword, torrent_id) for keyword in keywords]
                        self.executemany(u"INSERT OR REPLACE INTO InvertedIndex VALUES(?, ?)", values, commit=False)
                        if DEBUG:
                            print >> sys.stderr, "DB Upgrade: Extending the InvertedIndex table with", len(values), "new keywords for", torrent_name

                # now commit, after parsing the batch of torrents
                self.commit()

                # upgrade not yet complete; come back after 5 seconds
                tqueue.add_task(upgradeTorrents, 5)

            # Create an empty file to mark that an upgrade is in progress.
            # In case this process is terminated before the upgrade completes,
            # the file remains even though fromver >= 4, indicating that the
            # rest of the torrents still need to be inserted into the InvertedIndex.

            # ensure the temp-file is created, if it is not already
            try:
                open(tmpfilename, "w")
                if DEBUG: print >> sys.stderr, "DB Upgrade: temp-file successfully created"
            except:
                if DEBUG: print >> sys.stderr, "DB Upgrade: failed to create temp-file"

            if DEBUG: print >> sys.stderr, "Upgrading DB .. inserting into InvertedIndex"
            from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
            from sets import Set
            from BaseLib.Core.Search.SearchManager import split_into_keywords
            from BaseLib.Core.TorrentDef import TorrentDef

            # start the upgrade after 10 seconds
            tqueue = TimedTaskQueue("UpgradeDB")
            tqueue.add_task(upgradeTorrents, 10)
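
    # Upgrade flow in brief (illustrative): a version-3 database hits the
    # fromver < 4 and fromver < 5 patches above, the MyInfo version row is
    # bumped to CURRENT_MAIN_DB_VERSION, and upgradeTorrents() is rescheduled
    # on the TimedTaskQueue until every torrent is indexed in InvertedIndex,
    # at which point upgradingdb.txt is removed.
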
class SQLiteCacheDB(SQLiteCacheDBV5):
    __single = None    # used for multithreaded singletons pattern
    lock = threading.RLock()

    @classmethod
    def getInstance(cls, *args, **kw):
        # Singleton pattern with double-checking to ensure that it can only create one object
        if cls.__single is None:
            cls.lock.acquire()
            try:
                if cls.__single is None:
                    cls.__single = cls(*args, **kw)
                    #print >>sys.stderr,"SqliteCacheDB: getInstance: created is",cls,cls.__single
            finally:
                cls.lock.release()
        return cls.__single

    def __init__(self, *args, **kargs):
        # always use getInstance() to create this object

        # ARNOCOMMENT: why isn't the lock used on this read?!
        if self.__single != None:
            raise RuntimeError, "SQLiteCacheDB is singleton"
        SQLiteCacheDBBase.__init__(self, *args, **kargs)

if __name__ == '__main__':
    configure_dir = sys.argv[1]
    config = {}
    config['state_dir'] = configure_dir
    config['install_dir'] = u'.'
    config['peer_icon_path'] = u'.'
    # init() also expects these keys; minimal values for a local test run
    config['torrent_collecting_dir'] = u'.'
    config['superpeer'] = False
    sqlite_test = init(config)
    sqlite_test.test()
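
# Example invocation (module and path names hypothetical):
#   python sqlitecachedb.py /tmp/tribler_state_dir
# creates /tmp/tribler_state_dir/sqlite/tribler.sdb from schema_sdb_v5.sql.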